
robin_sparkless/functions.rs

1use crate::column::Column;
2use crate::dataframe::DataFrame;
3use polars::prelude::*;
4
5// -----------------------------------------------------------------------------
6// -----------------------------------------------------------------------------
7
8/// Sort order specification for use in orderBy/sort. Holds expr + direction + null placement.
9#[derive(Debug, Clone)]
10pub struct SortOrder {
11    pub(crate) expr: Expr,
12    pub(crate) descending: bool,
13    pub(crate) nulls_last: bool,
14}
15
16impl SortOrder {
17    pub fn expr(&self) -> &Expr {
18        &self.expr
19    }
20}
21
22/// Ascending sort, nulls first (Spark default for ASC).
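///
/// # Example
/// Illustrative sketch of building sort keys (marked `ignore`, not compiled as a doc-test);
/// assumes `asc`, `desc`, and `col` are re-exported at the crate root, as in the `when`
/// doc example later in this file.
/// ```ignore
/// use robin_sparkless::{asc, col, desc};
///
/// // PySpark-style sort keys: name ascending (nulls first), then age descending (nulls last).
/// let keys = vec![asc(&col("name")), desc(&col("age"))];
/// ```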
23pub fn asc(column: &Column) -> SortOrder {
24    SortOrder {
25        expr: column.expr().clone(),
26        descending: false,
27        nulls_last: false,
28    }
29}
30
31/// Ascending sort, nulls first.
32pub fn asc_nulls_first(column: &Column) -> SortOrder {
33    SortOrder {
34        expr: column.expr().clone(),
35        descending: false,
36        nulls_last: false,
37    }
38}
39
40/// Ascending sort, nulls last.
41pub fn asc_nulls_last(column: &Column) -> SortOrder {
42    SortOrder {
43        expr: column.expr().clone(),
44        descending: false,
45        nulls_last: true,
46    }
47}
48
49/// Descending sort, nulls last (Spark default for DESC).
50pub fn desc(column: &Column) -> SortOrder {
51    SortOrder {
52        expr: column.expr().clone(),
53        descending: true,
54        nulls_last: true,
55    }
56}
57
58/// Descending sort, nulls first.
59pub fn desc_nulls_first(column: &Column) -> SortOrder {
60    SortOrder {
61        expr: column.expr().clone(),
62        descending: true,
63        nulls_last: false,
64    }
65}
66
67/// Descending sort, nulls last.
68pub fn desc_nulls_last(column: &Column) -> SortOrder {
69    SortOrder {
70        expr: column.expr().clone(),
71        descending: true,
72        nulls_last: true,
73    }
74}
75
76// -----------------------------------------------------------------------------
77
78/// Parse PySpark-like type name to Polars DataType.
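///
/// # Example
/// Illustrative sketch (marked `ignore`, not compiled as a doc-test); assumes the function
/// is reachable from the crate root.
/// ```ignore
/// use polars::prelude::DataType;
/// use robin_sparkless::parse_type_name;
///
/// assert_eq!(parse_type_name("bigint"), Ok(DataType::Int64));
/// assert_eq!(parse_type_name(" String "), Ok(DataType::String));
/// assert!(parse_type_name("decimal(10,2)").is_err());
/// ```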
79pub fn parse_type_name(name: &str) -> Result<DataType, String> {
80    let s = name.trim().to_lowercase();
81    Ok(match s.as_str() {
82        "int" | "integer" => DataType::Int32,
83        "long" | "bigint" => DataType::Int64,
84        "float" => DataType::Float32,
85        "double" => DataType::Float64,
86        "string" | "str" => DataType::String,
87        "boolean" | "bool" => DataType::Boolean,
88        "date" => DataType::Date,
89        "timestamp" => DataType::Datetime(TimeUnit::Microseconds, None),
90        _ => return Err(format!("unknown type name: {name}")),
91    })
92}
93
94/// Get a column by name
95pub fn col(name: &str) -> Column {
96    Column::new(name.to_string())
97}
98
99/// Grouping set marker (PySpark grouping). Stub: returns 0 (no GROUPING SETS in robin-sparkless).
100pub fn grouping(column: &Column) -> Column {
101    let _ = column;
102    Column::from_expr(lit(0i32), Some("grouping".to_string()))
103}
104
105/// Grouping set id (PySpark grouping_id). Stub: returns 0.
106pub fn grouping_id(_columns: &[Column]) -> Column {
107    Column::from_expr(lit(0i64), Some("grouping_id".to_string()))
108}
109
110/// Create a literal column from a value
111pub fn lit_i32(value: i32) -> Column {
112    let expr: Expr = lit(value);
113    Column::from_expr(expr, None)
114}
115
116pub fn lit_i64(value: i64) -> Column {
117    let expr: Expr = lit(value);
118    Column::from_expr(expr, None)
119}
120
121pub fn lit_f64(value: f64) -> Column {
122    let expr: Expr = lit(value);
123    Column::from_expr(expr, None)
124}
125
126pub fn lit_bool(value: bool) -> Column {
127    let expr: Expr = lit(value);
128    Column::from_expr(expr, None)
129}
130
131pub fn lit_str(value: &str) -> Column {
132    let expr: Expr = lit(value);
133    Column::from_expr(expr, None)
134}
135
136/// Count aggregation
137pub fn count(col: &Column) -> Column {
138    Column::from_expr(col.expr().clone().count(), Some("count".to_string()))
139}
140
141/// Sum aggregation
142pub fn sum(col: &Column) -> Column {
143    Column::from_expr(col.expr().clone().sum(), Some("sum".to_string()))
144}
145
146/// Average aggregation
147pub fn avg(col: &Column) -> Column {
148    Column::from_expr(col.expr().clone().mean(), Some("avg".to_string()))
149}
150
151/// Alias for avg (PySpark mean).
152pub fn mean(col: &Column) -> Column {
153    avg(col)
154}
155
156/// Maximum aggregation
157pub fn max(col: &Column) -> Column {
158    Column::from_expr(col.expr().clone().max(), Some("max".to_string()))
159}
160
161/// Minimum aggregation
162pub fn min(col: &Column) -> Column {
163    Column::from_expr(col.expr().clone().min(), Some("min".to_string()))
164}
165
166/// Standard deviation (sample) aggregation (PySpark stddev / stddev_samp)
167pub fn stddev(col: &Column) -> Column {
168    Column::from_expr(col.expr().clone().std(1), Some("stddev".to_string()))
169}
170
171/// Variance (sample) aggregation (PySpark variance / var_samp)
172pub fn variance(col: &Column) -> Column {
173    Column::from_expr(col.expr().clone().var(1), Some("variance".to_string()))
174}
175
176/// Population standard deviation (ddof=0). PySpark stddev_pop.
177pub fn stddev_pop(col: &Column) -> Column {
178    Column::from_expr(col.expr().clone().std(0), Some("stddev_pop".to_string()))
179}
180
181/// Sample standard deviation (ddof=1). Alias for stddev. PySpark stddev_samp.
182pub fn stddev_samp(col: &Column) -> Column {
183    stddev(col)
184}
185
186/// Alias for stddev (PySpark std).
187pub fn std(col: &Column) -> Column {
188    stddev(col)
189}
190
191/// Population variance (ddof=0). PySpark var_pop.
192pub fn var_pop(col: &Column) -> Column {
193    Column::from_expr(col.expr().clone().var(0), Some("var_pop".to_string()))
194}
195
196/// Sample variance (ddof=1). Alias for variance. PySpark var_samp.
197pub fn var_samp(col: &Column) -> Column {
198    variance(col)
199}
200
201/// Median aggregation. PySpark median.
202pub fn median(col: &Column) -> Column {
203    use polars::prelude::QuantileMethod;
204    Column::from_expr(
205        col.expr()
206            .clone()
207            .quantile(lit(0.5), QuantileMethod::Linear),
208        Some("median".to_string()),
209    )
210}
211
212/// Approximate percentile (PySpark approx_percentile). Maps to quantile; percentage in 0.0..=1.0.
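///
/// # Example
/// Illustrative sketch (marked `ignore`); assumes crate-root re-exports.
/// ```ignore
/// use robin_sparkless::{approx_percentile, col};
///
/// // 90th percentile of latency, for use in an aggregation.
/// let p90 = approx_percentile(&col("latency_ms"), 0.9);
/// ```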
213pub fn approx_percentile(col: &Column, percentage: f64) -> Column {
214    use polars::prelude::QuantileMethod;
215    Column::from_expr(
216        col.expr()
217            .clone()
218            .quantile(lit(percentage), QuantileMethod::Linear),
219        Some(format!("approx_percentile({percentage})")),
220    )
221}
222
223/// Approximate percentile (PySpark percentile_approx). Alias for approx_percentile.
224pub fn percentile_approx(col: &Column, percentage: f64) -> Column {
225    approx_percentile(col, percentage)
226}
227
228/// Mode aggregation - most frequent value. PySpark mode.
229pub fn mode(col: &Column) -> Column {
230    col.clone().mode()
231}
232
233/// Count distinct aggregation (PySpark countDistinct)
234pub fn count_distinct(col: &Column) -> Column {
235    use polars::prelude::DataType;
236    Column::from_expr(
237        col.expr().clone().n_unique().cast(DataType::Int64),
238        Some("count_distinct".to_string()),
239    )
240}
241
242/// Kurtosis aggregation (PySpark kurtosis). Fisher definition, bias=true. Use in groupBy.agg().
243pub fn kurtosis(col: &Column) -> Column {
244    Column::from_expr(
245        col.expr()
246            .clone()
247            .cast(DataType::Float64)
248            .kurtosis(true, true),
249        Some("kurtosis".to_string()),
250    )
251}
252
253/// Skewness aggregation (PySpark skewness). bias=true. Use in groupBy.agg().
254pub fn skewness(col: &Column) -> Column {
255    Column::from_expr(
256        col.expr().clone().cast(DataType::Float64).skew(true),
257        Some("skewness".to_string()),
258    )
259}
260
261/// Population covariance aggregation (PySpark covar_pop). Returns Expr for use in groupBy.agg().
262pub fn covar_pop_expr(col1: &str, col2: &str) -> Expr {
263    use polars::prelude::{col as pl_col, len};
264    let c1 = pl_col(col1).cast(DataType::Float64);
265    let c2 = pl_col(col2).cast(DataType::Float64);
266    let n = len().cast(DataType::Float64);
267    let sum_ab = (c1.clone() * c2.clone()).sum();
268    let sum_a = pl_col(col1).sum().cast(DataType::Float64);
269    let sum_b = pl_col(col2).sum().cast(DataType::Float64);
270    (sum_ab - sum_a * sum_b / n.clone()) / n
271}
272
273/// Sample covariance aggregation (PySpark covar_samp). Returns Expr for use in groupBy.agg().
274pub fn covar_samp_expr(col1: &str, col2: &str) -> Expr {
275    use polars::prelude::{col as pl_col, len, lit, when};
276    let c1 = pl_col(col1).cast(DataType::Float64);
277    let c2 = pl_col(col2).cast(DataType::Float64);
278    let n = len().cast(DataType::Float64);
279    let sum_ab = (c1.clone() * c2.clone()).sum();
280    let sum_a = pl_col(col1).sum().cast(DataType::Float64);
281    let sum_b = pl_col(col2).sum().cast(DataType::Float64);
282    when(len().gt(lit(1)))
283        .then((sum_ab - sum_a * sum_b / n.clone()) / (len() - lit(1)).cast(DataType::Float64))
284        .otherwise(lit(f64::NAN))
285}
286
287/// Pearson correlation aggregation (PySpark corr). Returns Expr for use in groupBy.agg().
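///
/// # Example
/// Illustrative sketch (marked `ignore`); the `group_by(...).agg(...)` call shown in the
/// comment assumes a PySpark-like DataFrame API and is not taken from this file.
/// ```ignore
/// use robin_sparkless::corr_expr;
///
/// let corr = corr_expr("height", "weight").alias("corr_hw");
/// // e.g. df.group_by(["cohort"]).agg([corr])
/// ```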
288pub fn corr_expr(col1: &str, col2: &str) -> Expr {
289    use polars::prelude::{col as pl_col, len, lit, when};
290    let c1 = pl_col(col1).cast(DataType::Float64);
291    let c2 = pl_col(col2).cast(DataType::Float64);
292    let n = len().cast(DataType::Float64);
293    let n1 = (len() - lit(1)).cast(DataType::Float64);
294    let sum_ab = (c1.clone() * c2.clone()).sum();
295    let sum_a = pl_col(col1).sum().cast(DataType::Float64);
296    let sum_b = pl_col(col2).sum().cast(DataType::Float64);
297    let sum_a2 = (c1.clone() * c1).sum();
298    let sum_b2 = (c2.clone() * c2).sum();
299    let cov_samp = (sum_ab - sum_a.clone() * sum_b.clone() / n.clone()) / n1.clone();
300    let var_a = (sum_a2 - sum_a.clone() * sum_a / n.clone()) / n1.clone();
301    let var_b = (sum_b2 - sum_b.clone() * sum_b / n.clone()) / n1.clone();
302    let std_a = var_a.sqrt();
303    let std_b = var_b.sqrt();
304    when(len().gt(lit(1)))
305        .then(cov_samp / (std_a * std_b))
306        .otherwise(lit(f64::NAN))
307}
308
309// --- Regression aggregates (PySpark regr_*). y = col1, x = col2; only pairs where both non-null. ---
310
311fn regr_cond_and_sums(y_col: &str, x_col: &str) -> (Expr, Expr, Expr, Expr, Expr, Expr) {
312    use polars::prelude::col as pl_col;
313    let y = pl_col(y_col).cast(DataType::Float64);
314    let x = pl_col(x_col).cast(DataType::Float64);
315    let cond = y.clone().is_not_null().and(x.clone().is_not_null());
316    let n = y
317        .clone()
318        .filter(cond.clone())
319        .count()
320        .cast(DataType::Float64);
321    let sum_x = x.clone().filter(cond.clone()).sum();
322    let sum_y = y.clone().filter(cond.clone()).sum();
323    let sum_xx = (x.clone() * x.clone()).filter(cond.clone()).sum();
324    let sum_yy = (y.clone() * y.clone()).filter(cond.clone()).sum();
325    let sum_xy = (x * y).filter(cond).sum();
326    (n, sum_x, sum_y, sum_xx, sum_yy, sum_xy)
327}
328
329/// Regression: count of (y, x) pairs where both non-null (PySpark regr_count).
330pub fn regr_count_expr(y_col: &str, x_col: &str) -> Expr {
331    let (n, ..) = regr_cond_and_sums(y_col, x_col);
332    n
333}
334
335/// Regression: average of x (PySpark regr_avgx).
336pub fn regr_avgx_expr(y_col: &str, x_col: &str) -> Expr {
337    use polars::prelude::{lit, when};
338    let (n, sum_x, ..) = regr_cond_and_sums(y_col, x_col);
339    when(n.clone().gt(lit(0.0)))
340        .then(sum_x / n)
341        .otherwise(lit(f64::NAN))
342}
343
344/// Regression: average of y (PySpark regr_avgy).
345pub fn regr_avgy_expr(y_col: &str, x_col: &str) -> Expr {
346    use polars::prelude::{lit, when};
347    let (n, _, sum_y, ..) = regr_cond_and_sums(y_col, x_col);
348    when(n.clone().gt(lit(0.0)))
349        .then(sum_y / n)
350        .otherwise(lit(f64::NAN))
351}
352
353/// Regression: sum((x - avg_x)^2) (PySpark regr_sxx).
354pub fn regr_sxx_expr(y_col: &str, x_col: &str) -> Expr {
355    use polars::prelude::{lit, when};
356    let (n, sum_x, _, sum_xx, ..) = regr_cond_and_sums(y_col, x_col);
357    when(n.clone().gt(lit(0.0)))
358        .then(sum_xx - sum_x.clone() * sum_x / n)
359        .otherwise(lit(f64::NAN))
360}
361
362/// Regression: sum((y - avg_y)^2) (PySpark regr_syy).
363pub fn regr_syy_expr(y_col: &str, x_col: &str) -> Expr {
364    use polars::prelude::{lit, when};
365    let (n, _, sum_y, _, sum_yy, _) = regr_cond_and_sums(y_col, x_col);
366    when(n.clone().gt(lit(0.0)))
367        .then(sum_yy - sum_y.clone() * sum_y / n)
368        .otherwise(lit(f64::NAN))
369}
370
371/// Regression: sum((x - avg_x)(y - avg_y)) (PySpark regr_sxy).
372pub fn regr_sxy_expr(y_col: &str, x_col: &str) -> Expr {
373    use polars::prelude::{lit, when};
374    let (n, sum_x, sum_y, _, _, sum_xy) = regr_cond_and_sums(y_col, x_col);
375    when(n.clone().gt(lit(0.0)))
376        .then(sum_xy - sum_x * sum_y / n)
377        .otherwise(lit(f64::NAN))
378}
379
380/// Regression slope: cov_samp(y,x)/var_samp(x) (PySpark regr_slope).
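///
/// # Example
/// Illustrative sketch (marked `ignore`); the aggregation call in the comment is an
/// assumed PySpark-like API, not taken from this file.
/// ```ignore
/// use robin_sparkless::{regr_intercept_expr, regr_slope_expr};
///
/// // Least-squares fit of y = price on x = size.
/// let slope = regr_slope_expr("price", "size").alias("slope");
/// let intercept = regr_intercept_expr("price", "size").alias("intercept");
/// // e.g. df.group_by(["city"]).agg([slope, intercept])
/// ```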
381pub fn regr_slope_expr(y_col: &str, x_col: &str) -> Expr {
382    use polars::prelude::{lit, when};
383    let (n, sum_x, sum_y, sum_xx, _sum_yy, sum_xy) = regr_cond_and_sums(y_col, x_col);
384    let regr_sxx = sum_xx.clone() - sum_x.clone() * sum_x.clone() / n.clone();
385    let regr_sxy = sum_xy - sum_x * sum_y / n.clone();
386    when(n.gt(lit(1.0)).and(regr_sxx.clone().gt(lit(0.0))))
387        .then(regr_sxy / regr_sxx)
388        .otherwise(lit(f64::NAN))
389}
390
391/// Regression intercept: avg_y - slope*avg_x (PySpark regr_intercept).
392pub fn regr_intercept_expr(y_col: &str, x_col: &str) -> Expr {
393    use polars::prelude::{lit, when};
394    let (n, sum_x, sum_y, sum_xx, _, sum_xy) = regr_cond_and_sums(y_col, x_col);
395    let regr_sxx = sum_xx - sum_x.clone() * sum_x.clone() / n.clone();
396    let regr_sxy = sum_xy.clone() - sum_x.clone() * sum_y.clone() / n.clone();
397    let slope = regr_sxy.clone() / regr_sxx.clone();
398    let avg_y = sum_y / n.clone();
399    let avg_x = sum_x / n.clone();
400    when(n.gt(lit(1.0)).and(regr_sxx.clone().gt(lit(0.0))))
401        .then(avg_y - slope * avg_x)
402        .otherwise(lit(f64::NAN))
403}
404
405/// Regression R-squared (PySpark regr_r2).
406pub fn regr_r2_expr(y_col: &str, x_col: &str) -> Expr {
407    use polars::prelude::{lit, when};
408    let (n, sum_x, sum_y, sum_xx, sum_yy, sum_xy) = regr_cond_and_sums(y_col, x_col);
409    let regr_sxx = sum_xx - sum_x.clone() * sum_x.clone() / n.clone();
410    let regr_syy = sum_yy - sum_y.clone() * sum_y.clone() / n.clone();
411    let regr_sxy = sum_xy - sum_x * sum_y / n;
412    when(
413        regr_sxx
414            .clone()
415            .gt(lit(0.0))
416            .and(regr_syy.clone().gt(lit(0.0))),
417    )
418    .then(regr_sxy.clone() * regr_sxy / (regr_sxx * regr_syy))
419    .otherwise(lit(f64::NAN))
420}
421
422/// PySpark-style conditional expression builder.
423///
424/// # Example
425/// ```
426/// use robin_sparkless::{col, lit_i64, lit_str, when};
427///
428/// // when(condition).then(value).otherwise(fallback)
429/// let expr = when(&col("age").gt(lit_i64(18).into_expr()))
430///     .then(&lit_str("adult"))
431///     .otherwise(&lit_str("minor"));
432/// ```
433pub fn when(condition: &Column) -> WhenBuilder {
434    WhenBuilder::new(condition.expr().clone())
435}
436
437/// Two-arg when(condition, value): returns value where condition is true, null otherwise (PySpark when(cond, val)).
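///
/// # Example
/// Illustrative sketch (marked `ignore`); mirrors the `when` example above.
/// ```ignore
/// use robin_sparkless::{col, lit_i64, lit_str, when_then_otherwise_null};
///
/// // "adult" where age > 18, null otherwise.
/// let label =
///     when_then_otherwise_null(&col("age").gt(lit_i64(18).into_expr()), &lit_str("adult"));
/// ```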
438pub fn when_then_otherwise_null(condition: &Column, value: &Column) -> Column {
439    use polars::prelude::*;
440    let null_expr = Expr::Literal(LiteralValue::Null);
441    let expr = polars::prelude::when(condition.expr().clone())
442        .then(value.expr().clone())
443        .otherwise(null_expr);
444    crate::column::Column::from_expr(expr, None)
445}
446
447/// Builder for when-then-otherwise expressions
448pub struct WhenBuilder {
449    condition: Expr,
450}
451
452impl WhenBuilder {
453    fn new(condition: Expr) -> Self {
454        WhenBuilder { condition }
455    }
456
457    /// Specify the value when condition is true
458    pub fn then(self, value: &Column) -> ThenBuilder {
459        use polars::prelude::*;
460        let when_then = when(self.condition).then(value.expr().clone());
461        ThenBuilder::new(when_then)
462    }
463
464    /// Not supported: calling otherwise() directly on WhenBuilder always panics.
465    /// In PySpark, when(cond).otherwise(val) requires a .then() first, and this
466    /// implementation requires .then() to be called explicitly as well.
467    /// Use when(cond).then(val1).otherwise(val2) instead.
468    pub fn otherwise(self, _value: &Column) -> Column {
469        // This should not be called directly - when().otherwise() without .then() is not supported
470        // Users should use when(cond).then(val1).otherwise(val2)
471        panic!("when().otherwise() requires .then() to be called first. Use when(cond).then(val1).otherwise(val2)");
472    }
473}
474
475/// Builder for chaining when-then clauses before finalizing with otherwise
476pub struct ThenBuilder {
477    when_then: polars::prelude::Then, // The Polars WhenThen state
478}
479
480impl ThenBuilder {
481    fn new(when_then: polars::prelude::Then) -> Self {
482        ThenBuilder { when_then }
483    }
484
485    /// Chain an additional when-then clause.
486    /// Note: chaining is not yet implemented; the extra condition is currently ignored
487    /// and `self` is returned unchanged. Use a single when().then().otherwise() pattern.
488    pub fn when(self, _condition: &Column) -> ThenBuilder {
489        // TODO: Implement proper chaining support
490        // For now, return self to allow compilation but chaining won't work correctly
491        self
492    }
493
494    /// Finalize the expression with the fallback value
495    pub fn otherwise(self, value: &Column) -> Column {
496        let expr = self.when_then.otherwise(value.expr().clone());
497        crate::column::Column::from_expr(expr, None)
498    }
499}
500
501/// Convert string column to uppercase (PySpark upper)
502pub fn upper(column: &Column) -> Column {
503    column.clone().upper()
504}
505
506/// Convert string column to lowercase (PySpark lower)
507pub fn lower(column: &Column) -> Column {
508    column.clone().lower()
509}
510
511/// Substring with 1-based start (PySpark substring semantics)
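///
/// # Example
/// Illustrative sketch (marked `ignore`); assumes crate-root re-exports.
/// ```ignore
/// use robin_sparkless::{col, substring};
///
/// // 1-based start: substring("Spark", 2, Some(3)) yields "par".
/// let s = substring(&col("name"), 2, Some(3));
/// ```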
512pub fn substring(column: &Column, start: i64, length: Option<i64>) -> Column {
513    column.clone().substr(start, length)
514}
515
516/// String length in characters (PySpark length)
517pub fn length(column: &Column) -> Column {
518    column.clone().length()
519}
520
521/// Trim leading and trailing whitespace (PySpark trim)
522pub fn trim(column: &Column) -> Column {
523    column.clone().trim()
524}
525
526/// Trim leading whitespace (PySpark ltrim)
527pub fn ltrim(column: &Column) -> Column {
528    column.clone().ltrim()
529}
530
531/// Trim trailing whitespace (PySpark rtrim)
532pub fn rtrim(column: &Column) -> Column {
533    column.clone().rtrim()
534}
535
536/// Trim leading and trailing chars (PySpark btrim). trim_str defaults to whitespace.
537pub fn btrim(column: &Column, trim_str: Option<&str>) -> Column {
538    column.clone().btrim(trim_str)
539}
540
541/// Find substring position 1-based, starting at pos (PySpark locate). 0 if not found.
542pub fn locate(substr: &str, column: &Column, pos: i64) -> Column {
543    column.clone().locate(substr, pos)
544}
545
546/// Base conversion (PySpark conv). num from from_base to to_base.
547pub fn conv(column: &Column, from_base: i32, to_base: i32) -> Column {
548    column.clone().conv(from_base, to_base)
549}
550
551/// Convert to hex string (PySpark hex).
552pub fn hex(column: &Column) -> Column {
553    column.clone().hex()
554}
555
556/// Convert hex string to binary/string (PySpark unhex).
557pub fn unhex(column: &Column) -> Column {
558    column.clone().unhex()
559}
560
561/// Encode string to binary (PySpark encode). Charset: UTF-8. Returns hex string.
562pub fn encode(column: &Column, charset: &str) -> Column {
563    column.clone().encode(charset)
564}
565
566/// Decode binary (hex string) to string (PySpark decode). Charset: UTF-8.
567pub fn decode(column: &Column, charset: &str) -> Column {
568    column.clone().decode(charset)
569}
570
571/// Convert to binary (PySpark to_binary). fmt: 'utf-8', 'hex'.
572pub fn to_binary(column: &Column, fmt: &str) -> Column {
573    column.clone().to_binary(fmt)
574}
575
576/// Try convert to binary; null on failure (PySpark try_to_binary).
577pub fn try_to_binary(column: &Column, fmt: &str) -> Column {
578    column.clone().try_to_binary(fmt)
579}
580
581/// AES encrypt (PySpark aes_encrypt). Key as string; AES-128-GCM.
582pub fn aes_encrypt(column: &Column, key: &str) -> Column {
583    column.clone().aes_encrypt(key)
584}
585
586/// AES decrypt (PySpark aes_decrypt). Input hex(nonce||ciphertext).
587pub fn aes_decrypt(column: &Column, key: &str) -> Column {
588    column.clone().aes_decrypt(key)
589}
590
591/// Try AES decrypt (PySpark try_aes_decrypt). Returns null on failure.
592pub fn try_aes_decrypt(column: &Column, key: &str) -> Column {
593    column.clone().try_aes_decrypt(key)
594}
595
596/// Convert integer to binary string (PySpark bin).
597pub fn bin(column: &Column) -> Column {
598    column.clone().bin()
599}
600
601/// Get bit at 0-based position (PySpark getbit).
602pub fn getbit(column: &Column, pos: i64) -> Column {
603    column.clone().getbit(pos)
604}
605
606/// Bitwise AND of two integer/boolean columns (PySpark bit_and).
607pub fn bit_and(left: &Column, right: &Column) -> Column {
608    left.clone().bit_and(right)
609}
610
611/// Bitwise OR of two integer/boolean columns (PySpark bit_or).
612pub fn bit_or(left: &Column, right: &Column) -> Column {
613    left.clone().bit_or(right)
614}
615
616/// Bitwise XOR of two integer/boolean columns (PySpark bit_xor).
617pub fn bit_xor(left: &Column, right: &Column) -> Column {
618    left.clone().bit_xor(right)
619}
620
621/// Count of set bits in the integer representation (PySpark bit_count).
622pub fn bit_count(column: &Column) -> Column {
623    column.clone().bit_count()
624}
625
626/// Bitwise NOT of an integer/boolean column (PySpark bitwise_not / bitwiseNOT).
627pub fn bitwise_not(column: &Column) -> Column {
628    column.clone().bitwise_not()
629}
630
631// --- Bitmap (PySpark 3.5+) ---
632
633/// Map integral value (0–32767) to bit position for bitmap aggregates (PySpark bitmap_bit_position).
634pub fn bitmap_bit_position(column: &Column) -> Column {
635    use polars::prelude::DataType;
636    let expr = column.expr().clone().cast(DataType::Int32);
637    Column::from_expr(expr, None)
638}
639
640/// Bucket number for distributed bitmap (PySpark bitmap_bucket_number). value / 32768.
641pub fn bitmap_bucket_number(column: &Column) -> Column {
642    use polars::prelude::DataType;
643    let expr = column.expr().clone().cast(DataType::Int64) / lit(32768i64);
644    Column::from_expr(expr, None)
645}
646
647/// Count set bits in a bitmap binary column (PySpark bitmap_count).
648pub fn bitmap_count(column: &Column) -> Column {
649    use polars::prelude::{DataType, GetOutput};
650    let expr = column.expr().clone().map(
651        crate::udfs::apply_bitmap_count,
652        GetOutput::from_type(DataType::Int64),
653    );
654    Column::from_expr(expr, None)
655}
656
657/// Aggregate: bitwise OR of bit positions into one bitmap binary (PySpark bitmap_construct_agg).
658/// Use in group_by(...).agg([bitmap_construct_agg(col)]).
659pub fn bitmap_construct_agg(column: &Column) -> polars::prelude::Expr {
660    use polars::prelude::{DataType, GetOutput};
661    column.expr().clone().implode().map(
662        crate::udfs::apply_bitmap_construct_agg,
663        GetOutput::from_type(DataType::Binary),
664    )
665}
666
667/// Aggregate: bitwise OR of bitmap binary column (PySpark bitmap_or_agg).
668pub fn bitmap_or_agg(column: &Column) -> polars::prelude::Expr {
669    use polars::prelude::{DataType, GetOutput};
670    column.expr().clone().implode().map(
671        crate::udfs::apply_bitmap_or_agg,
672        GetOutput::from_type(DataType::Binary),
673    )
674}
675
676/// Alias for getbit (PySpark bit_get).
677pub fn bit_get(column: &Column, pos: i64) -> Column {
678    getbit(column, pos)
679}
680
681/// Assert that all boolean values are true; errors otherwise (PySpark assert_true).
682/// When err_msg is Some, it is used in the error message when assertion fails.
683pub fn assert_true(column: &Column, err_msg: Option<&str>) -> Column {
684    column.clone().assert_true(err_msg)
685}
686
687/// Raise an error when evaluated (PySpark raise_error). Always fails with the given message.
688pub fn raise_error(message: &str) -> Column {
689    let msg = message.to_string();
690    let expr = lit(0i64).map(
691        move |_col| -> PolarsResult<Option<polars::prelude::Column>> {
692            Err(PolarsError::ComputeError(msg.clone().into()))
693        },
694        GetOutput::from_type(DataType::Int64),
695    );
696    Column::from_expr(expr, Some("raise_error".to_string()))
697}
698
699/// Broadcast hint - no-op that returns the same DataFrame (PySpark broadcast).
700pub fn broadcast(df: &DataFrame) -> DataFrame {
701    df.clone()
702}
703
704/// Stub partition id - always 0 (PySpark spark_partition_id).
705pub fn spark_partition_id() -> Column {
706    Column::from_expr(lit(0i32), Some("spark_partition_id".to_string()))
707}
708
709/// Stub input file name - empty string (PySpark input_file_name).
710pub fn input_file_name() -> Column {
711    Column::from_expr(lit(""), Some("input_file_name".to_string()))
712}
713
714/// Stub monotonically_increasing_id - constant 0 (PySpark monotonically_increasing_id).
715/// Note: differs from PySpark which is unique per-row; see PYSPARK_DIFFERENCES.md.
716pub fn monotonically_increasing_id() -> Column {
717    Column::from_expr(lit(0i64), Some("monotonically_increasing_id".to_string()))
718}
719
720/// Current catalog name stub (PySpark current_catalog).
721pub fn current_catalog() -> Column {
722    Column::from_expr(lit("spark_catalog"), Some("current_catalog".to_string()))
723}
724
725/// Current database/schema name stub (PySpark current_database).
726pub fn current_database() -> Column {
727    Column::from_expr(lit("default"), Some("current_database".to_string()))
728}
729
730/// Current schema name stub (PySpark current_schema).
731pub fn current_schema() -> Column {
732    Column::from_expr(lit("default"), Some("current_schema".to_string()))
733}
734
735/// Current user stub (PySpark current_user).
736pub fn current_user() -> Column {
737    Column::from_expr(lit("unknown"), Some("current_user".to_string()))
738}
739
740/// User stub (PySpark user).
741pub fn user() -> Column {
742    Column::from_expr(lit("unknown"), Some("user".to_string()))
743}
744
745/// Random uniform [0, 1) per row, with optional seed (PySpark rand).
746/// When added via with_column, generates one distinct value per row (PySpark-like).
747pub fn rand(seed: Option<u64>) -> Column {
748    Column::from_rand(seed)
749}
750
751/// Random standard normal per row, with optional seed (PySpark randn).
752/// When added via with_column, generates one distinct value per row (PySpark-like).
753pub fn randn(seed: Option<u64>) -> Column {
754    Column::from_randn(seed)
755}
756
757/// True if two arrays have any element in common (PySpark arrays_overlap).
758pub fn arrays_overlap(left: &Column, right: &Column) -> Column {
759    left.clone().arrays_overlap(right)
760}
761
762/// Zip arrays into array of structs (PySpark arrays_zip).
763pub fn arrays_zip(left: &Column, right: &Column) -> Column {
764    left.clone().arrays_zip(right)
765}
766
767/// Explode; null/empty yields one row with null (PySpark explode_outer).
768pub fn explode_outer(column: &Column) -> Column {
769    column.clone().explode_outer()
770}
771
772/// Posexplode with null preservation (PySpark posexplode_outer).
773pub fn posexplode_outer(column: &Column) -> (Column, Column) {
774    column.clone().posexplode_outer()
775}
776
777/// Collect to array (PySpark array_agg).
778pub fn array_agg(column: &Column) -> Column {
779    column.clone().array_agg()
780}
781
782/// Transform map keys by expr (PySpark transform_keys).
783pub fn transform_keys(column: &Column, key_expr: Expr) -> Column {
784    column.clone().transform_keys(key_expr)
785}
786
787/// Transform map values by expr (PySpark transform_values).
788pub fn transform_values(column: &Column, value_expr: Expr) -> Column {
789    column.clone().transform_values(value_expr)
790}
791
792/// Parse string to map (PySpark str_to_map). Default delims: "," and ":".
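///
/// # Example
/// Illustrative sketch (marked `ignore`); assumes crate-root re-exports.
/// ```ignore
/// use robin_sparkless::{col, str_to_map};
///
/// // "a:1,b:2" -> map {"a": "1", "b": "2"} using the default delimiters.
/// let m = str_to_map(&col("props"), None, None);
/// ```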
793pub fn str_to_map(
794    column: &Column,
795    pair_delim: Option<&str>,
796    key_value_delim: Option<&str>,
797) -> Column {
798    let pd = pair_delim.unwrap_or(",");
799    let kvd = key_value_delim.unwrap_or(":");
800    column.clone().str_to_map(pd, kvd)
801}
802
803/// Extract first match of regex (PySpark regexp_extract). group_index 0 = full match.
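///
/// # Example
/// Illustrative sketch (marked `ignore`); assumes crate-root re-exports.
/// ```ignore
/// use robin_sparkless::{col, regexp_extract};
///
/// // Capture the digits in "order-123" -> "123" (group 1; group 0 is the full match).
/// let id = regexp_extract(&col("order"), r"order-(\d+)", 1);
/// ```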
804pub fn regexp_extract(column: &Column, pattern: &str, group_index: usize) -> Column {
805    column.clone().regexp_extract(pattern, group_index)
806}
807
808/// Replace first match of regex (PySpark regexp_replace)
809pub fn regexp_replace(column: &Column, pattern: &str, replacement: &str) -> Column {
810    column.clone().regexp_replace(pattern, replacement)
811}
812
813/// Split string by delimiter (PySpark split)
814pub fn split(column: &Column, delimiter: &str) -> Column {
815    column.clone().split(delimiter)
816}
817
818/// Title case (PySpark initcap)
819pub fn initcap(column: &Column) -> Column {
820    column.clone().initcap()
821}
822
823/// Extract all matches of regex (PySpark regexp_extract_all).
824pub fn regexp_extract_all(column: &Column, pattern: &str) -> Column {
825    column.clone().regexp_extract_all(pattern)
826}
827
828/// Check if string matches regex (PySpark regexp_like / rlike).
829pub fn regexp_like(column: &Column, pattern: &str) -> Column {
830    column.clone().regexp_like(pattern)
831}
832
833/// Count of non-overlapping regex matches (PySpark regexp_count).
834pub fn regexp_count(column: &Column, pattern: &str) -> Column {
835    column.clone().regexp_count(pattern)
836}
837
838/// First substring matching regex (PySpark regexp_substr). Null if no match.
839pub fn regexp_substr(column: &Column, pattern: &str) -> Column {
840    column.clone().regexp_substr(pattern)
841}
842
843/// Split by delimiter and return 1-based part (PySpark split_part).
844pub fn split_part(column: &Column, delimiter: &str, part_num: i64) -> Column {
845    column.clone().split_part(delimiter, part_num)
846}
847
848/// 1-based position of first regex match (PySpark regexp_instr).
849pub fn regexp_instr(column: &Column, pattern: &str, group_idx: Option<usize>) -> Column {
850    column.clone().regexp_instr(pattern, group_idx)
851}
852
853/// 1-based index of str in comma-delimited set (PySpark find_in_set). 0 if not found or str contains comma.
854pub fn find_in_set(str_column: &Column, set_column: &Column) -> Column {
855    str_column.clone().find_in_set(set_column)
856}
857
858/// Printf-style format (PySpark format_string). Supports %s, %d, %i, %f, %g, %%.
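///
/// # Example
/// Illustrative sketch (marked `ignore`); assumes crate-root re-exports.
/// ```ignore
/// use robin_sparkless::{col, format_string};
///
/// // One formatted string per row, built from two columns.
/// let s = format_string("id=%d name=%s", &[&col("id"), &col("name")]);
/// ```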
859pub fn format_string(format: &str, columns: &[&Column]) -> Column {
860    use polars::prelude::*;
861    if columns.is_empty() {
862        panic!("format_string needs at least one column");
863    }
864    let format_owned = format.to_string();
865    let args: Vec<Expr> = columns.iter().skip(1).map(|c| c.expr().clone()).collect();
866    let expr = columns[0].expr().clone().map_many(
867        move |cols| crate::udfs::apply_format_string(cols, &format_owned),
868        &args,
869        GetOutput::from_type(DataType::String),
870    );
871    crate::column::Column::from_expr(expr, None)
872}
873
874/// Alias for format_string (PySpark printf).
875pub fn printf(format: &str, columns: &[&Column]) -> Column {
876    format_string(format, columns)
877}
878
879/// Repeat string n times (PySpark repeat).
880pub fn repeat(column: &Column, n: i32) -> Column {
881    column.clone().repeat(n)
882}
883
884/// Reverse string (PySpark reverse).
885pub fn reverse(column: &Column) -> Column {
886    column.clone().reverse()
887}
888
889/// Find substring position 1-based; 0 if not found (PySpark instr).
890pub fn instr(column: &Column, substr: &str) -> Column {
891    column.clone().instr(substr)
892}
893
894/// Position of substring in column (PySpark position). Same as instr; (substr, col) argument order.
895pub fn position(substr: &str, column: &Column) -> Column {
896    column.clone().instr(substr)
897}
898
899/// ASCII value of first character (PySpark ascii). Returns Int32.
900pub fn ascii(column: &Column) -> Column {
901    column.clone().ascii()
902}
903
904/// Format numeric as string with fixed decimal places (PySpark format_number).
905pub fn format_number(column: &Column, decimals: u32) -> Column {
906    column.clone().format_number(decimals)
907}
908
909/// Replace substring at 1-based position (PySpark overlay). replace is literal.
910pub fn overlay(column: &Column, replace: &str, pos: i64, length: i64) -> Column {
911    column.clone().overlay(replace, pos, length)
912}
913
914/// Int to single-character string (PySpark char). Valid codepoint only.
915pub fn char(column: &Column) -> Column {
916    column.clone().char()
917}
918
919/// Alias for char (PySpark chr).
920pub fn chr(column: &Column) -> Column {
921    column.clone().chr()
922}
923
924/// Base64 encode string bytes (PySpark base64).
925pub fn base64(column: &Column) -> Column {
926    column.clone().base64()
927}
928
929/// Base64 decode to string (PySpark unbase64). Invalid decode → null.
930pub fn unbase64(column: &Column) -> Column {
931    column.clone().unbase64()
932}
933
934/// SHA1 hash of string bytes, return hex string (PySpark sha1).
935pub fn sha1(column: &Column) -> Column {
936    column.clone().sha1()
937}
938
939/// SHA2 hash; bit_length 256, 384, or 512 (PySpark sha2).
940pub fn sha2(column: &Column, bit_length: i32) -> Column {
941    column.clone().sha2(bit_length)
942}
943
944/// MD5 hash of string bytes, return hex string (PySpark md5).
945pub fn md5(column: &Column) -> Column {
946    column.clone().md5()
947}
948
949/// Left-pad string to length with pad char (PySpark lpad).
950pub fn lpad(column: &Column, length: i32, pad: &str) -> Column {
951    column.clone().lpad(length, pad)
952}
953
954/// Right-pad string to length with pad char (PySpark rpad).
955pub fn rpad(column: &Column, length: i32, pad: &str) -> Column {
956    column.clone().rpad(length, pad)
957}
958
959/// Character-by-character translation (PySpark translate).
960pub fn translate(column: &Column, from_str: &str, to_str: &str) -> Column {
961    column.clone().translate(from_str, to_str)
962}
963
964/// Mask string: replace upper/lower/digit/other with given chars (PySpark mask).
965pub fn mask(
966    column: &Column,
967    upper_char: Option<char>,
968    lower_char: Option<char>,
969    digit_char: Option<char>,
970    other_char: Option<char>,
971) -> Column {
972    column
973        .clone()
974        .mask(upper_char, lower_char, digit_char, other_char)
975}
976
977/// Substring before/after nth delimiter (PySpark substring_index).
978pub fn substring_index(column: &Column, delimiter: &str, count: i64) -> Column {
979    column.clone().substring_index(delimiter, count)
980}
981
982/// Leftmost n characters (PySpark left).
983pub fn left(column: &Column, n: i64) -> Column {
984    column.clone().left(n)
985}
986
987/// Rightmost n characters (PySpark right).
988pub fn right(column: &Column, n: i64) -> Column {
989    column.clone().right(n)
990}
991
992/// Replace all occurrences of search with replacement (literal). PySpark replace.
993pub fn replace(column: &Column, search: &str, replacement: &str) -> Column {
994    column.clone().replace(search, replacement)
995}
996
997/// True if string starts with prefix (PySpark startswith).
998pub fn startswith(column: &Column, prefix: &str) -> Column {
999    column.clone().startswith(prefix)
1000}
1001
1002/// True if string ends with suffix (PySpark endswith).
1003pub fn endswith(column: &Column, suffix: &str) -> Column {
1004    column.clone().endswith(suffix)
1005}
1006
1007/// True if string contains substring (literal). PySpark contains.
1008pub fn contains(column: &Column, substring: &str) -> Column {
1009    column.clone().contains(substring)
1010}
1011
1012/// SQL LIKE pattern (% any, _ one char). PySpark like.
1013/// When escape_char is Some(esc), esc + char treats that char as literal.
1014pub fn like(column: &Column, pattern: &str, escape_char: Option<char>) -> Column {
1015    column.clone().like(pattern, escape_char)
1016}
1017
1018/// Case-insensitive LIKE. PySpark ilike.
1019/// When escape_char is Some(esc), esc + char treats that char as literal.
1020pub fn ilike(column: &Column, pattern: &str, escape_char: Option<char>) -> Column {
1021    column.clone().ilike(pattern, escape_char)
1022}
1023
1024/// Alias for regexp_like. PySpark rlike / regexp.
1025pub fn rlike(column: &Column, pattern: &str) -> Column {
1026    column.clone().regexp_like(pattern)
1027}
1028
1029/// Alias for rlike (PySpark regexp).
1030pub fn regexp(column: &Column, pattern: &str) -> Column {
1031    rlike(column, pattern)
1032}
1033
1034/// Soundex code (PySpark soundex). Not implemented: requires element-wise UDF.
1035pub fn soundex(column: &Column) -> Column {
1036    column.clone().soundex()
1037}
1038
1039/// Levenshtein distance (PySpark levenshtein). Not implemented: requires element-wise UDF.
1040pub fn levenshtein(column: &Column, other: &Column) -> Column {
1041    column.clone().levenshtein(other)
1042}
1043
1044/// CRC32 of string bytes (PySpark crc32). Not implemented: requires element-wise UDF.
1045pub fn crc32(column: &Column) -> Column {
1046    column.clone().crc32()
1047}
1048
1049/// XXH64 hash (PySpark xxhash64). Not implemented: requires element-wise UDF.
1050pub fn xxhash64(column: &Column) -> Column {
1051    column.clone().xxhash64()
1052}
1053
1054/// Absolute value (PySpark abs)
1055pub fn abs(column: &Column) -> Column {
1056    column.clone().abs()
1057}
1058
1059/// Ceiling (PySpark ceil)
1060pub fn ceil(column: &Column) -> Column {
1061    column.clone().ceil()
1062}
1063
1064/// Floor (PySpark floor)
1065pub fn floor(column: &Column) -> Column {
1066    column.clone().floor()
1067}
1068
1069/// Round (PySpark round)
1070pub fn round(column: &Column, decimals: u32) -> Column {
1071    column.clone().round(decimals)
1072}
1073
1074/// Banker's rounding - round half to even (PySpark bround).
1075pub fn bround(column: &Column, scale: i32) -> Column {
1076    column.clone().bround(scale)
1077}
1078
1079/// Unary minus / negate (PySpark negate, negative).
1080pub fn negate(column: &Column) -> Column {
1081    column.clone().negate()
1082}
1083
1084/// Alias for negate. PySpark negative.
1085pub fn negative(column: &Column) -> Column {
1086    negate(column)
1087}
1088
1089/// Unary plus - no-op, returns column as-is (PySpark positive).
1090pub fn positive(column: &Column) -> Column {
1091    column.clone()
1092}
1093
1094/// Cotangent: 1/tan (PySpark cot).
1095pub fn cot(column: &Column) -> Column {
1096    column.clone().cot()
1097}
1098
1099/// Cosecant: 1/sin (PySpark csc).
1100pub fn csc(column: &Column) -> Column {
1101    column.clone().csc()
1102}
1103
1104/// Secant: 1/cos (PySpark sec).
1105pub fn sec(column: &Column) -> Column {
1106    column.clone().sec()
1107}
1108
1109/// Constant e = 2.718... (PySpark e).
1110pub fn e() -> Column {
1111    Column::from_expr(lit(std::f64::consts::E), Some("e".to_string()))
1112}
1113
1114/// Constant pi = 3.14159... (PySpark pi).
1115pub fn pi() -> Column {
1116    Column::from_expr(lit(std::f64::consts::PI), Some("pi".to_string()))
1117}
1118
1119/// Square root (PySpark sqrt)
1120pub fn sqrt(column: &Column) -> Column {
1121    column.clone().sqrt()
1122}
1123
1124/// Power (PySpark pow)
1125pub fn pow(column: &Column, exp: i64) -> Column {
1126    column.clone().pow(exp)
1127}
1128
1129/// Exponential (PySpark exp)
1130pub fn exp(column: &Column) -> Column {
1131    column.clone().exp()
1132}
1133
1134/// Natural logarithm (PySpark log with one arg)
1135pub fn log(column: &Column) -> Column {
1136    column.clone().log()
1137}
1138
1139/// Logarithm with given base (PySpark log(col, base)). base must be positive and not 1.
1140pub fn log_with_base(column: &Column, base: f64) -> Column {
1141    crate::column::Column::from_expr(column.expr().clone().log(base), None)
1142}
1143
1144/// Sine in radians (PySpark sin)
1145pub fn sin(column: &Column) -> Column {
1146    column.clone().sin()
1147}
1148
1149/// Cosine in radians (PySpark cos)
1150pub fn cos(column: &Column) -> Column {
1151    column.clone().cos()
1152}
1153
1154/// Tangent in radians (PySpark tan)
1155pub fn tan(column: &Column) -> Column {
1156    column.clone().tan()
1157}
1158
1159/// Arc sine (PySpark asin)
1160pub fn asin(column: &Column) -> Column {
1161    column.clone().asin()
1162}
1163
1164/// Arc cosine (PySpark acos)
1165pub fn acos(column: &Column) -> Column {
1166    column.clone().acos()
1167}
1168
1169/// Arc tangent (PySpark atan)
1170pub fn atan(column: &Column) -> Column {
1171    column.clone().atan()
1172}
1173
1174/// Two-argument arc tangent atan2(y, x) in radians (PySpark atan2)
1175pub fn atan2(y: &Column, x: &Column) -> Column {
1176    y.clone().atan2(x)
1177}
1178
1179/// Convert radians to degrees (PySpark degrees)
1180pub fn degrees(column: &Column) -> Column {
1181    column.clone().degrees()
1182}
1183
1184/// Convert degrees to radians (PySpark radians)
1185pub fn radians(column: &Column) -> Column {
1186    column.clone().radians()
1187}
1188
1189/// Sign of the number: -1, 0, or 1 (PySpark signum)
1190pub fn signum(column: &Column) -> Column {
1191    column.clone().signum()
1192}
1193
1194/// Alias for signum (PySpark sign).
1195pub fn sign(column: &Column) -> Column {
1196    signum(column)
1197}
1198
1199/// Cast column to the given type (PySpark cast). Fails on invalid conversion.
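///
/// # Example
/// Illustrative sketch (marked `ignore`); assumes crate-root re-exports.
/// ```ignore
/// use robin_sparkless::{cast, col, try_cast};
///
/// let age = cast(&col("age"), "long")?;        // strict: invalid values error
/// let age_ok = try_cast(&col("age"), "long")?; // lenient: invalid values become null
/// ```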
1200pub fn cast(column: &Column, type_name: &str) -> Result<Column, String> {
1201    let dtype = parse_type_name(type_name)?;
1202    Ok(Column::from_expr(
1203        column.expr().clone().strict_cast(dtype),
1204        None,
1205    ))
1206}
1207
1208/// Cast column to the given type, returning null on invalid conversion (PySpark try_cast).
1209pub fn try_cast(column: &Column, type_name: &str) -> Result<Column, String> {
1210    let dtype = parse_type_name(type_name)?;
1211    Ok(Column::from_expr(column.expr().clone().cast(dtype), None))
1212}
1213
1214/// Cast to string, optionally with format for datetime (PySpark to_char, to_varchar).
1215/// When format is Some, uses date_format for datetime columns (PySpark format → chrono strftime); otherwise cast to string.
1216/// Panics if the cast to string fails (invalid type name or unsupported column type).
1217pub fn to_char(column: &Column, format: Option<&str>) -> Column {
1218    match format {
1219        Some(fmt) => column
1220            .clone()
1221            .date_format(&crate::udfs::pyspark_format_to_chrono(fmt)),
1222        None => {
1223            cast(column, "string").expect("to_char: cast to string failed; use a valid column type")
1224        }
1225    }
1226}
1227
1228/// Alias for to_char (PySpark to_varchar).
1229pub fn to_varchar(column: &Column, format: Option<&str>) -> Column {
1230    to_char(column, format)
1231}
1232
1233/// Cast to numeric (PySpark to_number). Uses Double. Format parameter reserved for future use.
1234/// Panics if the cast to double fails (invalid type name or unsupported column type).
1235pub fn to_number(column: &Column, _format: Option<&str>) -> Column {
1236    cast(column, "double").expect("to_number: cast to double failed; use a valid column type")
1237}
1238
1239/// Cast to numeric, null on invalid (PySpark try_to_number). Format parameter reserved for future use.
1240/// Panics if the try_cast setup fails (invalid type name); column values that cannot be parsed become null.
1241pub fn try_to_number(column: &Column, _format: Option<&str>) -> Column {
1242    try_cast(column, "double")
1243        .expect("try_to_number: try_cast to double failed; use a valid type name")
1244}
1245
1246/// Cast to timestamp, or parse with format when provided (PySpark to_timestamp).
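///
/// # Example
/// Illustrative sketch (marked `ignore`); assumes crate-root re-exports.
/// ```ignore
/// use robin_sparkless::{col, to_timestamp};
///
/// // Parse strings like "2024-01-31 12:00:00" using a PySpark-style format.
/// let ts = to_timestamp(&col("ts_str"), Some("yyyy-MM-dd HH:mm:ss"))?;
/// ```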
1247pub fn to_timestamp(column: &Column, format: Option<&str>) -> Result<Column, String> {
1248    use polars::prelude::{DataType, GetOutput, TimeUnit};
1249    match format {
1250        None => crate::cast(column, "timestamp"),
1251        Some(fmt) => {
1252            let fmt_owned = fmt.to_string();
1253            let expr = column.expr().clone().map(
1254                move |s| crate::udfs::apply_to_timestamp_format(s, Some(&fmt_owned), true),
1255                GetOutput::from_type(DataType::Datetime(TimeUnit::Microseconds, None)),
1256            );
1257            Ok(crate::column::Column::from_expr(expr, None))
1258        }
1259    }
1260}
1261
1262/// Cast to timestamp, null on invalid, or parse with format when provided (PySpark try_to_timestamp).
1263/// Panics if the try_cast setup fails (invalid type name) when format is None.
1264pub fn try_to_timestamp(column: &Column, format: Option<&str>) -> Column {
1265    use polars::prelude::*;
1266    match format {
1267        None => try_cast(column, "timestamp")
1268            .expect("try_to_timestamp: try_cast to timestamp failed; use a valid type name"),
1269        Some(fmt) => {
1270            let fmt_owned = fmt.to_string();
1271            let expr = column.expr().clone().map(
1272                move |s| crate::udfs::apply_to_timestamp_format(s, Some(&fmt_owned), false),
1273                GetOutput::from_type(DataType::Datetime(TimeUnit::Microseconds, None)),
1274            );
1275            crate::column::Column::from_expr(expr, None)
1276        }
1277    }
1278}
1279
1280/// Parse as timestamp in local timezone, return UTC (PySpark to_timestamp_ltz).
1281pub fn to_timestamp_ltz(column: &Column, format: Option<&str>) -> Result<Column, String> {
1282    use polars::prelude::{DataType, GetOutput, TimeUnit};
1283    match format {
1284        None => crate::cast(column, "timestamp"),
1285        Some(fmt) => {
1286            let fmt_owned = fmt.to_string();
1287            let expr = column.expr().clone().map(
1288                move |s| crate::udfs::apply_to_timestamp_ltz_format(s, Some(&fmt_owned), true),
1289                GetOutput::from_type(DataType::Datetime(TimeUnit::Microseconds, None)),
1290            );
1291            Ok(crate::column::Column::from_expr(expr, None))
1292        }
1293    }
1294}
1295
1296/// Parse as timestamp without timezone (PySpark to_timestamp_ntz). Returns Datetime(_, None).
1297pub fn to_timestamp_ntz(column: &Column, format: Option<&str>) -> Result<Column, String> {
1298    use polars::prelude::{DataType, GetOutput, TimeUnit};
1299    match format {
1300        None => crate::cast(column, "timestamp"),
1301        Some(fmt) => {
1302            let fmt_owned = fmt.to_string();
1303            let expr = column.expr().clone().map(
1304                move |s| crate::udfs::apply_to_timestamp_ntz_format(s, Some(&fmt_owned), true),
1305                GetOutput::from_type(DataType::Datetime(TimeUnit::Microseconds, None)),
1306            );
1307            Ok(crate::column::Column::from_expr(expr, None))
1308        }
1309    }
1310}
1311
1312/// Division that returns null on divide-by-zero (PySpark try_divide).
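///
/// # Example
/// Illustrative sketch (marked `ignore`); assumes crate-root re-exports.
/// ```ignore
/// use robin_sparkless::{col, try_divide};
///
/// // Rows where den == 0 yield null instead of an error.
/// let ratio = try_divide(&col("num"), &col("den"));
/// ```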
1313pub fn try_divide(left: &Column, right: &Column) -> Column {
1314    use polars::prelude::*;
1315    let zero_cond = right.expr().clone().cast(DataType::Float64).eq(lit(0.0f64));
1316    let null_expr = Expr::Literal(LiteralValue::Null);
1317    let div_expr =
1318        left.expr().clone().cast(DataType::Float64) / right.expr().clone().cast(DataType::Float64);
1319    let expr = polars::prelude::when(zero_cond)
1320        .then(null_expr)
1321        .otherwise(div_expr);
1322    crate::column::Column::from_expr(expr, None)
1323}
1324
1325/// Add that returns null on overflow (PySpark try_add). Uses checked arithmetic.
1326pub fn try_add(left: &Column, right: &Column) -> Column {
1327    let args = [right.expr().clone()];
1328    let expr =
1329        left.expr()
1330            .clone()
1331            .map_many(crate::udfs::apply_try_add, &args, GetOutput::same_type());
1332    Column::from_expr(expr, None)
1333}
1334
1335/// Subtract that returns null on overflow (PySpark try_subtract).
1336pub fn try_subtract(left: &Column, right: &Column) -> Column {
1337    let args = [right.expr().clone()];
1338    let expr = left.expr().clone().map_many(
1339        crate::udfs::apply_try_subtract,
1340        &args,
1341        GetOutput::same_type(),
1342    );
1343    Column::from_expr(expr, None)
1344}
1345
1346/// Multiply that returns null on overflow (PySpark try_multiply).
1347pub fn try_multiply(left: &Column, right: &Column) -> Column {
1348    let args = [right.expr().clone()];
1349    let expr = left.expr().clone().map_many(
1350        crate::udfs::apply_try_multiply,
1351        &args,
1352        GetOutput::same_type(),
1353    );
1354    Column::from_expr(expr, None)
1355}
1356
1357/// Element at index, null if out of bounds (PySpark try_element_at). Same as element_at for lists.
1358pub fn try_element_at(column: &Column, index: i64) -> Column {
1359    column.clone().element_at(index)
1360}
1361
1362/// Assign value to histogram bucket (PySpark width_bucket). Returns 0 if v < min_val, num_bucket+1 if v >= max_val.
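///
/// # Example
/// Illustrative sketch (marked `ignore`); assumes crate-root re-exports.
/// ```ignore
/// use robin_sparkless::{col, width_bucket};
///
/// // Four equal-width buckets over [0.0, 100.0):
/// //   -5.0 -> 0, 0.0 -> 1, 25.0 -> 2, 99.9 -> 4, 100.0 -> 5
/// let bucket = width_bucket(&col("score"), 0.0, 100.0, 4);
/// ```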
1363pub fn width_bucket(value: &Column, min_val: f64, max_val: f64, num_bucket: i64) -> Column {
1364    if num_bucket <= 0 {
1365        panic!(
1366            "width_bucket: num_bucket must be positive, got {}",
1367            num_bucket
1368        );
1369    }
1370    use polars::prelude::*;
1371    let v = value.expr().clone().cast(DataType::Float64);
1372    let min_expr = lit(min_val);
1373    let max_expr = lit(max_val);
1374    let nb = num_bucket as f64;
1375    let width = (max_val - min_val) / nb;
1376    let bucket_expr = (v.clone() - min_expr.clone()) / lit(width);
1377    let floor_bucket = bucket_expr.floor().cast(DataType::Int64) + lit(1i64);
1378    let bucket_clamped = floor_bucket.clip(lit(1i64), lit(num_bucket));
1379    let expr = polars::prelude::when(v.clone().lt(min_expr))
1380        .then(lit(0i64))
1381        .when(v.gt_eq(max_expr))
1382        .then(lit(num_bucket + 1))
1383        .otherwise(bucket_clamped);
1384    crate::column::Column::from_expr(expr, None)
1385}
1386
1387/// Return column at 1-based index (PySpark elt). elt(2, a, b, c) returns b.
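///
/// # Example
/// Illustrative sketch (marked `ignore`); assumes crate-root re-exports.
/// ```ignore
/// use robin_sparkless::{col, elt};
///
/// // idx = 2 picks column b; an idx outside 1..=3 yields null.
/// let picked = elt(&col("idx"), &[&col("a"), &col("b"), &col("c")]);
/// ```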
1388pub fn elt(index: &Column, columns: &[&Column]) -> Column {
1389    use polars::prelude::*;
1390    if columns.is_empty() {
1391        panic!("elt requires at least one column");
1392    }
1393    let idx_expr = index.expr().clone();
1394    let null_expr = Expr::Literal(LiteralValue::Null);
1395    let mut expr = null_expr;
1396    for (i, c) in columns.iter().enumerate().rev() {
1397        let n = (i + 1) as i64;
1398        expr = polars::prelude::when(idx_expr.clone().eq(lit(n)))
1399            .then(c.expr().clone())
1400            .otherwise(expr);
1401    }
1402    crate::column::Column::from_expr(expr, None)
1403}
1404
1405/// Bit length of string (bytes * 8) (PySpark bit_length).
1406pub fn bit_length(column: &Column) -> Column {
1407    column.clone().bit_length()
1408}
1409
1410/// Length of string in bytes (PySpark octet_length).
1411pub fn octet_length(column: &Column) -> Column {
1412    column.clone().octet_length()
1413}
1414
1415/// Length of string in characters (PySpark char_length). Alias of length().
1416pub fn char_length(column: &Column) -> Column {
1417    column.clone().char_length()
1418}
1419
1420/// Length of string in characters (PySpark character_length). Alias of length().
1421pub fn character_length(column: &Column) -> Column {
1422    column.clone().character_length()
1423}
1424
1425/// Data type of column as string (PySpark typeof). Constant per column from schema.
1426pub fn typeof_(column: &Column) -> Column {
1427    column.clone().typeof_()
1428}
1429
1430/// True where the float value is NaN (PySpark isnan).
1431pub fn isnan(column: &Column) -> Column {
1432    column.clone().is_nan()
1433}
1434
1435/// Greatest of the given columns per row (PySpark greatest). Uses element-wise UDF.
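///
/// # Example
/// Illustrative sketch (marked `ignore`); assumes crate-root re-exports.
/// ```ignore
/// use robin_sparkless::{col, greatest, least};
///
/// let hi = greatest(&[&col("q1"), &col("q2"), &col("q3")])?;
/// let lo = least(&[&col("q1"), &col("q2"), &col("q3")])?;
/// ```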
1436pub fn greatest(columns: &[&Column]) -> Result<Column, String> {
1437    if columns.is_empty() {
1438        return Err("greatest requires at least one column".to_string());
1439    }
1440    if columns.len() == 1 {
1441        return Ok((*columns[0]).clone());
1442    }
1443    let mut expr = columns[0].expr().clone();
1444    for c in columns.iter().skip(1) {
1445        let args = [c.expr().clone()];
1446        expr = expr.map_many(crate::udfs::apply_greatest2, &args, GetOutput::same_type());
1447    }
1448    Ok(Column::from_expr(expr, None))
1449}
1450
1451/// Least of the given columns per row (PySpark least). Uses element-wise UDF.
1452pub fn least(columns: &[&Column]) -> Result<Column, String> {
1453    if columns.is_empty() {
1454        return Err("least requires at least one column".to_string());
1455    }
1456    if columns.len() == 1 {
1457        return Ok((*columns[0]).clone());
1458    }
1459    let mut expr = columns[0].expr().clone();
1460    for c in columns.iter().skip(1) {
1461        let args = [c.expr().clone()];
1462        expr = expr.map_many(crate::udfs::apply_least2, &args, GetOutput::same_type());
1463    }
1464    Ok(Column::from_expr(expr, None))
1465}
1466
1467/// Extract year from datetime column (PySpark year)
1468pub fn year(column: &Column) -> Column {
1469    column.clone().year()
1470}
1471
1472/// Extract month from datetime column (PySpark month)
1473pub fn month(column: &Column) -> Column {
1474    column.clone().month()
1475}
1476
1477/// Extract day of month from datetime column (PySpark day)
1478pub fn day(column: &Column) -> Column {
1479    column.clone().day()
1480}
1481
1482/// Cast to date (PySpark to_date)
1483pub fn to_date(column: &Column) -> Column {
1484    column.clone().to_date()
1485}
1486
1487/// Format date/datetime as string (PySpark date_format). Accepts PySpark/Java SimpleDateFormat style (e.g. "yyyy-MM") and converts to chrono strftime internally.
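///
/// # Example
/// Illustrative sketch (marked `ignore`); assumes crate-root re-exports.
/// ```ignore
/// use robin_sparkless::{col, date_format};
///
/// // "yyyy-MM" (SimpleDateFormat style) is translated to chrono's "%Y-%m" internally.
/// let month = date_format(&col("ts"), "yyyy-MM");
/// ```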
1488pub fn date_format(column: &Column, format: &str) -> Column {
1489    column
1490        .clone()
1491        .date_format(&crate::udfs::pyspark_format_to_chrono(format))
1492}
1493
1494/// Current date (evaluation time). PySpark current_date.
1495pub fn current_date() -> Column {
1496    use polars::prelude::*;
1497    let today = chrono::Utc::now().date_naive();
1498    let days = (today - crate::date_utils::epoch_naive_date()).num_days() as i32;
1499    crate::column::Column::from_expr(Expr::Literal(LiteralValue::Date(days)), None)
1500}
1501
1502/// Current timestamp (evaluation time). PySpark current_timestamp.
1503pub fn current_timestamp() -> Column {
1504    use polars::prelude::*;
1505    let ts = chrono::Utc::now().timestamp_micros();
1506    crate::column::Column::from_expr(
1507        Expr::Literal(LiteralValue::DateTime(ts, TimeUnit::Microseconds, None)),
1508        None,
1509    )
1510}
1511
1512/// Alias for current_date (PySpark curdate).
1513pub fn curdate() -> Column {
1514    current_date()
1515}
1516
1517/// Alias for current_timestamp (PySpark now).
1518pub fn now() -> Column {
1519    current_timestamp()
1520}
1521
1522/// Alias for current_timestamp (PySpark localtimestamp).
1523pub fn localtimestamp() -> Column {
1524    current_timestamp()
1525}
1526
1527/// Alias for datediff (PySpark date_diff). date_diff(end, start).
1528pub fn date_diff(end: &Column, start: &Column) -> Column {
1529    datediff(end, start)
1530}
1531
1532/// Alias for date_add (PySpark dateadd).
1533pub fn dateadd(column: &Column, n: i32) -> Column {
1534    date_add(column, n)
1535}
1536
1537/// Extract field from date/datetime (PySpark extract). field: year, month, day, hour, minute, second, quarter, week, dayofweek, dayofyear.
1538pub fn extract(column: &Column, field: &str) -> Column {
1539    column.clone().extract(field)
1540}
1541
1542/// Alias for extract (PySpark date_part).
1543pub fn date_part(column: &Column, field: &str) -> Column {
1544    extract(column, field)
1545}
1546
1547/// Alias for extract (PySpark datepart).
1548pub fn datepart(column: &Column, field: &str) -> Column {
1549    extract(column, field)
1550}
1551
1552/// Timestamp to microseconds since epoch (PySpark unix_micros).
1553pub fn unix_micros(column: &Column) -> Column {
1554    column.clone().unix_micros()
1555}
1556
1557/// Timestamp to milliseconds since epoch (PySpark unix_millis).
1558pub fn unix_millis(column: &Column) -> Column {
1559    column.clone().unix_millis()
1560}
1561
1562/// Timestamp to seconds since epoch (PySpark unix_seconds).
1563pub fn unix_seconds(column: &Column) -> Column {
1564    column.clone().unix_seconds()
1565}
1566
1567/// Weekday name "Mon","Tue",... (PySpark dayname).
1568pub fn dayname(column: &Column) -> Column {
1569    column.clone().dayname()
1570}
1571
1572/// Weekday 0=Mon, 6=Sun (PySpark weekday).
1573pub fn weekday(column: &Column) -> Column {
1574    column.clone().weekday()
1575}
1576
1577/// Extract hour from datetime column (PySpark hour).
1578pub fn hour(column: &Column) -> Column {
1579    column.clone().hour()
1580}
1581
1582/// Extract minute from datetime column (PySpark minute).
1583pub fn minute(column: &Column) -> Column {
1584    column.clone().minute()
1585}
1586
1587/// Extract second from datetime column (PySpark second).
1588pub fn second(column: &Column) -> Column {
1589    column.clone().second()
1590}
1591
1592/// Add n days to date column (PySpark date_add).
1593pub fn date_add(column: &Column, n: i32) -> Column {
1594    column.clone().date_add(n)
1595}
1596
1597/// Subtract n days from date column (PySpark date_sub).
1598pub fn date_sub(column: &Column, n: i32) -> Column {
1599    column.clone().date_sub(n)
1600}
1601
1602/// Number of days between two date columns (PySpark datediff).
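/// Usage sketch (column names illustrative; as in PySpark, the first argument is the end date):
/// ```
/// use robin_sparkless::{col, datediff};
/// let days_open = datediff(&col("closed_at"), &col("opened_at"));
/// ```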
1603pub fn datediff(end: &Column, start: &Column) -> Column {
1604    start.clone().datediff(end)
1605}
1606
1607/// Last day of month for date column (PySpark last_day).
1608pub fn last_day(column: &Column) -> Column {
1609    column.clone().last_day()
1610}
1611
1612/// Truncate date/datetime to unit (PySpark trunc).
1613pub fn trunc(column: &Column, format: &str) -> Column {
1614    column.clone().trunc(format)
1615}
1616
1617/// Alias for trunc (PySpark date_trunc).
1618pub fn date_trunc(format: &str, column: &Column) -> Column {
1619    trunc(column, format)
1620}
1621
1622/// Extract quarter (1-4) from date/datetime (PySpark quarter).
1623pub fn quarter(column: &Column) -> Column {
1624    column.clone().quarter()
1625}
1626
1627/// Extract ISO week of year (1-53) (PySpark weekofyear).
1628pub fn weekofyear(column: &Column) -> Column {
1629    column.clone().weekofyear()
1630}
1631
1632/// Extract day of week: 1=Sunday..7=Saturday (PySpark dayofweek).
1633pub fn dayofweek(column: &Column) -> Column {
1634    column.clone().dayofweek()
1635}
1636
1637/// Extract day of year (1-366) (PySpark dayofyear).
1638pub fn dayofyear(column: &Column) -> Column {
1639    column.clone().dayofyear()
1640}
1641
1642/// Add n months to date column (PySpark add_months).
1643pub fn add_months(column: &Column, n: i32) -> Column {
1644    column.clone().add_months(n)
1645}
1646
1647/// Months between end and start dates as fractional (PySpark months_between).
1648/// When round_off is true, rounds to 8 decimal places (PySpark default).
1649pub fn months_between(end: &Column, start: &Column, round_off: bool) -> Column {
1650    end.clone().months_between(start, round_off)
1651}
1652
1653/// Next date that is the given weekday (e.g. "Mon") (PySpark next_day).
1654pub fn next_day(column: &Column, day_of_week: &str) -> Column {
1655    column.clone().next_day(day_of_week)
1656}
1657
1658/// Current Unix timestamp in seconds (PySpark unix_timestamp with no args).
1659pub fn unix_timestamp_now() -> Column {
1660    use polars::prelude::*;
1661    let secs = chrono::Utc::now().timestamp();
1662    crate::column::Column::from_expr(lit(secs), None)
1663}
1664
1665/// Parse string timestamp to seconds since epoch (PySpark unix_timestamp). format defaults to yyyy-MM-dd HH:mm:ss.
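/// Usage sketch (column name illustrative; assumes crate-root re-export as with `col`):
/// ```
/// use robin_sparkless::{col, unix_timestamp};
/// // None uses the default "yyyy-MM-dd HH:mm:ss" pattern.
/// let secs = unix_timestamp(&col("created_at"), None);
/// ```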
1666pub fn unix_timestamp(column: &Column, format: Option<&str>) -> Column {
1667    column.clone().unix_timestamp(format)
1668}
1669
1670/// Alias for unix_timestamp.
1671pub fn to_unix_timestamp(column: &Column, format: Option<&str>) -> Column {
1672    unix_timestamp(column, format)
1673}
1674
1675/// Convert seconds since epoch to formatted string (PySpark from_unixtime).
1676pub fn from_unixtime(column: &Column, format: Option<&str>) -> Column {
1677    column.clone().from_unixtime(format)
1678}
1679
1680/// Build date from year, month, day columns (PySpark make_date).
1681pub fn make_date(year: &Column, month: &Column, day: &Column) -> Column {
1682    use polars::prelude::*;
1683    let args = [month.expr().clone(), day.expr().clone()];
1684    let expr = year.expr().clone().map_many(
1685        crate::udfs::apply_make_date,
1686        &args,
1687        GetOutput::from_type(DataType::Date),
1688    );
1689    crate::column::Column::from_expr(expr, None)
1690}
1691
1692/// make_timestamp(year, month, day, hour, min, sec, timezone?) - six columns to timestamp (PySpark make_timestamp).
1693/// When timezone is Some(tz), components are interpreted as local time in that zone, then converted to UTC.
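/// Usage sketch (column names and timezone are illustrative):
/// ```
/// use robin_sparkless::{col, make_timestamp};
/// let ts = make_timestamp(
///     &col("y"), &col("mo"), &col("d"),
///     &col("h"), &col("mi"), &col("s"),
///     Some("America/New_York"),
/// );
/// ```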
1694pub fn make_timestamp(
1695    year: &Column,
1696    month: &Column,
1697    day: &Column,
1698    hour: &Column,
1699    minute: &Column,
1700    sec: &Column,
1701    timezone: Option<&str>,
1702) -> Column {
1703    use polars::prelude::*;
1704    let tz_owned = timezone.map(|s| s.to_string());
1705    let args = [
1706        month.expr().clone(),
1707        day.expr().clone(),
1708        hour.expr().clone(),
1709        minute.expr().clone(),
1710        sec.expr().clone(),
1711    ];
1712    let expr = year.expr().clone().map_many(
1713        move |cols| crate::udfs::apply_make_timestamp(cols, tz_owned.as_deref()),
1714        &args,
1715        GetOutput::from_type(DataType::Datetime(TimeUnit::Microseconds, None)),
1716    );
1717    crate::column::Column::from_expr(expr, None)
1718}
1719
1720/// Add amount of unit to timestamp (PySpark timestampadd).
1721pub fn timestampadd(unit: &str, amount: &Column, ts: &Column) -> Column {
1722    ts.clone().timestampadd(unit, amount)
1723}
1724
1725/// Difference between timestamps in unit (PySpark timestampdiff).
1726pub fn timestampdiff(unit: &str, start: &Column, end: &Column) -> Column {
1727    start.clone().timestampdiff(unit, end)
1728}
1729
1730/// Interval of n days (PySpark days). For use in date_add, timestampadd, etc.
1731pub fn days(n: i64) -> Column {
1732    make_interval(0, 0, 0, n, 0, 0, 0)
1733}
1734
1735/// Interval of n hours (PySpark hours).
1736pub fn hours(n: i64) -> Column {
1737    make_interval(0, 0, 0, 0, n, 0, 0)
1738}
1739
1740/// Interval of n minutes (PySpark minutes).
1741pub fn minutes(n: i64) -> Column {
1742    make_interval(0, 0, 0, 0, 0, n, 0)
1743}
1744
1745/// Interval of n months (PySpark months). Approximated as 30*n days.
1746pub fn months(n: i64) -> Column {
1747    make_interval(0, n, 0, 0, 0, 0, 0)
1748}
1749
1750/// Interval of n years (PySpark years). Approximated as 365*n days.
1751pub fn years(n: i64) -> Column {
1752    make_interval(n, 0, 0, 0, 0, 0, 0)
1753}
1754
1755/// Interpret timestamp as UTC, convert to tz (PySpark from_utc_timestamp).
1756pub fn from_utc_timestamp(column: &Column, tz: &str) -> Column {
1757    column.clone().from_utc_timestamp(tz)
1758}
1759
1760/// Interpret timestamp as in tz, convert to UTC (PySpark to_utc_timestamp).
1761pub fn to_utc_timestamp(column: &Column, tz: &str) -> Column {
1762    column.clone().to_utc_timestamp(tz)
1763}
1764
1765/// Convert timestamp between timezones (PySpark convert_timezone).
1766pub fn convert_timezone(source_tz: &str, target_tz: &str, column: &Column) -> Column {
1767    let source_tz = source_tz.to_string();
1768    let target_tz = target_tz.to_string();
1769    let expr = column.expr().clone().map(
1770        move |s| crate::udfs::apply_convert_timezone(s, &source_tz, &target_tz),
1771        GetOutput::same_type(),
1772    );
1773    crate::column::Column::from_expr(expr, None)
1774}
1775
1776/// Current session timezone (PySpark current_timezone). Default "UTC". Returns literal column.
1777pub fn current_timezone() -> Column {
1778    use polars::prelude::*;
1779    crate::column::Column::from_expr(lit("UTC"), None)
1780}
1781
1782/// Create interval duration (PySpark make_interval). Optional args; 0 for omitted.
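/// Usage sketch (values illustrative; note the 365-day year / 30-day month approximation below):
/// ```
/// use robin_sparkless::make_interval;
/// // 1 year + 2 months + 3 hours ~= 425 days and 3 hours.
/// let iv = make_interval(1, 2, 0, 0, 3, 0, 0);
/// ```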
1783pub fn make_interval(
1784    years: i64,
1785    months: i64,
1786    weeks: i64,
1787    days: i64,
1788    hours: i64,
1789    mins: i64,
1790    secs: i64,
1791) -> Column {
1792    use polars::prelude::*;
1793    // Approximate: 1 year = 365 days, 1 month = 30 days
1794    let total_days = years * 365 + months * 30 + weeks * 7 + days;
1795    let args = DurationArgs::new()
1796        .with_days(lit(total_days))
1797        .with_hours(lit(hours))
1798        .with_minutes(lit(mins))
1799        .with_seconds(lit(secs));
1800    let dur = duration(args);
1801    crate::column::Column::from_expr(dur, None)
1802}
1803
1804/// Day-time interval: days, hours, minutes, seconds (PySpark make_dt_interval). All optional; 0 for omitted.
1805pub fn make_dt_interval(days: i64, hours: i64, minutes: i64, seconds: i64) -> Column {
1806    use polars::prelude::*;
1807    let args = DurationArgs::new()
1808        .with_days(lit(days))
1809        .with_hours(lit(hours))
1810        .with_minutes(lit(minutes))
1811        .with_seconds(lit(seconds));
1812    let dur = duration(args);
1813    crate::column::Column::from_expr(dur, None)
1814}
1815
1816/// Year-month interval (PySpark make_ym_interval). Polars has no native YM type; return months as Int32 (years*12 + months).
1817pub fn make_ym_interval(years: i32, months: i32) -> Column {
1818    use polars::prelude::*;
1819    let total_months = years * 12 + months;
1820    crate::column::Column::from_expr(lit(total_months), None)
1821}
1822
1823/// Alias for make_timestamp (PySpark make_timestamp_ntz - no timezone).
1824pub fn make_timestamp_ntz(
1825    year: &Column,
1826    month: &Column,
1827    day: &Column,
1828    hour: &Column,
1829    minute: &Column,
1830    sec: &Column,
1831) -> Column {
1832    make_timestamp(year, month, day, hour, minute, sec, None)
1833}
1834
1835/// Convert seconds since epoch to timestamp (PySpark timestamp_seconds).
1836pub fn timestamp_seconds(column: &Column) -> Column {
1837    column.clone().timestamp_seconds()
1838}
1839
1840/// Convert milliseconds since epoch to timestamp (PySpark timestamp_millis).
1841pub fn timestamp_millis(column: &Column) -> Column {
1842    column.clone().timestamp_millis()
1843}
1844
1845/// Convert microseconds since epoch to timestamp (PySpark timestamp_micros).
1846pub fn timestamp_micros(column: &Column) -> Column {
1847    column.clone().timestamp_micros()
1848}
1849
1850/// Date to days since 1970-01-01 (PySpark unix_date).
1851pub fn unix_date(column: &Column) -> Column {
1852    column.clone().unix_date()
1853}
1854
1855/// Days since epoch to date (PySpark date_from_unix_date).
1856pub fn date_from_unix_date(column: &Column) -> Column {
1857    column.clone().date_from_unix_date()
1858}
1859
1860/// Positive modulus (PySpark pmod).
1861pub fn pmod(dividend: &Column, divisor: &Column) -> Column {
1862    dividend.clone().pmod(divisor)
1863}
1864
1865/// Factorial n! (PySpark factorial). n in 0..=20; null for negative or overflow.
1866pub fn factorial(column: &Column) -> Column {
1867    column.clone().factorial()
1868}
1869
1870/// Concatenate string columns without separator (PySpark concat)
1871pub fn concat(columns: &[&Column]) -> Column {
1872    use polars::prelude::*;
1873    if columns.is_empty() {
1874        panic!("concat requires at least one column");
1875    }
1876    let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
1877    crate::column::Column::from_expr(concat_str(&exprs, "", false), None)
1878}
1879
1880/// Concatenate string columns with separator (PySpark concat_ws)
1881pub fn concat_ws(separator: &str, columns: &[&Column]) -> Column {
1882    use polars::prelude::*;
1883    if columns.is_empty() {
1884        panic!("concat_ws requires at least one column");
1885    }
1886    let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
1887    crate::column::Column::from_expr(concat_str(&exprs, separator, false), None)
1888}
1889
1890/// Row number window function (1, 2, 3, ... by order within partition).
1891/// Orders ascending; use `Column::row_number(true)` for descending, then `.over(partition_by)`.
1892///
1893/// # Example
1894/// ```
1895/// use robin_sparkless::col;
1896/// let salary_col = col("salary");
1897/// let rn = salary_col.row_number(true).over(&["dept"]);
1898/// ```
1899pub fn row_number(column: &Column) -> Column {
1900    column.clone().row_number(false)
1901}
1902
1903/// Rank window function (ties same rank, gaps). Use with `.over(partition_by)`.
1904pub fn rank(column: &Column, descending: bool) -> Column {
1905    column.clone().rank(descending)
1906}
1907
1908/// Dense rank window function (no gaps). Use with `.over(partition_by)`.
1909pub fn dense_rank(column: &Column, descending: bool) -> Column {
1910    column.clone().dense_rank(descending)
1911}
1912
1913/// Lag: value from n rows before in partition. Use with `.over(partition_by)`.
1914pub fn lag(column: &Column, n: i64) -> Column {
1915    column.clone().lag(n)
1916}
1917
1918/// Lead: value from n rows after in partition. Use with `.over(partition_by)`.
1919pub fn lead(column: &Column, n: i64) -> Column {
1920    column.clone().lead(n)
1921}
1922
1923/// First value in partition (PySpark first_value). Use with `.over(partition_by)`.
1924pub fn first_value(column: &Column) -> Column {
1925    column.clone().first_value()
1926}
1927
1928/// Last value in partition (PySpark last_value). Use with `.over(partition_by)`.
1929pub fn last_value(column: &Column) -> Column {
1930    column.clone().last_value()
1931}
1932
1933/// Percent rank in partition: (rank - 1) / (count - 1). Window is applied.
1934pub fn percent_rank(column: &Column, partition_by: &[&str], descending: bool) -> Column {
1935    column.clone().percent_rank(partition_by, descending)
1936}
1937
1938/// Cumulative distribution in partition: row_number / count. Window is applied.
1939pub fn cume_dist(column: &Column, partition_by: &[&str], descending: bool) -> Column {
1940    column.clone().cume_dist(partition_by, descending)
1941}
1942
1943/// Ntile: bucket 1..n by rank within partition. Window is applied.
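/// Usage sketch (column and partition names illustrative):
/// ```
/// use robin_sparkless::{col, ntile};
/// // Salary quartiles within each department, highest salary in bucket 1.
/// let quartile = ntile(&col("salary"), 4, &["dept"], true);
/// ```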
1944pub fn ntile(column: &Column, n: u32, partition_by: &[&str], descending: bool) -> Column {
1945    column.clone().ntile(n, partition_by, descending)
1946}
1947
1948/// Nth value in partition by order (1-based n). Window is applied; do not call .over() again.
1949pub fn nth_value(column: &Column, n: i64, partition_by: &[&str], descending: bool) -> Column {
1950    column.clone().nth_value(n, partition_by, descending)
1951}
1952
1953/// Coalesce - returns the first non-null value from multiple columns.
1954///
1955/// # Example
1956/// ```
1957/// use robin_sparkless::{col, lit_i64, coalesce};
1958///
1959/// // coalesce(col("a"), col("b"), lit(0))
1960/// let expr = coalesce(&[&col("a"), &col("b"), &lit_i64(0)]);
1961/// ```
1962pub fn coalesce(columns: &[&Column]) -> Column {
1963    use polars::prelude::*;
1964    if columns.is_empty() {
1965        panic!("coalesce requires at least one column");
1966    }
1967    let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
1968    let expr = coalesce(&exprs);
1969    crate::column::Column::from_expr(expr, None)
1970}
1971
1972/// Alias for coalesce(col, value). PySpark nvl / ifnull.
1973pub fn nvl(column: &Column, value: &Column) -> Column {
1974    coalesce(&[column, value])
1975}
1976
1977/// Alias for nvl. PySpark ifnull.
1978pub fn ifnull(column: &Column, value: &Column) -> Column {
1979    nvl(column, value)
1980}
1981
1982/// Return null if column equals value, else column. PySpark nullif.
1983pub fn nullif(column: &Column, value: &Column) -> Column {
1984    use polars::prelude::*;
1985    let cond = column.expr().clone().eq(value.expr().clone());
1986    let null_lit = Expr::Literal(LiteralValue::Null);
1987    let expr = when(cond).then(null_lit).otherwise(column.expr().clone());
1988    crate::column::Column::from_expr(expr, None)
1989}
1990
1991/// Replace NaN with value. PySpark nanvl.
1992pub fn nanvl(column: &Column, value: &Column) -> Column {
1993    use polars::prelude::*;
1994    let cond = column.expr().clone().is_nan();
1995    let expr = when(cond)
1996        .then(value.expr().clone())
1997        .otherwise(column.expr().clone());
1998    crate::column::Column::from_expr(expr, None)
1999}
2000
2001/// Three-arg null replacement: if col1 is not null then col2 else col3. PySpark nvl2.
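/// Usage sketch (column name illustrative; assumes `lit_str` is re-exported like `lit_i64`):
/// ```
/// use robin_sparkless::{col, lit_str, nvl2};
/// let flag = nvl2(&col("email"), &lit_str("has_email"), &lit_str("missing"));
/// ```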
2002pub fn nvl2(col1: &Column, col2: &Column, col3: &Column) -> Column {
2003    use polars::prelude::*;
2004    let cond = col1.expr().clone().is_not_null();
2005    let expr = when(cond)
2006        .then(col2.expr().clone())
2007        .otherwise(col3.expr().clone());
2008    crate::column::Column::from_expr(expr, None)
2009}
2010
2011/// Alias for substring. PySpark substr.
2012pub fn substr(column: &Column, start: i64, length: Option<i64>) -> Column {
2013    substring(column, start, length)
2014}
2015
2016/// Alias for pow. PySpark power.
2017pub fn power(column: &Column, exp: i64) -> Column {
2018    pow(column, exp)
2019}
2020
2021/// Alias for log (natural log). PySpark ln.
2022pub fn ln(column: &Column) -> Column {
2023    log(column)
2024}
2025
2026/// Alias for ceil. PySpark ceiling.
2027pub fn ceiling(column: &Column) -> Column {
2028    ceil(column)
2029}
2030
2031/// Alias for lower. PySpark lcase.
2032pub fn lcase(column: &Column) -> Column {
2033    lower(column)
2034}
2035
2036/// Alias for upper. PySpark ucase.
2037pub fn ucase(column: &Column) -> Column {
2038    upper(column)
2039}
2040
2041/// Alias for day. PySpark dayofmonth.
2042pub fn dayofmonth(column: &Column) -> Column {
2043    day(column)
2044}
2045
2046/// Alias for degrees. PySpark toDegrees.
2047pub fn to_degrees(column: &Column) -> Column {
2048    degrees(column)
2049}
2050
2051/// Alias for radians. PySpark toRadians.
2052pub fn to_radians(column: &Column) -> Column {
2053    radians(column)
2054}
2055
2056/// Hyperbolic cosine (PySpark cosh).
2057pub fn cosh(column: &Column) -> Column {
2058    column.clone().cosh()
2059}
2060/// Hyperbolic sine (PySpark sinh).
2061pub fn sinh(column: &Column) -> Column {
2062    column.clone().sinh()
2063}
2064/// Hyperbolic tangent (PySpark tanh).
2065pub fn tanh(column: &Column) -> Column {
2066    column.clone().tanh()
2067}
2068/// Inverse hyperbolic cosine (PySpark acosh).
2069pub fn acosh(column: &Column) -> Column {
2070    column.clone().acosh()
2071}
2072/// Inverse hyperbolic sine (PySpark asinh).
2073pub fn asinh(column: &Column) -> Column {
2074    column.clone().asinh()
2075}
2076/// Inverse hyperbolic tangent (PySpark atanh).
2077pub fn atanh(column: &Column) -> Column {
2078    column.clone().atanh()
2079}
2080/// Cube root (PySpark cbrt).
2081pub fn cbrt(column: &Column) -> Column {
2082    column.clone().cbrt()
2083}
2084/// exp(x) - 1 (PySpark expm1).
2085pub fn expm1(column: &Column) -> Column {
2086    column.clone().expm1()
2087}
2088/// log(1 + x) (PySpark log1p).
2089pub fn log1p(column: &Column) -> Column {
2090    column.clone().log1p()
2091}
2092/// Base-10 log (PySpark log10).
2093pub fn log10(column: &Column) -> Column {
2094    column.clone().log10()
2095}
2096/// Base-2 log (PySpark log2).
2097pub fn log2(column: &Column) -> Column {
2098    column.clone().log2()
2099}
2100/// Round to nearest integer (PySpark rint).
2101pub fn rint(column: &Column) -> Column {
2102    column.clone().rint()
2103}
2104/// sqrt(x*x + y*y) (PySpark hypot).
2105pub fn hypot(x: &Column, y: &Column) -> Column {
2106    let xx = x.expr().clone() * x.expr().clone();
2107    let yy = y.expr().clone() * y.expr().clone();
2108    crate::column::Column::from_expr((xx + yy).sqrt(), None)
2109}
2110
2111/// True if column is null. PySpark isnull.
2112pub fn isnull(column: &Column) -> Column {
2113    column.clone().is_null()
2114}
2115
2116/// True if column is not null. PySpark isnotnull.
2117pub fn isnotnull(column: &Column) -> Column {
2118    column.clone().is_not_null()
2119}
2120
2121/// Create an array column from multiple columns (PySpark array).
2122pub fn array(columns: &[&Column]) -> Result<crate::column::Column, PolarsError> {
2123    use polars::prelude::*;
2124    if columns.is_empty() {
2125        panic!("array requires at least one column");
2126    }
2127    let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2128    let expr = concat_list(exprs)
2129        .map_err(|e| PolarsError::ComputeError(format!("array concat_list: {e}").into()))?;
2130    Ok(crate::column::Column::from_expr(expr, None))
2131}
2132
2133/// Number of elements in list (PySpark size / array_size). Returns Int32.
2134pub fn array_size(column: &Column) -> Column {
2135    column.clone().array_size()
2136}
2137
2138/// Alias for array_size (PySpark size).
2139pub fn size(column: &Column) -> Column {
2140    column.clone().array_size()
2141}
2142
2143/// Cardinality: number of elements in array (PySpark cardinality). Alias for size/array_size.
2144pub fn cardinality(column: &Column) -> Column {
2145    column.clone().cardinality()
2146}
2147
2148/// Check if list contains value (PySpark array_contains).
2149pub fn array_contains(column: &Column, value: &Column) -> Column {
2150    column.clone().array_contains(value.expr().clone())
2151}
2152
2153/// Join list of strings with separator (PySpark array_join).
2154pub fn array_join(column: &Column, separator: &str) -> Column {
2155    column.clone().array_join(separator)
2156}
2157
2158/// Maximum element in list (PySpark array_max).
2159pub fn array_max(column: &Column) -> Column {
2160    column.clone().array_max()
2161}
2162
2163/// Minimum element in list (PySpark array_min).
2164pub fn array_min(column: &Column) -> Column {
2165    column.clone().array_min()
2166}
2167
2168/// Get element at 1-based index (PySpark element_at).
2169pub fn element_at(column: &Column, index: i64) -> Column {
2170    column.clone().element_at(index)
2171}
2172
2173/// Sort list elements (PySpark array_sort).
2174pub fn array_sort(column: &Column) -> Column {
2175    column.clone().array_sort()
2176}
2177
2178/// Distinct elements in list (PySpark array_distinct).
2179pub fn array_distinct(column: &Column) -> Column {
2180    column.clone().array_distinct()
2181}
2182
2183/// Slice list from 1-based start with optional length (PySpark slice).
2184pub fn array_slice(column: &Column, start: i64, length: Option<i64>) -> Column {
2185    column.clone().array_slice(start, length)
2186}
2187
2188/// Generate array of numbers from start to stop (inclusive) with optional step (PySpark sequence).
2189/// step defaults to 1.
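/// Usage sketch (assumes `lit_i64` is re-exported as in the `coalesce` example):
/// ```
/// use robin_sparkless::{col, lit_i64, sequence};
/// // 1, 3, 5, ... up to the per-row value of `stop`.
/// let odds = sequence(&lit_i64(1), &col("stop"), Some(&lit_i64(2)));
/// ```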
2190pub fn sequence(start: &Column, stop: &Column, step: Option<&Column>) -> Column {
2191    use polars::prelude::{as_struct, lit, DataType, GetOutput};
2192    let step_expr = step
2193        .map(|c| c.expr().clone().alias("2"))
2194        .unwrap_or_else(|| lit(1i64).alias("2"));
2195    let struct_expr = as_struct(vec![
2196        start.expr().clone().alias("0"),
2197        stop.expr().clone().alias("1"),
2198        step_expr,
2199    ]);
2200    let out_dtype = DataType::List(Box::new(DataType::Int64));
2201    let expr = struct_expr.map(crate::udfs::apply_sequence, GetOutput::from_type(out_dtype));
2202    crate::column::Column::from_expr(expr, None)
2203}
2204
2205/// Random permutation of list elements (PySpark shuffle).
2206pub fn shuffle(column: &Column) -> Column {
2207    use polars::prelude::GetOutput;
2208    let expr = column
2209        .expr()
2210        .clone()
2211        .map(crate::udfs::apply_shuffle, GetOutput::same_type());
2212    crate::column::Column::from_expr(expr, None)
2213}
2214
2215/// Explode list of structs into rows; struct fields become columns after unnest (PySpark inline).
2216/// Returns the exploded struct column; use unnest to expand struct fields to columns.
2217pub fn inline(column: &Column) -> Column {
2218    column.clone().explode()
2219}
2220
2221/// Like inline but null/empty yields one row of nulls (PySpark inline_outer).
2222pub fn inline_outer(column: &Column) -> Column {
2223    column.clone().explode_outer()
2224}
2225
2226/// Explode list into one row per element (PySpark explode).
2227pub fn explode(column: &Column) -> Column {
2228    column.clone().explode()
2229}
2230
2231/// 1-based index of first occurrence of value in list, or 0 if not found (PySpark array_position).
2232/// Implemented via Polars list.eval with col("") as element.
2233pub fn array_position(column: &Column, value: &Column) -> Column {
2234    column.clone().array_position(value.expr().clone())
2235}
2236
2237/// Remove null elements from list (PySpark array_compact).
2238pub fn array_compact(column: &Column) -> Column {
2239    column.clone().array_compact()
2240}
2241
2242/// New list with all elements equal to value removed (PySpark array_remove).
2243/// Implemented via Polars list.eval + list.drop_nulls.
2244pub fn array_remove(column: &Column, value: &Column) -> Column {
2245    column.clone().array_remove(value.expr().clone())
2246}
2247
2248/// Repeat each element n times (PySpark array_repeat). Delegates to Column::array_repeat.
2249pub fn array_repeat(column: &Column, n: i64) -> Column {
2250    column.clone().array_repeat(n)
2251}
2252
2253/// Flatten list of lists to one list (PySpark flatten). Delegates to Column::array_flatten.
2254pub fn array_flatten(column: &Column) -> Column {
2255    column.clone().array_flatten()
2256}
2257
2258/// True if any list element satisfies the predicate (PySpark exists).
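/// Sketch of a predicate (assumes the list element is addressed as `col("")`,
/// matching the `array_position` note above; column name illustrative):
/// ```
/// use polars::prelude::{col as element, lit};
/// use robin_sparkless::{array_exists, col};
/// let any_negative = array_exists(&col("scores"), element("").lt(lit(0)));
/// ```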
2259pub fn array_exists(column: &Column, predicate: Expr) -> Column {
2260    column.clone().array_exists(predicate)
2261}
2262
2263/// True if all list elements satisfy the predicate (PySpark forall).
2264pub fn array_forall(column: &Column, predicate: Expr) -> Column {
2265    column.clone().array_forall(predicate)
2266}
2267
2268/// Filter list elements by predicate (PySpark filter).
2269pub fn array_filter(column: &Column, predicate: Expr) -> Column {
2270    column.clone().array_filter(predicate)
2271}
2272
2273/// Transform list elements by expression (PySpark transform).
2274pub fn array_transform(column: &Column, f: Expr) -> Column {
2275    column.clone().array_transform(f)
2276}
2277
2278/// Sum of list elements (PySpark aggregate sum).
2279pub fn array_sum(column: &Column) -> Column {
2280    column.clone().array_sum()
2281}
2282
2283/// Array fold/aggregate (PySpark aggregate). Simplified: zero + sum(list elements).
2284pub fn aggregate(column: &Column, zero: &Column) -> Column {
2285    column.clone().array_aggregate(zero)
2286}
2287
2288/// Mean of list elements (PySpark aggregate avg).
2289pub fn array_mean(column: &Column) -> Column {
2290    column.clone().array_mean()
2291}
2292
2293/// Explode list with position (PySpark posexplode). Returns (pos_column, value_column).
2294/// pos is 1-based here (PySpark's posexplode emits 0-based positions); implemented via list.eval(cum_count()).explode() and explode().
2295pub fn posexplode(column: &Column) -> (Column, Column) {
2296    column.clone().posexplode()
2297}
2298
2299/// Build a map column from alternating key/value expressions (PySpark create_map).
2300/// Returns List(Struct{key, value}) using Polars as_struct and concat_list.
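/// Usage sketch (keys and values illustrative; assumes `lit_str` is re-exported like `lit_i64`):
/// ```
/// use robin_sparkless::{col, create_map, lit_str};
/// let m = create_map(&[&lit_str("name"), &col("name"), &lit_str("city"), &col("city")]).unwrap();
/// ```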
2301pub fn create_map(key_values: &[&Column]) -> Result<Column, PolarsError> {
2302    use polars::prelude::{as_struct, concat_list};
2303    if key_values.is_empty() {
2304        panic!("create_map requires at least one key-value pair");
2305    }
2306    let mut struct_exprs: Vec<Expr> = Vec::new();
2307    for i in (0..key_values.len()).step_by(2) {
2308        if i + 1 < key_values.len() {
2309            let k = key_values[i].expr().clone().alias("key");
2310            let v = key_values[i + 1].expr().clone().alias("value");
2311            struct_exprs.push(as_struct(vec![k, v]));
2312        }
2313    }
2314    let expr = concat_list(struct_exprs)
2315        .map_err(|e| PolarsError::ComputeError(format!("create_map concat_list: {e}").into()))?;
2316    Ok(crate::column::Column::from_expr(expr, None))
2317}
2318
2319/// Extract keys from a map column (PySpark map_keys). Map is List(Struct{key, value}).
2320pub fn map_keys(column: &Column) -> Column {
2321    column.clone().map_keys()
2322}
2323
2324/// Extract values from a map column (PySpark map_values).
2325pub fn map_values(column: &Column) -> Column {
2326    column.clone().map_values()
2327}
2328
2329/// Return map as list of structs {key, value} (PySpark map_entries).
2330pub fn map_entries(column: &Column) -> Column {
2331    column.clone().map_entries()
2332}
2333
2334/// Build map from two array columns keys and values (PySpark map_from_arrays). Implemented via UDF.
2335pub fn map_from_arrays(keys: &Column, values: &Column) -> Column {
2336    keys.clone().map_from_arrays(values)
2337}
2338
2339/// Merge two map columns (PySpark map_concat). Last value wins for duplicate keys.
2340pub fn map_concat(a: &Column, b: &Column) -> Column {
2341    a.clone().map_concat(b)
2342}
2343
2344/// Array of structs {key, value} to map (PySpark map_from_entries).
2345pub fn map_from_entries(column: &Column) -> Column {
2346    column.clone().map_from_entries()
2347}
2348
2349/// True if map contains key (PySpark map_contains_key).
2350pub fn map_contains_key(map_col: &Column, key: &Column) -> Column {
2351    map_col.clone().map_contains_key(key)
2352}
2353
2354/// Get value for key from map, or null (PySpark get).
2355pub fn get(map_col: &Column, key: &Column) -> Column {
2356    map_col.clone().get(key)
2357}
2358
2359/// Filter map entries by predicate (PySpark map_filter).
2360pub fn map_filter(map_col: &Column, predicate: Expr) -> Column {
2361    map_col.clone().map_filter(predicate)
2362}
2363
2364/// Merge two maps by key with merge function (PySpark map_zip_with).
2365pub fn map_zip_with(map1: &Column, map2: &Column, merge: Expr) -> Column {
2366    map1.clone().map_zip_with(map2, merge)
2367}
2368
2369/// Convenience: zip_with with coalesce(left, right) merge.
2370pub fn zip_with_coalesce(left: &Column, right: &Column) -> Column {
2371    use polars::prelude::col;
2372    let left_field = col("").struct_().field_by_name("left");
2373    let right_field = col("").struct_().field_by_name("right");
2374    let merge = crate::column::Column::from_expr(
2375        coalesce(&[
2376            &crate::column::Column::from_expr(left_field, None),
2377            &crate::column::Column::from_expr(right_field, None),
2378        ])
2379        .into_expr(),
2380        None,
2381    );
2382    left.clone().zip_with(right, merge.into_expr())
2383}
2384
2385/// Convenience: map_zip_with with coalesce(value1, value2) merge.
2386pub fn map_zip_with_coalesce(map1: &Column, map2: &Column) -> Column {
2387    use polars::prelude::col;
2388    let v1 = col("").struct_().field_by_name("value1");
2389    let v2 = col("").struct_().field_by_name("value2");
2390    let merge = coalesce(&[
2391        &crate::column::Column::from_expr(v1, None),
2392        &crate::column::Column::from_expr(v2, None),
2393    ])
2394    .into_expr();
2395    map1.clone().map_zip_with(map2, merge)
2396}
2397
2398/// Convenience: map_filter with value > threshold predicate.
2399pub fn map_filter_value_gt(map_col: &Column, threshold: f64) -> Column {
2400    use polars::prelude::{col, lit};
2401    let pred = col("").struct_().field_by_name("value").gt(lit(threshold));
2402    map_col.clone().map_filter(pred)
2403}
2404
2405/// Create struct from columns using column names as field names (PySpark struct).
2406pub fn struct_(columns: &[&Column]) -> Column {
2407    use polars::prelude::as_struct;
2408    if columns.is_empty() {
2409        panic!("struct requires at least one column");
2410    }
2411    let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2412    crate::column::Column::from_expr(as_struct(exprs), None)
2413}
2414
2415/// Create struct with explicit field names (PySpark named_struct). Pairs of (name, column).
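/// Usage sketch (field and column names illustrative):
/// ```
/// use robin_sparkless::{col, named_struct};
/// let person = named_struct(&[("first", &col("first_name")), ("last", &col("last_name"))]);
/// ```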
2416pub fn named_struct(pairs: &[(&str, &Column)]) -> Column {
2417    use polars::prelude::as_struct;
2418    if pairs.is_empty() {
2419        panic!("named_struct requires at least one (name, column) pair");
2420    }
2421    let exprs: Vec<Expr> = pairs
2422        .iter()
2423        .map(|(name, col)| col.expr().clone().alias(*name))
2424        .collect();
2425    crate::column::Column::from_expr(as_struct(exprs), None)
2426}
2427
2428/// Append element to end of list (PySpark array_append).
2429pub fn array_append(array: &Column, elem: &Column) -> Column {
2430    array.clone().array_append(elem)
2431}
2432
2433/// Prepend element to start of list (PySpark array_prepend).
2434pub fn array_prepend(array: &Column, elem: &Column) -> Column {
2435    array.clone().array_prepend(elem)
2436}
2437
2438/// Insert element at 1-based position (PySpark array_insert).
2439pub fn array_insert(array: &Column, pos: &Column, elem: &Column) -> Column {
2440    array.clone().array_insert(pos, elem)
2441}
2442
2443/// Elements in first array not in second (PySpark array_except).
2444pub fn array_except(a: &Column, b: &Column) -> Column {
2445    a.clone().array_except(b)
2446}
2447
2448/// Elements in both arrays (PySpark array_intersect).
2449pub fn array_intersect(a: &Column, b: &Column) -> Column {
2450    a.clone().array_intersect(b)
2451}
2452
2453/// Distinct elements from both arrays (PySpark array_union).
2454pub fn array_union(a: &Column, b: &Column) -> Column {
2455    a.clone().array_union(b)
2456}
2457
2458/// Zip two arrays element-wise with merge function (PySpark zip_with).
2459pub fn zip_with(left: &Column, right: &Column, merge: Expr) -> Column {
2460    left.clone().zip_with(right, merge)
2461}
2462
2463/// Extract JSON path from string column (PySpark get_json_object).
2464pub fn get_json_object(column: &Column, path: &str) -> Column {
2465    column.clone().get_json_object(path)
2466}
2467
2468/// Keys of JSON object (PySpark json_object_keys). Returns list of strings.
2469pub fn json_object_keys(column: &Column) -> Column {
2470    column.clone().json_object_keys()
2471}
2472
2473/// Extract keys from JSON as struct (PySpark json_tuple). keys: e.g. ["a", "b"].
2474pub fn json_tuple(column: &Column, keys: &[&str]) -> Column {
2475    column.clone().json_tuple(keys)
2476}
2477
2478/// Parse CSV string to struct (PySpark from_csv). Minimal implementation.
2479pub fn from_csv(column: &Column) -> Column {
2480    column.clone().from_csv()
2481}
2482
2483/// Format struct as CSV string (PySpark to_csv). Minimal implementation.
2484pub fn to_csv(column: &Column) -> Column {
2485    column.clone().to_csv()
2486}
2487
2488/// Schema of CSV string (PySpark schema_of_csv). Returns literal schema string; minimal stub.
2489pub fn schema_of_csv(_column: &Column) -> Column {
2490    Column::from_expr(
2491        lit("STRUCT<_c0: STRING, _c1: STRING>".to_string()),
2492        Some("schema_of_csv".to_string()),
2493    )
2494}
2495
2496/// Schema of JSON string (PySpark schema_of_json). Returns literal schema string; minimal stub.
2497pub fn schema_of_json(_column: &Column) -> Column {
2498    Column::from_expr(
2499        lit("STRUCT<>".to_string()),
2500        Some("schema_of_json".to_string()),
2501    )
2502}
2503
2504/// Parse string column as JSON into struct (PySpark from_json).
2505pub fn from_json(column: &Column, schema: Option<polars::datatypes::DataType>) -> Column {
2506    column.clone().from_json(schema)
2507}
2508
2509/// Serialize struct column to JSON string (PySpark to_json).
2510pub fn to_json(column: &Column) -> Column {
2511    column.clone().to_json()
2512}
2513
2514/// Check if column values are in the given list (PySpark isin). Uses Polars is_in.
2515pub fn isin(column: &Column, other: &Column) -> Column {
2516    column.clone().isin(other)
2517}
2518
2519/// Check if column values are in the given i64 slice (PySpark isin with literal list).
2520pub fn isin_i64(column: &Column, values: &[i64]) -> Column {
2521    let s = Series::from_iter(values.iter().cloned());
2522    Column::from_expr(column.expr().clone().is_in(lit(s)), None)
2523}
2524
2525/// Check if column values are in the given string slice (PySpark isin with literal list).
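/// Usage sketch (column name and values illustrative):
/// ```
/// use robin_sparkless::{col, isin_str};
/// let is_fruit = isin_str(&col("item"), &["apple", "banana", "cherry"]);
/// ```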
2526pub fn isin_str(column: &Column, values: &[&str]) -> Column {
2527    let s: Series = Series::from_iter(values.iter().copied());
2528    Column::from_expr(column.expr().clone().is_in(lit(s)), None)
2529}
2530
2531/// Percent-decode URL-encoded string (PySpark url_decode).
2532pub fn url_decode(column: &Column) -> Column {
2533    column.clone().url_decode()
2534}
2535
2536/// Percent-encode string for URL (PySpark url_encode).
2537pub fn url_encode(column: &Column) -> Column {
2538    column.clone().url_encode()
2539}
2540
2541/// Bitwise left shift (PySpark shiftLeft). col << n.
2542pub fn shift_left(column: &Column, n: i32) -> Column {
2543    column.clone().shift_left(n)
2544}
2545
2546/// Bitwise signed right shift (PySpark shiftRight). col >> n.
2547pub fn shift_right(column: &Column, n: i32) -> Column {
2548    column.clone().shift_right(n)
2549}
2550
2551/// Bitwise unsigned right shift (PySpark shiftRightUnsigned). Logical shift for Long.
2552pub fn shift_right_unsigned(column: &Column, n: i32) -> Column {
2553    column.clone().shift_right_unsigned(n)
2554}
2555
2556/// Session/library version string (PySpark version).
2557pub fn version() -> Column {
2558    Column::from_expr(
2559        lit(concat!("robin-sparkless-", env!("CARGO_PKG_VERSION"))),
2560        None,
2561    )
2562}
2563
2564/// Null-safe equality: true if both null or both equal (PySpark equal_null). Alias for eq_null_safe.
2565pub fn equal_null(left: &Column, right: &Column) -> Column {
2566    left.clone().eq_null_safe(right)
2567}
2568
2569/// Length of JSON array at path (PySpark json_array_length).
2570pub fn json_array_length(column: &Column, path: &str) -> Column {
2571    column.clone().json_array_length(path)
2572}
2573
2574/// Parse URL and extract part: PROTOCOL, HOST, PATH, etc. (PySpark parse_url).
2575/// When key is Some(k) and part is QUERY/QUERYSTRING, returns the value for that query parameter only.
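/// Usage sketch (column name illustrative):
/// ```
/// use robin_sparkless::{col, parse_url};
/// let host = parse_url(&col("url"), "HOST", None);
/// let page = parse_url(&col("url"), "QUERY", Some("page"));
/// ```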
2576pub fn parse_url(column: &Column, part: &str, key: Option<&str>) -> Column {
2577    column.clone().parse_url(part, key)
2578}
2579
2580/// Hash of column values (PySpark hash). Uses Murmur3 32-bit for parity with PySpark.
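/// Usage sketch (column names illustrative):
/// ```
/// use robin_sparkless::{col, hash};
/// let h_single = hash(&[&col("id")]);
/// let h_multi = hash(&[&col("id"), &col("region")]);
/// ```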
2581pub fn hash(columns: &[&Column]) -> Column {
2582    use polars::prelude::*;
2583    if columns.is_empty() {
2584        return crate::column::Column::from_expr(lit(0i64), None);
2585    }
2586    if columns.len() == 1 {
2587        return columns[0].clone().hash();
2588    }
2589    let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2590    let struct_expr = polars::prelude::as_struct(exprs);
2591    let name = columns[0].name().to_string();
2592    let expr = struct_expr.map(
2593        crate::udfs::apply_hash_struct,
2594        GetOutput::from_type(DataType::Int64),
2595    );
2596    crate::column::Column::from_expr(expr, Some(name))
2597}
2598
2599/// Simplified stack (PySpark stack): builds a struct from the given columns (alias for struct_). Unlike PySpark, it does not expand values into rows.
2600pub fn stack(columns: &[&Column]) -> Column {
2601    struct_(columns)
2602}
2603
2604#[cfg(test)]
2605mod tests {
2606    use super::*;
2607    use polars::prelude::{df, IntoLazy};
2608
2609    #[test]
2610    fn test_col_creates_column() {
2611        let column = col("test");
2612        assert_eq!(column.name(), "test");
2613    }
2614
2615    #[test]
2616    fn test_lit_i32() {
2617        let column = lit_i32(42);
2618        // The column should have a default name since it's a literal
2619        assert_eq!(column.name(), "<expr>");
2620    }
2621
2622    #[test]
2623    fn test_lit_i64() {
2624        let column = lit_i64(123456789012345i64);
2625        assert_eq!(column.name(), "<expr>");
2626    }
2627
2628    #[test]
2629    fn test_lit_f64() {
2630        let column = lit_f64(std::f64::consts::PI);
2631        assert_eq!(column.name(), "<expr>");
2632    }
2633
2634    #[test]
2635    fn test_lit_bool() {
2636        let column = lit_bool(true);
2637        assert_eq!(column.name(), "<expr>");
2638    }
2639
2640    #[test]
2641    fn test_lit_str() {
2642        let column = lit_str("hello");
2643        assert_eq!(column.name(), "<expr>");
2644    }
2645
2646    #[test]
2647    fn test_count_aggregation() {
2648        let column = col("value");
2649        let result = count(&column);
2650        assert_eq!(result.name(), "count");
2651    }
2652
2653    #[test]
2654    fn test_sum_aggregation() {
2655        let column = col("value");
2656        let result = sum(&column);
2657        assert_eq!(result.name(), "sum");
2658    }
2659
2660    #[test]
2661    fn test_avg_aggregation() {
2662        let column = col("value");
2663        let result = avg(&column);
2664        assert_eq!(result.name(), "avg");
2665    }
2666
2667    #[test]
2668    fn test_max_aggregation() {
2669        let column = col("value");
2670        let result = max(&column);
2671        assert_eq!(result.name(), "max");
2672    }
2673
2674    #[test]
2675    fn test_min_aggregation() {
2676        let column = col("value");
2677        let result = min(&column);
2678        assert_eq!(result.name(), "min");
2679    }
2680
2681    #[test]
2682    fn test_when_then_otherwise() {
2683        // Create a simple DataFrame
2684        let df = df!(
2685            "age" => &[15, 25, 35]
2686        )
2687        .unwrap();
2688
2689        // Build a when-then-otherwise expression
2690        let age_col = col("age");
2691        let condition = age_col.gt(polars::prelude::lit(18));
2692        let result = when(&condition)
2693            .then(&lit_str("adult"))
2694            .otherwise(&lit_str("minor"));
2695
2696        // Apply the expression
2697        let result_df = df
2698            .lazy()
2699            .with_column(result.into_expr().alias("status"))
2700            .collect()
2701            .unwrap();
2702
2703        // Verify the result
2704        let status_col = result_df.column("status").unwrap();
2705        let values: Vec<Option<&str>> = status_col.str().unwrap().into_iter().collect();
2706
2707        assert_eq!(values[0], Some("minor")); // age 15 < 18
2708        assert_eq!(values[1], Some("adult")); // age 25 > 18
2709        assert_eq!(values[2], Some("adult")); // age 35 > 18
2710    }
2711
2712    #[test]
2713    fn test_coalesce_returns_first_non_null() {
2714        // Create a DataFrame with some nulls
2715        let df = df!(
2716            "a" => &[Some(1), None, None],
2717            "b" => &[None, Some(2), None],
2718            "c" => &[None, None, Some(3)]
2719        )
2720        .unwrap();
2721
2722        let col_a = col("a");
2723        let col_b = col("b");
2724        let col_c = col("c");
2725        let result = coalesce(&[&col_a, &col_b, &col_c]);
2726
2727        // Apply the expression
2728        let result_df = df
2729            .lazy()
2730            .with_column(result.into_expr().alias("coalesced"))
2731            .collect()
2732            .unwrap();
2733
2734        // Verify the result
2735        let coalesced_col = result_df.column("coalesced").unwrap();
2736        let values: Vec<Option<i32>> = coalesced_col.i32().unwrap().into_iter().collect();
2737
2738        assert_eq!(values[0], Some(1)); // First non-null is 'a'
2739        assert_eq!(values[1], Some(2)); // First non-null is 'b'
2740        assert_eq!(values[2], Some(3)); // First non-null is 'c'
2741    }
2742
2743    #[test]
2744    fn test_coalesce_with_literal_fallback() {
2745        // Create a DataFrame with all nulls in one row
2746        let df = df!(
2747            "a" => &[Some(1), None],
2748            "b" => &[None::<i32>, None::<i32>]
2749        )
2750        .unwrap();
2751
2752        let col_a = col("a");
2753        let col_b = col("b");
2754        let fallback = lit_i32(0);
2755        let result = coalesce(&[&col_a, &col_b, &fallback]);
2756
2757        // Apply the expression
2758        let result_df = df
2759            .lazy()
2760            .with_column(result.into_expr().alias("coalesced"))
2761            .collect()
2762            .unwrap();
2763
2764        // Verify the result
2765        let coalesced_col = result_df.column("coalesced").unwrap();
2766        let values: Vec<Option<i32>> = coalesced_col.i32().unwrap().into_iter().collect();
2767
2768        assert_eq!(values[0], Some(1)); // First non-null is 'a'
2769        assert_eq!(values[1], Some(0)); // All nulls, use fallback
2770    }
2771
2772    #[test]
2773    #[should_panic(expected = "coalesce requires at least one column")]
2774    fn test_coalesce_empty_panics() {
2775        let columns: [&Column; 0] = [];
2776        let _ = coalesce(&columns);
2777    }
2778}