
robin_sparkless/functions.rs

use crate::column::Column;
use crate::dataframe::DataFrame;
use polars::prelude::*;

// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------

/// Sort order specification for use in orderBy/sort. Holds expr + direction + null placement.
#[derive(Debug, Clone)]
pub struct SortOrder {
    pub(crate) expr: Expr,
    pub(crate) descending: bool,
    pub(crate) nulls_last: bool,
}

impl SortOrder {
    pub fn expr(&self) -> &Expr {
        &self.expr
    }
}

/// Ascending sort, nulls first (Spark default for ASC).
pub fn asc(column: &Column) -> SortOrder {
    SortOrder {
        expr: column.expr().clone(),
        descending: false,
        nulls_last: false,
    }
}

/// Ascending sort, nulls first.
pub fn asc_nulls_first(column: &Column) -> SortOrder {
    SortOrder {
        expr: column.expr().clone(),
        descending: false,
        nulls_last: false,
    }
}

/// Ascending sort, nulls last.
pub fn asc_nulls_last(column: &Column) -> SortOrder {
    SortOrder {
        expr: column.expr().clone(),
        descending: false,
        nulls_last: true,
    }
}

/// Descending sort, nulls last (Spark default for DESC).
pub fn desc(column: &Column) -> SortOrder {
    SortOrder {
        expr: column.expr().clone(),
        descending: true,
        nulls_last: true,
    }
}

/// Descending sort, nulls first.
pub fn desc_nulls_first(column: &Column) -> SortOrder {
    SortOrder {
        expr: column.expr().clone(),
        descending: true,
        nulls_last: false,
    }
}

/// Descending sort, nulls last.
pub fn desc_nulls_last(column: &Column) -> SortOrder {
    SortOrder {
        expr: column.expr().clone(),
        descending: true,
        nulls_last: true,
    }
}
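
// Illustrative sketch (not part of the original file): how the sort helpers are
// typically combined before being passed to a DataFrame sort/orderBy call. The
// column names ("score", "name") are hypothetical.
#[allow(dead_code)]
fn example_sort_orders() -> Vec<SortOrder> {
    // Spark defaults: asc() puts nulls first, desc() puts nulls last.
    vec![desc(&col("score")), asc_nulls_last(&col("name"))]
}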

// -----------------------------------------------------------------------------

/// Parse PySpark-like type name to Polars DataType.
pub fn parse_type_name(name: &str) -> Result<DataType, String> {
    let s = name.trim().to_lowercase();
    Ok(match s.as_str() {
        "int" | "integer" => DataType::Int32,
        "long" | "bigint" => DataType::Int64,
        "float" => DataType::Float32,
        "double" => DataType::Float64,
        "string" | "str" => DataType::String,
        "boolean" | "bool" => DataType::Boolean,
        "date" => DataType::Date,
        "timestamp" => DataType::Datetime(TimeUnit::Microseconds, None),
        _ => return Err(format!("unknown type name: {name}")),
    })
}
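
// Illustrative sketch: type-name parsing accepts the PySpark spellings shown in the
// match arms above; anything else is an Err. Hypothetical helper for documentation only.
#[allow(dead_code)]
fn example_parse_type_name() {
    assert_eq!(parse_type_name("bigint"), Ok(DataType::Int64));
    assert_eq!(parse_type_name(" Boolean "), Ok(DataType::Boolean));
    assert!(parse_type_name("decimal(10,2)").is_err());
}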

/// Get a column by name (PySpark col).
pub fn col(name: &str) -> Column {
    Column::new(name.to_string())
}

/// Grouping set marker (PySpark grouping). Stub: returns 0 (no GROUPING SETS in robin-sparkless).
pub fn grouping(column: &Column) -> Column {
    let _ = column;
    Column::from_expr(lit(0i32), Some("grouping".to_string()))
}

/// Grouping set id (PySpark grouping_id). Stub: returns 0.
pub fn grouping_id(_columns: &[Column]) -> Column {
    Column::from_expr(lit(0i64), Some("grouping_id".to_string()))
}

/// Create a literal column from an i32 value (PySpark lit). See also lit_i64, lit_f64, lit_bool, lit_str.
pub fn lit_i32(value: i32) -> Column {
    let expr: Expr = lit(value);
    Column::from_expr(expr, None)
}

pub fn lit_i64(value: i64) -> Column {
    let expr: Expr = lit(value);
    Column::from_expr(expr, None)
}

pub fn lit_f64(value: f64) -> Column {
    let expr: Expr = lit(value);
    Column::from_expr(expr, None)
}

pub fn lit_bool(value: bool) -> Column {
    let expr: Expr = lit(value);
    Column::from_expr(expr, None)
}

pub fn lit_str(value: &str) -> Column {
    let expr: Expr = lit(value);
    Column::from_expr(expr, None)
}
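
// Illustrative sketch: each lit_* constructor wraps a Rust value as a literal Column.
// Hypothetical helper for documentation only.
#[allow(dead_code)]
fn example_literals() -> Vec<Column> {
    vec![lit_i64(42), lit_f64(0.5), lit_bool(true), lit_str("hello")]
}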

/// Count aggregation
pub fn count(col: &Column) -> Column {
    Column::from_expr(col.expr().clone().count(), Some("count".to_string()))
}

/// Sum aggregation
pub fn sum(col: &Column) -> Column {
    Column::from_expr(col.expr().clone().sum(), Some("sum".to_string()))
}

/// Average aggregation
pub fn avg(col: &Column) -> Column {
    Column::from_expr(col.expr().clone().mean(), Some("avg".to_string()))
}

/// Alias for avg (PySpark mean).
pub fn mean(col: &Column) -> Column {
    avg(col)
}

/// Maximum aggregation
pub fn max(col: &Column) -> Column {
    Column::from_expr(col.expr().clone().max(), Some("max".to_string()))
}

/// Minimum aggregation
pub fn min(col: &Column) -> Column {
    Column::from_expr(col.expr().clone().min(), Some("min".to_string()))
}

/// Standard deviation (sample) aggregation (PySpark stddev / stddev_samp)
pub fn stddev(col: &Column) -> Column {
    Column::from_expr(col.expr().clone().std(1), Some("stddev".to_string()))
}

/// Variance (sample) aggregation (PySpark variance / var_samp)
pub fn variance(col: &Column) -> Column {
    Column::from_expr(col.expr().clone().var(1), Some("variance".to_string()))
}

/// Population standard deviation (ddof=0). PySpark stddev_pop.
pub fn stddev_pop(col: &Column) -> Column {
    Column::from_expr(col.expr().clone().std(0), Some("stddev_pop".to_string()))
}

/// Sample standard deviation (ddof=1). Alias for stddev. PySpark stddev_samp.
pub fn stddev_samp(col: &Column) -> Column {
    stddev(col)
}

/// Alias for stddev (PySpark std).
pub fn std(col: &Column) -> Column {
    stddev(col)
}

/// Population variance (ddof=0). PySpark var_pop.
pub fn var_pop(col: &Column) -> Column {
    Column::from_expr(col.expr().clone().var(0), Some("var_pop".to_string()))
}

/// Sample variance (ddof=1). Alias for variance. PySpark var_samp.
pub fn var_samp(col: &Column) -> Column {
    variance(col)
}
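
// Illustrative sketch: the aggregation helpers each return a Column wrapping a Polars
// aggregation expression, so they can be collected into a list and handed to a
// groupBy/agg-style call. The DataFrame side of that call is assumed, not shown.
#[allow(dead_code)]
fn example_aggregations() -> Vec<Column> {
    vec![
        count(&col("id")),
        sum(&col("amount")),
        avg(&col("amount")),
        stddev(&col("amount")),
        var_pop(&col("amount")),
    ]
}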

/// Median aggregation. PySpark median.
pub fn median(col: &Column) -> Column {
    use polars::prelude::QuantileMethod;
    Column::from_expr(
        col.expr()
            .clone()
            .quantile(lit(0.5), QuantileMethod::Linear),
        Some("median".to_string()),
    )
}

/// Approximate percentile (PySpark approx_percentile). Maps to quantile; percentage in 0.0..=1.0.
pub fn approx_percentile(col: &Column, percentage: f64) -> Column {
    use polars::prelude::QuantileMethod;
    Column::from_expr(
        col.expr()
            .clone()
            .quantile(lit(percentage), QuantileMethod::Linear),
        Some(format!("approx_percentile({percentage})")),
    )
}

/// Approximate percentile (PySpark percentile_approx). Alias for approx_percentile.
pub fn percentile_approx(col: &Column, percentage: f64) -> Column {
    approx_percentile(col, percentage)
}

/// Mode aggregation - most frequent value. PySpark mode.
pub fn mode(col: &Column) -> Column {
    col.clone().mode()
}

/// Count distinct aggregation (PySpark countDistinct)
pub fn count_distinct(col: &Column) -> Column {
    use polars::prelude::DataType;
    Column::from_expr(
        col.expr().clone().n_unique().cast(DataType::Int64),
        Some("count_distinct".to_string()),
    )
}

/// Kurtosis aggregation (PySpark kurtosis). Fisher definition, bias=true. Use in groupBy.agg().
pub fn kurtosis(col: &Column) -> Column {
    Column::from_expr(
        col.expr()
            .clone()
            .cast(DataType::Float64)
            .kurtosis(true, true),
        Some("kurtosis".to_string()),
    )
}

/// Skewness aggregation (PySpark skewness). bias=true. Use in groupBy.agg().
pub fn skewness(col: &Column) -> Column {
    Column::from_expr(
        col.expr().clone().cast(DataType::Float64).skew(true),
        Some("skewness".to_string()),
    )
}

/// Population covariance aggregation (PySpark covar_pop). Returns Expr for use in groupBy.agg().
pub fn covar_pop_expr(col1: &str, col2: &str) -> Expr {
    use polars::prelude::{col as pl_col, len};
    let c1 = pl_col(col1).cast(DataType::Float64);
    let c2 = pl_col(col2).cast(DataType::Float64);
    let n = len().cast(DataType::Float64);
    let sum_ab = (c1.clone() * c2.clone()).sum();
    let sum_a = pl_col(col1).sum().cast(DataType::Float64);
    let sum_b = pl_col(col2).sum().cast(DataType::Float64);
    (sum_ab - sum_a * sum_b / n.clone()) / n
}

/// Sample covariance aggregation (PySpark covar_samp). Returns Expr for use in groupBy.agg().
pub fn covar_samp_expr(col1: &str, col2: &str) -> Expr {
    use polars::prelude::{col as pl_col, len, lit, when};
    let c1 = pl_col(col1).cast(DataType::Float64);
    let c2 = pl_col(col2).cast(DataType::Float64);
    let n = len().cast(DataType::Float64);
    let sum_ab = (c1.clone() * c2.clone()).sum();
    let sum_a = pl_col(col1).sum().cast(DataType::Float64);
    let sum_b = pl_col(col2).sum().cast(DataType::Float64);
    when(len().gt(lit(1)))
        .then((sum_ab - sum_a * sum_b / n.clone()) / (len() - lit(1)).cast(DataType::Float64))
        .otherwise(lit(f64::NAN))
}

/// Pearson correlation aggregation (PySpark corr). Returns Expr for use in groupBy.agg().
pub fn corr_expr(col1: &str, col2: &str) -> Expr {
    use polars::prelude::{col as pl_col, len, lit, when};
    let c1 = pl_col(col1).cast(DataType::Float64);
    let c2 = pl_col(col2).cast(DataType::Float64);
    let n = len().cast(DataType::Float64);
    let n1 = (len() - lit(1)).cast(DataType::Float64);
    let sum_ab = (c1.clone() * c2.clone()).sum();
    let sum_a = pl_col(col1).sum().cast(DataType::Float64);
    let sum_b = pl_col(col2).sum().cast(DataType::Float64);
    let sum_a2 = (c1.clone() * c1).sum();
    let sum_b2 = (c2.clone() * c2).sum();
    let cov_samp = (sum_ab - sum_a.clone() * sum_b.clone() / n.clone()) / n1.clone();
    let var_a = (sum_a2 - sum_a.clone() * sum_a / n.clone()) / n1.clone();
    let var_b = (sum_b2 - sum_b.clone() * sum_b / n.clone()) / n1.clone();
    let std_a = var_a.sqrt();
    let std_b = var_b.sqrt();
    when(len().gt(lit(1)))
        .then(cov_samp / (std_a * std_b))
        .otherwise(lit(f64::NAN))
}
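
// Illustrative sketch: the *_expr helpers return raw Polars Exprs (the formulas above)
// meant to be passed straight into a group-by aggregation. The surrounding group-by
// call is assumed and not part of this file; column names are hypothetical.
#[allow(dead_code)]
fn example_covariance_exprs() -> Vec<Expr> {
    vec![
        covar_pop_expr("height", "weight"),
        covar_samp_expr("height", "weight"),
        corr_expr("height", "weight"),
    ]
}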

// --- Regression aggregates (PySpark regr_*). y = col1, x = col2; only pairs where both non-null. ---

fn regr_cond_and_sums(y_col: &str, x_col: &str) -> (Expr, Expr, Expr, Expr, Expr, Expr) {
    use polars::prelude::col as pl_col;
    let y = pl_col(y_col).cast(DataType::Float64);
    let x = pl_col(x_col).cast(DataType::Float64);
    let cond = y.clone().is_not_null().and(x.clone().is_not_null());
    let n = y
        .clone()
        .filter(cond.clone())
        .count()
        .cast(DataType::Float64);
    let sum_x = x.clone().filter(cond.clone()).sum();
    let sum_y = y.clone().filter(cond.clone()).sum();
    let sum_xx = (x.clone() * x.clone()).filter(cond.clone()).sum();
    let sum_yy = (y.clone() * y.clone()).filter(cond.clone()).sum();
    let sum_xy = (x * y).filter(cond).sum();
    (n, sum_x, sum_y, sum_xx, sum_yy, sum_xy)
}

/// Regression: count of (y, x) pairs where both non-null (PySpark regr_count).
pub fn regr_count_expr(y_col: &str, x_col: &str) -> Expr {
    let (n, ..) = regr_cond_and_sums(y_col, x_col);
    n
}

/// Regression: average of x (PySpark regr_avgx).
pub fn regr_avgx_expr(y_col: &str, x_col: &str) -> Expr {
    use polars::prelude::{lit, when};
    let (n, sum_x, ..) = regr_cond_and_sums(y_col, x_col);
    when(n.clone().gt(lit(0.0)))
        .then(sum_x / n)
        .otherwise(lit(f64::NAN))
}

/// Regression: average of y (PySpark regr_avgy).
pub fn regr_avgy_expr(y_col: &str, x_col: &str) -> Expr {
    use polars::prelude::{lit, when};
    let (n, _, sum_y, ..) = regr_cond_and_sums(y_col, x_col);
    when(n.clone().gt(lit(0.0)))
        .then(sum_y / n)
        .otherwise(lit(f64::NAN))
}

/// Regression: sum((x - avg_x)^2) (PySpark regr_sxx).
pub fn regr_sxx_expr(y_col: &str, x_col: &str) -> Expr {
    use polars::prelude::{lit, when};
    let (n, sum_x, _, sum_xx, ..) = regr_cond_and_sums(y_col, x_col);
    when(n.clone().gt(lit(0.0)))
        .then(sum_xx - sum_x.clone() * sum_x / n)
        .otherwise(lit(f64::NAN))
}

/// Regression: sum((y - avg_y)^2) (PySpark regr_syy).
pub fn regr_syy_expr(y_col: &str, x_col: &str) -> Expr {
    use polars::prelude::{lit, when};
    let (n, _, sum_y, _, sum_yy, _) = regr_cond_and_sums(y_col, x_col);
    when(n.clone().gt(lit(0.0)))
        .then(sum_yy - sum_y.clone() * sum_y / n)
        .otherwise(lit(f64::NAN))
}

/// Regression: sum((x - avg_x)(y - avg_y)) (PySpark regr_sxy).
pub fn regr_sxy_expr(y_col: &str, x_col: &str) -> Expr {
    use polars::prelude::{lit, when};
    let (n, sum_x, sum_y, _, _, sum_xy) = regr_cond_and_sums(y_col, x_col);
    when(n.clone().gt(lit(0.0)))
        .then(sum_xy - sum_x * sum_y / n)
        .otherwise(lit(f64::NAN))
}

/// Regression slope: cov_samp(y,x)/var_samp(x) (PySpark regr_slope).
pub fn regr_slope_expr(y_col: &str, x_col: &str) -> Expr {
    use polars::prelude::{lit, when};
    let (n, sum_x, sum_y, sum_xx, _sum_yy, sum_xy) = regr_cond_and_sums(y_col, x_col);
    let regr_sxx = sum_xx.clone() - sum_x.clone() * sum_x.clone() / n.clone();
    let regr_sxy = sum_xy - sum_x * sum_y / n.clone();
    when(n.gt(lit(1.0)).and(regr_sxx.clone().gt(lit(0.0))))
        .then(regr_sxy / regr_sxx)
        .otherwise(lit(f64::NAN))
}

/// Regression intercept: avg_y - slope*avg_x (PySpark regr_intercept).
pub fn regr_intercept_expr(y_col: &str, x_col: &str) -> Expr {
    use polars::prelude::{lit, when};
    let (n, sum_x, sum_y, sum_xx, _, sum_xy) = regr_cond_and_sums(y_col, x_col);
    let regr_sxx = sum_xx - sum_x.clone() * sum_x.clone() / n.clone();
    let regr_sxy = sum_xy.clone() - sum_x.clone() * sum_y.clone() / n.clone();
    let slope = regr_sxy.clone() / regr_sxx.clone();
    let avg_y = sum_y / n.clone();
    let avg_x = sum_x / n.clone();
    when(n.gt(lit(1.0)).and(regr_sxx.clone().gt(lit(0.0))))
        .then(avg_y - slope * avg_x)
        .otherwise(lit(f64::NAN))
}

/// Regression R-squared (PySpark regr_r2).
pub fn regr_r2_expr(y_col: &str, x_col: &str) -> Expr {
    use polars::prelude::{lit, when};
    let (n, sum_x, sum_y, sum_xx, sum_yy, sum_xy) = regr_cond_and_sums(y_col, x_col);
    let regr_sxx = sum_xx - sum_x.clone() * sum_x.clone() / n.clone();
    let regr_syy = sum_yy - sum_y.clone() * sum_y.clone() / n.clone();
    let regr_sxy = sum_xy - sum_x * sum_y / n;
    when(
        regr_sxx
            .clone()
            .gt(lit(0.0))
            .and(regr_syy.clone().gt(lit(0.0))),
    )
    .then(regr_sxy.clone() * regr_sxy / (regr_sxx * regr_syy))
    .otherwise(lit(f64::NAN))
}
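
// Illustrative sketch: the regr_* helpers share the filtered sums computed by
// regr_cond_and_sums, so slope/intercept/r2 stay consistent with each other. As with
// the covariance helpers, the group-by call that consumes these Exprs is assumed.
#[allow(dead_code)]
fn example_regression_exprs() -> Vec<Expr> {
    vec![
        regr_count_expr("y", "x"),
        regr_slope_expr("y", "x"),
        regr_intercept_expr("y", "x"),
        regr_r2_expr("y", "x"),
    ]
}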

/// PySpark-style conditional expression builder.
///
/// # Example
/// ```
/// use robin_sparkless::{col, lit_i64, lit_str, when};
///
/// // when(condition).then(value).otherwise(fallback)
/// let expr = when(&col("age").gt(lit_i64(18).into_expr()))
///     .then(&lit_str("adult"))
///     .otherwise(&lit_str("minor"));
/// ```
pub fn when(condition: &Column) -> WhenBuilder {
    WhenBuilder::new(condition.expr().clone())
}

/// Two-arg when(condition, value): returns value where condition is true, null otherwise (PySpark when(cond, val)).
pub fn when_then_otherwise_null(condition: &Column, value: &Column) -> Column {
    use polars::prelude::*;
    let null_expr = Expr::Literal(LiteralValue::Null);
    let expr = polars::prelude::when(condition.expr().clone())
        .then(value.expr().clone())
        .otherwise(null_expr);
    crate::column::Column::from_expr(expr, None)
}

/// Builder for when-then-otherwise expressions
pub struct WhenBuilder {
    condition: Expr,
}

impl WhenBuilder {
    fn new(condition: Expr) -> Self {
        WhenBuilder { condition }
    }

    /// Specify the value when the condition is true.
    pub fn then(self, value: &Column) -> ThenBuilder {
        use polars::prelude::*;
        let when_then = when(self.condition).then(value.expr().clone());
        ThenBuilder::new(when_then)
    }

    /// Specify the value when the condition is false.
    ///
    /// In PySpark, when(cond).otherwise(val) still requires a .then(); this implementation
    /// requires .then() to be called explicitly as well.
    ///
    /// # Panics
    /// Always panics when called directly. Use when(cond).then(val1).otherwise(val2) instead.
    pub fn otherwise(self, _value: &Column) -> Column {
        // This should not be called directly - when().otherwise() without .then() is not supported
        // Users should use when(cond).then(val1).otherwise(val2)
        panic!("when().otherwise() requires .then() to be called first. Use when(cond).then(val1).otherwise(val2)");
    }
}

/// Builder for chaining when-then clauses before finalizing with otherwise
pub struct ThenBuilder {
    when_then: polars::prelude::Then, // The Polars WhenThen state
}

impl ThenBuilder {
    fn new(when_then: polars::prelude::Then) -> Self {
        ThenBuilder { when_then }
    }

    /// Chain an additional when-then clause.
    /// Note: chaining multiple when-then clauses is not yet fully supported;
    /// for now, use a single when().then().otherwise() pattern.
    pub fn when(self, _condition: &Column) -> ThenBuilder {
        // TODO: Implement proper chaining support
        // For now, return self to allow compilation but chaining won't work correctly
        self
    }

    /// Finalize the expression with the fallback value
    pub fn otherwise(self, value: &Column) -> Column {
        let expr = self.when_then.otherwise(value.expr().clone());
        crate::column::Column::from_expr(expr, None)
    }
}

/// Convert string column to uppercase (PySpark upper)
pub fn upper(column: &Column) -> Column {
    column.clone().upper()
}

/// Convert string column to lowercase (PySpark lower)
pub fn lower(column: &Column) -> Column {
    column.clone().lower()
}

/// Substring with 1-based start (PySpark substring semantics)
pub fn substring(column: &Column, start: i64, length: Option<i64>) -> Column {
    column.clone().substr(start, length)
}

/// String length in characters (PySpark length)
pub fn length(column: &Column) -> Column {
    column.clone().length()
}

/// Trim leading and trailing whitespace (PySpark trim)
pub fn trim(column: &Column) -> Column {
    column.clone().trim()
}

/// Trim leading whitespace (PySpark ltrim)
pub fn ltrim(column: &Column) -> Column {
    column.clone().ltrim()
}

/// Trim trailing whitespace (PySpark rtrim)
pub fn rtrim(column: &Column) -> Column {
    column.clone().rtrim()
}

/// Trim leading and trailing chars (PySpark btrim). trim_str defaults to whitespace.
pub fn btrim(column: &Column, trim_str: Option<&str>) -> Column {
    column.clone().btrim(trim_str)
}

/// Find substring position 1-based, starting at pos (PySpark locate). 0 if not found.
pub fn locate(substr: &str, column: &Column, pos: i64) -> Column {
    column.clone().locate(substr, pos)
}
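
// Illustrative sketch: the string helpers delegate to Column methods and compose by
// nesting calls. The "email" column name is hypothetical.
#[allow(dead_code)]
fn example_string_helpers() -> Vec<Column> {
    vec![
        upper(&col("email")),
        substring(&col("email"), 1, Some(5)),
        locate("@", &col("email"), 1),
        btrim(&col("email"), None),
    ]
}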

/// Base conversion (PySpark conv). num from from_base to to_base.
pub fn conv(column: &Column, from_base: i32, to_base: i32) -> Column {
    column.clone().conv(from_base, to_base)
}

/// Convert to hex string (PySpark hex).
pub fn hex(column: &Column) -> Column {
    column.clone().hex()
}

/// Convert hex string to binary/string (PySpark unhex).
pub fn unhex(column: &Column) -> Column {
    column.clone().unhex()
}

/// Encode string to binary (PySpark encode). Charset: UTF-8. Returns hex string.
pub fn encode(column: &Column, charset: &str) -> Column {
    column.clone().encode(charset)
}

/// Decode binary (hex string) to string (PySpark decode). Charset: UTF-8.
pub fn decode(column: &Column, charset: &str) -> Column {
    column.clone().decode(charset)
}

/// Convert to binary (PySpark to_binary). fmt: 'utf-8', 'hex'.
pub fn to_binary(column: &Column, fmt: &str) -> Column {
    column.clone().to_binary(fmt)
}

/// Try convert to binary; null on failure (PySpark try_to_binary).
pub fn try_to_binary(column: &Column, fmt: &str) -> Column {
    column.clone().try_to_binary(fmt)
}

/// AES encrypt (PySpark aes_encrypt). Key as string; AES-128-GCM.
pub fn aes_encrypt(column: &Column, key: &str) -> Column {
    column.clone().aes_encrypt(key)
}

/// AES decrypt (PySpark aes_decrypt). Input hex(nonce||ciphertext).
pub fn aes_decrypt(column: &Column, key: &str) -> Column {
    column.clone().aes_decrypt(key)
}

/// Try AES decrypt (PySpark try_aes_decrypt). Returns null on failure.
pub fn try_aes_decrypt(column: &Column, key: &str) -> Column {
    column.clone().try_aes_decrypt(key)
}

/// Convert integer to binary string (PySpark bin).
pub fn bin(column: &Column) -> Column {
    column.clone().bin()
}

/// Get bit at 0-based position (PySpark getbit).
pub fn getbit(column: &Column, pos: i64) -> Column {
    column.clone().getbit(pos)
}

/// Bitwise AND of two integer/boolean columns (PySpark bit_and).
pub fn bit_and(left: &Column, right: &Column) -> Column {
    left.clone().bit_and(right)
}

/// Bitwise OR of two integer/boolean columns (PySpark bit_or).
pub fn bit_or(left: &Column, right: &Column) -> Column {
    left.clone().bit_or(right)
}

/// Bitwise XOR of two integer/boolean columns (PySpark bit_xor).
pub fn bit_xor(left: &Column, right: &Column) -> Column {
    left.clone().bit_xor(right)
}

/// Count of set bits in the integer representation (PySpark bit_count).
pub fn bit_count(column: &Column) -> Column {
    column.clone().bit_count()
}

/// Bitwise NOT of an integer/boolean column (PySpark bitwise_not / bitwiseNOT).
pub fn bitwise_not(column: &Column) -> Column {
    column.clone().bitwise_not()
}

// --- Bitmap (PySpark 3.5+) ---

/// Map integral value (0–32767) to bit position for bitmap aggregates (PySpark bitmap_bit_position).
pub fn bitmap_bit_position(column: &Column) -> Column {
    use polars::prelude::DataType;
    let expr = column.expr().clone().cast(DataType::Int32);
    Column::from_expr(expr, None)
}

/// Bucket number for distributed bitmap (PySpark bitmap_bucket_number). value / 32768.
pub fn bitmap_bucket_number(column: &Column) -> Column {
    use polars::prelude::DataType;
    let expr = column.expr().clone().cast(DataType::Int64) / lit(32768i64);
    Column::from_expr(expr, None)
}

/// Count set bits in a bitmap binary column (PySpark bitmap_count).
pub fn bitmap_count(column: &Column) -> Column {
    use polars::prelude::{DataType, GetOutput};
    let expr = column.expr().clone().map(
        crate::udfs::apply_bitmap_count,
        GetOutput::from_type(DataType::Int64),
    );
    Column::from_expr(expr, None)
}

/// Aggregate: bitwise OR of bit positions into one bitmap binary (PySpark bitmap_construct_agg).
/// Use in group_by(...).agg([bitmap_construct_agg(col)]).
pub fn bitmap_construct_agg(column: &Column) -> polars::prelude::Expr {
    use polars::prelude::{DataType, GetOutput};
    column.expr().clone().implode().map(
        crate::udfs::apply_bitmap_construct_agg,
        GetOutput::from_type(DataType::Binary),
    )
}

/// Aggregate: bitwise OR of bitmap binary column (PySpark bitmap_or_agg).
pub fn bitmap_or_agg(column: &Column) -> polars::prelude::Expr {
    use polars::prelude::{DataType, GetOutput};
    column.expr().clone().implode().map(
        crate::udfs::apply_bitmap_or_agg,
        GetOutput::from_type(DataType::Binary),
    )
}

/// Alias for getbit (PySpark bit_get).
pub fn bit_get(column: &Column, pos: i64) -> Column {
    getbit(column, pos)
}

/// Assert that all boolean values are true; errors otherwise (PySpark assert_true).
/// When err_msg is Some, it is used in the error message when the assertion fails.
pub fn assert_true(column: &Column, err_msg: Option<&str>) -> Column {
    column.clone().assert_true(err_msg)
}

/// Raise an error when evaluated (PySpark raise_error). Always fails with the given message.
pub fn raise_error(message: &str) -> Column {
    let msg = message.to_string();
    let expr = lit(0i64).map(
        move |_col| -> PolarsResult<Option<polars::prelude::Column>> {
            Err(PolarsError::ComputeError(msg.clone().into()))
        },
        GetOutput::from_type(DataType::Int64),
    );
    Column::from_expr(expr, Some("raise_error".to_string()))
}

/// Broadcast hint - no-op that returns the same DataFrame (PySpark broadcast).
pub fn broadcast(df: &DataFrame) -> DataFrame {
    df.clone()
}

/// Stub partition id - always 0 (PySpark spark_partition_id).
pub fn spark_partition_id() -> Column {
    Column::from_expr(lit(0i32), Some("spark_partition_id".to_string()))
}

/// Stub input file name - empty string (PySpark input_file_name).
pub fn input_file_name() -> Column {
    Column::from_expr(lit(""), Some("input_file_name".to_string()))
}

/// Stub monotonically_increasing_id - constant 0 (PySpark monotonically_increasing_id).
/// Note: differs from PySpark which is unique per-row; see PYSPARK_DIFFERENCES.md.
pub fn monotonically_increasing_id() -> Column {
    Column::from_expr(lit(0i64), Some("monotonically_increasing_id".to_string()))
}

/// Current catalog name stub (PySpark current_catalog).
pub fn current_catalog() -> Column {
    Column::from_expr(lit("spark_catalog"), Some("current_catalog".to_string()))
}

/// Current database/schema name stub (PySpark current_database).
pub fn current_database() -> Column {
    Column::from_expr(lit("default"), Some("current_database".to_string()))
}

/// Current schema name stub (PySpark current_schema).
pub fn current_schema() -> Column {
    Column::from_expr(lit("default"), Some("current_schema".to_string()))
}

/// Current user stub (PySpark current_user).
pub fn current_user() -> Column {
    Column::from_expr(lit("unknown"), Some("current_user".to_string()))
}

/// User stub (PySpark user).
pub fn user() -> Column {
    Column::from_expr(lit("unknown"), Some("user".to_string()))
}

/// Random uniform [0, 1) per row, with optional seed (PySpark rand).
/// When added via with_column, generates one distinct value per row (PySpark-like).
pub fn rand(seed: Option<u64>) -> Column {
    Column::from_rand(seed)
}

/// Random standard normal per row, with optional seed (PySpark randn).
/// When added via with_column, generates one distinct value per row (PySpark-like).
pub fn randn(seed: Option<u64>) -> Column {
    Column::from_randn(seed)
}

/// True if two arrays have any element in common (PySpark arrays_overlap).
pub fn arrays_overlap(left: &Column, right: &Column) -> Column {
    left.clone().arrays_overlap(right)
}

/// Zip arrays into array of structs (PySpark arrays_zip).
pub fn arrays_zip(left: &Column, right: &Column) -> Column {
    left.clone().arrays_zip(right)
}

/// Explode; null/empty yields one row with null (PySpark explode_outer).
pub fn explode_outer(column: &Column) -> Column {
    column.clone().explode_outer()
}

/// Posexplode with null preservation (PySpark posexplode_outer).
pub fn posexplode_outer(column: &Column) -> (Column, Column) {
    column.clone().posexplode_outer()
}

/// Collect to array (PySpark array_agg).
pub fn array_agg(column: &Column) -> Column {
    column.clone().array_agg()
}

/// Transform map keys by expr (PySpark transform_keys).
pub fn transform_keys(column: &Column, key_expr: Expr) -> Column {
    column.clone().transform_keys(key_expr)
}

/// Transform map values by expr (PySpark transform_values).
pub fn transform_values(column: &Column, value_expr: Expr) -> Column {
    column.clone().transform_values(value_expr)
}

/// Parse string to map (PySpark str_to_map). Default delims: "," and ":".
pub fn str_to_map(
    column: &Column,
    pair_delim: Option<&str>,
    key_value_delim: Option<&str>,
) -> Column {
    let pd = pair_delim.unwrap_or(",");
    let kvd = key_value_delim.unwrap_or(":");
    column.clone().str_to_map(pd, kvd)
}

/// Extract first match of regex (PySpark regexp_extract). group_index 0 = full match.
pub fn regexp_extract(column: &Column, pattern: &str, group_index: usize) -> Column {
    column.clone().regexp_extract(pattern, group_index)
}

/// Replace first match of regex (PySpark regexp_replace)
pub fn regexp_replace(column: &Column, pattern: &str, replacement: &str) -> Column {
    column.clone().regexp_replace(pattern, replacement)
}

/// Split string by delimiter (PySpark split)
pub fn split(column: &Column, delimiter: &str) -> Column {
    column.clone().split(delimiter)
}

/// Title case (PySpark initcap)
pub fn initcap(column: &Column) -> Column {
    column.clone().initcap()
}

/// Extract all matches of regex (PySpark regexp_extract_all).
pub fn regexp_extract_all(column: &Column, pattern: &str) -> Column {
    column.clone().regexp_extract_all(pattern)
}

/// Check if string matches regex (PySpark regexp_like / rlike).
pub fn regexp_like(column: &Column, pattern: &str) -> Column {
    column.clone().regexp_like(pattern)
}

/// Count of non-overlapping regex matches (PySpark regexp_count).
pub fn regexp_count(column: &Column, pattern: &str) -> Column {
    column.clone().regexp_count(pattern)
}

/// First substring matching regex (PySpark regexp_substr). Null if no match.
pub fn regexp_substr(column: &Column, pattern: &str) -> Column {
    column.clone().regexp_substr(pattern)
}

/// Split by delimiter and return 1-based part (PySpark split_part).
pub fn split_part(column: &Column, delimiter: &str, part_num: i64) -> Column {
    column.clone().split_part(delimiter, part_num)
}

/// 1-based position of first regex match (PySpark regexp_instr).
pub fn regexp_instr(column: &Column, pattern: &str, group_idx: Option<usize>) -> Column {
    column.clone().regexp_instr(pattern, group_idx)
}

/// 1-based index of str in comma-delimited set (PySpark find_in_set). 0 if not found or str contains comma.
pub fn find_in_set(str_column: &Column, set_column: &Column) -> Column {
    str_column.clone().find_in_set(set_column)
}

/// Printf-style format (PySpark format_string). Supports %s, %d, %i, %f, %g, %%.
///
/// # Panics
/// Panics if `columns` is empty.
pub fn format_string(format: &str, columns: &[&Column]) -> Column {
    use polars::prelude::*;
    if columns.is_empty() {
        panic!("format_string needs at least one column");
    }
    let format_owned = format.to_string();
    let args: Vec<Expr> = columns.iter().skip(1).map(|c| c.expr().clone()).collect();
    let expr = columns[0].expr().clone().map_many(
        move |cols| crate::udfs::apply_format_string(cols, &format_owned),
        &args,
        GetOutput::from_type(DataType::String),
    );
    crate::column::Column::from_expr(expr, None)
}
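
// Illustrative sketch: format_string applies the printf-style template to one or more
// columns row-wise; the first column drives the row count and the rest are passed as
// extra arguments. Column names are hypothetical.
#[allow(dead_code)]
fn example_format_string() -> Column {
    format_string("%s scored %d points", &[&col("name"), &col("points")])
}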

/// Alias for format_string (PySpark printf).
pub fn printf(format: &str, columns: &[&Column]) -> Column {
    format_string(format, columns)
}

/// Repeat string n times (PySpark repeat).
pub fn repeat(column: &Column, n: i32) -> Column {
    column.clone().repeat(n)
}

/// Reverse string (PySpark reverse).
pub fn reverse(column: &Column) -> Column {
    column.clone().reverse()
}

/// Find substring position 1-based; 0 if not found (PySpark instr).
pub fn instr(column: &Column, substr: &str) -> Column {
    column.clone().instr(substr)
}

/// Position of substring in column (PySpark position). Same as instr; (substr, col) argument order.
pub fn position(substr: &str, column: &Column) -> Column {
    column.clone().instr(substr)
}

/// ASCII value of first character (PySpark ascii). Returns Int32.
pub fn ascii(column: &Column) -> Column {
    column.clone().ascii()
}

/// Format numeric as string with fixed decimal places (PySpark format_number).
pub fn format_number(column: &Column, decimals: u32) -> Column {
    column.clone().format_number(decimals)
}

/// Replace substring at 1-based position (PySpark overlay). replace is literal.
pub fn overlay(column: &Column, replace: &str, pos: i64, length: i64) -> Column {
    column.clone().overlay(replace, pos, length)
}

/// Int to single-character string (PySpark char). Valid codepoint only.
pub fn char(column: &Column) -> Column {
    column.clone().char()
}

/// Alias for char (PySpark chr).
pub fn chr(column: &Column) -> Column {
    column.clone().chr()
}

/// Base64 encode string bytes (PySpark base64).
pub fn base64(column: &Column) -> Column {
    column.clone().base64()
}

/// Base64 decode to string (PySpark unbase64). Invalid decode → null.
pub fn unbase64(column: &Column) -> Column {
    column.clone().unbase64()
}

/// SHA1 hash of string bytes, return hex string (PySpark sha1).
pub fn sha1(column: &Column) -> Column {
    column.clone().sha1()
}

/// SHA2 hash; bit_length 256, 384, or 512 (PySpark sha2).
pub fn sha2(column: &Column, bit_length: i32) -> Column {
    column.clone().sha2(bit_length)
}

/// MD5 hash of string bytes, return hex string (PySpark md5).
pub fn md5(column: &Column) -> Column {
    column.clone().md5()
}

/// Left-pad string to length with pad char (PySpark lpad).
pub fn lpad(column: &Column, length: i32, pad: &str) -> Column {
    column.clone().lpad(length, pad)
}

/// Right-pad string to length with pad char (PySpark rpad).
pub fn rpad(column: &Column, length: i32, pad: &str) -> Column {
    column.clone().rpad(length, pad)
}

/// Character-by-character translation (PySpark translate).
pub fn translate(column: &Column, from_str: &str, to_str: &str) -> Column {
    column.clone().translate(from_str, to_str)
}

/// Mask string: replace upper/lower/digit/other with given chars (PySpark mask).
pub fn mask(
    column: &Column,
    upper_char: Option<char>,
    lower_char: Option<char>,
    digit_char: Option<char>,
    other_char: Option<char>,
) -> Column {
    column
        .clone()
        .mask(upper_char, lower_char, digit_char, other_char)
}

/// Substring before/after nth delimiter (PySpark substring_index).
pub fn substring_index(column: &Column, delimiter: &str, count: i64) -> Column {
    column.clone().substring_index(delimiter, count)
}

/// Leftmost n characters (PySpark left).
pub fn left(column: &Column, n: i64) -> Column {
    column.clone().left(n)
}

/// Rightmost n characters (PySpark right).
pub fn right(column: &Column, n: i64) -> Column {
    column.clone().right(n)
}

/// Replace all occurrences of search with replacement (literal). PySpark replace.
pub fn replace(column: &Column, search: &str, replacement: &str) -> Column {
    column.clone().replace(search, replacement)
}

/// True if string starts with prefix (PySpark startswith).
pub fn startswith(column: &Column, prefix: &str) -> Column {
    column.clone().startswith(prefix)
}

/// True if string ends with suffix (PySpark endswith).
pub fn endswith(column: &Column, suffix: &str) -> Column {
    column.clone().endswith(suffix)
}

/// True if string contains substring (literal). PySpark contains.
pub fn contains(column: &Column, substring: &str) -> Column {
    column.clone().contains(substring)
}

/// SQL LIKE pattern (% any, _ one char). PySpark like.
/// When escape_char is Some(esc), esc + char treats that char as literal.
pub fn like(column: &Column, pattern: &str, escape_char: Option<char>) -> Column {
    column.clone().like(pattern, escape_char)
}

/// Case-insensitive LIKE. PySpark ilike.
/// When escape_char is Some(esc), esc + char treats that char as literal.
pub fn ilike(column: &Column, pattern: &str, escape_char: Option<char>) -> Column {
    column.clone().ilike(pattern, escape_char)
}
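
// Illustrative sketch: pattern predicates return boolean Columns suitable for filters.
// The "email" column and patterns are hypothetical.
#[allow(dead_code)]
fn example_pattern_predicates() -> Vec<Column> {
    vec![
        like(&col("email"), "%@example.com", None),
        ilike(&col("email"), "%@EXAMPLE.COM", None),
        regexp_like(&col("email"), r"^[^@]+@[^@]+$"),
        startswith(&col("email"), "admin"),
    ]
}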

/// Alias for regexp_like. PySpark rlike / regexp.
pub fn rlike(column: &Column, pattern: &str) -> Column {
    column.clone().regexp_like(pattern)
}

/// Alias for rlike (PySpark regexp).
pub fn regexp(column: &Column, pattern: &str) -> Column {
    rlike(column, pattern)
}

/// Soundex code (PySpark soundex). Not implemented: requires element-wise UDF.
pub fn soundex(column: &Column) -> Column {
    column.clone().soundex()
}

/// Levenshtein distance (PySpark levenshtein). Not implemented: requires element-wise UDF.
pub fn levenshtein(column: &Column, other: &Column) -> Column {
    column.clone().levenshtein(other)
}

/// CRC32 of string bytes (PySpark crc32). Not implemented: requires element-wise UDF.
pub fn crc32(column: &Column) -> Column {
    column.clone().crc32()
}

/// XXH64 hash (PySpark xxhash64). Not implemented: requires element-wise UDF.
pub fn xxhash64(column: &Column) -> Column {
    column.clone().xxhash64()
}

/// Absolute value (PySpark abs)
pub fn abs(column: &Column) -> Column {
    column.clone().abs()
}

/// Ceiling (PySpark ceil)
pub fn ceil(column: &Column) -> Column {
    column.clone().ceil()
}

/// Floor (PySpark floor)
pub fn floor(column: &Column) -> Column {
    column.clone().floor()
}

/// Round (PySpark round)
pub fn round(column: &Column, decimals: u32) -> Column {
    column.clone().round(decimals)
}

/// Banker's rounding - round half to even (PySpark bround).
pub fn bround(column: &Column, scale: i32) -> Column {
    column.clone().bround(scale)
}

/// Unary minus / negate (PySpark negate, negative).
pub fn negate(column: &Column) -> Column {
    column.clone().negate()
}

/// Alias for negate. PySpark negative.
pub fn negative(column: &Column) -> Column {
    negate(column)
}

/// Unary plus - no-op, returns column as-is (PySpark positive).
pub fn positive(column: &Column) -> Column {
    column.clone()
}

/// Cotangent: 1/tan (PySpark cot).
pub fn cot(column: &Column) -> Column {
    column.clone().cot()
}

/// Cosecant: 1/sin (PySpark csc).
pub fn csc(column: &Column) -> Column {
    column.clone().csc()
}

/// Secant: 1/cos (PySpark sec).
pub fn sec(column: &Column) -> Column {
    column.clone().sec()
}

/// Constant e = 2.718... (PySpark e).
pub fn e() -> Column {
    Column::from_expr(lit(std::f64::consts::E), Some("e".to_string()))
}

/// Constant pi = 3.14159... (PySpark pi).
pub fn pi() -> Column {
    Column::from_expr(lit(std::f64::consts::PI), Some("pi".to_string()))
}

/// Square root (PySpark sqrt)
pub fn sqrt(column: &Column) -> Column {
    column.clone().sqrt()
}

/// Power (PySpark pow)
pub fn pow(column: &Column, exp: i64) -> Column {
    column.clone().pow(exp)
}

/// Exponential (PySpark exp)
pub fn exp(column: &Column) -> Column {
    column.clone().exp()
}

/// Natural logarithm (PySpark log with one arg)
pub fn log(column: &Column) -> Column {
    column.clone().log()
}

/// Logarithm with given base (PySpark log(col, base)). base must be positive and not 1.
pub fn log_with_base(column: &Column, base: f64) -> Column {
    crate::column::Column::from_expr(column.expr().clone().log(base), None)
}
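
// Illustrative sketch: math helpers compose by nesting, e.g. rounding a base-10 log.
// The "revenue" column name is hypothetical.
#[allow(dead_code)]
fn example_math_helpers() -> Column {
    round(&log_with_base(&abs(&col("revenue")), 10.0), 2)
}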

/// Sine in radians (PySpark sin)
pub fn sin(column: &Column) -> Column {
    column.clone().sin()
}

/// Cosine in radians (PySpark cos)
pub fn cos(column: &Column) -> Column {
    column.clone().cos()
}

/// Tangent in radians (PySpark tan)
pub fn tan(column: &Column) -> Column {
    column.clone().tan()
}

/// Arc sine (PySpark asin)
pub fn asin(column: &Column) -> Column {
    column.clone().asin()
}

/// Arc cosine (PySpark acos)
pub fn acos(column: &Column) -> Column {
    column.clone().acos()
}

/// Arc tangent (PySpark atan)
pub fn atan(column: &Column) -> Column {
    column.clone().atan()
}

/// Two-argument arc tangent atan2(y, x) in radians (PySpark atan2)
pub fn atan2(y: &Column, x: &Column) -> Column {
    y.clone().atan2(x)
}

/// Convert radians to degrees (PySpark degrees)
pub fn degrees(column: &Column) -> Column {
    column.clone().degrees()
}

/// Convert degrees to radians (PySpark radians)
pub fn radians(column: &Column) -> Column {
    column.clone().radians()
}

/// Sign of the number: -1, 0, or 1 (PySpark signum)
pub fn signum(column: &Column) -> Column {
    column.clone().signum()
}

/// Alias for signum (PySpark sign).
pub fn sign(column: &Column) -> Column {
    signum(column)
}

/// Cast column to the given type (PySpark cast). Fails on invalid conversion.
pub fn cast(column: &Column, type_name: &str) -> Result<Column, String> {
    let dtype = parse_type_name(type_name)?;
    Ok(Column::from_expr(
        column.expr().clone().strict_cast(dtype),
        None,
    ))
}

/// Cast column to the given type, returning null on invalid conversion (PySpark try_cast).
pub fn try_cast(column: &Column, type_name: &str) -> Result<Column, String> {
    let dtype = parse_type_name(type_name)?;
    Ok(Column::from_expr(column.expr().clone().cast(dtype), None))
}
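
// Illustrative sketch: cast() uses a strict cast (invalid values error at execution),
// while try_cast() maps invalid values to null; both return Err only for an unknown
// type name. The "raw_value" column name is hypothetical.
#[allow(dead_code)]
fn example_casts() -> Result<(Column, Column), String> {
    let strict = cast(&col("raw_value"), "int")?;
    let lenient = try_cast(&col("raw_value"), "double")?;
    Ok((strict, lenient))
}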

/// Cast to string, optionally with format for datetime (PySpark to_char, to_varchar).
/// When format is Some, uses date_format for datetime columns (PySpark format → chrono strftime); otherwise cast to string.
/// Returns Err if the cast to string fails (invalid type name or unsupported column type).
pub fn to_char(column: &Column, format: Option<&str>) -> Result<Column, String> {
    match format {
        Some(fmt) => Ok(column
            .clone()
            .date_format(&crate::udfs::pyspark_format_to_chrono(fmt))),
        None => cast(column, "string"),
    }
}

/// Alias for to_char (PySpark to_varchar).
pub fn to_varchar(column: &Column, format: Option<&str>) -> Result<Column, String> {
    to_char(column, format)
}

/// Cast to numeric (PySpark to_number). Uses Double. Format parameter reserved for future use.
/// Returns Err if the cast to double fails (invalid type name or unsupported column type).
pub fn to_number(column: &Column, _format: Option<&str>) -> Result<Column, String> {
    cast(column, "double")
}

/// Cast to numeric, null on invalid (PySpark try_to_number). Format parameter reserved for future use.
/// Returns Err if the try_cast setup fails (invalid type name); column values that cannot be parsed become null.
pub fn try_to_number(column: &Column, _format: Option<&str>) -> Result<Column, String> {
    try_cast(column, "double")
}

/// Cast to timestamp, or parse with format when provided (PySpark to_timestamp).
pub fn to_timestamp(column: &Column, format: Option<&str>) -> Result<Column, String> {
    use polars::prelude::{DataType, GetOutput, TimeUnit};
    match format {
        None => crate::cast(column, "timestamp"),
        Some(fmt) => {
            let fmt_owned = fmt.to_string();
            let expr = column.expr().clone().map(
                move |s| crate::udfs::apply_to_timestamp_format(s, Some(&fmt_owned), true),
                GetOutput::from_type(DataType::Datetime(TimeUnit::Microseconds, None)),
            );
            Ok(crate::column::Column::from_expr(expr, None))
        }
    }
}

/// Cast to timestamp, null on invalid, or parse with format when provided (PySpark try_to_timestamp).
/// Returns Err if the try_cast setup fails (invalid type name) when format is None.
pub fn try_to_timestamp(column: &Column, format: Option<&str>) -> Result<Column, String> {
    use polars::prelude::*;
    match format {
        None => try_cast(column, "timestamp"),
        Some(fmt) => {
            let fmt_owned = fmt.to_string();
            let expr = column.expr().clone().map(
                move |s| crate::udfs::apply_to_timestamp_format(s, Some(&fmt_owned), false),
                GetOutput::from_type(DataType::Datetime(TimeUnit::Microseconds, None)),
            );
            Ok(crate::column::Column::from_expr(expr, None))
        }
    }
}

/// Parse as timestamp in local timezone, return UTC (PySpark to_timestamp_ltz).
pub fn to_timestamp_ltz(column: &Column, format: Option<&str>) -> Result<Column, String> {
    use polars::prelude::{DataType, GetOutput, TimeUnit};
    match format {
        None => crate::cast(column, "timestamp"),
        Some(fmt) => {
            let fmt_owned = fmt.to_string();
            let expr = column.expr().clone().map(
                move |s| crate::udfs::apply_to_timestamp_ltz_format(s, Some(&fmt_owned), true),
                GetOutput::from_type(DataType::Datetime(TimeUnit::Microseconds, None)),
            );
            Ok(crate::column::Column::from_expr(expr, None))
        }
    }
}

/// Parse as timestamp without timezone (PySpark to_timestamp_ntz). Returns Datetime(_, None).
pub fn to_timestamp_ntz(column: &Column, format: Option<&str>) -> Result<Column, String> {
    use polars::prelude::{DataType, GetOutput, TimeUnit};
    match format {
        None => crate::cast(column, "timestamp"),
        Some(fmt) => {
            let fmt_owned = fmt.to_string();
            let expr = column.expr().clone().map(
                move |s| crate::udfs::apply_to_timestamp_ntz_format(s, Some(&fmt_owned), true),
                GetOutput::from_type(DataType::Datetime(TimeUnit::Microseconds, None)),
            );
            Ok(crate::column::Column::from_expr(expr, None))
        }
    }
}

/// Division that returns null on divide-by-zero (PySpark try_divide).
pub fn try_divide(left: &Column, right: &Column) -> Column {
    use polars::prelude::*;
    let zero_cond = right.expr().clone().cast(DataType::Float64).eq(lit(0.0f64));
    let null_expr = Expr::Literal(LiteralValue::Null);
    let div_expr =
        left.expr().clone().cast(DataType::Float64) / right.expr().clone().cast(DataType::Float64);
    let expr = polars::prelude::when(zero_cond)
        .then(null_expr)
        .otherwise(div_expr);
    crate::column::Column::from_expr(expr, None)
}
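
// Illustrative sketch: try_to_timestamp and try_divide follow the same "null instead of
// error" convention. Column names and the format string are hypothetical; the format is
// interpreted by the crate's PySpark-to-chrono conversion, as documented above.
#[allow(dead_code)]
fn example_try_conversions() -> Result<(Column, Column), String> {
    let ts = try_to_timestamp(&col("event_time"), Some("yyyy-MM-dd HH:mm:ss"))?;
    let ratio = try_divide(&col("numerator"), &col("denominator"));
    Ok((ts, ratio))
}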

/// Add that returns null on overflow (PySpark try_add). Uses checked arithmetic.
pub fn try_add(left: &Column, right: &Column) -> Column {
    let args = [right.expr().clone()];
    let expr =
        left.expr()
            .clone()
            .map_many(crate::udfs::apply_try_add, &args, GetOutput::same_type());
    Column::from_expr(expr, None)
}

/// Subtract that returns null on overflow (PySpark try_subtract).
pub fn try_subtract(left: &Column, right: &Column) -> Column {
    let args = [right.expr().clone()];
    let expr = left.expr().clone().map_many(
        crate::udfs::apply_try_subtract,
        &args,
        GetOutput::same_type(),
    );
    Column::from_expr(expr, None)
}

/// Multiply that returns null on overflow (PySpark try_multiply).
pub fn try_multiply(left: &Column, right: &Column) -> Column {
    let args = [right.expr().clone()];
    let expr = left.expr().clone().map_many(
        crate::udfs::apply_try_multiply,
        &args,
        GetOutput::same_type(),
    );
    Column::from_expr(expr, None)
}

/// Element at index, null if out of bounds (PySpark try_element_at). Same as element_at for lists.
pub fn try_element_at(column: &Column, index: i64) -> Column {
    column.clone().element_at(index)
}

/// Assign value to histogram bucket (PySpark width_bucket). Returns 0 if v < min_val, num_bucket+1 if v >= max_val.
///
/// # Panics
/// Panics if num_bucket is not positive.
pub fn width_bucket(value: &Column, min_val: f64, max_val: f64, num_bucket: i64) -> Column {
    if num_bucket <= 0 {
        panic!(
            "width_bucket: num_bucket must be positive, got {}",
            num_bucket
        );
    }
    use polars::prelude::*;
    let v = value.expr().clone().cast(DataType::Float64);
    let min_expr = lit(min_val);
    let max_expr = lit(max_val);
    let nb = num_bucket as f64;
    let width = (max_val - min_val) / nb;
    let bucket_expr = (v.clone() - min_expr.clone()) / lit(width);
    let floor_bucket = bucket_expr.floor().cast(DataType::Int64) + lit(1i64);
    let bucket_clamped = floor_bucket.clip(lit(1i64), lit(num_bucket));
    let expr = polars::prelude::when(v.clone().lt(min_expr))
        .then(lit(0i64))
        .when(v.gt_eq(max_expr))
        .then(lit(num_bucket + 1))
        .otherwise(bucket_clamped);
    crate::column::Column::from_expr(expr, None)
}
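
// Illustrative sketch: ten equal-width buckets over [0, 100); values below 0 map to
// bucket 0 and values at or above 100 map to bucket 11. The "score" column is hypothetical.
#[allow(dead_code)]
fn example_width_bucket() -> Column {
    width_bucket(&col("score"), 0.0, 100.0, 10)
}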

/// Return column at 1-based index (PySpark elt). elt(2, a, b, c) returns b.
/// Indices outside 1..=columns.len() produce null.
///
/// # Panics
/// Panics if columns is empty.
pub fn elt(index: &Column, columns: &[&Column]) -> Column {
    use polars::prelude::*;
    if columns.is_empty() {
        panic!("elt requires at least one column");
    }
    let idx_expr = index.expr().clone();
    let null_expr = Expr::Literal(LiteralValue::Null);
    let mut expr = null_expr;
    for (i, c) in columns.iter().enumerate().rev() {
        let n = (i + 1) as i64;
        expr = polars::prelude::when(idx_expr.clone().eq(lit(n)))
            .then(c.expr().clone())
            .otherwise(expr);
    }
    crate::column::Column::from_expr(expr, None)
}

/// Bit length of string (bytes * 8) (PySpark bit_length).
pub fn bit_length(column: &Column) -> Column {
    column.clone().bit_length()
}

/// Length of string in bytes (PySpark octet_length).
pub fn octet_length(column: &Column) -> Column {
    column.clone().octet_length()
}

/// Length of string in characters (PySpark char_length). Alias of length().
pub fn char_length(column: &Column) -> Column {
    column.clone().char_length()
}

/// Length of string in characters (PySpark character_length). Alias of length().
pub fn character_length(column: &Column) -> Column {
    column.clone().character_length()
}

/// Data type of column as string (PySpark typeof). Constant per column from schema.
pub fn typeof_(column: &Column) -> Column {
    column.clone().typeof_()
}

/// True where the float value is NaN (PySpark isnan).
pub fn isnan(column: &Column) -> Column {
    column.clone().is_nan()
}

/// Greatest of the given columns per row (PySpark greatest). Uses element-wise UDF.
pub fn greatest(columns: &[&Column]) -> Result<Column, String> {
    if columns.is_empty() {
        return Err("greatest requires at least one column".to_string());
    }
    if columns.len() == 1 {
        return Ok((*columns[0]).clone());
    }
    let mut expr = columns[0].expr().clone();
    for c in columns.iter().skip(1) {
        let args = [c.expr().clone()];
        expr = expr.map_many(crate::udfs::apply_greatest2, &args, GetOutput::same_type());
    }
    Ok(Column::from_expr(expr, None))
}

/// Least of the given columns per row (PySpark least). Uses element-wise UDF.
pub fn least(columns: &[&Column]) -> Result<Column, String> {
    if columns.is_empty() {
        return Err("least requires at least one column".to_string());
    }
    if columns.len() == 1 {
        return Ok((*columns[0]).clone());
    }
    let mut expr = columns[0].expr().clone();
    for c in columns.iter().skip(1) {
        let args = [c.expr().clone()];
        expr = expr.map_many(crate::udfs::apply_least2, &args, GetOutput::same_type());
    }
    Ok(Column::from_expr(expr, None))
}
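
// Illustrative sketch: greatest/least fold a pairwise element-wise UDF across the
// inputs, so they accept any number of columns. Column names are hypothetical.
#[allow(dead_code)]
fn example_greatest_least() -> Result<(Column, Column), String> {
    let hi = greatest(&[&col("q1"), &col("q2"), &col("q3")])?;
    let lo = least(&[&col("q1"), &col("q2"), &col("q3")])?;
    Ok((hi, lo))
}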
1462
1463/// Extract year from datetime column (PySpark year)
1464pub fn year(column: &Column) -> Column {
1465    column.clone().year()
1466}
1467
1468/// Extract month from datetime column (PySpark month)
1469pub fn month(column: &Column) -> Column {
1470    column.clone().month()
1471}
1472
1473/// Extract day of month from datetime column (PySpark day)
1474pub fn day(column: &Column) -> Column {
1475    column.clone().day()
1476}
1477
1478/// Cast to date (PySpark to_date)
1479pub fn to_date(column: &Column) -> Column {
1480    column.clone().to_date()
1481}
1482
1483/// Format date/datetime as string (PySpark date_format). Accepts PySpark/Java SimpleDateFormat style (e.g. "yyyy-MM") and converts to chrono strftime internally.
1484pub fn date_format(column: &Column, format: &str) -> Column {
1485    column
1486        .clone()
1487        .date_format(&crate::udfs::pyspark_format_to_chrono(format))
1488}
1489
1490/// Current date (evaluation time). PySpark current_date.
1491pub fn current_date() -> Column {
1492    use polars::prelude::*;
1493    let today = chrono::Utc::now().date_naive();
1494    let days = (today - crate::date_utils::epoch_naive_date()).num_days() as i32;
1495    crate::column::Column::from_expr(Expr::Literal(LiteralValue::Date(days)), None)
1496}
1497
1498/// Current timestamp (evaluation time). PySpark current_timestamp.
1499pub fn current_timestamp() -> Column {
1500    use polars::prelude::*;
1501    let ts = chrono::Utc::now().timestamp_micros();
1502    crate::column::Column::from_expr(
1503        Expr::Literal(LiteralValue::DateTime(ts, TimeUnit::Microseconds, None)),
1504        None,
1505    )
1506}
1507
1508/// Alias for current_date (PySpark curdate).
1509pub fn curdate() -> Column {
1510    current_date()
1511}
1512
1513/// Alias for current_timestamp (PySpark now).
1514pub fn now() -> Column {
1515    current_timestamp()
1516}
1517
1518/// Alias for current_timestamp (PySpark localtimestamp).
1519pub fn localtimestamp() -> Column {
1520    current_timestamp()
1521}
1522
1523/// Alias for datediff (PySpark date_diff). date_diff(end, start).
1524pub fn date_diff(end: &Column, start: &Column) -> Column {
1525    datediff(end, start)
1526}
1527
1528/// Alias for date_add (PySpark dateadd).
1529pub fn dateadd(column: &Column, n: i32) -> Column {
1530    date_add(column, n)
1531}
1532
1533/// Extract field from date/datetime (PySpark extract). field: year, month, day, hour, minute, second, quarter, week, dayofweek, dayofyear.
1534pub fn extract(column: &Column, field: &str) -> Column {
1535    column.clone().extract(field)
1536}
1537
1538/// Alias for extract (PySpark date_part).
1539pub fn date_part(column: &Column, field: &str) -> Column {
1540    extract(column, field)
1541}
1542
1543/// Alias for extract (PySpark datepart).
1544pub fn datepart(column: &Column, field: &str) -> Column {
1545    extract(column, field)
1546}
1547
1548/// Timestamp to microseconds since epoch (PySpark unix_micros).
1549pub fn unix_micros(column: &Column) -> Column {
1550    column.clone().unix_micros()
1551}
1552
1553/// Timestamp to milliseconds since epoch (PySpark unix_millis).
1554pub fn unix_millis(column: &Column) -> Column {
1555    column.clone().unix_millis()
1556}
1557
1558/// Timestamp to seconds since epoch (PySpark unix_seconds).
1559pub fn unix_seconds(column: &Column) -> Column {
1560    column.clone().unix_seconds()
1561}
1562
1563/// Weekday name "Mon","Tue",... (PySpark dayname).
1564pub fn dayname(column: &Column) -> Column {
1565    column.clone().dayname()
1566}
1567
1568/// Weekday 0=Mon, 6=Sun (PySpark weekday).
1569pub fn weekday(column: &Column) -> Column {
1570    column.clone().weekday()
1571}
1572
1573/// Extract hour from datetime column (PySpark hour).
1574pub fn hour(column: &Column) -> Column {
1575    column.clone().hour()
1576}
1577
1578/// Extract minute from datetime column (PySpark minute).
1579pub fn minute(column: &Column) -> Column {
1580    column.clone().minute()
1581}
1582
1583/// Extract second from datetime column (PySpark second).
1584pub fn second(column: &Column) -> Column {
1585    column.clone().second()
1586}
1587
1588/// Add n days to date column (PySpark date_add).
1589pub fn date_add(column: &Column, n: i32) -> Column {
1590    column.clone().date_add(n)
1591}
1592
1593/// Subtract n days from date column (PySpark date_sub).
1594pub fn date_sub(column: &Column, n: i32) -> Column {
1595    column.clone().date_sub(n)
1596}
1597
1598/// Number of days between the end and start date columns (PySpark datediff(end, start)).
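///
/// # Example
/// A minimal sketch of the PySpark-style argument order (end first), assuming `datediff` is
/// re-exported at the crate root like `col`.
/// ```
/// use robin_sparkless::{col, datediff};
///
/// // Days between the hypothetical "ship_date" and "order_date" columns.
/// let wait_days = datediff(&col("ship_date"), &col("order_date"));
/// ```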
1599pub fn datediff(end: &Column, start: &Column) -> Column {
1600    start.clone().datediff(end)
1601}
1602
1603/// Last day of month for date column (PySpark last_day).
1604pub fn last_day(column: &Column) -> Column {
1605    column.clone().last_day()
1606}
1607
1608/// Truncate date/datetime to unit (PySpark trunc).
1609pub fn trunc(column: &Column, format: &str) -> Column {
1610    column.clone().trunc(format)
1611}
1612
1613/// Alias for trunc (PySpark date_trunc).
1614pub fn date_trunc(format: &str, column: &Column) -> Column {
1615    trunc(column, format)
1616}
1617
1618/// Extract quarter (1-4) from date/datetime (PySpark quarter).
1619pub fn quarter(column: &Column) -> Column {
1620    column.clone().quarter()
1621}
1622
1623/// Extract ISO week of year (1-53) (PySpark weekofyear).
1624pub fn weekofyear(column: &Column) -> Column {
1625    column.clone().weekofyear()
1626}
1627
1628/// Extract day of week: 1=Sunday..7=Saturday (PySpark dayofweek).
1629pub fn dayofweek(column: &Column) -> Column {
1630    column.clone().dayofweek()
1631}
1632
1633/// Extract day of year (1-366) (PySpark dayofyear).
1634pub fn dayofyear(column: &Column) -> Column {
1635    column.clone().dayofyear()
1636}
1637
1638/// Add n months to date column (PySpark add_months).
1639pub fn add_months(column: &Column, n: i32) -> Column {
1640    column.clone().add_months(n)
1641}
1642
1643/// Months between end and start dates as fractional (PySpark months_between).
1644/// When round_off is true, rounds to 8 decimal places (PySpark default).
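///
/// # Example
/// A minimal sketch, assuming `months_between` is re-exported at the crate root like `col`.
/// ```
/// use robin_sparkless::{col, months_between};
///
/// // Fractional months between two hypothetical date columns, rounded to 8 decimal places.
/// let tenure = months_between(&col("end_date"), &col("start_date"), true);
/// ```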
1645pub fn months_between(end: &Column, start: &Column, round_off: bool) -> Column {
1646    end.clone().months_between(start, round_off)
1647}
1648
1649/// Next date that is the given weekday (e.g. "Mon") (PySpark next_day).
1650pub fn next_day(column: &Column, day_of_week: &str) -> Column {
1651    column.clone().next_day(day_of_week)
1652}
1653
1654/// Current Unix timestamp in seconds (PySpark unix_timestamp with no args).
1655pub fn unix_timestamp_now() -> Column {
1656    use polars::prelude::*;
1657    let secs = chrono::Utc::now().timestamp();
1658    crate::column::Column::from_expr(lit(secs), None)
1659}
1660
1661/// Parse string timestamp to seconds since epoch (PySpark unix_timestamp). format defaults to yyyy-MM-dd HH:mm:ss.
1662pub fn unix_timestamp(column: &Column, format: Option<&str>) -> Column {
1663    column.clone().unix_timestamp(format)
1664}
1665
1666/// Alias for unix_timestamp.
1667pub fn to_unix_timestamp(column: &Column, format: Option<&str>) -> Column {
1668    unix_timestamp(column, format)
1669}
1670
1671/// Convert seconds since epoch to formatted string (PySpark from_unixtime).
1672pub fn from_unixtime(column: &Column, format: Option<&str>) -> Column {
1673    column.clone().from_unixtime(format)
1674}
1675
1676/// Build date from year, month, day columns (PySpark make_date).
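///
/// # Example
/// A minimal sketch, assuming `make_date` is re-exported at the crate root like `col`.
/// ```
/// use robin_sparkless::{col, make_date};
///
/// // Assemble a Date column from hypothetical "y", "m", "d" integer columns.
/// let date = make_date(&col("y"), &col("m"), &col("d"));
/// ```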
1677pub fn make_date(year: &Column, month: &Column, day: &Column) -> Column {
1678    use polars::prelude::*;
1679    let args = [month.expr().clone(), day.expr().clone()];
1680    let expr = year.expr().clone().map_many(
1681        crate::udfs::apply_make_date,
1682        &args,
1683        GetOutput::from_type(DataType::Date),
1684    );
1685    crate::column::Column::from_expr(expr, None)
1686}
1687
1688/// make_timestamp(year, month, day, hour, min, sec, timezone?) - six columns to timestamp (PySpark make_timestamp).
1689/// When timezone is Some(tz), components are interpreted as local time in that zone, then converted to UTC.
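///
/// # Example
/// A minimal sketch, assuming `make_timestamp` is re-exported at the crate root like `col`.
/// ```
/// use robin_sparkless::{col, make_timestamp};
///
/// // Components (hypothetical column names) are read as New York local time, then converted to UTC.
/// let ts = make_timestamp(
///     &col("y"), &col("mo"), &col("d"),
///     &col("h"), &col("mi"), &col("s"),
///     Some("America/New_York"),
/// );
/// ```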
1690pub fn make_timestamp(
1691    year: &Column,
1692    month: &Column,
1693    day: &Column,
1694    hour: &Column,
1695    minute: &Column,
1696    sec: &Column,
1697    timezone: Option<&str>,
1698) -> Column {
1699    use polars::prelude::*;
1700    let tz_owned = timezone.map(|s| s.to_string());
1701    let args = [
1702        month.expr().clone(),
1703        day.expr().clone(),
1704        hour.expr().clone(),
1705        minute.expr().clone(),
1706        sec.expr().clone(),
1707    ];
1708    let expr = year.expr().clone().map_many(
1709        move |cols| crate::udfs::apply_make_timestamp(cols, tz_owned.as_deref()),
1710        &args,
1711        GetOutput::from_type(DataType::Datetime(TimeUnit::Microseconds, None)),
1712    );
1713    crate::column::Column::from_expr(expr, None)
1714}
1715
1716/// Add amount of unit to timestamp (PySpark timestampadd).
1717pub fn timestampadd(unit: &str, amount: &Column, ts: &Column) -> Column {
1718    ts.clone().timestampadd(unit, amount)
1719}
1720
1721/// Difference between timestamps in unit (PySpark timestampdiff).
1722pub fn timestampdiff(unit: &str, start: &Column, end: &Column) -> Column {
1723    start.clone().timestampdiff(unit, end)
1724}
1725
1726/// Interval of n days (PySpark days). For use in date_add, timestampadd, etc.
1727pub fn days(n: i64) -> Column {
1728    make_interval(0, 0, 0, n, 0, 0, 0)
1729}
1730
1731/// Interval of n hours (PySpark hours).
1732pub fn hours(n: i64) -> Column {
1733    make_interval(0, 0, 0, 0, n, 0, 0)
1734}
1735
1736/// Interval of n minutes (PySpark minutes).
1737pub fn minutes(n: i64) -> Column {
1738    make_interval(0, 0, 0, 0, 0, n, 0)
1739}
1740
1741/// Interval of n months (PySpark months). Approximated as 30*n days.
1742pub fn months(n: i64) -> Column {
1743    make_interval(0, n, 0, 0, 0, 0, 0)
1744}
1745
1746/// Interval of n years (PySpark years). Approximated as 365*n days.
1747pub fn years(n: i64) -> Column {
1748    make_interval(n, 0, 0, 0, 0, 0, 0)
1749}
1750
1751/// Interpret timestamp as UTC, convert to tz (PySpark from_utc_timestamp).
1752pub fn from_utc_timestamp(column: &Column, tz: &str) -> Column {
1753    column.clone().from_utc_timestamp(tz)
1754}
1755
1756/// Interpret timestamp as in tz, convert to UTC (PySpark to_utc_timestamp).
1757pub fn to_utc_timestamp(column: &Column, tz: &str) -> Column {
1758    column.clone().to_utc_timestamp(tz)
1759}
1760
1761/// Convert timestamp between timezones (PySpark convert_timezone).
1762pub fn convert_timezone(source_tz: &str, target_tz: &str, column: &Column) -> Column {
1763    let source_tz = source_tz.to_string();
1764    let target_tz = target_tz.to_string();
1765    let expr = column.expr().clone().map(
1766        move |s| crate::udfs::apply_convert_timezone(s, &source_tz, &target_tz),
1767        GetOutput::same_type(),
1768    );
1769    crate::column::Column::from_expr(expr, None)
1770}
1771
1772/// Current session timezone (PySpark current_timezone). Default "UTC". Returns literal column.
1773pub fn current_timezone() -> Column {
1774    use polars::prelude::*;
1775    crate::column::Column::from_expr(lit("UTC"), None)
1776}
1777
1778/// Create interval duration (PySpark make_interval). Optional args; 0 for omitted.
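///
/// # Example
/// A minimal sketch of the day arithmetic used internally (1 year = 365 days, 1 month = 30 days
/// approximation), assuming `make_interval` is re-exported at the crate root.
/// ```
/// use robin_sparkless::make_interval;
///
/// // 1 year + 2 months + 1 week + 3 days = 365 + 60 + 7 + 3 = 435 days, plus 4 hours.
/// let interval = make_interval(1, 2, 1, 3, 4, 0, 0);
/// ```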
1779pub fn make_interval(
1780    years: i64,
1781    months: i64,
1782    weeks: i64,
1783    days: i64,
1784    hours: i64,
1785    mins: i64,
1786    secs: i64,
1787) -> Column {
1788    use polars::prelude::*;
1789    // Approximate: 1 year = 365 days, 1 month = 30 days
1790    let total_days = years * 365 + months * 30 + weeks * 7 + days;
1791    let args = DurationArgs::new()
1792        .with_days(lit(total_days))
1793        .with_hours(lit(hours))
1794        .with_minutes(lit(mins))
1795        .with_seconds(lit(secs));
1796    let dur = duration(args);
1797    crate::column::Column::from_expr(dur, None)
1798}
1799
1800/// Day-time interval: days, hours, minutes, seconds (PySpark make_dt_interval). All optional; 0 for omitted.
1801pub fn make_dt_interval(days: i64, hours: i64, minutes: i64, seconds: i64) -> Column {
1802    use polars::prelude::*;
1803    let args = DurationArgs::new()
1804        .with_days(lit(days))
1805        .with_hours(lit(hours))
1806        .with_minutes(lit(minutes))
1807        .with_seconds(lit(seconds));
1808    let dur = duration(args);
1809    crate::column::Column::from_expr(dur, None)
1810}
1811
1812/// Year-month interval (PySpark make_ym_interval). Polars has no native YM type; return months as Int32 (years*12 + months).
1813pub fn make_ym_interval(years: i32, months: i32) -> Column {
1814    use polars::prelude::*;
1815    let total_months = years * 12 + months;
1816    crate::column::Column::from_expr(lit(total_months), None)
1817}
1818
1819/// Alias for make_timestamp (PySpark make_timestamp_ntz - no timezone).
1820pub fn make_timestamp_ntz(
1821    year: &Column,
1822    month: &Column,
1823    day: &Column,
1824    hour: &Column,
1825    minute: &Column,
1826    sec: &Column,
1827) -> Column {
1828    make_timestamp(year, month, day, hour, minute, sec, None)
1829}
1830
1831/// Convert seconds since epoch to timestamp (PySpark timestamp_seconds).
1832pub fn timestamp_seconds(column: &Column) -> Column {
1833    column.clone().timestamp_seconds()
1834}
1835
1836/// Convert milliseconds since epoch to timestamp (PySpark timestamp_millis).
1837pub fn timestamp_millis(column: &Column) -> Column {
1838    column.clone().timestamp_millis()
1839}
1840
1841/// Convert microseconds since epoch to timestamp (PySpark timestamp_micros).
1842pub fn timestamp_micros(column: &Column) -> Column {
1843    column.clone().timestamp_micros()
1844}
1845
1846/// Date to days since 1970-01-01 (PySpark unix_date).
1847pub fn unix_date(column: &Column) -> Column {
1848    column.clone().unix_date()
1849}
1850
1851/// Days since epoch to date (PySpark date_from_unix_date).
1852pub fn date_from_unix_date(column: &Column) -> Column {
1853    column.clone().date_from_unix_date()
1854}
1855
1856/// Positive modulus (PySpark pmod).
1857pub fn pmod(dividend: &Column, divisor: &Column) -> Column {
1858    dividend.clone().pmod(divisor)
1859}
1860
1861/// Factorial n! (PySpark factorial). n in 0..=20; null for negative or overflow.
1862pub fn factorial(column: &Column) -> Column {
1863    column.clone().factorial()
1864}
1865
1866/// Concatenate string columns without separator (PySpark concat)
1867pub fn concat(columns: &[&Column]) -> Column {
1868    use polars::prelude::*;
1869    if columns.is_empty() {
1870        panic!("concat requires at least one column");
1871    }
1872    let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
1873    crate::column::Column::from_expr(concat_str(&exprs, "", false), None)
1874}
1875
1876/// Concatenate string columns with separator (PySpark concat_ws)
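///
/// # Example
/// A minimal sketch, assuming `concat_ws` is re-exported at the crate root like `col`.
/// ```
/// use robin_sparkless::{col, concat_ws};
///
/// // Hypothetical "first" and "last" string columns joined with a single space.
/// let full_name = concat_ws(" ", &[&col("first"), &col("last")]);
/// ```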
1877pub fn concat_ws(separator: &str, columns: &[&Column]) -> Column {
1878    use polars::prelude::*;
1879    if columns.is_empty() {
1880        panic!("concat_ws requires at least one column");
1881    }
1882    let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
1883    crate::column::Column::from_expr(concat_str(&exprs, separator, false), None)
1884}
1885
1886/// Row number window function (1, 2, 3, ... by order within partition). Ranks in
1887/// ascending order (use `Column::row_number(true)` for descending); apply `.over(partition_by)`.
1888///
1889/// # Example
1890/// ```
1891/// use robin_sparkless::col;
1892/// let salary_col = col("salary");
1893/// let rn = salary_col.row_number(true).over(&["dept"]);
1894/// ```
1895pub fn row_number(column: &Column) -> Column {
1896    column.clone().row_number(false)
1897}
1898
1899/// Rank window function (ties same rank, gaps). Use with `.over(partition_by)`.
1900pub fn rank(column: &Column, descending: bool) -> Column {
1901    column.clone().rank(descending)
1902}
1903
1904/// Dense rank window function (no gaps). Use with `.over(partition_by)`.
1905pub fn dense_rank(column: &Column, descending: bool) -> Column {
1906    column.clone().dense_rank(descending)
1907}
1908
1909/// Lag: value from n rows before in partition. Use with `.over(partition_by)`.
1910pub fn lag(column: &Column, n: i64) -> Column {
1911    column.clone().lag(n)
1912}
1913
1914/// Lead: value from n rows after in partition. Use with `.over(partition_by)`.
1915pub fn lead(column: &Column, n: i64) -> Column {
1916    column.clone().lead(n)
1917}
1918
1919/// First value in partition (PySpark first_value). Use with `.over(partition_by)`.
1920pub fn first_value(column: &Column) -> Column {
1921    column.clone().first_value()
1922}
1923
1924/// Last value in partition (PySpark last_value). Use with `.over(partition_by)`.
1925pub fn last_value(column: &Column) -> Column {
1926    column.clone().last_value()
1927}
1928
1929/// Percent rank in partition: (rank - 1) / (count - 1). Window is applied.
1930pub fn percent_rank(column: &Column, partition_by: &[&str], descending: bool) -> Column {
1931    column.clone().percent_rank(partition_by, descending)
1932}
1933
1934/// Cumulative distribution in partition: row_number / count. Window is applied.
1935pub fn cume_dist(column: &Column, partition_by: &[&str], descending: bool) -> Column {
1936    column.clone().cume_dist(partition_by, descending)
1937}
1938
1939/// Ntile: bucket 1..n by rank within partition. Window is applied.
1940pub fn ntile(column: &Column, n: u32, partition_by: &[&str], descending: bool) -> Column {
1941    column.clone().ntile(n, partition_by, descending)
1942}
1943
1944/// Nth value in partition by order (1-based n). Window is applied; do not call .over() again.
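///
/// # Example
/// A minimal sketch, assuming `nth_value` is re-exported at the crate root like `col`.
/// ```
/// use robin_sparkless::{col, nth_value};
///
/// // Second-highest salary per department (hypothetical columns); the window is applied internally.
/// let second_highest = nth_value(&col("salary"), 2, &["dept"], true);
/// ```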
1945pub fn nth_value(column: &Column, n: i64, partition_by: &[&str], descending: bool) -> Column {
1946    column.clone().nth_value(n, partition_by, descending)
1947}
1948
1949/// Coalesce - returns the first non-null value from multiple columns.
1950///
1951/// # Example
1952/// ```
1953/// use robin_sparkless::{col, lit_i64, coalesce};
1954///
1955/// // coalesce(col("a"), col("b"), lit(0))
1956/// let expr = coalesce(&[&col("a"), &col("b"), &lit_i64(0)]);
1957/// ```
1958pub fn coalesce(columns: &[&Column]) -> Column {
1959    use polars::prelude::*;
1960    if columns.is_empty() {
1961        panic!("coalesce requires at least one column");
1962    }
1963    let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
1964    let expr = coalesce(&exprs);
1965    crate::column::Column::from_expr(expr, None)
1966}
1967
1968/// Alias for coalesce(col, value). PySpark nvl / ifnull.
1969pub fn nvl(column: &Column, value: &Column) -> Column {
1970    coalesce(&[column, value])
1971}
1972
1973/// Alias for nvl. PySpark ifnull.
1974pub fn ifnull(column: &Column, value: &Column) -> Column {
1975    nvl(column, value)
1976}
1977
1978/// Return null if column equals value, else column. PySpark nullif.
1979pub fn nullif(column: &Column, value: &Column) -> Column {
1980    use polars::prelude::*;
1981    let cond = column.expr().clone().eq(value.expr().clone());
1982    let null_lit = Expr::Literal(LiteralValue::Null);
1983    let expr = when(cond).then(null_lit).otherwise(column.expr().clone());
1984    crate::column::Column::from_expr(expr, None)
1985}
1986
1987/// Replace NaN with value. PySpark nanvl.
1988pub fn nanvl(column: &Column, value: &Column) -> Column {
1989    use polars::prelude::*;
1990    let cond = column.expr().clone().is_nan();
1991    let expr = when(cond)
1992        .then(value.expr().clone())
1993        .otherwise(column.expr().clone());
1994    crate::column::Column::from_expr(expr, None)
1995}
1996
1997/// Three-arg null replacement: if col1 is not null then col2 else col3. PySpark nvl2.
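///
/// # Example
/// A minimal sketch, assuming `nvl2` is re-exported at the crate root like `col` and `lit_i64`.
/// ```
/// use robin_sparkless::{col, lit_i64, nvl2};
///
/// // 1000 when the hypothetical "manager_id" is not null, otherwise 0.
/// let bonus = nvl2(&col("manager_id"), &lit_i64(1000), &lit_i64(0));
/// ```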
1998pub fn nvl2(col1: &Column, col2: &Column, col3: &Column) -> Column {
1999    use polars::prelude::*;
2000    let cond = col1.expr().clone().is_not_null();
2001    let expr = when(cond)
2002        .then(col2.expr().clone())
2003        .otherwise(col3.expr().clone());
2004    crate::column::Column::from_expr(expr, None)
2005}
2006
2007/// Alias for substring. PySpark substr.
2008pub fn substr(column: &Column, start: i64, length: Option<i64>) -> Column {
2009    substring(column, start, length)
2010}
2011
2012/// Alias for pow. PySpark power.
2013pub fn power(column: &Column, exp: i64) -> Column {
2014    pow(column, exp)
2015}
2016
2017/// Alias for log (natural log). PySpark ln.
2018pub fn ln(column: &Column) -> Column {
2019    log(column)
2020}
2021
2022/// Alias for ceil. PySpark ceiling.
2023pub fn ceiling(column: &Column) -> Column {
2024    ceil(column)
2025}
2026
2027/// Alias for lower. PySpark lcase.
2028pub fn lcase(column: &Column) -> Column {
2029    lower(column)
2030}
2031
2032/// Alias for upper. PySpark ucase.
2033pub fn ucase(column: &Column) -> Column {
2034    upper(column)
2035}
2036
2037/// Alias for day. PySpark dayofmonth.
2038pub fn dayofmonth(column: &Column) -> Column {
2039    day(column)
2040}
2041
2042/// Alias for degrees. PySpark toDegrees.
2043pub fn to_degrees(column: &Column) -> Column {
2044    degrees(column)
2045}
2046
2047/// Alias for radians. PySpark toRadians.
2048pub fn to_radians(column: &Column) -> Column {
2049    radians(column)
2050}
2051
2052/// Hyperbolic cosine (PySpark cosh).
2053pub fn cosh(column: &Column) -> Column {
2054    column.clone().cosh()
2055}
2056/// Hyperbolic sine (PySpark sinh).
2057pub fn sinh(column: &Column) -> Column {
2058    column.clone().sinh()
2059}
2060/// Hyperbolic tangent (PySpark tanh).
2061pub fn tanh(column: &Column) -> Column {
2062    column.clone().tanh()
2063}
2064/// Inverse hyperbolic cosine (PySpark acosh).
2065pub fn acosh(column: &Column) -> Column {
2066    column.clone().acosh()
2067}
2068/// Inverse hyperbolic sine (PySpark asinh).
2069pub fn asinh(column: &Column) -> Column {
2070    column.clone().asinh()
2071}
2072/// Inverse hyperbolic tangent (PySpark atanh).
2073pub fn atanh(column: &Column) -> Column {
2074    column.clone().atanh()
2075}
2076/// Cube root (PySpark cbrt).
2077pub fn cbrt(column: &Column) -> Column {
2078    column.clone().cbrt()
2079}
2080/// exp(x) - 1 (PySpark expm1).
2081pub fn expm1(column: &Column) -> Column {
2082    column.clone().expm1()
2083}
2084/// log(1 + x) (PySpark log1p).
2085pub fn log1p(column: &Column) -> Column {
2086    column.clone().log1p()
2087}
2088/// Base-10 log (PySpark log10).
2089pub fn log10(column: &Column) -> Column {
2090    column.clone().log10()
2091}
2092/// Base-2 log (PySpark log2).
2093pub fn log2(column: &Column) -> Column {
2094    column.clone().log2()
2095}
2096/// Round to nearest integer (PySpark rint).
2097pub fn rint(column: &Column) -> Column {
2098    column.clone().rint()
2099}
2100/// sqrt(x*x + y*y) (PySpark hypot).
2101pub fn hypot(x: &Column, y: &Column) -> Column {
2102    let xx = x.expr().clone() * x.expr().clone();
2103    let yy = y.expr().clone() * y.expr().clone();
2104    crate::column::Column::from_expr((xx + yy).sqrt(), None)
2105}
2106
2107/// True if column is null. PySpark isnull.
2108pub fn isnull(column: &Column) -> Column {
2109    column.clone().is_null()
2110}
2111
2112/// True if column is not null. PySpark isnotnull.
2113pub fn isnotnull(column: &Column) -> Column {
2114    column.clone().is_not_null()
2115}
2116
2117/// Create an array column from multiple columns (PySpark array).
2118pub fn array(columns: &[&Column]) -> Result<crate::column::Column, PolarsError> {
2119    use polars::prelude::*;
2120    if columns.is_empty() {
2121        return Err(PolarsError::ComputeError("array requires at least one column".into()));
2122    }
2123    let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2124    let expr = concat_list(exprs)
2125        .map_err(|e| PolarsError::ComputeError(format!("array concat_list: {e}").into()))?;
2126    Ok(crate::column::Column::from_expr(expr, None))
2127}
2128
2129/// Number of elements in list (PySpark size / array_size). Returns Int32.
2130pub fn array_size(column: &Column) -> Column {
2131    column.clone().array_size()
2132}
2133
2134/// Alias for array_size (PySpark size).
2135pub fn size(column: &Column) -> Column {
2136    column.clone().array_size()
2137}
2138
2139/// Cardinality: number of elements in array (PySpark cardinality). Alias for size/array_size.
2140pub fn cardinality(column: &Column) -> Column {
2141    column.clone().cardinality()
2142}
2143
2144/// Check if list contains value (PySpark array_contains).
2145pub fn array_contains(column: &Column, value: &Column) -> Column {
2146    column.clone().array_contains(value.expr().clone())
2147}
2148
2149/// Join list of strings with separator (PySpark array_join).
2150pub fn array_join(column: &Column, separator: &str) -> Column {
2151    column.clone().array_join(separator)
2152}
2153
2154/// Maximum element in list (PySpark array_max).
2155pub fn array_max(column: &Column) -> Column {
2156    column.clone().array_max()
2157}
2158
2159/// Minimum element in list (PySpark array_min).
2160pub fn array_min(column: &Column) -> Column {
2161    column.clone().array_min()
2162}
2163
2164/// Get element at 1-based index (PySpark element_at).
2165pub fn element_at(column: &Column, index: i64) -> Column {
2166    column.clone().element_at(index)
2167}
2168
2169/// Sort list elements (PySpark array_sort).
2170pub fn array_sort(column: &Column) -> Column {
2171    column.clone().array_sort()
2172}
2173
2174/// Distinct elements in list (PySpark array_distinct).
2175pub fn array_distinct(column: &Column) -> Column {
2176    column.clone().array_distinct()
2177}
2178
2179/// Slice list from 1-based start with optional length (PySpark slice).
2180pub fn array_slice(column: &Column, start: i64, length: Option<i64>) -> Column {
2181    column.clone().array_slice(start, length)
2182}
2183
2184/// Generate array of numbers from start to stop (inclusive) with optional step (PySpark sequence).
2185/// step defaults to 1.
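///
/// # Example
/// A minimal sketch, assuming `sequence` is re-exported at the crate root like `col`.
/// ```
/// use robin_sparkless::{col, sequence};
///
/// // Inclusive range per row from "start" to "stop" (hypothetical columns); step defaults to 1.
/// let range = sequence(&col("start"), &col("stop"), None);
/// ```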
2186pub fn sequence(start: &Column, stop: &Column, step: Option<&Column>) -> Column {
2187    use polars::prelude::{as_struct, lit, DataType, GetOutput};
2188    let step_expr = step
2189        .map(|c| c.expr().clone().alias("2"))
2190        .unwrap_or_else(|| lit(1i64).alias("2"));
2191    let struct_expr = as_struct(vec![
2192        start.expr().clone().alias("0"),
2193        stop.expr().clone().alias("1"),
2194        step_expr,
2195    ]);
2196    let out_dtype = DataType::List(Box::new(DataType::Int64));
2197    let expr = struct_expr.map(crate::udfs::apply_sequence, GetOutput::from_type(out_dtype));
2198    crate::column::Column::from_expr(expr, None)
2199}
2200
2201/// Random permutation of list elements (PySpark shuffle).
2202pub fn shuffle(column: &Column) -> Column {
2203    use polars::prelude::GetOutput;
2204    let expr = column
2205        .expr()
2206        .clone()
2207        .map(crate::udfs::apply_shuffle, GetOutput::same_type());
2208    crate::column::Column::from_expr(expr, None)
2209}
2210
2211/// Explode list of structs into rows; struct fields become columns after unnest (PySpark inline).
2212/// Returns the exploded struct column; use unnest to expand struct fields to columns.
2213pub fn inline(column: &Column) -> Column {
2214    column.clone().explode()
2215}
2216
2217/// Like inline but null/empty yields one row of nulls (PySpark inline_outer).
2218pub fn inline_outer(column: &Column) -> Column {
2219    column.clone().explode_outer()
2220}
2221
2222/// Explode list into one row per element (PySpark explode).
2223pub fn explode(column: &Column) -> Column {
2224    column.clone().explode()
2225}
2226
2227/// 1-based index of first occurrence of value in list, or 0 if not found (PySpark array_position).
2228/// Implemented via Polars list.eval with col("") as element.
2229pub fn array_position(column: &Column, value: &Column) -> Column {
2230    column.clone().array_position(value.expr().clone())
2231}
2232
2233/// Remove null elements from list (PySpark array_compact).
2234pub fn array_compact(column: &Column) -> Column {
2235    column.clone().array_compact()
2236}
2237
2238/// New list with all elements equal to value removed (PySpark array_remove).
2239/// Implemented via Polars list.eval + list.drop_nulls.
2240pub fn array_remove(column: &Column, value: &Column) -> Column {
2241    column.clone().array_remove(value.expr().clone())
2242}
2243
2244/// Repeat each element n times (PySpark array_repeat). Forwards to Column::array_repeat; a dynamic repeat count would require list.eval.
2245pub fn array_repeat(column: &Column, n: i64) -> Column {
2246    column.clone().array_repeat(n)
2247}
2248
2249/// Flatten list of lists to one list (PySpark flatten). Forwards to Column::array_flatten.
2250pub fn array_flatten(column: &Column) -> Column {
2251    column.clone().array_flatten()
2252}
2253
2254/// True if any list element satisfies the predicate (PySpark exists).
2255pub fn array_exists(column: &Column, predicate: Expr) -> Column {
2256    column.clone().array_exists(predicate)
2257}
2258
2259/// True if all list elements satisfy the predicate (PySpark forall).
2260pub fn array_forall(column: &Column, predicate: Expr) -> Column {
2261    column.clone().array_forall(predicate)
2262}
2263
2264/// Filter list elements by predicate (PySpark filter).
2265pub fn array_filter(column: &Column, predicate: Expr) -> Column {
2266    column.clone().array_filter(predicate)
2267}
2268
2269/// Transform list elements by expression (PySpark transform).
2270pub fn array_transform(column: &Column, f: Expr) -> Column {
2271    column.clone().array_transform(f)
2272}
2273
2274/// Sum of list elements (PySpark aggregate sum).
2275pub fn array_sum(column: &Column) -> Column {
2276    column.clone().array_sum()
2277}
2278
2279/// Array fold/aggregate (PySpark aggregate). Simplified: zero + sum(list elements).
2280pub fn aggregate(column: &Column, zero: &Column) -> Column {
2281    column.clone().array_aggregate(zero)
2282}
2283
2284/// Mean of list elements (PySpark aggregate avg).
2285pub fn array_mean(column: &Column) -> Column {
2286    column.clone().array_mean()
2287}
2288
2289/// Explode list with position (PySpark posexplode). Returns (pos_column, value_column).
2290/// pos is 1-based; implemented via list.eval(cum_count()).explode() and explode().
2291pub fn posexplode(column: &Column) -> (Column, Column) {
2292    column.clone().posexplode()
2293}
2294
2295/// Build a map column from alternating key/value expressions (PySpark create_map).
2296/// Returns List(Struct{key, value}) using Polars as_struct and concat_list.
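///
/// # Example
/// A minimal sketch of the alternating key/value layout, assuming `create_map` is re-exported
/// at the crate root like `col`.
/// ```
/// use robin_sparkless::{col, create_map};
///
/// // Map {k1 -> v1, k2 -> v2} built from hypothetical key and value columns.
/// let map_col = create_map(&[&col("k1"), &col("v1"), &col("k2"), &col("v2")]).expect("concat_list");
/// ```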
2297pub fn create_map(key_values: &[&Column]) -> Result<Column, PolarsError> {
2298    use polars::prelude::{as_struct, concat_list};
2299    if key_values.is_empty() {
2300        return Err(PolarsError::ComputeError("create_map requires at least one key-value pair".into()));
2301    }
2302    let mut struct_exprs: Vec<Expr> = Vec::new();
2303    for i in (0..key_values.len()).step_by(2) {
2304        if i + 1 < key_values.len() {
2305            let k = key_values[i].expr().clone().alias("key");
2306            let v = key_values[i + 1].expr().clone().alias("value");
2307            struct_exprs.push(as_struct(vec![k, v]));
2308        }
2309    }
2310    let expr = concat_list(struct_exprs)
2311        .map_err(|e| PolarsError::ComputeError(format!("create_map concat_list: {e}").into()))?;
2312    Ok(crate::column::Column::from_expr(expr, None))
2313}
2314
2315/// Extract keys from a map column (PySpark map_keys). Map is List(Struct{key, value}).
2316pub fn map_keys(column: &Column) -> Column {
2317    column.clone().map_keys()
2318}
2319
2320/// Extract values from a map column (PySpark map_values).
2321pub fn map_values(column: &Column) -> Column {
2322    column.clone().map_values()
2323}
2324
2325/// Return map as list of structs {key, value} (PySpark map_entries).
2326pub fn map_entries(column: &Column) -> Column {
2327    column.clone().map_entries()
2328}
2329
2330/// Build map from two array columns keys and values (PySpark map_from_arrays). Implemented via UDF.
2331pub fn map_from_arrays(keys: &Column, values: &Column) -> Column {
2332    keys.clone().map_from_arrays(values)
2333}
2334
2335/// Merge two map columns (PySpark map_concat). Last value wins for duplicate keys.
2336pub fn map_concat(a: &Column, b: &Column) -> Column {
2337    a.clone().map_concat(b)
2338}
2339
2340/// Array of structs {key, value} to map (PySpark map_from_entries).
2341pub fn map_from_entries(column: &Column) -> Column {
2342    column.clone().map_from_entries()
2343}
2344
2345/// True if map contains key (PySpark map_contains_key).
2346pub fn map_contains_key(map_col: &Column, key: &Column) -> Column {
2347    map_col.clone().map_contains_key(key)
2348}
2349
2350/// Get value for key from map, or null (PySpark get).
2351pub fn get(map_col: &Column, key: &Column) -> Column {
2352    map_col.clone().get(key)
2353}
2354
2355/// Filter map entries by predicate (PySpark map_filter).
2356pub fn map_filter(map_col: &Column, predicate: Expr) -> Column {
2357    map_col.clone().map_filter(predicate)
2358}
2359
2360/// Merge two maps by key with merge function (PySpark map_zip_with).
2361pub fn map_zip_with(map1: &Column, map2: &Column, merge: Expr) -> Column {
2362    map1.clone().map_zip_with(map2, merge)
2363}
2364
2365/// Convenience: zip_with with coalesce(left, right) merge.
2366pub fn zip_with_coalesce(left: &Column, right: &Column) -> Column {
2367    use polars::prelude::col;
2368    let left_field = col("").struct_().field_by_name("left");
2369    let right_field = col("").struct_().field_by_name("right");
2370    let merge = crate::column::Column::from_expr(
2371        coalesce(&[
2372            &crate::column::Column::from_expr(left_field, None),
2373            &crate::column::Column::from_expr(right_field, None),
2374        ])
2375        .into_expr(),
2376        None,
2377    );
2378    left.clone().zip_with(right, merge.into_expr())
2379}
2380
2381/// Convenience: map_zip_with with coalesce(value1, value2) merge.
2382pub fn map_zip_with_coalesce(map1: &Column, map2: &Column) -> Column {
2383    use polars::prelude::col;
2384    let v1 = col("").struct_().field_by_name("value1");
2385    let v2 = col("").struct_().field_by_name("value2");
2386    let merge = coalesce(&[
2387        &crate::column::Column::from_expr(v1, None),
2388        &crate::column::Column::from_expr(v2, None),
2389    ])
2390    .into_expr();
2391    map1.clone().map_zip_with(map2, merge)
2392}
2393
2394/// Convenience: map_filter with value > threshold predicate.
2395pub fn map_filter_value_gt(map_col: &Column, threshold: f64) -> Column {
2396    use polars::prelude::{col, lit};
2397    let pred = col("").struct_().field_by_name("value").gt(lit(threshold));
2398    map_col.clone().map_filter(pred)
2399}
2400
2401/// Create struct from columns using column names as field names (PySpark struct).
2402pub fn struct_(columns: &[&Column]) -> Column {
2403    use polars::prelude::as_struct;
2404    if columns.is_empty() {
2405        panic!("struct requires at least one column");
2406    }
2407    let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2408    crate::column::Column::from_expr(as_struct(exprs), None)
2409}
2410
2411/// Create struct with explicit field names (PySpark named_struct). Pairs of (name, column).
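///
/// # Example
/// A minimal sketch, assuming `named_struct` is re-exported at the crate root like `col`.
/// ```
/// use robin_sparkless::{col, named_struct};
///
/// // Struct with explicit field names over hypothetical "first" and "age" columns.
/// let person = named_struct(&[("name", &col("first")), ("age", &col("age"))]);
/// ```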
2412pub fn named_struct(pairs: &[(&str, &Column)]) -> Column {
2413    use polars::prelude::as_struct;
2414    if pairs.is_empty() {
2415        panic!("named_struct requires at least one (name, column) pair");
2416    }
2417    let exprs: Vec<Expr> = pairs
2418        .iter()
2419        .map(|(name, col)| col.expr().clone().alias(*name))
2420        .collect();
2421    crate::column::Column::from_expr(as_struct(exprs), None)
2422}
2423
2424/// Append element to end of list (PySpark array_append).
2425pub fn array_append(array: &Column, elem: &Column) -> Column {
2426    array.clone().array_append(elem)
2427}
2428
2429/// Prepend element to start of list (PySpark array_prepend).
2430pub fn array_prepend(array: &Column, elem: &Column) -> Column {
2431    array.clone().array_prepend(elem)
2432}
2433
2434/// Insert element at 1-based position (PySpark array_insert).
2435pub fn array_insert(array: &Column, pos: &Column, elem: &Column) -> Column {
2436    array.clone().array_insert(pos, elem)
2437}
2438
2439/// Elements in first array not in second (PySpark array_except).
2440pub fn array_except(a: &Column, b: &Column) -> Column {
2441    a.clone().array_except(b)
2442}
2443
2444/// Elements in both arrays (PySpark array_intersect).
2445pub fn array_intersect(a: &Column, b: &Column) -> Column {
2446    a.clone().array_intersect(b)
2447}
2448
2449/// Distinct elements from both arrays (PySpark array_union).
2450pub fn array_union(a: &Column, b: &Column) -> Column {
2451    a.clone().array_union(b)
2452}
2453
2454/// Zip two arrays element-wise with merge function (PySpark zip_with).
2455pub fn zip_with(left: &Column, right: &Column, merge: Expr) -> Column {
2456    left.clone().zip_with(right, merge)
2457}
2458
2459/// Extract JSON path from string column (PySpark get_json_object).
2460pub fn get_json_object(column: &Column, path: &str) -> Column {
2461    column.clone().get_json_object(path)
2462}
2463
2464/// Keys of JSON object (PySpark json_object_keys). Returns list of strings.
2465pub fn json_object_keys(column: &Column) -> Column {
2466    column.clone().json_object_keys()
2467}
2468
2469/// Extract keys from JSON as struct (PySpark json_tuple). keys: e.g. ["a", "b"].
2470pub fn json_tuple(column: &Column, keys: &[&str]) -> Column {
2471    column.clone().json_tuple(keys)
2472}
2473
2474/// Parse CSV string to struct (PySpark from_csv). Minimal implementation.
2475pub fn from_csv(column: &Column) -> Column {
2476    column.clone().from_csv()
2477}
2478
2479/// Format struct as CSV string (PySpark to_csv). Minimal implementation.
2480pub fn to_csv(column: &Column) -> Column {
2481    column.clone().to_csv()
2482}
2483
2484/// Schema of CSV string (PySpark schema_of_csv). Returns literal schema string; minimal stub.
2485pub fn schema_of_csv(_column: &Column) -> Column {
2486    Column::from_expr(
2487        lit("STRUCT<_c0: STRING, _c1: STRING>".to_string()),
2488        Some("schema_of_csv".to_string()),
2489    )
2490}
2491
2492/// Schema of JSON string (PySpark schema_of_json). Returns literal schema string; minimal stub.
2493pub fn schema_of_json(_column: &Column) -> Column {
2494    Column::from_expr(
2495        lit("STRUCT<>".to_string()),
2496        Some("schema_of_json".to_string()),
2497    )
2498}
2499
2500/// Parse string column as JSON into struct (PySpark from_json).
2501pub fn from_json(column: &Column, schema: Option<polars::datatypes::DataType>) -> Column {
2502    column.clone().from_json(schema)
2503}
2504
2505/// Serialize struct column to JSON string (PySpark to_json).
2506pub fn to_json(column: &Column) -> Column {
2507    column.clone().to_json()
2508}
2509
2510/// Check if column values are in the given list (PySpark isin). Uses Polars is_in.
2511pub fn isin(column: &Column, other: &Column) -> Column {
2512    column.clone().isin(other)
2513}
2514
2515/// Check if column values are in the given i64 slice (PySpark isin with literal list).
2516pub fn isin_i64(column: &Column, values: &[i64]) -> Column {
2517    let s = Series::from_iter(values.iter().cloned());
2518    Column::from_expr(column.expr().clone().is_in(lit(s)), None)
2519}
2520
2521/// Check if column values are in the given string slice (PySpark isin with literal list).
2522pub fn isin_str(column: &Column, values: &[&str]) -> Column {
2523    let s: Series = Series::from_iter(values.iter().copied());
2524    Column::from_expr(column.expr().clone().is_in(lit(s)), None)
2525}
2526
2527/// Percent-decode URL-encoded string (PySpark url_decode).
2528pub fn url_decode(column: &Column) -> Column {
2529    column.clone().url_decode()
2530}
2531
2532/// Percent-encode string for URL (PySpark url_encode).
2533pub fn url_encode(column: &Column) -> Column {
2534    column.clone().url_encode()
2535}
2536
2537/// Bitwise left shift (PySpark shiftLeft). col << n.
2538pub fn shift_left(column: &Column, n: i32) -> Column {
2539    column.clone().shift_left(n)
2540}
2541
2542/// Bitwise signed right shift (PySpark shiftRight). col >> n.
2543pub fn shift_right(column: &Column, n: i32) -> Column {
2544    column.clone().shift_right(n)
2545}
2546
2547/// Bitwise unsigned right shift (PySpark shiftRightUnsigned). Logical shift for Long.
2548pub fn shift_right_unsigned(column: &Column, n: i32) -> Column {
2549    column.clone().shift_right_unsigned(n)
2550}
2551
2552/// Session/library version string (PySpark version).
2553pub fn version() -> Column {
2554    Column::from_expr(
2555        lit(concat!("robin-sparkless-", env!("CARGO_PKG_VERSION"))),
2556        None,
2557    )
2558}
2559
2560/// Null-safe equality: true if both null or both equal (PySpark equal_null). Alias for eq_null_safe.
2561pub fn equal_null(left: &Column, right: &Column) -> Column {
2562    left.clone().eq_null_safe(right)
2563}
2564
2565/// Length of JSON array at path (PySpark json_array_length).
2566pub fn json_array_length(column: &Column, path: &str) -> Column {
2567    column.clone().json_array_length(path)
2568}
2569
2570/// Parse URL and extract part: PROTOCOL, HOST, PATH, etc. (PySpark parse_url).
2571/// When key is Some(k) and part is QUERY/QUERYSTRING, returns the value for that query parameter only.
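///
/// # Example
/// A minimal sketch, assuming `parse_url` is re-exported at the crate root like `col`.
/// ```
/// use robin_sparkless::{col, parse_url};
///
/// // Host of a hypothetical "url" column, and the value of its "id" query parameter.
/// let host = parse_url(&col("url"), "HOST", None);
/// let id = parse_url(&col("url"), "QUERY", Some("id"));
/// ```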
2572pub fn parse_url(column: &Column, part: &str, key: Option<&str>) -> Column {
2573    column.clone().parse_url(part, key)
2574}
2575
2576/// Hash of column values (PySpark hash). Uses Murmur3 32-bit for parity with PySpark.
2577pub fn hash(columns: &[&Column]) -> Column {
2578    use polars::prelude::*;
2579    if columns.is_empty() {
2580        return crate::column::Column::from_expr(lit(0i64), None);
2581    }
2582    if columns.len() == 1 {
2583        return columns[0].clone().hash();
2584    }
2585    let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2586    let struct_expr = polars::prelude::as_struct(exprs);
2587    let name = columns[0].name().to_string();
2588    let expr = struct_expr.map(
2589        crate::udfs::apply_hash_struct,
2590        GetOutput::from_type(DataType::Int64),
2591    );
2592    crate::column::Column::from_expr(expr, Some(name))
2593}
2594
2595/// Simplified stand-in for PySpark stack: returns a struct of the given columns (alias for struct_) rather than expanding values into rows.
2596pub fn stack(columns: &[&Column]) -> Column {
2597    struct_(columns)
2598}
2599
2600#[cfg(test)]
2601mod tests {
2602    use super::*;
2603    use polars::prelude::{df, IntoLazy};
2604
2605    #[test]
2606    fn test_col_creates_column() {
2607        let column = col("test");
2608        assert_eq!(column.name(), "test");
2609    }
2610
2611    #[test]
2612    fn test_lit_i32() {
2613        let column = lit_i32(42);
2614        // The column should have a default name since it's a literal
2615        assert_eq!(column.name(), "<expr>");
2616    }
2617
2618    #[test]
2619    fn test_lit_i64() {
2620        let column = lit_i64(123456789012345i64);
2621        assert_eq!(column.name(), "<expr>");
2622    }
2623
2624    #[test]
2625    fn test_lit_f64() {
2626        let column = lit_f64(std::f64::consts::PI);
2627        assert_eq!(column.name(), "<expr>");
2628    }
2629
2630    #[test]
2631    fn test_lit_bool() {
2632        let column = lit_bool(true);
2633        assert_eq!(column.name(), "<expr>");
2634    }
2635
2636    #[test]
2637    fn test_lit_str() {
2638        let column = lit_str("hello");
2639        assert_eq!(column.name(), "<expr>");
2640    }
2641
2642    #[test]
2643    fn test_count_aggregation() {
2644        let column = col("value");
2645        let result = count(&column);
2646        assert_eq!(result.name(), "count");
2647    }
2648
2649    #[test]
2650    fn test_sum_aggregation() {
2651        let column = col("value");
2652        let result = sum(&column);
2653        assert_eq!(result.name(), "sum");
2654    }
2655
2656    #[test]
2657    fn test_avg_aggregation() {
2658        let column = col("value");
2659        let result = avg(&column);
2660        assert_eq!(result.name(), "avg");
2661    }
2662
2663    #[test]
2664    fn test_max_aggregation() {
2665        let column = col("value");
2666        let result = max(&column);
2667        assert_eq!(result.name(), "max");
2668    }
2669
2670    #[test]
2671    fn test_min_aggregation() {
2672        let column = col("value");
2673        let result = min(&column);
2674        assert_eq!(result.name(), "min");
2675    }
2676
2677    #[test]
2678    fn test_when_then_otherwise() {
2679        // Create a simple DataFrame
2680        let df = df!(
2681            "age" => &[15, 25, 35]
2682        )
2683        .unwrap();
2684
2685        // Build a when-then-otherwise expression
2686        let age_col = col("age");
2687        let condition = age_col.gt(polars::prelude::lit(18));
2688        let result = when(&condition)
2689            .then(&lit_str("adult"))
2690            .otherwise(&lit_str("minor"));
2691
2692        // Apply the expression
2693        let result_df = df
2694            .lazy()
2695            .with_column(result.into_expr().alias("status"))
2696            .collect()
2697            .unwrap();
2698
2699        // Verify the result
2700        let status_col = result_df.column("status").unwrap();
2701        let values: Vec<Option<&str>> = status_col.str().unwrap().into_iter().collect();
2702
2703        assert_eq!(values[0], Some("minor")); // age 15 < 18
2704        assert_eq!(values[1], Some("adult")); // age 25 > 18
2705        assert_eq!(values[2], Some("adult")); // age 35 > 18
2706    }
2707
2708    #[test]
2709    fn test_coalesce_returns_first_non_null() {
2710        // Create a DataFrame with some nulls
2711        let df = df!(
2712            "a" => &[Some(1), None, None],
2713            "b" => &[None, Some(2), None],
2714            "c" => &[None, None, Some(3)]
2715        )
2716        .unwrap();
2717
2718        let col_a = col("a");
2719        let col_b = col("b");
2720        let col_c = col("c");
2721        let result = coalesce(&[&col_a, &col_b, &col_c]);
2722
2723        // Apply the expression
2724        let result_df = df
2725            .lazy()
2726            .with_column(result.into_expr().alias("coalesced"))
2727            .collect()
2728            .unwrap();
2729
2730        // Verify the result
2731        let coalesced_col = result_df.column("coalesced").unwrap();
2732        let values: Vec<Option<i32>> = coalesced_col.i32().unwrap().into_iter().collect();
2733
2734        assert_eq!(values[0], Some(1)); // First non-null is 'a'
2735        assert_eq!(values[1], Some(2)); // First non-null is 'b'
2736        assert_eq!(values[2], Some(3)); // First non-null is 'c'
2737    }
2738
2739    #[test]
2740    fn test_coalesce_with_literal_fallback() {
2741        // Create a DataFrame with all nulls in one row
2742        let df = df!(
2743            "a" => &[Some(1), None],
2744            "b" => &[None::<i32>, None::<i32>]
2745        )
2746        .unwrap();
2747
2748        let col_a = col("a");
2749        let col_b = col("b");
2750        let fallback = lit_i32(0);
2751        let result = coalesce(&[&col_a, &col_b, &fallback]);
2752
2753        // Apply the expression
2754        let result_df = df
2755            .lazy()
2756            .with_column(result.into_expr().alias("coalesced"))
2757            .collect()
2758            .unwrap();
2759
2760        // Verify the result
2761        let coalesced_col = result_df.column("coalesced").unwrap();
2762        let values: Vec<Option<i32>> = coalesced_col.i32().unwrap().into_iter().collect();
2763
2764        assert_eq!(values[0], Some(1)); // First non-null is 'a'
2765        assert_eq!(values[1], Some(0)); // All nulls, use fallback
2766    }
2767
2768    #[test]
2769    #[should_panic(expected = "coalesce requires at least one column")]
2770    fn test_coalesce_empty_panics() {
2771        let columns: [&Column; 0] = [];
2772        let _ = coalesce(&columns);
2773    }
2774}