robin_sparkless/functions.rs

1use crate::column::Column;
2use crate::dataframe::DataFrame;
3use polars::prelude::*;
4
5// -----------------------------------------------------------------------------
6// -----------------------------------------------------------------------------
7
8/// Sort order specification for use in orderBy/sort. Holds expr + direction + null placement.
9#[derive(Debug, Clone)]
10pub struct SortOrder {
11    pub(crate) expr: Expr,
12    pub(crate) descending: bool,
13    pub(crate) nulls_last: bool,
14}
15
16impl SortOrder {
17    pub fn expr(&self) -> &Expr {
18        &self.expr
19    }
20}
21
22/// Ascending sort, nulls first (Spark default for ASC).
23pub fn asc(column: &Column) -> SortOrder {
24    SortOrder {
25        expr: column.expr().clone(),
26        descending: false,
27        nulls_last: false,
28    }
29}
30
31/// Ascending sort, nulls first.
32pub fn asc_nulls_first(column: &Column) -> SortOrder {
33    SortOrder {
34        expr: column.expr().clone(),
35        descending: false,
36        nulls_last: false,
37    }
38}
39
40/// Ascending sort, nulls last.
41pub fn asc_nulls_last(column: &Column) -> SortOrder {
42    SortOrder {
43        expr: column.expr().clone(),
44        descending: false,
45        nulls_last: true,
46    }
47}
48
49/// Descending sort, nulls last (Spark default for DESC).
50pub fn desc(column: &Column) -> SortOrder {
51    SortOrder {
52        expr: column.expr().clone(),
53        descending: true,
54        nulls_last: true,
55    }
56}
57
58/// Descending sort, nulls first.
59pub fn desc_nulls_first(column: &Column) -> SortOrder {
60    SortOrder {
61        expr: column.expr().clone(),
62        descending: true,
63        nulls_last: false,
64    }
65}
66
67/// Descending sort, nulls last.
68pub fn desc_nulls_last(column: &Column) -> SortOrder {
69    SortOrder {
70        expr: column.expr().clone(),
71        descending: true,
72        nulls_last: true,
73    }
74}
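// Usage sketch (illustrative only): building a sort specification from the helpers
// above. The DataFrame method that consumes a &[SortOrder] (an orderBy/sort-style
// call) lives outside this file and is assumed here.
//
//     let spec = vec![asc_nulls_last(&col("name")), desc(&col("age"))];
//     // df.sort(&spec)  -- hypothetical consumer; see the DataFrame module.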
75
76// -----------------------------------------------------------------------------
77
78/// Parse PySpark-like type name to Polars DataType.
79pub fn parse_type_name(name: &str) -> Result<DataType, String> {
80    let s = name.trim().to_lowercase();
81    Ok(match s.as_str() {
82        "int" | "integer" => DataType::Int32,
83        "long" | "bigint" => DataType::Int64,
84        "float" => DataType::Float32,
85        "double" => DataType::Float64,
86        "string" | "str" => DataType::String,
87        "boolean" | "bool" => DataType::Boolean,
88        "date" => DataType::Date,
89        "timestamp" => DataType::Datetime(TimeUnit::Microseconds, None),
90        _ => return Err(format!("unknown type name: {name}")),
91    })
92}
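// Example (sketch): parsing is case-insensitive and trims surrounding whitespace;
// anything outside the list above is an Err.
//
//     assert_eq!(parse_type_name(" BigInt "), Ok(DataType::Int64));
//     assert!(parse_type_name("decimal(10,2)").is_err());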
93
94/// Get a column by name
95pub fn col(name: &str) -> Column {
96    Column::new(name.to_string())
97}
98
99/// Grouping set marker (PySpark grouping). Stub: returns 0 (no GROUPING SETS in robin-sparkless).
100pub fn grouping(column: &Column) -> Column {
101    let _ = column;
102    Column::from_expr(lit(0i32), Some("grouping".to_string()))
103}
104
105/// Grouping set id (PySpark grouping_id). Stub: returns 0.
106pub fn grouping_id(_columns: &[Column]) -> Column {
107    Column::from_expr(lit(0i64), Some("grouping_id".to_string()))
108}
109
110/// Create a literal column from a value
111pub fn lit_i32(value: i32) -> Column {
112    let expr: Expr = lit(value);
113    Column::from_expr(expr, None)
114}
115
116pub fn lit_i64(value: i64) -> Column {
117    let expr: Expr = lit(value);
118    Column::from_expr(expr, None)
119}
120
121pub fn lit_f64(value: f64) -> Column {
122    let expr: Expr = lit(value);
123    Column::from_expr(expr, None)
124}
125
126pub fn lit_bool(value: bool) -> Column {
127    let expr: Expr = lit(value);
128    Column::from_expr(expr, None)
129}
130
131pub fn lit_str(value: &str) -> Column {
132    let expr: Expr = lit(value);
133    Column::from_expr(expr, None)
134}
135
136/// Count aggregation
137pub fn count(col: &Column) -> Column {
138    Column::from_expr(col.expr().clone().count(), Some("count".to_string()))
139}
140
141/// Sum aggregation
142pub fn sum(col: &Column) -> Column {
143    Column::from_expr(col.expr().clone().sum(), Some("sum".to_string()))
144}
145
146/// Average aggregation
147pub fn avg(col: &Column) -> Column {
148    Column::from_expr(col.expr().clone().mean(), Some("avg".to_string()))
149}
150
151/// Alias for avg (PySpark mean).
152pub fn mean(col: &Column) -> Column {
153    avg(col)
154}
155
156/// Maximum aggregation
157pub fn max(col: &Column) -> Column {
158    Column::from_expr(col.expr().clone().max(), Some("max".to_string()))
159}
160
161/// Minimum aggregation
162pub fn min(col: &Column) -> Column {
163    Column::from_expr(col.expr().clone().min(), Some("min".to_string()))
164}
165
166/// Standard deviation (sample) aggregation (PySpark stddev / stddev_samp)
167pub fn stddev(col: &Column) -> Column {
168    Column::from_expr(col.expr().clone().std(1), Some("stddev".to_string()))
169}
170
171/// Variance (sample) aggregation (PySpark variance / var_samp)
172pub fn variance(col: &Column) -> Column {
173    Column::from_expr(col.expr().clone().var(1), Some("variance".to_string()))
174}
175
176/// Population standard deviation (ddof=0). PySpark stddev_pop.
177pub fn stddev_pop(col: &Column) -> Column {
178    Column::from_expr(col.expr().clone().std(0), Some("stddev_pop".to_string()))
179}
180
181/// Sample standard deviation (ddof=1). Alias for stddev. PySpark stddev_samp.
182pub fn stddev_samp(col: &Column) -> Column {
183    stddev(col)
184}
185
186/// Alias for stddev (PySpark std).
187pub fn std(col: &Column) -> Column {
188    stddev(col)
189}
190
191/// Population variance (ddof=0). PySpark var_pop.
192pub fn var_pop(col: &Column) -> Column {
193    Column::from_expr(col.expr().clone().var(0), Some("var_pop".to_string()))
194}
195
196/// Sample variance (ddof=1). Alias for variance. PySpark var_samp.
197pub fn var_samp(col: &Column) -> Column {
198    variance(col)
199}
200
201/// Median aggregation. PySpark median.
202pub fn median(col: &Column) -> Column {
203    use polars::prelude::QuantileMethod;
204    Column::from_expr(
205        col.expr()
206            .clone()
207            .quantile(lit(0.5), QuantileMethod::Linear),
208        Some("median".to_string()),
209    )
210}
211
212/// Approximate percentile (PySpark approx_percentile). Maps to quantile; percentage in 0.0..=1.0.
213pub fn approx_percentile(col: &Column, percentage: f64) -> Column {
214    use polars::prelude::QuantileMethod;
215    Column::from_expr(
216        col.expr()
217            .clone()
218            .quantile(lit(percentage), QuantileMethod::Linear),
219        Some(format!("approx_percentile({percentage})")),
220    )
221}
222
223/// Approximate percentile (PySpark percentile_approx). Alias for approx_percentile.
224pub fn percentile_approx(col: &Column, percentage: f64) -> Column {
225    approx_percentile(col, percentage)
226}
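// Example (sketch): median is the 0.5 quantile, and approx_percentile takes the
// percentage as a fraction in 0.0..=1.0 (0.9 = 90th percentile). The column name
// and the aggregation call that consumes the result are illustrative assumptions.
//
//     let p90 = approx_percentile(&col("latency_ms"), 0.9);
//     // df.group_by(...).agg(...) would consume `p90`; that API is defined elsewhere.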
227
228/// Mode aggregation - most frequent value. PySpark mode.
229pub fn mode(col: &Column) -> Column {
230    col.clone().mode()
231}
232
233/// Count distinct aggregation (PySpark countDistinct)
234pub fn count_distinct(col: &Column) -> Column {
235    use polars::prelude::DataType;
236    Column::from_expr(
237        col.expr().clone().n_unique().cast(DataType::Int64),
238        Some("count_distinct".to_string()),
239    )
240}
241
242/// Kurtosis aggregation (PySpark kurtosis). Fisher definition, bias=true. Use in groupBy.agg().
243pub fn kurtosis(col: &Column) -> Column {
244    Column::from_expr(
245        col.expr()
246            .clone()
247            .cast(DataType::Float64)
248            .kurtosis(true, true),
249        Some("kurtosis".to_string()),
250    )
251}
252
253/// Skewness aggregation (PySpark skewness). bias=true. Use in groupBy.agg().
254pub fn skewness(col: &Column) -> Column {
255    Column::from_expr(
256        col.expr().clone().cast(DataType::Float64).skew(true),
257        Some("skewness".to_string()),
258    )
259}
260
261/// Population covariance aggregation (PySpark covar_pop). Returns Expr for use in groupBy.agg().
262pub fn covar_pop_expr(col1: &str, col2: &str) -> Expr {
263    use polars::prelude::{col as pl_col, len};
264    let c1 = pl_col(col1).cast(DataType::Float64);
265    let c2 = pl_col(col2).cast(DataType::Float64);
266    let n = len().cast(DataType::Float64);
267    let sum_ab = (c1.clone() * c2.clone()).sum();
268    let sum_a = pl_col(col1).sum().cast(DataType::Float64);
269    let sum_b = pl_col(col2).sum().cast(DataType::Float64);
270    (sum_ab - sum_a * sum_b / n.clone()) / n
271}
272
273/// Sample covariance aggregation (PySpark covar_samp). Returns Expr for use in groupBy.agg().
274pub fn covar_samp_expr(col1: &str, col2: &str) -> Expr {
275    use polars::prelude::{col as pl_col, len, lit, when};
276    let c1 = pl_col(col1).cast(DataType::Float64);
277    let c2 = pl_col(col2).cast(DataType::Float64);
278    let n = len().cast(DataType::Float64);
279    let sum_ab = (c1.clone() * c2.clone()).sum();
280    let sum_a = pl_col(col1).sum().cast(DataType::Float64);
281    let sum_b = pl_col(col2).sum().cast(DataType::Float64);
282    when(len().gt(lit(1)))
283        .then((sum_ab - sum_a * sum_b / n.clone()) / (len() - lit(1)).cast(DataType::Float64))
284        .otherwise(lit(f64::NAN))
285}
286
287/// Pearson correlation aggregation (PySpark corr). Returns Expr for use in groupBy.agg().
288pub fn corr_expr(col1: &str, col2: &str) -> Expr {
289    use polars::prelude::{col as pl_col, len, lit, when};
290    let c1 = pl_col(col1).cast(DataType::Float64);
291    let c2 = pl_col(col2).cast(DataType::Float64);
292    let n = len().cast(DataType::Float64);
293    let n1 = (len() - lit(1)).cast(DataType::Float64);
294    let sum_ab = (c1.clone() * c2.clone()).sum();
295    let sum_a = pl_col(col1).sum().cast(DataType::Float64);
296    let sum_b = pl_col(col2).sum().cast(DataType::Float64);
297    let sum_a2 = (c1.clone() * c1).sum();
298    let sum_b2 = (c2.clone() * c2).sum();
299    let cov_samp = (sum_ab - sum_a.clone() * sum_b.clone() / n.clone()) / n1.clone();
300    let var_a = (sum_a2 - sum_a.clone() * sum_a / n.clone()) / n1.clone();
301    let var_b = (sum_b2 - sum_b.clone() * sum_b / n.clone()) / n1.clone();
302    let std_a = var_a.sqrt();
303    let std_b = var_b.sqrt();
304    when(len().gt(lit(1)))
305        .then(cov_samp / (std_a * std_b))
306        .otherwise(lit(f64::NAN))
307}
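// The three helpers above expand the "sum of products" identities, using the same
// names as the local variables:
//   covar_pop(a, b)  = (sum_ab - sum_a * sum_b / n) / n
//   covar_samp(a, b) = (sum_ab - sum_a * sum_b / n) / (n - 1)   (NaN when n <= 1)
//   corr(a, b)       = cov_samp / (std_a * std_b)
// Usage sketch (the group_by/agg surface is assumed, not defined in this file):
//
//     let e = corr_expr("height", "weight");
//     // lf.group_by([...]).agg([e]) on the underlying polars LazyFrame.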
308
309// --- Regression aggregates (PySpark regr_*). y = col1, x = col2; only pairs where both non-null. ---
310
311fn regr_cond_and_sums(y_col: &str, x_col: &str) -> (Expr, Expr, Expr, Expr, Expr, Expr) {
312    use polars::prelude::col as pl_col;
313    let y = pl_col(y_col).cast(DataType::Float64);
314    let x = pl_col(x_col).cast(DataType::Float64);
315    let cond = y.clone().is_not_null().and(x.clone().is_not_null());
316    let n = y
317        .clone()
318        .filter(cond.clone())
319        .count()
320        .cast(DataType::Float64);
321    let sum_x = x.clone().filter(cond.clone()).sum();
322    let sum_y = y.clone().filter(cond.clone()).sum();
323    let sum_xx = (x.clone() * x.clone()).filter(cond.clone()).sum();
324    let sum_yy = (y.clone() * y.clone()).filter(cond.clone()).sum();
325    let sum_xy = (x * y).filter(cond).sum();
326    (n, sum_x, sum_y, sum_xx, sum_yy, sum_xy)
327}
328
329/// Regression: count of (y, x) pairs where both non-null (PySpark regr_count).
330pub fn regr_count_expr(y_col: &str, x_col: &str) -> Expr {
331    let (n, ..) = regr_cond_and_sums(y_col, x_col);
332    n
333}
334
335/// Regression: average of x (PySpark regr_avgx).
336pub fn regr_avgx_expr(y_col: &str, x_col: &str) -> Expr {
337    use polars::prelude::{lit, when};
338    let (n, sum_x, ..) = regr_cond_and_sums(y_col, x_col);
339    when(n.clone().gt(lit(0.0)))
340        .then(sum_x / n)
341        .otherwise(lit(f64::NAN))
342}
343
344/// Regression: average of y (PySpark regr_avgy).
345pub fn regr_avgy_expr(y_col: &str, x_col: &str) -> Expr {
346    use polars::prelude::{lit, when};
347    let (n, _, sum_y, ..) = regr_cond_and_sums(y_col, x_col);
348    when(n.clone().gt(lit(0.0)))
349        .then(sum_y / n)
350        .otherwise(lit(f64::NAN))
351}
352
353/// Regression: sum((x - avg_x)^2) (PySpark regr_sxx).
354pub fn regr_sxx_expr(y_col: &str, x_col: &str) -> Expr {
355    use polars::prelude::{lit, when};
356    let (n, sum_x, _, sum_xx, ..) = regr_cond_and_sums(y_col, x_col);
357    when(n.clone().gt(lit(0.0)))
358        .then(sum_xx - sum_x.clone() * sum_x / n)
359        .otherwise(lit(f64::NAN))
360}
361
362/// Regression: sum((y - avg_y)^2) (PySpark regr_syy).
363pub fn regr_syy_expr(y_col: &str, x_col: &str) -> Expr {
364    use polars::prelude::{lit, when};
365    let (n, _, sum_y, _, sum_yy, _) = regr_cond_and_sums(y_col, x_col);
366    when(n.clone().gt(lit(0.0)))
367        .then(sum_yy - sum_y.clone() * sum_y / n)
368        .otherwise(lit(f64::NAN))
369}
370
371/// Regression: sum((x - avg_x)(y - avg_y)) (PySpark regr_sxy).
372pub fn regr_sxy_expr(y_col: &str, x_col: &str) -> Expr {
373    use polars::prelude::{lit, when};
374    let (n, sum_x, sum_y, _, _, sum_xy) = regr_cond_and_sums(y_col, x_col);
375    when(n.clone().gt(lit(0.0)))
376        .then(sum_xy - sum_x * sum_y / n)
377        .otherwise(lit(f64::NAN))
378}
379
380/// Regression slope: cov_samp(y,x)/var_samp(x) (PySpark regr_slope).
381pub fn regr_slope_expr(y_col: &str, x_col: &str) -> Expr {
382    use polars::prelude::{lit, when};
383    let (n, sum_x, sum_y, sum_xx, _sum_yy, sum_xy) = regr_cond_and_sums(y_col, x_col);
384    let regr_sxx = sum_xx.clone() - sum_x.clone() * sum_x.clone() / n.clone();
385    let regr_sxy = sum_xy - sum_x * sum_y / n.clone();
386    when(n.gt(lit(1.0)).and(regr_sxx.clone().gt(lit(0.0))))
387        .then(regr_sxy / regr_sxx)
388        .otherwise(lit(f64::NAN))
389}
390
391/// Regression intercept: avg_y - slope*avg_x (PySpark regr_intercept).
392pub fn regr_intercept_expr(y_col: &str, x_col: &str) -> Expr {
393    use polars::prelude::{lit, when};
394    let (n, sum_x, sum_y, sum_xx, _, sum_xy) = regr_cond_and_sums(y_col, x_col);
395    let regr_sxx = sum_xx - sum_x.clone() * sum_x.clone() / n.clone();
396    let regr_sxy = sum_xy.clone() - sum_x.clone() * sum_y.clone() / n.clone();
397    let slope = regr_sxy.clone() / regr_sxx.clone();
398    let avg_y = sum_y / n.clone();
399    let avg_x = sum_x / n.clone();
400    when(n.gt(lit(1.0)).and(regr_sxx.clone().gt(lit(0.0))))
401        .then(avg_y - slope * avg_x)
402        .otherwise(lit(f64::NAN))
403}
404
405/// Regression R-squared (PySpark regr_r2).
406pub fn regr_r2_expr(y_col: &str, x_col: &str) -> Expr {
407    use polars::prelude::{lit, when};
408    let (n, sum_x, sum_y, sum_xx, sum_yy, sum_xy) = regr_cond_and_sums(y_col, x_col);
409    let regr_sxx = sum_xx - sum_x.clone() * sum_x.clone() / n.clone();
410    let regr_syy = sum_yy - sum_y.clone() * sum_y.clone() / n.clone();
411    let regr_sxy = sum_xy - sum_x * sum_y / n;
412    when(
413        regr_sxx
414            .clone()
415            .gt(lit(0.0))
416            .and(regr_syy.clone().gt(lit(0.0))),
417    )
418    .then(regr_sxy.clone() * regr_sxy / (regr_sxx * regr_syy))
419    .otherwise(lit(f64::NAN))
420}
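// How the regr_* helpers above relate (same notation as the code):
//   regr_slope     = regr_sxy / regr_sxx
//   regr_intercept = avg_y - slope * avg_x
//   regr_r2        = regr_sxy^2 / (regr_sxx * regr_syy)
// All of them consider only rows where both y and x are non-null (regr_cond_and_sums).
// Sketch (aggregation surface assumed):
//
//     let slope = regr_slope_expr("sales", "ad_spend");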
421
422/// PySpark-style conditional expression builder.
423///
424/// # Example
425/// ```
426/// use robin_sparkless::{col, lit_i64, lit_str, when};
427///
428/// // when(condition).then(value).otherwise(fallback)
429/// let expr = when(&col("age").gt(lit_i64(18).into_expr()))
430///     .then(&lit_str("adult"))
431///     .otherwise(&lit_str("minor"));
432/// ```
433pub fn when(condition: &Column) -> WhenBuilder {
434    WhenBuilder::new(condition.expr().clone())
435}
436
437/// Two-arg when(condition, value): returns value where condition is true, null otherwise (PySpark when(cond, val)).
438pub fn when_then_otherwise_null(condition: &Column, value: &Column) -> Column {
439    use polars::prelude::*;
440    let null_expr = Expr::Literal(LiteralValue::Null);
441    let expr = polars::prelude::when(condition.expr().clone())
442        .then(value.expr().clone())
443        .otherwise(null_expr);
444    crate::column::Column::from_expr(expr, None)
445}
446
447/// Builder for when-then-otherwise expressions
448pub struct WhenBuilder {
449    condition: Expr,
450}
451
452impl WhenBuilder {
453    fn new(condition: Expr) -> Self {
454        WhenBuilder { condition }
455    }
456
457    /// Specify the value when condition is true
458    pub fn then(self, value: &Column) -> ThenBuilder {
459        use polars::prelude::*;
460        let when_then = when(self.condition).then(value.expr().clone());
461        ThenBuilder::new(when_then)
462    }
463
    /// Finalize with `otherwise` directly after `when(...)`.
    /// Not supported in this implementation: a `.then(...)` call is required first,
    /// so this method always panics. Use when(cond).then(val1).otherwise(val2) instead.
    pub fn otherwise(self, _value: &Column) -> Column {
        // when(...).otherwise(...) without .then(...) has no "true" branch value,
        // so there is nothing sensible to build here.
        panic!("when().otherwise() requires .then() to be called first. Use when(cond).then(val1).otherwise(val2)");
    }
473}
474
475/// Builder for chaining when-then clauses before finalizing with otherwise
476pub struct ThenBuilder {
477    when_then: polars::prelude::Then, // The Polars WhenThen state
478}
479
480impl ThenBuilder {
481    fn new(when_then: polars::prelude::Then) -> Self {
482        ThenBuilder { when_then }
483    }
484
    /// Chain an additional when-then clause.
    /// Note: chaining is not yet implemented; the extra condition is ignored and the
    /// builder is returned unchanged. Use a single when().then().otherwise() for now.
    pub fn when(self, _condition: &Column) -> ThenBuilder {
        // TODO: implement proper chaining support.
        // Returning self keeps callers compiling, but the added clause has no effect.
        self
    }
493
494    /// Finalize the expression with the fallback value
495    pub fn otherwise(self, value: &Column) -> Column {
496        let expr = self.when_then.otherwise(value.expr().clone());
497        crate::column::Column::from_expr(expr, None)
498    }
499}
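// Putting the builders together (sketch; mirrors the doc example on `when` above,
// so the Column helpers used here are assumed to behave as in that example):
//
//     let label = when(&col("age").gt(lit_i64(18).into_expr()))
//         .then(&lit_str("adult"))
//         .otherwise(&lit_str("minor"));
//     // Two-argument PySpark form: value where true, null otherwise.
//     let adults = when_then_otherwise_null(&col("age").gt(lit_i64(18).into_expr()), &col("age"));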
500
501/// Convert string column to uppercase (PySpark upper)
502pub fn upper(column: &Column) -> Column {
503    column.clone().upper()
504}
505
506/// Convert string column to lowercase (PySpark lower)
507pub fn lower(column: &Column) -> Column {
508    column.clone().lower()
509}
510
511/// Substring with 1-based start (PySpark substring semantics)
512pub fn substring(column: &Column, start: i64, length: Option<i64>) -> Column {
513    column.clone().substr(start, length)
514}
515
516/// String length in characters (PySpark length)
517pub fn length(column: &Column) -> Column {
518    column.clone().length()
519}
520
521/// Trim leading and trailing whitespace (PySpark trim)
522pub fn trim(column: &Column) -> Column {
523    column.clone().trim()
524}
525
526/// Trim leading whitespace (PySpark ltrim)
527pub fn ltrim(column: &Column) -> Column {
528    column.clone().ltrim()
529}
530
531/// Trim trailing whitespace (PySpark rtrim)
532pub fn rtrim(column: &Column) -> Column {
533    column.clone().rtrim()
534}
535
536/// Trim leading and trailing chars (PySpark btrim). trim_str defaults to whitespace.
537pub fn btrim(column: &Column, trim_str: Option<&str>) -> Column {
538    column.clone().btrim(trim_str)
539}
540
541/// Find substring position 1-based, starting at pos (PySpark locate). 0 if not found.
542pub fn locate(substr: &str, column: &Column, pos: i64) -> Column {
543    column.clone().locate(substr, pos)
544}
545
546/// Base conversion (PySpark conv). num from from_base to to_base.
547pub fn conv(column: &Column, from_base: i32, to_base: i32) -> Column {
548    column.clone().conv(from_base, to_base)
549}
550
551/// Convert to hex string (PySpark hex).
552pub fn hex(column: &Column) -> Column {
553    column.clone().hex()
554}
555
556/// Convert hex string to binary/string (PySpark unhex).
557pub fn unhex(column: &Column) -> Column {
558    column.clone().unhex()
559}
560
561/// Encode string to binary (PySpark encode). Charset: UTF-8. Returns hex string.
562pub fn encode(column: &Column, charset: &str) -> Column {
563    column.clone().encode(charset)
564}
565
566/// Decode binary (hex string) to string (PySpark decode). Charset: UTF-8.
567pub fn decode(column: &Column, charset: &str) -> Column {
568    column.clone().decode(charset)
569}
570
571/// Convert to binary (PySpark to_binary). fmt: 'utf-8', 'hex'.
572pub fn to_binary(column: &Column, fmt: &str) -> Column {
573    column.clone().to_binary(fmt)
574}
575
576/// Try convert to binary; null on failure (PySpark try_to_binary).
577pub fn try_to_binary(column: &Column, fmt: &str) -> Column {
578    column.clone().try_to_binary(fmt)
579}
580
581/// AES encrypt (PySpark aes_encrypt). Key as string; AES-128-GCM.
582pub fn aes_encrypt(column: &Column, key: &str) -> Column {
583    column.clone().aes_encrypt(key)
584}
585
586/// AES decrypt (PySpark aes_decrypt). Input hex(nonce||ciphertext).
587pub fn aes_decrypt(column: &Column, key: &str) -> Column {
588    column.clone().aes_decrypt(key)
589}
590
591/// Try AES decrypt (PySpark try_aes_decrypt). Returns null on failure.
592pub fn try_aes_decrypt(column: &Column, key: &str) -> Column {
593    column.clone().try_aes_decrypt(key)
594}
595
596/// Convert integer to binary string (PySpark bin).
597pub fn bin(column: &Column) -> Column {
598    column.clone().bin()
599}
600
601/// Get bit at 0-based position (PySpark getbit).
602pub fn getbit(column: &Column, pos: i64) -> Column {
603    column.clone().getbit(pos)
604}
605
606/// Bitwise AND of two integer/boolean columns (PySpark bit_and).
607pub fn bit_and(left: &Column, right: &Column) -> Column {
608    left.clone().bit_and(right)
609}
610
611/// Bitwise OR of two integer/boolean columns (PySpark bit_or).
612pub fn bit_or(left: &Column, right: &Column) -> Column {
613    left.clone().bit_or(right)
614}
615
616/// Bitwise XOR of two integer/boolean columns (PySpark bit_xor).
617pub fn bit_xor(left: &Column, right: &Column) -> Column {
618    left.clone().bit_xor(right)
619}
620
621/// Count of set bits in the integer representation (PySpark bit_count).
622pub fn bit_count(column: &Column) -> Column {
623    column.clone().bit_count()
624}
625
626/// Bitwise NOT of an integer/boolean column (PySpark bitwise_not / bitwiseNOT).
627pub fn bitwise_not(column: &Column) -> Column {
628    column.clone().bitwise_not()
629}
630
631// --- Bitmap (PySpark 3.5+) ---
632
633/// Map integral value (0–32767) to bit position for bitmap aggregates (PySpark bitmap_bit_position).
634pub fn bitmap_bit_position(column: &Column) -> Column {
635    use polars::prelude::DataType;
636    let expr = column.expr().clone().cast(DataType::Int32);
637    Column::from_expr(expr, None)
638}
639
640/// Bucket number for distributed bitmap (PySpark bitmap_bucket_number). value / 32768.
641pub fn bitmap_bucket_number(column: &Column) -> Column {
642    use polars::prelude::DataType;
643    let expr = column.expr().clone().cast(DataType::Int64) / lit(32768i64);
644    Column::from_expr(expr, None)
645}
646
647/// Count set bits in a bitmap binary column (PySpark bitmap_count).
648pub fn bitmap_count(column: &Column) -> Column {
649    use polars::prelude::{DataType, GetOutput};
650    let expr = column.expr().clone().map(
651        crate::udfs::apply_bitmap_count,
652        GetOutput::from_type(DataType::Int64),
653    );
654    Column::from_expr(expr, None)
655}
656
657/// Aggregate: bitwise OR of bit positions into one bitmap binary (PySpark bitmap_construct_agg).
658/// Use in group_by(...).agg([bitmap_construct_agg(col)]).
659pub fn bitmap_construct_agg(column: &Column) -> polars::prelude::Expr {
660    use polars::prelude::{DataType, GetOutput};
661    column.expr().clone().implode().map(
662        crate::udfs::apply_bitmap_construct_agg,
663        GetOutput::from_type(DataType::Binary),
664    )
665}
666
667/// Aggregate: bitwise OR of bitmap binary column (PySpark bitmap_or_agg).
668pub fn bitmap_or_agg(column: &Column) -> polars::prelude::Expr {
669    use polars::prelude::{DataType, GetOutput};
670    column.expr().clone().implode().map(
671        crate::udfs::apply_bitmap_or_agg,
672        GetOutput::from_type(DataType::Binary),
673    )
674}
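// Bitmap sketch: positions/buckets are computed per value, then OR-ed per group.
// Only the expressions below come from this file; the group_by/agg call is assumed.
//
//     let pos = bitmap_bit_position(&col("id"));
//     let bitmap = bitmap_construct_agg(&pos); // polars Expr, for use inside .agg([...])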
675
676/// Alias for getbit (PySpark bit_get).
677pub fn bit_get(column: &Column, pos: i64) -> Column {
678    getbit(column, pos)
679}
680
681/// Assert that all boolean values are true; errors otherwise (PySpark assert_true).
682/// When err_msg is Some, it is used in the error message when assertion fails.
683pub fn assert_true(column: &Column, err_msg: Option<&str>) -> Column {
684    column.clone().assert_true(err_msg)
685}
686
687/// Raise an error when evaluated (PySpark raise_error). Always fails with the given message.
688pub fn raise_error(message: &str) -> Column {
689    let msg = message.to_string();
690    let expr = lit(0i64).map(
691        move |_col| -> PolarsResult<Option<polars::prelude::Column>> {
692            Err(PolarsError::ComputeError(
693                format!("raise_error: {msg}").into(),
694            ))
695        },
696        GetOutput::from_type(DataType::Int64),
697    );
698    Column::from_expr(expr, Some("raise_error".to_string()))
699}
700
701/// Broadcast hint - no-op that returns the same DataFrame (PySpark broadcast).
702pub fn broadcast(df: &DataFrame) -> DataFrame {
703    df.clone()
704}
705
706/// Stub partition id - always 0 (PySpark spark_partition_id).
707pub fn spark_partition_id() -> Column {
708    Column::from_expr(lit(0i32), Some("spark_partition_id".to_string()))
709}
710
711/// Stub input file name - empty string (PySpark input_file_name).
712pub fn input_file_name() -> Column {
713    Column::from_expr(lit(""), Some("input_file_name".to_string()))
714}
715
716/// Stub monotonically_increasing_id - constant 0 (PySpark monotonically_increasing_id).
/// Note: differs from PySpark, where the id is unique per row; see PYSPARK_DIFFERENCES.md.
718pub fn monotonically_increasing_id() -> Column {
719    Column::from_expr(lit(0i64), Some("monotonically_increasing_id".to_string()))
720}
721
722/// Current catalog name stub (PySpark current_catalog).
723pub fn current_catalog() -> Column {
724    Column::from_expr(lit("spark_catalog"), Some("current_catalog".to_string()))
725}
726
727/// Current database/schema name stub (PySpark current_database).
728pub fn current_database() -> Column {
729    Column::from_expr(lit("default"), Some("current_database".to_string()))
730}
731
732/// Current schema name stub (PySpark current_schema).
733pub fn current_schema() -> Column {
734    Column::from_expr(lit("default"), Some("current_schema".to_string()))
735}
736
737/// Current user stub (PySpark current_user).
738pub fn current_user() -> Column {
739    Column::from_expr(lit("unknown"), Some("current_user".to_string()))
740}
741
742/// User stub (PySpark user).
743pub fn user() -> Column {
744    Column::from_expr(lit("unknown"), Some("user".to_string()))
745}
746
747/// Random uniform [0, 1) per row, with optional seed (PySpark rand).
748/// When added via with_column, generates one distinct value per row (PySpark-like).
749pub fn rand(seed: Option<u64>) -> Column {
750    Column::from_rand(seed)
751}
752
753/// Random standard normal per row, with optional seed (PySpark randn).
754/// When added via with_column, generates one distinct value per row (PySpark-like).
755pub fn randn(seed: Option<u64>) -> Column {
756    Column::from_randn(seed)
757}
758
759/// True if two arrays have any element in common (PySpark arrays_overlap).
760pub fn arrays_overlap(left: &Column, right: &Column) -> Column {
761    left.clone().arrays_overlap(right)
762}
763
764/// Zip arrays into array of structs (PySpark arrays_zip).
765pub fn arrays_zip(left: &Column, right: &Column) -> Column {
766    left.clone().arrays_zip(right)
767}
768
769/// Explode; null/empty yields one row with null (PySpark explode_outer).
770pub fn explode_outer(column: &Column) -> Column {
771    column.clone().explode_outer()
772}
773
774/// Posexplode with null preservation (PySpark posexplode_outer).
775pub fn posexplode_outer(column: &Column) -> (Column, Column) {
776    column.clone().posexplode_outer()
777}
778
779/// Collect to array (PySpark array_agg).
780pub fn array_agg(column: &Column) -> Column {
781    column.clone().array_agg()
782}
783
784/// Transform map keys by expr (PySpark transform_keys).
785pub fn transform_keys(column: &Column, key_expr: Expr) -> Column {
786    column.clone().transform_keys(key_expr)
787}
788
789/// Transform map values by expr (PySpark transform_values).
790pub fn transform_values(column: &Column, value_expr: Expr) -> Column {
791    column.clone().transform_values(value_expr)
792}
793
794/// Parse string to map (PySpark str_to_map). Default delims: "," and ":".
795pub fn str_to_map(
796    column: &Column,
797    pair_delim: Option<&str>,
798    key_value_delim: Option<&str>,
799) -> Column {
800    let pd = pair_delim.unwrap_or(",");
801    let kvd = key_value_delim.unwrap_or(":");
802    column.clone().str_to_map(pd, kvd)
803}
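// Example (sketch): with the default delimiters, "a:1,b:2" becomes a map with keys
// "a" and "b"; custom delimiters handle e.g. "a=1;b=2".
//
//     let m = str_to_map(&col("props"), None, None);
//     let m2 = str_to_map(&col("props"), Some(";"), Some("="));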
804
805/// Extract first match of regex (PySpark regexp_extract). group_index 0 = full match.
806pub fn regexp_extract(column: &Column, pattern: &str, group_index: usize) -> Column {
807    column.clone().regexp_extract(pattern, group_index)
808}
809
810/// Replace first match of regex (PySpark regexp_replace)
811pub fn regexp_replace(column: &Column, pattern: &str, replacement: &str) -> Column {
812    column.clone().regexp_replace(pattern, replacement)
813}
814
815/// Split string by delimiter (PySpark split)
816pub fn split(column: &Column, delimiter: &str) -> Column {
817    column.clone().split(delimiter)
818}
819
820/// Title case (PySpark initcap)
821pub fn initcap(column: &Column) -> Column {
822    column.clone().initcap()
823}
824
825/// Extract all matches of regex (PySpark regexp_extract_all).
826pub fn regexp_extract_all(column: &Column, pattern: &str) -> Column {
827    column.clone().regexp_extract_all(pattern)
828}
829
830/// Check if string matches regex (PySpark regexp_like / rlike).
831pub fn regexp_like(column: &Column, pattern: &str) -> Column {
832    column.clone().regexp_like(pattern)
833}
834
835/// Count of non-overlapping regex matches (PySpark regexp_count).
836pub fn regexp_count(column: &Column, pattern: &str) -> Column {
837    column.clone().regexp_count(pattern)
838}
839
840/// First substring matching regex (PySpark regexp_substr). Null if no match.
841pub fn regexp_substr(column: &Column, pattern: &str) -> Column {
842    column.clone().regexp_substr(pattern)
843}
844
845/// Split by delimiter and return 1-based part (PySpark split_part).
846pub fn split_part(column: &Column, delimiter: &str, part_num: i64) -> Column {
847    column.clone().split_part(delimiter, part_num)
848}
849
850/// 1-based position of first regex match (PySpark regexp_instr).
851pub fn regexp_instr(column: &Column, pattern: &str, group_idx: Option<usize>) -> Column {
852    column.clone().regexp_instr(pattern, group_idx)
853}
854
855/// 1-based index of str in comma-delimited set (PySpark find_in_set). 0 if not found or str contains comma.
856pub fn find_in_set(str_column: &Column, set_column: &Column) -> Column {
857    str_column.clone().find_in_set(set_column)
858}
859
/// Printf-style format (PySpark format_string). Supports %s, %d, %i, %f, %g, %%.
/// Panics if `columns` is empty.
861pub fn format_string(format: &str, columns: &[&Column]) -> Column {
862    use polars::prelude::*;
863    if columns.is_empty() {
864        panic!("format_string needs at least one column");
865    }
866    let format_owned = format.to_string();
867    let args: Vec<Expr> = columns.iter().skip(1).map(|c| c.expr().clone()).collect();
868    let expr = columns[0].expr().clone().map_many(
869        move |cols| crate::udfs::apply_format_string(cols, &format_owned),
870        &args,
871        GetOutput::from_type(DataType::String),
872    );
873    crate::column::Column::from_expr(expr, None)
874}
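// Example (sketch): the first column anchors the expression; the remaining columns
// are substituted into the %-placeholders in order.
//
//     let msg = format_string("%s scored %d", &[&col("name"), &col("score")]);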
875
876/// Alias for format_string (PySpark printf).
877pub fn printf(format: &str, columns: &[&Column]) -> Column {
878    format_string(format, columns)
879}
880
881/// Repeat string n times (PySpark repeat).
882pub fn repeat(column: &Column, n: i32) -> Column {
883    column.clone().repeat(n)
884}
885
886/// Reverse string (PySpark reverse).
887pub fn reverse(column: &Column) -> Column {
888    column.clone().reverse()
889}
890
891/// Find substring position 1-based; 0 if not found (PySpark instr).
892pub fn instr(column: &Column, substr: &str) -> Column {
893    column.clone().instr(substr)
894}
895
896/// Position of substring in column (PySpark position). Same as instr; (substr, col) argument order.
897pub fn position(substr: &str, column: &Column) -> Column {
898    column.clone().instr(substr)
899}
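// Note the argument order of the substring-lookup helpers (all 1-based, 0 = not found):
//
//     instr(&col("s"), "@")        // (column, substring)
//     position("@", &col("s"))     // (substring, column), PySpark argument order
//     locate("@", &col("s"), 1)    // (substring, column, start position)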
900
901/// ASCII value of first character (PySpark ascii). Returns Int32.
902pub fn ascii(column: &Column) -> Column {
903    column.clone().ascii()
904}
905
906/// Format numeric as string with fixed decimal places (PySpark format_number).
907pub fn format_number(column: &Column, decimals: u32) -> Column {
908    column.clone().format_number(decimals)
909}
910
911/// Replace substring at 1-based position (PySpark overlay). replace is literal.
912pub fn overlay(column: &Column, replace: &str, pos: i64, length: i64) -> Column {
913    column.clone().overlay(replace, pos, length)
914}
915
916/// Int to single-character string (PySpark char). Valid codepoint only.
917pub fn char(column: &Column) -> Column {
918    column.clone().char()
919}
920
921/// Alias for char (PySpark chr).
922pub fn chr(column: &Column) -> Column {
923    column.clone().chr()
924}
925
926/// Base64 encode string bytes (PySpark base64).
927pub fn base64(column: &Column) -> Column {
928    column.clone().base64()
929}
930
931/// Base64 decode to string (PySpark unbase64). Invalid decode → null.
932pub fn unbase64(column: &Column) -> Column {
933    column.clone().unbase64()
934}
935
936/// SHA1 hash of string bytes, return hex string (PySpark sha1).
937pub fn sha1(column: &Column) -> Column {
938    column.clone().sha1()
939}
940
941/// SHA2 hash; bit_length 256, 384, or 512 (PySpark sha2).
942pub fn sha2(column: &Column, bit_length: i32) -> Column {
943    column.clone().sha2(bit_length)
944}
945
946/// MD5 hash of string bytes, return hex string (PySpark md5).
947pub fn md5(column: &Column) -> Column {
948    column.clone().md5()
949}
950
951/// Left-pad string to length with pad char (PySpark lpad).
952pub fn lpad(column: &Column, length: i32, pad: &str) -> Column {
953    column.clone().lpad(length, pad)
954}
955
956/// Right-pad string to length with pad char (PySpark rpad).
957pub fn rpad(column: &Column, length: i32, pad: &str) -> Column {
958    column.clone().rpad(length, pad)
959}
960
961/// Character-by-character translation (PySpark translate).
962pub fn translate(column: &Column, from_str: &str, to_str: &str) -> Column {
963    column.clone().translate(from_str, to_str)
964}
965
966/// Mask string: replace upper/lower/digit/other with given chars (PySpark mask).
967pub fn mask(
968    column: &Column,
969    upper_char: Option<char>,
970    lower_char: Option<char>,
971    digit_char: Option<char>,
972    other_char: Option<char>,
973) -> Column {
974    column
975        .clone()
976        .mask(upper_char, lower_char, digit_char, other_char)
977}
978
979/// Substring before/after nth delimiter (PySpark substring_index).
980pub fn substring_index(column: &Column, delimiter: &str, count: i64) -> Column {
981    column.clone().substring_index(delimiter, count)
982}
983
984/// Leftmost n characters (PySpark left).
985pub fn left(column: &Column, n: i64) -> Column {
986    column.clone().left(n)
987}
988
989/// Rightmost n characters (PySpark right).
990pub fn right(column: &Column, n: i64) -> Column {
991    column.clone().right(n)
992}
993
994/// Replace all occurrences of search with replacement (literal). PySpark replace.
995pub fn replace(column: &Column, search: &str, replacement: &str) -> Column {
996    column.clone().replace(search, replacement)
997}
998
999/// True if string starts with prefix (PySpark startswith).
1000pub fn startswith(column: &Column, prefix: &str) -> Column {
1001    column.clone().startswith(prefix)
1002}
1003
1004/// True if string ends with suffix (PySpark endswith).
1005pub fn endswith(column: &Column, suffix: &str) -> Column {
1006    column.clone().endswith(suffix)
1007}
1008
1009/// True if string contains substring (literal). PySpark contains.
1010pub fn contains(column: &Column, substring: &str) -> Column {
1011    column.clone().contains(substring)
1012}
1013
1014/// SQL LIKE pattern (% any, _ one char). PySpark like.
1015/// When escape_char is Some(esc), esc + char treats that char as literal.
1016pub fn like(column: &Column, pattern: &str, escape_char: Option<char>) -> Column {
1017    column.clone().like(pattern, escape_char)
1018}
1019
1020/// Case-insensitive LIKE. PySpark ilike.
1021/// When escape_char is Some(esc), esc + char treats that char as literal.
1022pub fn ilike(column: &Column, pattern: &str, escape_char: Option<char>) -> Column {
1023    column.clone().ilike(pattern, escape_char)
1024}
1025
1026/// Alias for regexp_like. PySpark rlike / regexp.
1027pub fn rlike(column: &Column, pattern: &str) -> Column {
1028    column.clone().regexp_like(pattern)
1029}
1030
1031/// Alias for rlike (PySpark regexp).
1032pub fn regexp(column: &Column, pattern: &str) -> Column {
1033    rlike(column, pattern)
1034}
1035
1036/// Soundex code (PySpark soundex). Not implemented: requires element-wise UDF.
1037pub fn soundex(column: &Column) -> Column {
1038    column.clone().soundex()
1039}
1040
1041/// Levenshtein distance (PySpark levenshtein). Not implemented: requires element-wise UDF.
1042pub fn levenshtein(column: &Column, other: &Column) -> Column {
1043    column.clone().levenshtein(other)
1044}
1045
1046/// CRC32 of string bytes (PySpark crc32). Not implemented: requires element-wise UDF.
1047pub fn crc32(column: &Column) -> Column {
1048    column.clone().crc32()
1049}
1050
1051/// XXH64 hash (PySpark xxhash64). Not implemented: requires element-wise UDF.
1052pub fn xxhash64(column: &Column) -> Column {
1053    column.clone().xxhash64()
1054}
1055
1056/// Absolute value (PySpark abs)
1057pub fn abs(column: &Column) -> Column {
1058    column.clone().abs()
1059}
1060
1061/// Ceiling (PySpark ceil)
1062pub fn ceil(column: &Column) -> Column {
1063    column.clone().ceil()
1064}
1065
1066/// Floor (PySpark floor)
1067pub fn floor(column: &Column) -> Column {
1068    column.clone().floor()
1069}
1070
1071/// Round (PySpark round)
1072pub fn round(column: &Column, decimals: u32) -> Column {
1073    column.clone().round(decimals)
1074}
1075
1076/// Banker's rounding - round half to even (PySpark bround).
1077pub fn bround(column: &Column, scale: i32) -> Column {
1078    column.clone().bround(scale)
1079}
1080
1081/// Unary minus / negate (PySpark negate, negative).
1082pub fn negate(column: &Column) -> Column {
1083    column.clone().negate()
1084}
1085
1086/// Alias for negate. PySpark negative.
1087pub fn negative(column: &Column) -> Column {
1088    negate(column)
1089}
1090
1091/// Unary plus - no-op, returns column as-is (PySpark positive).
1092pub fn positive(column: &Column) -> Column {
1093    column.clone()
1094}
1095
1096/// Cotangent: 1/tan (PySpark cot).
1097pub fn cot(column: &Column) -> Column {
1098    column.clone().cot()
1099}
1100
1101/// Cosecant: 1/sin (PySpark csc).
1102pub fn csc(column: &Column) -> Column {
1103    column.clone().csc()
1104}
1105
1106/// Secant: 1/cos (PySpark sec).
1107pub fn sec(column: &Column) -> Column {
1108    column.clone().sec()
1109}
1110
1111/// Constant e = 2.718... (PySpark e).
1112pub fn e() -> Column {
1113    Column::from_expr(lit(std::f64::consts::E), Some("e".to_string()))
1114}
1115
1116/// Constant pi = 3.14159... (PySpark pi).
1117pub fn pi() -> Column {
1118    Column::from_expr(lit(std::f64::consts::PI), Some("pi".to_string()))
1119}
1120
1121/// Square root (PySpark sqrt)
1122pub fn sqrt(column: &Column) -> Column {
1123    column.clone().sqrt()
1124}
1125
1126/// Power (PySpark pow)
1127pub fn pow(column: &Column, exp: i64) -> Column {
1128    column.clone().pow(exp)
1129}
1130
1131/// Exponential (PySpark exp)
1132pub fn exp(column: &Column) -> Column {
1133    column.clone().exp()
1134}
1135
1136/// Natural logarithm (PySpark log with one arg)
1137pub fn log(column: &Column) -> Column {
1138    column.clone().log()
1139}
1140
1141/// Logarithm with given base (PySpark log(col, base)). base must be positive and not 1.
1142pub fn log_with_base(column: &Column, base: f64) -> Column {
1143    crate::column::Column::from_expr(column.expr().clone().log(base), None)
1144}
1145
1146/// Sine in radians (PySpark sin)
1147pub fn sin(column: &Column) -> Column {
1148    column.clone().sin()
1149}
1150
1151/// Cosine in radians (PySpark cos)
1152pub fn cos(column: &Column) -> Column {
1153    column.clone().cos()
1154}
1155
1156/// Tangent in radians (PySpark tan)
1157pub fn tan(column: &Column) -> Column {
1158    column.clone().tan()
1159}
1160
1161/// Arc sine (PySpark asin)
1162pub fn asin(column: &Column) -> Column {
1163    column.clone().asin()
1164}
1165
1166/// Arc cosine (PySpark acos)
1167pub fn acos(column: &Column) -> Column {
1168    column.clone().acos()
1169}
1170
1171/// Arc tangent (PySpark atan)
1172pub fn atan(column: &Column) -> Column {
1173    column.clone().atan()
1174}
1175
1176/// Two-argument arc tangent atan2(y, x) in radians (PySpark atan2)
1177pub fn atan2(y: &Column, x: &Column) -> Column {
1178    y.clone().atan2(x)
1179}
1180
1181/// Convert radians to degrees (PySpark degrees)
1182pub fn degrees(column: &Column) -> Column {
1183    column.clone().degrees()
1184}
1185
1186/// Convert degrees to radians (PySpark radians)
1187pub fn radians(column: &Column) -> Column {
1188    column.clone().radians()
1189}
1190
1191/// Sign of the number: -1, 0, or 1 (PySpark signum)
1192pub fn signum(column: &Column) -> Column {
1193    column.clone().signum()
1194}
1195
1196/// Alias for signum (PySpark sign).
1197pub fn sign(column: &Column) -> Column {
1198    signum(column)
1199}
1200
1201/// Cast column to the given type (PySpark cast). Fails on invalid conversion.
1202pub fn cast(column: &Column, type_name: &str) -> Result<Column, String> {
1203    let dtype = parse_type_name(type_name)?;
1204    Ok(Column::from_expr(
1205        column.expr().clone().strict_cast(dtype),
1206        None,
1207    ))
1208}
1209
1210/// Cast column to the given type, returning null on invalid conversion (PySpark try_cast).
1211pub fn try_cast(column: &Column, type_name: &str) -> Result<Column, String> {
1212    let dtype = parse_type_name(type_name)?;
1213    Ok(Column::from_expr(column.expr().clone().cast(dtype), None))
1214}
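// Example (sketch): cast() uses a strict cast (invalid values error when the plan
// runs), while try_cast() turns invalid values into nulls. Both only return Err
// here for an unknown type name.
//
//     let strict = cast(&col("raw"), "int")?;
//     let lenient = try_cast(&col("raw"), "int")?;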
1215
1216/// Cast to string, optionally with format for datetime (PySpark to_char, to_varchar).
/// When format is Some, applies date_format (the PySpark pattern is converted to a chrono strftime pattern); otherwise casts to string.
1218pub fn to_char(column: &Column, format: Option<&str>) -> Column {
1219    match format {
1220        Some(fmt) => column
1221            .clone()
1222            .date_format(&crate::udfs::pyspark_format_to_chrono(fmt)),
1223        None => cast(column, "string").unwrap(),
1224    }
1225}
1226
1227/// Alias for to_char (PySpark to_varchar).
1228pub fn to_varchar(column: &Column, format: Option<&str>) -> Column {
1229    to_char(column, format)
1230}
1231
1232/// Cast to numeric (PySpark to_number). Uses Double. Format parameter reserved for future use.
1233pub fn to_number(column: &Column, _format: Option<&str>) -> Column {
1234    cast(column, "double").unwrap()
1235}
1236
1237/// Cast to numeric, null on invalid (PySpark try_to_number). Format parameter reserved for future use.
1238pub fn try_to_number(column: &Column, _format: Option<&str>) -> Column {
1239    try_cast(column, "double").unwrap()
1240}
1241
1242/// Cast to timestamp, or parse with format when provided (PySpark to_timestamp).
1243pub fn to_timestamp(column: &Column, format: Option<&str>) -> Result<Column, String> {
1244    use polars::prelude::{DataType, GetOutput, TimeUnit};
1245    match format {
1246        None => crate::cast(column, "timestamp"),
1247        Some(fmt) => {
1248            let fmt_owned = fmt.to_string();
1249            let expr = column.expr().clone().map(
1250                move |s| crate::udfs::apply_to_timestamp_format(s, Some(&fmt_owned), true),
1251                GetOutput::from_type(DataType::Datetime(TimeUnit::Microseconds, None)),
1252            );
1253            Ok(crate::column::Column::from_expr(expr, None))
1254        }
1255    }
1256}
1257
1258/// Cast to timestamp, null on invalid, or parse with format when provided (PySpark try_to_timestamp).
1259pub fn try_to_timestamp(column: &Column, format: Option<&str>) -> Column {
1260    use polars::prelude::*;
1261    match format {
1262        None => try_cast(column, "timestamp").unwrap(),
1263        Some(fmt) => {
1264            let fmt_owned = fmt.to_string();
1265            let expr = column.expr().clone().map(
1266                move |s| crate::udfs::apply_to_timestamp_format(s, Some(&fmt_owned), false),
1267                GetOutput::from_type(DataType::Datetime(TimeUnit::Microseconds, None)),
1268            );
1269            crate::column::Column::from_expr(expr, None)
1270        }
1271    }
1272}
1273
1274/// Parse as timestamp in local timezone, return UTC (PySpark to_timestamp_ltz).
1275pub fn to_timestamp_ltz(column: &Column, format: Option<&str>) -> Result<Column, String> {
1276    use polars::prelude::{DataType, GetOutput, TimeUnit};
1277    match format {
1278        None => crate::cast(column, "timestamp"),
1279        Some(fmt) => {
1280            let fmt_owned = fmt.to_string();
1281            let expr = column.expr().clone().map(
1282                move |s| crate::udfs::apply_to_timestamp_ltz_format(s, Some(&fmt_owned), true),
1283                GetOutput::from_type(DataType::Datetime(TimeUnit::Microseconds, None)),
1284            );
1285            Ok(crate::column::Column::from_expr(expr, None))
1286        }
1287    }
1288}
1289
1290/// Parse as timestamp without timezone (PySpark to_timestamp_ntz). Returns Datetime(_, None).
1291pub fn to_timestamp_ntz(column: &Column, format: Option<&str>) -> Result<Column, String> {
1292    use polars::prelude::{DataType, GetOutput, TimeUnit};
1293    match format {
1294        None => crate::cast(column, "timestamp"),
1295        Some(fmt) => {
1296            let fmt_owned = fmt.to_string();
1297            let expr = column.expr().clone().map(
1298                move |s| crate::udfs::apply_to_timestamp_ntz_format(s, Some(&fmt_owned), true),
1299                GetOutput::from_type(DataType::Datetime(TimeUnit::Microseconds, None)),
1300            );
1301            Ok(crate::column::Column::from_expr(expr, None))
1302        }
1303    }
1304}
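// Example (sketch): the optional format is forwarded to crate::udfs; the pattern
// below assumes the PySpark-style "yyyy-MM-dd HH:mm:ss" form is accepted there.
//
//     let ts = to_timestamp(&col("created_at"), Some("yyyy-MM-dd HH:mm:ss"))?;
//     let ts_or_null = try_to_timestamp(&col("created_at"), Some("yyyy-MM-dd HH:mm:ss"));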
1305
1306/// Division that returns null on divide-by-zero (PySpark try_divide).
1307pub fn try_divide(left: &Column, right: &Column) -> Column {
1308    use polars::prelude::*;
1309    let zero_cond = right.expr().clone().cast(DataType::Float64).eq(lit(0.0f64));
1310    let null_expr = Expr::Literal(LiteralValue::Null);
1311    let div_expr =
1312        left.expr().clone().cast(DataType::Float64) / right.expr().clone().cast(DataType::Float64);
1313    let expr = polars::prelude::when(zero_cond)
1314        .then(null_expr)
1315        .otherwise(div_expr);
1316    crate::column::Column::from_expr(expr, None)
1317}
1318
1319/// Add that returns null on overflow (PySpark try_add). Uses checked arithmetic.
1320pub fn try_add(left: &Column, right: &Column) -> Column {
1321    let args = [right.expr().clone()];
1322    let expr =
1323        left.expr()
1324            .clone()
1325            .map_many(crate::udfs::apply_try_add, &args, GetOutput::same_type());
1326    Column::from_expr(expr, None)
1327}
1328
1329/// Subtract that returns null on overflow (PySpark try_subtract).
1330pub fn try_subtract(left: &Column, right: &Column) -> Column {
1331    let args = [right.expr().clone()];
1332    let expr = left.expr().clone().map_many(
1333        crate::udfs::apply_try_subtract,
1334        &args,
1335        GetOutput::same_type(),
1336    );
1337    Column::from_expr(expr, None)
1338}
1339
1340/// Multiply that returns null on overflow (PySpark try_multiply).
1341pub fn try_multiply(left: &Column, right: &Column) -> Column {
1342    let args = [right.expr().clone()];
1343    let expr = left.expr().clone().map_many(
1344        crate::udfs::apply_try_multiply,
1345        &args,
1346        GetOutput::same_type(),
1347    );
1348    Column::from_expr(expr, None)
1349}
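// Example (sketch): the try_* arithmetic helpers return null instead of failing.
//
//     let ratio = try_divide(&col("num"), &col("den")); // null where den == 0
//     let total = try_add(&col("a"), &col("b"));        // null on integer overflow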
1350
1351/// Element at index, null if out of bounds (PySpark try_element_at). Same as element_at for lists.
1352pub fn try_element_at(column: &Column, index: i64) -> Column {
1353    column.clone().element_at(index)
1354}
1355
1356/// Assign value to histogram bucket (PySpark width_bucket). Returns 0 if v < min_val, num_bucket+1 if v >= max_val.
1357pub fn width_bucket(value: &Column, min_val: f64, max_val: f64, num_bucket: i64) -> Column {
1358    use polars::prelude::*;
1359    let v = value.expr().clone().cast(DataType::Float64);
1360    let min_expr = lit(min_val);
1361    let max_expr = lit(max_val);
1362    let nb = num_bucket as f64;
1363    let width = (max_val - min_val) / nb;
1364    let bucket_expr = (v.clone() - min_expr.clone()) / lit(width);
1365    let floor_bucket = bucket_expr.floor().cast(DataType::Int64) + lit(1i64);
1366    let bucket_clamped = floor_bucket.clip(lit(1i64), lit(num_bucket));
1367    let expr = polars::prelude::when(v.clone().lt(min_expr))
1368        .then(lit(0i64))
1369        .when(v.gt_eq(max_expr))
1370        .then(lit(num_bucket + 1))
1371        .otherwise(bucket_clamped);
1372    crate::column::Column::from_expr(expr, None)
1373}
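// Worked example: with min_val = 0.0, max_val = 100.0, num_bucket = 4 the width is
// 25.0, so v = -5.0 -> 0, v = 12.0 -> 1, v = 99.9 -> 4, v = 100.0 -> 5 (num_bucket + 1).
//
//     let bucket = width_bucket(&col("score"), 0.0, 100.0, 4);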
1374
/// Return column at 1-based index (PySpark elt). elt(2, a, b, c) returns b.
/// Panics if `columns` is empty; an index outside 1..=columns.len() yields null.
1376pub fn elt(index: &Column, columns: &[&Column]) -> Column {
1377    use polars::prelude::*;
1378    if columns.is_empty() {
1379        panic!("elt requires at least one column");
1380    }
1381    let idx_expr = index.expr().clone();
1382    let null_expr = Expr::Literal(LiteralValue::Null);
1383    let mut expr = null_expr;
1384    for (i, c) in columns.iter().enumerate().rev() {
1385        let n = (i + 1) as i64;
1386        expr = polars::prelude::when(idx_expr.clone().eq(lit(n)))
1387            .then(c.expr().clone())
1388            .otherwise(expr);
1389    }
1390    crate::column::Column::from_expr(expr, None)
1391}
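// Example (sketch): elt uses a 1-based selector; indexes outside 1..=columns.len()
// yield null.
//
//     let picked = elt(&lit_i64(2), &[&col("a"), &col("b"), &col("c")]); // column b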
1392
1393/// Bit length of string (bytes * 8) (PySpark bit_length).
1394pub fn bit_length(column: &Column) -> Column {
1395    column.clone().bit_length()
1396}
1397
1398/// Length of string in bytes (PySpark octet_length).
1399pub fn octet_length(column: &Column) -> Column {
1400    column.clone().octet_length()
1401}
1402
1403/// Length of string in characters (PySpark char_length). Alias of length().
1404pub fn char_length(column: &Column) -> Column {
1405    column.clone().char_length()
1406}
1407
1408/// Length of string in characters (PySpark character_length). Alias of length().
1409pub fn character_length(column: &Column) -> Column {
1410    column.clone().character_length()
1411}
1412
1413/// Data type of column as string (PySpark typeof). Constant per column from schema.
1414pub fn typeof_(column: &Column) -> Column {
1415    column.clone().typeof_()
1416}
1417
1418/// True where the float value is NaN (PySpark isnan).
1419pub fn isnan(column: &Column) -> Column {
1420    column.clone().is_nan()
1421}
1422
1423/// Greatest of the given columns per row (PySpark greatest). Uses element-wise UDF.
1424pub fn greatest(columns: &[&Column]) -> Result<Column, String> {
1425    if columns.is_empty() {
1426        return Err("greatest requires at least one column".to_string());
1427    }
1428    if columns.len() == 1 {
1429        return Ok((*columns[0]).clone());
1430    }
1431    let mut expr = columns[0].expr().clone();
1432    for c in columns.iter().skip(1) {
1433        let args = [c.expr().clone()];
1434        expr = expr.map_many(crate::udfs::apply_greatest2, &args, GetOutput::same_type());
1435    }
1436    Ok(Column::from_expr(expr, None))
1437}
1438
1439/// Least of the given columns per row (PySpark least). Uses element-wise UDF.
1440pub fn least(columns: &[&Column]) -> Result<Column, String> {
1441    if columns.is_empty() {
1442        return Err("least requires at least one column".to_string());
1443    }
1444    if columns.len() == 1 {
1445        return Ok((*columns[0]).clone());
1446    }
1447    let mut expr = columns[0].expr().clone();
1448    for c in columns.iter().skip(1) {
1449        let args = [c.expr().clone()];
1450        expr = expr.map_many(crate::udfs::apply_least2, &args, GetOutput::same_type());
1451    }
1452    Ok(Column::from_expr(expr, None))
1453}
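// Example (sketch): greatest/least are row-wise and require at least one column.
//
//     let hi = greatest(&[&col("q1"), &col("q2"), &col("q3")])?;
//     let lo = least(&[&col("q1"), &col("q2"), &col("q3")])?;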
1454
1455/// Extract year from datetime column (PySpark year)
1456pub fn year(column: &Column) -> Column {
1457    column.clone().year()
1458}
1459
1460/// Extract month from datetime column (PySpark month)
1461pub fn month(column: &Column) -> Column {
1462    column.clone().month()
1463}
1464
1465/// Extract day of month from datetime column (PySpark day)
1466pub fn day(column: &Column) -> Column {
1467    column.clone().day()
1468}
1469
1470/// Cast to date (PySpark to_date)
1471pub fn to_date(column: &Column) -> Column {
1472    column.clone().to_date()
1473}
1474
1475/// Format date/datetime as string (PySpark date_format). Uses chrono strftime format (e.g. "%Y-%m-%d").
1476pub fn date_format(column: &Column, format: &str) -> Column {
1477    column.clone().date_format(format)
1478}
1479
1480/// Current date (evaluation time). PySpark current_date.
1481pub fn current_date() -> Column {
1482    use polars::prelude::*;
1483    let today = chrono::Utc::now().date_naive();
1484    let days = (today - chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()).num_days() as i32;
1485    crate::column::Column::from_expr(Expr::Literal(LiteralValue::Date(days)), None)
1486}
1487
1488/// Current timestamp (evaluation time). PySpark current_timestamp.
1489pub fn current_timestamp() -> Column {
1490    use polars::prelude::*;
1491    let ts = chrono::Utc::now().timestamp_micros();
1492    crate::column::Column::from_expr(
1493        Expr::Literal(LiteralValue::DateTime(ts, TimeUnit::Microseconds, None)),
1494        None,
1495    )
1496}
1497
1498/// Alias for current_date (PySpark curdate).
1499pub fn curdate() -> Column {
1500    current_date()
1501}
1502
1503/// Alias for current_timestamp (PySpark now).
1504pub fn now() -> Column {
1505    current_timestamp()
1506}
1507
1508/// Alias for current_timestamp (PySpark localtimestamp).
1509pub fn localtimestamp() -> Column {
1510    current_timestamp()
1511}
1512
1513/// Alias for datediff (PySpark date_diff). date_diff(end, start).
1514pub fn date_diff(end: &Column, start: &Column) -> Column {
1515    datediff(end, start)
1516}
1517
1518/// Alias for date_add (PySpark dateadd).
1519pub fn dateadd(column: &Column, n: i32) -> Column {
1520    date_add(column, n)
1521}
1522
1523/// Extract field from date/datetime (PySpark extract). field: year, month, day, hour, minute, second, quarter, week, dayofweek, dayofyear.
1524pub fn extract(column: &Column, field: &str) -> Column {
1525    column.clone().extract(field)
1526}
1527
1528/// Alias for extract (PySpark date_part).
1529pub fn date_part(column: &Column, field: &str) -> Column {
1530    extract(column, field)
1531}
1532
1533/// Alias for extract (PySpark datepart).
1534pub fn datepart(column: &Column, field: &str) -> Column {
1535    extract(column, field)
1536}
1537
1538/// Timestamp to microseconds since epoch (PySpark unix_micros).
1539pub fn unix_micros(column: &Column) -> Column {
1540    column.clone().unix_micros()
1541}
1542
1543/// Timestamp to milliseconds since epoch (PySpark unix_millis).
1544pub fn unix_millis(column: &Column) -> Column {
1545    column.clone().unix_millis()
1546}
1547
1548/// Timestamp to seconds since epoch (PySpark unix_seconds).
1549pub fn unix_seconds(column: &Column) -> Column {
1550    column.clone().unix_seconds()
1551}
1552
1553/// Weekday name "Mon","Tue",... (PySpark dayname).
1554pub fn dayname(column: &Column) -> Column {
1555    column.clone().dayname()
1556}
1557
1558/// Weekday 0=Mon, 6=Sun (PySpark weekday).
1559pub fn weekday(column: &Column) -> Column {
1560    column.clone().weekday()
1561}
1562
1563/// Extract hour from datetime column (PySpark hour).
1564pub fn hour(column: &Column) -> Column {
1565    column.clone().hour()
1566}
1567
1568/// Extract minute from datetime column (PySpark minute).
1569pub fn minute(column: &Column) -> Column {
1570    column.clone().minute()
1571}
1572
1573/// Extract second from datetime column (PySpark second).
1574pub fn second(column: &Column) -> Column {
1575    column.clone().second()
1576}
1577
1578/// Add n days to date column (PySpark date_add).
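///
/// # Example
/// A minimal sketch, assuming `date_add` is re-exported at the crate root like `col` is:
/// ```ignore
/// use robin_sparkless::{col, date_add};
/// // shift every value of the (illustrative) "event_date" column forward by 7 days
/// let next_week = date_add(&col("event_date"), 7);
/// ```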
1579pub fn date_add(column: &Column, n: i32) -> Column {
1580    column.clone().date_add(n)
1581}
1582
1583/// Subtract n days from date column (PySpark date_sub).
1584pub fn date_sub(column: &Column, n: i32) -> Column {
1585    column.clone().date_sub(n)
1586}
1587
1588/// Number of days between two date columns (PySpark datediff).
1589pub fn datediff(end: &Column, start: &Column) -> Column {
1590    start.clone().datediff(end)
1591}
1592
1593/// Last day of month for date column (PySpark last_day).
1594pub fn last_day(column: &Column) -> Column {
1595    column.clone().last_day()
1596}
1597
1598/// Truncate date/datetime to unit (PySpark trunc).
1599pub fn trunc(column: &Column, format: &str) -> Column {
1600    column.clone().trunc(format)
1601}
1602
1603/// Alias for trunc (PySpark date_trunc).
1604pub fn date_trunc(format: &str, column: &Column) -> Column {
1605    trunc(column, format)
1606}
1607
1608/// Extract quarter (1-4) from date/datetime (PySpark quarter).
1609pub fn quarter(column: &Column) -> Column {
1610    column.clone().quarter()
1611}
1612
1613/// Extract ISO week of year (1-53) (PySpark weekofyear).
1614pub fn weekofyear(column: &Column) -> Column {
1615    column.clone().weekofyear()
1616}
1617
1618/// Extract day of week: 1=Sunday..7=Saturday (PySpark dayofweek).
1619pub fn dayofweek(column: &Column) -> Column {
1620    column.clone().dayofweek()
1621}
1622
1623/// Extract day of year (1-366) (PySpark dayofyear).
1624pub fn dayofyear(column: &Column) -> Column {
1625    column.clone().dayofyear()
1626}
1627
1628/// Add n months to date column (PySpark add_months).
1629pub fn add_months(column: &Column, n: i32) -> Column {
1630    column.clone().add_months(n)
1631}
1632
1633/// Fractional number of months between end and start dates (PySpark months_between).
1634/// When round_off is true, rounds to 8 decimal places (PySpark default).
1635pub fn months_between(end: &Column, start: &Column, round_off: bool) -> Column {
1636    end.clone().months_between(start, round_off)
1637}
1638
1639/// Next date that is the given weekday (e.g. "Mon") (PySpark next_day).
1640pub fn next_day(column: &Column, day_of_week: &str) -> Column {
1641    column.clone().next_day(day_of_week)
1642}
1643
1644/// Current Unix timestamp in seconds (PySpark unix_timestamp with no args).
1645pub fn unix_timestamp_now() -> Column {
1646    use polars::prelude::*;
1647    let secs = chrono::Utc::now().timestamp();
1648    crate::column::Column::from_expr(lit(secs), None)
1649}
1650
1651/// Parse string timestamp to seconds since epoch (PySpark unix_timestamp). format defaults to yyyy-MM-dd HH:mm:ss.
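///
/// # Example
/// A minimal sketch, assuming `unix_timestamp` is re-exported at the crate root:
/// ```ignore
/// use robin_sparkless::{col, unix_timestamp};
/// // parse an (illustrative) "ts_str" column using the default yyyy-MM-dd HH:mm:ss format
/// let secs = unix_timestamp(&col("ts_str"), None);
/// ```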
1652pub fn unix_timestamp(column: &Column, format: Option<&str>) -> Column {
1653    column.clone().unix_timestamp(format)
1654}
1655
1656/// Alias for unix_timestamp.
1657pub fn to_unix_timestamp(column: &Column, format: Option<&str>) -> Column {
1658    unix_timestamp(column, format)
1659}
1660
1661/// Convert seconds since epoch to formatted string (PySpark from_unixtime).
1662pub fn from_unixtime(column: &Column, format: Option<&str>) -> Column {
1663    column.clone().from_unixtime(format)
1664}
1665
1666/// Build date from year, month, day columns (PySpark make_date).
1667pub fn make_date(year: &Column, month: &Column, day: &Column) -> Column {
1668    use polars::prelude::*;
1669    let args = [month.expr().clone(), day.expr().clone()];
1670    let expr = year.expr().clone().map_many(
1671        crate::udfs::apply_make_date,
1672        &args,
1673        GetOutput::from_type(DataType::Date),
1674    );
1675    crate::column::Column::from_expr(expr, None)
1676}
1677
1678/// make_timestamp(year, month, day, hour, min, sec, timezone?) - six columns to timestamp (PySpark make_timestamp).
1679/// When timezone is Some(tz), components are interpreted as local time in that zone, then converted to UTC.
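///
/// # Example
/// A minimal sketch, assuming `make_timestamp` is re-exported at the crate root:
/// ```ignore
/// use robin_sparkless::{col, make_timestamp};
/// // build a UTC timestamp from six (illustrative) component columns
/// let ts = make_timestamp(&col("y"), &col("mo"), &col("d"),
///                         &col("h"), &col("mi"), &col("s"), Some("UTC"));
/// ```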
1680pub fn make_timestamp(
1681    year: &Column,
1682    month: &Column,
1683    day: &Column,
1684    hour: &Column,
1685    minute: &Column,
1686    sec: &Column,
1687    timezone: Option<&str>,
1688) -> Column {
1689    use polars::prelude::*;
1690    let tz_owned = timezone.map(|s| s.to_string());
1691    let args = [
1692        month.expr().clone(),
1693        day.expr().clone(),
1694        hour.expr().clone(),
1695        minute.expr().clone(),
1696        sec.expr().clone(),
1697    ];
1698    let expr = year.expr().clone().map_many(
1699        move |cols| crate::udfs::apply_make_timestamp(cols, tz_owned.as_deref()),
1700        &args,
1701        GetOutput::from_type(DataType::Datetime(TimeUnit::Microseconds, None)),
1702    );
1703    crate::column::Column::from_expr(expr, None)
1704}
1705
1706/// Add amount of unit to timestamp (PySpark timestampadd).
1707pub fn timestampadd(unit: &str, amount: &Column, ts: &Column) -> Column {
1708    ts.clone().timestampadd(unit, amount)
1709}
1710
1711/// Difference between timestamps in unit (PySpark timestampdiff).
1712pub fn timestampdiff(unit: &str, start: &Column, end: &Column) -> Column {
1713    start.clone().timestampdiff(unit, end)
1714}
1715
1716/// Interval of n days (PySpark days). Returns a duration column for interval arithmetic (see make_interval).
1717pub fn days(n: i64) -> Column {
1718    make_interval(0, 0, 0, n, 0, 0, 0)
1719}
1720
1721/// Interval of n hours (PySpark hours).
1722pub fn hours(n: i64) -> Column {
1723    make_interval(0, 0, 0, 0, n, 0, 0)
1724}
1725
1726/// Interval of n minutes (PySpark minutes).
1727pub fn minutes(n: i64) -> Column {
1728    make_interval(0, 0, 0, 0, 0, n, 0)
1729}
1730
1731/// Interval of n months (PySpark months). Approximated as 30*n days.
1732pub fn months(n: i64) -> Column {
1733    make_interval(0, n, 0, 0, 0, 0, 0)
1734}
1735
1736/// Interval of n years (PySpark years). Approximated as 365*n days.
1737pub fn years(n: i64) -> Column {
1738    make_interval(n, 0, 0, 0, 0, 0, 0)
1739}
1740
1741/// Interpret timestamp as UTC, convert to tz (PySpark from_utc_timestamp).
1742pub fn from_utc_timestamp(column: &Column, tz: &str) -> Column {
1743    column.clone().from_utc_timestamp(tz)
1744}
1745
1746/// Interpret timestamp as in tz, convert to UTC (PySpark to_utc_timestamp).
1747pub fn to_utc_timestamp(column: &Column, tz: &str) -> Column {
1748    column.clone().to_utc_timestamp(tz)
1749}
1750
1751/// Convert timestamp between timezones (PySpark convert_timezone).
1752pub fn convert_timezone(source_tz: &str, target_tz: &str, column: &Column) -> Column {
1753    let source_tz = source_tz.to_string();
1754    let target_tz = target_tz.to_string();
1755    let expr = column.expr().clone().map(
1756        move |s| crate::udfs::apply_convert_timezone(s, &source_tz, &target_tz),
1757        GetOutput::same_type(),
1758    );
1759    crate::column::Column::from_expr(expr, None)
1760}
1761
1762/// Current session timezone (PySpark current_timezone). Default "UTC". Returns literal column.
1763pub fn current_timezone() -> Column {
1764    use polars::prelude::*;
1765    crate::column::Column::from_expr(lit("UTC"), None)
1766}
1767
1768/// Create interval duration (PySpark make_interval). Optional args; 0 for omitted.
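///
/// # Example
/// A minimal sketch of the argument order (years, months, weeks, days, hours, mins, secs),
/// assuming `make_interval` is re-exported at the crate root:
/// ```ignore
/// use robin_sparkless::make_interval;
/// // 2 days and 3 hours; years/months would be folded in as 365/30 days respectively
/// let iv = make_interval(0, 0, 0, 2, 3, 0, 0);
/// ```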
1769pub fn make_interval(
1770    years: i64,
1771    months: i64,
1772    weeks: i64,
1773    days: i64,
1774    hours: i64,
1775    mins: i64,
1776    secs: i64,
1777) -> Column {
1778    use polars::prelude::*;
1779    // Approximate: 1 year = 365 days, 1 month = 30 days
1780    let total_days = years * 365 + months * 30 + weeks * 7 + days;
1781    let args = DurationArgs::new()
1782        .with_days(lit(total_days))
1783        .with_hours(lit(hours))
1784        .with_minutes(lit(mins))
1785        .with_seconds(lit(secs));
1786    let dur = duration(args);
1787    crate::column::Column::from_expr(dur, None)
1788}
1789
1790/// Day-time interval: days, hours, minutes, seconds (PySpark make_dt_interval). All optional; 0 for omitted.
1791pub fn make_dt_interval(days: i64, hours: i64, minutes: i64, seconds: i64) -> Column {
1792    use polars::prelude::*;
1793    let args = DurationArgs::new()
1794        .with_days(lit(days))
1795        .with_hours(lit(hours))
1796        .with_minutes(lit(minutes))
1797        .with_seconds(lit(seconds));
1798    let dur = duration(args);
1799    crate::column::Column::from_expr(dur, None)
1800}
1801
1802/// Year-month interval (PySpark make_ym_interval). Polars has no native YM type; return months as Int32 (years*12 + months).
1803pub fn make_ym_interval(years: i32, months: i32) -> Column {
1804    use polars::prelude::*;
1805    let total_months = years * 12 + months;
1806    crate::column::Column::from_expr(lit(total_months), None)
1807}
1808
1809/// Alias for make_timestamp (PySpark make_timestamp_ntz - no timezone).
1810pub fn make_timestamp_ntz(
1811    year: &Column,
1812    month: &Column,
1813    day: &Column,
1814    hour: &Column,
1815    minute: &Column,
1816    sec: &Column,
1817) -> Column {
1818    make_timestamp(year, month, day, hour, minute, sec, None)
1819}
1820
1821/// Convert seconds since epoch to timestamp (PySpark timestamp_seconds).
1822pub fn timestamp_seconds(column: &Column) -> Column {
1823    column.clone().timestamp_seconds()
1824}
1825
1826/// Convert milliseconds since epoch to timestamp (PySpark timestamp_millis).
1827pub fn timestamp_millis(column: &Column) -> Column {
1828    column.clone().timestamp_millis()
1829}
1830
1831/// Convert microseconds since epoch to timestamp (PySpark timestamp_micros).
1832pub fn timestamp_micros(column: &Column) -> Column {
1833    column.clone().timestamp_micros()
1834}
1835
1836/// Date to days since 1970-01-01 (PySpark unix_date).
1837pub fn unix_date(column: &Column) -> Column {
1838    column.clone().unix_date()
1839}
1840
1841/// Days since epoch to date (PySpark date_from_unix_date).
1842pub fn date_from_unix_date(column: &Column) -> Column {
1843    column.clone().date_from_unix_date()
1844}
1845
1846/// Positive modulus (PySpark pmod).
1847pub fn pmod(dividend: &Column, divisor: &Column) -> Column {
1848    dividend.clone().pmod(divisor)
1849}
1850
1851/// Factorial n! (PySpark factorial). n in 0..=20; null for negative or overflow.
1852pub fn factorial(column: &Column) -> Column {
1853    column.clone().factorial()
1854}
1855
1856/// Concatenate string columns without separator (PySpark concat)
1857pub fn concat(columns: &[&Column]) -> Column {
1858    use polars::prelude::*;
1859    if columns.is_empty() {
1860        panic!("concat requires at least one column");
1861    }
1862    let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
1863    crate::column::Column::from_expr(concat_str(&exprs, "", false), None)
1864}
1865
1866/// Concatenate string columns with separator (PySpark concat_ws)
1867pub fn concat_ws(separator: &str, columns: &[&Column]) -> Column {
1868    use polars::prelude::*;
1869    if columns.is_empty() {
1870        panic!("concat_ws requires at least one column");
1871    }
1872    let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
1873    crate::column::Column::from_expr(concat_str(&exprs, separator, false), None)
1874}
1875
1876/// Row number window function (1, 2, 3, ... by sort order within each partition).
1877/// This free function uses ascending order (the `Column` method takes a descending flag); apply per partition with `.over(partition_by)`.
1878///
1879/// # Example
1880/// ```
1881/// use robin_sparkless::col;
1882/// let salary_col = col("salary");
1883/// let rn = salary_col.row_number(true).over(&["dept"]);
1884/// ```
1885pub fn row_number(column: &Column) -> Column {
1886    column.clone().row_number(false)
1887}
1888
1889/// Rank window function (ties same rank, gaps). Use with `.over(partition_by)`.
1890pub fn rank(column: &Column, descending: bool) -> Column {
1891    column.clone().rank(descending)
1892}
1893
1894/// Dense rank window function (no gaps). Use with `.over(partition_by)`.
1895pub fn dense_rank(column: &Column, descending: bool) -> Column {
1896    column.clone().dense_rank(descending)
1897}
1898
1899/// Lag: value from n rows before in partition. Use with `.over(partition_by)`.
1900pub fn lag(column: &Column, n: i64) -> Column {
1901    column.clone().lag(n)
1902}
1903
1904/// Lead: value from n rows after in partition. Use with `.over(partition_by)`.
1905pub fn lead(column: &Column, n: i64) -> Column {
1906    column.clone().lead(n)
1907}
1908
1909/// First value in partition (PySpark first_value). Use with `.over(partition_by)`.
1910pub fn first_value(column: &Column) -> Column {
1911    column.clone().first_value()
1912}
1913
1914/// Last value in partition (PySpark last_value). Use with `.over(partition_by)`.
1915pub fn last_value(column: &Column) -> Column {
1916    column.clone().last_value()
1917}
1918
1919/// Percent rank in partition: (rank - 1) / (count - 1). Window is applied.
1920pub fn percent_rank(column: &Column, partition_by: &[&str], descending: bool) -> Column {
1921    column.clone().percent_rank(partition_by, descending)
1922}
1923
1924/// Cumulative distribution in partition: row_number / count. Window is applied.
1925pub fn cume_dist(column: &Column, partition_by: &[&str], descending: bool) -> Column {
1926    column.clone().cume_dist(partition_by, descending)
1927}
1928
1929/// Ntile: bucket 1..n by rank within partition. Window is applied.
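///
/// # Example
/// A minimal sketch, assuming `ntile` is re-exported at the crate root:
/// ```ignore
/// use robin_sparkless::{col, ntile};
/// // salary quartiles (buckets 1..=4) per department, ranked descending
/// let quartile = ntile(&col("salary"), 4, &["dept"], true);
/// ```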
1930pub fn ntile(column: &Column, n: u32, partition_by: &[&str], descending: bool) -> Column {
1931    column.clone().ntile(n, partition_by, descending)
1932}
1933
1934/// Nth value in partition by order (1-based n). Window is applied; do not call .over() again.
1935pub fn nth_value(column: &Column, n: i64, partition_by: &[&str], descending: bool) -> Column {
1936    column.clone().nth_value(n, partition_by, descending)
1937}
1938
1939/// Coalesce - returns the first non-null value from multiple columns.
1940///
1941/// # Example
1942/// ```
1943/// use robin_sparkless::{col, lit_i64, coalesce};
1944///
1945/// // coalesce(col("a"), col("b"), lit(0))
1946/// let expr = coalesce(&[&col("a"), &col("b"), &lit_i64(0)]);
1947/// ```
1948pub fn coalesce(columns: &[&Column]) -> Column {
1949    use polars::prelude::*;
1950    if columns.is_empty() {
1951        panic!("coalesce requires at least one column");
1952    }
1953    let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
1954    let expr = polars::prelude::coalesce(&exprs);
1955    crate::column::Column::from_expr(expr, None)
1956}
1957
1958/// Alias for coalesce(col, value). PySpark nvl / ifnull.
1959pub fn nvl(column: &Column, value: &Column) -> Column {
1960    coalesce(&[column, value])
1961}
1962
1963/// Alias for nvl. PySpark ifnull.
1964pub fn ifnull(column: &Column, value: &Column) -> Column {
1965    nvl(column, value)
1966}
1967
1968/// Return null if column equals value, else column. PySpark nullif.
1969pub fn nullif(column: &Column, value: &Column) -> Column {
1970    use polars::prelude::*;
1971    let cond = column.expr().clone().eq(value.expr().clone());
1972    let null_lit = Expr::Literal(LiteralValue::Null);
1973    let expr = when(cond).then(null_lit).otherwise(column.expr().clone());
1974    crate::column::Column::from_expr(expr, None)
1975}
1976
1977/// Replace NaN with value. PySpark nanvl.
1978pub fn nanvl(column: &Column, value: &Column) -> Column {
1979    use polars::prelude::*;
1980    let cond = column.expr().clone().is_nan();
1981    let expr = when(cond)
1982        .then(value.expr().clone())
1983        .otherwise(column.expr().clone());
1984    crate::column::Column::from_expr(expr, None)
1985}
1986
1987/// Three-arg null replacement: if col1 is not null then col2 else col3. PySpark nvl2.
1988pub fn nvl2(col1: &Column, col2: &Column, col3: &Column) -> Column {
1989    use polars::prelude::*;
1990    let cond = col1.expr().clone().is_not_null();
1991    let expr = when(cond)
1992        .then(col2.expr().clone())
1993        .otherwise(col3.expr().clone());
1994    crate::column::Column::from_expr(expr, None)
1995}
1996
1997/// Alias for substring. PySpark substr.
1998pub fn substr(column: &Column, start: i64, length: Option<i64>) -> Column {
1999    substring(column, start, length)
2000}
2001
2002/// Alias for pow. PySpark power.
2003pub fn power(column: &Column, exp: i64) -> Column {
2004    pow(column, exp)
2005}
2006
2007/// Alias for log (natural log). PySpark ln.
2008pub fn ln(column: &Column) -> Column {
2009    log(column)
2010}
2011
2012/// Alias for ceil. PySpark ceiling.
2013pub fn ceiling(column: &Column) -> Column {
2014    ceil(column)
2015}
2016
2017/// Alias for lower. PySpark lcase.
2018pub fn lcase(column: &Column) -> Column {
2019    lower(column)
2020}
2021
2022/// Alias for upper. PySpark ucase.
2023pub fn ucase(column: &Column) -> Column {
2024    upper(column)
2025}
2026
2027/// Alias for day. PySpark dayofmonth.
2028pub fn dayofmonth(column: &Column) -> Column {
2029    day(column)
2030}
2031
2032/// Alias for degrees. PySpark toDegrees.
2033pub fn to_degrees(column: &Column) -> Column {
2034    degrees(column)
2035}
2036
2037/// Alias for radians. PySpark toRadians.
2038pub fn to_radians(column: &Column) -> Column {
2039    radians(column)
2040}
2041
2042/// Hyperbolic cosine (PySpark cosh).
2043pub fn cosh(column: &Column) -> Column {
2044    column.clone().cosh()
2045}
2046/// Hyperbolic sine (PySpark sinh).
2047pub fn sinh(column: &Column) -> Column {
2048    column.clone().sinh()
2049}
2050/// Hyperbolic tangent (PySpark tanh).
2051pub fn tanh(column: &Column) -> Column {
2052    column.clone().tanh()
2053}
2054/// Inverse hyperbolic cosine (PySpark acosh).
2055pub fn acosh(column: &Column) -> Column {
2056    column.clone().acosh()
2057}
2058/// Inverse hyperbolic sine (PySpark asinh).
2059pub fn asinh(column: &Column) -> Column {
2060    column.clone().asinh()
2061}
2062/// Inverse hyperbolic tangent (PySpark atanh).
2063pub fn atanh(column: &Column) -> Column {
2064    column.clone().atanh()
2065}
2066/// Cube root (PySpark cbrt).
2067pub fn cbrt(column: &Column) -> Column {
2068    column.clone().cbrt()
2069}
2070/// exp(x) - 1 (PySpark expm1).
2071pub fn expm1(column: &Column) -> Column {
2072    column.clone().expm1()
2073}
2074/// log(1 + x) (PySpark log1p).
2075pub fn log1p(column: &Column) -> Column {
2076    column.clone().log1p()
2077}
2078/// Base-10 log (PySpark log10).
2079pub fn log10(column: &Column) -> Column {
2080    column.clone().log10()
2081}
2082/// Base-2 log (PySpark log2).
2083pub fn log2(column: &Column) -> Column {
2084    column.clone().log2()
2085}
2086/// Round to nearest integer (PySpark rint).
2087pub fn rint(column: &Column) -> Column {
2088    column.clone().rint()
2089}
2090/// sqrt(x*x + y*y) (PySpark hypot).
2091pub fn hypot(x: &Column, y: &Column) -> Column {
2092    let xx = x.expr().clone() * x.expr().clone();
2093    let yy = y.expr().clone() * y.expr().clone();
2094    crate::column::Column::from_expr((xx + yy).sqrt(), None)
2095}
2096
2097/// True if column is null. PySpark isnull.
2098pub fn isnull(column: &Column) -> Column {
2099    column.clone().is_null()
2100}
2101
2102/// True if column is not null. PySpark isnotnull.
2103pub fn isnotnull(column: &Column) -> Column {
2104    column.clone().is_not_null()
2105}
2106
2107/// Create an array column from multiple columns (PySpark array).
2108pub fn array(columns: &[&Column]) -> crate::column::Column {
2109    use polars::prelude::*;
2110    if columns.is_empty() {
2111        panic!("array requires at least one column");
2112    }
2113    let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2114    let expr = concat_list(exprs).expect("concat_list");
2115    crate::column::Column::from_expr(expr, None)
2116}
2117
2118/// Number of elements in list (PySpark size / array_size). Returns Int32.
2119pub fn array_size(column: &Column) -> Column {
2120    column.clone().array_size()
2121}
2122
2123/// Alias for array_size (PySpark size).
2124pub fn size(column: &Column) -> Column {
2125    column.clone().array_size()
2126}
2127
2128/// Cardinality: number of elements in array (PySpark cardinality). Alias for size/array_size.
2129pub fn cardinality(column: &Column) -> Column {
2130    column.clone().cardinality()
2131}
2132
2133/// Check if list contains value (PySpark array_contains).
2134pub fn array_contains(column: &Column, value: &Column) -> Column {
2135    column.clone().array_contains(value.expr().clone())
2136}
2137
2138/// Join list of strings with separator (PySpark array_join).
2139pub fn array_join(column: &Column, separator: &str) -> Column {
2140    column.clone().array_join(separator)
2141}
2142
2143/// Maximum element in list (PySpark array_max).
2144pub fn array_max(column: &Column) -> Column {
2145    column.clone().array_max()
2146}
2147
2148/// Minimum element in list (PySpark array_min).
2149pub fn array_min(column: &Column) -> Column {
2150    column.clone().array_min()
2151}
2152
2153/// Get element at 1-based index (PySpark element_at).
2154pub fn element_at(column: &Column, index: i64) -> Column {
2155    column.clone().element_at(index)
2156}
2157
2158/// Sort list elements (PySpark array_sort).
2159pub fn array_sort(column: &Column) -> Column {
2160    column.clone().array_sort()
2161}
2162
2163/// Distinct elements in list (PySpark array_distinct).
2164pub fn array_distinct(column: &Column) -> Column {
2165    column.clone().array_distinct()
2166}
2167
2168/// Slice list from 1-based start with optional length (PySpark slice).
2169pub fn array_slice(column: &Column, start: i64, length: Option<i64>) -> Column {
2170    column.clone().array_slice(start, length)
2171}
2172
2173/// Generate array of numbers from start to stop (inclusive) with optional step (PySpark sequence).
2174/// step defaults to 1.
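///
/// # Example
/// A minimal sketch, assuming `sequence` is re-exported at the crate root like `lit_i64` is:
/// ```ignore
/// use robin_sparkless::{lit_i64, sequence};
/// // [1, 2, 3, 4, 5] for every row; the step defaults to 1
/// let s = sequence(&lit_i64(1), &lit_i64(5), None);
/// ```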
2175pub fn sequence(start: &Column, stop: &Column, step: Option<&Column>) -> Column {
2176    use polars::prelude::{as_struct, lit, DataType, GetOutput};
2177    let step_expr = step
2178        .map(|c| c.expr().clone().alias("2"))
2179        .unwrap_or_else(|| lit(1i64).alias("2"));
2180    let struct_expr = as_struct(vec![
2181        start.expr().clone().alias("0"),
2182        stop.expr().clone().alias("1"),
2183        step_expr,
2184    ]);
2185    let out_dtype = DataType::List(Box::new(DataType::Int64));
2186    let expr = struct_expr.map(crate::udfs::apply_sequence, GetOutput::from_type(out_dtype));
2187    crate::column::Column::from_expr(expr, None)
2188}
2189
2190/// Random permutation of list elements (PySpark shuffle).
2191pub fn shuffle(column: &Column) -> Column {
2192    use polars::prelude::GetOutput;
2193    let expr = column
2194        .expr()
2195        .clone()
2196        .map(crate::udfs::apply_shuffle, GetOutput::same_type());
2197    crate::column::Column::from_expr(expr, None)
2198}
2199
2200/// Explode a list of structs into one row per struct (PySpark inline).
2201/// Returns the exploded struct column; apply unnest afterwards to expand the struct fields into columns.
2202pub fn inline(column: &Column) -> Column {
2203    column.clone().explode()
2204}
2205
2206/// Like inline but null/empty yields one row of nulls (PySpark inline_outer).
2207pub fn inline_outer(column: &Column) -> Column {
2208    column.clone().explode_outer()
2209}
2210
2211/// Explode list into one row per element (PySpark explode).
2212pub fn explode(column: &Column) -> Column {
2213    column.clone().explode()
2214}
2215
2216/// 1-based index of first occurrence of value in list, or 0 if not found (PySpark array_position).
2217/// Implemented via Polars list.eval with col("") as element.
2218pub fn array_position(column: &Column, value: &Column) -> Column {
2219    column.clone().array_position(value.expr().clone())
2220}
2221
2222/// Remove null elements from list (PySpark array_compact).
2223pub fn array_compact(column: &Column) -> Column {
2224    column.clone().array_compact()
2225}
2226
2227/// New list with all elements equal to value removed (PySpark array_remove).
2228/// Implemented via Polars list.eval + list.drop_nulls.
2229pub fn array_remove(column: &Column, value: &Column) -> Column {
2230    column.clone().array_remove(value.expr().clone())
2231}
2232
2233/// Repeat the value n times into an array (PySpark array_repeat).
2234pub fn array_repeat(column: &Column, n: i64) -> Column {
2235    column.clone().array_repeat(n)
2236}
2237
2238/// Flatten list of lists into a single list (PySpark flatten).
2239pub fn array_flatten(column: &Column) -> Column {
2240    column.clone().array_flatten()
2241}
2242
2243/// True if any list element satisfies the predicate (PySpark exists).
2244pub fn array_exists(column: &Column, predicate: Expr) -> Column {
2245    column.clone().array_exists(predicate)
2246}
2247
2248/// True if all list elements satisfy the predicate (PySpark forall).
2249pub fn array_forall(column: &Column, predicate: Expr) -> Column {
2250    column.clone().array_forall(predicate)
2251}
2252
2253/// Filter list elements by predicate (PySpark filter).
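///
/// # Example
/// A minimal sketch, assuming `array_filter` is re-exported at the crate root and that the
/// predicate refers to the current element via Polars `col("")`, the same convention noted
/// for array_position above:
/// ```ignore
/// use robin_sparkless::{array_filter, col};
/// use polars::prelude::lit;
/// // keep only the strictly positive elements of an (illustrative) "xs" list column
/// let positives = array_filter(&col("xs"), polars::prelude::col("").gt(lit(0)));
/// ```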
2254pub fn array_filter(column: &Column, predicate: Expr) -> Column {
2255    column.clone().array_filter(predicate)
2256}
2257
2258/// Transform list elements by expression (PySpark transform).
2259pub fn array_transform(column: &Column, f: Expr) -> Column {
2260    column.clone().array_transform(f)
2261}
2262
2263/// Sum of list elements (PySpark aggregate sum).
2264pub fn array_sum(column: &Column) -> Column {
2265    column.clone().array_sum()
2266}
2267
2268/// Array fold/aggregate (PySpark aggregate). Simplified: zero + sum(list elements).
2269pub fn aggregate(column: &Column, zero: &Column) -> Column {
2270    column.clone().array_aggregate(zero)
2271}
2272
2273/// Mean of list elements (PySpark aggregate avg).
2274pub fn array_mean(column: &Column) -> Column {
2275    column.clone().array_mean()
2276}
2277
2278/// Explode list with position (PySpark posexplode). Returns (pos_column, value_column).
2279/// pos here is 1-based (PySpark's pos is 0-based); implemented via list.eval(cum_count()).explode() and explode().
2280pub fn posexplode(column: &Column) -> (Column, Column) {
2281    column.clone().posexplode()
2282}
2283
2284/// Build a map column from alternating key/value expressions (PySpark create_map).
2285/// Returns List(Struct{key, value}) using Polars as_struct and concat_list.
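///
/// # Example
/// A minimal sketch, assuming `create_map` and `lit_str` are re-exported at the crate root:
/// ```ignore
/// use robin_sparkless::{col, create_map, lit_str};
/// // {"a": x, "b": y} as a List(Struct{key, value}) column
/// let m = create_map(&[&lit_str("a"), &col("x"), &lit_str("b"), &col("y")]);
/// ```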
2286pub fn create_map(key_values: &[&Column]) -> Column {
2287    use polars::prelude::{as_struct, concat_list};
2288    if key_values.is_empty() {
2289        panic!("create_map requires at least one key-value pair");
2290    }
2291    let mut struct_exprs: Vec<Expr> = Vec::new();
2292    for i in (0..key_values.len()).step_by(2) {
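        // a trailing key with no matching value is silently skipped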
2293        if i + 1 < key_values.len() {
2294            let k = key_values[i].expr().clone().alias("key");
2295            let v = key_values[i + 1].expr().clone().alias("value");
2296            struct_exprs.push(as_struct(vec![k, v]));
2297        }
2298    }
2299    let expr = concat_list(struct_exprs).expect("create_map concat_list");
2300    crate::column::Column::from_expr(expr, None)
2301}
2302
2303/// Extract keys from a map column (PySpark map_keys). Map is List(Struct{key, value}).
2304pub fn map_keys(column: &Column) -> Column {
2305    column.clone().map_keys()
2306}
2307
2308/// Extract values from a map column (PySpark map_values).
2309pub fn map_values(column: &Column) -> Column {
2310    column.clone().map_values()
2311}
2312
2313/// Return map as list of structs {key, value} (PySpark map_entries).
2314pub fn map_entries(column: &Column) -> Column {
2315    column.clone().map_entries()
2316}
2317
2318/// Build map from two array columns keys and values (PySpark map_from_arrays). Implemented via UDF.
2319pub fn map_from_arrays(keys: &Column, values: &Column) -> Column {
2320    keys.clone().map_from_arrays(values)
2321}
2322
2323/// Merge two map columns (PySpark map_concat). Last value wins for duplicate keys.
2324pub fn map_concat(a: &Column, b: &Column) -> Column {
2325    a.clone().map_concat(b)
2326}
2327
2328/// Array of structs {key, value} to map (PySpark map_from_entries).
2329pub fn map_from_entries(column: &Column) -> Column {
2330    column.clone().map_from_entries()
2331}
2332
2333/// True if map contains key (PySpark map_contains_key).
2334pub fn map_contains_key(map_col: &Column, key: &Column) -> Column {
2335    map_col.clone().map_contains_key(key)
2336}
2337
2338/// Get value for key from map, or null (PySpark get).
2339pub fn get(map_col: &Column, key: &Column) -> Column {
2340    map_col.clone().get(key)
2341}
2342
2343/// Filter map entries by predicate (PySpark map_filter).
2344pub fn map_filter(map_col: &Column, predicate: Expr) -> Column {
2345    map_col.clone().map_filter(predicate)
2346}
2347
2348/// Merge two maps by key with merge function (PySpark map_zip_with).
2349pub fn map_zip_with(map1: &Column, map2: &Column, merge: Expr) -> Column {
2350    map1.clone().map_zip_with(map2, merge)
2351}
2352
2353/// Convenience: zip_with with coalesce(left, right) merge.
2354pub fn zip_with_coalesce(left: &Column, right: &Column) -> Column {
2355    use polars::prelude::col;
2356    let left_field = col("").struct_().field_by_name("left");
2357    let right_field = col("").struct_().field_by_name("right");
2358    let merge = coalesce(&[
2359        &crate::column::Column::from_expr(left_field, None),
2360        &crate::column::Column::from_expr(right_field, None),
2361    ])
2362    .into_expr();
2363    left.clone().zip_with(right, merge)
2367}
2368
2369/// Convenience: map_zip_with with coalesce(value1, value2) merge.
2370pub fn map_zip_with_coalesce(map1: &Column, map2: &Column) -> Column {
2371    use polars::prelude::col;
2372    let v1 = col("").struct_().field_by_name("value1");
2373    let v2 = col("").struct_().field_by_name("value2");
2374    let merge = coalesce(&[
2375        &crate::column::Column::from_expr(v1, None),
2376        &crate::column::Column::from_expr(v2, None),
2377    ])
2378    .into_expr();
2379    map1.clone().map_zip_with(map2, merge)
2380}
2381
2382/// Convenience: map_filter with value > threshold predicate.
2383pub fn map_filter_value_gt(map_col: &Column, threshold: f64) -> Column {
2384    use polars::prelude::{col, lit};
2385    let pred = col("").struct_().field_by_name("value").gt(lit(threshold));
2386    map_col.clone().map_filter(pred)
2387}
2388
2389/// Create struct from columns using column names as field names (PySpark struct).
2390pub fn struct_(columns: &[&Column]) -> Column {
2391    use polars::prelude::as_struct;
2392    if columns.is_empty() {
2393        panic!("struct requires at least one column");
2394    }
2395    let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2396    crate::column::Column::from_expr(as_struct(exprs), None)
2397}
2398
2399/// Create struct with explicit field names (PySpark named_struct). Pairs of (name, column).
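///
/// # Example
/// A minimal sketch, assuming `named_struct` is re-exported at the crate root:
/// ```ignore
/// use robin_sparkless::{col, named_struct};
/// // struct with fields "id" and "score" taken from two (illustrative) columns
/// let s = named_struct(&[("id", &col("user_id")), ("score", &col("points"))]);
/// ```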
2400pub fn named_struct(pairs: &[(&str, &Column)]) -> Column {
2401    use polars::prelude::as_struct;
2402    if pairs.is_empty() {
2403        panic!("named_struct requires at least one (name, column) pair");
2404    }
2405    let exprs: Vec<Expr> = pairs
2406        .iter()
2407        .map(|(name, col)| col.expr().clone().alias(*name))
2408        .collect();
2409    crate::column::Column::from_expr(as_struct(exprs), None)
2410}
2411
2412/// Append element to end of list (PySpark array_append).
2413pub fn array_append(array: &Column, elem: &Column) -> Column {
2414    array.clone().array_append(elem)
2415}
2416
2417/// Prepend element to start of list (PySpark array_prepend).
2418pub fn array_prepend(array: &Column, elem: &Column) -> Column {
2419    array.clone().array_prepend(elem)
2420}
2421
2422/// Insert element at 1-based position (PySpark array_insert).
2423pub fn array_insert(array: &Column, pos: &Column, elem: &Column) -> Column {
2424    array.clone().array_insert(pos, elem)
2425}
2426
2427/// Elements in first array not in second (PySpark array_except).
2428pub fn array_except(a: &Column, b: &Column) -> Column {
2429    a.clone().array_except(b)
2430}
2431
2432/// Elements in both arrays (PySpark array_intersect).
2433pub fn array_intersect(a: &Column, b: &Column) -> Column {
2434    a.clone().array_intersect(b)
2435}
2436
2437/// Distinct elements from both arrays (PySpark array_union).
2438pub fn array_union(a: &Column, b: &Column) -> Column {
2439    a.clone().array_union(b)
2440}
2441
2442/// Zip two arrays element-wise with merge function (PySpark zip_with).
2443pub fn zip_with(left: &Column, right: &Column, merge: Expr) -> Column {
2444    left.clone().zip_with(right, merge)
2445}
2446
2447/// Extract JSON path from string column (PySpark get_json_object).
2448pub fn get_json_object(column: &Column, path: &str) -> Column {
2449    column.clone().get_json_object(path)
2450}
2451
2452/// Keys of JSON object (PySpark json_object_keys). Returns list of strings.
2453pub fn json_object_keys(column: &Column) -> Column {
2454    column.clone().json_object_keys()
2455}
2456
2457/// Extract keys from JSON as struct (PySpark json_tuple). keys: e.g. ["a", "b"].
2458pub fn json_tuple(column: &Column, keys: &[&str]) -> Column {
2459    column.clone().json_tuple(keys)
2460}
2461
2462/// Parse CSV string to struct (PySpark from_csv). Minimal implementation.
2463pub fn from_csv(column: &Column) -> Column {
2464    column.clone().from_csv()
2465}
2466
2467/// Format struct as CSV string (PySpark to_csv). Minimal implementation.
2468pub fn to_csv(column: &Column) -> Column {
2469    column.clone().to_csv()
2470}
2471
2472/// Schema of CSV string (PySpark schema_of_csv). Returns literal schema string; minimal stub.
2473pub fn schema_of_csv(_column: &Column) -> Column {
2474    Column::from_expr(
2475        lit("STRUCT<_c0: STRING, _c1: STRING>".to_string()),
2476        Some("schema_of_csv".to_string()),
2477    )
2478}
2479
2480/// Schema of JSON string (PySpark schema_of_json). Returns literal schema string; minimal stub.
2481pub fn schema_of_json(_column: &Column) -> Column {
2482    Column::from_expr(
2483        lit("STRUCT<>".to_string()),
2484        Some("schema_of_json".to_string()),
2485    )
2486}
2487
2488/// Parse string column as JSON into struct (PySpark from_json).
2489pub fn from_json(column: &Column, schema: Option<polars::datatypes::DataType>) -> Column {
2490    column.clone().from_json(schema)
2491}
2492
2493/// Serialize struct column to JSON string (PySpark to_json).
2494pub fn to_json(column: &Column) -> Column {
2495    column.clone().to_json()
2496}
2497
2498/// Check if column values are in the given list (PySpark isin). Uses Polars is_in.
2499pub fn isin(column: &Column, other: &Column) -> Column {
2500    column.clone().isin(other)
2501}
2502
2503/// Check if column values are in the given i64 slice (PySpark isin with literal list).
2504pub fn isin_i64(column: &Column, values: &[i64]) -> Column {
2505    let s = Series::from_iter(values.iter().cloned());
2506    Column::from_expr(column.expr().clone().is_in(lit(s)), None)
2507}
2508
2509/// Check if column values are in the given string slice (PySpark isin with literal list).
2510pub fn isin_str(column: &Column, values: &[&str]) -> Column {
2511    let s: Series = Series::from_iter(values.iter().copied());
2512    Column::from_expr(column.expr().clone().is_in(lit(s)), None)
2513}
2514
2515/// Percent-decode URL-encoded string (PySpark url_decode).
2516pub fn url_decode(column: &Column) -> Column {
2517    column.clone().url_decode()
2518}
2519
2520/// Percent-encode string for URL (PySpark url_encode).
2521pub fn url_encode(column: &Column) -> Column {
2522    column.clone().url_encode()
2523}
2524
2525/// Bitwise left shift (PySpark shiftLeft). col << n.
2526pub fn shift_left(column: &Column, n: i32) -> Column {
2527    column.clone().shift_left(n)
2528}
2529
2530/// Bitwise signed right shift (PySpark shiftRight). col >> n.
2531pub fn shift_right(column: &Column, n: i32) -> Column {
2532    column.clone().shift_right(n)
2533}
2534
2535/// Bitwise unsigned right shift (PySpark shiftRightUnsigned). Logical shift for Long.
2536pub fn shift_right_unsigned(column: &Column, n: i32) -> Column {
2537    column.clone().shift_right_unsigned(n)
2538}
2539
2540/// Session/library version string (PySpark version).
2541pub fn version() -> Column {
2542    Column::from_expr(lit("robin-sparkless-0.1.1"), None)
2543}
2544
2545/// Null-safe equality: true if both null or both equal (PySpark equal_null). Alias for eq_null_safe.
2546pub fn equal_null(left: &Column, right: &Column) -> Column {
2547    left.clone().eq_null_safe(right)
2548}
2549
2550/// Length of JSON array at path (PySpark json_array_length).
2551pub fn json_array_length(column: &Column, path: &str) -> Column {
2552    column.clone().json_array_length(path)
2553}
2554
2555/// Parse URL and extract part: PROTOCOL, HOST, PATH, etc. (PySpark parse_url).
2556/// When key is Some(k) and part is QUERY/QUERYSTRING, returns the value for that query parameter only.
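///
/// # Example
/// A minimal sketch, assuming `parse_url` is re-exported at the crate root:
/// ```ignore
/// use robin_sparkless::{col, parse_url};
/// let host = parse_url(&col("url"), "HOST", None);
/// // value of the "id" query parameter only
/// let id = parse_url(&col("url"), "QUERY", Some("id"));
/// ```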
2557pub fn parse_url(column: &Column, part: &str, key: Option<&str>) -> Column {
2558    column.clone().parse_url(part, key)
2559}
2560
2561/// Hash of column values (PySpark hash). Uses Murmur3 32-bit for parity with PySpark.
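///
/// # Example
/// A minimal sketch, assuming `hash` is re-exported at the crate root:
/// ```ignore
/// use robin_sparkless::{col, hash};
/// // single-column hash, and a combined hash over a struct of two columns
/// let h1 = hash(&[&col("a")]);
/// let h2 = hash(&[&col("a"), &col("b")]);
/// ```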
2562pub fn hash(columns: &[&Column]) -> Column {
2563    use polars::prelude::*;
2564    if columns.is_empty() {
2565        return crate::column::Column::from_expr(lit(0i64), None);
2566    }
2567    if columns.len() == 1 {
2568        return columns[0].clone().hash();
2569    }
2570    let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2571    let struct_expr = polars::prelude::as_struct(exprs);
2572    let name = columns[0].name().to_string();
2573    let expr = struct_expr.map(
2574        crate::udfs::apply_hash_struct,
2575        GetOutput::from_type(DataType::Int64),
2576    );
2577    crate::column::Column::from_expr(expr, Some(name))
2578}
2579
2580/// Stack columns into a struct; alias for struct_. Note: PySpark's stack unpivots values into rows, so this is only a simplified stand-in.
2581pub fn stack(columns: &[&Column]) -> Column {
2582    struct_(columns)
2583}
2584
2585#[cfg(test)]
2586mod tests {
2587    use super::*;
2588    use polars::prelude::{df, IntoLazy};
2589
2590    #[test]
2591    fn test_col_creates_column() {
2592        let column = col("test");
2593        assert_eq!(column.name(), "test");
2594    }
2595
2596    #[test]
2597    fn test_lit_i32() {
2598        let column = lit_i32(42);
2599        // The column should have a default name since it's a literal
2600        assert_eq!(column.name(), "<expr>");
2601    }
2602
2603    #[test]
2604    fn test_lit_i64() {
2605        let column = lit_i64(123456789012345i64);
2606        assert_eq!(column.name(), "<expr>");
2607    }
2608
2609    #[test]
2610    fn test_lit_f64() {
2611        let column = lit_f64(std::f64::consts::PI);
2612        assert_eq!(column.name(), "<expr>");
2613    }
2614
2615    #[test]
2616    fn test_lit_bool() {
2617        let column = lit_bool(true);
2618        assert_eq!(column.name(), "<expr>");
2619    }
2620
2621    #[test]
2622    fn test_lit_str() {
2623        let column = lit_str("hello");
2624        assert_eq!(column.name(), "<expr>");
2625    }
2626
2627    #[test]
2628    fn test_count_aggregation() {
2629        let column = col("value");
2630        let result = count(&column);
2631        assert_eq!(result.name(), "count");
2632    }
2633
2634    #[test]
2635    fn test_sum_aggregation() {
2636        let column = col("value");
2637        let result = sum(&column);
2638        assert_eq!(result.name(), "sum");
2639    }
2640
2641    #[test]
2642    fn test_avg_aggregation() {
2643        let column = col("value");
2644        let result = avg(&column);
2645        assert_eq!(result.name(), "avg");
2646    }
2647
2648    #[test]
2649    fn test_max_aggregation() {
2650        let column = col("value");
2651        let result = max(&column);
2652        assert_eq!(result.name(), "max");
2653    }
2654
2655    #[test]
2656    fn test_min_aggregation() {
2657        let column = col("value");
2658        let result = min(&column);
2659        assert_eq!(result.name(), "min");
2660    }
2661
2662    #[test]
2663    fn test_when_then_otherwise() {
2664        // Create a simple DataFrame
2665        let df = df!(
2666            "age" => &[15, 25, 35]
2667        )
2668        .unwrap();
2669
2670        // Build a when-then-otherwise expression
2671        let age_col = col("age");
2672        let condition = age_col.gt(polars::prelude::lit(18));
2673        let result = when(&condition)
2674            .then(&lit_str("adult"))
2675            .otherwise(&lit_str("minor"));
2676
2677        // Apply the expression
2678        let result_df = df
2679            .lazy()
2680            .with_column(result.into_expr().alias("status"))
2681            .collect()
2682            .unwrap();
2683
2684        // Verify the result
2685        let status_col = result_df.column("status").unwrap();
2686        let values: Vec<Option<&str>> = status_col.str().unwrap().into_iter().collect();
2687
2688        assert_eq!(values[0], Some("minor")); // age 15 < 18
2689        assert_eq!(values[1], Some("adult")); // age 25 > 18
2690        assert_eq!(values[2], Some("adult")); // age 35 > 18
2691    }
2692
2693    #[test]
2694    fn test_coalesce_returns_first_non_null() {
2695        // Create a DataFrame with some nulls
2696        let df = df!(
2697            "a" => &[Some(1), None, None],
2698            "b" => &[None, Some(2), None],
2699            "c" => &[None, None, Some(3)]
2700        )
2701        .unwrap();
2702
2703        let col_a = col("a");
2704        let col_b = col("b");
2705        let col_c = col("c");
2706        let result = coalesce(&[&col_a, &col_b, &col_c]);
2707
2708        // Apply the expression
2709        let result_df = df
2710            .lazy()
2711            .with_column(result.into_expr().alias("coalesced"))
2712            .collect()
2713            .unwrap();
2714
2715        // Verify the result
2716        let coalesced_col = result_df.column("coalesced").unwrap();
2717        let values: Vec<Option<i32>> = coalesced_col.i32().unwrap().into_iter().collect();
2718
2719        assert_eq!(values[0], Some(1)); // First non-null is 'a'
2720        assert_eq!(values[1], Some(2)); // First non-null is 'b'
2721        assert_eq!(values[2], Some(3)); // First non-null is 'c'
2722    }
2723
2724    #[test]
2725    fn test_coalesce_with_literal_fallback() {
2726        // Create a DataFrame with all nulls in one row
2727        let df = df!(
2728            "a" => &[Some(1), None],
2729            "b" => &[None::<i32>, None::<i32>]
2730        )
2731        .unwrap();
2732
2733        let col_a = col("a");
2734        let col_b = col("b");
2735        let fallback = lit_i32(0);
2736        let result = coalesce(&[&col_a, &col_b, &fallback]);
2737
2738        // Apply the expression
2739        let result_df = df
2740            .lazy()
2741            .with_column(result.into_expr().alias("coalesced"))
2742            .collect()
2743            .unwrap();
2744
2745        // Verify the result
2746        let coalesced_col = result_df.column("coalesced").unwrap();
2747        let values: Vec<Option<i32>> = coalesced_col.i32().unwrap().into_iter().collect();
2748
2749        assert_eq!(values[0], Some(1)); // First non-null is 'a'
2750        assert_eq!(values[1], Some(0)); // All nulls, use fallback
2751    }
2752
2753    #[test]
2754    #[should_panic(expected = "coalesce requires at least one column")]
2755    fn test_coalesce_empty_panics() {
2756        let columns: [&Column; 0] = [];
2757        let _ = coalesce(&columns);
2758    }
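
    // Additional sketch tests for helpers defined above. They rely only on behaviour
    // visible in the function bodies (Polars concat_str, when/then/otherwise, and plain
    // expression arithmetic); column names and values are illustrative.
    #[test]
    fn test_concat_ws_joins_with_separator() {
        let df = df!(
            "first" => &["ada", "alan"],
            "last" => &["lovelace", "turing"]
        )
        .unwrap();

        let full = concat_ws("-", &[&col("first"), &col("last")]);
        let result_df = df
            .lazy()
            .with_column(full.into_expr().alias("full"))
            .collect()
            .unwrap();

        let values: Vec<Option<&str>> = result_df
            .column("full")
            .unwrap()
            .str()
            .unwrap()
            .into_iter()
            .collect();
        assert_eq!(values, vec![Some("ada-lovelace"), Some("alan-turing")]);
    }

    #[test]
    fn test_nullif_nulls_out_matching_values() {
        let df = df!("x" => &[0, 5, 0]).unwrap();

        let result = nullif(&col("x"), &lit_i32(0));
        let result_df = df
            .lazy()
            .with_column(result.into_expr().alias("y"))
            .collect()
            .unwrap();

        let values: Vec<Option<i32>> = result_df.column("y").unwrap().i32().unwrap().into_iter().collect();
        assert_eq!(values, vec![None, Some(5), None]);
    }

    #[test]
    fn test_hypot_computes_euclidean_norm() {
        let df = df!("x" => &[3.0f64], "y" => &[4.0f64]).unwrap();

        let result = hypot(&col("x"), &col("y"));
        let result_df = df
            .lazy()
            .with_column(result.into_expr().alias("h"))
            .collect()
            .unwrap();

        let values: Vec<Option<f64>> = result_df.column("h").unwrap().f64().unwrap().into_iter().collect();
        assert_eq!(values, vec![Some(5.0)]);
    }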
2759}