robin_sparkless/functions.rs

use crate::column::Column;
use crate::dataframe::DataFrame;
use polars::prelude::*;

// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------

/// Sort order specification for use in orderBy/sort. Holds expr + direction + null placement.
#[derive(Debug, Clone)]
pub struct SortOrder {
    pub(crate) expr: Expr,
    pub(crate) descending: bool,
    pub(crate) nulls_last: bool,
}

impl SortOrder {
    pub fn expr(&self) -> &Expr {
        &self.expr
    }
}

/// Ascending sort, nulls first (Spark default for ASC).
pub fn asc(column: &Column) -> SortOrder {
    SortOrder {
        expr: column.expr().clone(),
        descending: false,
        nulls_last: false,
    }
}

/// Ascending sort, nulls first.
pub fn asc_nulls_first(column: &Column) -> SortOrder {
    SortOrder {
        expr: column.expr().clone(),
        descending: false,
        nulls_last: false,
    }
}

/// Ascending sort, nulls last.
pub fn asc_nulls_last(column: &Column) -> SortOrder {
    SortOrder {
        expr: column.expr().clone(),
        descending: false,
        nulls_last: true,
    }
}

/// Descending sort, nulls last (Spark default for DESC).
pub fn desc(column: &Column) -> SortOrder {
    SortOrder {
        expr: column.expr().clone(),
        descending: true,
        nulls_last: true,
    }
}

/// Descending sort, nulls first.
pub fn desc_nulls_first(column: &Column) -> SortOrder {
    SortOrder {
        expr: column.expr().clone(),
        descending: true,
        nulls_last: false,
    }
}

/// Descending sort, nulls last.
pub fn desc_nulls_last(column: &Column) -> SortOrder {
    SortOrder {
        expr: column.expr().clone(),
        descending: true,
        nulls_last: true,
    }
}
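
// A hedged usage sketch of the sort-order helpers above, assuming a DataFrame
// order-by that accepts `SortOrder` values (the DataFrame API itself is not
// shown in this file). `col` is the constructor defined further below.
//
//     let order = vec![desc(&col("age")), asc_nulls_last(&col("name"))];
//
// Note the PySpark-compatible defaults: asc() places nulls first, desc()
// places nulls last.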

// -----------------------------------------------------------------------------

/// Parse PySpark-like type name to Polars DataType.
/// Decimal(precision, scale) is mapped to Float64 for schema parity (Polars dtype-decimal not enabled).
pub fn parse_type_name(name: &str) -> Result<DataType, String> {
    let s = name.trim().to_lowercase();
    if s.starts_with("decimal(") && s.contains(')') {
        return Ok(DataType::Float64);
    }
    Ok(match s.as_str() {
        "int" | "integer" => DataType::Int32,
        "long" | "bigint" => DataType::Int64,
        "float" => DataType::Float32,
        "double" => DataType::Float64,
        "string" | "str" => DataType::String,
        "boolean" | "bool" => DataType::Boolean,
        "date" => DataType::Date,
        "timestamp" => DataType::Datetime(TimeUnit::Microseconds, None),
        _ => return Err(format!("unknown type name: {name}")),
    })
}
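
// For example (hedged sketch; mirrors the match arms above, case-insensitively):
//
//     assert_eq!(parse_type_name("BigInt"), Ok(DataType::Int64));
//     assert_eq!(parse_type_name("decimal(10,2)"), Ok(DataType::Float64));
//     assert!(parse_type_name("uuid").is_err());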

/// Get a column by name
pub fn col(name: &str) -> Column {
    Column::new(name.to_string())
}

/// Grouping set marker (PySpark grouping). Stub: returns 0 (no GROUPING SETS in robin-sparkless).
pub fn grouping(column: &Column) -> Column {
    let _ = column;
    Column::from_expr(lit(0i32), Some("grouping".to_string()))
}

/// Grouping set id (PySpark grouping_id). Stub: returns 0.
pub fn grouping_id(_columns: &[Column]) -> Column {
    Column::from_expr(lit(0i64), Some("grouping_id".to_string()))
}

/// Create a literal column from a value
pub fn lit_i32(value: i32) -> Column {
    let expr: Expr = lit(value);
    Column::from_expr(expr, None)
}

pub fn lit_i64(value: i64) -> Column {
    let expr: Expr = lit(value);
    Column::from_expr(expr, None)
}

pub fn lit_f64(value: f64) -> Column {
    let expr: Expr = lit(value);
    Column::from_expr(expr, None)
}

pub fn lit_bool(value: bool) -> Column {
    let expr: Expr = lit(value);
    Column::from_expr(expr, None)
}

pub fn lit_str(value: &str) -> Column {
    let expr: Expr = lit(value);
    Column::from_expr(expr, None)
}

/// Count aggregation
pub fn count(col: &Column) -> Column {
    Column::from_expr(col.expr().clone().count(), Some("count".to_string()))
}

/// Sum aggregation
pub fn sum(col: &Column) -> Column {
    Column::from_expr(col.expr().clone().sum(), Some("sum".to_string()))
}

/// Average aggregation
pub fn avg(col: &Column) -> Column {
    Column::from_expr(col.expr().clone().mean(), Some("avg".to_string()))
}

/// Alias for avg (PySpark mean).
pub fn mean(col: &Column) -> Column {
    avg(col)
}

/// Maximum aggregation
pub fn max(col: &Column) -> Column {
    Column::from_expr(col.expr().clone().max(), Some("max".to_string()))
}

/// Minimum aggregation
pub fn min(col: &Column) -> Column {
    Column::from_expr(col.expr().clone().min(), Some("min".to_string()))
}

/// First value in group (PySpark first). Use in groupBy.agg(). The ignorenulls flag is accepted for API compatibility but currently ignored: Polars 0.45 exposes only .first(), so the first value is returned regardless of nulls.
pub fn first(col: &Column, ignorenulls: bool) -> Column {
    let _ = ignorenulls;
    Column::from_expr(col.expr().clone().first(), None)
}

/// Any value from the group (PySpark any_value). Use in groupBy.agg(). ignorenulls reserved for API compatibility.
pub fn any_value(col: &Column, ignorenulls: bool) -> Column {
    let _ = ignorenulls;
    Column::from_expr(col.expr().clone().first(), None)
}

/// Count rows where condition is true (PySpark count_if). Use in groupBy.agg(); column should be boolean (true=1, false=0).
pub fn count_if(col: &Column) -> Column {
    use polars::prelude::DataType;
    Column::from_expr(
        col.expr().clone().cast(DataType::Int64).sum(),
        Some("count_if".to_string()),
    )
}

/// Sum aggregation; null on overflow (PySpark try_sum). Use in groupBy.agg(). Polars sum does not overflow; reserved for API.
pub fn try_sum(col: &Column) -> Column {
    Column::from_expr(col.expr().clone().sum(), Some("try_sum".to_string()))
}

/// Average aggregation; null on invalid (PySpark try_avg). Use in groupBy.agg(). Maps to mean; reserved for API.
pub fn try_avg(col: &Column) -> Column {
    Column::from_expr(col.expr().clone().mean(), Some("try_avg".to_string()))
}

/// Value of value_col in the row where ord_col is maximum (PySpark max_by). Use in groupBy.agg().
pub fn max_by(value_col: &Column, ord_col: &Column) -> Column {
    use polars::prelude::{as_struct, SortOptions};
    let st = as_struct(vec![
        ord_col.expr().clone().alias("_ord"),
        value_col.expr().clone().alias("_val"),
    ]);
    let e = st
        .sort(SortOptions::default().with_order_descending(true))
        .first()
        .struct_()
        .field_by_name("_val");
    Column::from_expr(e, None)
}

/// Value of value_col in the row where ord_col is minimum (PySpark min_by). Use in groupBy.agg().
pub fn min_by(value_col: &Column, ord_col: &Column) -> Column {
    use polars::prelude::{as_struct, SortOptions};
    let st = as_struct(vec![
        ord_col.expr().clone().alias("_ord"),
        value_col.expr().clone().alias("_val"),
    ]);
    let e = st
        .sort(SortOptions::default())
        .first()
        .struct_()
        .field_by_name("_val");
    Column::from_expr(e, None)
}
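
// Hedged usage sketch: max_by/min_by build a struct of (ord, value), sort it,
// and take the first row's value, so inside a grouped aggregation the call
// below would yield the `user` with the highest `score` per group:
//
//     let top_user = max_by(&col("user"), &col("score"));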

/// Collect column values into list per group (PySpark collect_list). Use in groupBy.agg().
pub fn collect_list(col: &Column) -> Column {
    Column::from_expr(
        col.expr().clone().implode(),
        Some("collect_list".to_string()),
    )
}

/// Collect distinct column values into list per group (PySpark collect_set). Use in groupBy.agg().
pub fn collect_set(col: &Column) -> Column {
    Column::from_expr(
        col.expr().clone().unique().implode(),
        Some("collect_set".to_string()),
    )
}

/// Boolean AND across group (PySpark bool_and). Use in groupBy.agg(); column should be boolean.
pub fn bool_and(col: &Column) -> Column {
    Column::from_expr(col.expr().clone().all(true), Some("bool_and".to_string()))
}

/// Alias for bool_and (PySpark every). Use in groupBy.agg().
pub fn every(col: &Column) -> Column {
    Column::from_expr(col.expr().clone().all(true), Some("every".to_string()))
}

/// Standard deviation (sample) aggregation (PySpark stddev / stddev_samp)
pub fn stddev(col: &Column) -> Column {
    Column::from_expr(col.expr().clone().std(1), Some("stddev".to_string()))
}

/// Variance (sample) aggregation (PySpark variance / var_samp)
pub fn variance(col: &Column) -> Column {
    Column::from_expr(col.expr().clone().var(1), Some("variance".to_string()))
}

/// Population standard deviation (ddof=0). PySpark stddev_pop.
pub fn stddev_pop(col: &Column) -> Column {
    Column::from_expr(col.expr().clone().std(0), Some("stddev_pop".to_string()))
}

/// Sample standard deviation (ddof=1). Alias for stddev. PySpark stddev_samp.
pub fn stddev_samp(col: &Column) -> Column {
    stddev(col)
}

/// Alias for stddev (PySpark std).
pub fn std(col: &Column) -> Column {
    stddev(col)
}

/// Population variance (ddof=0). PySpark var_pop.
pub fn var_pop(col: &Column) -> Column {
    Column::from_expr(col.expr().clone().var(0), Some("var_pop".to_string()))
}

/// Sample variance (ddof=1). Alias for variance. PySpark var_samp.
pub fn var_samp(col: &Column) -> Column {
    variance(col)
}

/// Median aggregation. PySpark median.
pub fn median(col: &Column) -> Column {
    use polars::prelude::QuantileMethod;
    Column::from_expr(
        col.expr()
            .clone()
            .quantile(lit(0.5), QuantileMethod::Linear),
        Some("median".to_string()),
    )
}

/// Approximate percentile (PySpark approx_percentile). Maps to quantile; percentage in 0.0..=1.0. accuracy reserved for API compatibility.
pub fn approx_percentile(col: &Column, percentage: f64, _accuracy: Option<i32>) -> Column {
    use polars::prelude::QuantileMethod;
    Column::from_expr(
        col.expr()
            .clone()
            .quantile(lit(percentage), QuantileMethod::Linear),
        Some(format!("approx_percentile({percentage})")),
    )
}

/// Approximate percentile (PySpark percentile_approx). Alias for approx_percentile.
pub fn percentile_approx(col: &Column, percentage: f64, accuracy: Option<i32>) -> Column {
    approx_percentile(col, percentage, accuracy)
}
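
// Hedged usage sketch: both helpers map onto Polars' linear-interpolation
// quantile rather than Spark's sketch-based estimate, so the result is the
// exact interpolated quantile of the data:
//
//     let p50 = median(&col("latency_ms"));
//     let p95 = approx_percentile(&col("latency_ms"), 0.95, None);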

/// Mode aggregation - most frequent value. PySpark mode.
pub fn mode(col: &Column) -> Column {
    col.clone().mode()
}

/// Count distinct aggregation (PySpark countDistinct)
pub fn count_distinct(col: &Column) -> Column {
    use polars::prelude::DataType;
    Column::from_expr(
        col.expr().clone().n_unique().cast(DataType::Int64),
        Some("count_distinct".to_string()),
    )
}

/// Approximate count distinct (PySpark approx_count_distinct). Use in groupBy.agg(). rsd reserved for API compatibility; Polars uses exact n_unique.
pub fn approx_count_distinct(col: &Column, _rsd: Option<f64>) -> Column {
    use polars::prelude::DataType;
    Column::from_expr(
        col.expr().clone().n_unique().cast(DataType::Int64),
        Some("approx_count_distinct".to_string()),
    )
}

/// Kurtosis aggregation (PySpark kurtosis). Fisher definition, bias=true. Use in groupBy.agg().
pub fn kurtosis(col: &Column) -> Column {
    Column::from_expr(
        col.expr()
            .clone()
            .cast(DataType::Float64)
            .kurtosis(true, true),
        Some("kurtosis".to_string()),
    )
}

/// Skewness aggregation (PySpark skewness). bias=true. Use in groupBy.agg().
pub fn skewness(col: &Column) -> Column {
    Column::from_expr(
        col.expr().clone().cast(DataType::Float64).skew(true),
        Some("skewness".to_string()),
    )
}

/// Population covariance aggregation (PySpark covar_pop). Returns Expr for use in groupBy.agg().
pub fn covar_pop_expr(col1: &str, col2: &str) -> Expr {
    use polars::prelude::{col as pl_col, len};
    let c1 = pl_col(col1).cast(DataType::Float64);
    let c2 = pl_col(col2).cast(DataType::Float64);
    let n = len().cast(DataType::Float64);
    let sum_ab = (c1.clone() * c2.clone()).sum();
    let sum_a = pl_col(col1).sum().cast(DataType::Float64);
    let sum_b = pl_col(col2).sum().cast(DataType::Float64);
    (sum_ab - sum_a * sum_b / n.clone()) / n
}

/// Population covariance aggregation (PySpark covar_pop). Module-level; use in groupBy.agg() with two columns.
pub fn covar_pop(col1: &Column, col2: &Column) -> Column {
    use polars::prelude::len;
    let c1 = col1.expr().clone().cast(DataType::Float64);
    let c2 = col2.expr().clone().cast(DataType::Float64);
    let n = len().cast(DataType::Float64);
    let sum_ab = (c1.clone() * c2.clone()).sum();
    let sum_a = col1.expr().clone().sum().cast(DataType::Float64);
    let sum_b = col2.expr().clone().sum().cast(DataType::Float64);
    let e = (sum_ab - sum_a * sum_b / n.clone()) / n;
    Column::from_expr(e, Some("covar_pop".to_string()))
}

/// Pearson correlation aggregation (PySpark corr). Module-level; use in groupBy.agg() with two columns.
pub fn corr(col1: &Column, col2: &Column) -> Column {
    use polars::prelude::{len, lit, when};
    let c1 = col1.expr().clone().cast(DataType::Float64);
    let c2 = col2.expr().clone().cast(DataType::Float64);
    let n = len().cast(DataType::Float64);
    let n1 = (len() - lit(1)).cast(DataType::Float64);
    let sum_ab = (c1.clone() * c2.clone()).sum();
    let sum_a = col1.expr().clone().sum().cast(DataType::Float64);
    let sum_b = col2.expr().clone().sum().cast(DataType::Float64);
    let sum_a2 = (c1.clone() * c1).sum();
    let sum_b2 = (c2.clone() * c2).sum();
    let cov_samp = (sum_ab - sum_a.clone() * sum_b.clone() / n.clone()) / n1.clone();
    let var_a = (sum_a2 - sum_a.clone() * sum_a / n.clone()) / n1.clone();
    let var_b = (sum_b2 - sum_b.clone() * sum_b / n.clone()) / n1.clone();
    let std_a = var_a.sqrt();
    let std_b = var_b.sqrt();
    let e = when(len().gt(lit(1)))
        .then(cov_samp / (std_a * std_b))
        .otherwise(lit(f64::NAN));
    Column::from_expr(e, Some("corr".to_string()))
}

/// Sample covariance aggregation (PySpark covar_samp). Returns Expr for use in groupBy.agg().
pub fn covar_samp_expr(col1: &str, col2: &str) -> Expr {
    use polars::prelude::{col as pl_col, len, lit, when};
    let c1 = pl_col(col1).cast(DataType::Float64);
    let c2 = pl_col(col2).cast(DataType::Float64);
    let n = len().cast(DataType::Float64);
    let sum_ab = (c1.clone() * c2.clone()).sum();
    let sum_a = pl_col(col1).sum().cast(DataType::Float64);
    let sum_b = pl_col(col2).sum().cast(DataType::Float64);
    when(len().gt(lit(1)))
        .then((sum_ab - sum_a * sum_b / n.clone()) / (len() - lit(1)).cast(DataType::Float64))
        .otherwise(lit(f64::NAN))
}

/// Pearson correlation aggregation (PySpark corr). Returns Expr for use in groupBy.agg().
pub fn corr_expr(col1: &str, col2: &str) -> Expr {
    use polars::prelude::{col as pl_col, len, lit, when};
    let c1 = pl_col(col1).cast(DataType::Float64);
    let c2 = pl_col(col2).cast(DataType::Float64);
    let n = len().cast(DataType::Float64);
    let n1 = (len() - lit(1)).cast(DataType::Float64);
    let sum_ab = (c1.clone() * c2.clone()).sum();
    let sum_a = pl_col(col1).sum().cast(DataType::Float64);
    let sum_b = pl_col(col2).sum().cast(DataType::Float64);
    let sum_a2 = (c1.clone() * c1).sum();
    let sum_b2 = (c2.clone() * c2).sum();
    let cov_samp = (sum_ab - sum_a.clone() * sum_b.clone() / n.clone()) / n1.clone();
    let var_a = (sum_a2 - sum_a.clone() * sum_a / n.clone()) / n1.clone();
    let var_b = (sum_b2 - sum_b.clone() * sum_b / n.clone()) / n1.clone();
    let std_a = var_a.sqrt();
    let std_b = var_b.sqrt();
    when(len().gt(lit(1)))
        .then(cov_samp / (std_a * std_b))
        .otherwise(lit(f64::NAN))
}
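
// The covariance/correlation helpers above use the closed-form sum identities
//     covar_pop(a, b)  = (Σab − Σa·Σb/n) / n
//     covar_samp(a, b) = (Σab − Σa·Σb/n) / (n − 1)
//     corr(a, b)       = covar_samp(a, b) / (std_samp(a) · std_samp(b))
// returning NaN when n <= 1. A hedged usage sketch (column names illustrative):
//
//     let c = corr(&col("height"), &col("weight"));
//     let cov = covar_samp_expr("height", "weight").alias("covar_samp");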

// --- Regression aggregates (PySpark regr_*). y = col1, x = col2; only pairs where both non-null. ---

fn regr_cond_and_sums(y_col: &str, x_col: &str) -> (Expr, Expr, Expr, Expr, Expr, Expr) {
    use polars::prelude::col as pl_col;
    let y = pl_col(y_col).cast(DataType::Float64);
    let x = pl_col(x_col).cast(DataType::Float64);
    let cond = y.clone().is_not_null().and(x.clone().is_not_null());
    let n = y
        .clone()
        .filter(cond.clone())
        .count()
        .cast(DataType::Float64);
    let sum_x = x.clone().filter(cond.clone()).sum();
    let sum_y = y.clone().filter(cond.clone()).sum();
    let sum_xx = (x.clone() * x.clone()).filter(cond.clone()).sum();
    let sum_yy = (y.clone() * y.clone()).filter(cond.clone()).sum();
    let sum_xy = (x * y).filter(cond).sum();
    (n, sum_x, sum_y, sum_xx, sum_yy, sum_xy)
}

/// Regression: count of (y, x) pairs where both non-null (PySpark regr_count).
pub fn regr_count_expr(y_col: &str, x_col: &str) -> Expr {
    let (n, ..) = regr_cond_and_sums(y_col, x_col);
    n
}

/// Regression: average of x (PySpark regr_avgx).
pub fn regr_avgx_expr(y_col: &str, x_col: &str) -> Expr {
    use polars::prelude::{lit, when};
    let (n, sum_x, ..) = regr_cond_and_sums(y_col, x_col);
    when(n.clone().gt(lit(0.0)))
        .then(sum_x / n)
        .otherwise(lit(f64::NAN))
}

/// Regression: average of y (PySpark regr_avgy).
pub fn regr_avgy_expr(y_col: &str, x_col: &str) -> Expr {
    use polars::prelude::{lit, when};
    let (n, _, sum_y, ..) = regr_cond_and_sums(y_col, x_col);
    when(n.clone().gt(lit(0.0)))
        .then(sum_y / n)
        .otherwise(lit(f64::NAN))
}

/// Regression: sum((x - avg_x)^2) (PySpark regr_sxx).
pub fn regr_sxx_expr(y_col: &str, x_col: &str) -> Expr {
    use polars::prelude::{lit, when};
    let (n, sum_x, _, sum_xx, ..) = regr_cond_and_sums(y_col, x_col);
    when(n.clone().gt(lit(0.0)))
        .then(sum_xx - sum_x.clone() * sum_x / n)
        .otherwise(lit(f64::NAN))
}

/// Regression: sum((y - avg_y)^2) (PySpark regr_syy).
pub fn regr_syy_expr(y_col: &str, x_col: &str) -> Expr {
    use polars::prelude::{lit, when};
    let (n, _, sum_y, _, sum_yy, _) = regr_cond_and_sums(y_col, x_col);
    when(n.clone().gt(lit(0.0)))
        .then(sum_yy - sum_y.clone() * sum_y / n)
        .otherwise(lit(f64::NAN))
}

/// Regression: sum((x - avg_x)(y - avg_y)) (PySpark regr_sxy).
pub fn regr_sxy_expr(y_col: &str, x_col: &str) -> Expr {
    use polars::prelude::{lit, when};
    let (n, sum_x, sum_y, _, _, sum_xy) = regr_cond_and_sums(y_col, x_col);
    when(n.clone().gt(lit(0.0)))
        .then(sum_xy - sum_x * sum_y / n)
        .otherwise(lit(f64::NAN))
}

/// Regression slope: cov_samp(y,x)/var_samp(x) (PySpark regr_slope).
pub fn regr_slope_expr(y_col: &str, x_col: &str) -> Expr {
    use polars::prelude::{lit, when};
    let (n, sum_x, sum_y, sum_xx, _sum_yy, sum_xy) = regr_cond_and_sums(y_col, x_col);
    let regr_sxx = sum_xx.clone() - sum_x.clone() * sum_x.clone() / n.clone();
    let regr_sxy = sum_xy - sum_x * sum_y / n.clone();
    when(n.gt(lit(1.0)).and(regr_sxx.clone().gt(lit(0.0))))
        .then(regr_sxy / regr_sxx)
        .otherwise(lit(f64::NAN))
}

/// Regression intercept: avg_y - slope*avg_x (PySpark regr_intercept).
pub fn regr_intercept_expr(y_col: &str, x_col: &str) -> Expr {
    use polars::prelude::{lit, when};
    let (n, sum_x, sum_y, sum_xx, _, sum_xy) = regr_cond_and_sums(y_col, x_col);
    let regr_sxx = sum_xx - sum_x.clone() * sum_x.clone() / n.clone();
    let regr_sxy = sum_xy.clone() - sum_x.clone() * sum_y.clone() / n.clone();
    let slope = regr_sxy.clone() / regr_sxx.clone();
    let avg_y = sum_y / n.clone();
    let avg_x = sum_x / n.clone();
    when(n.gt(lit(1.0)).and(regr_sxx.clone().gt(lit(0.0))))
        .then(avg_y - slope * avg_x)
        .otherwise(lit(f64::NAN))
}

/// Regression R-squared (PySpark regr_r2).
pub fn regr_r2_expr(y_col: &str, x_col: &str) -> Expr {
    use polars::prelude::{lit, when};
    let (n, sum_x, sum_y, sum_xx, sum_yy, sum_xy) = regr_cond_and_sums(y_col, x_col);
    let regr_sxx = sum_xx - sum_x.clone() * sum_x.clone() / n.clone();
    let regr_syy = sum_yy - sum_y.clone() * sum_y.clone() / n.clone();
    let regr_sxy = sum_xy - sum_x * sum_y / n;
    when(
        regr_sxx
            .clone()
            .gt(lit(0.0))
            .and(regr_syy.clone().gt(lit(0.0))),
    )
    .then(regr_sxy.clone() * regr_sxy / (regr_sxx * regr_syy))
    .otherwise(lit(f64::NAN))
}
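
// All regr_* helpers share regr_cond_and_sums, which filters to rows where
// both y and x are non-null and returns (n, Σx, Σy, Σxx, Σyy, Σxy). The slope,
// for instance, expands to (Σxy − Σx·Σy/n) / (Σxx − (Σx)²/n). A hedged usage
// sketch of the raw Exprs (aliases are illustrative):
//
//     let slope = regr_slope_expr("y", "x").alias("slope");
//     let r2 = regr_r2_expr("y", "x").alias("r2");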

/// PySpark-style conditional expression builder.
///
/// # Example
/// ```
/// use robin_sparkless::{col, lit_i64, lit_str, when};
///
/// // when(condition).then(value).otherwise(fallback)
/// let expr = when(&col("age").gt(lit_i64(18).into_expr()))
///     .then(&lit_str("adult"))
///     .otherwise(&lit_str("minor"));
/// ```
pub fn when(condition: &Column) -> WhenBuilder {
    WhenBuilder::new(condition.expr().clone())
}

/// Two-arg when(condition, value): returns value where condition is true, null otherwise (PySpark when(cond, val)).
pub fn when_then_otherwise_null(condition: &Column, value: &Column) -> Column {
    use polars::prelude::*;
    let null_expr = Expr::Literal(LiteralValue::Null);
    let expr = polars::prelude::when(condition.expr().clone())
        .then(value.expr().clone())
        .otherwise(null_expr);
    crate::column::Column::from_expr(expr, None)
}

/// Builder for when-then-otherwise expressions
pub struct WhenBuilder {
    condition: Expr,
}

impl WhenBuilder {
    fn new(condition: Expr) -> Self {
        WhenBuilder { condition }
    }

    /// Specify the value when condition is true
    pub fn then(self, value: &Column) -> ThenBuilder {
        use polars::prelude::*;
        let when_then = when(self.condition).then(value.expr().clone());
        ThenBuilder::new(when_then)
    }

    /// Not supported without a preceding .then().
    /// In PySpark, when(cond).otherwise(val) implicitly supplies a then-clause; this
    /// implementation requires .then() to be called explicitly, so calling otherwise()
    /// directly on WhenBuilder always panics. Use when(cond).then(val1).otherwise(val2).
    pub fn otherwise(self, _value: &Column) -> Column {
        // This should not be called directly - when().otherwise() without .then() is not supported
        // Users should use when(cond).then(val1).otherwise(val2)
        panic!("when().otherwise() requires .then() to be called first. Use when(cond).then(val1).otherwise(val2)");
    }
}

/// Builder for chaining when-then clauses before finalizing with otherwise
pub struct ThenBuilder {
    state: WhenThenState,
}

enum WhenThenState {
    Single(Box<polars::prelude::Then>),
    Chained(Box<polars::prelude::ChainedThen>),
}

/// Builder for an additional when-then clause (returned by ThenBuilder::when).
pub struct ChainedWhenBuilder {
    inner: polars::prelude::ChainedWhen,
}

impl ThenBuilder {
    fn new(when_then: polars::prelude::Then) -> Self {
        ThenBuilder {
            state: WhenThenState::Single(Box::new(when_then)),
        }
    }

    fn new_chained(chained: polars::prelude::ChainedThen) -> Self {
        ThenBuilder {
            state: WhenThenState::Chained(Box::new(chained)),
        }
    }

    /// Chain an additional when-then clause (PySpark: when(a).then(x).when(b).then(y).otherwise(z)).
    pub fn when(self, condition: &Column) -> ChainedWhenBuilder {
        let chained_when = match self.state {
            WhenThenState::Single(t) => t.when(condition.expr().clone()),
            WhenThenState::Chained(ct) => ct.when(condition.expr().clone()),
        };
        ChainedWhenBuilder {
            inner: chained_when,
        }
    }

    /// Finalize the expression with the fallback value
    pub fn otherwise(self, value: &Column) -> Column {
        let expr = match self.state {
            WhenThenState::Single(t) => t.otherwise(value.expr().clone()),
            WhenThenState::Chained(ct) => ct.otherwise(value.expr().clone()),
        };
        crate::column::Column::from_expr(expr, None)
    }
}

impl ChainedWhenBuilder {
    /// Set the value for the current when clause.
    pub fn then(self, value: &Column) -> ThenBuilder {
        ThenBuilder::new_chained(self.inner.then(value.expr().clone()))
    }
}
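
// Multi-branch example (hedged sketch; mirrors the builder chain above and the
// doc example on `when`, using only helpers defined in this file):
//
//     let grade = when(&col("score").gt(lit_i64(90).into_expr()))
//         .then(&lit_str("A"))
//         .when(&col("score").gt(lit_i64(75).into_expr()))
//         .then(&lit_str("B"))
//         .otherwise(&lit_str("C"));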

/// Convert string column to uppercase (PySpark upper)
pub fn upper(column: &Column) -> Column {
    column.clone().upper()
}

/// Convert string column to lowercase (PySpark lower)
pub fn lower(column: &Column) -> Column {
    column.clone().lower()
}

/// Substring with 1-based start (PySpark substring semantics)
pub fn substring(column: &Column, start: i64, length: Option<i64>) -> Column {
    column.clone().substr(start, length)
}

/// String length in characters (PySpark length)
pub fn length(column: &Column) -> Column {
    column.clone().length()
}

/// Trim leading and trailing whitespace (PySpark trim)
pub fn trim(column: &Column) -> Column {
    column.clone().trim()
}

/// Trim leading whitespace (PySpark ltrim)
pub fn ltrim(column: &Column) -> Column {
    column.clone().ltrim()
}

/// Trim trailing whitespace (PySpark rtrim)
pub fn rtrim(column: &Column) -> Column {
    column.clone().rtrim()
}

/// Trim leading and trailing chars (PySpark btrim). trim_str defaults to whitespace.
pub fn btrim(column: &Column, trim_str: Option<&str>) -> Column {
    column.clone().btrim(trim_str)
}

/// Find substring position 1-based, starting at pos (PySpark locate). 0 if not found.
pub fn locate(substr: &str, column: &Column, pos: i64) -> Column {
    column.clone().locate(substr, pos)
}

/// Base conversion (PySpark conv). num from from_base to to_base.
pub fn conv(column: &Column, from_base: i32, to_base: i32) -> Column {
    column.clone().conv(from_base, to_base)
}

/// Convert to hex string (PySpark hex).
pub fn hex(column: &Column) -> Column {
    column.clone().hex()
}

/// Convert hex string to binary/string (PySpark unhex).
pub fn unhex(column: &Column) -> Column {
    column.clone().unhex()
}

/// Encode string to binary (PySpark encode). Charset: UTF-8. Returns hex string.
pub fn encode(column: &Column, charset: &str) -> Column {
    column.clone().encode(charset)
}

/// Decode binary (hex string) to string (PySpark decode). Charset: UTF-8.
pub fn decode(column: &Column, charset: &str) -> Column {
    column.clone().decode(charset)
}

/// Convert to binary (PySpark to_binary). fmt: 'utf-8', 'hex'.
pub fn to_binary(column: &Column, fmt: &str) -> Column {
    column.clone().to_binary(fmt)
}

/// Try convert to binary; null on failure (PySpark try_to_binary).
pub fn try_to_binary(column: &Column, fmt: &str) -> Column {
    column.clone().try_to_binary(fmt)
}

/// AES encrypt (PySpark aes_encrypt). Key as string; AES-128-GCM.
pub fn aes_encrypt(column: &Column, key: &str) -> Column {
    column.clone().aes_encrypt(key)
}

/// AES decrypt (PySpark aes_decrypt). Input hex(nonce||ciphertext).
pub fn aes_decrypt(column: &Column, key: &str) -> Column {
    column.clone().aes_decrypt(key)
}

/// Try AES decrypt (PySpark try_aes_decrypt). Returns null on failure.
pub fn try_aes_decrypt(column: &Column, key: &str) -> Column {
    column.clone().try_aes_decrypt(key)
}

/// Convert integer to binary string (PySpark bin).
pub fn bin(column: &Column) -> Column {
    column.clone().bin()
}

/// Get bit at 0-based position (PySpark getbit).
pub fn getbit(column: &Column, pos: i64) -> Column {
    column.clone().getbit(pos)
}

/// Bitwise AND of two integer/boolean columns (PySpark bit_and).
pub fn bit_and(left: &Column, right: &Column) -> Column {
    left.clone().bit_and(right)
}

/// Bitwise OR of two integer/boolean columns (PySpark bit_or).
pub fn bit_or(left: &Column, right: &Column) -> Column {
    left.clone().bit_or(right)
}

/// Bitwise XOR of two integer/boolean columns (PySpark bit_xor).
pub fn bit_xor(left: &Column, right: &Column) -> Column {
    left.clone().bit_xor(right)
}

/// Count of set bits in the integer representation (PySpark bit_count).
pub fn bit_count(column: &Column) -> Column {
    column.clone().bit_count()
}

/// Bitwise NOT of an integer/boolean column (PySpark bitwise_not / bitwiseNOT).
pub fn bitwise_not(column: &Column) -> Column {
    column.clone().bitwise_not()
}

// --- Bitmap (PySpark 3.5+) ---

/// Map integral value (0–32767) to bit position for bitmap aggregates (PySpark bitmap_bit_position).
pub fn bitmap_bit_position(column: &Column) -> Column {
    use polars::prelude::DataType;
    let expr = column.expr().clone().cast(DataType::Int32);
    Column::from_expr(expr, None)
}

/// Bucket number for distributed bitmap (PySpark bitmap_bucket_number). value / 32768.
pub fn bitmap_bucket_number(column: &Column) -> Column {
    use polars::prelude::DataType;
    let expr = column.expr().clone().cast(DataType::Int64) / lit(32768i64);
    Column::from_expr(expr, None)
}

/// Count set bits in a bitmap binary column (PySpark bitmap_count).
pub fn bitmap_count(column: &Column) -> Column {
    use polars::prelude::{DataType, GetOutput};
    let expr = column.expr().clone().map(
        crate::udfs::apply_bitmap_count,
        GetOutput::from_type(DataType::Int64),
    );
    Column::from_expr(expr, None)
}

/// Aggregate: bitwise OR of bit positions into one bitmap binary (PySpark bitmap_construct_agg).
/// Use in group_by(...).agg([bitmap_construct_agg(col)]).
pub fn bitmap_construct_agg(column: &Column) -> polars::prelude::Expr {
    use polars::prelude::{DataType, GetOutput};
    column.expr().clone().implode().map(
        crate::udfs::apply_bitmap_construct_agg,
        GetOutput::from_type(DataType::Binary),
    )
}

/// Aggregate: bitwise OR of bitmap binary column (PySpark bitmap_or_agg).
pub fn bitmap_or_agg(column: &Column) -> polars::prelude::Expr {
    use polars::prelude::{DataType, GetOutput};
    column.expr().clone().implode().map(
        crate::udfs::apply_bitmap_or_agg,
        GetOutput::from_type(DataType::Binary),
    )
}

/// Alias for getbit (PySpark bit_get).
pub fn bit_get(column: &Column, pos: i64) -> Column {
    getbit(column, pos)
}

/// Assert that all boolean values are true; errors otherwise (PySpark assert_true).
/// When err_msg is Some, it is used in the error message when assertion fails.
pub fn assert_true(column: &Column, err_msg: Option<&str>) -> Column {
    column.clone().assert_true(err_msg)
}

/// Raise an error when evaluated (PySpark raise_error). Always fails with the given message.
pub fn raise_error(message: &str) -> Column {
    let msg = message.to_string();
    let expr = lit(0i64).map(
        move |_col| -> PolarsResult<Option<polars::prelude::Column>> {
            Err(PolarsError::ComputeError(msg.clone().into()))
        },
        GetOutput::from_type(DataType::Int64),
    );
    Column::from_expr(expr, Some("raise_error".to_string()))
}

/// Broadcast hint - no-op that returns the same DataFrame (PySpark broadcast).
pub fn broadcast(df: &DataFrame) -> DataFrame {
    df.clone()
}

/// Stub partition id - always 0 (PySpark spark_partition_id).
pub fn spark_partition_id() -> Column {
    Column::from_expr(lit(0i32), Some("spark_partition_id".to_string()))
}

/// Stub input file name - empty string (PySpark input_file_name).
pub fn input_file_name() -> Column {
    Column::from_expr(lit(""), Some("input_file_name".to_string()))
}

/// Stub monotonically_increasing_id - constant 0 (PySpark monotonically_increasing_id).
/// Note: differs from PySpark, where the id is unique per row; see PYSPARK_DIFFERENCES.md.
pub fn monotonically_increasing_id() -> Column {
    Column::from_expr(lit(0i64), Some("monotonically_increasing_id".to_string()))
}

/// Current catalog name stub (PySpark current_catalog).
pub fn current_catalog() -> Column {
    Column::from_expr(lit("spark_catalog"), Some("current_catalog".to_string()))
}

/// Current database/schema name stub (PySpark current_database).
pub fn current_database() -> Column {
    Column::from_expr(lit("default"), Some("current_database".to_string()))
}

/// Current schema name stub (PySpark current_schema).
pub fn current_schema() -> Column {
    Column::from_expr(lit("default"), Some("current_schema".to_string()))
}

/// Current user stub (PySpark current_user).
pub fn current_user() -> Column {
    Column::from_expr(lit("unknown"), Some("current_user".to_string()))
}

/// User stub (PySpark user).
pub fn user() -> Column {
    Column::from_expr(lit("unknown"), Some("user".to_string()))
}

/// Random uniform [0, 1) per row, with optional seed (PySpark rand).
/// When added via with_column, generates one distinct value per row (PySpark-like).
pub fn rand(seed: Option<u64>) -> Column {
    Column::from_rand(seed)
}

/// Random standard normal per row, with optional seed (PySpark randn).
/// When added via with_column, generates one distinct value per row (PySpark-like).
pub fn randn(seed: Option<u64>) -> Column {
    Column::from_randn(seed)
}

/// Call a registered UDF by name. PySpark: F.call_udf(udfName, *cols).
/// Requires a session (set by get_or_create). Raises if UDF not found.
pub fn call_udf(name: &str, cols: &[Column]) -> Result<Column, PolarsError> {
    use polars::prelude::Column as PlColumn;

    let session = crate::session::get_thread_udf_session().ok_or_else(|| {
        PolarsError::InvalidOperation(
            "call_udf: no session. Use SparkSession.builder().get_or_create() first.".into(),
        )
    })?;
    let case_sensitive = session.is_case_sensitive();

    // Rust UDF: build lazy Expr
    let udf = session
        .udf_registry
        .get_rust_udf(name, case_sensitive)
        .ok_or_else(|| {
            PolarsError::InvalidOperation(format!("call_udf: UDF '{name}' not found").into())
        })?;

    let exprs: Vec<Expr> = cols.iter().map(|c| c.expr().clone()).collect();
    let output_type = DataType::String; // PySpark default

    let expr = if exprs.len() == 1 {
        let udf = udf.clone();
        exprs.into_iter().next().unwrap().map(
            move |c| {
                let s = c.take_materialized_series();
                udf.apply(&[s])
                    .map(|out| Some(PlColumn::new("_".into(), out)))
            },
            GetOutput::from_type(output_type),
        )
    } else {
        let udf = udf.clone();
        let first = exprs[0].clone();
        let rest: Vec<Expr> = exprs[1..].to_vec();
        first.map_many(
            move |columns| {
                let series: Vec<Series> = columns
                    .iter_mut()
                    .map(|c| std::mem::take(c).take_materialized_series())
                    .collect();
                udf.apply(&series)
                    .map(|out| Some(PlColumn::new("_".into(), out)))
            },
            &rest,
            GetOutput::from_type(output_type),
        )
    };

    Ok(Column::from_expr(expr, Some(format!("{name}()"))))
}
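
// Hedged usage sketch: assuming a UDF named "to_upper" has already been
// registered on the active session (the registration API lives outside this
// file), it can be invoked on a column by name:
//
//     let shouted = call_udf("to_upper", &[col("name")])?;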

/// True if two arrays have any element in common (PySpark arrays_overlap).
pub fn arrays_overlap(left: &Column, right: &Column) -> Column {
    left.clone().arrays_overlap(right)
}

/// Zip arrays into array of structs (PySpark arrays_zip).
pub fn arrays_zip(left: &Column, right: &Column) -> Column {
    left.clone().arrays_zip(right)
}

/// Explode; null/empty yields one row with null (PySpark explode_outer).
pub fn explode_outer(column: &Column) -> Column {
    column.clone().explode_outer()
}

/// Posexplode with null preservation (PySpark posexplode_outer).
pub fn posexplode_outer(column: &Column) -> (Column, Column) {
    column.clone().posexplode_outer()
}

/// Collect to array (PySpark array_agg).
pub fn array_agg(column: &Column) -> Column {
    column.clone().array_agg()
}

/// Transform map keys by expr (PySpark transform_keys).
pub fn transform_keys(column: &Column, key_expr: Expr) -> Column {
    column.clone().transform_keys(key_expr)
}

/// Transform map values by expr (PySpark transform_values).
pub fn transform_values(column: &Column, value_expr: Expr) -> Column {
    column.clone().transform_values(value_expr)
}

/// Parse string to map (PySpark str_to_map). Default delims: "," and ":".
pub fn str_to_map(
    column: &Column,
    pair_delim: Option<&str>,
    key_value_delim: Option<&str>,
) -> Column {
    let pd = pair_delim.unwrap_or(",");
    let kvd = key_value_delim.unwrap_or(":");
    column.clone().str_to_map(pd, kvd)
}
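
// For example (hedged sketch): with the default delimiters, a string value of
// "a:1,b:2" parses into a map with keys "a" and "b":
//
//     let kv = str_to_map(&col("props"), None, None);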

/// Extract first match of regex (PySpark regexp_extract). group_index 0 = full match.
pub fn regexp_extract(column: &Column, pattern: &str, group_index: usize) -> Column {
    column.clone().regexp_extract(pattern, group_index)
}

/// Replace first match of regex (PySpark regexp_replace)
pub fn regexp_replace(column: &Column, pattern: &str, replacement: &str) -> Column {
    column.clone().regexp_replace(pattern, replacement)
}

/// Split string by delimiter (PySpark split). Optional limit: at most that many parts (remainder in last).
pub fn split(column: &Column, delimiter: &str, limit: Option<i32>) -> Column {
    column.clone().split(delimiter, limit)
}

/// Title case (PySpark initcap)
pub fn initcap(column: &Column) -> Column {
    column.clone().initcap()
}

/// Extract all matches of regex (PySpark regexp_extract_all).
pub fn regexp_extract_all(column: &Column, pattern: &str) -> Column {
    column.clone().regexp_extract_all(pattern)
}

/// Check if string matches regex (PySpark regexp_like / rlike).
pub fn regexp_like(column: &Column, pattern: &str) -> Column {
    column.clone().regexp_like(pattern)
}

/// Count of non-overlapping regex matches (PySpark regexp_count).
pub fn regexp_count(column: &Column, pattern: &str) -> Column {
    column.clone().regexp_count(pattern)
}

/// First substring matching regex (PySpark regexp_substr). Null if no match.
pub fn regexp_substr(column: &Column, pattern: &str) -> Column {
    column.clone().regexp_substr(pattern)
}

/// Split by delimiter and return 1-based part (PySpark split_part).
pub fn split_part(column: &Column, delimiter: &str, part_num: i64) -> Column {
    column.clone().split_part(delimiter, part_num)
}

/// 1-based position of first regex match (PySpark regexp_instr).
pub fn regexp_instr(column: &Column, pattern: &str, group_idx: Option<usize>) -> Column {
    column.clone().regexp_instr(pattern, group_idx)
}

/// 1-based index of str in comma-delimited set (PySpark find_in_set). 0 if not found or str contains comma.
pub fn find_in_set(str_column: &Column, set_column: &Column) -> Column {
    str_column.clone().find_in_set(set_column)
}

/// Printf-style format (PySpark format_string). Supports %s, %d, %i, %f, %g, %%.
pub fn format_string(format: &str, columns: &[&Column]) -> Column {
    use polars::prelude::*;
    if columns.is_empty() {
        panic!("format_string needs at least one column");
    }
    let format_owned = format.to_string();
    let args: Vec<Expr> = columns.iter().skip(1).map(|c| c.expr().clone()).collect();
    let expr = columns[0].expr().clone().map_many(
        move |cols| crate::udfs::apply_format_string(cols, &format_owned),
        &args,
        GetOutput::from_type(DataType::String),
    );
    crate::column::Column::from_expr(expr, None)
}
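
// For example (hedged sketch): format specifiers are filled from the columns
// in order, so "%s-%d" over ("item", 7) yields "item-7":
//
//     let tag = format_string("%s-%d", &[&col("name"), &col("id")]);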

/// Alias for format_string (PySpark printf).
pub fn printf(format: &str, columns: &[&Column]) -> Column {
    format_string(format, columns)
}

/// Repeat string n times (PySpark repeat).
pub fn repeat(column: &Column, n: i32) -> Column {
    column.clone().repeat(n)
}

/// Reverse string (PySpark reverse).
pub fn reverse(column: &Column) -> Column {
    column.clone().reverse()
}

/// Find substring position 1-based; 0 if not found (PySpark instr).
pub fn instr(column: &Column, substr: &str) -> Column {
    column.clone().instr(substr)
}

/// Position of substring in column (PySpark position). Same as instr; (substr, col) argument order.
pub fn position(substr: &str, column: &Column) -> Column {
    column.clone().instr(substr)
}

/// ASCII value of first character (PySpark ascii). Returns Int32.
pub fn ascii(column: &Column) -> Column {
    column.clone().ascii()
}

/// Format numeric as string with fixed decimal places (PySpark format_number).
pub fn format_number(column: &Column, decimals: u32) -> Column {
    column.clone().format_number(decimals)
}

/// Replace substring at 1-based position (PySpark overlay). replace is literal.
pub fn overlay(column: &Column, replace: &str, pos: i64, length: i64) -> Column {
    column.clone().overlay(replace, pos, length)
}

/// Int to single-character string (PySpark char). Valid codepoint only.
pub fn char(column: &Column) -> Column {
    column.clone().char()
}

/// Alias for char (PySpark chr).
pub fn chr(column: &Column) -> Column {
    column.clone().chr()
}

/// Base64 encode string bytes (PySpark base64).
pub fn base64(column: &Column) -> Column {
    column.clone().base64()
}

/// Base64 decode to string (PySpark unbase64). Invalid decode → null.
pub fn unbase64(column: &Column) -> Column {
    column.clone().unbase64()
}

/// SHA1 hash of string bytes, return hex string (PySpark sha1).
pub fn sha1(column: &Column) -> Column {
    column.clone().sha1()
}

/// SHA2 hash; bit_length 256, 384, or 512 (PySpark sha2).
pub fn sha2(column: &Column, bit_length: i32) -> Column {
    column.clone().sha2(bit_length)
}

/// MD5 hash of string bytes, return hex string (PySpark md5).
pub fn md5(column: &Column) -> Column {
    column.clone().md5()
}

/// Left-pad string to length with pad char (PySpark lpad).
pub fn lpad(column: &Column, length: i32, pad: &str) -> Column {
    column.clone().lpad(length, pad)
}

/// Right-pad string to length with pad char (PySpark rpad).
pub fn rpad(column: &Column, length: i32, pad: &str) -> Column {
    column.clone().rpad(length, pad)
}

/// Character-by-character translation (PySpark translate).
pub fn translate(column: &Column, from_str: &str, to_str: &str) -> Column {
    column.clone().translate(from_str, to_str)
}

/// Mask string: replace upper/lower/digit/other with given chars (PySpark mask).
pub fn mask(
    column: &Column,
    upper_char: Option<char>,
    lower_char: Option<char>,
    digit_char: Option<char>,
    other_char: Option<char>,
) -> Column {
    column
        .clone()
        .mask(upper_char, lower_char, digit_char, other_char)
}

/// Substring before/after nth delimiter (PySpark substring_index).
pub fn substring_index(column: &Column, delimiter: &str, count: i64) -> Column {
    column.clone().substring_index(delimiter, count)
}

/// Leftmost n characters (PySpark left).
pub fn left(column: &Column, n: i64) -> Column {
    column.clone().left(n)
}

/// Rightmost n characters (PySpark right).
pub fn right(column: &Column, n: i64) -> Column {
    column.clone().right(n)
}

/// Replace all occurrences of search with replacement (literal). PySpark replace.
pub fn replace(column: &Column, search: &str, replacement: &str) -> Column {
    column.clone().replace(search, replacement)
}

/// True if string starts with prefix (PySpark startswith).
pub fn startswith(column: &Column, prefix: &str) -> Column {
    column.clone().startswith(prefix)
}

/// True if string ends with suffix (PySpark endswith).
pub fn endswith(column: &Column, suffix: &str) -> Column {
    column.clone().endswith(suffix)
}

/// True if string contains substring (literal). PySpark contains.
pub fn contains(column: &Column, substring: &str) -> Column {
    column.clone().contains(substring)
}

/// SQL LIKE pattern (% any, _ one char). PySpark like.
/// When escape_char is Some(esc), esc + char treats that char as literal.
pub fn like(column: &Column, pattern: &str, escape_char: Option<char>) -> Column {
    column.clone().like(pattern, escape_char)
}

/// Case-insensitive LIKE. PySpark ilike.
/// When escape_char is Some(esc), esc + char treats that char as literal.
pub fn ilike(column: &Column, pattern: &str, escape_char: Option<char>) -> Column {
    column.clone().ilike(pattern, escape_char)
}
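
// For example (hedged sketch): % matches any run of characters, _ exactly one,
// and ilike ignores case:
//
//     let corporate = like(&col("email"), "%@example.com", None);
//     let gmailish = ilike(&col("email"), "%@GMAIL.%", None);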

/// Alias for regexp_like. PySpark rlike / regexp.
pub fn rlike(column: &Column, pattern: &str) -> Column {
    column.clone().regexp_like(pattern)
}

/// Alias for rlike (PySpark regexp).
pub fn regexp(column: &Column, pattern: &str) -> Column {
    rlike(column, pattern)
}

/// Soundex code (PySpark soundex). Not implemented: requires element-wise UDF.
pub fn soundex(column: &Column) -> Column {
    column.clone().soundex()
}

/// Levenshtein distance (PySpark levenshtein). Not implemented: requires element-wise UDF.
pub fn levenshtein(column: &Column, other: &Column) -> Column {
    column.clone().levenshtein(other)
}

/// CRC32 of string bytes (PySpark crc32). Not implemented: requires element-wise UDF.
pub fn crc32(column: &Column) -> Column {
    column.clone().crc32()
}

/// XXH64 hash (PySpark xxhash64). Not implemented: requires element-wise UDF.
pub fn xxhash64(column: &Column) -> Column {
    column.clone().xxhash64()
}

/// Absolute value (PySpark abs)
pub fn abs(column: &Column) -> Column {
    column.clone().abs()
}

/// Ceiling (PySpark ceil)
pub fn ceil(column: &Column) -> Column {
    column.clone().ceil()
}

/// Floor (PySpark floor)
pub fn floor(column: &Column) -> Column {
    column.clone().floor()
}

/// Round (PySpark round)
pub fn round(column: &Column, decimals: u32) -> Column {
    column.clone().round(decimals)
}

/// Banker's rounding - round half to even (PySpark bround).
pub fn bround(column: &Column, scale: i32) -> Column {
    column.clone().bround(scale)
}

/// Unary minus / negate (PySpark negate, negative).
pub fn negate(column: &Column) -> Column {
    column.clone().negate()
}

/// Alias for negate. PySpark negative.
pub fn negative(column: &Column) -> Column {
    negate(column)
}

/// Unary plus - no-op, returns column as-is (PySpark positive).
pub fn positive(column: &Column) -> Column {
    column.clone()
}

/// Cotangent: 1/tan (PySpark cot).
pub fn cot(column: &Column) -> Column {
    column.clone().cot()
}

/// Cosecant: 1/sin (PySpark csc).
pub fn csc(column: &Column) -> Column {
    column.clone().csc()
}

/// Secant: 1/cos (PySpark sec).
pub fn sec(column: &Column) -> Column {
    column.clone().sec()
}

/// Constant e = 2.718... (PySpark e).
pub fn e() -> Column {
    Column::from_expr(lit(std::f64::consts::E), Some("e".to_string()))
}

/// Constant pi = 3.14159... (PySpark pi).
pub fn pi() -> Column {
    Column::from_expr(lit(std::f64::consts::PI), Some("pi".to_string()))
}

/// Square root (PySpark sqrt)
pub fn sqrt(column: &Column) -> Column {
    column.clone().sqrt()
}

/// Power (PySpark pow)
pub fn pow(column: &Column, exp: i64) -> Column {
    column.clone().pow(exp)
}

/// Exponential (PySpark exp)
pub fn exp(column: &Column) -> Column {
    column.clone().exp()
}

/// Natural logarithm (PySpark log with one arg)
pub fn log(column: &Column) -> Column {
    column.clone().log()
}

/// Logarithm with given base (PySpark log(col, base)). base must be positive and not 1.
pub fn log_with_base(column: &Column, base: f64) -> Column {
    crate::column::Column::from_expr(column.expr().clone().log(base), None)
}

/// Sine in radians (PySpark sin)
pub fn sin(column: &Column) -> Column {
    column.clone().sin()
}

/// Cosine in radians (PySpark cos)
pub fn cos(column: &Column) -> Column {
    column.clone().cos()
}

/// Tangent in radians (PySpark tan)
pub fn tan(column: &Column) -> Column {
    column.clone().tan()
}

/// Arc sine (PySpark asin)
pub fn asin(column: &Column) -> Column {
    column.clone().asin()
}

/// Arc cosine (PySpark acos)
pub fn acos(column: &Column) -> Column {
    column.clone().acos()
}

/// Arc tangent (PySpark atan)
pub fn atan(column: &Column) -> Column {
    column.clone().atan()
}

/// Two-argument arc tangent atan2(y, x) in radians (PySpark atan2)
pub fn atan2(y: &Column, x: &Column) -> Column {
    y.clone().atan2(x)
}

/// Convert radians to degrees (PySpark degrees)
pub fn degrees(column: &Column) -> Column {
    column.clone().degrees()
}

/// Convert degrees to radians (PySpark radians)
pub fn radians(column: &Column) -> Column {
    column.clone().radians()
}

/// Sign of the number: -1, 0, or 1 (PySpark signum)
pub fn signum(column: &Column) -> Column {
    column.clone().signum()
}

/// Alias for signum (PySpark sign).
pub fn sign(column: &Column) -> Column {
    signum(column)
}

/// Cast column to the given type (PySpark cast). Fails on invalid conversion.
/// String-to-boolean uses custom parsing ("true"/"false"/"1"/"0") since Polars does not support Utf8->Boolean.
/// String-to-date accepts date and datetime strings (e.g. "2025-01-01 10:30:00" truncates to date) for Spark parity.
pub fn cast(column: &Column, type_name: &str) -> Result<Column, String> {
    let dtype = parse_type_name(type_name)?;
    if dtype == DataType::Boolean {
        use polars::prelude::GetOutput;
        let expr = column.expr().clone().map(
            |col| crate::udfs::apply_string_to_boolean(col, true),
            GetOutput::from_type(DataType::Boolean),
        );
        return Ok(Column::from_expr(expr, None));
    }
    if dtype == DataType::Date {
        use polars::prelude::GetOutput;
        let expr = column.expr().clone().map(
            |col| crate::udfs::apply_string_to_date(col, true),
            GetOutput::from_type(DataType::Date),
        );
        return Ok(Column::from_expr(expr, None));
    }
    if dtype == DataType::Int32 || dtype == DataType::Int64 {
        use polars::prelude::GetOutput;
        let target = dtype.clone();
        // cast: strict=true – invalid strings should error (PySpark parity).
        let expr = column.expr().clone().map(
            move |col| crate::udfs::apply_string_to_int(col, true, target.clone()),
            GetOutput::from_type(dtype),
        );
        return Ok(Column::from_expr(expr, None));
    }
    if dtype == DataType::Float64 {
        use polars::prelude::GetOutput;
        // String-to-double uses custom parsing for Spark-style to_number semantics.
        let expr = column.expr().clone().map(
            |col| crate::udfs::apply_string_to_double(col, true),
            GetOutput::from_type(DataType::Float64),
        );
        return Ok(Column::from_expr(expr, None));
    }
    Ok(Column::from_expr(
        column.expr().clone().strict_cast(dtype),
        None,
    ))
}
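
// For example (hedged sketch): type names go through parse_type_name, so both
// Spark-style and Rust-ish spellings work; cast errors on bad values while
// try_cast (below) yields nulls instead:
//
//     let as_i64 = cast(&col("qty"), "bigint")?;
//     let as_f64 = try_cast(&col("qty"), "double")?;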

/// Cast column to the given type, returning null on invalid conversion (PySpark try_cast).
/// String-to-boolean uses custom parsing ("true"/"false"/"1"/"0") since Polars does not support Utf8->Boolean.
/// String-to-date accepts date and datetime strings; invalid strings become null.
pub fn try_cast(column: &Column, type_name: &str) -> Result<Column, String> {
    let dtype = parse_type_name(type_name)?;
    if dtype == DataType::Boolean {
        use polars::prelude::GetOutput;
        let expr = column.expr().clone().map(
            |col| crate::udfs::apply_string_to_boolean(col, false),
            GetOutput::from_type(DataType::Boolean),
        );
        return Ok(Column::from_expr(expr, None));
    }
    if dtype == DataType::Date {
        use polars::prelude::GetOutput;
        let expr = column.expr().clone().map(
            |col| crate::udfs::apply_string_to_date(col, false),
            GetOutput::from_type(DataType::Date),
        );
        return Ok(Column::from_expr(expr, None));
    }
    if dtype == DataType::Int32 || dtype == DataType::Int64 {
        use polars::prelude::GetOutput;
        let target = dtype.clone();
        let expr = column.expr().clone().map(
            move |col| crate::udfs::apply_string_to_int(col, false, target.clone()),
            GetOutput::from_type(dtype),
        );
        return Ok(Column::from_expr(expr, None));
    }
    if dtype == DataType::Float64 {
        use polars::prelude::GetOutput;
        let expr = column.expr().clone().map(
            |col| crate::udfs::apply_string_to_double(col, false),
            GetOutput::from_type(DataType::Float64),
        );
        return Ok(Column::from_expr(expr, None));
    }
    Ok(Column::from_expr(column.expr().clone().cast(dtype), None))
}

/// Cast to string, optionally with format for datetime (PySpark to_char, to_varchar).
/// When format is Some, uses date_format for datetime columns (PySpark format → chrono strftime); otherwise cast to string.
/// Returns Err if the cast to string fails (invalid type name or unsupported column type).
pub fn to_char(column: &Column, format: Option<&str>) -> Result<Column, String> {
    match format {
        Some(fmt) => Ok(column
            .clone()
            .date_format(&crate::udfs::pyspark_format_to_chrono(fmt))),
        None => cast(column, "string"),
    }
}

/// Alias for to_char (PySpark to_varchar).
pub fn to_varchar(column: &Column, format: Option<&str>) -> Result<Column, String> {
    to_char(column, format)
}

/// Cast to numeric (PySpark to_number). Uses Double. Format parameter reserved for future use.
/// Returns Err if the cast to double fails (invalid type name or unsupported column type).
pub fn to_number(column: &Column, _format: Option<&str>) -> Result<Column, String> {
    cast(column, "double")
}

/// Cast to numeric, null on invalid (PySpark try_to_number). Format parameter reserved for future use.
/// Returns Err if the try_cast setup fails (invalid type name); column values that cannot be parsed become null.
pub fn try_to_number(column: &Column, _format: Option<&str>) -> Result<Column, String> {
    try_cast(column, "double")
}

/// Cast to timestamp, or parse with format when provided (PySpark to_timestamp).
/// When format is None, parses string columns with default format "%Y-%m-%d %H:%M:%S" (PySpark parity #273).
pub fn to_timestamp(column: &Column, format: Option<&str>) -> Result<Column, String> {
    use polars::prelude::{DataType, GetOutput, TimeUnit};
    let fmt_owned = format.map(|s| s.to_string());
    let expr = column.expr().clone().map(
        move |s| crate::udfs::apply_to_timestamp_format(s, fmt_owned.as_deref(), true),
        GetOutput::from_type(DataType::Datetime(TimeUnit::Microseconds, None)),
    );
    Ok(crate::column::Column::from_expr(expr, None))
}
1546
1547/// Cast to timestamp, null on invalid, or parse with format when provided (PySpark try_to_timestamp).
1548/// When format is None, parses string columns with default format (null on invalid). #273
1549pub fn try_to_timestamp(column: &Column, format: Option<&str>) -> Result<Column, String> {
1550    use polars::prelude::*;
1551    let fmt_owned = format.map(|s| s.to_string());
1552    let expr = column.expr().clone().map(
1553        move |s| crate::udfs::apply_to_timestamp_format(s, fmt_owned.as_deref(), false),
1554        GetOutput::from_type(DataType::Datetime(TimeUnit::Microseconds, None)),
1555    );
1556    Ok(crate::column::Column::from_expr(expr, None))
1557}
1558
1559/// Parse as timestamp in local timezone, return UTC (PySpark to_timestamp_ltz).
1560pub fn to_timestamp_ltz(column: &Column, format: Option<&str>) -> Result<Column, String> {
1561    use polars::prelude::{DataType, GetOutput, TimeUnit};
1562    match format {
1563        None => crate::cast(column, "timestamp"),
1564        Some(fmt) => {
1565            let fmt_owned = fmt.to_string();
1566            let expr = column.expr().clone().map(
1567                move |s| crate::udfs::apply_to_timestamp_ltz_format(s, Some(&fmt_owned), true),
1568                GetOutput::from_type(DataType::Datetime(TimeUnit::Microseconds, None)),
1569            );
1570            Ok(crate::column::Column::from_expr(expr, None))
1571        }
1572    }
1573}
1574
1575/// Parse as timestamp without timezone (PySpark to_timestamp_ntz). Returns Datetime(_, None).
1576pub fn to_timestamp_ntz(column: &Column, format: Option<&str>) -> Result<Column, String> {
1577    use polars::prelude::{DataType, GetOutput, TimeUnit};
1578    match format {
1579        None => crate::cast(column, "timestamp"),
1580        Some(fmt) => {
1581            let fmt_owned = fmt.to_string();
1582            let expr = column.expr().clone().map(
1583                move |s| crate::udfs::apply_to_timestamp_ntz_format(s, Some(&fmt_owned), true),
1584                GetOutput::from_type(DataType::Datetime(TimeUnit::Microseconds, None)),
1585            );
1586            Ok(crate::column::Column::from_expr(expr, None))
1587        }
1588    }
1589}
1590
1591/// Division that returns null on divide-by-zero (PySpark try_divide).
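///
/// # Example
/// A minimal sketch (marked `ignore`; assumes `try_divide` is re-exported at the
/// crate root alongside `col`):
/// ```ignore
/// use robin_sparkless::{col, try_divide};
/// // Hypothetical columns "num" and "den"; rows where "den" is 0 yield null.
/// let ratio = try_divide(&col("num"), &col("den"));
/// ```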
1592pub fn try_divide(left: &Column, right: &Column) -> Column {
1593    use polars::prelude::*;
1594    let zero_cond = right.expr().clone().cast(DataType::Float64).eq(lit(0.0f64));
1595    let null_expr = Expr::Literal(LiteralValue::Null);
1596    let div_expr =
1597        left.expr().clone().cast(DataType::Float64) / right.expr().clone().cast(DataType::Float64);
1598    let expr = polars::prelude::when(zero_cond)
1599        .then(null_expr)
1600        .otherwise(div_expr);
1601    crate::column::Column::from_expr(expr, None)
1602}
1603
1604/// Add that returns null on overflow (PySpark try_add). Uses checked arithmetic.
1605pub fn try_add(left: &Column, right: &Column) -> Column {
1606    let args = [right.expr().clone()];
1607    let expr =
1608        left.expr()
1609            .clone()
1610            .map_many(crate::udfs::apply_try_add, &args, GetOutput::same_type());
1611    Column::from_expr(expr, None)
1612}
1613
1614/// Subtract that returns null on overflow (PySpark try_subtract).
1615pub fn try_subtract(left: &Column, right: &Column) -> Column {
1616    let args = [right.expr().clone()];
1617    let expr = left.expr().clone().map_many(
1618        crate::udfs::apply_try_subtract,
1619        &args,
1620        GetOutput::same_type(),
1621    );
1622    Column::from_expr(expr, None)
1623}
1624
1625/// Multiply that returns null on overflow (PySpark try_multiply).
1626pub fn try_multiply(left: &Column, right: &Column) -> Column {
1627    let args = [right.expr().clone()];
1628    let expr = left.expr().clone().map_many(
1629        crate::udfs::apply_try_multiply,
1630        &args,
1631        GetOutput::same_type(),
1632    );
1633    Column::from_expr(expr, None)
1634}
1635
1636/// Element at index, null if out of bounds (PySpark try_element_at). Same as element_at for lists.
1637pub fn try_element_at(column: &Column, index: i64) -> Column {
1638    column.clone().element_at(index)
1639}
1640
1641/// Assign value to histogram bucket (PySpark width_bucket). Returns 0 if v < min_val, num_bucket+1 if v >= max_val.
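///
/// # Example
/// A minimal sketch (marked `ignore`; assumes `width_bucket` is re-exported at the
/// crate root alongside `col`):
/// ```ignore
/// use robin_sparkless::{col, width_bucket};
/// // 10 equal-width buckets over [0.0, 100.0): 25.0 -> 3, -1.0 -> 0, 100.0 -> 11.
/// let bucket = width_bucket(&col("v"), 0.0, 100.0, 10);
/// ```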
1642pub fn width_bucket(value: &Column, min_val: f64, max_val: f64, num_bucket: i64) -> Column {
1643    if num_bucket <= 0 {
1644        panic!(
1645            "width_bucket: num_bucket must be positive, got {}",
1646            num_bucket
1647        );
1648    }
1649    use polars::prelude::*;
1650    let v = value.expr().clone().cast(DataType::Float64);
1651    let min_expr = lit(min_val);
1652    let max_expr = lit(max_val);
1653    let nb = num_bucket as f64;
1654    let width = (max_val - min_val) / nb;
1655    let bucket_expr = (v.clone() - min_expr.clone()) / lit(width);
1656    let floor_bucket = bucket_expr.floor().cast(DataType::Int64) + lit(1i64);
1657    let bucket_clamped = floor_bucket.clip(lit(1i64), lit(num_bucket));
1658    let expr = polars::prelude::when(v.clone().lt(min_expr))
1659        .then(lit(0i64))
1660        .when(v.gt_eq(max_expr))
1661        .then(lit(num_bucket + 1))
1662        .otherwise(bucket_clamped);
1663    crate::column::Column::from_expr(expr, None)
1664}
1665
1666/// Return column at 1-based index (PySpark elt). elt(2, a, b, c) returns b.
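///
/// # Example
/// A minimal sketch (marked `ignore`; assumes `elt` and `lit_i64` are re-exported
/// at the crate root alongside `col`):
/// ```ignore
/// use robin_sparkless::{col, elt, lit_i64};
/// // Index 2 selects the second argument, i.e. column "b", for every row.
/// let picked = elt(&lit_i64(2), &[&col("a"), &col("b"), &col("c")]);
/// ```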
1667pub fn elt(index: &Column, columns: &[&Column]) -> Column {
1668    use polars::prelude::*;
1669    if columns.is_empty() {
1670        panic!("elt requires at least one column");
1671    }
1672    let idx_expr = index.expr().clone();
1673    let null_expr = Expr::Literal(LiteralValue::Null);
1674    let mut expr = null_expr;
1675    for (i, c) in columns.iter().enumerate().rev() {
1676        let n = (i + 1) as i64;
1677        expr = polars::prelude::when(idx_expr.clone().eq(lit(n)))
1678            .then(c.expr().clone())
1679            .otherwise(expr);
1680    }
1681    crate::column::Column::from_expr(expr, None)
1682}
1683
1684/// Bit length of string (bytes * 8) (PySpark bit_length).
1685pub fn bit_length(column: &Column) -> Column {
1686    column.clone().bit_length()
1687}
1688
1689/// Length of string in bytes (PySpark octet_length).
1690pub fn octet_length(column: &Column) -> Column {
1691    column.clone().octet_length()
1692}
1693
1694/// Length of string in characters (PySpark char_length). Alias of length().
1695pub fn char_length(column: &Column) -> Column {
1696    column.clone().char_length()
1697}
1698
1699/// Length of string in characters (PySpark character_length). Alias of length().
1700pub fn character_length(column: &Column) -> Column {
1701    column.clone().character_length()
1702}
1703
1704/// Data type of column as string (PySpark typeof). Constant per column from schema.
1705pub fn typeof_(column: &Column) -> Column {
1706    column.clone().typeof_()
1707}
1708
1709/// True where the float value is NaN (PySpark isnan).
1710pub fn isnan(column: &Column) -> Column {
1711    column.clone().is_nan()
1712}
1713
1714/// Greatest of the given columns per row (PySpark greatest). Uses element-wise UDF.
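///
/// # Example
/// A minimal sketch (marked `ignore`; assumes `greatest` is re-exported at the
/// crate root alongside `col`):
/// ```ignore
/// use robin_sparkless::{col, greatest};
/// // Per-row maximum across the hypothetical columns "a", "b" and "c".
/// let g = greatest(&[&col("a"), &col("b"), &col("c")]).unwrap();
/// ```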
1715pub fn greatest(columns: &[&Column]) -> Result<Column, String> {
1716    if columns.is_empty() {
1717        return Err("greatest requires at least one column".to_string());
1718    }
1719    if columns.len() == 1 {
1720        return Ok((*columns[0]).clone());
1721    }
1722    let mut expr = columns[0].expr().clone();
1723    for c in columns.iter().skip(1) {
1724        let args = [c.expr().clone()];
1725        expr = expr.map_many(crate::udfs::apply_greatest2, &args, GetOutput::same_type());
1726    }
1727    Ok(Column::from_expr(expr, None))
1728}
1729
1730/// Least of the given columns per row (PySpark least). Uses element-wise UDF.
1731pub fn least(columns: &[&Column]) -> Result<Column, String> {
1732    if columns.is_empty() {
1733        return Err("least requires at least one column".to_string());
1734    }
1735    if columns.len() == 1 {
1736        return Ok((*columns[0]).clone());
1737    }
1738    let mut expr = columns[0].expr().clone();
1739    for c in columns.iter().skip(1) {
1740        let args = [c.expr().clone()];
1741        expr = expr.map_many(crate::udfs::apply_least2, &args, GetOutput::same_type());
1742    }
1743    Ok(Column::from_expr(expr, None))
1744}
1745
1746/// Extract year from datetime column (PySpark year)
1747pub fn year(column: &Column) -> Column {
1748    column.clone().year()
1749}
1750
1751/// Extract month from datetime column (PySpark month)
1752pub fn month(column: &Column) -> Column {
1753    column.clone().month()
1754}
1755
1756/// Extract day of month from datetime column (PySpark day)
1757pub fn day(column: &Column) -> Column {
1758    column.clone().day()
1759}
1760
1761/// Cast or parse to date (PySpark to_date). When format is None: cast date/datetime to date, parse string with default formats. When format is Some: parse string with given format.
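///
/// # Example
/// A minimal sketch (marked `ignore`; assumes `to_date` is re-exported at the
/// crate root alongside `col`):
/// ```ignore
/// use robin_sparkless::{col, to_date};
/// // Parse a hypothetical string column "raw" ("2024-01-31"-style) with the default formats.
/// let d = to_date(&col("raw"), None).unwrap();
/// ```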
1762pub fn to_date(column: &Column, format: Option<&str>) -> Result<Column, String> {
1763    use polars::prelude::GetOutput;
1764    let fmt = format.map(|s| s.to_string());
1765    let expr = column.expr().clone().map(
1766        move |col| crate::udfs::apply_string_to_date_format(col, fmt.as_deref(), false),
1767        GetOutput::from_type(DataType::Date),
1768    );
1769    Ok(Column::from_expr(expr, None))
1770}
1771
1772/// Format date/datetime as string (PySpark date_format). Accepts PySpark/Java SimpleDateFormat style (e.g. "yyyy-MM") and converts to chrono strftime internally.
1773pub fn date_format(column: &Column, format: &str) -> Column {
1774    column
1775        .clone()
1776        .date_format(&crate::udfs::pyspark_format_to_chrono(format))
1777}
1778
1779/// Current date (evaluation time). PySpark current_date.
1780pub fn current_date() -> Column {
1781    use polars::prelude::*;
1782    let today = chrono::Utc::now().date_naive();
1783    let days = (today - crate::date_utils::epoch_naive_date()).num_days() as i32;
1784    crate::column::Column::from_expr(Expr::Literal(LiteralValue::Date(days)), None)
1785}
1786
1787/// Current timestamp (evaluation time). PySpark current_timestamp.
1788pub fn current_timestamp() -> Column {
1789    use polars::prelude::*;
1790    let ts = chrono::Utc::now().timestamp_micros();
1791    crate::column::Column::from_expr(
1792        Expr::Literal(LiteralValue::DateTime(ts, TimeUnit::Microseconds, None)),
1793        None,
1794    )
1795}
1796
1797/// Alias for current_date (PySpark curdate).
1798pub fn curdate() -> Column {
1799    current_date()
1800}
1801
1802/// Alias for current_timestamp (PySpark now).
1803pub fn now() -> Column {
1804    current_timestamp()
1805}
1806
1807/// Alias for current_timestamp (PySpark localtimestamp).
1808pub fn localtimestamp() -> Column {
1809    current_timestamp()
1810}
1811
1812/// Alias for datediff (PySpark date_diff). date_diff(end, start).
1813pub fn date_diff(end: &Column, start: &Column) -> Column {
1814    datediff(end, start)
1815}
1816
1817/// Alias for date_add (PySpark dateadd).
1818pub fn dateadd(column: &Column, n: i32) -> Column {
1819    date_add(column, n)
1820}
1821
1822/// Extract field from date/datetime (PySpark extract). field: year, month, day, hour, minute, second, quarter, week, dayofweek, dayofyear.
1823pub fn extract(column: &Column, field: &str) -> Column {
1824    column.clone().extract(field)
1825}
1826
1827/// Alias for extract (PySpark date_part).
1828pub fn date_part(column: &Column, field: &str) -> Column {
1829    extract(column, field)
1830}
1831
1832/// Alias for extract (PySpark datepart).
1833pub fn datepart(column: &Column, field: &str) -> Column {
1834    extract(column, field)
1835}
1836
1837/// Timestamp to microseconds since epoch (PySpark unix_micros).
1838pub fn unix_micros(column: &Column) -> Column {
1839    column.clone().unix_micros()
1840}
1841
1842/// Timestamp to milliseconds since epoch (PySpark unix_millis).
1843pub fn unix_millis(column: &Column) -> Column {
1844    column.clone().unix_millis()
1845}
1846
1847/// Timestamp to seconds since epoch (PySpark unix_seconds).
1848pub fn unix_seconds(column: &Column) -> Column {
1849    column.clone().unix_seconds()
1850}
1851
1852/// Weekday name "Mon","Tue",... (PySpark dayname).
1853pub fn dayname(column: &Column) -> Column {
1854    column.clone().dayname()
1855}
1856
1857/// Weekday 0=Mon, 6=Sun (PySpark weekday).
1858pub fn weekday(column: &Column) -> Column {
1859    column.clone().weekday()
1860}
1861
1862/// Extract hour from datetime column (PySpark hour).
1863pub fn hour(column: &Column) -> Column {
1864    column.clone().hour()
1865}
1866
1867/// Extract minute from datetime column (PySpark minute).
1868pub fn minute(column: &Column) -> Column {
1869    column.clone().minute()
1870}
1871
1872/// Extract second from datetime column (PySpark second).
1873pub fn second(column: &Column) -> Column {
1874    column.clone().second()
1875}
1876
1877/// Add n days to date column (PySpark date_add).
1878pub fn date_add(column: &Column, n: i32) -> Column {
1879    column.clone().date_add(n)
1880}
1881
1882/// Subtract n days from date column (PySpark date_sub).
1883pub fn date_sub(column: &Column, n: i32) -> Column {
1884    column.clone().date_sub(n)
1885}
1886
1887/// Number of days between two date columns (PySpark datediff).
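///
/// # Example
/// A minimal sketch (marked `ignore`; assumes `datediff` is re-exported at the
/// crate root alongside `col`):
/// ```ignore
/// use robin_sparkless::{col, datediff};
/// // Day count between the hypothetical date columns "start" and "end".
/// let days = datediff(&col("end"), &col("start"));
/// ```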
1888pub fn datediff(end: &Column, start: &Column) -> Column {
1889    start.clone().datediff(end)
1890}
1891
1892/// Last day of month for date column (PySpark last_day).
1893pub fn last_day(column: &Column) -> Column {
1894    column.clone().last_day()
1895}
1896
1897/// Truncate date/datetime to unit (PySpark trunc).
1898pub fn trunc(column: &Column, format: &str) -> Column {
1899    column.clone().trunc(format)
1900}
1901
1902/// Alias for trunc (PySpark date_trunc).
1903pub fn date_trunc(format: &str, column: &Column) -> Column {
1904    trunc(column, format)
1905}
1906
1907/// Extract quarter (1-4) from date/datetime (PySpark quarter).
1908pub fn quarter(column: &Column) -> Column {
1909    column.clone().quarter()
1910}
1911
1912/// Extract ISO week of year (1-53) (PySpark weekofyear).
1913pub fn weekofyear(column: &Column) -> Column {
1914    column.clone().weekofyear()
1915}
1916
1917/// Extract day of week: 1=Sunday..7=Saturday (PySpark dayofweek).
1918pub fn dayofweek(column: &Column) -> Column {
1919    column.clone().dayofweek()
1920}
1921
1922/// Extract day of year (1-366) (PySpark dayofyear).
1923pub fn dayofyear(column: &Column) -> Column {
1924    column.clone().dayofyear()
1925}
1926
1927/// Add n months to date column (PySpark add_months).
1928pub fn add_months(column: &Column, n: i32) -> Column {
1929    column.clone().add_months(n)
1930}
1931
1932/// Months between end and start dates as fractional (PySpark months_between).
1933/// When round_off is true, rounds to 8 decimal places (PySpark default).
1934pub fn months_between(end: &Column, start: &Column, round_off: bool) -> Column {
1935    end.clone().months_between(start, round_off)
1936}
1937
1938/// Next date that is the given weekday (e.g. "Mon") (PySpark next_day).
1939pub fn next_day(column: &Column, day_of_week: &str) -> Column {
1940    column.clone().next_day(day_of_week)
1941}
1942
1943/// Current Unix timestamp in seconds (PySpark unix_timestamp with no args).
1944pub fn unix_timestamp_now() -> Column {
1945    use polars::prelude::*;
1946    let secs = chrono::Utc::now().timestamp();
1947    crate::column::Column::from_expr(lit(secs), None)
1948}
1949
1950/// Parse string timestamp to seconds since epoch (PySpark unix_timestamp). format defaults to yyyy-MM-dd HH:mm:ss.
1951pub fn unix_timestamp(column: &Column, format: Option<&str>) -> Column {
1952    column.clone().unix_timestamp(format)
1953}
1954
1955/// Alias for unix_timestamp.
1956pub fn to_unix_timestamp(column: &Column, format: Option<&str>) -> Column {
1957    unix_timestamp(column, format)
1958}
1959
1960/// Convert seconds since epoch to formatted string (PySpark from_unixtime).
1961pub fn from_unixtime(column: &Column, format: Option<&str>) -> Column {
1962    column.clone().from_unixtime(format)
1963}
1964
1965/// Build date from year, month, day columns (PySpark make_date).
1966pub fn make_date(year: &Column, month: &Column, day: &Column) -> Column {
1967    use polars::prelude::*;
1968    let args = [month.expr().clone(), day.expr().clone()];
1969    let expr = year.expr().clone().map_many(
1970        crate::udfs::apply_make_date,
1971        &args,
1972        GetOutput::from_type(DataType::Date),
1973    );
1974    crate::column::Column::from_expr(expr, None)
1975}
1976
1977/// make_timestamp(year, month, day, hour, min, sec, timezone?) - six columns to timestamp (PySpark make_timestamp).
1978/// When timezone is Some(tz), components are interpreted as local time in that zone, then converted to UTC.
1979pub fn make_timestamp(
1980    year: &Column,
1981    month: &Column,
1982    day: &Column,
1983    hour: &Column,
1984    minute: &Column,
1985    sec: &Column,
1986    timezone: Option<&str>,
1987) -> Column {
1988    use polars::prelude::*;
1989    let tz_owned = timezone.map(|s| s.to_string());
1990    let args = [
1991        month.expr().clone(),
1992        day.expr().clone(),
1993        hour.expr().clone(),
1994        minute.expr().clone(),
1995        sec.expr().clone(),
1996    ];
1997    let expr = year.expr().clone().map_many(
1998        move |cols| crate::udfs::apply_make_timestamp(cols, tz_owned.as_deref()),
1999        &args,
2000        GetOutput::from_type(DataType::Datetime(TimeUnit::Microseconds, None)),
2001    );
2002    crate::column::Column::from_expr(expr, None)
2003}
2004
2005/// Add amount of unit to timestamp (PySpark timestampadd).
2006pub fn timestampadd(unit: &str, amount: &Column, ts: &Column) -> Column {
2007    ts.clone().timestampadd(unit, amount)
2008}
2009
2010/// Difference between timestamps in unit (PySpark timestampdiff).
2011pub fn timestampdiff(unit: &str, start: &Column, end: &Column) -> Column {
2012    start.clone().timestampdiff(unit, end)
2013}
2014
2015/// Interval of n days (PySpark days). For use in date_add, timestampadd, etc.
2016pub fn days(n: i64) -> Column {
2017    make_interval(0, 0, 0, n, 0, 0, 0)
2018}
2019
2020/// Interval of n hours (PySpark hours).
2021pub fn hours(n: i64) -> Column {
2022    make_interval(0, 0, 0, 0, n, 0, 0)
2023}
2024
2025/// Interval of n minutes (PySpark minutes).
2026pub fn minutes(n: i64) -> Column {
2027    make_interval(0, 0, 0, 0, 0, n, 0)
2028}
2029
2030/// Interval of n months (PySpark months). Approximated as 30*n days.
2031pub fn months(n: i64) -> Column {
2032    make_interval(0, n, 0, 0, 0, 0, 0)
2033}
2034
2035/// Interval of n years (PySpark years). Approximated as 365*n days.
2036pub fn years(n: i64) -> Column {
2037    make_interval(n, 0, 0, 0, 0, 0, 0)
2038}
2039
2040/// Interpret timestamp as UTC, convert to tz (PySpark from_utc_timestamp).
2041pub fn from_utc_timestamp(column: &Column, tz: &str) -> Column {
2042    column.clone().from_utc_timestamp(tz)
2043}
2044
2045/// Interpret timestamp as in tz, convert to UTC (PySpark to_utc_timestamp).
2046pub fn to_utc_timestamp(column: &Column, tz: &str) -> Column {
2047    column.clone().to_utc_timestamp(tz)
2048}
2049
2050/// Convert timestamp between timezones (PySpark convert_timezone).
2051pub fn convert_timezone(source_tz: &str, target_tz: &str, column: &Column) -> Column {
2052    let source_tz = source_tz.to_string();
2053    let target_tz = target_tz.to_string();
2054    let expr = column.expr().clone().map(
2055        move |s| crate::udfs::apply_convert_timezone(s, &source_tz, &target_tz),
2056        GetOutput::same_type(),
2057    );
2058    crate::column::Column::from_expr(expr, None)
2059}
2060
2061/// Current session timezone (PySpark current_timezone). Always returns the literal "UTC"; no session timezone configuration is consulted.
2062pub fn current_timezone() -> Column {
2063    use polars::prelude::*;
2064    crate::column::Column::from_expr(lit("UTC"), None)
2065}
2066
2067/// Create interval duration (PySpark make_interval). Optional args; 0 for omitted.
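///
/// # Example
/// A minimal sketch (marked `ignore`; assumes `make_interval` is re-exported at
/// the crate root):
/// ```ignore
/// use robin_sparkless::make_interval;
/// // 1 day + 2 hours; the remaining parts are passed as 0.
/// let iv = make_interval(0, 0, 0, 1, 2, 0, 0);
/// ```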
2068pub fn make_interval(
2069    years: i64,
2070    months: i64,
2071    weeks: i64,
2072    days: i64,
2073    hours: i64,
2074    mins: i64,
2075    secs: i64,
2076) -> Column {
2077    use polars::prelude::*;
2078    // Approximate: 1 year = 365 days, 1 month = 30 days
2079    let total_days = years * 365 + months * 30 + weeks * 7 + days;
2080    let args = DurationArgs::new()
2081        .with_days(lit(total_days))
2082        .with_hours(lit(hours))
2083        .with_minutes(lit(mins))
2084        .with_seconds(lit(secs));
2085    let dur = duration(args);
2086    crate::column::Column::from_expr(dur, None)
2087}
2088
2089/// Day-time interval: days, hours, minutes, seconds (PySpark make_dt_interval). All optional; 0 for omitted.
2090pub fn make_dt_interval(days: i64, hours: i64, minutes: i64, seconds: i64) -> Column {
2091    use polars::prelude::*;
2092    let args = DurationArgs::new()
2093        .with_days(lit(days))
2094        .with_hours(lit(hours))
2095        .with_minutes(lit(minutes))
2096        .with_seconds(lit(seconds));
2097    let dur = duration(args);
2098    crate::column::Column::from_expr(dur, None)
2099}
2100
2101/// Year-month interval (PySpark make_ym_interval). Polars has no native year-month interval type; returns the total months (years*12 + months) as Int32.
2102pub fn make_ym_interval(years: i32, months: i32) -> Column {
2103    use polars::prelude::*;
2104    let total_months = years * 12 + months;
2105    crate::column::Column::from_expr(lit(total_months), None)
2106}
2107
2108/// Alias for make_timestamp (PySpark make_timestamp_ntz - no timezone).
2109pub fn make_timestamp_ntz(
2110    year: &Column,
2111    month: &Column,
2112    day: &Column,
2113    hour: &Column,
2114    minute: &Column,
2115    sec: &Column,
2116) -> Column {
2117    make_timestamp(year, month, day, hour, minute, sec, None)
2118}
2119
2120/// Convert seconds since epoch to timestamp (PySpark timestamp_seconds).
2121pub fn timestamp_seconds(column: &Column) -> Column {
2122    column.clone().timestamp_seconds()
2123}
2124
2125/// Convert milliseconds since epoch to timestamp (PySpark timestamp_millis).
2126pub fn timestamp_millis(column: &Column) -> Column {
2127    column.clone().timestamp_millis()
2128}
2129
2130/// Convert microseconds since epoch to timestamp (PySpark timestamp_micros).
2131pub fn timestamp_micros(column: &Column) -> Column {
2132    column.clone().timestamp_micros()
2133}
2134
2135/// Date to days since 1970-01-01 (PySpark unix_date).
2136pub fn unix_date(column: &Column) -> Column {
2137    column.clone().unix_date()
2138}
2139
2140/// Days since epoch to date (PySpark date_from_unix_date).
2141pub fn date_from_unix_date(column: &Column) -> Column {
2142    column.clone().date_from_unix_date()
2143}
2144
2145/// Positive modulus (PySpark pmod).
2146pub fn pmod(dividend: &Column, divisor: &Column) -> Column {
2147    dividend.clone().pmod(divisor)
2148}
2149
2150/// Factorial n! (PySpark factorial). n in 0..=20; null for negative or overflow.
2151pub fn factorial(column: &Column) -> Column {
2152    column.clone().factorial()
2153}
2154
2155/// Concatenate string columns without separator (PySpark concat)
2156pub fn concat(columns: &[&Column]) -> Column {
2157    use polars::prelude::*;
2158    if columns.is_empty() {
2159        panic!("concat requires at least one column");
2160    }
2161    let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2162    crate::column::Column::from_expr(concat_str(&exprs, "", false), None)
2163}
2164
2165/// Concatenate string columns with separator (PySpark concat_ws)
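///
/// # Example
/// A minimal sketch (marked `ignore`; assumes `concat_ws` is re-exported at the
/// crate root alongside `col`):
/// ```ignore
/// use robin_sparkless::{col, concat_ws};
/// // Joins the hypothetical columns "first" and "last" with a single space.
/// let full_name = concat_ws(" ", &[&col("first"), &col("last")]);
/// ```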
2166pub fn concat_ws(separator: &str, columns: &[&Column]) -> Column {
2167    use polars::prelude::*;
2168    if columns.is_empty() {
2169        panic!("concat_ws requires at least one column");
2170    }
2171    let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2172    crate::column::Column::from_expr(concat_str(&exprs, separator, false), None)
2173}
2174
2175/// Row number window function (1, 2, 3 by order within partition).
2176/// Use with `.over(partition_by)` after ranking by an order column.
2177///
2178/// # Example
2179/// ```
2180/// use robin_sparkless::{col, Column};
2181/// let salary_col = col("salary");
2182/// let rn = salary_col.row_number(true).over(&["dept"]);
2183/// ```
2184pub fn row_number(column: &Column) -> Column {
2185    column.clone().row_number(false)
2186}
2187
2188/// Rank window function (ties same rank, gaps). Use with `.over(partition_by)`.
2189pub fn rank(column: &Column, descending: bool) -> Column {
2190    column.clone().rank(descending)
2191}
2192
2193/// Dense rank window function (no gaps). Use with `.over(partition_by)`.
2194pub fn dense_rank(column: &Column, descending: bool) -> Column {
2195    column.clone().dense_rank(descending)
2196}
2197
2198/// Lag: value from n rows before in partition. Use with `.over(partition_by)`.
2199pub fn lag(column: &Column, n: i64) -> Column {
2200    column.clone().lag(n)
2201}
2202
2203/// Lead: value from n rows after in partition. Use with `.over(partition_by)`.
2204pub fn lead(column: &Column, n: i64) -> Column {
2205    column.clone().lead(n)
2206}
2207
2208/// First value in partition (PySpark first_value). Use with `.over(partition_by)`.
2209pub fn first_value(column: &Column) -> Column {
2210    column.clone().first_value()
2211}
2212
2213/// Last value in partition (PySpark last_value). Use with `.over(partition_by)`.
2214pub fn last_value(column: &Column) -> Column {
2215    column.clone().last_value()
2216}
2217
2218/// Percent rank in partition: (rank - 1) / (count - 1). Window is applied.
2219pub fn percent_rank(column: &Column, partition_by: &[&str], descending: bool) -> Column {
2220    column.clone().percent_rank(partition_by, descending)
2221}
2222
2223/// Cumulative distribution in partition: row_number / count. Window is applied.
2224pub fn cume_dist(column: &Column, partition_by: &[&str], descending: bool) -> Column {
2225    column.clone().cume_dist(partition_by, descending)
2226}
2227
2228/// Ntile: bucket 1..n by rank within partition. Window is applied.
2229pub fn ntile(column: &Column, n: u32, partition_by: &[&str], descending: bool) -> Column {
2230    column.clone().ntile(n, partition_by, descending)
2231}
2232
2233/// Nth value in partition by order (1-based n). Window is applied; do not call .over() again.
2234pub fn nth_value(column: &Column, n: i64, partition_by: &[&str], descending: bool) -> Column {
2235    column.clone().nth_value(n, partition_by, descending)
2236}
2237
2238/// Coalesce - returns the first non-null value from multiple columns.
2239///
2240/// # Example
2241/// ```
2242/// use robin_sparkless::{col, lit_i64, coalesce};
2243///
2244/// // coalesce(col("a"), col("b"), lit(0))
2245/// let expr = coalesce(&[&col("a"), &col("b"), &lit_i64(0)]);
2246/// ```
2247pub fn coalesce(columns: &[&Column]) -> Column {
2248    use polars::prelude::*;
2249    if columns.is_empty() {
2250        panic!("coalesce requires at least one column");
2251    }
2252    let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2253    let expr = polars::prelude::coalesce(&exprs);
2254    crate::column::Column::from_expr(expr, None)
2255}
2256
2257/// Alias for coalesce(col, value). PySpark nvl / ifnull.
2258pub fn nvl(column: &Column, value: &Column) -> Column {
2259    coalesce(&[column, value])
2260}
2261
2262/// Alias for nvl. PySpark ifnull.
2263pub fn ifnull(column: &Column, value: &Column) -> Column {
2264    nvl(column, value)
2265}
2266
2267/// Return null if column equals value, else column. PySpark nullif.
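///
/// # Example
/// A minimal sketch (marked `ignore`; assumes `nullif` and `lit_i32` are
/// re-exported at the crate root alongside `col`):
/// ```ignore
/// use robin_sparkless::{col, lit_i32, nullif};
/// // Null where the hypothetical column "a" equals 0, otherwise the value of "a".
/// let c = nullif(&col("a"), &lit_i32(0));
/// ```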
2268pub fn nullif(column: &Column, value: &Column) -> Column {
2269    use polars::prelude::*;
2270    let cond = column.expr().clone().eq(value.expr().clone());
2271    let null_lit = Expr::Literal(LiteralValue::Null);
2272    let expr = when(cond).then(null_lit).otherwise(column.expr().clone());
2273    crate::column::Column::from_expr(expr, None)
2274}
2275
2276/// Replace NaN with value. PySpark nanvl.
2277pub fn nanvl(column: &Column, value: &Column) -> Column {
2278    use polars::prelude::*;
2279    let cond = column.expr().clone().is_nan();
2280    let expr = when(cond)
2281        .then(value.expr().clone())
2282        .otherwise(column.expr().clone());
2283    crate::column::Column::from_expr(expr, None)
2284}
2285
2286/// Three-arg null replacement: if col1 is not null then col2 else col3. PySpark nvl2.
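///
/// # Example
/// A minimal sketch (marked `ignore`; assumes `nvl2` and `lit_str` are re-exported
/// at the crate root alongside `col`):
/// ```ignore
/// use robin_sparkless::{col, lit_str, nvl2};
/// // "set" where the hypothetical column "a" is not null, "missing" otherwise.
/// let status = nvl2(&col("a"), &lit_str("set"), &lit_str("missing"));
/// ```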
2287pub fn nvl2(col1: &Column, col2: &Column, col3: &Column) -> Column {
2288    use polars::prelude::*;
2289    let cond = col1.expr().clone().is_not_null();
2290    let expr = when(cond)
2291        .then(col2.expr().clone())
2292        .otherwise(col3.expr().clone());
2293    crate::column::Column::from_expr(expr, None)
2294}
2295
2296/// Alias for substring. PySpark substr.
2297pub fn substr(column: &Column, start: i64, length: Option<i64>) -> Column {
2298    substring(column, start, length)
2299}
2300
2301/// Alias for pow. PySpark power.
2302pub fn power(column: &Column, exp: i64) -> Column {
2303    pow(column, exp)
2304}
2305
2306/// Alias for log (natural log). PySpark ln.
2307pub fn ln(column: &Column) -> Column {
2308    log(column)
2309}
2310
2311/// Alias for ceil. PySpark ceiling.
2312pub fn ceiling(column: &Column) -> Column {
2313    ceil(column)
2314}
2315
2316/// Alias for lower. PySpark lcase.
2317pub fn lcase(column: &Column) -> Column {
2318    lower(column)
2319}
2320
2321/// Alias for upper. PySpark ucase.
2322pub fn ucase(column: &Column) -> Column {
2323    upper(column)
2324}
2325
2326/// Alias for day. PySpark dayofmonth.
2327pub fn dayofmonth(column: &Column) -> Column {
2328    day(column)
2329}
2330
2331/// Alias for degrees. PySpark toDegrees.
2332pub fn to_degrees(column: &Column) -> Column {
2333    degrees(column)
2334}
2335
2336/// Alias for radians. PySpark toRadians.
2337pub fn to_radians(column: &Column) -> Column {
2338    radians(column)
2339}
2340
2341/// Hyperbolic cosine (PySpark cosh).
2342pub fn cosh(column: &Column) -> Column {
2343    column.clone().cosh()
2344}
2345/// Hyperbolic sine (PySpark sinh).
2346pub fn sinh(column: &Column) -> Column {
2347    column.clone().sinh()
2348}
2349/// Hyperbolic tangent (PySpark tanh).
2350pub fn tanh(column: &Column) -> Column {
2351    column.clone().tanh()
2352}
2353/// Inverse hyperbolic cosine (PySpark acosh).
2354pub fn acosh(column: &Column) -> Column {
2355    column.clone().acosh()
2356}
2357/// Inverse hyperbolic sine (PySpark asinh).
2358pub fn asinh(column: &Column) -> Column {
2359    column.clone().asinh()
2360}
2361/// Inverse hyperbolic tangent (PySpark atanh).
2362pub fn atanh(column: &Column) -> Column {
2363    column.clone().atanh()
2364}
2365/// Cube root (PySpark cbrt).
2366pub fn cbrt(column: &Column) -> Column {
2367    column.clone().cbrt()
2368}
2369/// exp(x) - 1 (PySpark expm1).
2370pub fn expm1(column: &Column) -> Column {
2371    column.clone().expm1()
2372}
2373/// log(1 + x) (PySpark log1p).
2374pub fn log1p(column: &Column) -> Column {
2375    column.clone().log1p()
2376}
2377/// Base-10 log (PySpark log10).
2378pub fn log10(column: &Column) -> Column {
2379    column.clone().log10()
2380}
2381/// Base-2 log (PySpark log2).
2382pub fn log2(column: &Column) -> Column {
2383    column.clone().log2()
2384}
2385/// Round to nearest integer (PySpark rint).
2386pub fn rint(column: &Column) -> Column {
2387    column.clone().rint()
2388}
2389/// sqrt(x*x + y*y) (PySpark hypot).
2390pub fn hypot(x: &Column, y: &Column) -> Column {
2391    let xx = x.expr().clone() * x.expr().clone();
2392    let yy = y.expr().clone() * y.expr().clone();
2393    crate::column::Column::from_expr((xx + yy).sqrt(), None)
2394}
2395
2396/// True if column is null. PySpark isnull.
2397pub fn isnull(column: &Column) -> Column {
2398    column.clone().is_null()
2399}
2400
2401/// True if column is not null. PySpark isnotnull.
2402pub fn isnotnull(column: &Column) -> Column {
2403    column.clone().is_not_null()
2404}
2405
2406/// Create an array column from multiple columns (PySpark array).
2407/// With no arguments, returns a column of empty arrays (one per row); PySpark parity.
2408pub fn array(columns: &[&Column]) -> Result<crate::column::Column, PolarsError> {
2409    use polars::prelude::*;
2410    if columns.is_empty() {
2411        // PySpark F.array() with no args: one empty list per row (broadcast literal).
2412        // Use .first() so the single-row literal is treated as a scalar and broadcasts to frame height.
2413        let empty_inner = Series::new("".into(), Vec::<i64>::new());
2414        let list_series = ListChunked::from_iter([Some(empty_inner)])
2415            .with_name("array".into())
2416            .into_series();
2417        let expr = lit(list_series).first();
2418        return Ok(crate::column::Column::from_expr(expr, None));
2419    }
2420    let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2421    let expr = concat_list(exprs)
2422        .map_err(|e| PolarsError::ComputeError(format!("array concat_list: {e}").into()))?;
2423    Ok(crate::column::Column::from_expr(expr, None))
2424}
2425
2426/// Number of elements in list (PySpark size / array_size). Returns Int32.
2427pub fn array_size(column: &Column) -> Column {
2428    column.clone().array_size()
2429}
2430
2431/// Alias for array_size (PySpark size).
2432pub fn size(column: &Column) -> Column {
2433    column.clone().array_size()
2434}
2435
2436/// Cardinality: number of elements in array (PySpark cardinality). Alias for size/array_size.
2437pub fn cardinality(column: &Column) -> Column {
2438    column.clone().cardinality()
2439}
2440
2441/// Check if list contains value (PySpark array_contains).
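///
/// # Example
/// A minimal sketch (marked `ignore`; assumes `array_contains` and `lit_str` are
/// re-exported at the crate root alongside `col`):
/// ```ignore
/// use robin_sparkless::{array_contains, col, lit_str};
/// // True where the hypothetical list column "tags" contains "rust".
/// let has_rust = array_contains(&col("tags"), &lit_str("rust"));
/// ```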
2442pub fn array_contains(column: &Column, value: &Column) -> Column {
2443    column.clone().array_contains(value.expr().clone())
2444}
2445
2446/// Join list of strings with separator (PySpark array_join).
2447pub fn array_join(column: &Column, separator: &str) -> Column {
2448    column.clone().array_join(separator)
2449}
2450
2451/// Maximum element in list (PySpark array_max).
2452pub fn array_max(column: &Column) -> Column {
2453    column.clone().array_max()
2454}
2455
2456/// Minimum element in list (PySpark array_min).
2457pub fn array_min(column: &Column) -> Column {
2458    column.clone().array_min()
2459}
2460
2461/// Get element at 1-based index (PySpark element_at).
2462pub fn element_at(column: &Column, index: i64) -> Column {
2463    column.clone().element_at(index)
2464}
2465
2466/// Sort list elements (PySpark array_sort).
2467pub fn array_sort(column: &Column) -> Column {
2468    column.clone().array_sort()
2469}
2470
2471/// Distinct elements in list (PySpark array_distinct).
2472pub fn array_distinct(column: &Column) -> Column {
2473    column.clone().array_distinct()
2474}
2475
2476/// Slice list from 1-based start with optional length (PySpark slice).
2477pub fn array_slice(column: &Column, start: i64, length: Option<i64>) -> Column {
2478    column.clone().array_slice(start, length)
2479}
2480
2481/// Generate array of numbers from start to stop (inclusive) with optional step (PySpark sequence).
2482/// step defaults to 1.
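///
/// # Example
/// A minimal sketch (marked `ignore`; assumes `sequence` and `lit_i64` are
/// re-exported at the crate root):
/// ```ignore
/// use robin_sparkless::{lit_i64, sequence};
/// // Produces [1, 3, 5] per row: start 1, stop 5 (inclusive), step 2.
/// let seq = sequence(&lit_i64(1), &lit_i64(5), Some(&lit_i64(2)));
/// ```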
2483pub fn sequence(start: &Column, stop: &Column, step: Option<&Column>) -> Column {
2484    use polars::prelude::{as_struct, lit, DataType, GetOutput};
2485    let step_expr = step
2486        .map(|c| c.expr().clone().alias("2"))
2487        .unwrap_or_else(|| lit(1i64).alias("2"));
2488    let struct_expr = as_struct(vec![
2489        start.expr().clone().alias("0"),
2490        stop.expr().clone().alias("1"),
2491        step_expr,
2492    ]);
2493    let out_dtype = DataType::List(Box::new(DataType::Int64));
2494    let expr = struct_expr.map(crate::udfs::apply_sequence, GetOutput::from_type(out_dtype));
2495    crate::column::Column::from_expr(expr, None)
2496}
2497
2498/// Random permutation of list elements (PySpark shuffle).
2499pub fn shuffle(column: &Column) -> Column {
2500    use polars::prelude::GetOutput;
2501    let expr = column
2502        .expr()
2503        .clone()
2504        .map(crate::udfs::apply_shuffle, GetOutput::same_type());
2505    crate::column::Column::from_expr(expr, None)
2506}
2507
2508/// Explode list of structs into rows; struct fields become columns after unnest (PySpark inline).
2509/// Returns the exploded struct column; use unnest to expand struct fields to columns.
2510pub fn inline(column: &Column) -> Column {
2511    column.clone().explode()
2512}
2513
2514/// Like inline but null/empty yields one row of nulls (PySpark inline_outer).
2515pub fn inline_outer(column: &Column) -> Column {
2516    column.clone().explode_outer()
2517}
2518
2519/// Explode list into one row per element (PySpark explode).
2520pub fn explode(column: &Column) -> Column {
2521    column.clone().explode()
2522}
2523
2524/// 1-based index of first occurrence of value in list, or 0 if not found (PySpark array_position).
2525/// Implemented via Polars list.eval with col("") as element.
2526pub fn array_position(column: &Column, value: &Column) -> Column {
2527    column.clone().array_position(value.expr().clone())
2528}
2529
2530/// Remove null elements from list (PySpark array_compact).
2531pub fn array_compact(column: &Column) -> Column {
2532    column.clone().array_compact()
2533}
2534
2535/// New list with all elements equal to value removed (PySpark array_remove).
2536/// Implemented via Polars list.eval + list.drop_nulls.
2537pub fn array_remove(column: &Column, value: &Column) -> Column {
2538    column.clone().array_remove(value.expr().clone())
2539}
2540
2541/// Repeat each element n times (PySpark array_repeat). Delegates to `Column::array_repeat`.
2542pub fn array_repeat(column: &Column, n: i64) -> Column {
2543    column.clone().array_repeat(n)
2544}
2545
2546/// Flatten list of lists to one list (PySpark flatten). Delegates to `Column::array_flatten`.
2547pub fn array_flatten(column: &Column) -> Column {
2548    column.clone().array_flatten()
2549}
2550
2551/// True if any list element satisfies the predicate (PySpark exists).
2552pub fn array_exists(column: &Column, predicate: Expr) -> Column {
2553    column.clone().array_exists(predicate)
2554}
2555
2556/// True if all list elements satisfy the predicate (PySpark forall).
2557pub fn array_forall(column: &Column, predicate: Expr) -> Column {
2558    column.clone().array_forall(predicate)
2559}
2560
2561/// Filter list elements by predicate (PySpark filter).
2562pub fn array_filter(column: &Column, predicate: Expr) -> Column {
2563    column.clone().array_filter(predicate)
2564}
2565
2566/// Transform list elements by expression (PySpark transform).
2567pub fn array_transform(column: &Column, f: Expr) -> Column {
2568    column.clone().array_transform(f)
2569}
2570
2571/// Sum of list elements (PySpark aggregate sum).
2572pub fn array_sum(column: &Column) -> Column {
2573    column.clone().array_sum()
2574}
2575
2576/// Array fold/aggregate (PySpark aggregate). Simplified: zero + sum(list elements).
2577pub fn aggregate(column: &Column, zero: &Column) -> Column {
2578    column.clone().array_aggregate(zero)
2579}
2580
2581/// Mean of list elements (PySpark aggregate avg).
2582pub fn array_mean(column: &Column) -> Column {
2583    column.clone().array_mean()
2584}
2585
2586/// Explode list with position (PySpark posexplode). Returns (pos_column, value_column).
2587/// pos is 1-based; implemented via list.eval(cum_count()).explode() and explode().
2588pub fn posexplode(column: &Column) -> (Column, Column) {
2589    column.clone().posexplode()
2590}
2591
2592/// Build a map column from alternating key/value expressions (PySpark create_map).
2593/// Returns List(Struct{key, value}) using Polars as_struct and concat_list.
2594/// With no args (or empty slice), returns a column of empty maps per row (PySpark parity #275).
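///
/// # Example
/// A minimal sketch (marked `ignore`; assumes `create_map` and `lit_str` are
/// re-exported at the crate root alongside `col`):
/// ```ignore
/// use robin_sparkless::{col, create_map, lit_str};
/// // One map per row with a single entry: {"name": <value of column "name">}.
/// let m = create_map(&[&lit_str("name"), &col("name")]).unwrap();
/// ```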
2595pub fn create_map(key_values: &[&Column]) -> Result<Column, PolarsError> {
2596    use polars::chunked_array::StructChunked;
2597    use polars::prelude::{as_struct, concat_list, lit, IntoSeries, ListChunked};
2598    if key_values.is_empty() {
2599        // PySpark F.create_map() with no args: one empty map {} per row (broadcast literal).
2600        let key_s = Series::new("key".into(), Vec::<String>::new());
2601        let value_s = Series::new("value".into(), Vec::<String>::new());
2602        let fields: [&Series; 2] = [&key_s, &value_s];
2603        let empty_struct = StructChunked::from_series(
2604            polars::prelude::PlSmallStr::EMPTY,
2605            0,
2606            fields.iter().copied(),
2607        )
2608        .map_err(|e| PolarsError::ComputeError(format!("create_map empty struct: {e}").into()))?
2609        .into_series();
2610        let list_series = ListChunked::from_iter([Some(empty_struct)])
2611            .with_name("create_map".into())
2612            .into_series();
2613        let expr = lit(list_series).first();
2614        return Ok(crate::column::Column::from_expr(expr, None));
2615    }
2616    let mut struct_exprs: Vec<Expr> = Vec::new();
2617    for i in (0..key_values.len()).step_by(2) {
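        // A trailing key with no matching value is silently skipped.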
2618        if i + 1 < key_values.len() {
2619            let k = key_values[i].expr().clone().alias("key");
2620            let v = key_values[i + 1].expr().clone().alias("value");
2621            struct_exprs.push(as_struct(vec![k, v]));
2622        }
2623    }
2624    let expr = concat_list(struct_exprs)
2625        .map_err(|e| PolarsError::ComputeError(format!("create_map concat_list: {e}").into()))?;
2626    Ok(crate::column::Column::from_expr(expr, None))
2627}
2628
2629/// Extract keys from a map column (PySpark map_keys). Map is List(Struct{key, value}).
2630pub fn map_keys(column: &Column) -> Column {
2631    column.clone().map_keys()
2632}
2633
2634/// Extract values from a map column (PySpark map_values).
2635pub fn map_values(column: &Column) -> Column {
2636    column.clone().map_values()
2637}
2638
2639/// Return map as list of structs {key, value} (PySpark map_entries).
2640pub fn map_entries(column: &Column) -> Column {
2641    column.clone().map_entries()
2642}
2643
2644/// Build map from two array columns keys and values (PySpark map_from_arrays). Implemented via UDF.
2645pub fn map_from_arrays(keys: &Column, values: &Column) -> Column {
2646    keys.clone().map_from_arrays(values)
2647}
2648
2649/// Merge two map columns (PySpark map_concat). Last value wins for duplicate keys.
2650pub fn map_concat(a: &Column, b: &Column) -> Column {
2651    a.clone().map_concat(b)
2652}
2653
2654/// Array of structs {key, value} to map (PySpark map_from_entries).
2655pub fn map_from_entries(column: &Column) -> Column {
2656    column.clone().map_from_entries()
2657}
2658
2659/// True if map contains key (PySpark map_contains_key).
2660pub fn map_contains_key(map_col: &Column, key: &Column) -> Column {
2661    map_col.clone().map_contains_key(key)
2662}
2663
2664/// Get value for key from map, or null (PySpark get).
2665pub fn get(map_col: &Column, key: &Column) -> Column {
2666    map_col.clone().get(key)
2667}
2668
2669/// Filter map entries by predicate (PySpark map_filter).
2670pub fn map_filter(map_col: &Column, predicate: Expr) -> Column {
2671    map_col.clone().map_filter(predicate)
2672}
2673
2674/// Merge two maps by key with merge function (PySpark map_zip_with).
2675pub fn map_zip_with(map1: &Column, map2: &Column, merge: Expr) -> Column {
2676    map1.clone().map_zip_with(map2, merge)
2677}
2678
2679/// Convenience: zip_with with coalesce(left, right) merge.
2680pub fn zip_with_coalesce(left: &Column, right: &Column) -> Column {
2681    use polars::prelude::col;
2682    let left_field = col("").struct_().field_by_name("left");
2683    let right_field = col("").struct_().field_by_name("right");
2684    let merge = crate::column::Column::from_expr(
2685        coalesce(&[
2686            &crate::column::Column::from_expr(left_field, None),
2687            &crate::column::Column::from_expr(right_field, None),
2688        ])
2689        .into_expr(),
2690        None,
2691    );
2692    left.clone().zip_with(right, merge.into_expr())
2693}
2694
2695/// Convenience: map_zip_with with coalesce(value1, value2) merge.
2696pub fn map_zip_with_coalesce(map1: &Column, map2: &Column) -> Column {
2697    use polars::prelude::col;
2698    let v1 = col("").struct_().field_by_name("value1");
2699    let v2 = col("").struct_().field_by_name("value2");
2700    let merge = coalesce(&[
2701        &crate::column::Column::from_expr(v1, None),
2702        &crate::column::Column::from_expr(v2, None),
2703    ])
2704    .into_expr();
2705    map1.clone().map_zip_with(map2, merge)
2706}
2707
2708/// Convenience: map_filter with value > threshold predicate.
2709pub fn map_filter_value_gt(map_col: &Column, threshold: f64) -> Column {
2710    use polars::prelude::{col, lit};
2711    let pred = col("").struct_().field_by_name("value").gt(lit(threshold));
2712    map_col.clone().map_filter(pred)
2713}
2714
2715/// Create struct from columns using column names as field names (PySpark struct).
2716pub fn struct_(columns: &[&Column]) -> Column {
2717    use polars::prelude::as_struct;
2718    if columns.is_empty() {
2719        panic!("struct requires at least one column");
2720    }
2721    let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2722    crate::column::Column::from_expr(as_struct(exprs), None)
2723}
2724
2725/// Create struct with explicit field names (PySpark named_struct). Pairs of (name, column).
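///
/// # Example
/// A minimal sketch (marked `ignore`; assumes `named_struct` is re-exported at the
/// crate root alongside `col`):
/// ```ignore
/// use robin_sparkless::{col, named_struct};
/// // Struct with fields "x" and "y" taken from the hypothetical columns "a" and "b".
/// let s = named_struct(&[("x", &col("a")), ("y", &col("b"))]);
/// ```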
2726pub fn named_struct(pairs: &[(&str, &Column)]) -> Column {
2727    use polars::prelude::as_struct;
2728    if pairs.is_empty() {
2729        panic!("named_struct requires at least one (name, column) pair");
2730    }
2731    let exprs: Vec<Expr> = pairs
2732        .iter()
2733        .map(|(name, col)| col.expr().clone().alias(*name))
2734        .collect();
2735    crate::column::Column::from_expr(as_struct(exprs), None)
2736}
2737
2738/// Append element to end of list (PySpark array_append).
2739pub fn array_append(array: &Column, elem: &Column) -> Column {
2740    array.clone().array_append(elem)
2741}
2742
2743/// Prepend element to start of list (PySpark array_prepend).
2744pub fn array_prepend(array: &Column, elem: &Column) -> Column {
2745    array.clone().array_prepend(elem)
2746}
2747
2748/// Insert element at 1-based position (PySpark array_insert).
2749pub fn array_insert(array: &Column, pos: &Column, elem: &Column) -> Column {
2750    array.clone().array_insert(pos, elem)
2751}
2752
2753/// Elements in first array not in second (PySpark array_except).
2754pub fn array_except(a: &Column, b: &Column) -> Column {
2755    a.clone().array_except(b)
2756}
2757
2758/// Elements in both arrays (PySpark array_intersect).
2759pub fn array_intersect(a: &Column, b: &Column) -> Column {
2760    a.clone().array_intersect(b)
2761}
2762
2763/// Distinct elements from both arrays (PySpark array_union).
2764pub fn array_union(a: &Column, b: &Column) -> Column {
2765    a.clone().array_union(b)
2766}
2767
2768/// Zip two arrays element-wise with merge function (PySpark zip_with).
2769pub fn zip_with(left: &Column, right: &Column, merge: Expr) -> Column {
2770    left.clone().zip_with(right, merge)
2771}
2772
2773/// Extract JSON path from string column (PySpark get_json_object).
2774pub fn get_json_object(column: &Column, path: &str) -> Column {
2775    column.clone().get_json_object(path)
2776}
2777
2778/// Keys of JSON object (PySpark json_object_keys). Returns list of strings.
2779pub fn json_object_keys(column: &Column) -> Column {
2780    column.clone().json_object_keys()
2781}
2782
2783/// Extract keys from JSON as struct (PySpark json_tuple). keys: e.g. ["a", "b"].
2784pub fn json_tuple(column: &Column, keys: &[&str]) -> Column {
2785    column.clone().json_tuple(keys)
2786}
2787
2788/// Parse CSV string to struct (PySpark from_csv). Minimal implementation.
2789pub fn from_csv(column: &Column) -> Column {
2790    column.clone().from_csv()
2791}
2792
2793/// Format struct as CSV string (PySpark to_csv). Minimal implementation.
2794pub fn to_csv(column: &Column) -> Column {
2795    column.clone().to_csv()
2796}
2797
2798/// Schema of CSV string (PySpark schema_of_csv). Returns literal schema string; minimal stub.
2799pub fn schema_of_csv(_column: &Column) -> Column {
2800    Column::from_expr(
2801        lit("STRUCT<_c0: STRING, _c1: STRING>".to_string()),
2802        Some("schema_of_csv".to_string()),
2803    )
2804}
2805
2806/// Schema of JSON string (PySpark schema_of_json). Returns literal schema string; minimal stub.
2807pub fn schema_of_json(_column: &Column) -> Column {
2808    Column::from_expr(
2809        lit("STRUCT<>".to_string()),
2810        Some("schema_of_json".to_string()),
2811    )
2812}
2813
2814/// Parse string column as JSON into struct (PySpark from_json).
2815pub fn from_json(column: &Column, schema: Option<polars::datatypes::DataType>) -> Column {
2816    column.clone().from_json(schema)
2817}
2818
2819/// Serialize struct column to JSON string (PySpark to_json).
2820pub fn to_json(column: &Column) -> Column {
2821    column.clone().to_json()
2822}
2823
2824/// Check if column values are in the given list (PySpark isin). Uses Polars is_in.
2825pub fn isin(column: &Column, other: &Column) -> Column {
2826    column.clone().isin(other)
2827}
2828
2829/// Check if column values are in the given i64 slice (PySpark isin with literal list).
2830pub fn isin_i64(column: &Column, values: &[i64]) -> Column {
2831    let s = Series::from_iter(values.iter().cloned());
2832    Column::from_expr(column.expr().clone().is_in(lit(s)), None)
2833}
2834
2835/// Check if column values are in the given string slice (PySpark isin with literal list).
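///
/// # Example
/// A minimal sketch (marked `ignore`; assumes `isin_str` is re-exported at the
/// crate root alongside `col`):
/// ```ignore
/// use robin_sparkless::{col, isin_str};
/// // True where the hypothetical column "country" is one of the listed codes.
/// let mask = isin_str(&col("country"), &["US", "CA", "MX"]);
/// ```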
2836pub fn isin_str(column: &Column, values: &[&str]) -> Column {
2837    let s: Series = Series::from_iter(values.iter().copied());
2838    Column::from_expr(column.expr().clone().is_in(lit(s)), None)
2839}
2840
2841/// Percent-decode URL-encoded string (PySpark url_decode).
2842pub fn url_decode(column: &Column) -> Column {
2843    column.clone().url_decode()
2844}
2845
2846/// Percent-encode string for URL (PySpark url_encode).
2847pub fn url_encode(column: &Column) -> Column {
2848    column.clone().url_encode()
2849}
2850
2851/// Bitwise left shift (PySpark shiftLeft). col << n.
2852pub fn shift_left(column: &Column, n: i32) -> Column {
2853    column.clone().shift_left(n)
2854}
2855
2856/// Bitwise signed right shift (PySpark shiftRight). col >> n.
2857pub fn shift_right(column: &Column, n: i32) -> Column {
2858    column.clone().shift_right(n)
2859}
2860
2861/// Bitwise unsigned right shift (PySpark shiftRightUnsigned). Logical shift for Long.
2862pub fn shift_right_unsigned(column: &Column, n: i32) -> Column {
2863    column.clone().shift_right_unsigned(n)
2864}
2865
2866/// Session/library version string (PySpark version).
2867pub fn version() -> Column {
2868    Column::from_expr(
2869        lit(concat!("robin-sparkless-", env!("CARGO_PKG_VERSION"))),
2870        None,
2871    )
2872}
2873
2874/// Null-safe equality: true if both null or both equal (PySpark equal_null). Alias for eq_null_safe.
2875pub fn equal_null(left: &Column, right: &Column) -> Column {
2876    left.clone().eq_null_safe(right)
2877}
2878
2879/// Length of JSON array at path (PySpark json_array_length).
2880pub fn json_array_length(column: &Column, path: &str) -> Column {
2881    column.clone().json_array_length(path)
2882}
2883
2884/// Parse URL and extract part: PROTOCOL, HOST, PATH, etc. (PySpark parse_url).
2885/// When key is Some(k) and part is QUERY/QUERYSTRING, returns the value for that query parameter only.
2886pub fn parse_url(column: &Column, part: &str, key: Option<&str>) -> Column {
2887    column.clone().parse_url(part, key)
2888}
2889
2890/// Hash of column values (PySpark hash). Uses Murmur3 32-bit for parity with PySpark.
2891pub fn hash(columns: &[&Column]) -> Column {
2892    use polars::prelude::*;
2893    if columns.is_empty() {
2894        return crate::column::Column::from_expr(lit(0i64), None);
2895    }
2896    if columns.len() == 1 {
2897        return columns[0].clone().hash();
2898    }
2899    let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2900    let struct_expr = polars::prelude::as_struct(exprs);
2901    let name = columns[0].name().to_string();
2902    let expr = struct_expr.map(
2903        crate::udfs::apply_hash_struct,
2904        GetOutput::from_type(DataType::Int64),
2905    );
2906    crate::column::Column::from_expr(expr, Some(name))
2907}
2908
2909/// Stack columns into struct (PySpark stack). Alias for struct_.
2910pub fn stack(columns: &[&Column]) -> Column {
2911    struct_(columns)
2912}

#[cfg(test)]
mod tests {
    use super::*;
    use polars::prelude::{df, IntoLazy};

    #[test]
    fn test_col_creates_column() {
        let column = col("test");
        assert_eq!(column.name(), "test");
    }

    #[test]
    fn test_lit_i32() {
        let column = lit_i32(42);
        // The column should have a default name since it's a literal
        assert_eq!(column.name(), "<expr>");
    }

    #[test]
    fn test_lit_i64() {
        let column = lit_i64(123456789012345i64);
        assert_eq!(column.name(), "<expr>");
    }

    #[test]
    fn test_lit_f64() {
        let column = lit_f64(std::f64::consts::PI);
        assert_eq!(column.name(), "<expr>");
    }

    #[test]
    fn test_lit_bool() {
        let column = lit_bool(true);
        assert_eq!(column.name(), "<expr>");
    }

    #[test]
    fn test_lit_str() {
        let column = lit_str("hello");
        assert_eq!(column.name(), "<expr>");
    }

    #[test]
    fn test_create_map_empty() {
        // PySpark F.create_map() with no args: column of empty maps (#275).
        let empty_col = create_map(&[]).unwrap();
        let df = df!("id" => &[1i64, 2i64]).unwrap();
        let out = df
            .lazy()
            .with_columns([empty_col.into_expr().alias("m")])
            .collect()
            .unwrap();
        assert_eq!(out.height(), 2);
        let m = out.column("m").unwrap();
        assert_eq!(m.len(), 2);
        let list = m.list().unwrap();
        for i in 0..2 {
            let row = list.get(i).unwrap();
            assert_eq!(row.len(), 0);
        }
    }

    #[test]
    fn test_count_aggregation() {
        let column = col("value");
        let result = count(&column);
        assert_eq!(result.name(), "count");
    }

    #[test]
    fn test_sum_aggregation() {
        let column = col("value");
        let result = sum(&column);
        assert_eq!(result.name(), "sum");
    }

    #[test]
    fn test_avg_aggregation() {
        let column = col("value");
        let result = avg(&column);
        assert_eq!(result.name(), "avg");
    }

    #[test]
    fn test_max_aggregation() {
        let column = col("value");
        let result = max(&column);
        assert_eq!(result.name(), "max");
    }

    #[test]
    fn test_min_aggregation() {
        let column = col("value");
        let result = min(&column);
        assert_eq!(result.name(), "min");
    }

    #[test]
    fn test_when_then_otherwise() {
        // Create a simple DataFrame
        let df = df!(
            "age" => &[15, 25, 35]
        )
        .unwrap();

        // Build a when-then-otherwise expression
        let age_col = col("age");
        let condition = age_col.gt(polars::prelude::lit(18));
        let result = when(&condition)
            .then(&lit_str("adult"))
            .otherwise(&lit_str("minor"));

        // Apply the expression
        let result_df = df
            .lazy()
            .with_column(result.into_expr().alias("status"))
            .collect()
            .unwrap();

        // Verify the result
        let status_col = result_df.column("status").unwrap();
        let values: Vec<Option<&str>> = status_col.str().unwrap().into_iter().collect();

        assert_eq!(values[0], Some("minor")); // age 15 < 18
        assert_eq!(values[1], Some("adult")); // age 25 > 18
        assert_eq!(values[2], Some("adult")); // age 35 > 18
    }

    #[test]
    fn test_coalesce_returns_first_non_null() {
        // Create a DataFrame with some nulls
        let df = df!(
            "a" => &[Some(1), None, None],
            "b" => &[None, Some(2), None],
            "c" => &[None, None, Some(3)]
        )
        .unwrap();

        let col_a = col("a");
        let col_b = col("b");
        let col_c = col("c");
        let result = coalesce(&[&col_a, &col_b, &col_c]);

        // Apply the expression
        let result_df = df
            .lazy()
            .with_column(result.into_expr().alias("coalesced"))
            .collect()
            .unwrap();

        // Verify the result
        let coalesced_col = result_df.column("coalesced").unwrap();
        let values: Vec<Option<i32>> = coalesced_col.i32().unwrap().into_iter().collect();

        assert_eq!(values[0], Some(1)); // First non-null is 'a'
        assert_eq!(values[1], Some(2)); // First non-null is 'b'
        assert_eq!(values[2], Some(3)); // First non-null is 'c'
    }

    #[test]
    fn test_coalesce_with_literal_fallback() {
        // Create a DataFrame with all nulls in one row
        let df = df!(
            "a" => &[Some(1), None],
            "b" => &[None::<i32>, None::<i32>]
        )
        .unwrap();

        let col_a = col("a");
        let col_b = col("b");
        let fallback = lit_i32(0);
        let result = coalesce(&[&col_a, &col_b, &fallback]);

        // Apply the expression
        let result_df = df
            .lazy()
            .with_column(result.into_expr().alias("coalesced"))
            .collect()
            .unwrap();

        // Verify the result
        let coalesced_col = result_df.column("coalesced").unwrap();
        let values: Vec<Option<i32>> = coalesced_col.i32().unwrap().into_iter().collect();

        assert_eq!(values[0], Some(1)); // First non-null is 'a'
        assert_eq!(values[1], Some(0)); // All nulls, use fallback
    }

    #[test]
    #[should_panic(expected = "coalesce requires at least one column")]
    fn test_coalesce_empty_panics() {
        let columns: [&Column; 0] = [];
        let _ = coalesce(&columns);
    }

    #[test]
    fn test_cast_double_string_column_strict_ok() {
        // All values parse as doubles, so strict cast should succeed.
        let df = df!(
            "s" => &["123", " 45.5 ", "0"]
        )
        .unwrap();

        let s_col = col("s");
        let cast_col = cast(&s_col, "double").unwrap();

        let out = df
            .lazy()
            .with_column(cast_col.into_expr().alias("v"))
            .collect()
            .unwrap();

        let v = out.column("v").unwrap();
        let vals: Vec<Option<f64>> = v.f64().unwrap().into_iter().collect();
        assert_eq!(vals, vec![Some(123.0), Some(45.5), Some(0.0)]);
    }

    #[test]
    fn test_try_cast_double_string_column_invalid_to_null() {
        // Invalid numeric strings should become null under try_cast / try_to_number.
        let df = df!(
            "s" => &["123", " 45.5 ", "abc", ""]
        )
        .unwrap();

        let s_col = col("s");
        let try_cast_col = try_cast(&s_col, "double").unwrap();

        let out = df
            .lazy()
            .with_column(try_cast_col.into_expr().alias("v"))
            .collect()
            .unwrap();

        let v = out.column("v").unwrap();
        let vals: Vec<Option<f64>> = v.f64().unwrap().into_iter().collect();
        assert_eq!(vals, vec![Some(123.0), Some(45.5), None, None]);
    }

    #[test]
    fn test_to_number_and_try_to_number_numerics_and_strings() {
        // Mixed numeric types should be cast to double; invalid strings become null only for try_to_number.
        let df = df!(
            "i" => &[1i32, 2, 3],
            "f" => &[1.5f64, 2.5, 3.5],
            "s" => &["10", "20.5", "xyz"]
        )
        .unwrap();

        let i_col = col("i");
        let f_col = col("f");
        let s_col = col("s");

        let to_number_i = to_number(&i_col, None).unwrap();
        let to_number_f = to_number(&f_col, None).unwrap();
        let try_to_number_s = try_to_number(&s_col, None).unwrap();

        let out = df
            .lazy()
            .with_columns([
                to_number_i.into_expr().alias("i_num"),
                to_number_f.into_expr().alias("f_num"),
                try_to_number_s.into_expr().alias("s_num"),
            ])
            .collect()
            .unwrap();

        let i_num = out.column("i_num").unwrap();
        let f_num = out.column("f_num").unwrap();
        let s_num = out.column("s_num").unwrap();

        let i_vals: Vec<Option<f64>> = i_num.f64().unwrap().into_iter().collect();
        let f_vals: Vec<Option<f64>> = f_num.f64().unwrap().into_iter().collect();
        let s_vals: Vec<Option<f64>> = s_num.f64().unwrap().into_iter().collect();

        assert_eq!(i_vals, vec![Some(1.0), Some(2.0), Some(3.0)]);
        assert_eq!(f_vals, vec![Some(1.5), Some(2.5), Some(3.5)]);
        assert_eq!(s_vals, vec![Some(10.0), Some(20.5), None]);
    }
}