Skip to main content

robin_sparkless/
functions.rs

1use crate::column::Column;
2use crate::dataframe::DataFrame;
3use polars::prelude::*;
4
5// -----------------------------------------------------------------------------
6// -----------------------------------------------------------------------------
7
8/// Sort order specification for use in orderBy/sort. Holds expr + direction + null placement.
9#[derive(Debug, Clone)]
10pub struct SortOrder {
11    pub(crate) expr: Expr,
12    pub(crate) descending: bool,
13    pub(crate) nulls_last: bool,
14}
15
16impl SortOrder {
17    pub fn expr(&self) -> &Expr {
18        &self.expr
19    }
20}
21
22/// Ascending sort, nulls first (Spark default for ASC).
23pub fn asc(column: &Column) -> SortOrder {
24    SortOrder {
25        expr: column.expr().clone(),
26        descending: false,
27        nulls_last: false,
28    }
29}
30
31/// Ascending sort, nulls first.
32pub fn asc_nulls_first(column: &Column) -> SortOrder {
33    SortOrder {
34        expr: column.expr().clone(),
35        descending: false,
36        nulls_last: false,
37    }
38}
39
40/// Ascending sort, nulls last.
41pub fn asc_nulls_last(column: &Column) -> SortOrder {
42    SortOrder {
43        expr: column.expr().clone(),
44        descending: false,
45        nulls_last: true,
46    }
47}
48
49/// Descending sort, nulls last (Spark default for DESC).
50pub fn desc(column: &Column) -> SortOrder {
51    SortOrder {
52        expr: column.expr().clone(),
53        descending: true,
54        nulls_last: true,
55    }
56}
57
58/// Descending sort, nulls first.
59pub fn desc_nulls_first(column: &Column) -> SortOrder {
60    SortOrder {
61        expr: column.expr().clone(),
62        descending: true,
63        nulls_last: false,
64    }
65}
66
67/// Descending sort, nulls last.
68pub fn desc_nulls_last(column: &Column) -> SortOrder {
69    SortOrder {
70        expr: column.expr().clone(),
71        descending: true,
72        nulls_last: true,
73    }
74}
75
76// -----------------------------------------------------------------------------
77
78/// Parse PySpark-like type name to Polars DataType.
79pub fn parse_type_name(name: &str) -> Result<DataType, String> {
80    let s = name.trim().to_lowercase();
81    Ok(match s.as_str() {
82        "int" | "integer" => DataType::Int32,
83        "long" | "bigint" => DataType::Int64,
84        "float" => DataType::Float32,
85        "double" => DataType::Float64,
86        "string" | "str" => DataType::String,
87        "boolean" | "bool" => DataType::Boolean,
88        "date" => DataType::Date,
89        "timestamp" => DataType::Datetime(TimeUnit::Microseconds, None),
90        _ => return Err(format!("unknown type name: {name}")),
91    })
92}
93
94/// Get a column by name
95pub fn col(name: &str) -> Column {
96    Column::new(name.to_string())
97}
98
99/// Grouping set marker (PySpark grouping). Stub: returns 0 (no GROUPING SETS in robin-sparkless).
100pub fn grouping(column: &Column) -> Column {
101    let _ = column;
102    Column::from_expr(lit(0i32), Some("grouping".to_string()))
103}
104
105/// Grouping set id (PySpark grouping_id). Stub: returns 0.
106pub fn grouping_id(_columns: &[Column]) -> Column {
107    Column::from_expr(lit(0i64), Some("grouping_id".to_string()))
108}
109
110/// Create a literal column from a value
111pub fn lit_i32(value: i32) -> Column {
112    let expr: Expr = lit(value);
113    Column::from_expr(expr, None)
114}
115
116pub fn lit_i64(value: i64) -> Column {
117    let expr: Expr = lit(value);
118    Column::from_expr(expr, None)
119}
120
121pub fn lit_f64(value: f64) -> Column {
122    let expr: Expr = lit(value);
123    Column::from_expr(expr, None)
124}
125
126pub fn lit_bool(value: bool) -> Column {
127    let expr: Expr = lit(value);
128    Column::from_expr(expr, None)
129}
130
131pub fn lit_str(value: &str) -> Column {
132    let expr: Expr = lit(value);
133    Column::from_expr(expr, None)
134}
135
136/// Count aggregation
137pub fn count(col: &Column) -> Column {
138    Column::from_expr(col.expr().clone().count(), Some("count".to_string()))
139}
140
141/// Sum aggregation
142pub fn sum(col: &Column) -> Column {
143    Column::from_expr(col.expr().clone().sum(), Some("sum".to_string()))
144}
145
146/// Average aggregation
147pub fn avg(col: &Column) -> Column {
148    Column::from_expr(col.expr().clone().mean(), Some("avg".to_string()))
149}
150
151/// Alias for avg (PySpark mean).
152pub fn mean(col: &Column) -> Column {
153    avg(col)
154}
155
156/// Maximum aggregation
157pub fn max(col: &Column) -> Column {
158    Column::from_expr(col.expr().clone().max(), Some("max".to_string()))
159}
160
161/// Minimum aggregation
162pub fn min(col: &Column) -> Column {
163    Column::from_expr(col.expr().clone().min(), Some("min".to_string()))
164}
165
166/// Standard deviation (sample) aggregation (PySpark stddev / stddev_samp)
167pub fn stddev(col: &Column) -> Column {
168    Column::from_expr(col.expr().clone().std(1), Some("stddev".to_string()))
169}
170
171/// Variance (sample) aggregation (PySpark variance / var_samp)
172pub fn variance(col: &Column) -> Column {
173    Column::from_expr(col.expr().clone().var(1), Some("variance".to_string()))
174}
175
176/// Population standard deviation (ddof=0). PySpark stddev_pop.
177pub fn stddev_pop(col: &Column) -> Column {
178    Column::from_expr(col.expr().clone().std(0), Some("stddev_pop".to_string()))
179}
180
181/// Sample standard deviation (ddof=1). Alias for stddev. PySpark stddev_samp.
182pub fn stddev_samp(col: &Column) -> Column {
183    stddev(col)
184}
185
186/// Alias for stddev (PySpark std).
187pub fn std(col: &Column) -> Column {
188    stddev(col)
189}
190
191/// Population variance (ddof=0). PySpark var_pop.
192pub fn var_pop(col: &Column) -> Column {
193    Column::from_expr(col.expr().clone().var(0), Some("var_pop".to_string()))
194}
195
196/// Sample variance (ddof=1). Alias for variance. PySpark var_samp.
197pub fn var_samp(col: &Column) -> Column {
198    variance(col)
199}
200
201/// Median aggregation. PySpark median.
202pub fn median(col: &Column) -> Column {
203    use polars::prelude::QuantileMethod;
204    Column::from_expr(
205        col.expr()
206            .clone()
207            .quantile(lit(0.5), QuantileMethod::Linear),
208        Some("median".to_string()),
209    )
210}
211
212/// Approximate percentile (PySpark approx_percentile). Maps to quantile; percentage in 0.0..=1.0.
213pub fn approx_percentile(col: &Column, percentage: f64) -> Column {
214    use polars::prelude::QuantileMethod;
215    Column::from_expr(
216        col.expr()
217            .clone()
218            .quantile(lit(percentage), QuantileMethod::Linear),
219        Some(format!("approx_percentile({percentage})")),
220    )
221}
222
223/// Approximate percentile (PySpark percentile_approx). Alias for approx_percentile.
224pub fn percentile_approx(col: &Column, percentage: f64) -> Column {
225    approx_percentile(col, percentage)
226}
227
228/// Mode aggregation - most frequent value. PySpark mode.
229pub fn mode(col: &Column) -> Column {
230    col.clone().mode()
231}
232
233/// Count distinct aggregation (PySpark countDistinct)
234pub fn count_distinct(col: &Column) -> Column {
235    use polars::prelude::DataType;
236    Column::from_expr(
237        col.expr().clone().n_unique().cast(DataType::Int64),
238        Some("count_distinct".to_string()),
239    )
240}
241
242/// Kurtosis aggregation (PySpark kurtosis). Fisher definition, bias=true. Use in groupBy.agg().
243pub fn kurtosis(col: &Column) -> Column {
244    Column::from_expr(
245        col.expr()
246            .clone()
247            .cast(DataType::Float64)
248            .kurtosis(true, true),
249        Some("kurtosis".to_string()),
250    )
251}
252
253/// Skewness aggregation (PySpark skewness). bias=true. Use in groupBy.agg().
254pub fn skewness(col: &Column) -> Column {
255    Column::from_expr(
256        col.expr().clone().cast(DataType::Float64).skew(true),
257        Some("skewness".to_string()),
258    )
259}
260
261/// Population covariance aggregation (PySpark covar_pop). Returns Expr for use in groupBy.agg().
262pub fn covar_pop_expr(col1: &str, col2: &str) -> Expr {
263    use polars::prelude::{col as pl_col, len};
264    let c1 = pl_col(col1).cast(DataType::Float64);
265    let c2 = pl_col(col2).cast(DataType::Float64);
266    let n = len().cast(DataType::Float64);
267    let sum_ab = (c1.clone() * c2.clone()).sum();
268    let sum_a = pl_col(col1).sum().cast(DataType::Float64);
269    let sum_b = pl_col(col2).sum().cast(DataType::Float64);
270    (sum_ab - sum_a * sum_b / n.clone()) / n
271}
272
273/// Sample covariance aggregation (PySpark covar_samp). Returns Expr for use in groupBy.agg().
274pub fn covar_samp_expr(col1: &str, col2: &str) -> Expr {
275    use polars::prelude::{col as pl_col, len, lit, when};
276    let c1 = pl_col(col1).cast(DataType::Float64);
277    let c2 = pl_col(col2).cast(DataType::Float64);
278    let n = len().cast(DataType::Float64);
279    let sum_ab = (c1.clone() * c2.clone()).sum();
280    let sum_a = pl_col(col1).sum().cast(DataType::Float64);
281    let sum_b = pl_col(col2).sum().cast(DataType::Float64);
282    when(len().gt(lit(1)))
283        .then((sum_ab - sum_a * sum_b / n.clone()) / (len() - lit(1)).cast(DataType::Float64))
284        .otherwise(lit(f64::NAN))
285}
286
287/// Pearson correlation aggregation (PySpark corr). Returns Expr for use in groupBy.agg().
288pub fn corr_expr(col1: &str, col2: &str) -> Expr {
289    use polars::prelude::{col as pl_col, len, lit, when};
290    let c1 = pl_col(col1).cast(DataType::Float64);
291    let c2 = pl_col(col2).cast(DataType::Float64);
292    let n = len().cast(DataType::Float64);
293    let n1 = (len() - lit(1)).cast(DataType::Float64);
294    let sum_ab = (c1.clone() * c2.clone()).sum();
295    let sum_a = pl_col(col1).sum().cast(DataType::Float64);
296    let sum_b = pl_col(col2).sum().cast(DataType::Float64);
297    let sum_a2 = (c1.clone() * c1).sum();
298    let sum_b2 = (c2.clone() * c2).sum();
299    let cov_samp = (sum_ab - sum_a.clone() * sum_b.clone() / n.clone()) / n1.clone();
300    let var_a = (sum_a2 - sum_a.clone() * sum_a / n.clone()) / n1.clone();
301    let var_b = (sum_b2 - sum_b.clone() * sum_b / n.clone()) / n1.clone();
302    let std_a = var_a.sqrt();
303    let std_b = var_b.sqrt();
304    when(len().gt(lit(1)))
305        .then(cov_samp / (std_a * std_b))
306        .otherwise(lit(f64::NAN))
307}
308
309// --- Regression aggregates (PySpark regr_*). y = col1, x = col2; only pairs where both non-null. ---
310
311fn regr_cond_and_sums(y_col: &str, x_col: &str) -> (Expr, Expr, Expr, Expr, Expr, Expr) {
312    use polars::prelude::col as pl_col;
313    let y = pl_col(y_col).cast(DataType::Float64);
314    let x = pl_col(x_col).cast(DataType::Float64);
315    let cond = y.clone().is_not_null().and(x.clone().is_not_null());
316    let n = y
317        .clone()
318        .filter(cond.clone())
319        .count()
320        .cast(DataType::Float64);
321    let sum_x = x.clone().filter(cond.clone()).sum();
322    let sum_y = y.clone().filter(cond.clone()).sum();
323    let sum_xx = (x.clone() * x.clone()).filter(cond.clone()).sum();
324    let sum_yy = (y.clone() * y.clone()).filter(cond.clone()).sum();
325    let sum_xy = (x * y).filter(cond).sum();
326    (n, sum_x, sum_y, sum_xx, sum_yy, sum_xy)
327}
328
329/// Regression: count of (y, x) pairs where both non-null (PySpark regr_count).
330pub fn regr_count_expr(y_col: &str, x_col: &str) -> Expr {
331    let (n, ..) = regr_cond_and_sums(y_col, x_col);
332    n
333}
334
335/// Regression: average of x (PySpark regr_avgx).
336pub fn regr_avgx_expr(y_col: &str, x_col: &str) -> Expr {
337    use polars::prelude::{lit, when};
338    let (n, sum_x, ..) = regr_cond_and_sums(y_col, x_col);
339    when(n.clone().gt(lit(0.0)))
340        .then(sum_x / n)
341        .otherwise(lit(f64::NAN))
342}
343
344/// Regression: average of y (PySpark regr_avgy).
345pub fn regr_avgy_expr(y_col: &str, x_col: &str) -> Expr {
346    use polars::prelude::{lit, when};
347    let (n, _, sum_y, ..) = regr_cond_and_sums(y_col, x_col);
348    when(n.clone().gt(lit(0.0)))
349        .then(sum_y / n)
350        .otherwise(lit(f64::NAN))
351}
352
353/// Regression: sum((x - avg_x)^2) (PySpark regr_sxx).
354pub fn regr_sxx_expr(y_col: &str, x_col: &str) -> Expr {
355    use polars::prelude::{lit, when};
356    let (n, sum_x, _, sum_xx, ..) = regr_cond_and_sums(y_col, x_col);
357    when(n.clone().gt(lit(0.0)))
358        .then(sum_xx - sum_x.clone() * sum_x / n)
359        .otherwise(lit(f64::NAN))
360}
361
362/// Regression: sum((y - avg_y)^2) (PySpark regr_syy).
363pub fn regr_syy_expr(y_col: &str, x_col: &str) -> Expr {
364    use polars::prelude::{lit, when};
365    let (n, _, sum_y, _, sum_yy, _) = regr_cond_and_sums(y_col, x_col);
366    when(n.clone().gt(lit(0.0)))
367        .then(sum_yy - sum_y.clone() * sum_y / n)
368        .otherwise(lit(f64::NAN))
369}
370
371/// Regression: sum((x - avg_x)(y - avg_y)) (PySpark regr_sxy).
372pub fn regr_sxy_expr(y_col: &str, x_col: &str) -> Expr {
373    use polars::prelude::{lit, when};
374    let (n, sum_x, sum_y, _, _, sum_xy) = regr_cond_and_sums(y_col, x_col);
375    when(n.clone().gt(lit(0.0)))
376        .then(sum_xy - sum_x * sum_y / n)
377        .otherwise(lit(f64::NAN))
378}
379
380/// Regression slope: cov_samp(y,x)/var_samp(x) (PySpark regr_slope).
381pub fn regr_slope_expr(y_col: &str, x_col: &str) -> Expr {
382    use polars::prelude::{lit, when};
383    let (n, sum_x, sum_y, sum_xx, _sum_yy, sum_xy) = regr_cond_and_sums(y_col, x_col);
384    let regr_sxx = sum_xx.clone() - sum_x.clone() * sum_x.clone() / n.clone();
385    let regr_sxy = sum_xy - sum_x * sum_y / n.clone();
386    when(n.gt(lit(1.0)).and(regr_sxx.clone().gt(lit(0.0))))
387        .then(regr_sxy / regr_sxx)
388        .otherwise(lit(f64::NAN))
389}
390
391/// Regression intercept: avg_y - slope*avg_x (PySpark regr_intercept).
392pub fn regr_intercept_expr(y_col: &str, x_col: &str) -> Expr {
393    use polars::prelude::{lit, when};
394    let (n, sum_x, sum_y, sum_xx, _, sum_xy) = regr_cond_and_sums(y_col, x_col);
395    let regr_sxx = sum_xx - sum_x.clone() * sum_x.clone() / n.clone();
396    let regr_sxy = sum_xy.clone() - sum_x.clone() * sum_y.clone() / n.clone();
397    let slope = regr_sxy.clone() / regr_sxx.clone();
398    let avg_y = sum_y / n.clone();
399    let avg_x = sum_x / n.clone();
400    when(n.gt(lit(1.0)).and(regr_sxx.clone().gt(lit(0.0))))
401        .then(avg_y - slope * avg_x)
402        .otherwise(lit(f64::NAN))
403}
404
405/// Regression R-squared (PySpark regr_r2).
406pub fn regr_r2_expr(y_col: &str, x_col: &str) -> Expr {
407    use polars::prelude::{lit, when};
408    let (n, sum_x, sum_y, sum_xx, sum_yy, sum_xy) = regr_cond_and_sums(y_col, x_col);
409    let regr_sxx = sum_xx - sum_x.clone() * sum_x.clone() / n.clone();
410    let regr_syy = sum_yy - sum_y.clone() * sum_y.clone() / n.clone();
411    let regr_sxy = sum_xy - sum_x * sum_y / n;
412    when(
413        regr_sxx
414            .clone()
415            .gt(lit(0.0))
416            .and(regr_syy.clone().gt(lit(0.0))),
417    )
418    .then(regr_sxy.clone() * regr_sxy / (regr_sxx * regr_syy))
419    .otherwise(lit(f64::NAN))
420}
421
422/// PySpark-style conditional expression builder.
423///
424/// # Example
425/// ```
426/// use robin_sparkless::{col, lit_i64, lit_str, when};
427///
428/// // when(condition).then(value).otherwise(fallback)
429/// let expr = when(&col("age").gt(lit_i64(18).into_expr()))
430///     .then(&lit_str("adult"))
431///     .otherwise(&lit_str("minor"));
432/// ```
433pub fn when(condition: &Column) -> WhenBuilder {
434    WhenBuilder::new(condition.expr().clone())
435}
436
437/// Two-arg when(condition, value): returns value where condition is true, null otherwise (PySpark when(cond, val)).
438pub fn when_then_otherwise_null(condition: &Column, value: &Column) -> Column {
439    use polars::prelude::*;
440    let null_expr = Expr::Literal(LiteralValue::Null);
441    let expr = polars::prelude::when(condition.expr().clone())
442        .then(value.expr().clone())
443        .otherwise(null_expr);
444    crate::column::Column::from_expr(expr, None)
445}
446
447/// Builder for when-then-otherwise expressions
448pub struct WhenBuilder {
449    condition: Expr,
450}
451
452impl WhenBuilder {
453    fn new(condition: Expr) -> Self {
454        WhenBuilder { condition }
455    }
456
457    /// Specify the value when condition is true
458    pub fn then(self, value: &Column) -> ThenBuilder {
459        use polars::prelude::*;
460        let when_then = when(self.condition).then(value.expr().clone());
461        ThenBuilder::new(when_then)
462    }
463
464    /// Specify the value when condition is false
465    /// Note: In PySpark, when(cond).otherwise(val) requires a .then() first.
466    /// For this implementation, we require .then() to be called explicitly.
467    /// This method will panic if used directly - use when(cond).then(val1).otherwise(val2) instead.
468    pub fn otherwise(self, _value: &Column) -> Column {
469        // This should not be called directly - when().otherwise() without .then() is not supported
470        // Users should use when(cond).then(val1).otherwise(val2)
471        panic!("when().otherwise() requires .then() to be called first. Use when(cond).then(val1).otherwise(val2)");
472    }
473}
474
475/// Builder for chaining when-then clauses before finalizing with otherwise
476pub struct ThenBuilder {
477    when_then: polars::prelude::Then, // The Polars WhenThen state
478}
479
480impl ThenBuilder {
481    fn new(when_then: polars::prelude::Then) -> Self {
482        ThenBuilder { when_then }
483    }
484
485    /// Chain an additional when-then clause
486    /// Note: Chaining multiple when-then clauses is not yet fully supported.
487    /// For now, use a single when().then().otherwise() pattern.
488    pub fn when(self, _condition: &Column) -> ThenBuilder {
489        // TODO: Implement proper chaining support
490        // For now, return self to allow compilation but chaining won't work correctly
491        self
492    }
493
494    /// Finalize the expression with the fallback value
495    pub fn otherwise(self, value: &Column) -> Column {
496        let expr = self.when_then.otherwise(value.expr().clone());
497        crate::column::Column::from_expr(expr, None)
498    }
499}
500
501/// Convert string column to uppercase (PySpark upper)
502pub fn upper(column: &Column) -> Column {
503    column.clone().upper()
504}
505
506/// Convert string column to lowercase (PySpark lower)
507pub fn lower(column: &Column) -> Column {
508    column.clone().lower()
509}
510
511/// Substring with 1-based start (PySpark substring semantics)
512pub fn substring(column: &Column, start: i64, length: Option<i64>) -> Column {
513    column.clone().substr(start, length)
514}
515
516/// String length in characters (PySpark length)
517pub fn length(column: &Column) -> Column {
518    column.clone().length()
519}
520
521/// Trim leading and trailing whitespace (PySpark trim)
522pub fn trim(column: &Column) -> Column {
523    column.clone().trim()
524}
525
526/// Trim leading whitespace (PySpark ltrim)
527pub fn ltrim(column: &Column) -> Column {
528    column.clone().ltrim()
529}
530
531/// Trim trailing whitespace (PySpark rtrim)
532pub fn rtrim(column: &Column) -> Column {
533    column.clone().rtrim()
534}
535
536/// Trim leading and trailing chars (PySpark btrim). trim_str defaults to whitespace.
537pub fn btrim(column: &Column, trim_str: Option<&str>) -> Column {
538    column.clone().btrim(trim_str)
539}
540
541/// Find substring position 1-based, starting at pos (PySpark locate). 0 if not found.
542pub fn locate(substr: &str, column: &Column, pos: i64) -> Column {
543    column.clone().locate(substr, pos)
544}
545
546/// Base conversion (PySpark conv). num from from_base to to_base.
547pub fn conv(column: &Column, from_base: i32, to_base: i32) -> Column {
548    column.clone().conv(from_base, to_base)
549}
550
551/// Convert to hex string (PySpark hex).
552pub fn hex(column: &Column) -> Column {
553    column.clone().hex()
554}
555
556/// Convert hex string to binary/string (PySpark unhex).
557pub fn unhex(column: &Column) -> Column {
558    column.clone().unhex()
559}
560
561/// Encode string to binary (PySpark encode). Charset: UTF-8. Returns hex string.
562pub fn encode(column: &Column, charset: &str) -> Column {
563    column.clone().encode(charset)
564}
565
566/// Decode binary (hex string) to string (PySpark decode). Charset: UTF-8.
567pub fn decode(column: &Column, charset: &str) -> Column {
568    column.clone().decode(charset)
569}
570
571/// Convert to binary (PySpark to_binary). fmt: 'utf-8', 'hex'.
572pub fn to_binary(column: &Column, fmt: &str) -> Column {
573    column.clone().to_binary(fmt)
574}
575
576/// Try convert to binary; null on failure (PySpark try_to_binary).
577pub fn try_to_binary(column: &Column, fmt: &str) -> Column {
578    column.clone().try_to_binary(fmt)
579}
580
581/// AES encrypt (PySpark aes_encrypt). Key as string; AES-128-GCM.
582pub fn aes_encrypt(column: &Column, key: &str) -> Column {
583    column.clone().aes_encrypt(key)
584}
585
586/// AES decrypt (PySpark aes_decrypt). Input hex(nonce||ciphertext).
587pub fn aes_decrypt(column: &Column, key: &str) -> Column {
588    column.clone().aes_decrypt(key)
589}
590
591/// Try AES decrypt (PySpark try_aes_decrypt). Returns null on failure.
592pub fn try_aes_decrypt(column: &Column, key: &str) -> Column {
593    column.clone().try_aes_decrypt(key)
594}
595
596/// Convert integer to binary string (PySpark bin).
597pub fn bin(column: &Column) -> Column {
598    column.clone().bin()
599}
600
601/// Get bit at 0-based position (PySpark getbit).
602pub fn getbit(column: &Column, pos: i64) -> Column {
603    column.clone().getbit(pos)
604}
605
606/// Bitwise AND of two integer/boolean columns (PySpark bit_and).
607pub fn bit_and(left: &Column, right: &Column) -> Column {
608    left.clone().bit_and(right)
609}
610
611/// Bitwise OR of two integer/boolean columns (PySpark bit_or).
612pub fn bit_or(left: &Column, right: &Column) -> Column {
613    left.clone().bit_or(right)
614}
615
616/// Bitwise XOR of two integer/boolean columns (PySpark bit_xor).
617pub fn bit_xor(left: &Column, right: &Column) -> Column {
618    left.clone().bit_xor(right)
619}
620
621/// Count of set bits in the integer representation (PySpark bit_count).
622pub fn bit_count(column: &Column) -> Column {
623    column.clone().bit_count()
624}
625
626/// Bitwise NOT of an integer/boolean column (PySpark bitwise_not / bitwiseNOT).
627pub fn bitwise_not(column: &Column) -> Column {
628    column.clone().bitwise_not()
629}
630
631// --- Bitmap (PySpark 3.5+) ---
632
633/// Map integral value (0–32767) to bit position for bitmap aggregates (PySpark bitmap_bit_position).
634pub fn bitmap_bit_position(column: &Column) -> Column {
635    use polars::prelude::DataType;
636    let expr = column.expr().clone().cast(DataType::Int32);
637    Column::from_expr(expr, None)
638}
639
640/// Bucket number for distributed bitmap (PySpark bitmap_bucket_number). value / 32768.
641pub fn bitmap_bucket_number(column: &Column) -> Column {
642    use polars::prelude::DataType;
643    let expr = column.expr().clone().cast(DataType::Int64) / lit(32768i64);
644    Column::from_expr(expr, None)
645}
646
647/// Count set bits in a bitmap binary column (PySpark bitmap_count).
648pub fn bitmap_count(column: &Column) -> Column {
649    use polars::prelude::{DataType, GetOutput};
650    let expr = column.expr().clone().map(
651        crate::udfs::apply_bitmap_count,
652        GetOutput::from_type(DataType::Int64),
653    );
654    Column::from_expr(expr, None)
655}
656
657/// Aggregate: bitwise OR of bit positions into one bitmap binary (PySpark bitmap_construct_agg).
658/// Use in group_by(...).agg([bitmap_construct_agg(col)]).
659pub fn bitmap_construct_agg(column: &Column) -> polars::prelude::Expr {
660    use polars::prelude::{DataType, GetOutput};
661    column.expr().clone().implode().map(
662        crate::udfs::apply_bitmap_construct_agg,
663        GetOutput::from_type(DataType::Binary),
664    )
665}
666
667/// Aggregate: bitwise OR of bitmap binary column (PySpark bitmap_or_agg).
668pub fn bitmap_or_agg(column: &Column) -> polars::prelude::Expr {
669    use polars::prelude::{DataType, GetOutput};
670    column.expr().clone().implode().map(
671        crate::udfs::apply_bitmap_or_agg,
672        GetOutput::from_type(DataType::Binary),
673    )
674}
675
676/// Alias for getbit (PySpark bit_get).
677pub fn bit_get(column: &Column, pos: i64) -> Column {
678    getbit(column, pos)
679}
680
681/// Assert that all boolean values are true; errors otherwise (PySpark assert_true).
682/// When err_msg is Some, it is used in the error message when assertion fails.
683pub fn assert_true(column: &Column, err_msg: Option<&str>) -> Column {
684    column.clone().assert_true(err_msg)
685}
686
687/// Raise an error when evaluated (PySpark raise_error). Always fails with the given message.
688pub fn raise_error(message: &str) -> Column {
689    let msg = message.to_string();
690    let expr = lit(0i64).map(
691        move |_col| -> PolarsResult<Option<polars::prelude::Column>> {
692            Err(PolarsError::ComputeError(msg.clone().into()))
693        },
694        GetOutput::from_type(DataType::Int64),
695    );
696    Column::from_expr(expr, Some("raise_error".to_string()))
697}
698
699/// Broadcast hint - no-op that returns the same DataFrame (PySpark broadcast).
700pub fn broadcast(df: &DataFrame) -> DataFrame {
701    df.clone()
702}
703
704/// Stub partition id - always 0 (PySpark spark_partition_id).
705pub fn spark_partition_id() -> Column {
706    Column::from_expr(lit(0i32), Some("spark_partition_id".to_string()))
707}
708
709/// Stub input file name - empty string (PySpark input_file_name).
710pub fn input_file_name() -> Column {
711    Column::from_expr(lit(""), Some("input_file_name".to_string()))
712}
713
714/// Stub monotonically_increasing_id - constant 0 (PySpark monotonically_increasing_id).
715/// Note: differs from PySpark which is unique per-row; see PYSPARK_DIFFERENCES.md.
716pub fn monotonically_increasing_id() -> Column {
717    Column::from_expr(lit(0i64), Some("monotonically_increasing_id".to_string()))
718}
719
720/// Current catalog name stub (PySpark current_catalog).
721pub fn current_catalog() -> Column {
722    Column::from_expr(lit("spark_catalog"), Some("current_catalog".to_string()))
723}
724
725/// Current database/schema name stub (PySpark current_database).
726pub fn current_database() -> Column {
727    Column::from_expr(lit("default"), Some("current_database".to_string()))
728}
729
730/// Current schema name stub (PySpark current_schema).
731pub fn current_schema() -> Column {
732    Column::from_expr(lit("default"), Some("current_schema".to_string()))
733}
734
735/// Current user stub (PySpark current_user).
736pub fn current_user() -> Column {
737    Column::from_expr(lit("unknown"), Some("current_user".to_string()))
738}
739
740/// User stub (PySpark user).
741pub fn user() -> Column {
742    Column::from_expr(lit("unknown"), Some("user".to_string()))
743}
744
745/// Random uniform [0, 1) per row, with optional seed (PySpark rand).
746/// When added via with_column, generates one distinct value per row (PySpark-like).
747pub fn rand(seed: Option<u64>) -> Column {
748    Column::from_rand(seed)
749}
750
751/// Random standard normal per row, with optional seed (PySpark randn).
752/// When added via with_column, generates one distinct value per row (PySpark-like).
753pub fn randn(seed: Option<u64>) -> Column {
754    Column::from_randn(seed)
755}
756
757/// Call a registered UDF by name. PySpark: F.call_udf(udfName, *cols).
758/// Requires a session (set by get_or_create). Raises if UDF not found.
759pub fn call_udf(name: &str, cols: &[Column]) -> Result<Column, PolarsError> {
760    use polars::prelude::Column as PlColumn;
761
762    let session = crate::session::get_thread_udf_session().ok_or_else(|| {
763        PolarsError::InvalidOperation(
764            "call_udf: no session. Use SparkSession.builder().get_or_create() first.".into(),
765        )
766    })?;
767    let case_sensitive = session.is_case_sensitive();
768
769    // Python UDF: eager execution via UdfCall
770    #[cfg(feature = "pyo3")]
771    if session
772        .udf_registry
773        .get_python_udf(name, case_sensitive)
774        .is_some()
775    {
776        return Ok(Column::from_udf_call(name.to_string(), cols.to_vec()));
777    }
778
779    // Rust UDF: build lazy Expr
780    let udf = session
781        .udf_registry
782        .get_rust_udf(name, case_sensitive)
783        .ok_or_else(|| {
784            PolarsError::InvalidOperation(format!("call_udf: UDF '{name}' not found").into())
785        })?;
786
787    let exprs: Vec<Expr> = cols.iter().map(|c| c.expr().clone()).collect();
788    let output_type = DataType::String; // PySpark default
789
790    let expr = if exprs.len() == 1 {
791        let udf = udf.clone();
792        exprs.into_iter().next().unwrap().map(
793            move |c| {
794                let s = c.take_materialized_series();
795                udf.apply(&[s])
796                    .map(|out| Some(PlColumn::new("_".into(), out)))
797            },
798            GetOutput::from_type(output_type),
799        )
800    } else {
801        let udf = udf.clone();
802        let first = exprs[0].clone();
803        let rest: Vec<Expr> = exprs[1..].to_vec();
804        first.map_many(
805            move |columns| {
806                let series: Vec<Series> = columns
807                    .iter_mut()
808                    .map(|c| std::mem::take(c).take_materialized_series())
809                    .collect();
810                udf.apply(&series)
811                    .map(|out| Some(PlColumn::new("_".into(), out)))
812            },
813            &rest,
814            GetOutput::from_type(output_type),
815        )
816    };
817
818    Ok(Column::from_expr(expr, Some(format!("{name}()"))))
819}
820
821/// True if two arrays have any element in common (PySpark arrays_overlap).
822pub fn arrays_overlap(left: &Column, right: &Column) -> Column {
823    left.clone().arrays_overlap(right)
824}
825
826/// Zip arrays into array of structs (PySpark arrays_zip).
827pub fn arrays_zip(left: &Column, right: &Column) -> Column {
828    left.clone().arrays_zip(right)
829}
830
831/// Explode; null/empty yields one row with null (PySpark explode_outer).
832pub fn explode_outer(column: &Column) -> Column {
833    column.clone().explode_outer()
834}
835
836/// Posexplode with null preservation (PySpark posexplode_outer).
837pub fn posexplode_outer(column: &Column) -> (Column, Column) {
838    column.clone().posexplode_outer()
839}
840
841/// Collect to array (PySpark array_agg).
842pub fn array_agg(column: &Column) -> Column {
843    column.clone().array_agg()
844}
845
846/// Transform map keys by expr (PySpark transform_keys).
847pub fn transform_keys(column: &Column, key_expr: Expr) -> Column {
848    column.clone().transform_keys(key_expr)
849}
850
851/// Transform map values by expr (PySpark transform_values).
852pub fn transform_values(column: &Column, value_expr: Expr) -> Column {
853    column.clone().transform_values(value_expr)
854}
855
856/// Parse string to map (PySpark str_to_map). Default delims: "," and ":".
857pub fn str_to_map(
858    column: &Column,
859    pair_delim: Option<&str>,
860    key_value_delim: Option<&str>,
861) -> Column {
862    let pd = pair_delim.unwrap_or(",");
863    let kvd = key_value_delim.unwrap_or(":");
864    column.clone().str_to_map(pd, kvd)
865}
866
867/// Extract first match of regex (PySpark regexp_extract). group_index 0 = full match.
868pub fn regexp_extract(column: &Column, pattern: &str, group_index: usize) -> Column {
869    column.clone().regexp_extract(pattern, group_index)
870}
871
872/// Replace first match of regex (PySpark regexp_replace)
873pub fn regexp_replace(column: &Column, pattern: &str, replacement: &str) -> Column {
874    column.clone().regexp_replace(pattern, replacement)
875}
876
877/// Split string by delimiter (PySpark split)
878pub fn split(column: &Column, delimiter: &str) -> Column {
879    column.clone().split(delimiter)
880}
881
882/// Title case (PySpark initcap)
883pub fn initcap(column: &Column) -> Column {
884    column.clone().initcap()
885}
886
887/// Extract all matches of regex (PySpark regexp_extract_all).
888pub fn regexp_extract_all(column: &Column, pattern: &str) -> Column {
889    column.clone().regexp_extract_all(pattern)
890}
891
892/// Check if string matches regex (PySpark regexp_like / rlike).
893pub fn regexp_like(column: &Column, pattern: &str) -> Column {
894    column.clone().regexp_like(pattern)
895}
896
897/// Count of non-overlapping regex matches (PySpark regexp_count).
898pub fn regexp_count(column: &Column, pattern: &str) -> Column {
899    column.clone().regexp_count(pattern)
900}
901
902/// First substring matching regex (PySpark regexp_substr). Null if no match.
903pub fn regexp_substr(column: &Column, pattern: &str) -> Column {
904    column.clone().regexp_substr(pattern)
905}
906
907/// Split by delimiter and return 1-based part (PySpark split_part).
908pub fn split_part(column: &Column, delimiter: &str, part_num: i64) -> Column {
909    column.clone().split_part(delimiter, part_num)
910}
911
912/// 1-based position of first regex match (PySpark regexp_instr).
913pub fn regexp_instr(column: &Column, pattern: &str, group_idx: Option<usize>) -> Column {
914    column.clone().regexp_instr(pattern, group_idx)
915}
916
917/// 1-based index of str in comma-delimited set (PySpark find_in_set). 0 if not found or str contains comma.
918pub fn find_in_set(str_column: &Column, set_column: &Column) -> Column {
919    str_column.clone().find_in_set(set_column)
920}
921
922/// Printf-style format (PySpark format_string). Supports %s, %d, %i, %f, %g, %%.
923pub fn format_string(format: &str, columns: &[&Column]) -> Column {
924    use polars::prelude::*;
925    if columns.is_empty() {
926        panic!("format_string needs at least one column");
927    }
928    let format_owned = format.to_string();
929    let args: Vec<Expr> = columns.iter().skip(1).map(|c| c.expr().clone()).collect();
930    let expr = columns[0].expr().clone().map_many(
931        move |cols| crate::udfs::apply_format_string(cols, &format_owned),
932        &args,
933        GetOutput::from_type(DataType::String),
934    );
935    crate::column::Column::from_expr(expr, None)
936}
937
938/// Alias for format_string (PySpark printf).
939pub fn printf(format: &str, columns: &[&Column]) -> Column {
940    format_string(format, columns)
941}
942
943/// Repeat string n times (PySpark repeat).
944pub fn repeat(column: &Column, n: i32) -> Column {
945    column.clone().repeat(n)
946}
947
948/// Reverse string (PySpark reverse).
949pub fn reverse(column: &Column) -> Column {
950    column.clone().reverse()
951}
952
953/// Find substring position 1-based; 0 if not found (PySpark instr).
954pub fn instr(column: &Column, substr: &str) -> Column {
955    column.clone().instr(substr)
956}
957
958/// Position of substring in column (PySpark position). Same as instr; (substr, col) argument order.
959pub fn position(substr: &str, column: &Column) -> Column {
960    column.clone().instr(substr)
961}
962
963/// ASCII value of first character (PySpark ascii). Returns Int32.
964pub fn ascii(column: &Column) -> Column {
965    column.clone().ascii()
966}
967
968/// Format numeric as string with fixed decimal places (PySpark format_number).
969pub fn format_number(column: &Column, decimals: u32) -> Column {
970    column.clone().format_number(decimals)
971}
972
973/// Replace substring at 1-based position (PySpark overlay). replace is literal.
974pub fn overlay(column: &Column, replace: &str, pos: i64, length: i64) -> Column {
975    column.clone().overlay(replace, pos, length)
976}
977
978/// Int to single-character string (PySpark char). Valid codepoint only.
979pub fn char(column: &Column) -> Column {
980    column.clone().char()
981}
982
983/// Alias for char (PySpark chr).
984pub fn chr(column: &Column) -> Column {
985    column.clone().chr()
986}
987
988/// Base64 encode string bytes (PySpark base64).
989pub fn base64(column: &Column) -> Column {
990    column.clone().base64()
991}
992
993/// Base64 decode to string (PySpark unbase64). Invalid decode → null.
994pub fn unbase64(column: &Column) -> Column {
995    column.clone().unbase64()
996}
997
998/// SHA1 hash of string bytes, return hex string (PySpark sha1).
999pub fn sha1(column: &Column) -> Column {
1000    column.clone().sha1()
1001}
1002
1003/// SHA2 hash; bit_length 256, 384, or 512 (PySpark sha2).
1004pub fn sha2(column: &Column, bit_length: i32) -> Column {
1005    column.clone().sha2(bit_length)
1006}
1007
1008/// MD5 hash of string bytes, return hex string (PySpark md5).
1009pub fn md5(column: &Column) -> Column {
1010    column.clone().md5()
1011}
1012
1013/// Left-pad string to length with pad char (PySpark lpad).
1014pub fn lpad(column: &Column, length: i32, pad: &str) -> Column {
1015    column.clone().lpad(length, pad)
1016}
1017
1018/// Right-pad string to length with pad char (PySpark rpad).
1019pub fn rpad(column: &Column, length: i32, pad: &str) -> Column {
1020    column.clone().rpad(length, pad)
1021}
1022
1023/// Character-by-character translation (PySpark translate).
1024pub fn translate(column: &Column, from_str: &str, to_str: &str) -> Column {
1025    column.clone().translate(from_str, to_str)
1026}
1027
1028/// Mask string: replace upper/lower/digit/other with given chars (PySpark mask).
1029pub fn mask(
1030    column: &Column,
1031    upper_char: Option<char>,
1032    lower_char: Option<char>,
1033    digit_char: Option<char>,
1034    other_char: Option<char>,
1035) -> Column {
1036    column
1037        .clone()
1038        .mask(upper_char, lower_char, digit_char, other_char)
1039}
1040
1041/// Substring before/after nth delimiter (PySpark substring_index).
1042pub fn substring_index(column: &Column, delimiter: &str, count: i64) -> Column {
1043    column.clone().substring_index(delimiter, count)
1044}
1045
1046/// Leftmost n characters (PySpark left).
1047pub fn left(column: &Column, n: i64) -> Column {
1048    column.clone().left(n)
1049}
1050
1051/// Rightmost n characters (PySpark right).
1052pub fn right(column: &Column, n: i64) -> Column {
1053    column.clone().right(n)
1054}
1055
1056/// Replace all occurrences of search with replacement (literal). PySpark replace.
1057pub fn replace(column: &Column, search: &str, replacement: &str) -> Column {
1058    column.clone().replace(search, replacement)
1059}
1060
1061/// True if string starts with prefix (PySpark startswith).
1062pub fn startswith(column: &Column, prefix: &str) -> Column {
1063    column.clone().startswith(prefix)
1064}
1065
1066/// True if string ends with suffix (PySpark endswith).
1067pub fn endswith(column: &Column, suffix: &str) -> Column {
1068    column.clone().endswith(suffix)
1069}
1070
1071/// True if string contains substring (literal). PySpark contains.
1072pub fn contains(column: &Column, substring: &str) -> Column {
1073    column.clone().contains(substring)
1074}
1075
1076/// SQL LIKE pattern (% any, _ one char). PySpark like.
1077/// When escape_char is Some(esc), esc + char treats that char as literal.
1078pub fn like(column: &Column, pattern: &str, escape_char: Option<char>) -> Column {
1079    column.clone().like(pattern, escape_char)
1080}
1081
1082/// Case-insensitive LIKE. PySpark ilike.
1083/// When escape_char is Some(esc), esc + char treats that char as literal.
1084pub fn ilike(column: &Column, pattern: &str, escape_char: Option<char>) -> Column {
1085    column.clone().ilike(pattern, escape_char)
1086}
1087
1088/// Alias for regexp_like. PySpark rlike / regexp.
1089pub fn rlike(column: &Column, pattern: &str) -> Column {
1090    column.clone().regexp_like(pattern)
1091}
1092
1093/// Alias for rlike (PySpark regexp).
1094pub fn regexp(column: &Column, pattern: &str) -> Column {
1095    rlike(column, pattern)
1096}
1097
1098/// Soundex code (PySpark soundex). Not implemented: requires element-wise UDF.
1099pub fn soundex(column: &Column) -> Column {
1100    column.clone().soundex()
1101}
1102
1103/// Levenshtein distance (PySpark levenshtein). Not implemented: requires element-wise UDF.
1104pub fn levenshtein(column: &Column, other: &Column) -> Column {
1105    column.clone().levenshtein(other)
1106}
1107
1108/// CRC32 of string bytes (PySpark crc32). Not implemented: requires element-wise UDF.
1109pub fn crc32(column: &Column) -> Column {
1110    column.clone().crc32()
1111}
1112
1113/// XXH64 hash (PySpark xxhash64). Not implemented: requires element-wise UDF.
1114pub fn xxhash64(column: &Column) -> Column {
1115    column.clone().xxhash64()
1116}
1117
1118/// Absolute value (PySpark abs)
1119pub fn abs(column: &Column) -> Column {
1120    column.clone().abs()
1121}
1122
1123/// Ceiling (PySpark ceil)
1124pub fn ceil(column: &Column) -> Column {
1125    column.clone().ceil()
1126}
1127
1128/// Floor (PySpark floor)
1129pub fn floor(column: &Column) -> Column {
1130    column.clone().floor()
1131}
1132
1133/// Round (PySpark round)
1134pub fn round(column: &Column, decimals: u32) -> Column {
1135    column.clone().round(decimals)
1136}
1137
1138/// Banker's rounding - round half to even (PySpark bround).
1139pub fn bround(column: &Column, scale: i32) -> Column {
1140    column.clone().bround(scale)
1141}
1142
1143/// Unary minus / negate (PySpark negate, negative).
1144pub fn negate(column: &Column) -> Column {
1145    column.clone().negate()
1146}
1147
1148/// Alias for negate. PySpark negative.
1149pub fn negative(column: &Column) -> Column {
1150    negate(column)
1151}
1152
1153/// Unary plus - no-op, returns column as-is (PySpark positive).
1154pub fn positive(column: &Column) -> Column {
1155    column.clone()
1156}
1157
1158/// Cotangent: 1/tan (PySpark cot).
1159pub fn cot(column: &Column) -> Column {
1160    column.clone().cot()
1161}
1162
1163/// Cosecant: 1/sin (PySpark csc).
1164pub fn csc(column: &Column) -> Column {
1165    column.clone().csc()
1166}
1167
1168/// Secant: 1/cos (PySpark sec).
1169pub fn sec(column: &Column) -> Column {
1170    column.clone().sec()
1171}
1172
1173/// Constant e = 2.718... (PySpark e).
1174pub fn e() -> Column {
1175    Column::from_expr(lit(std::f64::consts::E), Some("e".to_string()))
1176}
1177
1178/// Constant pi = 3.14159... (PySpark pi).
1179pub fn pi() -> Column {
1180    Column::from_expr(lit(std::f64::consts::PI), Some("pi".to_string()))
1181}
1182
1183/// Square root (PySpark sqrt)
1184pub fn sqrt(column: &Column) -> Column {
1185    column.clone().sqrt()
1186}
1187
1188/// Power (PySpark pow)
1189pub fn pow(column: &Column, exp: i64) -> Column {
1190    column.clone().pow(exp)
1191}
1192
1193/// Exponential (PySpark exp)
1194pub fn exp(column: &Column) -> Column {
1195    column.clone().exp()
1196}
1197
1198/// Natural logarithm (PySpark log with one arg)
1199pub fn log(column: &Column) -> Column {
1200    column.clone().log()
1201}
1202
1203/// Logarithm with given base (PySpark log(col, base)). base must be positive and not 1.
1204pub fn log_with_base(column: &Column, base: f64) -> Column {
1205    crate::column::Column::from_expr(column.expr().clone().log(base), None)
1206}
1207
1208/// Sine in radians (PySpark sin)
1209pub fn sin(column: &Column) -> Column {
1210    column.clone().sin()
1211}
1212
1213/// Cosine in radians (PySpark cos)
1214pub fn cos(column: &Column) -> Column {
1215    column.clone().cos()
1216}
1217
1218/// Tangent in radians (PySpark tan)
1219pub fn tan(column: &Column) -> Column {
1220    column.clone().tan()
1221}
1222
1223/// Arc sine (PySpark asin)
1224pub fn asin(column: &Column) -> Column {
1225    column.clone().asin()
1226}
1227
1228/// Arc cosine (PySpark acos)
1229pub fn acos(column: &Column) -> Column {
1230    column.clone().acos()
1231}
1232
1233/// Arc tangent (PySpark atan)
1234pub fn atan(column: &Column) -> Column {
1235    column.clone().atan()
1236}
1237
1238/// Two-argument arc tangent atan2(y, x) in radians (PySpark atan2)
1239pub fn atan2(y: &Column, x: &Column) -> Column {
1240    y.clone().atan2(x)
1241}
1242
1243/// Convert radians to degrees (PySpark degrees)
1244pub fn degrees(column: &Column) -> Column {
1245    column.clone().degrees()
1246}
1247
1248/// Convert degrees to radians (PySpark radians)
1249pub fn radians(column: &Column) -> Column {
1250    column.clone().radians()
1251}
1252
1253/// Sign of the number: -1, 0, or 1 (PySpark signum)
1254pub fn signum(column: &Column) -> Column {
1255    column.clone().signum()
1256}
1257
1258/// Alias for signum (PySpark sign).
1259pub fn sign(column: &Column) -> Column {
1260    signum(column)
1261}
1262
1263/// Cast column to the given type (PySpark cast). Fails on invalid conversion.
1264/// String-to-boolean uses custom parsing ("true"/"false"/"1"/"0") since Polars does not support Utf8->Boolean.
1265/// String-to-date accepts date and datetime strings (e.g. "2025-01-01 10:30:00" truncates to date) for Spark parity.
1266pub fn cast(column: &Column, type_name: &str) -> Result<Column, String> {
1267    let dtype = parse_type_name(type_name)?;
1268    if dtype == DataType::Boolean {
1269        use polars::prelude::GetOutput;
1270        let expr = column.expr().clone().map(
1271            |col| crate::udfs::apply_string_to_boolean(col, true),
1272            GetOutput::from_type(DataType::Boolean),
1273        );
1274        return Ok(Column::from_expr(expr, None));
1275    }
1276    if dtype == DataType::Date {
1277        use polars::prelude::GetOutput;
1278        let expr = column.expr().clone().map(
1279            |col| crate::udfs::apply_string_to_date(col, true),
1280            GetOutput::from_type(DataType::Date),
1281        );
1282        return Ok(Column::from_expr(expr, None));
1283    }
1284    if dtype == DataType::Int32 || dtype == DataType::Int64 {
1285        use polars::prelude::GetOutput;
1286        let target = dtype.clone();
1287        let expr = column.expr().clone().map(
1288            move |col| crate::udfs::apply_string_to_int(col, false, target.clone()),
1289            GetOutput::from_type(dtype),
1290        );
1291        return Ok(Column::from_expr(expr, None));
1292    }
1293    if dtype == DataType::Float64 {
1294        use polars::prelude::GetOutput;
1295        // String-to-double uses custom parsing for Spark-style to_number semantics.
1296        let expr = column.expr().clone().map(
1297            |col| crate::udfs::apply_string_to_double(col, true),
1298            GetOutput::from_type(DataType::Float64),
1299        );
1300        return Ok(Column::from_expr(expr, None));
1301    }
1302    Ok(Column::from_expr(
1303        column.expr().clone().strict_cast(dtype),
1304        None,
1305    ))
1306}
1307
1308/// Cast column to the given type, returning null on invalid conversion (PySpark try_cast).
1309/// String-to-boolean uses custom parsing ("true"/"false"/"1"/"0") since Polars does not support Utf8->Boolean.
1310/// String-to-date accepts date and datetime strings; invalid strings become null.
1311pub fn try_cast(column: &Column, type_name: &str) -> Result<Column, String> {
1312    let dtype = parse_type_name(type_name)?;
1313    if dtype == DataType::Boolean {
1314        use polars::prelude::GetOutput;
1315        let expr = column.expr().clone().map(
1316            |col| crate::udfs::apply_string_to_boolean(col, false),
1317            GetOutput::from_type(DataType::Boolean),
1318        );
1319        return Ok(Column::from_expr(expr, None));
1320    }
1321    if dtype == DataType::Date {
1322        use polars::prelude::GetOutput;
1323        let expr = column.expr().clone().map(
1324            |col| crate::udfs::apply_string_to_date(col, false),
1325            GetOutput::from_type(DataType::Date),
1326        );
1327        return Ok(Column::from_expr(expr, None));
1328    }
1329    if dtype == DataType::Int32 || dtype == DataType::Int64 {
1330        use polars::prelude::GetOutput;
1331        let target = dtype.clone();
1332        let expr = column.expr().clone().map(
1333            move |col| crate::udfs::apply_string_to_int(col, false, target.clone()),
1334            GetOutput::from_type(dtype),
1335        );
1336        return Ok(Column::from_expr(expr, None));
1337    }
1338    if dtype == DataType::Float64 {
1339        use polars::prelude::GetOutput;
1340        let expr = column.expr().clone().map(
1341            |col| crate::udfs::apply_string_to_double(col, false),
1342            GetOutput::from_type(DataType::Float64),
1343        );
1344        return Ok(Column::from_expr(expr, None));
1345    }
1346    Ok(Column::from_expr(column.expr().clone().cast(dtype), None))
1347}
1348
1349/// Cast to string, optionally with format for datetime (PySpark to_char, to_varchar).
1350/// When format is Some, uses date_format for datetime columns (PySpark format → chrono strftime); otherwise cast to string.
1351/// Returns Err if the cast to string fails (invalid type name or unsupported column type).
1352pub fn to_char(column: &Column, format: Option<&str>) -> Result<Column, String> {
1353    match format {
1354        Some(fmt) => Ok(column
1355            .clone()
1356            .date_format(&crate::udfs::pyspark_format_to_chrono(fmt))),
1357        None => cast(column, "string"),
1358    }
1359}
1360
1361/// Alias for to_char (PySpark to_varchar).
1362pub fn to_varchar(column: &Column, format: Option<&str>) -> Result<Column, String> {
1363    to_char(column, format)
1364}
1365
1366/// Cast to numeric (PySpark to_number). Uses Double. Format parameter reserved for future use.
1367/// Returns Err if the cast to double fails (invalid type name or unsupported column type).
1368pub fn to_number(column: &Column, _format: Option<&str>) -> Result<Column, String> {
1369    cast(column, "double")
1370}
1371
1372/// Cast to numeric, null on invalid (PySpark try_to_number). Format parameter reserved for future use.
1373/// Returns Err if the try_cast setup fails (invalid type name); column values that cannot be parsed become null.
1374pub fn try_to_number(column: &Column, _format: Option<&str>) -> Result<Column, String> {
1375    try_cast(column, "double")
1376}
1377
1378/// Cast to timestamp, or parse with format when provided (PySpark to_timestamp).
1379pub fn to_timestamp(column: &Column, format: Option<&str>) -> Result<Column, String> {
1380    use polars::prelude::{DataType, GetOutput, TimeUnit};
1381    match format {
1382        None => crate::cast(column, "timestamp"),
1383        Some(fmt) => {
1384            let fmt_owned = fmt.to_string();
1385            let expr = column.expr().clone().map(
1386                move |s| crate::udfs::apply_to_timestamp_format(s, Some(&fmt_owned), true),
1387                GetOutput::from_type(DataType::Datetime(TimeUnit::Microseconds, None)),
1388            );
1389            Ok(crate::column::Column::from_expr(expr, None))
1390        }
1391    }
1392}
1393
1394/// Cast to timestamp, null on invalid, or parse with format when provided (PySpark try_to_timestamp).
1395/// Returns Err if the try_cast setup fails (invalid type name) when format is None.
1396pub fn try_to_timestamp(column: &Column, format: Option<&str>) -> Result<Column, String> {
1397    use polars::prelude::*;
1398    match format {
1399        None => try_cast(column, "timestamp"),
1400        Some(fmt) => {
1401            let fmt_owned = fmt.to_string();
1402            let expr = column.expr().clone().map(
1403                move |s| crate::udfs::apply_to_timestamp_format(s, Some(&fmt_owned), false),
1404                GetOutput::from_type(DataType::Datetime(TimeUnit::Microseconds, None)),
1405            );
1406            Ok(crate::column::Column::from_expr(expr, None))
1407        }
1408    }
1409}
1410
1411/// Parse as timestamp in local timezone, return UTC (PySpark to_timestamp_ltz).
1412pub fn to_timestamp_ltz(column: &Column, format: Option<&str>) -> Result<Column, String> {
1413    use polars::prelude::{DataType, GetOutput, TimeUnit};
1414    match format {
1415        None => crate::cast(column, "timestamp"),
1416        Some(fmt) => {
1417            let fmt_owned = fmt.to_string();
1418            let expr = column.expr().clone().map(
1419                move |s| crate::udfs::apply_to_timestamp_ltz_format(s, Some(&fmt_owned), true),
1420                GetOutput::from_type(DataType::Datetime(TimeUnit::Microseconds, None)),
1421            );
1422            Ok(crate::column::Column::from_expr(expr, None))
1423        }
1424    }
1425}
1426
1427/// Parse as timestamp without timezone (PySpark to_timestamp_ntz). Returns Datetime(_, None).
1428pub fn to_timestamp_ntz(column: &Column, format: Option<&str>) -> Result<Column, String> {
1429    use polars::prelude::{DataType, GetOutput, TimeUnit};
1430    match format {
1431        None => crate::cast(column, "timestamp"),
1432        Some(fmt) => {
1433            let fmt_owned = fmt.to_string();
1434            let expr = column.expr().clone().map(
1435                move |s| crate::udfs::apply_to_timestamp_ntz_format(s, Some(&fmt_owned), true),
1436                GetOutput::from_type(DataType::Datetime(TimeUnit::Microseconds, None)),
1437            );
1438            Ok(crate::column::Column::from_expr(expr, None))
1439        }
1440    }
1441}
1442
1443/// Division that returns null on divide-by-zero (PySpark try_divide).
1444pub fn try_divide(left: &Column, right: &Column) -> Column {
1445    use polars::prelude::*;
1446    let zero_cond = right.expr().clone().cast(DataType::Float64).eq(lit(0.0f64));
1447    let null_expr = Expr::Literal(LiteralValue::Null);
1448    let div_expr =
1449        left.expr().clone().cast(DataType::Float64) / right.expr().clone().cast(DataType::Float64);
1450    let expr = polars::prelude::when(zero_cond)
1451        .then(null_expr)
1452        .otherwise(div_expr);
1453    crate::column::Column::from_expr(expr, None)
1454}
1455
1456/// Add that returns null on overflow (PySpark try_add). Uses checked arithmetic.
1457pub fn try_add(left: &Column, right: &Column) -> Column {
1458    let args = [right.expr().clone()];
1459    let expr =
1460        left.expr()
1461            .clone()
1462            .map_many(crate::udfs::apply_try_add, &args, GetOutput::same_type());
1463    Column::from_expr(expr, None)
1464}
1465
1466/// Subtract that returns null on overflow (PySpark try_subtract).
1467pub fn try_subtract(left: &Column, right: &Column) -> Column {
1468    let args = [right.expr().clone()];
1469    let expr = left.expr().clone().map_many(
1470        crate::udfs::apply_try_subtract,
1471        &args,
1472        GetOutput::same_type(),
1473    );
1474    Column::from_expr(expr, None)
1475}
1476
1477/// Multiply that returns null on overflow (PySpark try_multiply).
1478pub fn try_multiply(left: &Column, right: &Column) -> Column {
1479    let args = [right.expr().clone()];
1480    let expr = left.expr().clone().map_many(
1481        crate::udfs::apply_try_multiply,
1482        &args,
1483        GetOutput::same_type(),
1484    );
1485    Column::from_expr(expr, None)
1486}
1487
1488/// Element at index, null if out of bounds (PySpark try_element_at). Same as element_at for lists.
1489pub fn try_element_at(column: &Column, index: i64) -> Column {
1490    column.clone().element_at(index)
1491}
1492
1493/// Assign value to histogram bucket (PySpark width_bucket). Returns 0 if v < min_val, num_bucket+1 if v >= max_val.
1494pub fn width_bucket(value: &Column, min_val: f64, max_val: f64, num_bucket: i64) -> Column {
1495    if num_bucket <= 0 {
1496        panic!(
1497            "width_bucket: num_bucket must be positive, got {}",
1498            num_bucket
1499        );
1500    }
1501    use polars::prelude::*;
1502    let v = value.expr().clone().cast(DataType::Float64);
1503    let min_expr = lit(min_val);
1504    let max_expr = lit(max_val);
1505    let nb = num_bucket as f64;
1506    let width = (max_val - min_val) / nb;
1507    let bucket_expr = (v.clone() - min_expr.clone()) / lit(width);
1508    let floor_bucket = bucket_expr.floor().cast(DataType::Int64) + lit(1i64);
1509    let bucket_clamped = floor_bucket.clip(lit(1i64), lit(num_bucket));
1510    let expr = polars::prelude::when(v.clone().lt(min_expr))
1511        .then(lit(0i64))
1512        .when(v.gt_eq(max_expr))
1513        .then(lit(num_bucket + 1))
1514        .otherwise(bucket_clamped);
1515    crate::column::Column::from_expr(expr, None)
1516}
1517
1518/// Return column at 1-based index (PySpark elt). elt(2, a, b, c) returns b.
1519pub fn elt(index: &Column, columns: &[&Column]) -> Column {
1520    use polars::prelude::*;
1521    if columns.is_empty() {
1522        panic!("elt requires at least one column");
1523    }
1524    let idx_expr = index.expr().clone();
1525    let null_expr = Expr::Literal(LiteralValue::Null);
1526    let mut expr = null_expr;
1527    for (i, c) in columns.iter().enumerate().rev() {
1528        let n = (i + 1) as i64;
1529        expr = polars::prelude::when(idx_expr.clone().eq(lit(n)))
1530            .then(c.expr().clone())
1531            .otherwise(expr);
1532    }
1533    crate::column::Column::from_expr(expr, None)
1534}
1535
1536/// Bit length of string (bytes * 8) (PySpark bit_length).
1537pub fn bit_length(column: &Column) -> Column {
1538    column.clone().bit_length()
1539}
1540
1541/// Length of string in bytes (PySpark octet_length).
1542pub fn octet_length(column: &Column) -> Column {
1543    column.clone().octet_length()
1544}
1545
1546/// Length of string in characters (PySpark char_length). Alias of length().
1547pub fn char_length(column: &Column) -> Column {
1548    column.clone().char_length()
1549}
1550
1551/// Length of string in characters (PySpark character_length). Alias of length().
1552pub fn character_length(column: &Column) -> Column {
1553    column.clone().character_length()
1554}
1555
1556/// Data type of column as string (PySpark typeof). Constant per column from schema.
1557pub fn typeof_(column: &Column) -> Column {
1558    column.clone().typeof_()
1559}
1560
1561/// True where the float value is NaN (PySpark isnan).
1562pub fn isnan(column: &Column) -> Column {
1563    column.clone().is_nan()
1564}
1565
1566/// Greatest of the given columns per row (PySpark greatest). Uses element-wise UDF.
1567pub fn greatest(columns: &[&Column]) -> Result<Column, String> {
1568    if columns.is_empty() {
1569        return Err("greatest requires at least one column".to_string());
1570    }
1571    if columns.len() == 1 {
1572        return Ok((*columns[0]).clone());
1573    }
1574    let mut expr = columns[0].expr().clone();
1575    for c in columns.iter().skip(1) {
1576        let args = [c.expr().clone()];
1577        expr = expr.map_many(crate::udfs::apply_greatest2, &args, GetOutput::same_type());
1578    }
1579    Ok(Column::from_expr(expr, None))
1580}
1581
1582/// Least of the given columns per row (PySpark least). Uses element-wise UDF.
1583pub fn least(columns: &[&Column]) -> Result<Column, String> {
1584    if columns.is_empty() {
1585        return Err("least requires at least one column".to_string());
1586    }
1587    if columns.len() == 1 {
1588        return Ok((*columns[0]).clone());
1589    }
1590    let mut expr = columns[0].expr().clone();
1591    for c in columns.iter().skip(1) {
1592        let args = [c.expr().clone()];
1593        expr = expr.map_many(crate::udfs::apply_least2, &args, GetOutput::same_type());
1594    }
1595    Ok(Column::from_expr(expr, None))
1596}
1597
1598/// Extract year from datetime column (PySpark year)
1599pub fn year(column: &Column) -> Column {
1600    column.clone().year()
1601}
1602
1603/// Extract month from datetime column (PySpark month)
1604pub fn month(column: &Column) -> Column {
1605    column.clone().month()
1606}
1607
1608/// Extract day of month from datetime column (PySpark day)
1609pub fn day(column: &Column) -> Column {
1610    column.clone().day()
1611}
1612
1613/// Cast to date (PySpark to_date)
1614pub fn to_date(column: &Column) -> Column {
1615    column.clone().to_date()
1616}
1617
1618/// Format date/datetime as string (PySpark date_format). Accepts PySpark/Java SimpleDateFormat style (e.g. "yyyy-MM") and converts to chrono strftime internally.
1619pub fn date_format(column: &Column, format: &str) -> Column {
1620    column
1621        .clone()
1622        .date_format(&crate::udfs::pyspark_format_to_chrono(format))
1623}
1624
1625/// Current date (evaluation time). PySpark current_date.
1626pub fn current_date() -> Column {
1627    use polars::prelude::*;
1628    let today = chrono::Utc::now().date_naive();
1629    let days = (today - crate::date_utils::epoch_naive_date()).num_days() as i32;
1630    crate::column::Column::from_expr(Expr::Literal(LiteralValue::Date(days)), None)
1631}
1632
1633/// Current timestamp (evaluation time). PySpark current_timestamp.
1634pub fn current_timestamp() -> Column {
1635    use polars::prelude::*;
1636    let ts = chrono::Utc::now().timestamp_micros();
1637    crate::column::Column::from_expr(
1638        Expr::Literal(LiteralValue::DateTime(ts, TimeUnit::Microseconds, None)),
1639        None,
1640    )
1641}
1642
1643/// Alias for current_date (PySpark curdate).
1644pub fn curdate() -> Column {
1645    current_date()
1646}
1647
1648/// Alias for current_timestamp (PySpark now).
1649pub fn now() -> Column {
1650    current_timestamp()
1651}
1652
1653/// Alias for current_timestamp (PySpark localtimestamp).
1654pub fn localtimestamp() -> Column {
1655    current_timestamp()
1656}
1657
1658/// Alias for datediff (PySpark date_diff). date_diff(end, start).
1659pub fn date_diff(end: &Column, start: &Column) -> Column {
1660    datediff(end, start)
1661}
1662
1663/// Alias for date_add (PySpark dateadd).
1664pub fn dateadd(column: &Column, n: i32) -> Column {
1665    date_add(column, n)
1666}
1667
1668/// Extract field from date/datetime (PySpark extract). field: year, month, day, hour, minute, second, quarter, week, dayofweek, dayofyear.
1669pub fn extract(column: &Column, field: &str) -> Column {
1670    column.clone().extract(field)
1671}
1672
1673/// Alias for extract (PySpark date_part).
1674pub fn date_part(column: &Column, field: &str) -> Column {
1675    extract(column, field)
1676}
1677
1678/// Alias for extract (PySpark datepart).
1679pub fn datepart(column: &Column, field: &str) -> Column {
1680    extract(column, field)
1681}
1682
1683/// Timestamp to microseconds since epoch (PySpark unix_micros).
1684pub fn unix_micros(column: &Column) -> Column {
1685    column.clone().unix_micros()
1686}
1687
1688/// Timestamp to milliseconds since epoch (PySpark unix_millis).
1689pub fn unix_millis(column: &Column) -> Column {
1690    column.clone().unix_millis()
1691}
1692
1693/// Timestamp to seconds since epoch (PySpark unix_seconds).
1694pub fn unix_seconds(column: &Column) -> Column {
1695    column.clone().unix_seconds()
1696}
1697
1698/// Weekday name "Mon","Tue",... (PySpark dayname).
1699pub fn dayname(column: &Column) -> Column {
1700    column.clone().dayname()
1701}
1702
1703/// Weekday 0=Mon, 6=Sun (PySpark weekday).
1704pub fn weekday(column: &Column) -> Column {
1705    column.clone().weekday()
1706}
1707
1708/// Extract hour from datetime column (PySpark hour).
1709pub fn hour(column: &Column) -> Column {
1710    column.clone().hour()
1711}
1712
1713/// Extract minute from datetime column (PySpark minute).
1714pub fn minute(column: &Column) -> Column {
1715    column.clone().minute()
1716}
1717
1718/// Extract second from datetime column (PySpark second).
1719pub fn second(column: &Column) -> Column {
1720    column.clone().second()
1721}
1722
1723/// Add n days to date column (PySpark date_add).
1724pub fn date_add(column: &Column, n: i32) -> Column {
1725    column.clone().date_add(n)
1726}
1727
1728/// Subtract n days from date column (PySpark date_sub).
1729pub fn date_sub(column: &Column, n: i32) -> Column {
1730    column.clone().date_sub(n)
1731}
1732
1733/// Number of days between two date columns (PySpark datediff).
1734pub fn datediff(end: &Column, start: &Column) -> Column {
1735    start.clone().datediff(end)
1736}
1737
1738/// Last day of month for date column (PySpark last_day).
1739pub fn last_day(column: &Column) -> Column {
1740    column.clone().last_day()
1741}
1742
1743/// Truncate date/datetime to unit (PySpark trunc).
1744pub fn trunc(column: &Column, format: &str) -> Column {
1745    column.clone().trunc(format)
1746}
1747
1748/// Alias for trunc (PySpark date_trunc).
1749pub fn date_trunc(format: &str, column: &Column) -> Column {
1750    trunc(column, format)
1751}
1752
1753/// Extract quarter (1-4) from date/datetime (PySpark quarter).
1754pub fn quarter(column: &Column) -> Column {
1755    column.clone().quarter()
1756}
1757
1758/// Extract ISO week of year (1-53) (PySpark weekofyear).
1759pub fn weekofyear(column: &Column) -> Column {
1760    column.clone().weekofyear()
1761}
1762
1763/// Extract day of week: 1=Sunday..7=Saturday (PySpark dayofweek).
1764pub fn dayofweek(column: &Column) -> Column {
1765    column.clone().dayofweek()
1766}
1767
1768/// Extract day of year (1-366) (PySpark dayofyear).
1769pub fn dayofyear(column: &Column) -> Column {
1770    column.clone().dayofyear()
1771}
1772
1773/// Add n months to date column (PySpark add_months).
1774pub fn add_months(column: &Column, n: i32) -> Column {
1775    column.clone().add_months(n)
1776}
1777
1778/// Months between end and start dates as fractional (PySpark months_between).
1779/// When round_off is true, rounds to 8 decimal places (PySpark default).
1780pub fn months_between(end: &Column, start: &Column, round_off: bool) -> Column {
1781    end.clone().months_between(start, round_off)
1782}
1783
1784/// Next date that is the given weekday (e.g. "Mon") (PySpark next_day).
1785pub fn next_day(column: &Column, day_of_week: &str) -> Column {
1786    column.clone().next_day(day_of_week)
1787}
1788
1789/// Current Unix timestamp in seconds (PySpark unix_timestamp with no args).
1790pub fn unix_timestamp_now() -> Column {
1791    use polars::prelude::*;
1792    let secs = chrono::Utc::now().timestamp();
1793    crate::column::Column::from_expr(lit(secs), None)
1794}
1795
1796/// Parse string timestamp to seconds since epoch (PySpark unix_timestamp). format defaults to yyyy-MM-dd HH:mm:ss.
1797pub fn unix_timestamp(column: &Column, format: Option<&str>) -> Column {
1798    column.clone().unix_timestamp(format)
1799}
1800
1801/// Alias for unix_timestamp.
1802pub fn to_unix_timestamp(column: &Column, format: Option<&str>) -> Column {
1803    unix_timestamp(column, format)
1804}
1805
1806/// Convert seconds since epoch to formatted string (PySpark from_unixtime).
1807pub fn from_unixtime(column: &Column, format: Option<&str>) -> Column {
1808    column.clone().from_unixtime(format)
1809}
1810
1811/// Build date from year, month, day columns (PySpark make_date).
1812pub fn make_date(year: &Column, month: &Column, day: &Column) -> Column {
1813    use polars::prelude::*;
1814    let args = [month.expr().clone(), day.expr().clone()];
1815    let expr = year.expr().clone().map_many(
1816        crate::udfs::apply_make_date,
1817        &args,
1818        GetOutput::from_type(DataType::Date),
1819    );
1820    crate::column::Column::from_expr(expr, None)
1821}
1822
1823/// make_timestamp(year, month, day, hour, min, sec, timezone?) - six columns to timestamp (PySpark make_timestamp).
1824/// When timezone is Some(tz), components are interpreted as local time in that zone, then converted to UTC.
1825pub fn make_timestamp(
1826    year: &Column,
1827    month: &Column,
1828    day: &Column,
1829    hour: &Column,
1830    minute: &Column,
1831    sec: &Column,
1832    timezone: Option<&str>,
1833) -> Column {
1834    use polars::prelude::*;
1835    let tz_owned = timezone.map(|s| s.to_string());
1836    let args = [
1837        month.expr().clone(),
1838        day.expr().clone(),
1839        hour.expr().clone(),
1840        minute.expr().clone(),
1841        sec.expr().clone(),
1842    ];
1843    let expr = year.expr().clone().map_many(
1844        move |cols| crate::udfs::apply_make_timestamp(cols, tz_owned.as_deref()),
1845        &args,
1846        GetOutput::from_type(DataType::Datetime(TimeUnit::Microseconds, None)),
1847    );
1848    crate::column::Column::from_expr(expr, None)
1849}
1850
1851/// Add amount of unit to timestamp (PySpark timestampadd).
1852pub fn timestampadd(unit: &str, amount: &Column, ts: &Column) -> Column {
1853    ts.clone().timestampadd(unit, amount)
1854}
1855
1856/// Difference between timestamps in unit (PySpark timestampdiff).
1857pub fn timestampdiff(unit: &str, start: &Column, end: &Column) -> Column {
1858    start.clone().timestampdiff(unit, end)
1859}
1860
1861/// Interval of n days (PySpark days). For use in date_add, timestampadd, etc.
1862pub fn days(n: i64) -> Column {
1863    make_interval(0, 0, 0, n, 0, 0, 0)
1864}
1865
1866/// Interval of n hours (PySpark hours).
1867pub fn hours(n: i64) -> Column {
1868    make_interval(0, 0, 0, 0, n, 0, 0)
1869}
1870
1871/// Interval of n minutes (PySpark minutes).
1872pub fn minutes(n: i64) -> Column {
1873    make_interval(0, 0, 0, 0, 0, n, 0)
1874}
1875
1876/// Interval of n months (PySpark months). Approximated as 30*n days.
1877pub fn months(n: i64) -> Column {
1878    make_interval(0, n, 0, 0, 0, 0, 0)
1879}
1880
1881/// Interval of n years (PySpark years). Approximated as 365*n days.
1882pub fn years(n: i64) -> Column {
1883    make_interval(n, 0, 0, 0, 0, 0, 0)
1884}
1885
1886/// Interpret timestamp as UTC, convert to tz (PySpark from_utc_timestamp).
1887pub fn from_utc_timestamp(column: &Column, tz: &str) -> Column {
1888    column.clone().from_utc_timestamp(tz)
1889}
1890
1891/// Interpret timestamp as in tz, convert to UTC (PySpark to_utc_timestamp).
1892pub fn to_utc_timestamp(column: &Column, tz: &str) -> Column {
1893    column.clone().to_utc_timestamp(tz)
1894}
1895
1896/// Convert timestamp between timezones (PySpark convert_timezone).
1897pub fn convert_timezone(source_tz: &str, target_tz: &str, column: &Column) -> Column {
1898    let source_tz = source_tz.to_string();
1899    let target_tz = target_tz.to_string();
1900    let expr = column.expr().clone().map(
1901        move |s| crate::udfs::apply_convert_timezone(s, &source_tz, &target_tz),
1902        GetOutput::same_type(),
1903    );
1904    crate::column::Column::from_expr(expr, None)
1905}
1906
1907/// Current session timezone (PySpark current_timezone). Default "UTC". Returns literal column.
1908pub fn current_timezone() -> Column {
1909    use polars::prelude::*;
1910    crate::column::Column::from_expr(lit("UTC"), None)
1911}
1912
1913/// Create interval duration (PySpark make_interval). Optional args; 0 for omitted.
1914pub fn make_interval(
1915    years: i64,
1916    months: i64,
1917    weeks: i64,
1918    days: i64,
1919    hours: i64,
1920    mins: i64,
1921    secs: i64,
1922) -> Column {
1923    use polars::prelude::*;
1924    // Approximate: 1 year = 365 days, 1 month = 30 days
1925    let total_days = years * 365 + months * 30 + weeks * 7 + days;
1926    let args = DurationArgs::new()
1927        .with_days(lit(total_days))
1928        .with_hours(lit(hours))
1929        .with_minutes(lit(mins))
1930        .with_seconds(lit(secs));
1931    let dur = duration(args);
1932    crate::column::Column::from_expr(dur, None)
1933}
1934
1935/// Day-time interval: days, hours, minutes, seconds (PySpark make_dt_interval). All optional; 0 for omitted.
1936pub fn make_dt_interval(days: i64, hours: i64, minutes: i64, seconds: i64) -> Column {
1937    use polars::prelude::*;
1938    let args = DurationArgs::new()
1939        .with_days(lit(days))
1940        .with_hours(lit(hours))
1941        .with_minutes(lit(minutes))
1942        .with_seconds(lit(seconds));
1943    let dur = duration(args);
1944    crate::column::Column::from_expr(dur, None)
1945}
1946
1947/// Year-month interval (PySpark make_ym_interval). Polars has no native YM type; return months as Int32 (years*12 + months).
1948pub fn make_ym_interval(years: i32, months: i32) -> Column {
1949    use polars::prelude::*;
1950    let total_months = years * 12 + months;
1951    crate::column::Column::from_expr(lit(total_months), None)
1952}
1953
1954/// Alias for make_timestamp (PySpark make_timestamp_ntz - no timezone).
1955pub fn make_timestamp_ntz(
1956    year: &Column,
1957    month: &Column,
1958    day: &Column,
1959    hour: &Column,
1960    minute: &Column,
1961    sec: &Column,
1962) -> Column {
1963    make_timestamp(year, month, day, hour, minute, sec, None)
1964}
1965
1966/// Convert seconds since epoch to timestamp (PySpark timestamp_seconds).
1967pub fn timestamp_seconds(column: &Column) -> Column {
1968    column.clone().timestamp_seconds()
1969}
1970
1971/// Convert milliseconds since epoch to timestamp (PySpark timestamp_millis).
1972pub fn timestamp_millis(column: &Column) -> Column {
1973    column.clone().timestamp_millis()
1974}
1975
1976/// Convert microseconds since epoch to timestamp (PySpark timestamp_micros).
1977pub fn timestamp_micros(column: &Column) -> Column {
1978    column.clone().timestamp_micros()
1979}
1980
1981/// Date to days since 1970-01-01 (PySpark unix_date).
1982pub fn unix_date(column: &Column) -> Column {
1983    column.clone().unix_date()
1984}
1985
1986/// Days since epoch to date (PySpark date_from_unix_date).
1987pub fn date_from_unix_date(column: &Column) -> Column {
1988    column.clone().date_from_unix_date()
1989}
1990
1991/// Positive modulus (PySpark pmod).
1992pub fn pmod(dividend: &Column, divisor: &Column) -> Column {
1993    dividend.clone().pmod(divisor)
1994}
1995
1996/// Factorial n! (PySpark factorial). n in 0..=20; null for negative or overflow.
1997pub fn factorial(column: &Column) -> Column {
1998    column.clone().factorial()
1999}
2000
2001/// Concatenate string columns without separator (PySpark concat)
2002pub fn concat(columns: &[&Column]) -> Column {
2003    use polars::prelude::*;
2004    if columns.is_empty() {
2005        panic!("concat requires at least one column");
2006    }
2007    let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2008    crate::column::Column::from_expr(concat_str(&exprs, "", false), None)
2009}
2010
2011/// Concatenate string columns with separator (PySpark concat_ws)
2012pub fn concat_ws(separator: &str, columns: &[&Column]) -> Column {
2013    use polars::prelude::*;
2014    if columns.is_empty() {
2015        panic!("concat_ws requires at least one column");
2016    }
2017    let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2018    crate::column::Column::from_expr(concat_str(&exprs, separator, false), None)
2019}
2020
2021/// Row number window function (1, 2, 3 by order within partition).
2022/// Use with `.over(partition_by)` after ranking by an order column.
2023///
2024/// # Example
2025/// ```
2026/// use robin_sparkless::{col, Column};
2027/// let salary_col = col("salary");
2028/// let rn = salary_col.row_number(true).over(&["dept"]);
2029/// ```
2030pub fn row_number(column: &Column) -> Column {
2031    column.clone().row_number(false)
2032}
2033
2034/// Rank window function (ties same rank, gaps). Use with `.over(partition_by)`.
2035pub fn rank(column: &Column, descending: bool) -> Column {
2036    column.clone().rank(descending)
2037}
2038
2039/// Dense rank window function (no gaps). Use with `.over(partition_by)`.
2040pub fn dense_rank(column: &Column, descending: bool) -> Column {
2041    column.clone().dense_rank(descending)
2042}
2043
2044/// Lag: value from n rows before in partition. Use with `.over(partition_by)`.
2045pub fn lag(column: &Column, n: i64) -> Column {
2046    column.clone().lag(n)
2047}
2048
2049/// Lead: value from n rows after in partition. Use with `.over(partition_by)`.
2050pub fn lead(column: &Column, n: i64) -> Column {
2051    column.clone().lead(n)
2052}
2053
2054/// First value in partition (PySpark first_value). Use with `.over(partition_by)`.
2055pub fn first_value(column: &Column) -> Column {
2056    column.clone().first_value()
2057}
2058
2059/// Last value in partition (PySpark last_value). Use with `.over(partition_by)`.
2060pub fn last_value(column: &Column) -> Column {
2061    column.clone().last_value()
2062}
2063
2064/// Percent rank in partition: (rank - 1) / (count - 1). Window is applied.
2065pub fn percent_rank(column: &Column, partition_by: &[&str], descending: bool) -> Column {
2066    column.clone().percent_rank(partition_by, descending)
2067}
2068
2069/// Cumulative distribution in partition: row_number / count. Window is applied.
2070pub fn cume_dist(column: &Column, partition_by: &[&str], descending: bool) -> Column {
2071    column.clone().cume_dist(partition_by, descending)
2072}
2073
2074/// Ntile: bucket 1..n by rank within partition. Window is applied.
2075pub fn ntile(column: &Column, n: u32, partition_by: &[&str], descending: bool) -> Column {
2076    column.clone().ntile(n, partition_by, descending)
2077}
2078
2079/// Nth value in partition by order (1-based n). Window is applied; do not call .over() again.
2080pub fn nth_value(column: &Column, n: i64, partition_by: &[&str], descending: bool) -> Column {
2081    column.clone().nth_value(n, partition_by, descending)
2082}
2083
2084/// Coalesce - returns the first non-null value from multiple columns.
2085///
2086/// # Example
2087/// ```
2088/// use robin_sparkless::{col, lit_i64, coalesce};
2089///
2090/// // coalesce(col("a"), col("b"), lit(0))
2091/// let expr = coalesce(&[&col("a"), &col("b"), &lit_i64(0)]);
2092/// ```
2093pub fn coalesce(columns: &[&Column]) -> Column {
2094    use polars::prelude::*;
2095    if columns.is_empty() {
2096        panic!("coalesce requires at least one column");
2097    }
2098    let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2099    let expr = coalesce(&exprs);
2100    crate::column::Column::from_expr(expr, None)
2101}
2102
2103/// Alias for coalesce(col, value). PySpark nvl / ifnull.
2104pub fn nvl(column: &Column, value: &Column) -> Column {
2105    coalesce(&[column, value])
2106}
2107
2108/// Alias for nvl. PySpark ifnull.
2109pub fn ifnull(column: &Column, value: &Column) -> Column {
2110    nvl(column, value)
2111}
2112
2113/// Return null if column equals value, else column. PySpark nullif.
2114pub fn nullif(column: &Column, value: &Column) -> Column {
2115    use polars::prelude::*;
2116    let cond = column.expr().clone().eq(value.expr().clone());
2117    let null_lit = Expr::Literal(LiteralValue::Null);
2118    let expr = when(cond).then(null_lit).otherwise(column.expr().clone());
2119    crate::column::Column::from_expr(expr, None)
2120}
2121
2122/// Replace NaN with value. PySpark nanvl.
2123pub fn nanvl(column: &Column, value: &Column) -> Column {
2124    use polars::prelude::*;
2125    let cond = column.expr().clone().is_nan();
2126    let expr = when(cond)
2127        .then(value.expr().clone())
2128        .otherwise(column.expr().clone());
2129    crate::column::Column::from_expr(expr, None)
2130}
2131
2132/// Three-arg null replacement: if col1 is not null then col2 else col3. PySpark nvl2.
2133pub fn nvl2(col1: &Column, col2: &Column, col3: &Column) -> Column {
2134    use polars::prelude::*;
2135    let cond = col1.expr().clone().is_not_null();
2136    let expr = when(cond)
2137        .then(col2.expr().clone())
2138        .otherwise(col3.expr().clone());
2139    crate::column::Column::from_expr(expr, None)
2140}
2141
2142/// Alias for substring. PySpark substr.
2143pub fn substr(column: &Column, start: i64, length: Option<i64>) -> Column {
2144    substring(column, start, length)
2145}
2146
2147/// Alias for pow. PySpark power.
2148pub fn power(column: &Column, exp: i64) -> Column {
2149    pow(column, exp)
2150}
2151
2152/// Alias for log (natural log). PySpark ln.
2153pub fn ln(column: &Column) -> Column {
2154    log(column)
2155}
2156
2157/// Alias for ceil. PySpark ceiling.
2158pub fn ceiling(column: &Column) -> Column {
2159    ceil(column)
2160}
2161
2162/// Alias for lower. PySpark lcase.
2163pub fn lcase(column: &Column) -> Column {
2164    lower(column)
2165}
2166
2167/// Alias for upper. PySpark ucase.
2168pub fn ucase(column: &Column) -> Column {
2169    upper(column)
2170}
2171
2172/// Alias for day. PySpark dayofmonth.
2173pub fn dayofmonth(column: &Column) -> Column {
2174    day(column)
2175}
2176
2177/// Alias for degrees. PySpark toDegrees.
2178pub fn to_degrees(column: &Column) -> Column {
2179    degrees(column)
2180}
2181
2182/// Alias for radians. PySpark toRadians.
2183pub fn to_radians(column: &Column) -> Column {
2184    radians(column)
2185}
2186
2187/// Hyperbolic cosine (PySpark cosh).
2188pub fn cosh(column: &Column) -> Column {
2189    column.clone().cosh()
2190}
2191/// Hyperbolic sine (PySpark sinh).
2192pub fn sinh(column: &Column) -> Column {
2193    column.clone().sinh()
2194}
2195/// Hyperbolic tangent (PySpark tanh).
2196pub fn tanh(column: &Column) -> Column {
2197    column.clone().tanh()
2198}
2199/// Inverse hyperbolic cosine (PySpark acosh).
2200pub fn acosh(column: &Column) -> Column {
2201    column.clone().acosh()
2202}
2203/// Inverse hyperbolic sine (PySpark asinh).
2204pub fn asinh(column: &Column) -> Column {
2205    column.clone().asinh()
2206}
2207/// Inverse hyperbolic tangent (PySpark atanh).
2208pub fn atanh(column: &Column) -> Column {
2209    column.clone().atanh()
2210}
2211/// Cube root (PySpark cbrt).
2212pub fn cbrt(column: &Column) -> Column {
2213    column.clone().cbrt()
2214}
2215/// exp(x) - 1 (PySpark expm1).
2216pub fn expm1(column: &Column) -> Column {
2217    column.clone().expm1()
2218}
2219/// log(1 + x) (PySpark log1p).
2220pub fn log1p(column: &Column) -> Column {
2221    column.clone().log1p()
2222}
2223/// Base-10 log (PySpark log10).
2224pub fn log10(column: &Column) -> Column {
2225    column.clone().log10()
2226}
2227/// Base-2 log (PySpark log2).
2228pub fn log2(column: &Column) -> Column {
2229    column.clone().log2()
2230}
2231/// Round to nearest integer (PySpark rint).
2232pub fn rint(column: &Column) -> Column {
2233    column.clone().rint()
2234}
2235/// sqrt(x*x + y*y) (PySpark hypot).
2236pub fn hypot(x: &Column, y: &Column) -> Column {
2237    let xx = x.expr().clone() * x.expr().clone();
2238    let yy = y.expr().clone() * y.expr().clone();
2239    crate::column::Column::from_expr((xx + yy).sqrt(), None)
2240}
2241
2242/// True if column is null. PySpark isnull.
2243pub fn isnull(column: &Column) -> Column {
2244    column.clone().is_null()
2245}
2246
2247/// True if column is not null. PySpark isnotnull.
2248pub fn isnotnull(column: &Column) -> Column {
2249    column.clone().is_not_null()
2250}
2251
2252/// Create an array column from multiple columns (PySpark array).
2253pub fn array(columns: &[&Column]) -> Result<crate::column::Column, PolarsError> {
2254    use polars::prelude::*;
2255    if columns.is_empty() {
2256        return Err(PolarsError::ComputeError(
2257            "array requires at least one column".into(),
2258        ));
2259    }
2260    let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2261    let expr = concat_list(exprs)
2262        .map_err(|e| PolarsError::ComputeError(format!("array concat_list: {e}").into()))?;
2263    Ok(crate::column::Column::from_expr(expr, None))
2264}
2265
2266/// Number of elements in list (PySpark size / array_size). Returns Int32.
2267pub fn array_size(column: &Column) -> Column {
2268    column.clone().array_size()
2269}
2270
2271/// Alias for array_size (PySpark size).
2272pub fn size(column: &Column) -> Column {
2273    column.clone().array_size()
2274}
2275
2276/// Cardinality: number of elements in array (PySpark cardinality). Alias for size/array_size.
2277pub fn cardinality(column: &Column) -> Column {
2278    column.clone().cardinality()
2279}
2280
2281/// Check if list contains value (PySpark array_contains).
2282pub fn array_contains(column: &Column, value: &Column) -> Column {
2283    column.clone().array_contains(value.expr().clone())
2284}
2285
2286/// Join list of strings with separator (PySpark array_join).
2287pub fn array_join(column: &Column, separator: &str) -> Column {
2288    column.clone().array_join(separator)
2289}
2290
2291/// Maximum element in list (PySpark array_max).
2292pub fn array_max(column: &Column) -> Column {
2293    column.clone().array_max()
2294}
2295
2296/// Minimum element in list (PySpark array_min).
2297pub fn array_min(column: &Column) -> Column {
2298    column.clone().array_min()
2299}
2300
2301/// Get element at 1-based index (PySpark element_at).
2302pub fn element_at(column: &Column, index: i64) -> Column {
2303    column.clone().element_at(index)
2304}
2305
2306/// Sort list elements (PySpark array_sort).
2307pub fn array_sort(column: &Column) -> Column {
2308    column.clone().array_sort()
2309}
2310
2311/// Distinct elements in list (PySpark array_distinct).
2312pub fn array_distinct(column: &Column) -> Column {
2313    column.clone().array_distinct()
2314}
2315
2316/// Slice list from 1-based start with optional length (PySpark slice).
2317pub fn array_slice(column: &Column, start: i64, length: Option<i64>) -> Column {
2318    column.clone().array_slice(start, length)
2319}
2320
2321/// Generate array of numbers from start to stop (inclusive) with optional step (PySpark sequence).
2322/// step defaults to 1.
2323pub fn sequence(start: &Column, stop: &Column, step: Option<&Column>) -> Column {
2324    use polars::prelude::{as_struct, lit, DataType, GetOutput};
2325    let step_expr = step
2326        .map(|c| c.expr().clone().alias("2"))
2327        .unwrap_or_else(|| lit(1i64).alias("2"));
2328    let struct_expr = as_struct(vec![
2329        start.expr().clone().alias("0"),
2330        stop.expr().clone().alias("1"),
2331        step_expr,
2332    ]);
2333    let out_dtype = DataType::List(Box::new(DataType::Int64));
2334    let expr = struct_expr.map(crate::udfs::apply_sequence, GetOutput::from_type(out_dtype));
2335    crate::column::Column::from_expr(expr, None)
2336}
2337
2338/// Random permutation of list elements (PySpark shuffle).
2339pub fn shuffle(column: &Column) -> Column {
2340    use polars::prelude::GetOutput;
2341    let expr = column
2342        .expr()
2343        .clone()
2344        .map(crate::udfs::apply_shuffle, GetOutput::same_type());
2345    crate::column::Column::from_expr(expr, None)
2346}
2347
2348/// Explode list of structs into rows; struct fields become columns after unnest (PySpark inline).
2349/// Returns the exploded struct column; use unnest to expand struct fields to columns.
2350pub fn inline(column: &Column) -> Column {
2351    column.clone().explode()
2352}
2353
2354/// Like inline but null/empty yields one row of nulls (PySpark inline_outer).
2355pub fn inline_outer(column: &Column) -> Column {
2356    column.clone().explode_outer()
2357}
2358
2359/// Explode list into one row per element (PySpark explode).
2360pub fn explode(column: &Column) -> Column {
2361    column.clone().explode()
2362}
2363
2364/// 1-based index of first occurrence of value in list, or 0 if not found (PySpark array_position).
2365/// Implemented via Polars list.eval with col("") as element.
2366pub fn array_position(column: &Column, value: &Column) -> Column {
2367    column.clone().array_position(value.expr().clone())
2368}
2369
2370/// Remove null elements from list (PySpark array_compact).
2371pub fn array_compact(column: &Column) -> Column {
2372    column.clone().array_compact()
2373}
2374
2375/// New list with all elements equal to value removed (PySpark array_remove).
2376/// Implemented via Polars list.eval + list.drop_nulls.
2377pub fn array_remove(column: &Column, value: &Column) -> Column {
2378    column.clone().array_remove(value.expr().clone())
2379}
2380
2381/// Repeat each element n times (PySpark array_repeat). Not implemented: would require list.eval with dynamic repeat.
2382pub fn array_repeat(column: &Column, n: i64) -> Column {
2383    column.clone().array_repeat(n)
2384}
2385
2386/// Flatten list of lists to one list (PySpark flatten). Not implemented.
2387pub fn array_flatten(column: &Column) -> Column {
2388    column.clone().array_flatten()
2389}
2390
2391/// True if any list element satisfies the predicate (PySpark exists).
2392pub fn array_exists(column: &Column, predicate: Expr) -> Column {
2393    column.clone().array_exists(predicate)
2394}
2395
2396/// True if all list elements satisfy the predicate (PySpark forall).
2397pub fn array_forall(column: &Column, predicate: Expr) -> Column {
2398    column.clone().array_forall(predicate)
2399}
2400
2401/// Filter list elements by predicate (PySpark filter).
2402pub fn array_filter(column: &Column, predicate: Expr) -> Column {
2403    column.clone().array_filter(predicate)
2404}
2405
2406/// Transform list elements by expression (PySpark transform).
2407pub fn array_transform(column: &Column, f: Expr) -> Column {
2408    column.clone().array_transform(f)
2409}
2410
2411/// Sum of list elements (PySpark aggregate sum).
2412pub fn array_sum(column: &Column) -> Column {
2413    column.clone().array_sum()
2414}
2415
2416/// Array fold/aggregate (PySpark aggregate). Simplified: zero + sum(list elements).
2417pub fn aggregate(column: &Column, zero: &Column) -> Column {
2418    column.clone().array_aggregate(zero)
2419}
2420
2421/// Mean of list elements (PySpark aggregate avg).
2422pub fn array_mean(column: &Column) -> Column {
2423    column.clone().array_mean()
2424}
2425
2426/// Explode list with position (PySpark posexplode). Returns (pos_column, value_column).
2427/// pos is 1-based; implemented via list.eval(cum_count()).explode() and explode().
2428pub fn posexplode(column: &Column) -> (Column, Column) {
2429    column.clone().posexplode()
2430}
2431
2432/// Build a map column from alternating key/value expressions (PySpark create_map).
2433/// Returns List(Struct{key, value}) using Polars as_struct and concat_list.
2434pub fn create_map(key_values: &[&Column]) -> Result<Column, PolarsError> {
2435    use polars::prelude::{as_struct, concat_list};
2436    if key_values.is_empty() {
2437        return Err(PolarsError::ComputeError(
2438            "create_map requires at least one key-value pair".into(),
2439        ));
2440    }
2441    let mut struct_exprs: Vec<Expr> = Vec::new();
2442    for i in (0..key_values.len()).step_by(2) {
2443        if i + 1 < key_values.len() {
2444            let k = key_values[i].expr().clone().alias("key");
2445            let v = key_values[i + 1].expr().clone().alias("value");
2446            struct_exprs.push(as_struct(vec![k, v]));
2447        }
2448    }
2449    let expr = concat_list(struct_exprs)
2450        .map_err(|e| PolarsError::ComputeError(format!("create_map concat_list: {e}").into()))?;
2451    Ok(crate::column::Column::from_expr(expr, None))
2452}
2453
2454/// Extract keys from a map column (PySpark map_keys). Map is List(Struct{key, value}).
2455pub fn map_keys(column: &Column) -> Column {
2456    column.clone().map_keys()
2457}
2458
2459/// Extract values from a map column (PySpark map_values).
2460pub fn map_values(column: &Column) -> Column {
2461    column.clone().map_values()
2462}
2463
2464/// Return map as list of structs {key, value} (PySpark map_entries).
2465pub fn map_entries(column: &Column) -> Column {
2466    column.clone().map_entries()
2467}
2468
2469/// Build map from two array columns keys and values (PySpark map_from_arrays). Implemented via UDF.
2470pub fn map_from_arrays(keys: &Column, values: &Column) -> Column {
2471    keys.clone().map_from_arrays(values)
2472}
2473
2474/// Merge two map columns (PySpark map_concat). Last value wins for duplicate keys.
2475pub fn map_concat(a: &Column, b: &Column) -> Column {
2476    a.clone().map_concat(b)
2477}
2478
2479/// Array of structs {key, value} to map (PySpark map_from_entries).
2480pub fn map_from_entries(column: &Column) -> Column {
2481    column.clone().map_from_entries()
2482}
2483
2484/// True if map contains key (PySpark map_contains_key).
2485pub fn map_contains_key(map_col: &Column, key: &Column) -> Column {
2486    map_col.clone().map_contains_key(key)
2487}
2488
2489/// Get value for key from map, or null (PySpark get).
2490pub fn get(map_col: &Column, key: &Column) -> Column {
2491    map_col.clone().get(key)
2492}
2493
2494/// Filter map entries by predicate (PySpark map_filter).
2495pub fn map_filter(map_col: &Column, predicate: Expr) -> Column {
2496    map_col.clone().map_filter(predicate)
2497}
2498
2499/// Merge two maps by key with merge function (PySpark map_zip_with).
2500pub fn map_zip_with(map1: &Column, map2: &Column, merge: Expr) -> Column {
2501    map1.clone().map_zip_with(map2, merge)
2502}
2503
2504/// Convenience: zip_with with coalesce(left, right) merge.
2505pub fn zip_with_coalesce(left: &Column, right: &Column) -> Column {
2506    use polars::prelude::col;
2507    let left_field = col("").struct_().field_by_name("left");
2508    let right_field = col("").struct_().field_by_name("right");
2509    let merge = crate::column::Column::from_expr(
2510        coalesce(&[
2511            &crate::column::Column::from_expr(left_field, None),
2512            &crate::column::Column::from_expr(right_field, None),
2513        ])
2514        .into_expr(),
2515        None,
2516    );
2517    left.clone().zip_with(right, merge.into_expr())
2518}
2519
2520/// Convenience: map_zip_with with coalesce(value1, value2) merge.
2521pub fn map_zip_with_coalesce(map1: &Column, map2: &Column) -> Column {
2522    use polars::prelude::col;
2523    let v1 = col("").struct_().field_by_name("value1");
2524    let v2 = col("").struct_().field_by_name("value2");
2525    let merge = coalesce(&[
2526        &crate::column::Column::from_expr(v1, None),
2527        &crate::column::Column::from_expr(v2, None),
2528    ])
2529    .into_expr();
2530    map1.clone().map_zip_with(map2, merge)
2531}
2532
2533/// Convenience: map_filter with value > threshold predicate.
2534pub fn map_filter_value_gt(map_col: &Column, threshold: f64) -> Column {
2535    use polars::prelude::{col, lit};
2536    let pred = col("").struct_().field_by_name("value").gt(lit(threshold));
2537    map_col.clone().map_filter(pred)
2538}
2539
2540/// Create struct from columns using column names as field names (PySpark struct).
2541pub fn struct_(columns: &[&Column]) -> Column {
2542    use polars::prelude::as_struct;
2543    if columns.is_empty() {
2544        panic!("struct requires at least one column");
2545    }
2546    let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2547    crate::column::Column::from_expr(as_struct(exprs), None)
2548}
2549
2550/// Create struct with explicit field names (PySpark named_struct). Pairs of (name, column).
2551pub fn named_struct(pairs: &[(&str, &Column)]) -> Column {
2552    use polars::prelude::as_struct;
2553    if pairs.is_empty() {
2554        panic!("named_struct requires at least one (name, column) pair");
2555    }
2556    let exprs: Vec<Expr> = pairs
2557        .iter()
2558        .map(|(name, col)| col.expr().clone().alias(*name))
2559        .collect();
2560    crate::column::Column::from_expr(as_struct(exprs), None)
2561}
2562
2563/// Append element to end of list (PySpark array_append).
2564pub fn array_append(array: &Column, elem: &Column) -> Column {
2565    array.clone().array_append(elem)
2566}
2567
2568/// Prepend element to start of list (PySpark array_prepend).
2569pub fn array_prepend(array: &Column, elem: &Column) -> Column {
2570    array.clone().array_prepend(elem)
2571}
2572
2573/// Insert element at 1-based position (PySpark array_insert).
2574pub fn array_insert(array: &Column, pos: &Column, elem: &Column) -> Column {
2575    array.clone().array_insert(pos, elem)
2576}
2577
2578/// Elements in first array not in second (PySpark array_except).
2579pub fn array_except(a: &Column, b: &Column) -> Column {
2580    a.clone().array_except(b)
2581}
2582
2583/// Elements in both arrays (PySpark array_intersect).
2584pub fn array_intersect(a: &Column, b: &Column) -> Column {
2585    a.clone().array_intersect(b)
2586}
2587
2588/// Distinct elements from both arrays (PySpark array_union).
2589pub fn array_union(a: &Column, b: &Column) -> Column {
2590    a.clone().array_union(b)
2591}
2592
2593/// Zip two arrays element-wise with merge function (PySpark zip_with).
2594pub fn zip_with(left: &Column, right: &Column, merge: Expr) -> Column {
2595    left.clone().zip_with(right, merge)
2596}
2597
2598/// Extract JSON path from string column (PySpark get_json_object).
2599pub fn get_json_object(column: &Column, path: &str) -> Column {
2600    column.clone().get_json_object(path)
2601}
2602
2603/// Keys of JSON object (PySpark json_object_keys). Returns list of strings.
2604pub fn json_object_keys(column: &Column) -> Column {
2605    column.clone().json_object_keys()
2606}
2607
2608/// Extract keys from JSON as struct (PySpark json_tuple). keys: e.g. ["a", "b"].
2609pub fn json_tuple(column: &Column, keys: &[&str]) -> Column {
2610    column.clone().json_tuple(keys)
2611}
2612
2613/// Parse CSV string to struct (PySpark from_csv). Minimal implementation.
2614pub fn from_csv(column: &Column) -> Column {
2615    column.clone().from_csv()
2616}
2617
2618/// Format struct as CSV string (PySpark to_csv). Minimal implementation.
2619pub fn to_csv(column: &Column) -> Column {
2620    column.clone().to_csv()
2621}
2622
2623/// Schema of CSV string (PySpark schema_of_csv). Returns literal schema string; minimal stub.
2624pub fn schema_of_csv(_column: &Column) -> Column {
2625    Column::from_expr(
2626        lit("STRUCT<_c0: STRING, _c1: STRING>".to_string()),
2627        Some("schema_of_csv".to_string()),
2628    )
2629}
2630
2631/// Schema of JSON string (PySpark schema_of_json). Returns literal schema string; minimal stub.
2632pub fn schema_of_json(_column: &Column) -> Column {
2633    Column::from_expr(
2634        lit("STRUCT<>".to_string()),
2635        Some("schema_of_json".to_string()),
2636    )
2637}
2638
2639/// Parse string column as JSON into struct (PySpark from_json).
2640pub fn from_json(column: &Column, schema: Option<polars::datatypes::DataType>) -> Column {
2641    column.clone().from_json(schema)
2642}
2643
2644/// Serialize struct column to JSON string (PySpark to_json).
2645pub fn to_json(column: &Column) -> Column {
2646    column.clone().to_json()
2647}
2648
2649/// Check if column values are in the given list (PySpark isin). Uses Polars is_in.
2650pub fn isin(column: &Column, other: &Column) -> Column {
2651    column.clone().isin(other)
2652}
2653
2654/// Check if column values are in the given i64 slice (PySpark isin with literal list).
2655pub fn isin_i64(column: &Column, values: &[i64]) -> Column {
2656    let s = Series::from_iter(values.iter().cloned());
2657    Column::from_expr(column.expr().clone().is_in(lit(s)), None)
2658}
2659
2660/// Check if column values are in the given string slice (PySpark isin with literal list).
2661pub fn isin_str(column: &Column, values: &[&str]) -> Column {
2662    let s: Series = Series::from_iter(values.iter().copied());
2663    Column::from_expr(column.expr().clone().is_in(lit(s)), None)
2664}
2665
2666/// Percent-decode URL-encoded string (PySpark url_decode).
2667pub fn url_decode(column: &Column) -> Column {
2668    column.clone().url_decode()
2669}
2670
2671/// Percent-encode string for URL (PySpark url_encode).
2672pub fn url_encode(column: &Column) -> Column {
2673    column.clone().url_encode()
2674}
2675
2676/// Bitwise left shift (PySpark shiftLeft). col << n.
2677pub fn shift_left(column: &Column, n: i32) -> Column {
2678    column.clone().shift_left(n)
2679}
2680
2681/// Bitwise signed right shift (PySpark shiftRight). col >> n.
2682pub fn shift_right(column: &Column, n: i32) -> Column {
2683    column.clone().shift_right(n)
2684}
2685
2686/// Bitwise unsigned right shift (PySpark shiftRightUnsigned). Logical shift for Long.
2687pub fn shift_right_unsigned(column: &Column, n: i32) -> Column {
2688    column.clone().shift_right_unsigned(n)
2689}
2690
2691/// Session/library version string (PySpark version).
2692pub fn version() -> Column {
2693    Column::from_expr(
2694        lit(concat!("robin-sparkless-", env!("CARGO_PKG_VERSION"))),
2695        None,
2696    )
2697}
2698
2699/// Null-safe equality: true if both null or both equal (PySpark equal_null). Alias for eq_null_safe.
2700pub fn equal_null(left: &Column, right: &Column) -> Column {
2701    left.clone().eq_null_safe(right)
2702}
2703
2704/// Length of JSON array at path (PySpark json_array_length).
2705pub fn json_array_length(column: &Column, path: &str) -> Column {
2706    column.clone().json_array_length(path)
2707}
2708
2709/// Parse URL and extract part: PROTOCOL, HOST, PATH, etc. (PySpark parse_url).
2710/// When key is Some(k) and part is QUERY/QUERYSTRING, returns the value for that query parameter only.
2711pub fn parse_url(column: &Column, part: &str, key: Option<&str>) -> Column {
2712    column.clone().parse_url(part, key)
2713}
2714
2715/// Hash of column values (PySpark hash). Uses Murmur3 32-bit for parity with PySpark.
2716pub fn hash(columns: &[&Column]) -> Column {
2717    use polars::prelude::*;
2718    if columns.is_empty() {
2719        return crate::column::Column::from_expr(lit(0i64), None);
2720    }
2721    if columns.len() == 1 {
2722        return columns[0].clone().hash();
2723    }
2724    let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2725    let struct_expr = polars::prelude::as_struct(exprs);
2726    let name = columns[0].name().to_string();
2727    let expr = struct_expr.map(
2728        crate::udfs::apply_hash_struct,
2729        GetOutput::from_type(DataType::Int64),
2730    );
2731    crate::column::Column::from_expr(expr, Some(name))
2732}
2733
2734/// Stack columns into struct (PySpark stack). Alias for struct_.
2735pub fn stack(columns: &[&Column]) -> Column {
2736    struct_(columns)
2737}
2738
#[cfg(test)]
mod tests {
    use super::*;
    use polars::prelude::{df, IntoLazy};

    // --- Column construction -------------------------------------------------

    #[test]
    fn test_col_creates_column() {
        assert_eq!(col("test").name(), "test");
    }

    // Literal columns carry the placeholder name "<expr>".

    #[test]
    fn test_lit_i32() {
        assert_eq!(lit_i32(42).name(), "<expr>");
    }

    #[test]
    fn test_lit_i64() {
        assert_eq!(lit_i64(123456789012345i64).name(), "<expr>");
    }

    #[test]
    fn test_lit_f64() {
        assert_eq!(lit_f64(std::f64::consts::PI).name(), "<expr>");
    }

    #[test]
    fn test_lit_bool() {
        assert_eq!(lit_bool(true).name(), "<expr>");
    }

    #[test]
    fn test_lit_str() {
        assert_eq!(lit_str("hello").name(), "<expr>");
    }

    // --- Aggregations: the result column is named after the aggregation ------

    #[test]
    fn test_count_aggregation() {
        let value = col("value");
        assert_eq!(count(&value).name(), "count");
    }

    #[test]
    fn test_sum_aggregation() {
        let value = col("value");
        assert_eq!(sum(&value).name(), "sum");
    }

    #[test]
    fn test_avg_aggregation() {
        let value = col("value");
        assert_eq!(avg(&value).name(), "avg");
    }

    #[test]
    fn test_max_aggregation() {
        let value = col("value");
        assert_eq!(max(&value).name(), "max");
    }

    #[test]
    fn test_min_aggregation() {
        let value = col("value");
        assert_eq!(min(&value).name(), "min");
    }

    // --- Conditionals ----------------------------------------------------------

    #[test]
    fn test_when_then_otherwise() {
        let ages = df!(
            "age" => &[15, 25, 35]
        )
        .unwrap();

        // status = "adult" when age > 18, otherwise "minor".
        let is_adult = col("age").gt(polars::prelude::lit(18));
        let adult = lit_str("adult");
        let minor = lit_str("minor");
        let status = when(&is_adult).then(&adult).otherwise(&minor);

        let status_expr = status.into_expr().alias("status");
        let evaluated = ages.lazy().with_column(status_expr).collect().unwrap();

        let status_col = evaluated.column("status").unwrap();
        let statuses: Vec<Option<&str>> = status_col.str().unwrap().into_iter().collect();

        // Row order follows the input ages: 15, 25, 35.
        let expected = [Some("minor"), Some("adult"), Some("adult")];
        for (row, want) in expected.iter().enumerate() {
            assert_eq!(statuses[row], *want);
        }
    }

    // --- Coalesce ----------------------------------------------------------------

    #[test]
    fn test_coalesce_returns_first_non_null() {
        // Each row has exactly one non-null value, in a different column.
        let sparse = df!(
            "a" => &[Some(1), None, None],
            "b" => &[None, Some(2), None],
            "c" => &[None, None, Some(3)]
        )
        .unwrap();

        let (a, b, c) = (col("a"), col("b"), col("c"));
        let first_non_null = coalesce(&[&a, &b, &c]);

        let coalesced_expr = first_non_null.into_expr().alias("coalesced");
        let evaluated = sparse.lazy().with_column(coalesced_expr).collect().unwrap();

        let coalesced_col = evaluated.column("coalesced").unwrap();
        let picked: Vec<Option<i32>> = coalesced_col.i32().unwrap().into_iter().collect();

        // Row 0 takes 'a', row 1 takes 'b', row 2 takes 'c'.
        let expected = [Some(1), Some(2), Some(3)];
        for (row, want) in expected.iter().enumerate() {
            assert_eq!(picked[row], *want);
        }
    }

    #[test]
    fn test_coalesce_with_literal_fallback() {
        // The second row is null in every column, so only the literal can fill it.
        let sparse = df!(
            "a" => &[Some(1), None],
            "b" => &[None::<i32>, None::<i32>]
        )
        .unwrap();

        let (a, b) = (col("a"), col("b"));
        let zero = lit_i32(0);
        let with_fallback = coalesce(&[&a, &b, &zero]);

        let coalesced_expr = with_fallback.into_expr().alias("coalesced");
        let evaluated = sparse.lazy().with_column(coalesced_expr).collect().unwrap();

        let coalesced_col = evaluated.column("coalesced").unwrap();
        let picked: Vec<Option<i32>> = coalesced_col.i32().unwrap().into_iter().collect();

        // Row 0 takes 'a'; row 1 falls through to the literal.
        let expected = [Some(1), Some(0)];
        for (row, want) in expected.iter().enumerate() {
            assert_eq!(picked[row], *want);
        }
    }

    #[test]
    #[should_panic(expected = "coalesce requires at least one column")]
    fn test_coalesce_empty_panics() {
        let no_columns: [&Column; 0] = [];
        let _ = coalesce(&no_columns);
    }

    // --- Casting -------------------------------------------------------------------

    #[test]
    fn test_cast_double_string_column_strict_ok() {
        // Every string parses as a double, so the strict cast must succeed.
        let strings = df!(
            "s" => &["123", " 45.5 ", "0"]
        )
        .unwrap();

        let source = col("s");
        let as_double = cast(&source, "double").unwrap().into_expr().alias("v");

        let out = strings.lazy().with_column(as_double).collect().unwrap();

        let parsed: Vec<Option<f64>> = out
            .column("v")
            .unwrap()
            .f64()
            .unwrap()
            .into_iter()
            .collect();
        assert_eq!(parsed, vec![Some(123.0), Some(45.5), Some(0.0)]);
    }

    #[test]
    fn test_try_cast_double_string_column_invalid_to_null() {
        // Under try_cast / try_to_number, unparseable strings become null.
        let strings = df!(
            "s" => &["123", " 45.5 ", "abc", ""]
        )
        .unwrap();

        let source = col("s");
        let as_double = try_cast(&source, "double")
            .unwrap()
            .into_expr()
            .alias("v");

        let out = strings.lazy().with_column(as_double).collect().unwrap();

        let parsed: Vec<Option<f64>> = out
            .column("v")
            .unwrap()
            .f64()
            .unwrap()
            .into_iter()
            .collect();
        assert_eq!(parsed, vec![Some(123.0), Some(45.5), None, None]);
    }

    #[test]
    fn test_to_number_and_try_to_number_numerics_and_strings() {
        // Numeric inputs are cast to double; invalid strings become null only for try_to_number.
        let mixed = df!(
            "i" => &[1i32, 2, 3],
            "f" => &[1.5f64, 2.5, 3.5],
            "s" => &["10", "20.5", "xyz"]
        )
        .unwrap();

        let (ints, floats, strings) = (col("i"), col("f"), col("s"));

        let projections = [
            to_number(&ints, None).unwrap().into_expr().alias("i_num"),
            to_number(&floats, None).unwrap().into_expr().alias("f_num"),
            try_to_number(&strings, None)
                .unwrap()
                .into_expr()
                .alias("s_num"),
        ];

        let out = mixed.lazy().with_columns(projections).collect().unwrap();

        // Read a named output column back as a vector of optional doubles.
        let as_f64 = |name: &str| -> Vec<Option<f64>> {
            out.column(name)
                .unwrap()
                .f64()
                .unwrap()
                .into_iter()
                .collect()
        };

        assert_eq!(as_f64("i_num"), vec![Some(1.0), Some(2.0), Some(3.0)]);
        assert_eq!(as_f64("f_num"), vec![Some(1.5), Some(2.5), Some(3.5)]);
        assert_eq!(as_f64("s_num"), vec![Some(10.0), Some(20.5), None]);
    }
}