robin_sparkless_polars/functions/rest.rs

//! Remaining expression builders: agg, when, string, bit, datetime, struct/map/array, cast, hash, misc.
//!
//! Some builders (e.g. `format_string`, `concat`, `coalesce`, `struct_`, `named_struct`) **panic** if given
//! an empty column list; this is intentional programmer-error handling. Use non-empty slices at call sites.
use super::types::parse_type_name;
use crate::column::Column;
use polars::prelude::*;

/// Count aggregation
pub fn count(col: &Column) -> Column {
    Column::from_expr(col.expr().clone().count(), Some("count".to_string()))
}

/// Sum aggregation
pub fn sum(col: &Column) -> Column {
    Column::from_expr(col.expr().clone().sum(), Some("sum".to_string()))
}

/// Average aggregation
pub fn avg(col: &Column) -> Column {
    Column::from_expr(col.expr().clone().mean(), Some("avg".to_string()))
}

/// Alias for avg (PySpark mean).
pub fn mean(col: &Column) -> Column {
    avg(col)
}

/// Maximum aggregation
pub fn max(col: &Column) -> Column {
    Column::from_expr(col.expr().clone().max(), Some("max".to_string()))
}

/// Minimum aggregation
pub fn min(col: &Column) -> Column {
    Column::from_expr(col.expr().clone().min(), Some("min".to_string()))
}

/// First value in group (PySpark first). Use in groupBy.agg().
/// In PySpark, `ignorenulls = true` returns the first non-null value; here the flag is accepted
/// for API compatibility but ignored, since Polars 0.45 only exposes `.first()`.
pub fn first(col: &Column, ignorenulls: bool) -> Column {
    let _ = ignorenulls;
    Column::from_expr(col.expr().clone().first(), None)
}

/// Any value from the group (PySpark any_value); implemented as the first value. Use in groupBy.agg().
/// `ignorenulls` is accepted for API compatibility but ignored.
pub fn any_value(col: &Column, ignorenulls: bool) -> Column {
    let _ = ignorenulls;
    Column::from_expr(col.expr().clone().first(), None)
}

/// Count rows where condition is true (PySpark count_if). Use in groupBy.agg(); column should be boolean (true=1, false=0).
pub fn count_if(col: &Column) -> Column {
    use polars::prelude::DataType;
    Column::from_expr(
        col.expr().clone().cast(DataType::Int64).sum(),
        Some("count_if".to_string()),
    )
}

/// Sum aggregation; null on overflow (PySpark try_sum). Use in groupBy.agg().
/// Currently identical to `sum`, since Polars' sum does not overflow; the name is kept for API compatibility.
pub fn try_sum(col: &Column) -> Column {
    Column::from_expr(col.expr().clone().sum(), Some("try_sum".to_string()))
}

/// Average aggregation; null on invalid input (PySpark try_avg). Use in groupBy.agg().
/// Currently identical to `mean`; the name is kept for API compatibility.
pub fn try_avg(col: &Column) -> Column {
    Column::from_expr(col.expr().clone().mean(), Some("try_avg".to_string()))
}

/// Value of value_col in the row where ord_col is maximum (PySpark max_by). Use in groupBy.agg().
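///
/// Illustrative sketch (not compiled; column names are hypothetical):
/// ```ignore
/// use robin_sparkless_polars::functions::{col, max_by};
/// // value of `price` taken from the row with the largest `ts` in each group
/// let latest_price = max_by(&col("price"), &col("ts"));
/// ```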
pub fn max_by(value_col: &Column, ord_col: &Column) -> Column {
    use polars::prelude::{SortOptions, as_struct};
    let st = as_struct(vec![
        ord_col.expr().clone().alias("_ord"),
        value_col.expr().clone().alias("_val"),
    ]);
    let e = st
        .sort(SortOptions::default().with_order_descending(true))
        .first()
        .struct_()
        .field_by_name("_val");
    Column::from_expr(e, None)
}

/// Value of value_col in the row where ord_col is minimum (PySpark min_by). Use in groupBy.agg().
pub fn min_by(value_col: &Column, ord_col: &Column) -> Column {
    use polars::prelude::{SortOptions, as_struct};
    let st = as_struct(vec![
        ord_col.expr().clone().alias("_ord"),
        value_col.expr().clone().alias("_val"),
    ]);
    let e = st
        .sort(SortOptions::default())
        .first()
        .struct_()
        .field_by_name("_val");
    Column::from_expr(e, None)
}

/// Collect column values into list per group (PySpark collect_list). Use in groupBy.agg().
pub fn collect_list(col: &Column) -> Column {
    Column::from_expr(
        col.expr().clone().implode(),
        Some("collect_list".to_string()),
    )
}

/// Collect distinct column values into list per group (PySpark collect_set). Use in groupBy.agg().
pub fn collect_set(col: &Column) -> Column {
    Column::from_expr(
        col.expr().clone().unique().implode(),
        Some("collect_set".to_string()),
    )
}

/// Boolean AND across group (PySpark bool_and). Use in groupBy.agg(); column should be boolean.
pub fn bool_and(col: &Column) -> Column {
    Column::from_expr(col.expr().clone().all(true), Some("bool_and".to_string()))
}

/// Alias for bool_and (PySpark every). Use in groupBy.agg().
pub fn every(col: &Column) -> Column {
    Column::from_expr(col.expr().clone().all(true), Some("every".to_string()))
}

/// Standard deviation (sample) aggregation (PySpark stddev / stddev_samp)
pub fn stddev(col: &Column) -> Column {
    Column::from_expr(col.expr().clone().std(1), Some("stddev".to_string()))
}

/// Variance (sample) aggregation (PySpark variance / var_samp)
pub fn variance(col: &Column) -> Column {
    Column::from_expr(col.expr().clone().var(1), Some("variance".to_string()))
}

/// Population standard deviation (ddof=0). PySpark stddev_pop.
pub fn stddev_pop(col: &Column) -> Column {
    Column::from_expr(col.expr().clone().std(0), Some("stddev_pop".to_string()))
}

/// Sample standard deviation (ddof=1). Alias for stddev. PySpark stddev_samp.
pub fn stddev_samp(col: &Column) -> Column {
    stddev(col)
}

/// Alias for stddev (PySpark std).
pub fn std(col: &Column) -> Column {
    stddev(col)
}

/// Population variance (ddof=0). PySpark var_pop.
pub fn var_pop(col: &Column) -> Column {
    Column::from_expr(col.expr().clone().var(0), Some("var_pop".to_string()))
}

/// Sample variance (ddof=1). Alias for variance. PySpark var_samp.
pub fn var_samp(col: &Column) -> Column {
    variance(col)
}

/// Median aggregation. PySpark median.
pub fn median(col: &Column) -> Column {
    use polars::prelude::QuantileMethod;
    Column::from_expr(
        col.expr()
            .clone()
            .quantile(lit(0.5), QuantileMethod::Linear),
        Some("median".to_string()),
    )
}

/// Approximate percentile (PySpark approx_percentile). Maps to quantile; percentage in 0.0..=1.0. accuracy reserved for API compatibility.
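///
/// Illustrative sketch (not compiled; column name is hypothetical):
/// ```ignore
/// use robin_sparkless_polars::functions::{approx_percentile, col};
/// // 95th percentile of latency_ms; the accuracy hint is ignored by this backend
/// let p95 = approx_percentile(&col("latency_ms"), 0.95, None);
/// ```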
pub fn approx_percentile(col: &Column, percentage: f64, _accuracy: Option<i32>) -> Column {
    use polars::prelude::QuantileMethod;
    Column::from_expr(
        col.expr()
            .clone()
            .quantile(lit(percentage), QuantileMethod::Linear),
        Some(format!("approx_percentile({percentage})")),
    )
}

/// Approximate percentile (PySpark percentile_approx). Alias for approx_percentile.
pub fn percentile_approx(col: &Column, percentage: f64, accuracy: Option<i32>) -> Column {
    approx_percentile(col, percentage, accuracy)
}

/// Mode aggregation - most frequent value. PySpark mode.
pub fn mode(col: &Column) -> Column {
    col.clone().mode()
}

/// Count distinct aggregation (PySpark countDistinct)
pub fn count_distinct(col: &Column) -> Column {
    use polars::prelude::DataType;
    Column::from_expr(
        col.expr().clone().n_unique().cast(DataType::Int64),
        Some("count_distinct".to_string()),
    )
}

/// Approximate count distinct (PySpark approx_count_distinct). Use in groupBy.agg(). rsd reserved for API compatibility; Polars uses exact n_unique.
pub fn approx_count_distinct(col: &Column, _rsd: Option<f64>) -> Column {
    use polars::prelude::DataType;
    Column::from_expr(
        col.expr().clone().n_unique().cast(DataType::Int64),
        Some("approx_count_distinct".to_string()),
    )
}

/// Kurtosis aggregation (PySpark kurtosis). Fisher definition, bias=true. Use in groupBy.agg().
pub fn kurtosis(col: &Column) -> Column {
    Column::from_expr(
        col.expr()
            .clone()
            .cast(DataType::Float64)
            .kurtosis(true, true),
        Some("kurtosis".to_string()),
    )
}

/// Skewness aggregation (PySpark skewness). bias=true. Use in groupBy.agg().
pub fn skewness(col: &Column) -> Column {
    Column::from_expr(
        col.expr().clone().cast(DataType::Float64).skew(true),
        Some("skewness".to_string()),
    )
}

fn covar_pop_expr_impl(e1: Expr, e2: Expr) -> Expr {
    use polars::prelude::len;
    let c1 = e1.clone().cast(DataType::Float64);
    let c2 = e2.clone().cast(DataType::Float64);
    let n = len().cast(DataType::Float64);
    let sum_ab = (c1.clone() * c2.clone()).sum();
    let sum_a = e1.sum().cast(DataType::Float64);
    let sum_b = e2.sum().cast(DataType::Float64);
    (sum_ab - sum_a * sum_b / n.clone()) / n
}

fn corr_expr_impl(e1: Expr, e2: Expr) -> Expr {
    use polars::prelude::{len, lit, when};
    let c1 = e1.clone().cast(DataType::Float64);
    let c2 = e2.clone().cast(DataType::Float64);
    let n = len().cast(DataType::Float64);
    let n1 = (len() - lit(1)).cast(DataType::Float64);
    let sum_ab = (c1.clone() * c2.clone()).sum();
    let sum_a = e1.sum().cast(DataType::Float64);
    let sum_b = e2.sum().cast(DataType::Float64);
    let sum_a2 = (c1.clone() * c1).sum();
    let sum_b2 = (c2.clone() * c2).sum();
    let cov_samp = (sum_ab - sum_a.clone() * sum_b.clone() / n.clone()) / n1.clone();
    let var_a = (sum_a2 - sum_a.clone() * sum_a / n.clone()) / n1.clone();
    let var_b = (sum_b2 - sum_b.clone() * sum_b / n.clone()) / n1.clone();
    let std_a = var_a.sqrt();
    let std_b = var_b.sqrt();
    when(len().gt(lit(1)))
        .then(cov_samp / (std_a * std_b))
        .otherwise(lit(f64::NAN))
}

fn covar_samp_expr_impl(e1: Expr, e2: Expr) -> Expr {
    use polars::prelude::{len, lit, when};
    let c1 = e1.clone().cast(DataType::Float64);
    let c2 = e2.clone().cast(DataType::Float64);
    let n = len().cast(DataType::Float64);
    let sum_ab = (c1.clone() * c2.clone()).sum();
    let sum_a = e1.sum().cast(DataType::Float64);
    let sum_b = e2.sum().cast(DataType::Float64);
    when(len().gt(lit(1)))
        .then((sum_ab - sum_a * sum_b / n.clone()) / (len() - lit(1)).cast(DataType::Float64))
        .otherwise(lit(f64::NAN))
}

/// Population covariance aggregation (PySpark covar_pop). Returns Expr for use in groupBy.agg().
pub fn covar_pop_expr(col1: &str, col2: &str) -> Expr {
    use polars::prelude::col as pl_col;
    covar_pop_expr_impl(pl_col(col1), pl_col(col2))
}

/// Population covariance aggregation (PySpark covar_pop). Module-level; use in groupBy.agg() with two columns.
pub fn covar_pop(col1: &Column, col2: &Column) -> Column {
    Column::from_expr(
        covar_pop_expr_impl(col1.expr().clone(), col2.expr().clone()),
        Some("covar_pop".to_string()),
    )
}

/// Pearson correlation aggregation (PySpark corr). Module-level; use in groupBy.agg() with two columns.
pub fn corr(col1: &Column, col2: &Column) -> Column {
    Column::from_expr(
        corr_expr_impl(col1.expr().clone(), col2.expr().clone()),
        Some("corr".to_string()),
    )
}

/// Sample covariance aggregation (PySpark covar_samp). Returns Expr for use in groupBy.agg().
pub fn covar_samp_expr(col1: &str, col2: &str) -> Expr {
    use polars::prelude::col as pl_col;
    covar_samp_expr_impl(pl_col(col1), pl_col(col2))
}

/// Pearson correlation aggregation (PySpark corr). Returns Expr for use in groupBy.agg().
pub fn corr_expr(col1: &str, col2: &str) -> Expr {
    use polars::prelude::col as pl_col;
    corr_expr_impl(pl_col(col1), pl_col(col2))
}

// --- Regression aggregates (PySpark regr_*). y = col1, x = col2; only pairs where both non-null. ---

fn regr_cond_and_sums(y_col: &str, x_col: &str) -> (Expr, Expr, Expr, Expr, Expr, Expr) {
    use polars::prelude::col as pl_col;
    let y = pl_col(y_col).cast(DataType::Float64);
    let x = pl_col(x_col).cast(DataType::Float64);
    let cond = y.clone().is_not_null().and(x.clone().is_not_null());
    let n = y
        .clone()
        .filter(cond.clone())
        .count()
        .cast(DataType::Float64);
    let sum_x = x.clone().filter(cond.clone()).sum();
    let sum_y = y.clone().filter(cond.clone()).sum();
    let sum_xx = (x.clone() * x.clone()).filter(cond.clone()).sum();
    let sum_yy = (y.clone() * y.clone()).filter(cond.clone()).sum();
    let sum_xy = (x * y).filter(cond).sum();
    (n, sum_x, sum_y, sum_xx, sum_yy, sum_xy)
}

/// Regression: count of (y, x) pairs where both non-null (PySpark regr_count).
pub fn regr_count_expr(y_col: &str, x_col: &str) -> Expr {
    let (n, ..) = regr_cond_and_sums(y_col, x_col);
    n
}

/// Regression: average of x (PySpark regr_avgx).
pub fn regr_avgx_expr(y_col: &str, x_col: &str) -> Expr {
    use polars::prelude::{lit, when};
    let (n, sum_x, ..) = regr_cond_and_sums(y_col, x_col);
    when(n.clone().gt(lit(0.0)))
        .then(sum_x / n)
        .otherwise(lit(f64::NAN))
}

/// Regression: average of y (PySpark regr_avgy).
pub fn regr_avgy_expr(y_col: &str, x_col: &str) -> Expr {
    use polars::prelude::{lit, when};
    let (n, _, sum_y, ..) = regr_cond_and_sums(y_col, x_col);
    when(n.clone().gt(lit(0.0)))
        .then(sum_y / n)
        .otherwise(lit(f64::NAN))
}

/// Regression: sum((x - avg_x)^2) (PySpark regr_sxx).
pub fn regr_sxx_expr(y_col: &str, x_col: &str) -> Expr {
    use polars::prelude::{lit, when};
    let (n, sum_x, _, sum_xx, ..) = regr_cond_and_sums(y_col, x_col);
    when(n.clone().gt(lit(0.0)))
        .then(sum_xx - sum_x.clone() * sum_x / n)
        .otherwise(lit(f64::NAN))
}

/// Regression: sum((y - avg_y)^2) (PySpark regr_syy).
pub fn regr_syy_expr(y_col: &str, x_col: &str) -> Expr {
    use polars::prelude::{lit, when};
    let (n, _, sum_y, _, sum_yy, _) = regr_cond_and_sums(y_col, x_col);
    when(n.clone().gt(lit(0.0)))
        .then(sum_yy - sum_y.clone() * sum_y / n)
        .otherwise(lit(f64::NAN))
}

/// Regression: sum((x - avg_x)(y - avg_y)) (PySpark regr_sxy).
pub fn regr_sxy_expr(y_col: &str, x_col: &str) -> Expr {
    use polars::prelude::{lit, when};
    let (n, sum_x, sum_y, _, _, sum_xy) = regr_cond_and_sums(y_col, x_col);
    when(n.clone().gt(lit(0.0)))
        .then(sum_xy - sum_x * sum_y / n)
        .otherwise(lit(f64::NAN))
}

/// Regression slope: cov_samp(y,x)/var_samp(x) (PySpark regr_slope).
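///
/// Illustrative sketch (not compiled; column names are hypothetical):
/// ```ignore
/// use robin_sparkless_polars::functions::regr_slope_expr;
/// // raw Polars Expr for an aggregation context: y = sales, x = ad_spend
/// let slope = regr_slope_expr("sales", "ad_spend");
/// ```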
pub fn regr_slope_expr(y_col: &str, x_col: &str) -> Expr {
    use polars::prelude::{lit, when};
    let (n, sum_x, sum_y, sum_xx, _sum_yy, sum_xy) = regr_cond_and_sums(y_col, x_col);
    let regr_sxx = sum_xx.clone() - sum_x.clone() * sum_x.clone() / n.clone();
    let regr_sxy = sum_xy - sum_x * sum_y / n.clone();
    when(n.gt(lit(1.0)).and(regr_sxx.clone().gt(lit(0.0))))
        .then(regr_sxy / regr_sxx)
        .otherwise(lit(f64::NAN))
}

/// Regression intercept: avg_y - slope*avg_x (PySpark regr_intercept).
pub fn regr_intercept_expr(y_col: &str, x_col: &str) -> Expr {
    use polars::prelude::{lit, when};
    let (n, sum_x, sum_y, sum_xx, _, sum_xy) = regr_cond_and_sums(y_col, x_col);
    let regr_sxx = sum_xx - sum_x.clone() * sum_x.clone() / n.clone();
    let regr_sxy = sum_xy.clone() - sum_x.clone() * sum_y.clone() / n.clone();
    let slope = regr_sxy.clone() / regr_sxx.clone();
    let avg_y = sum_y / n.clone();
    let avg_x = sum_x / n.clone();
    when(n.gt(lit(1.0)).and(regr_sxx.clone().gt(lit(0.0))))
        .then(avg_y - slope * avg_x)
        .otherwise(lit(f64::NAN))
}

/// Regression R-squared (PySpark regr_r2).
pub fn regr_r2_expr(y_col: &str, x_col: &str) -> Expr {
    use polars::prelude::{lit, when};
    let (n, sum_x, sum_y, sum_xx, sum_yy, sum_xy) = regr_cond_and_sums(y_col, x_col);
    let regr_sxx = sum_xx - sum_x.clone() * sum_x.clone() / n.clone();
    let regr_syy = sum_yy - sum_y.clone() * sum_y.clone() / n.clone();
    let regr_sxy = sum_xy - sum_x * sum_y / n;
    when(
        regr_sxx
            .clone()
            .gt(lit(0.0))
            .and(regr_syy.clone().gt(lit(0.0))),
    )
    .then(regr_sxy.clone() * regr_sxy / (regr_sxx * regr_syy))
    .otherwise(lit(f64::NAN))
}

/// PySpark-style conditional expression builder.
///
/// # Example
/// ```
/// use robin_sparkless_polars::functions::{col, lit_i64, lit_str, when};
///
/// // when(condition).then(value).otherwise(fallback)
/// let expr = when(&col("age").gt(lit_i64(18).into_expr()))
///     .then(&lit_str("adult"))
///     .otherwise(&lit_str("minor"));
/// ```
pub fn when(condition: &Column) -> WhenBuilder {
    WhenBuilder::new(condition.expr().clone())
}

/// Two-arg when(condition, value): returns value where condition is true, null otherwise (PySpark when(cond, val)).
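///
/// Illustrative sketch (not compiled), mirroring the `when` example above:
/// ```ignore
/// use robin_sparkless_polars::functions::{col, lit_i64, lit_str, when_then_otherwise_null};
/// // "adult" where age > 18, null everywhere else
/// let label = when_then_otherwise_null(&col("age").gt(lit_i64(18).into_expr()), &lit_str("adult"));
/// ```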
pub fn when_then_otherwise_null(condition: &Column, value: &Column) -> Column {
    use polars::prelude::*;
    let null_expr = lit(NULL);
    let expr = polars::prelude::when(condition.expr().clone())
        .then(value.expr().clone())
        .otherwise(null_expr);
    crate::column::Column::from_expr(expr, None)
}

/// Builder for when-then-otherwise expressions
pub struct WhenBuilder {
    condition: Expr,
}

impl WhenBuilder {
    fn new(condition: Expr) -> Self {
        WhenBuilder { condition }
    }

    /// Specify the value when condition is true
    pub fn then(self, value: &Column) -> ThenBuilder {
        use polars::prelude::*;
        let when_then = when(self.condition).then(value.expr().clone());
        ThenBuilder::new(when_then)
    }

    /// Placeholder for the false branch; **always panics**.
    /// As in PySpark, `when(cond).otherwise(val)` only makes sense after a `.then()`.
    /// This implementation requires `.then()` to be called explicitly:
    /// use `when(cond).then(val1).otherwise(val2)` instead of calling this directly.
    pub fn otherwise(self, _value: &Column) -> Column {
        // Calling otherwise() without then() is not supported;
        // users should write when(cond).then(val1).otherwise(val2).
        panic!(
            "when().otherwise() requires .then() to be called first. Use when(cond).then(val1).otherwise(val2)"
        );
    }
}

/// Builder for chaining when-then clauses before finalizing with otherwise
pub struct ThenBuilder {
    state: WhenThenState,
}

enum WhenThenState {
    Single(Box<polars::prelude::Then>),
    Chained(Box<polars::prelude::ChainedThen>),
}

/// Builder for an additional when-then clause (returned by ThenBuilder::when).
pub struct ChainedWhenBuilder {
    inner: polars::prelude::ChainedWhen,
}

impl ThenBuilder {
    fn new(when_then: polars::prelude::Then) -> Self {
        ThenBuilder {
            state: WhenThenState::Single(Box::new(when_then)),
        }
    }

    fn new_chained(chained: polars::prelude::ChainedThen) -> Self {
        ThenBuilder {
            state: WhenThenState::Chained(Box::new(chained)),
        }
    }

    /// Chain an additional when-then clause (PySpark: when(a).then(x).when(b).then(y).otherwise(z)).
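    ///
    /// Illustrative sketch (not compiled), following the `when` example on the free function:
    /// ```ignore
    /// use robin_sparkless_polars::functions::{col, lit_i64, lit_str, when};
    /// let tier = when(&col("score").gt(lit_i64(90).into_expr()))
    ///     .then(&lit_str("gold"))
    ///     .when(&col("score").gt(lit_i64(50).into_expr()))
    ///     .then(&lit_str("silver"))
    ///     .otherwise(&lit_str("bronze"));
    /// ```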
    pub fn when(self, condition: &Column) -> ChainedWhenBuilder {
        let chained_when = match self.state {
            WhenThenState::Single(t) => t.when(condition.expr().clone()),
            WhenThenState::Chained(ct) => ct.when(condition.expr().clone()),
        };
        ChainedWhenBuilder {
            inner: chained_when,
        }
    }

    /// Finalize the expression with the fallback value
    pub fn otherwise(self, value: &Column) -> Column {
        let expr = match self.state {
            WhenThenState::Single(t) => t.otherwise(value.expr().clone()),
            WhenThenState::Chained(ct) => ct.otherwise(value.expr().clone()),
        };
        crate::column::Column::from_expr(expr, None)
    }
}

impl ChainedWhenBuilder {
    /// Set the value for the current when clause.
    pub fn then(self, value: &Column) -> ThenBuilder {
        ThenBuilder::new_chained(self.inner.then(value.expr().clone()))
    }
}

/// Convert string column to uppercase (PySpark upper)
pub fn upper(column: &Column) -> Column {
    column.clone().upper()
}

/// Convert string column to lowercase (PySpark lower)
pub fn lower(column: &Column) -> Column {
    column.clone().lower()
}

/// Substring with 1-based start (PySpark substring semantics)
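///
/// Illustrative sketch (not compiled; column name is hypothetical):
/// ```ignore
/// use robin_sparkless_polars::functions::{col, substring};
/// // first three characters: the start index is 1-based, as in PySpark
/// let prefix = substring(&col("name"), 1, Some(3));
/// ```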
pub fn substring(column: &Column, start: i64, length: Option<i64>) -> Column {
    column.clone().substr(start, length)
}

/// String length in characters (PySpark length)
pub fn length(column: &Column) -> Column {
    column.clone().length()
}

/// Trim leading and trailing whitespace (PySpark trim)
pub fn trim(column: &Column) -> Column {
    column.clone().trim()
}

/// Trim leading whitespace (PySpark ltrim)
pub fn ltrim(column: &Column) -> Column {
    column.clone().ltrim()
}

/// Trim trailing whitespace (PySpark rtrim)
pub fn rtrim(column: &Column) -> Column {
    column.clone().rtrim()
}

/// Trim leading and trailing chars (PySpark btrim). trim_str defaults to whitespace.
pub fn btrim(column: &Column, trim_str: Option<&str>) -> Column {
    column.clone().btrim(trim_str)
}

/// Find substring position 1-based, starting at pos (PySpark locate). 0 if not found.
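///
/// Illustrative sketch (not compiled; column name is hypothetical). Note the PySpark-style
/// argument order: the needle comes first, then the column, then the 1-based start position.
/// ```ignore
/// use robin_sparkless_polars::functions::{col, locate};
/// let pos = locate("bar", &col("s"), 1); // 0 when "bar" does not occur
/// ```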
pub fn locate(substr: &str, column: &Column, pos: i64) -> Column {
    column.clone().locate(substr, pos)
}

/// Base conversion (PySpark conv). num from from_base to to_base.
pub fn conv(column: &Column, from_base: i32, to_base: i32) -> Column {
    column.clone().conv(from_base, to_base)
}

/// Convert to hex string (PySpark hex).
pub fn hex(column: &Column) -> Column {
    column.clone().hex()
}

/// Convert hex string to binary/string (PySpark unhex).
pub fn unhex(column: &Column) -> Column {
    column.clone().unhex()
}

/// Encode string to binary (PySpark encode). Charset: UTF-8. Returns hex string.
pub fn encode(column: &Column, charset: &str) -> Column {
    column.clone().encode(charset)
}

/// Decode binary (hex string) to string (PySpark decode). Charset: UTF-8.
pub fn decode(column: &Column, charset: &str) -> Column {
    column.clone().decode(charset)
}

/// Convert to binary (PySpark to_binary). fmt: 'utf-8', 'hex'.
pub fn to_binary(column: &Column, fmt: &str) -> Column {
    column.clone().to_binary(fmt)
}

/// Try convert to binary; null on failure (PySpark try_to_binary).
pub fn try_to_binary(column: &Column, fmt: &str) -> Column {
    column.clone().try_to_binary(fmt)
}

/// AES encrypt (PySpark aes_encrypt). Key as string; AES-128-GCM.
pub fn aes_encrypt(column: &Column, key: &str) -> Column {
    column.clone().aes_encrypt(key)
}

/// AES decrypt (PySpark aes_decrypt). Input hex(nonce||ciphertext).
pub fn aes_decrypt(column: &Column, key: &str) -> Column {
    column.clone().aes_decrypt(key)
}

/// Try AES decrypt (PySpark try_aes_decrypt). Returns null on failure.
pub fn try_aes_decrypt(column: &Column, key: &str) -> Column {
    column.clone().try_aes_decrypt(key)
}

/// Convert integer to binary string (PySpark bin).
pub fn bin(column: &Column) -> Column {
    column.clone().bin()
}

/// Get bit at 0-based position (PySpark getbit).
pub fn getbit(column: &Column, pos: i64) -> Column {
    column.clone().getbit(pos)
}

/// Bitwise AND of two integer/boolean columns (PySpark bit_and).
pub fn bit_and(left: &Column, right: &Column) -> Column {
    left.clone().bit_and(right)
}

/// Bitwise OR of two integer/boolean columns (PySpark bit_or).
pub fn bit_or(left: &Column, right: &Column) -> Column {
    left.clone().bit_or(right)
}

/// Bitwise XOR of two integer/boolean columns (PySpark bit_xor).
pub fn bit_xor(left: &Column, right: &Column) -> Column {
    left.clone().bit_xor(right)
}

/// Count of set bits in the integer representation (PySpark bit_count).
pub fn bit_count(column: &Column) -> Column {
    column.clone().bit_count()
}

/// Bitwise NOT of an integer/boolean column (PySpark bitwise_not / bitwiseNOT).
pub fn bitwise_not(column: &Column) -> Column {
    column.clone().bitwise_not()
}

// --- Bitmap (PySpark 3.5+) ---

/// Map integral value (0–32767) to bit position for bitmap aggregates (PySpark bitmap_bit_position).
pub fn bitmap_bit_position(column: &Column) -> Column {
    use polars::prelude::DataType;
    let expr = column.expr().clone().cast(DataType::Int32);
    Column::from_expr(expr, None)
}

/// Bucket number for distributed bitmap (PySpark bitmap_bucket_number). value / 32768.
pub fn bitmap_bucket_number(column: &Column) -> Column {
    use polars::prelude::DataType;
    let expr = column.expr().clone().cast(DataType::Int64) / lit(32768i64);
    Column::from_expr(expr, None)
}

/// Count set bits in a bitmap binary column (PySpark bitmap_count).
pub fn bitmap_count(column: &Column) -> Column {
    use polars::prelude::{DataType, Field};
    let expr = column.expr().clone().map(
        |s| crate::column::expect_col(crate::udfs::apply_bitmap_count(s)),
        |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
    );
    Column::from_expr(expr, None)
}

/// Aggregate: bitwise OR of bit positions into one bitmap binary (PySpark bitmap_construct_agg).
/// Use in group_by(...).agg([bitmap_construct_agg(col)]).
pub fn bitmap_construct_agg(column: &Column) -> polars::prelude::Expr {
    use polars::prelude::{DataType, Field};
    column.expr().clone().implode().map(
        |s| crate::column::expect_col(crate::udfs::apply_bitmap_construct_agg(s)),
        |_schema, field| Ok(Field::new(field.name().clone(), DataType::Binary)),
    )
}

/// Aggregate: bitwise OR of bitmap binary column (PySpark bitmap_or_agg).
pub fn bitmap_or_agg(column: &Column) -> polars::prelude::Expr {
    use polars::prelude::{DataType, Field};
    column.expr().clone().implode().map(
        |s| crate::column::expect_col(crate::udfs::apply_bitmap_or_agg(s)),
        |_schema, field| Ok(Field::new(field.name().clone(), DataType::Binary)),
    )
}

/// Alias for getbit (PySpark bit_get).
pub fn bit_get(column: &Column, pos: i64) -> Column {
    getbit(column, pos)
}

/// Assert that all boolean values are true; errors otherwise (PySpark assert_true).
/// When err_msg is Some, it is used in the error message when assertion fails.
pub fn assert_true(column: &Column, err_msg: Option<&str>) -> Column {
    column.clone().assert_true(err_msg)
}

/// Raise an error when evaluated (PySpark raise_error). Always fails with the given message.
pub fn raise_error(message: &str) -> Column {
    let msg = message.to_string();
    let expr = lit(0i64).map(
        move |_col| -> PolarsResult<polars::prelude::Column> {
            Err(PolarsError::ComputeError(msg.clone().into()))
        },
        |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
    );
    Column::from_expr(expr, Some("raise_error".to_string()))
}

/// Stub partition id - always 0 (PySpark spark_partition_id).
pub fn spark_partition_id() -> Column {
    Column::from_expr(lit(0i32), Some("spark_partition_id".to_string()))
}

/// Stub input file name - empty string (PySpark input_file_name).
pub fn input_file_name() -> Column {
    Column::from_expr(lit(""), Some("input_file_name".to_string()))
}

/// Stub monotonically_increasing_id - constant 0 (PySpark monotonically_increasing_id).
/// Note: differs from PySpark which is unique per-row; see PYSPARK_DIFFERENCES.md.
pub fn monotonically_increasing_id() -> Column {
    Column::from_expr(lit(0i64), Some("monotonically_increasing_id".to_string()))
}

/// Current catalog name stub (PySpark current_catalog).
pub fn current_catalog() -> Column {
    Column::from_expr(lit("spark_catalog"), Some("current_catalog".to_string()))
}

/// Current database/schema name stub (PySpark current_database).
pub fn current_database() -> Column {
    Column::from_expr(lit("default"), Some("current_database".to_string()))
}

/// Current schema name stub (PySpark current_schema).
pub fn current_schema() -> Column {
    Column::from_expr(lit("default"), Some("current_schema".to_string()))
}

/// Current user stub (PySpark current_user).
pub fn current_user() -> Column {
    Column::from_expr(lit("unknown"), Some("current_user".to_string()))
}

/// User stub (PySpark user).
pub fn user() -> Column {
    Column::from_expr(lit("unknown"), Some("user".to_string()))
}

/// Random uniform [0, 1) per row, with optional seed (PySpark rand).
/// When added via with_column, generates one distinct value per row (PySpark-like).
pub fn rand(seed: Option<u64>) -> Column {
    Column::from_rand(seed)
}

/// Random standard normal per row, with optional seed (PySpark randn).
/// When added via with_column, generates one distinct value per row (PySpark-like).
pub fn randn(seed: Option<u64>) -> Column {
    Column::from_randn(seed)
}

/// Call a registered UDF by name. PySpark: F.call_udf(udfName, *cols).
/// Requires a session (set by get_or_create). Returns an error if the UDF is not found.
pub fn call_udf(name: &str, cols: &[Column]) -> Result<Column, PolarsError> {
    use polars::prelude::Column as PlColumn;

    let (registry, case_sensitive) =
        crate::udf_context::get_thread_udf_context().ok_or_else(|| {
            PolarsError::InvalidOperation(
                "call_udf: no session. Use SparkSession.builder().get_or_create() first.".into(),
            )
        })?;

    // Rust UDF: build lazy Expr
    let udf = registry
        .get_rust_udf(name, case_sensitive)?
        .ok_or_else(|| {
            PolarsError::InvalidOperation(format!("call_udf: UDF '{name}' not found").into())
        })?;

    let exprs: Vec<Expr> = cols.iter().map(|c| c.expr().clone()).collect();
    let output_type = DataType::String; // PySpark default

    let expr = if exprs.len() == 1 {
        let udf = udf.clone();
        exprs.into_iter().next().unwrap().map(
            move |c| {
                let s = c.take_materialized_series();
                udf.apply(&[s]).map(|out| PlColumn::new("_".into(), out))
            },
            move |_schema, field| Ok(Field::new(field.name().clone(), output_type.clone())),
        )
    } else {
        let udf = udf.clone();
        let first = exprs[0].clone();
        let rest: Vec<Expr> = exprs[1..].to_vec();
        first.map_many(
            move |columns| {
                let series: Vec<Series> = columns
                    .iter_mut()
                    .map(|c| std::mem::take(c).take_materialized_series())
                    .collect();
                udf.apply(&series).map(|out| PlColumn::new("_".into(), out))
            },
            &rest,
            move |_schema, fields| Ok(Field::new(fields[0].name().clone(), output_type.clone())),
        )
    };

    Ok(Column::from_expr(expr, Some(format!("{name}()"))))
}

/// True if two arrays have any element in common (PySpark arrays_overlap).
pub fn arrays_overlap(left: &Column, right: &Column) -> Column {
    left.clone().arrays_overlap(right)
}

/// Zip arrays into array of structs (PySpark arrays_zip).
pub fn arrays_zip(left: &Column, right: &Column) -> Column {
    left.clone().arrays_zip(right)
}

/// Explode; null/empty yields one row with null (PySpark explode_outer).
pub fn explode_outer(column: &Column) -> Column {
    column.clone().explode_outer()
}

/// Posexplode with null preservation (PySpark posexplode_outer).
pub fn posexplode_outer(column: &Column) -> (Column, Column) {
    column.clone().posexplode_outer()
}

/// Collect to array (PySpark array_agg).
pub fn array_agg(column: &Column) -> Column {
    column.clone().array_agg()
}

/// Transform map keys by expr (PySpark transform_keys).
pub fn transform_keys(column: &Column, key_expr: Expr) -> Column {
    column.clone().transform_keys(key_expr)
}

/// Transform map values by expr (PySpark transform_values).
pub fn transform_values(column: &Column, value_expr: Expr) -> Column {
    column.clone().transform_values(value_expr)
}

/// Parse string to map (PySpark str_to_map). Default delims: "," and ":".
pub fn str_to_map(
    column: &Column,
    pair_delim: Option<&str>,
    key_value_delim: Option<&str>,
) -> Column {
    let pd = pair_delim.unwrap_or(",");
    let kvd = key_value_delim.unwrap_or(":");
    column.clone().str_to_map(pd, kvd)
}

/// Extract first match of regex (PySpark regexp_extract). group_index 0 = full match.
pub fn regexp_extract(column: &Column, pattern: &str, group_index: usize) -> Column {
    column.clone().regexp_extract(pattern, group_index)
}

/// Replace first match of regex (PySpark regexp_replace)
pub fn regexp_replace(column: &Column, pattern: &str, replacement: &str) -> Column {
    column.clone().regexp_replace(pattern, replacement)
}

/// Split string by delimiter (PySpark split). Optional limit: at most that many parts (remainder in last).
pub fn split(column: &Column, delimiter: &str, limit: Option<i32>) -> Column {
    column.clone().split(delimiter, limit)
}

/// Title case (PySpark initcap)
pub fn initcap(column: &Column) -> Column {
    column.clone().initcap()
}

/// Extract all matches of regex (PySpark regexp_extract_all).
pub fn regexp_extract_all(column: &Column, pattern: &str) -> Column {
    column.clone().regexp_extract_all(pattern)
}

/// Check if string matches regex (PySpark regexp_like / rlike).
pub fn regexp_like(column: &Column, pattern: &str) -> Column {
    column.clone().regexp_like(pattern)
}

/// Count of non-overlapping regex matches (PySpark regexp_count).
pub fn regexp_count(column: &Column, pattern: &str) -> Column {
    column.clone().regexp_count(pattern)
}

/// First substring matching regex (PySpark regexp_substr). Null if no match.
pub fn regexp_substr(column: &Column, pattern: &str) -> Column {
    column.clone().regexp_substr(pattern)
}

/// Split by delimiter and return 1-based part (PySpark split_part).
pub fn split_part(column: &Column, delimiter: &str, part_num: i64) -> Column {
    column.clone().split_part(delimiter, part_num)
}

/// 1-based position of first regex match (PySpark regexp_instr).
pub fn regexp_instr(column: &Column, pattern: &str, group_idx: Option<usize>) -> Column {
    column.clone().regexp_instr(pattern, group_idx)
}

/// 1-based index of str in comma-delimited set (PySpark find_in_set). 0 if not found or str contains comma.
pub fn find_in_set(str_column: &Column, set_column: &Column) -> Column {
    str_column.clone().find_in_set(set_column)
}

/// Printf-style format (PySpark format_string). Supports %s, %d, %i, %f, %g, %%. **Panics** if `columns` is empty.
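///
/// Illustrative sketch (not compiled; column names are hypothetical):
/// ```ignore
/// use robin_sparkless_polars::functions::{col, format_string};
/// // e.g. "alice: 3" for name = "alice", count = 3
/// let line = format_string("%s: %d", &[&col("name"), &col("count")]);
/// ```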
pub fn format_string(format: &str, columns: &[&Column]) -> Column {
    use polars::prelude::*;
    if columns.is_empty() {
        panic!("format_string needs at least one column");
    }
    let format_owned = format.to_string();
    let args: Vec<Expr> = columns.iter().skip(1).map(|c| c.expr().clone()).collect();
    let expr = columns[0].expr().clone().map_many(
        move |cols| {
            crate::column::expect_col(crate::udfs::apply_format_string(cols, &format_owned))
        },
        &args,
        |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::String)),
    );
    crate::column::Column::from_expr(expr, None)
}

/// Alias for format_string (PySpark printf).
pub fn printf(format: &str, columns: &[&Column]) -> Column {
    format_string(format, columns)
}

/// Repeat string n times (PySpark repeat).
pub fn repeat(column: &Column, n: i32) -> Column {
    column.clone().repeat(n)
}

/// Reverse string (PySpark reverse).
pub fn reverse(column: &Column) -> Column {
    column.clone().reverse()
}

/// Find substring position 1-based; 0 if not found (PySpark instr).
pub fn instr(column: &Column, substr: &str) -> Column {
    column.clone().instr(substr)
}

/// Position of substring in column (PySpark position). Same as instr; (substr, col) argument order.
pub fn position(substr: &str, column: &Column) -> Column {
    column.clone().instr(substr)
}

/// ASCII value of first character (PySpark ascii). Returns Int32.
pub fn ascii(column: &Column) -> Column {
    column.clone().ascii()
}

/// Format numeric as string with fixed decimal places (PySpark format_number).
pub fn format_number(column: &Column, decimals: u32) -> Column {
    column.clone().format_number(decimals)
}

/// Replace substring at 1-based position (PySpark overlay). replace is literal.
pub fn overlay(column: &Column, replace: &str, pos: i64, length: i64) -> Column {
    column.clone().overlay(replace, pos, length)
}

/// Int to single-character string (PySpark char). Valid codepoint only.
pub fn char(column: &Column) -> Column {
    column.clone().char()
}

/// Alias for char (PySpark chr).
pub fn chr(column: &Column) -> Column {
    column.clone().chr()
}

/// Base64 encode string bytes (PySpark base64).
pub fn base64(column: &Column) -> Column {
    column.clone().base64()
}

/// Base64 decode to string (PySpark unbase64). Invalid decode → null.
pub fn unbase64(column: &Column) -> Column {
    column.clone().unbase64()
}

/// SHA1 hash of string bytes, return hex string (PySpark sha1).
pub fn sha1(column: &Column) -> Column {
    column.clone().sha1()
}

/// SHA2 hash; bit_length 256, 384, or 512 (PySpark sha2).
pub fn sha2(column: &Column, bit_length: i32) -> Column {
    column.clone().sha2(bit_length)
}

/// MD5 hash of string bytes, return hex string (PySpark md5).
pub fn md5(column: &Column) -> Column {
    column.clone().md5()
}

/// Left-pad string to length with pad char (PySpark lpad).
pub fn lpad(column: &Column, length: i32, pad: &str) -> Column {
    column.clone().lpad(length, pad)
}

/// Right-pad string to length with pad char (PySpark rpad).
pub fn rpad(column: &Column, length: i32, pad: &str) -> Column {
    column.clone().rpad(length, pad)
}

/// Character-by-character translation (PySpark translate).
pub fn translate(column: &Column, from_str: &str, to_str: &str) -> Column {
    column.clone().translate(from_str, to_str)
}

/// Mask string: replace upper/lower/digit/other with given chars (PySpark mask).
pub fn mask(
    column: &Column,
    upper_char: Option<char>,
    lower_char: Option<char>,
    digit_char: Option<char>,
    other_char: Option<char>,
) -> Column {
    column
        .clone()
        .mask(upper_char, lower_char, digit_char, other_char)
}

/// Substring before/after nth delimiter (PySpark substring_index).
pub fn substring_index(column: &Column, delimiter: &str, count: i64) -> Column {
    column.clone().substring_index(delimiter, count)
}

/// Leftmost n characters (PySpark left).
pub fn left(column: &Column, n: i64) -> Column {
    column.clone().left(n)
}

/// Rightmost n characters (PySpark right).
pub fn right(column: &Column, n: i64) -> Column {
    column.clone().right(n)
}

/// Replace all occurrences of search with replacement (literal). PySpark replace.
pub fn replace(column: &Column, search: &str, replacement: &str) -> Column {
    column.clone().replace(search, replacement)
}

/// True if string starts with prefix (PySpark startswith).
pub fn startswith(column: &Column, prefix: &str) -> Column {
    column.clone().startswith(prefix)
}

/// True if string ends with suffix (PySpark endswith).
pub fn endswith(column: &Column, suffix: &str) -> Column {
    column.clone().endswith(suffix)
}

/// True if string contains substring (literal). PySpark contains.
pub fn contains(column: &Column, substring: &str) -> Column {
    column.clone().contains(substring)
}

/// SQL LIKE pattern (% any, _ one char). PySpark like.
/// When escape_char is Some(esc), esc + char treats that char as literal.
pub fn like(column: &Column, pattern: &str, escape_char: Option<char>) -> Column {
    column.clone().like(pattern, escape_char)
}

/// Case-insensitive LIKE. PySpark ilike.
/// When escape_char is Some(esc), esc + char treats that char as literal.
pub fn ilike(column: &Column, pattern: &str, escape_char: Option<char>) -> Column {
    column.clone().ilike(pattern, escape_char)
}

/// Alias for regexp_like. PySpark rlike / regexp.
pub fn rlike(column: &Column, pattern: &str) -> Column {
    column.clone().regexp_like(pattern)
}

/// Alias for rlike (PySpark regexp).
pub fn regexp(column: &Column, pattern: &str) -> Column {
    rlike(column, pattern)
}

/// Soundex code (PySpark soundex). Not implemented: requires element-wise UDF.
pub fn soundex(column: &Column) -> Column {
    column.clone().soundex()
}

/// Levenshtein distance (PySpark levenshtein). Not implemented: requires element-wise UDF.
pub fn levenshtein(column: &Column, other: &Column) -> Column {
    column.clone().levenshtein(other)
}

/// CRC32 of string bytes (PySpark crc32). Not implemented: requires element-wise UDF.
pub fn crc32(column: &Column) -> Column {
    column.clone().crc32()
}

/// XXH64 hash (PySpark xxhash64). Not implemented: requires element-wise UDF.
pub fn xxhash64(column: &Column) -> Column {
    column.clone().xxhash64()
}

/// Absolute value (PySpark abs)
pub fn abs(column: &Column) -> Column {
    column.clone().abs()
}

/// Ceiling (PySpark ceil)
pub fn ceil(column: &Column) -> Column {
    column.clone().ceil()
}

/// Floor (PySpark floor)
pub fn floor(column: &Column) -> Column {
    column.clone().floor()
}

/// Round (PySpark round)
pub fn round(column: &Column, decimals: u32) -> Column {
    column.clone().round(decimals)
}

/// Banker's rounding - round half to even (PySpark bround).
pub fn bround(column: &Column, scale: i32) -> Column {
    column.clone().bround(scale)
}

/// Unary minus / negate (PySpark negate, negative).
pub fn negate(column: &Column) -> Column {
    column.clone().negate()
}

/// Alias for negate. PySpark negative.
pub fn negative(column: &Column) -> Column {
    negate(column)
}

/// Unary plus - no-op, returns column as-is (PySpark positive).
pub fn positive(column: &Column) -> Column {
    column.clone()
}

/// Cotangent: 1/tan (PySpark cot).
pub fn cot(column: &Column) -> Column {
    column.clone().cot()
}

/// Cosecant: 1/sin (PySpark csc).
pub fn csc(column: &Column) -> Column {
    column.clone().csc()
}

/// Secant: 1/cos (PySpark sec).
pub fn sec(column: &Column) -> Column {
    column.clone().sec()
}

/// Constant e = 2.718... (PySpark e).
pub fn e() -> Column {
    Column::from_expr(lit(std::f64::consts::E), Some("e".to_string()))
}

/// Constant pi = 3.14159... (PySpark pi).
pub fn pi() -> Column {
    Column::from_expr(lit(std::f64::consts::PI), Some("pi".to_string()))
}

/// Square root (PySpark sqrt)
pub fn sqrt(column: &Column) -> Column {
    column.clone().sqrt()
}

/// Power (PySpark pow)
pub fn pow(column: &Column, exp: i64) -> Column {
    column.clone().pow(exp)
}

/// Exponential (PySpark exp)
pub fn exp(column: &Column) -> Column {
    column.clone().exp()
}

/// Natural logarithm (PySpark log with one arg)
pub fn log(column: &Column) -> Column {
    column.clone().log()
}

/// Logarithm with given base (PySpark log(col, base)). base must be positive and not 1.
pub fn log_with_base(column: &Column, base: f64) -> Column {
    crate::column::Column::from_expr(column.expr().clone().log(lit(base)), None)
}

/// Sine in radians (PySpark sin)
pub fn sin(column: &Column) -> Column {
    column.clone().sin()
}

/// Cosine in radians (PySpark cos)
pub fn cos(column: &Column) -> Column {
    column.clone().cos()
}

/// Tangent in radians (PySpark tan)
pub fn tan(column: &Column) -> Column {
    column.clone().tan()
}

/// Arc sine (PySpark asin)
pub fn asin(column: &Column) -> Column {
    column.clone().asin()
}

/// Arc cosine (PySpark acos)
pub fn acos(column: &Column) -> Column {
    column.clone().acos()
}

/// Arc tangent (PySpark atan)
pub fn atan(column: &Column) -> Column {
    column.clone().atan()
}

/// Two-argument arc tangent atan2(y, x) in radians (PySpark atan2)
pub fn atan2(y: &Column, x: &Column) -> Column {
    y.clone().atan2(x)
}

/// Convert radians to degrees (PySpark degrees)
pub fn degrees(column: &Column) -> Column {
    column.clone().degrees()
}

/// Convert degrees to radians (PySpark radians)
pub fn radians(column: &Column) -> Column {
    column.clone().radians()
}

/// Sign of the number: -1, 0, or 1 (PySpark signum)
pub fn signum(column: &Column) -> Column {
    column.clone().signum()
}

/// Alias for signum (PySpark sign).
pub fn sign(column: &Column) -> Column {
    signum(column)
}

fn cast_impl(column: &Column, type_name: &str, strict: bool) -> Result<Column, String> {
    let dtype = parse_type_name(type_name)?;
    if dtype == DataType::Boolean {
        let expr = column.expr().clone().map(
            move |col| crate::column::expect_col(crate::udfs::apply_string_to_boolean(col, strict)),
            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Boolean)),
        );
        return Ok(Column::from_expr(expr, None));
    }
    if dtype == DataType::Date {
        let expr = column.expr().clone().map(
            move |col| crate::column::expect_col(crate::udfs::apply_string_to_date(col, strict)),
            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Date)),
        );
        return Ok(Column::from_expr(expr, None));
    }
    if dtype == DataType::Int32 || dtype == DataType::Int64 {
        let target = dtype.clone();
        let expr = column.expr().clone().map(
            move |col| {
                crate::column::expect_col(crate::udfs::apply_string_to_int(
                    col,
                    strict,
                    target.clone(),
                ))
            },
            move |_schema, field| Ok(Field::new(field.name().clone(), dtype.clone())),
        );
        return Ok(Column::from_expr(expr, None));
    }
    if dtype == DataType::Float64 {
        let expr = column.expr().clone().map(
            move |col| crate::column::expect_col(crate::udfs::apply_string_to_double(col, strict)),
            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
        );
        return Ok(Column::from_expr(expr, None));
    }
    let expr = if strict {
        column.expr().clone().strict_cast(dtype)
    } else {
        column.expr().clone().cast(dtype)
    };
    Ok(Column::from_expr(expr, None))
}

/// Cast column to the given type (PySpark cast). Fails on invalid conversion.
/// String-to-boolean uses custom parsing ("true"/"false"/"1"/"0") since Polars does not support Utf8->Boolean.
/// String-to-date accepts date and datetime strings (e.g. "2025-01-01 10:30:00" truncates to date) for Spark parity.
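///
/// Illustrative sketch (not compiled; column name is hypothetical):
/// ```ignore
/// use robin_sparkless_polars::functions::{cast, col, try_cast};
/// let age = cast(&col("age_str"), "int")?;              // errors on values like "abc"
/// let age_lenient = try_cast(&col("age_str"), "int")?;  // "abc" becomes null instead
/// ```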
pub fn cast(column: &Column, type_name: &str) -> Result<Column, String> {
    cast_impl(column, type_name, true)
}

/// Cast column to the given type, returning null on invalid conversion (PySpark try_cast).
/// String-to-boolean uses custom parsing ("true"/"false"/"1"/"0") since Polars does not support Utf8->Boolean.
/// String-to-date accepts date and datetime strings; invalid strings become null.
pub fn try_cast(column: &Column, type_name: &str) -> Result<Column, String> {
    cast_impl(column, type_name, false)
}

/// Cast to string, optionally with format for datetime (PySpark to_char, to_varchar).
/// When format is Some, uses date_format for datetime columns (PySpark format → chrono strftime); otherwise cast to string.
/// Returns Err if the cast to string fails (invalid type name or unsupported column type).
pub fn to_char(column: &Column, format: Option<&str>) -> Result<Column, String> {
    match format {
        Some(fmt) => Ok(column
            .clone()
            .date_format(&crate::udfs::pyspark_format_to_chrono(fmt))),
        None => cast(column, "string"),
    }
}

/// Alias for to_char (PySpark to_varchar).
pub fn to_varchar(column: &Column, format: Option<&str>) -> Result<Column, String> {
    to_char(column, format)
}

/// Cast to numeric (PySpark to_number). Uses Double. Format parameter reserved for future use.
/// Returns Err if the cast to double fails (invalid type name or unsupported column type).
pub fn to_number(column: &Column, _format: Option<&str>) -> Result<Column, String> {
    cast(column, "double")
}

/// Cast to numeric, null on invalid (PySpark try_to_number). Format parameter reserved for future use.
/// Returns Err if the try_cast setup fails (invalid type name); column values that cannot be parsed become null.
pub fn try_to_number(column: &Column, _format: Option<&str>) -> Result<Column, String> {
    try_cast(column, "double")
}

/// Cast to timestamp, or parse with format when provided (PySpark to_timestamp).
/// When format is None, parses string columns with default format "%Y-%m-%d %H:%M:%S" (PySpark parity #273).
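///
/// Illustrative sketch (not compiled; column name is hypothetical):
/// ```ignore
/// use robin_sparkless_polars::functions::{col, to_timestamp};
/// // parse strings like "2025-01-01 10:30:00" with the default format
/// let ts = to_timestamp(&col("event_time"), None)?;
/// ```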
pub fn to_timestamp(column: &Column, format: Option<&str>) -> Result<Column, String> {
    use polars::prelude::{DataType, Field, TimeUnit};
    let fmt_owned = format.map(|s| s.to_string());
    let expr = column.expr().clone().map(
        move |s| {
            crate::column::expect_col(crate::udfs::apply_to_timestamp_format(
                s,
                fmt_owned.as_deref(),
                true,
            ))
        },
        |_schema, field| {
            Ok(Field::new(
                field.name().clone(),
                DataType::Datetime(TimeUnit::Microseconds, None),
            ))
        },
    );
    Ok(crate::column::Column::from_expr(expr, None))
}

/// Cast to timestamp, null on invalid, or parse with format when provided (PySpark try_to_timestamp).
/// When format is None, parses string columns with default format (null on invalid). #273
pub fn try_to_timestamp(column: &Column, format: Option<&str>) -> Result<Column, String> {
    use polars::prelude::*;
    let fmt_owned = format.map(|s| s.to_string());
    let expr = column.expr().clone().map(
        move |s| {
            crate::column::expect_col(crate::udfs::apply_to_timestamp_format(
                s,
                fmt_owned.as_deref(),
                false,
            ))
        },
        |_schema, field| {
            Ok(Field::new(
                field.name().clone(),
                DataType::Datetime(TimeUnit::Microseconds, None),
            ))
        },
    );
    Ok(crate::column::Column::from_expr(expr, None))
}

/// Parse as timestamp in local timezone, return UTC (PySpark to_timestamp_ltz).
pub fn to_timestamp_ltz(column: &Column, format: Option<&str>) -> Result<Column, String> {
    use polars::prelude::{DataType, Field, TimeUnit};
    match format {
        None => crate::cast(column, "timestamp"),
        Some(fmt) => {
            let fmt_owned = fmt.to_string();
            let expr = column.expr().clone().map(
                move |s| {
                    crate::column::expect_col(crate::udfs::apply_to_timestamp_ltz_format(
                        s,
                        Some(&fmt_owned),
                        true,
                    ))
                },
                |_schema, field| {
                    Ok(Field::new(
                        field.name().clone(),
                        DataType::Datetime(TimeUnit::Microseconds, None),
                    ))
                },
            );
            Ok(crate::column::Column::from_expr(expr, None))
        }
    }
}

/// Parse as timestamp without timezone (PySpark to_timestamp_ntz). Returns Datetime(_, None).
pub fn to_timestamp_ntz(column: &Column, format: Option<&str>) -> Result<Column, String> {
    use polars::prelude::{DataType, Field, TimeUnit};
    match format {
        None => crate::cast(column, "timestamp"),
        Some(fmt) => {
            let fmt_owned = fmt.to_string();
            let expr = column.expr().clone().map(
                move |s| {
                    crate::column::expect_col(crate::udfs::apply_to_timestamp_ntz_format(
                        s,
                        Some(&fmt_owned),
                        true,
                    ))
                },
                |_schema, field| {
                    Ok(Field::new(
                        field.name().clone(),
                        DataType::Datetime(TimeUnit::Microseconds, None),
                    ))
                },
            );
            Ok(crate::column::Column::from_expr(expr, None))
        }
    }
}

/// Division that returns null on divide-by-zero (PySpark try_divide).
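///
/// Illustrative sketch (not compiled; column names are hypothetical):
/// ```ignore
/// use robin_sparkless_polars::functions::{col, try_divide};
/// // null wherever `den` is 0, instead of an error or infinity
/// let ratio = try_divide(&col("num"), &col("den"));
/// ```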
pub fn try_divide(left: &Column, right: &Column) -> Column {
    use polars::prelude::*;
    let zero_cond = right.expr().clone().cast(DataType::Float64).eq(lit(0.0f64));
    let null_expr = lit(NULL);
    let div_expr =
        left.expr().clone().cast(DataType::Float64) / right.expr().clone().cast(DataType::Float64);
    let expr = polars::prelude::when(zero_cond)
        .then(null_expr)
        .otherwise(div_expr);
    crate::column::Column::from_expr(expr, None)
}

/// Add that returns null on overflow (PySpark try_add). Uses checked arithmetic.
pub fn try_add(left: &Column, right: &Column) -> Column {
    let args = [right.expr().clone()];
    let expr = left.expr().clone().map_many(
        |cols| crate::column::expect_col(crate::udfs::apply_try_add(cols)),
        &args,
        |_schema, fields| Ok(fields[0].clone()),
    );
    Column::from_expr(expr, None)
}

/// Subtract that returns null on overflow (PySpark try_subtract).
pub fn try_subtract(left: &Column, right: &Column) -> Column {
    let args = [right.expr().clone()];
    let expr = left.expr().clone().map_many(
        |cols| crate::column::expect_col(crate::udfs::apply_try_subtract(cols)),
        &args,
        |_schema, fields| Ok(fields[0].clone()),
    );
    Column::from_expr(expr, None)
}

/// Multiply that returns null on overflow (PySpark try_multiply).
pub fn try_multiply(left: &Column, right: &Column) -> Column {
    let args = [right.expr().clone()];
    let expr = left.expr().clone().map_many(
        |cols| crate::column::expect_col(crate::udfs::apply_try_multiply(cols)),
        &args,
        |_schema, fields| Ok(fields[0].clone()),
    );
    Column::from_expr(expr, None)
}

/// Element at index, null if out of bounds (PySpark try_element_at). Same as element_at for lists.
pub fn try_element_at(column: &Column, index: i64) -> Column {
    column.clone().element_at(index)
}

1518/// Assign value to histogram bucket (PySpark width_bucket). Returns 0 if v < min_val, num_bucket+1 if v >= max_val.
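///
/// # Example
/// Illustrative sketch (placeholder column name; ten buckets over [0, 100)):
/// ```
/// use robin_sparkless_polars::functions::{col, width_bucket};
/// // 0 for temp < 0.0, 11 for temp >= 100.0, otherwise a bucket in 1..=10
/// let bucket = width_bucket(&col("temp"), 0.0, 100.0, 10);
/// ```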
1519pub fn width_bucket(value: &Column, min_val: f64, max_val: f64, num_bucket: i64) -> Column {
1520    if num_bucket <= 0 {
1521        panic!(
1522            "width_bucket: num_bucket must be positive, got {}",
1523            num_bucket
1524        );
1525    }
1526    use polars::prelude::*;
1527    let v = value.expr().clone().cast(DataType::Float64);
1528    let min_expr = lit(min_val);
1529    let max_expr = lit(max_val);
1530    let nb = num_bucket as f64;
1531    let width = (max_val - min_val) / nb;
1532    let bucket_expr = (v.clone() - min_expr.clone()) / lit(width);
1533    let floor_bucket = bucket_expr.floor().cast(DataType::Int64) + lit(1i64);
1534    let bucket_clamped = floor_bucket.clip(lit(1i64), lit(num_bucket));
1535    let expr = polars::prelude::when(v.clone().lt(min_expr))
1536        .then(lit(0i64))
1537        .when(v.gt_eq(max_expr))
1538        .then(lit(num_bucket + 1))
1539        .otherwise(bucket_clamped);
1540    crate::column::Column::from_expr(expr, None)
1541}
1542
1543/// Return the column at a 1-based index from a list of columns (PySpark elt);
1544/// e.g. `elt(2, a, b, c)` returns `b`. **Panics** if `columns` is empty.
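///
/// # Example
/// Illustrative sketch (placeholder column names; imports follow the `coalesce` example):
/// ```
/// use robin_sparkless_polars::functions::{col, elt, lit_i64};
/// // picks column "b" for every row, since the index literal is 2
/// let second = elt(&lit_i64(2), &[&col("a"), &col("b"), &col("c")]);
/// ```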
1545pub fn elt(index: &Column, columns: &[&Column]) -> Column {
1546    use polars::prelude::*;
1547    if columns.is_empty() {
1548        panic!("elt requires at least one column");
1549    }
1550    let idx_expr = index.expr().clone();
1551    let null_expr = lit(NULL);
1552    let mut expr = null_expr;
1553    for (i, c) in columns.iter().enumerate().rev() {
1554        let n = (i + 1) as i64;
1555        expr = polars::prelude::when(idx_expr.clone().eq(lit(n)))
1556            .then(c.expr().clone())
1557            .otherwise(expr);
1558    }
1559    crate::column::Column::from_expr(expr, None)
1560}
1561
1562/// Bit length of string (bytes * 8) (PySpark bit_length).
1563pub fn bit_length(column: &Column) -> Column {
1564    column.clone().bit_length()
1565}
1566
1567/// Length of string in bytes (PySpark octet_length).
1568pub fn octet_length(column: &Column) -> Column {
1569    column.clone().octet_length()
1570}
1571
1572/// Length of string in characters (PySpark char_length). Alias of length().
1573pub fn char_length(column: &Column) -> Column {
1574    column.clone().char_length()
1575}
1576
1577/// Length of string in characters (PySpark character_length). Alias of length().
1578pub fn character_length(column: &Column) -> Column {
1579    column.clone().character_length()
1580}
1581
1582/// Data type of column as string (PySpark typeof). Constant per column from schema.
1583pub fn typeof_(column: &Column) -> Column {
1584    column.clone().typeof_()
1585}
1586
1587/// True where the float value is NaN (PySpark isnan).
1588pub fn isnan(column: &Column) -> Column {
1589    column.clone().is_nan()
1590}
1591
1592/// Greatest of the given columns per row (PySpark greatest). Uses element-wise UDF.
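///
/// # Example
/// Illustrative sketch (placeholder column names; `unwrap` because an empty slice is an error):
/// ```
/// use robin_sparkless_polars::functions::{col, greatest, lit_i64};
/// // per-row maximum of a, b, and the literal 0
/// let g = greatest(&[&col("a"), &col("b"), &lit_i64(0)]).unwrap();
/// ```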
1593pub fn greatest(columns: &[&Column]) -> Result<Column, String> {
1594    if columns.is_empty() {
1595        return Err("greatest requires at least one column".to_string());
1596    }
1597    if columns.len() == 1 {
1598        return Ok((*columns[0]).clone());
1599    }
1600    let mut expr = columns[0].expr().clone();
1601    for c in columns.iter().skip(1) {
1602        let args = [c.expr().clone()];
1603        expr = expr.map_many(
1604            |cols| crate::column::expect_col(crate::udfs::apply_greatest2(cols)),
1605            &args,
1606            |_schema, fields| Ok(fields[0].clone()),
1607        );
1608    }
1609    Ok(Column::from_expr(expr, None))
1610}
1611
1612/// Least of the given columns per row (PySpark least). Uses element-wise UDF.
1613pub fn least(columns: &[&Column]) -> Result<Column, String> {
1614    if columns.is_empty() {
1615        return Err("least requires at least one column".to_string());
1616    }
1617    if columns.len() == 1 {
1618        return Ok((*columns[0]).clone());
1619    }
1620    let mut expr = columns[0].expr().clone();
1621    for c in columns.iter().skip(1) {
1622        let args = [c.expr().clone()];
1623        expr = expr.map_many(
1624            |cols| crate::column::expect_col(crate::udfs::apply_least2(cols)),
1625            &args,
1626            |_schema, fields| Ok(fields[0].clone()),
1627        );
1628    }
1629    Ok(Column::from_expr(expr, None))
1630}
1631
1632/// Extract year from datetime column (PySpark year)
1633pub fn year(column: &Column) -> Column {
1634    column.clone().year()
1635}
1636
1637/// Extract month from datetime column (PySpark month)
1638pub fn month(column: &Column) -> Column {
1639    column.clone().month()
1640}
1641
1642/// Extract day of month from datetime column (PySpark day)
1643pub fn day(column: &Column) -> Column {
1644    column.clone().day()
1645}
1646
1647/// Cast or parse to date (PySpark to_date). When format is None: cast date/datetime to date, parse string with default formats. When format is Some: parse string with given format.
1648pub fn to_date(column: &Column, format: Option<&str>) -> Result<Column, String> {
1649    let fmt = format.map(|s| s.to_string());
1650    let expr = column.expr().clone().map(
1651        move |col| {
1652            crate::column::expect_col(crate::udfs::apply_string_to_date_format(
1653                col,
1654                fmt.as_deref(),
1655                false,
1656            ))
1657        },
1658        |_schema, field| Ok(Field::new(field.name().clone(), DataType::Date)),
1659    );
1660    Ok(Column::from_expr(expr, None))
1661}
1662
1663/// Format date/datetime as string (PySpark date_format). Accepts PySpark/Java SimpleDateFormat style (e.g. "yyyy-MM") and converts to chrono strftime internally.
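///
/// # Example
/// Illustrative sketch (placeholder column name):
/// ```
/// use robin_sparkless_polars::functions::{col, date_format};
/// // "yyyy-MM" (SimpleDateFormat style) is translated to a chrono pattern internally
/// let year_month = date_format(&col("event_ts"), "yyyy-MM");
/// ```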
1664pub fn date_format(column: &Column, format: &str) -> Column {
1665    column
1666        .clone()
1667        .date_format(&crate::udfs::pyspark_format_to_chrono(format))
1668}
1669
1670/// Current date (evaluation time). PySpark current_date.
1671pub fn current_date() -> Column {
1672    use polars::prelude::*;
1673    let today = chrono::Utc::now().date_naive();
1674    let days = (today - robin_sparkless_core::date_utils::epoch_naive_date()).num_days() as i32;
1675    crate::column::Column::from_expr(
1676        Expr::Literal(LiteralValue::Scalar(Scalar::new_date(days))),
1677        None,
1678    )
1679}
1680
1681/// Current timestamp (evaluation time). PySpark current_timestamp.
1682pub fn current_timestamp() -> Column {
1683    use polars::prelude::*;
1684    let ts = chrono::Utc::now().timestamp_micros();
1685    crate::column::Column::from_expr(
1686        Expr::Literal(LiteralValue::Scalar(Scalar::new_datetime(
1687            ts,
1688            TimeUnit::Microseconds,
1689            None,
1690        ))),
1691        None,
1692    )
1693}
1694
1695/// Alias for current_date (PySpark curdate).
1696pub fn curdate() -> Column {
1697    current_date()
1698}
1699
1700/// Alias for current_timestamp (PySpark now).
1701pub fn now() -> Column {
1702    current_timestamp()
1703}
1704
1705/// Alias for current_timestamp (PySpark localtimestamp).
1706pub fn localtimestamp() -> Column {
1707    current_timestamp()
1708}
1709
1710/// Alias for datediff (PySpark date_diff). date_diff(end, start).
1711pub fn date_diff(end: &Column, start: &Column) -> Column {
1712    datediff(end, start)
1713}
1714
1715/// Alias for date_add (PySpark dateadd).
1716pub fn dateadd(column: &Column, n: i32) -> Column {
1717    date_add(column, n)
1718}
1719
1720/// Extract field from date/datetime (PySpark extract). field: year, month, day, hour, minute, second, quarter, week, dayofweek, dayofyear.
1721pub fn extract(column: &Column, field: &str) -> Column {
1722    column.clone().extract(field)
1723}
1724
1725/// Alias for extract (PySpark date_part).
1726pub fn date_part(column: &Column, field: &str) -> Column {
1727    extract(column, field)
1728}
1729
1730/// Alias for extract (PySpark datepart).
1731pub fn datepart(column: &Column, field: &str) -> Column {
1732    extract(column, field)
1733}
1734
1735/// Timestamp to microseconds since epoch (PySpark unix_micros).
1736pub fn unix_micros(column: &Column) -> Column {
1737    column.clone().unix_micros()
1738}
1739
1740/// Timestamp to milliseconds since epoch (PySpark unix_millis).
1741pub fn unix_millis(column: &Column) -> Column {
1742    column.clone().unix_millis()
1743}
1744
1745/// Timestamp to seconds since epoch (PySpark unix_seconds).
1746pub fn unix_seconds(column: &Column) -> Column {
1747    column.clone().unix_seconds()
1748}
1749
1750/// Weekday name "Mon","Tue",... (PySpark dayname).
1751pub fn dayname(column: &Column) -> Column {
1752    column.clone().dayname()
1753}
1754
1755/// Weekday 0=Mon, 6=Sun (PySpark weekday).
1756pub fn weekday(column: &Column) -> Column {
1757    column.clone().weekday()
1758}
1759
1760/// Extract hour from datetime column (PySpark hour).
1761pub fn hour(column: &Column) -> Column {
1762    column.clone().hour()
1763}
1764
1765/// Extract minute from datetime column (PySpark minute).
1766pub fn minute(column: &Column) -> Column {
1767    column.clone().minute()
1768}
1769
1770/// Extract second from datetime column (PySpark second).
1771pub fn second(column: &Column) -> Column {
1772    column.clone().second()
1773}
1774
1775/// Add n days to date column (PySpark date_add).
1776pub fn date_add(column: &Column, n: i32) -> Column {
1777    column.clone().date_add(n)
1778}
1779
1780/// Subtract n days from date column (PySpark date_sub).
1781pub fn date_sub(column: &Column, n: i32) -> Column {
1782    column.clone().date_sub(n)
1783}
1784
1785/// Number of days between two date columns (PySpark datediff).
1786pub fn datediff(end: &Column, start: &Column) -> Column {
1787    start.clone().datediff(end)
1788}
1789
1790/// Last day of month for date column (PySpark last_day).
1791pub fn last_day(column: &Column) -> Column {
1792    column.clone().last_day()
1793}
1794
1795/// Truncate date/datetime to unit (PySpark trunc).
1796pub fn trunc(column: &Column, format: &str) -> Column {
1797    column.clone().trunc(format)
1798}
1799
1800/// Alias for trunc (PySpark date_trunc).
1801pub fn date_trunc(format: &str, column: &Column) -> Column {
1802    trunc(column, format)
1803}
1804
1805/// Extract quarter (1-4) from date/datetime (PySpark quarter).
1806pub fn quarter(column: &Column) -> Column {
1807    column.clone().quarter()
1808}
1809
1810/// Extract ISO week of year (1-53) (PySpark weekofyear).
1811pub fn weekofyear(column: &Column) -> Column {
1812    column.clone().weekofyear()
1813}
1814
1815/// Extract day of week: 1=Sunday..7=Saturday (PySpark dayofweek).
1816pub fn dayofweek(column: &Column) -> Column {
1817    column.clone().dayofweek()
1818}
1819
1820/// Extract day of year (1-366) (PySpark dayofyear).
1821pub fn dayofyear(column: &Column) -> Column {
1822    column.clone().dayofyear()
1823}
1824
1825/// Add n months to date column (PySpark add_months).
1826pub fn add_months(column: &Column, n: i32) -> Column {
1827    column.clone().add_months(n)
1828}
1829
1830/// Months between end and start dates as fractional (PySpark months_between).
1831/// When round_off is true, rounds to 8 decimal places (PySpark default).
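///
/// # Example
/// Illustrative sketch (placeholder column names):
/// ```
/// use robin_sparkless_polars::functions::{col, months_between};
/// // fractional months from start_date to end_date, rounded to 8 decimal places
/// let mb = months_between(&col("end_date"), &col("start_date"), true);
/// ```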
1832pub fn months_between(end: &Column, start: &Column, round_off: bool) -> Column {
1833    end.clone().months_between(start, round_off)
1834}
1835
1836/// Next date that is the given weekday (e.g. "Mon") (PySpark next_day).
1837pub fn next_day(column: &Column, day_of_week: &str) -> Column {
1838    column.clone().next_day(day_of_week)
1839}
1840
1841/// Current Unix timestamp in seconds (PySpark unix_timestamp with no args).
1842pub fn unix_timestamp_now() -> Column {
1843    use polars::prelude::*;
1844    let secs = chrono::Utc::now().timestamp();
1845    crate::column::Column::from_expr(lit(secs), None)
1846}
1847
1848/// Parse string timestamp to seconds since epoch (PySpark unix_timestamp). format defaults to yyyy-MM-dd HH:mm:ss.
1849pub fn unix_timestamp(column: &Column, format: Option<&str>) -> Column {
1850    column.clone().unix_timestamp(format)
1851}
1852
1853/// Alias for unix_timestamp.
1854pub fn to_unix_timestamp(column: &Column, format: Option<&str>) -> Column {
1855    unix_timestamp(column, format)
1856}
1857
1858/// Convert seconds since epoch to formatted string (PySpark from_unixtime).
1859pub fn from_unixtime(column: &Column, format: Option<&str>) -> Column {
1860    column.clone().from_unixtime(format)
1861}
1862
1863/// Build date from year, month, day columns (PySpark make_date).
1864pub fn make_date(year: &Column, month: &Column, day: &Column) -> Column {
1865    use polars::prelude::*;
1866    let args = [month.expr().clone(), day.expr().clone()];
1867    let expr = year.expr().clone().map_many(
1868        |cols| crate::column::expect_col(crate::udfs::apply_make_date(cols)),
1869        &args,
1870        |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Date)),
1871    );
1872    crate::column::Column::from_expr(expr, None)
1873}
1874
1875/// make_timestamp(year, month, day, hour, min, sec, timezone?) - six columns to timestamp (PySpark make_timestamp).
1876/// When timezone is Some(tz), components are interpreted as local time in that zone, then converted to UTC.
1877pub fn make_timestamp(
1878    year: &Column,
1879    month: &Column,
1880    day: &Column,
1881    hour: &Column,
1882    minute: &Column,
1883    sec: &Column,
1884    timezone: Option<&str>,
1885) -> Column {
1886    use polars::prelude::*;
1887    let tz_owned = timezone.map(|s| s.to_string());
1888    let args = [
1889        month.expr().clone(),
1890        day.expr().clone(),
1891        hour.expr().clone(),
1892        minute.expr().clone(),
1893        sec.expr().clone(),
1894    ];
1895    let expr = year.expr().clone().map_many(
1896        move |cols| {
1897            crate::column::expect_col(crate::udfs::apply_make_timestamp(cols, tz_owned.as_deref()))
1898        },
1899        &args,
1900        |_schema, fields| {
1901            Ok(Field::new(
1902                fields[0].name().clone(),
1903                DataType::Datetime(TimeUnit::Microseconds, None),
1904            ))
1905        },
1906    );
1907    crate::column::Column::from_expr(expr, None)
1908}
1909
1910/// Add amount of unit to timestamp (PySpark timestampadd).
1911pub fn timestampadd(unit: &str, amount: &Column, ts: &Column) -> Column {
1912    ts.clone().timestampadd(unit, amount)
1913}
1914
1915/// Difference between timestamps in unit (PySpark timestampdiff).
1916pub fn timestampdiff(unit: &str, start: &Column, end: &Column) -> Column {
1917    start.clone().timestampdiff(unit, end)
1918}
1919
1920/// Interval of n days (PySpark days). For use in date_add, timestampadd, etc.
1921pub fn days(n: i64) -> Column {
1922    make_interval(0, 0, 0, n, 0, 0, 0)
1923}
1924
1925/// Interval of n hours (PySpark hours).
1926pub fn hours(n: i64) -> Column {
1927    make_interval(0, 0, 0, 0, n, 0, 0)
1928}
1929
1930/// Interval of n minutes (PySpark minutes).
1931pub fn minutes(n: i64) -> Column {
1932    make_interval(0, 0, 0, 0, 0, n, 0)
1933}
1934
1935/// Interval of n months (PySpark months). Approximated as 30*n days.
1936pub fn months(n: i64) -> Column {
1937    make_interval(0, n, 0, 0, 0, 0, 0)
1938}
1939
1940/// Interval of n years (PySpark years). Approximated as 365*n days.
1941pub fn years(n: i64) -> Column {
1942    make_interval(n, 0, 0, 0, 0, 0, 0)
1943}
1944
1945/// Interpret timestamp as UTC, convert to tz (PySpark from_utc_timestamp).
1946pub fn from_utc_timestamp(column: &Column, tz: &str) -> Column {
1947    column.clone().from_utc_timestamp(tz)
1948}
1949
1950/// Interpret timestamp as in tz, convert to UTC (PySpark to_utc_timestamp).
1951pub fn to_utc_timestamp(column: &Column, tz: &str) -> Column {
1952    column.clone().to_utc_timestamp(tz)
1953}
1954
1955/// Convert timestamp between timezones (PySpark convert_timezone).
1956pub fn convert_timezone(source_tz: &str, target_tz: &str, column: &Column) -> Column {
1957    let source_tz = source_tz.to_string();
1958    let target_tz = target_tz.to_string();
1959    let expr = column.expr().clone().map(
1960        move |s| {
1961            crate::column::expect_col(crate::udfs::apply_convert_timezone(
1962                s, &source_tz, &target_tz,
1963            ))
1964        },
1965        |_schema, field| Ok(field.clone()),
1966    );
1967    crate::column::Column::from_expr(expr, None)
1968}
1969
1970/// Current session timezone (PySpark current_timezone). Default "UTC". Returns literal column.
1971pub fn current_timezone() -> Column {
1972    use polars::prelude::*;
1973    crate::column::Column::from_expr(lit("UTC"), None)
1974}
1975
1976/// Create interval duration (PySpark make_interval). Optional args; 0 for omitted.
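///
/// # Example
/// Illustrative sketch (argument order: years, months, weeks, days, hours, mins, secs):
/// ```
/// use robin_sparkless_polars::functions::make_interval;
/// // 1 week + 2 days + 3 hours; years/months use the 365/30-day approximation
/// let iv = make_interval(0, 0, 1, 2, 3, 0, 0);
/// ```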
1977pub fn make_interval(
1978    years: i64,
1979    months: i64,
1980    weeks: i64,
1981    days: i64,
1982    hours: i64,
1983    mins: i64,
1984    secs: i64,
1985) -> Column {
1986    use polars::prelude::*;
1987    // Approximate: 1 year = 365 days, 1 month = 30 days
1988    let total_days = years * 365 + months * 30 + weeks * 7 + days;
1989    let args = DurationArgs::new()
1990        .with_days(lit(total_days))
1991        .with_hours(lit(hours))
1992        .with_minutes(lit(mins))
1993        .with_seconds(lit(secs));
1994    let dur = duration(args);
1995    crate::column::Column::from_expr(dur, None)
1996}
1997
1998/// Day-time interval: days, hours, minutes, seconds (PySpark make_dt_interval). All optional; 0 for omitted.
1999pub fn make_dt_interval(days: i64, hours: i64, minutes: i64, seconds: i64) -> Column {
2000    use polars::prelude::*;
2001    let args = DurationArgs::new()
2002        .with_days(lit(days))
2003        .with_hours(lit(hours))
2004        .with_minutes(lit(minutes))
2005        .with_seconds(lit(seconds));
2006    let dur = duration(args);
2007    crate::column::Column::from_expr(dur, None)
2008}
2009
2010/// Year-month interval (PySpark make_ym_interval). Polars has no native YM type; return months as Int32 (years*12 + months).
2011pub fn make_ym_interval(years: i32, months: i32) -> Column {
2012    use polars::prelude::*;
2013    let total_months = years * 12 + months;
2014    crate::column::Column::from_expr(lit(total_months), None)
2015}
2016
2017/// Alias for make_timestamp (PySpark make_timestamp_ntz - no timezone).
2018pub fn make_timestamp_ntz(
2019    year: &Column,
2020    month: &Column,
2021    day: &Column,
2022    hour: &Column,
2023    minute: &Column,
2024    sec: &Column,
2025) -> Column {
2026    make_timestamp(year, month, day, hour, minute, sec, None)
2027}
2028
2029/// Convert seconds since epoch to timestamp (PySpark timestamp_seconds).
2030pub fn timestamp_seconds(column: &Column) -> Column {
2031    column.clone().timestamp_seconds()
2032}
2033
2034/// Convert milliseconds since epoch to timestamp (PySpark timestamp_millis).
2035pub fn timestamp_millis(column: &Column) -> Column {
2036    column.clone().timestamp_millis()
2037}
2038
2039/// Convert microseconds since epoch to timestamp (PySpark timestamp_micros).
2040pub fn timestamp_micros(column: &Column) -> Column {
2041    column.clone().timestamp_micros()
2042}
2043
2044/// Date to days since 1970-01-01 (PySpark unix_date).
2045pub fn unix_date(column: &Column) -> Column {
2046    column.clone().unix_date()
2047}
2048
2049/// Days since epoch to date (PySpark date_from_unix_date).
2050pub fn date_from_unix_date(column: &Column) -> Column {
2051    column.clone().date_from_unix_date()
2052}
2053
2054/// Positive modulus (PySpark pmod).
2055pub fn pmod(dividend: &Column, divisor: &Column) -> Column {
2056    dividend.clone().pmod(divisor)
2057}
2058
2059/// Factorial n! (PySpark factorial). n in 0..=20; null for negative or overflow.
2060pub fn factorial(column: &Column) -> Column {
2061    column.clone().factorial()
2062}
2063
2064/// Concatenate string columns without a separator (PySpark concat).
2065/// **Panics** if `columns` is empty.
2066pub fn concat(columns: &[&Column]) -> Column {
2067    use polars::prelude::*;
2068    if columns.is_empty() {
2069        panic!("concat requires at least one column");
2070    }
2071    let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2072    crate::column::Column::from_expr(concat_str(&exprs, "", false), None)
2073}
2074
2075/// Concatenate string columns with a separator (PySpark concat_ws).
2076/// **Panics** if `columns` is empty.
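///
/// # Example
/// Illustrative sketch (placeholder column names; imports follow the `coalesce` example):
/// ```
/// use robin_sparkless_polars::functions::{col, concat_ws};
/// // joins the two string columns with a single space per row
/// let full_name = concat_ws(" ", &[&col("first"), &col("last")]);
/// ```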
2077pub fn concat_ws(separator: &str, columns: &[&Column]) -> Column {
2078    use polars::prelude::*;
2079    if columns.is_empty() {
2080        panic!("concat_ws requires at least one column");
2081    }
2082    let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2083    crate::column::Column::from_expr(concat_str(&exprs, separator, false), None)
2084}
2085
2086/// Row number window function (1, 2, 3 by order within partition).
2087/// Use with `.over(partition_by)` after ranking by an order column.
2088///
2089/// # Example
2090/// ```
2091/// use robin_sparkless_polars::{col, Column};
2092/// let salary_col = col("salary");
2093/// let rn = salary_col.row_number(true).over(&["dept"]);
2094/// ```
2095pub fn row_number(column: &Column) -> Column {
2096    column.clone().row_number(false)
2097}
2098
2099/// Rank window function (ties same rank, gaps). Use with `.over(partition_by)`.
2100pub fn rank(column: &Column, descending: bool) -> Column {
2101    column.clone().rank(descending)
2102}
2103
2104/// Dense rank window function (no gaps). Use with `.over(partition_by)`.
2105pub fn dense_rank(column: &Column, descending: bool) -> Column {
2106    column.clone().dense_rank(descending)
2107}
2108
2109/// Lag: value from n rows before in partition. Use with `.over(partition_by)`.
2110pub fn lag(column: &Column, n: i64) -> Column {
2111    column.clone().lag(n)
2112}
2113
2114/// Lead: value from n rows after in partition. Use with `.over(partition_by)`.
2115pub fn lead(column: &Column, n: i64) -> Column {
2116    column.clone().lead(n)
2117}
2118
2119/// First value in partition (PySpark first_value). Use with `.over(partition_by)`.
2120pub fn first_value(column: &Column) -> Column {
2121    column.clone().first_value()
2122}
2123
2124/// Last value in partition (PySpark last_value). Use with `.over(partition_by)`.
2125pub fn last_value(column: &Column) -> Column {
2126    column.clone().last_value()
2127}
2128
2129/// Percent rank in partition: (rank - 1) / (count - 1). Window is applied.
2130pub fn percent_rank(column: &Column, partition_by: &[&str], descending: bool) -> Column {
2131    column.clone().percent_rank(partition_by, descending)
2132}
2133
2134/// Cumulative distribution in partition: row_number / count. Window is applied.
2135pub fn cume_dist(column: &Column, partition_by: &[&str], descending: bool) -> Column {
2136    column.clone().cume_dist(partition_by, descending)
2137}
2138
2139/// Ntile: bucket 1..n by rank within partition. Window is applied.
2140pub fn ntile(column: &Column, n: u32, partition_by: &[&str], descending: bool) -> Column {
2141    column.clone().ntile(n, partition_by, descending)
2142}
2143
2144/// Nth value in partition by order (1-based n). Window is applied; do not call .over() again.
2145pub fn nth_value(column: &Column, n: i64, partition_by: &[&str], descending: bool) -> Column {
2146    column.clone().nth_value(n, partition_by, descending)
2147}
2148
2149/// Coalesce - returns the first non-null value across the given columns (PySpark coalesce).
2150/// **Panics** if `columns` is empty.
2151///
2152/// # Example
2153/// ```
2154/// use robin_sparkless_polars::functions::{col, lit_i64, coalesce};
2155///
2156/// // coalesce(col("a"), col("b"), lit_i64(0))
2157/// let expr = coalesce(&[&col("a"), &col("b"), &lit_i64(0)]);
2158/// ```
2159pub fn coalesce(columns: &[&Column]) -> Column {
2160    use polars::prelude::*;
2161    if columns.is_empty() {
2162        panic!("coalesce requires at least one column");
2163    }
2164    let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2165    let expr = coalesce(&exprs);
2166    crate::column::Column::from_expr(expr, None)
2167}
2168
2169/// Alias for coalesce(col, value). PySpark nvl / ifnull.
2170pub fn nvl(column: &Column, value: &Column) -> Column {
2171    coalesce(&[column, value])
2172}
2173
2174/// Alias for nvl. PySpark ifnull.
2175pub fn ifnull(column: &Column, value: &Column) -> Column {
2176    nvl(column, value)
2177}
2178
2179/// Return null if column equals value, else column. PySpark nullif.
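///
/// # Example
/// Illustrative sketch (placeholder column name):
/// ```
/// use robin_sparkless_polars::functions::{col, lit_i64, nullif};
/// // x becomes null wherever it equals 0, otherwise unchanged
/// let no_zeros = nullif(&col("x"), &lit_i64(0));
/// ```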
2180pub fn nullif(column: &Column, value: &Column) -> Column {
2181    use polars::prelude::*;
2182    let cond = column.expr().clone().eq(value.expr().clone());
2183    let null_lit = lit(NULL);
2184    let expr = when(cond).then(null_lit).otherwise(column.expr().clone());
2185    crate::column::Column::from_expr(expr, None)
2186}
2187
2188/// Replace NaN with value. PySpark nanvl.
2189pub fn nanvl(column: &Column, value: &Column) -> Column {
2190    use polars::prelude::*;
2191    let cond = column.expr().clone().is_nan();
2192    let expr = when(cond)
2193        .then(value.expr().clone())
2194        .otherwise(column.expr().clone());
2195    crate::column::Column::from_expr(expr, None)
2196}
2197
2198/// Three-arg null replacement: if col1 is not null then col2 else col3. PySpark nvl2.
2199pub fn nvl2(col1: &Column, col2: &Column, col3: &Column) -> Column {
2200    use polars::prelude::*;
2201    let cond = col1.expr().clone().is_not_null();
2202    let expr = when(cond)
2203        .then(col2.expr().clone())
2204        .otherwise(col3.expr().clone());
2205    crate::column::Column::from_expr(expr, None)
2206}
2207
2208/// Alias for substring. PySpark substr.
2209pub fn substr(column: &Column, start: i64, length: Option<i64>) -> Column {
2210    substring(column, start, length)
2211}
2212
2213/// Alias for pow. PySpark power.
2214pub fn power(column: &Column, exp: i64) -> Column {
2215    pow(column, exp)
2216}
2217
2218/// Alias for log (natural log). PySpark ln.
2219pub fn ln(column: &Column) -> Column {
2220    log(column)
2221}
2222
2223/// Alias for ceil. PySpark ceiling.
2224pub fn ceiling(column: &Column) -> Column {
2225    ceil(column)
2226}
2227
2228/// Alias for lower. PySpark lcase.
2229pub fn lcase(column: &Column) -> Column {
2230    lower(column)
2231}
2232
2233/// Alias for upper. PySpark ucase.
2234pub fn ucase(column: &Column) -> Column {
2235    upper(column)
2236}
2237
2238/// Alias for day. PySpark dayofmonth.
2239pub fn dayofmonth(column: &Column) -> Column {
2240    day(column)
2241}
2242
2243/// Alias for degrees. PySpark toDegrees.
2244pub fn to_degrees(column: &Column) -> Column {
2245    degrees(column)
2246}
2247
2248/// Alias for radians. PySpark toRadians.
2249pub fn to_radians(column: &Column) -> Column {
2250    radians(column)
2251}
2252
2253/// Hyperbolic cosine (PySpark cosh).
2254pub fn cosh(column: &Column) -> Column {
2255    column.clone().cosh()
2256}
2257/// Hyperbolic sine (PySpark sinh).
2258pub fn sinh(column: &Column) -> Column {
2259    column.clone().sinh()
2260}
2261/// Hyperbolic tangent (PySpark tanh).
2262pub fn tanh(column: &Column) -> Column {
2263    column.clone().tanh()
2264}
2265/// Inverse hyperbolic cosine (PySpark acosh).
2266pub fn acosh(column: &Column) -> Column {
2267    column.clone().acosh()
2268}
2269/// Inverse hyperbolic sine (PySpark asinh).
2270pub fn asinh(column: &Column) -> Column {
2271    column.clone().asinh()
2272}
2273/// Inverse hyperbolic tangent (PySpark atanh).
2274pub fn atanh(column: &Column) -> Column {
2275    column.clone().atanh()
2276}
2277/// Cube root (PySpark cbrt).
2278pub fn cbrt(column: &Column) -> Column {
2279    column.clone().cbrt()
2280}
2281/// exp(x) - 1 (PySpark expm1).
2282pub fn expm1(column: &Column) -> Column {
2283    column.clone().expm1()
2284}
2285/// log(1 + x) (PySpark log1p).
2286pub fn log1p(column: &Column) -> Column {
2287    column.clone().log1p()
2288}
2289/// Base-10 log (PySpark log10).
2290pub fn log10(column: &Column) -> Column {
2291    column.clone().log10()
2292}
2293/// Base-2 log (PySpark log2).
2294pub fn log2(column: &Column) -> Column {
2295    column.clone().log2()
2296}
2297/// Round to nearest integer (PySpark rint).
2298pub fn rint(column: &Column) -> Column {
2299    column.clone().rint()
2300}
2301/// sqrt(x*x + y*y) (PySpark hypot).
2302pub fn hypot(x: &Column, y: &Column) -> Column {
2303    let xx = x.expr().clone() * x.expr().clone();
2304    let yy = y.expr().clone() * y.expr().clone();
2305    crate::column::Column::from_expr((xx + yy).sqrt(), None)
2306}
2307
2308/// True if column is null. PySpark isnull.
2309pub fn isnull(column: &Column) -> Column {
2310    column.clone().is_null()
2311}
2312
2313/// True if column is not null. PySpark isnotnull.
2314pub fn isnotnull(column: &Column) -> Column {
2315    column.clone().is_not_null()
2316}
2317
2318/// Create an array column from multiple columns (PySpark array).
2319/// With no arguments, returns a column of empty arrays (one per row); PySpark parity.
2320pub fn array(columns: &[&Column]) -> Result<crate::column::Column, PolarsError> {
2321    use polars::prelude::*;
2322    if columns.is_empty() {
2323        // PySpark F.array() with no args: one empty list per row (broadcast literal).
2324        // Use .first() so the single-row literal is treated as a scalar and broadcasts to frame height.
2325        let empty_inner = Series::new("".into(), Vec::<i64>::new());
2326        let list_series = ListChunked::from_iter([Some(empty_inner)])
2327            .with_name("array".into())
2328            .into_series();
2329        let expr = lit(list_series).first();
2330        return Ok(crate::column::Column::from_expr(expr, None));
2331    }
2332    let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2333    let expr = concat_list(exprs)
2334        .map_err(|e| PolarsError::ComputeError(format!("array concat_list: {e}").into()))?;
2335    Ok(crate::column::Column::from_expr(expr, None))
2336}
2337
2338/// Number of elements in list (PySpark size / array_size). Returns Int32.
2339pub fn array_size(column: &Column) -> Column {
2340    column.clone().array_size()
2341}
2342
2343/// Alias for array_size (PySpark size).
2344pub fn size(column: &Column) -> Column {
2345    column.clone().array_size()
2346}
2347
2348/// Cardinality: number of elements in array (PySpark cardinality). Alias for size/array_size.
2349pub fn cardinality(column: &Column) -> Column {
2350    column.clone().cardinality()
2351}
2352
2353/// Check if list contains value (PySpark array_contains).
2354pub fn array_contains(column: &Column, value: &Column) -> Column {
2355    column.clone().array_contains(value.expr().clone())
2356}
2357
2358/// Join list of strings with separator (PySpark array_join).
2359pub fn array_join(column: &Column, separator: &str) -> Column {
2360    column.clone().array_join(separator)
2361}
2362
2363/// Maximum element in list (PySpark array_max).
2364pub fn array_max(column: &Column) -> Column {
2365    column.clone().array_max()
2366}
2367
2368/// Minimum element in list (PySpark array_min).
2369pub fn array_min(column: &Column) -> Column {
2370    column.clone().array_min()
2371}
2372
2373/// Get element at 1-based index (PySpark element_at).
2374pub fn element_at(column: &Column, index: i64) -> Column {
2375    column.clone().element_at(index)
2376}
2377
2378/// Sort list elements (PySpark array_sort).
2379pub fn array_sort(column: &Column) -> Column {
2380    column.clone().array_sort()
2381}
2382
2383/// Distinct elements in list (PySpark array_distinct).
2384pub fn array_distinct(column: &Column) -> Column {
2385    column.clone().array_distinct()
2386}
2387
2388/// Slice list from 1-based start with optional length (PySpark slice).
2389pub fn array_slice(column: &Column, start: i64, length: Option<i64>) -> Column {
2390    column.clone().array_slice(start, length)
2391}
2392
2393/// Generate array of numbers from start to stop (inclusive) with optional step (PySpark sequence).
2394/// step defaults to 1.
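///
/// # Example
/// Illustrative sketch (literal bounds; step omitted, so it defaults to 1):
/// ```
/// use robin_sparkless_polars::functions::{lit_i64, sequence};
/// // a list column [1, 2, 3, 4, 5] per row
/// let seq = sequence(&lit_i64(1), &lit_i64(5), None);
/// ```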
2395pub fn sequence(start: &Column, stop: &Column, step: Option<&Column>) -> Column {
2396    use polars::prelude::{DataType, Field, as_struct, lit};
2397    let step_expr = step
2398        .map(|c| c.expr().clone().alias("2"))
2399        .unwrap_or_else(|| lit(1i64).alias("2"));
2400    let struct_expr = as_struct(vec![
2401        start.expr().clone().alias("0"),
2402        stop.expr().clone().alias("1"),
2403        step_expr,
2404    ]);
2405    let out_dtype = DataType::List(Box::new(DataType::Int64));
2406    let expr = struct_expr.map(
2407        |s| crate::column::expect_col(crate::udfs::apply_sequence(s)),
2408        move |_schema, field| Ok(Field::new(field.name().clone(), out_dtype.clone())),
2409    );
2410    crate::column::Column::from_expr(expr, None)
2411}
2412
2413/// Random permutation of list elements (PySpark shuffle).
2414pub fn shuffle(column: &Column) -> Column {
2415    let expr = column.expr().clone().map(
2416        |s| crate::column::expect_col(crate::udfs::apply_shuffle(s)),
2417        |_schema, field| Ok(field.clone()),
2418    );
2419    crate::column::Column::from_expr(expr, None)
2420}
2421
2422/// Explode list of structs into rows; struct fields become columns after unnest (PySpark inline).
2423/// Returns the exploded struct column; use unnest to expand struct fields to columns.
2424pub fn inline(column: &Column) -> Column {
2425    column.clone().explode()
2426}
2427
2428/// Like inline but null/empty yields one row of nulls (PySpark inline_outer).
2429pub fn inline_outer(column: &Column) -> Column {
2430    column.clone().explode_outer()
2431}
2432
2433/// Explode list into one row per element (PySpark explode).
2434pub fn explode(column: &Column) -> Column {
2435    column.clone().explode()
2436}
2437
2438/// 1-based index of first occurrence of value in list, or 0 if not found (PySpark array_position).
2439/// Implemented via Polars list.eval with col("") as element.
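///
/// # Example
/// Illustrative sketch (placeholder column and value):
/// ```
/// use robin_sparkless_polars::functions::{col, lit_str, array_position};
/// // 1-based index of "admin" in each list, 0 when absent
/// let pos = array_position(&col("roles"), &lit_str("admin"));
/// ```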
2440pub fn array_position(column: &Column, value: &Column) -> Column {
2441    column.clone().array_position(value.expr().clone())
2442}
2443
2444/// Remove null elements from list (PySpark array_compact).
2445pub fn array_compact(column: &Column) -> Column {
2446    column.clone().array_compact()
2447}
2448
2449/// New list with all elements equal to value removed (PySpark array_remove).
2450/// Implemented via Polars list.eval + list.drop_nulls.
2451pub fn array_remove(column: &Column, value: &Column) -> Column {
2452    column.clone().array_remove(value.expr().clone())
2453}
2454
2455/// Repeat each element n times (PySpark array_repeat). Delegates to `Column::array_repeat`.
2456pub fn array_repeat(column: &Column, n: i64) -> Column {
2457    column.clone().array_repeat(n)
2458}
2459
2460/// Flatten a list of lists into one list (PySpark flatten). Delegates to `Column::array_flatten`.
2461pub fn array_flatten(column: &Column) -> Column {
2462    column.clone().array_flatten()
2463}
2464
2465/// True if any list element satisfies the predicate (PySpark exists).
2466pub fn array_exists(column: &Column, predicate: Expr) -> Column {
2467    column.clone().array_exists(predicate)
2468}
2469
2470/// True if all list elements satisfy the predicate (PySpark forall).
2471pub fn array_forall(column: &Column, predicate: Expr) -> Column {
2472    column.clone().array_forall(predicate)
2473}
2474
2475/// Filter list elements by predicate (PySpark filter).
2476pub fn array_filter(column: &Column, predicate: Expr) -> Column {
2477    column.clone().array_filter(predicate)
2478}
2479
2480/// Transform list elements by expression (PySpark transform).
2481pub fn array_transform(column: &Column, f: Expr) -> Column {
2482    column.clone().array_transform(f)
2483}
2484
2485/// Sum of list elements (PySpark aggregate sum).
2486pub fn array_sum(column: &Column) -> Column {
2487    column.clone().array_sum()
2488}
2489
2490/// Array fold/aggregate (PySpark aggregate). Simplified: zero + sum(list elements).
2491pub fn aggregate(column: &Column, zero: &Column) -> Column {
2492    column.clone().array_aggregate(zero)
2493}
2494
2495/// Mean of list elements (PySpark aggregate avg).
2496pub fn array_mean(column: &Column) -> Column {
2497    column.clone().array_mean()
2498}
2499
2500/// Explode list with position (PySpark posexplode). Returns (pos_column, value_column).
2501/// pos is 1-based; implemented via list.eval(cum_count()).explode() and explode().
2502pub fn posexplode(column: &Column) -> (Column, Column) {
2503    column.clone().posexplode()
2504}
2505
2506/// Build a map column from alternating key/value expressions (PySpark create_map).
2507/// Returns List(Struct{key, value}) using Polars as_struct and concat_list.
2508/// With no args (or empty slice), returns a column of empty maps per row (PySpark parity #275).
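///
/// # Example
/// Illustrative sketch (placeholder keys and columns; keys and values alternate):
/// ```
/// use robin_sparkless_polars::functions::{col, lit_str, create_map};
/// // per-row map {"lo": low, "hi": high}
/// let m = create_map(&[&lit_str("lo"), &col("low"), &lit_str("hi"), &col("high")]).unwrap();
/// ```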
2509pub fn create_map(key_values: &[&Column]) -> Result<Column, PolarsError> {
2510    use polars::chunked_array::StructChunked;
2511    use polars::prelude::{IntoSeries, ListChunked, as_struct, concat_list, lit};
2512    if key_values.is_empty() {
2513        // PySpark F.create_map() with no args: one empty map {} per row (broadcast literal).
2514        let key_s = Series::new("key".into(), Vec::<String>::new());
2515        let value_s = Series::new("value".into(), Vec::<String>::new());
2516        let fields: [&Series; 2] = [&key_s, &value_s];
2517        let empty_struct = StructChunked::from_series(
2518            polars::prelude::PlSmallStr::EMPTY,
2519            0,
2520            fields.iter().copied(),
2521        )
2522        .map_err(|e| PolarsError::ComputeError(format!("create_map empty struct: {e}").into()))?
2523        .into_series();
2524        let list_series = ListChunked::from_iter([Some(empty_struct)])
2525            .with_name("create_map".into())
2526            .into_series();
2527        let expr = lit(list_series).first();
2528        return Ok(crate::column::Column::from_expr(expr, None));
2529    }
2530    let mut struct_exprs: Vec<Expr> = Vec::new();
2531    for i in (0..key_values.len()).step_by(2) {
2532        if i + 1 < key_values.len() {
2533            let k = key_values[i].expr().clone().alias("key");
2534            let v = key_values[i + 1].expr().clone().alias("value");
2535            struct_exprs.push(as_struct(vec![k, v]));
2536        }
2537    }
2538    let expr = concat_list(struct_exprs)
2539        .map_err(|e| PolarsError::ComputeError(format!("create_map concat_list: {e}").into()))?;
2540    Ok(crate::column::Column::from_expr(expr, None))
2541}
2542
2543/// Extract keys from a map column (PySpark map_keys). Map is List(Struct{key, value}).
2544pub fn map_keys(column: &Column) -> Column {
2545    column.clone().map_keys()
2546}
2547
2548/// Extract values from a map column (PySpark map_values).
2549pub fn map_values(column: &Column) -> Column {
2550    column.clone().map_values()
2551}
2552
2553/// Return map as list of structs {key, value} (PySpark map_entries).
2554pub fn map_entries(column: &Column) -> Column {
2555    column.clone().map_entries()
2556}
2557
2558/// Build map from two array columns keys and values (PySpark map_from_arrays). Implemented via UDF.
2559pub fn map_from_arrays(keys: &Column, values: &Column) -> Column {
2560    keys.clone().map_from_arrays(values)
2561}
2562
2563/// Merge two map columns (PySpark map_concat). Last value wins for duplicate keys.
2564pub fn map_concat(a: &Column, b: &Column) -> Column {
2565    a.clone().map_concat(b)
2566}
2567
2568/// Array of structs {key, value} to map (PySpark map_from_entries).
2569pub fn map_from_entries(column: &Column) -> Column {
2570    column.clone().map_from_entries()
2571}
2572
2573/// True if map contains key (PySpark map_contains_key).
2574pub fn map_contains_key(map_col: &Column, key: &Column) -> Column {
2575    map_col.clone().map_contains_key(key)
2576}
2577
2578/// Get value for key from map, or null (PySpark get).
2579pub fn get(map_col: &Column, key: &Column) -> Column {
2580    map_col.clone().get(key)
2581}
2582
2583/// Filter map entries by predicate (PySpark map_filter).
2584pub fn map_filter(map_col: &Column, predicate: Expr) -> Column {
2585    map_col.clone().map_filter(predicate)
2586}
2587
2588/// Merge two maps by key with merge function (PySpark map_zip_with).
2589pub fn map_zip_with(map1: &Column, map2: &Column, merge: Expr) -> Column {
2590    map1.clone().map_zip_with(map2, merge)
2591}
2592
2593/// Convenience: zip_with with coalesce(left, right) merge.
2594pub fn zip_with_coalesce(left: &Column, right: &Column) -> Column {
2595    use polars::prelude::col;
2596    let left_field = col("").struct_().field_by_name("left");
2597    let right_field = col("").struct_().field_by_name("right");
2598    let merge = coalesce(&[
2599        &crate::column::Column::from_expr(left_field, None),
2600        &crate::column::Column::from_expr(right_field, None),
2601    ])
2602    .into_expr();
2603    left.clone().zip_with(right, merge)
2607}
2608
2609/// Convenience: map_zip_with with coalesce(value1, value2) merge.
2610pub fn map_zip_with_coalesce(map1: &Column, map2: &Column) -> Column {
2611    use polars::prelude::col;
2612    let v1 = col("").struct_().field_by_name("value1");
2613    let v2 = col("").struct_().field_by_name("value2");
2614    let merge = coalesce(&[
2615        &crate::column::Column::from_expr(v1, None),
2616        &crate::column::Column::from_expr(v2, None),
2617    ])
2618    .into_expr();
2619    map1.clone().map_zip_with(map2, merge)
2620}
2621
2622/// Convenience: map_filter with value > threshold predicate.
2623pub fn map_filter_value_gt(map_col: &Column, threshold: f64) -> Column {
2624    use polars::prelude::{col, lit};
2625    let pred = col("").struct_().field_by_name("value").gt(lit(threshold));
2626    map_col.clone().map_filter(pred)
2627}
2628
2629/// Create a struct from columns, using the column names as field names (PySpark struct).
2630/// **Panics** if `columns` is empty.
2631pub fn struct_(columns: &[&Column]) -> Column {
2632    use polars::prelude::as_struct;
2633    if columns.is_empty() {
2634        panic!("struct requires at least one column");
2635    }
2636    let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2637    crate::column::Column::from_expr(as_struct(exprs), None)
2638}
2639
2640/// Create a struct with explicit field names from (name, column) pairs (PySpark named_struct).
2641/// **Panics** if `pairs` is empty.
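///
/// # Example
/// Illustrative sketch (placeholder field names and columns):
/// ```
/// use robin_sparkless_polars::functions::{col, named_struct};
/// // struct with fields "id" and "name" taken from the two columns
/// let s = named_struct(&[("id", &col("user_id")), ("name", &col("user_name"))]);
/// ```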
2642pub fn named_struct(pairs: &[(&str, &Column)]) -> Column {
2643    use polars::prelude::as_struct;
2644    if pairs.is_empty() {
2645        panic!("named_struct requires at least one (name, column) pair");
2646    }
2647    let exprs: Vec<Expr> = pairs
2648        .iter()
2649        .map(|(name, col)| col.expr().clone().alias(*name))
2650        .collect();
2651    crate::column::Column::from_expr(as_struct(exprs), None)
2652}
2653
2654/// Append element to end of list (PySpark array_append).
2655pub fn array_append(array: &Column, elem: &Column) -> Column {
2656    array.clone().array_append(elem)
2657}
2658
2659/// Prepend element to start of list (PySpark array_prepend).
2660pub fn array_prepend(array: &Column, elem: &Column) -> Column {
2661    array.clone().array_prepend(elem)
2662}
2663
2664/// Insert element at 1-based position (PySpark array_insert).
2665pub fn array_insert(array: &Column, pos: &Column, elem: &Column) -> Column {
2666    array.clone().array_insert(pos, elem)
2667}
2668
2669/// Elements in first array not in second (PySpark array_except).
2670pub fn array_except(a: &Column, b: &Column) -> Column {
2671    a.clone().array_except(b)
2672}
2673
2674/// Elements in both arrays (PySpark array_intersect).
2675pub fn array_intersect(a: &Column, b: &Column) -> Column {
2676    a.clone().array_intersect(b)
2677}
2678
2679/// Distinct elements from both arrays (PySpark array_union).
2680pub fn array_union(a: &Column, b: &Column) -> Column {
2681    a.clone().array_union(b)
2682}
2683
2684/// Zip two arrays element-wise with merge function (PySpark zip_with).
2685pub fn zip_with(left: &Column, right: &Column, merge: Expr) -> Column {
2686    left.clone().zip_with(right, merge)
2687}
2688
2689/// Extract JSON path from string column (PySpark get_json_object).
2690pub fn get_json_object(column: &Column, path: &str) -> Column {
2691    column.clone().get_json_object(path)
2692}
2693
2694/// Keys of JSON object (PySpark json_object_keys). Returns list of strings.
2695pub fn json_object_keys(column: &Column) -> Column {
2696    column.clone().json_object_keys()
2697}
2698
2699/// Extract keys from JSON as struct (PySpark json_tuple). keys: e.g. ["a", "b"].
2700pub fn json_tuple(column: &Column, keys: &[&str]) -> Column {
2701    column.clone().json_tuple(keys)
2702}
2703
2704/// Parse CSV string to struct (PySpark from_csv). Minimal implementation.
2705pub fn from_csv(column: &Column) -> Column {
2706    column.clone().from_csv()
2707}
2708
2709/// Format struct as CSV string (PySpark to_csv). Minimal implementation.
2710pub fn to_csv(column: &Column) -> Column {
2711    column.clone().to_csv()
2712}
2713
2714/// Schema of CSV string (PySpark schema_of_csv). Returns literal schema string; minimal stub.
2715pub fn schema_of_csv(_column: &Column) -> Column {
2716    Column::from_expr(
2717        lit("STRUCT<_c0: STRING, _c1: STRING>".to_string()),
2718        Some("schema_of_csv".to_string()),
2719    )
2720}
2721
2722/// Schema of JSON string (PySpark schema_of_json). Returns literal schema string; minimal stub.
2723pub fn schema_of_json(_column: &Column) -> Column {
2724    Column::from_expr(
2725        lit("STRUCT<>".to_string()),
2726        Some("schema_of_json".to_string()),
2727    )
2728}
2729
2730/// Parse string column as JSON into struct (PySpark from_json).
2731pub fn from_json(column: &Column, schema: Option<polars::datatypes::DataType>) -> Column {
2732    column.clone().from_json(schema)
2733}
2734
2735/// Serialize struct column to JSON string (PySpark to_json).
2736pub fn to_json(column: &Column) -> Column {
2737    column.clone().to_json()
2738}
2739
2740/// Check if column values are in the given list (PySpark isin). Uses Polars is_in.
2741pub fn isin(column: &Column, other: &Column) -> Column {
2742    column.clone().isin(other)
2743}
2744
2745/// Check if column values are in the given i64 slice (PySpark isin with literal list).
2746pub fn isin_i64(column: &Column, values: &[i64]) -> Column {
2747    let s = Series::from_iter(values.iter().cloned());
2748    Column::from_expr(column.expr().clone().is_in(lit(s), false), None)
2749}
2750
2751/// Check if column values are in the given string slice (PySpark isin with literal list).
2752pub fn isin_str(column: &Column, values: &[&str]) -> Column {
2753    let s: Series = Series::from_iter(values.iter().copied());
2754    Column::from_expr(column.expr().clone().is_in(lit(s), false), None)
2755}
2756
2757/// Percent-decode URL-encoded string (PySpark url_decode).
2758pub fn url_decode(column: &Column) -> Column {
2759    column.clone().url_decode()
2760}
2761
2762/// Percent-encode string for URL (PySpark url_encode).
2763pub fn url_encode(column: &Column) -> Column {
2764    column.clone().url_encode()
2765}
2766
2767/// Bitwise left shift (PySpark shiftLeft). col << n.
2768pub fn shift_left(column: &Column, n: i32) -> Column {
2769    column.clone().shift_left(n)
2770}
2771
2772/// Bitwise signed right shift (PySpark shiftRight). col >> n.
2773pub fn shift_right(column: &Column, n: i32) -> Column {
2774    column.clone().shift_right(n)
2775}
2776
2777/// Bitwise unsigned right shift (PySpark shiftRightUnsigned). Logical shift for Long.
2778pub fn shift_right_unsigned(column: &Column, n: i32) -> Column {
2779    column.clone().shift_right_unsigned(n)
2780}
2781
2782/// Session/library version string (PySpark version).
2783pub fn version() -> Column {
2784    Column::from_expr(
2785        lit(concat!("robin-sparkless-", env!("CARGO_PKG_VERSION"))),
2786        None,
2787    )
2788}
2789
2790/// Null-safe equality: true if both null or both equal (PySpark equal_null). Alias for eq_null_safe.
2791pub fn equal_null(left: &Column, right: &Column) -> Column {
2792    left.clone().eq_null_safe(right)
2793}
2794
2795/// Length of JSON array at path (PySpark json_array_length).
2796pub fn json_array_length(column: &Column, path: &str) -> Column {
2797    column.clone().json_array_length(path)
2798}
2799
2800/// Parse URL and extract part: PROTOCOL, HOST, PATH, etc. (PySpark parse_url).
2801/// When key is Some(k) and part is QUERY/QUERYSTRING, returns the value for that query parameter only.
2802pub fn parse_url(column: &Column, part: &str, key: Option<&str>) -> Column {
2803    column.clone().parse_url(part, key)
2804}
2805
2806/// Hash of column values (PySpark hash). Uses Murmur3 32-bit for parity with PySpark.
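///
/// # Example
/// Illustrative sketch (placeholder column names):
/// ```
/// use robin_sparkless_polars::functions::{col, hash};
/// // single Int64 hash per row combining both columns
/// let h = hash(&[&col("a"), &col("b")]);
/// ```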
2807pub fn hash(columns: &[&Column]) -> Column {
2808    use polars::prelude::*;
2809    if columns.is_empty() {
2810        return crate::column::Column::from_expr(lit(0i64), None);
2811    }
2812    if columns.len() == 1 {
2813        return columns[0].clone().hash();
2814    }
2815    let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2816    let struct_expr = polars::prelude::as_struct(exprs);
2817    let name = columns[0].name().to_string();
2818    let expr = struct_expr.map(
2819        |s| crate::column::expect_col(crate::udfs::apply_hash_struct(s)),
2820        |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
2821    );
2822    crate::column::Column::from_expr(expr, Some(name))
2823}
2824
2825/// Stack columns into a struct (PySpark stack). Simplified alias for struct_; unlike PySpark's stack, it does not expand values into multiple rows.
2826pub fn stack(columns: &[&Column]) -> Column {
2827    struct_(columns)
2828}
2829
2830#[cfg(test)]
2831mod tests {
2832    use super::*;
2833    use crate::functions::{col, lit_bool, lit_f64, lit_i32, lit_i64, lit_str};
2834    use polars::prelude::{IntoLazy, df};
2835
2836    #[test]
2837    fn test_col_creates_column() {
2838        let column = col("test");
2839        assert_eq!(column.name(), "test");
2840    }
2841
2842    #[test]
2843    fn test_lit_i32() {
2844        let column = lit_i32(42);
2845        // The column should have a default name since it's a literal
2846        assert_eq!(column.name(), "<expr>");
2847    }
2848
2849    #[test]
2850    fn test_lit_i64() {
2851        let column = lit_i64(123456789012345i64);
2852        assert_eq!(column.name(), "<expr>");
2853    }
2854
2855    #[test]
2856    fn test_lit_f64() {
2857        let column = lit_f64(std::f64::consts::PI);
2858        assert_eq!(column.name(), "<expr>");
2859    }
2860
2861    #[test]
2862    fn test_lit_bool() {
2863        let column = lit_bool(true);
2864        assert_eq!(column.name(), "<expr>");
2865    }
2866
2867    #[test]
2868    fn test_lit_str() {
2869        let column = lit_str("hello");
2870        assert_eq!(column.name(), "<expr>");
2871    }
2872
2873    #[test]
2874    fn test_create_map_empty() {
2875        // PySpark F.create_map() with no args: column of empty maps (#275).
2876        let empty_col = create_map(&[]).unwrap();
2877        let df = df!("id" => &[1i64, 2i64]).unwrap();
2878        let out = df
2879            .lazy()
2880            .with_columns([empty_col.into_expr().alias("m")])
2881            .collect()
2882            .unwrap();
2883        assert_eq!(out.height(), 2);
2884        let m = out.column("m").unwrap();
2885        assert_eq!(m.len(), 2);
2886        let list = m.list().unwrap();
2887        for i in 0..2 {
2888            let row = list.get(i).unwrap();
2889            assert_eq!(row.len(), 0);
2890        }
2891    }
2892
2893    #[test]
2894    fn test_count_aggregation() {
2895        let column = col("value");
2896        let result = count(&column);
2897        assert_eq!(result.name(), "count");
2898    }
2899
2900    #[test]
2901    fn test_sum_aggregation() {
2902        let column = col("value");
2903        let result = sum(&column);
2904        assert_eq!(result.name(), "sum");
2905    }
2906
2907    #[test]
2908    fn test_avg_aggregation() {
2909        let column = col("value");
2910        let result = avg(&column);
2911        assert_eq!(result.name(), "avg");
2912    }
2913
2914    #[test]
2915    fn test_max_aggregation() {
2916        let column = col("value");
2917        let result = max(&column);
2918        assert_eq!(result.name(), "max");
2919    }
2920
2921    #[test]
2922    fn test_min_aggregation() {
2923        let column = col("value");
2924        let result = min(&column);
2925        assert_eq!(result.name(), "min");
2926    }
2927
2928    #[test]
2929    fn test_when_then_otherwise() {
2930        // Create a simple DataFrame
2931        let df = df!(
2932            "age" => &[15, 25, 35]
2933        )
2934        .unwrap();
2935
2936        // Build a when-then-otherwise expression
2937        let age_col = col("age");
2938        let condition = age_col.gt(polars::prelude::lit(18));
2939        let result = when(&condition)
2940            .then(&lit_str("adult"))
2941            .otherwise(&lit_str("minor"));
2942
2943        // Apply the expression
2944        let result_df = df
2945            .lazy()
2946            .with_column(result.into_expr().alias("status"))
2947            .collect()
2948            .unwrap();
2949
2950        // Verify the result
2951        let status_col = result_df.column("status").unwrap();
2952        let values: Vec<Option<&str>> = status_col.str().unwrap().into_iter().collect();
2953
2954        assert_eq!(values[0], Some("minor")); // age 15 < 18
2955        assert_eq!(values[1], Some("adult")); // age 25 > 18
2956        assert_eq!(values[2], Some("adult")); // age 35 > 18
2957    }
2958
2959    #[test]
2960    fn test_coalesce_returns_first_non_null() {
2961        // Create a DataFrame with some nulls
2962        let df = df!(
2963            "a" => &[Some(1), None, None],
2964            "b" => &[None, Some(2), None],
2965            "c" => &[None, None, Some(3)]
2966        )
2967        .unwrap();
2968
2969        let col_a = col("a");
2970        let col_b = col("b");
2971        let col_c = col("c");
2972        let result = coalesce(&[&col_a, &col_b, &col_c]);
2973
2974        // Apply the expression
2975        let result_df = df
2976            .lazy()
2977            .with_column(result.into_expr().alias("coalesced"))
2978            .collect()
2979            .unwrap();
2980
2981        // Verify the result
2982        let coalesced_col = result_df.column("coalesced").unwrap();
2983        let values: Vec<Option<i32>> = coalesced_col.i32().unwrap().into_iter().collect();
2984
2985        assert_eq!(values[0], Some(1)); // First non-null is 'a'
2986        assert_eq!(values[1], Some(2)); // First non-null is 'b'
2987        assert_eq!(values[2], Some(3)); // First non-null is 'c'
2988    }
2989
2990    #[test]
2991    fn test_coalesce_with_literal_fallback() {
2992        // Create a DataFrame with all nulls in one row
2993        let df = df!(
2994            "a" => &[Some(1), None],
2995            "b" => &[None::<i32>, None::<i32>]
2996        )
2997        .unwrap();
2998
2999        let col_a = col("a");
3000        let col_b = col("b");
3001        let fallback = lit_i32(0);
3002        let result = coalesce(&[&col_a, &col_b, &fallback]);
3003
3004        // Apply the expression
3005        let result_df = df
3006            .lazy()
3007            .with_column(result.into_expr().alias("coalesced"))
3008            .collect()
3009            .unwrap();
3010
3011        // Verify the result
3012        let coalesced_col = result_df.column("coalesced").unwrap();
3013        let values: Vec<Option<i32>> = coalesced_col.i32().unwrap().into_iter().collect();
3014
3015        assert_eq!(values[0], Some(1)); // First non-null is 'a'
3016        assert_eq!(values[1], Some(0)); // All nulls, use fallback
3017    }
3018
3019    #[test]
3020    #[should_panic(expected = "coalesce requires at least one column")]
3021    fn test_coalesce_empty_panics() {
3022        let columns: [&Column; 0] = [];
3023        let _ = coalesce(&columns);
3024    }
3025
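    // Sketch: with a single input column and no literal fallback, coalesce should act as
    // the identity and leave nulls in place. Column names here are arbitrary.
    #[test]
    fn test_coalesce_single_column_keeps_nulls_sketch() {
        let df = df!(
            "a" => &[Some(1), None, Some(3)]
        )
        .unwrap();

        let col_a = col("a");
        let result = coalesce(&[&col_a]);

        let out = df
            .lazy()
            .with_column(result.into_expr().alias("c"))
            .collect()
            .unwrap();

        let vals: Vec<Option<i32>> = out.column("c").unwrap().i32().unwrap().into_iter().collect();
        assert_eq!(vals, vec![Some(1), None, Some(3)]);
    }
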
3026    #[test]
3027    fn test_cast_double_string_column_strict_ok() {
3028        // All values, including the whitespace-padded " 45.5 ", parse as doubles, so the strict cast should succeed.
3029        let df = df!(
3030            "s" => &["123", " 45.5 ", "0"]
3031        )
3032        .unwrap();
3033
3034        let s_col = col("s");
3035        let cast_col = cast(&s_col, "double").unwrap();
3036
3037        let out = df
3038            .lazy()
3039            .with_column(cast_col.into_expr().alias("v"))
3040            .collect()
3041            .unwrap();
3042
3043        let v = out.column("v").unwrap();
3044        let vals: Vec<Option<f64>> = v.f64().unwrap().into_iter().collect();
3045        assert_eq!(vals, vec![Some(123.0), Some(45.5), Some(0.0)]);
3046    }
3047
3048    #[test]
3049    fn test_try_cast_double_string_column_invalid_to_null() {
3050        // Invalid or empty numeric strings should become null under try_cast; try_to_number is covered by the next test.
3051        let df = df!(
3052            "s" => &["123", " 45.5 ", "abc", ""]
3053        )
3054        .unwrap();
3055
3056        let s_col = col("s");
3057        let try_cast_col = try_cast(&s_col, "double").unwrap();
3058
3059        let out = df
3060            .lazy()
3061            .with_column(try_cast_col.into_expr().alias("v"))
3062            .collect()
3063            .unwrap();
3064
3065        let v = out.column("v").unwrap();
3066        let vals: Vec<Option<f64>> = v.f64().unwrap().into_iter().collect();
3067        assert_eq!(vals, vec![Some(123.0), Some(45.5), None, None]);
3068    }
3069
3070    #[test]
3071    fn test_to_number_and_try_to_number_numerics_and_strings() {
3072        // Mixed numeric types should be cast to double; the string column goes through try_to_number, which (unlike to_number) turns its invalid entry into null.
3073        let df = df!(
3074            "i" => &[1i32, 2, 3],
3075            "f" => &[1.5f64, 2.5, 3.5],
3076            "s" => &["10", "20.5", "xyz"]
3077        )
3078        .unwrap();
3079
3080        let i_col = col("i");
3081        let f_col = col("f");
3082        let s_col = col("s");
3083
3084        let to_number_i = to_number(&i_col, None).unwrap();
3085        let to_number_f = to_number(&f_col, None).unwrap();
3086        let try_to_number_s = try_to_number(&s_col, None).unwrap();
3087
3088        let out = df
3089            .lazy()
3090            .with_columns([
3091                to_number_i.into_expr().alias("i_num"),
3092                to_number_f.into_expr().alias("f_num"),
3093                try_to_number_s.into_expr().alias("s_num"),
3094            ])
3095            .collect()
3096            .unwrap();
3097
3098        let i_num = out.column("i_num").unwrap();
3099        let f_num = out.column("f_num").unwrap();
3100        let s_num = out.column("s_num").unwrap();
3101
3102        let i_vals: Vec<Option<f64>> = i_num.f64().unwrap().into_iter().collect();
3103        let f_vals: Vec<Option<f64>> = f_num.f64().unwrap().into_iter().collect();
3104        let s_vals: Vec<Option<f64>> = s_num.f64().unwrap().into_iter().collect();
3105
3106        assert_eq!(i_vals, vec![Some(1.0), Some(2.0), Some(3.0)]);
3107        assert_eq!(f_vals, vec![Some(1.5), Some(2.5), Some(3.5)]);
3108        assert_eq!(s_vals, vec![Some(10.0), Some(20.5), None]);
3109    }
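
    // Sketch: outside of a group-by, `first` should yield the first value of the column
    // and `any_value` some value drawn from it. The fixture is null-free, so the
    // ignorenulls flag cannot affect the assertions; the aliases "first_v"/"any_v" are arbitrary.
    #[test]
    fn test_first_and_any_value_select_sketch() {
        let df = df!(
            "v" => &[10i64, 20, 30]
        )
        .unwrap();

        let v_col = col("v");
        let first_col = first(&v_col, false);
        let any_col = any_value(&v_col, false);

        let out = df
            .lazy()
            .select([
                first_col.into_expr().alias("first_v"),
                any_col.into_expr().alias("any_v"),
            ])
            .collect()
            .unwrap();

        let first_vals: Vec<Option<i64>> = out.column("first_v").unwrap().i64().unwrap().into_iter().collect();
        let any_vals: Vec<Option<i64>> = out.column("any_v").unwrap().i64().unwrap().into_iter().collect();
        assert_eq!(first_vals, vec![Some(10)]);
        assert!(matches!(any_vals[0], Some(10) | Some(20) | Some(30)));
    }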
3110}