Skip to main content

robin_sparkless_polars/
column.rs

1use polars::prelude::{
2    DataType, Expr, Field, PolarsError, PolarsResult, RankMethod, RankOptions, SortOptions,
3    TimeUnit, WindowMapping, col, lit,
4};
5use polars_plan::dsl::AggExpr;
6use std::ops::Neg;
7
8/// Unwrap UDF result to Column (map() expects Result<Column>, UDFs return Result<Option<Column>>).
9#[inline]
10pub(crate) fn expect_col(
11    r: PolarsResult<Option<polars::prelude::Column>>,
12) -> PolarsResult<polars::prelude::Column> {
13    r.and_then(|o| o.ok_or_else(|| PolarsError::ComputeError("expected column".into())))
14}
15
/// Convert a SQL LIKE pattern into an anchored regex string.
///
/// * `%` becomes `.*` (any sequence) and `_` becomes `.` (exactly one character).
/// * Every other character is emitted as a literal; regex metacharacters are
///   backslash-escaped.
/// * When `escape_char` is `Some(esc)`, `esc` followed by any character emits that
///   character as a literal (so `esc%` / `esc_` are not expanded). A dangling `esc`
///   at the very end of the pattern is emitted as a literal `esc`.
///
/// The result is wrapped as `(?s)^...$`. The `s` flag makes `.` match newlines too,
/// so `%` and `_` cover multi-line values the way SQL LIKE does; without it a value
/// containing `\n` could never match a wildcard.
fn like_pattern_to_regex(pattern: &str, escape_char: Option<char>) -> String {
    /// Append `ch` as a regex literal, adding a backslash only for metacharacters.
    /// (Blindly prefixing a backslash would turn e.g. `e` into the invalid escape `\e`.)
    fn push_literal(out: &mut String, ch: char) {
        if matches!(
            ch,
            '\\' | '.' | '+' | '*' | '?' | '[' | ']' | '(' | ')' | '{' | '}' | '^' | '$' | '|'
        ) {
            out.push('\\');
        }
        out.push(ch);
    }

    let mut out = String::with_capacity(pattern.len() * 2 + 6);
    out.push_str("(?s)^");
    let mut it = pattern.chars();
    // `while let` rather than `for`: the escape branch pulls a second char from `it`.
    while let Some(c) = it.next() {
        if escape_char == Some(c) {
            // Escaped char is literal; a trailing escape with nothing after it stands for itself.
            push_literal(&mut out, it.next().unwrap_or(c));
        } else {
            match c {
                '%' => out.push_str(".*"),
                '_' => out.push('.'),
                other => push_literal(&mut out, other),
            }
        }
    }
    out.push('$');
    out
}
48
/// Map a PySpark `date_trunc`/`trunc` unit name to a Polars duration string.
///
/// Polars `dt.truncate` expects a leading integer (`1y`, `1mo`, `1d`, ...), whereas
/// PySpark uses bare unit names. Matching is case-insensitive and covers the long
/// names plus PySpark's short aliases (`yyyy`/`yy`, `mon`/`mm`, `dd`) and the
/// sub-second units (`millisecond`, `microsecond`).
///
/// Anything unrecognised is returned unchanged, so a string that is already a
/// Polars duration (e.g. `1d`) passes straight through.
fn pyspark_trunc_format_to_polars_duration(format: &str) -> String {
    let duration = match format.to_lowercase().as_str() {
        "year" | "years" | "yyyy" | "yy" => "1y",
        "quarter" | "quarters" | "q" => "1q",
        // In PySpark's trunc formats `mm` means month, not minute.
        "month" | "months" | "mon" | "mm" => "1mo",
        "week" | "weeks" | "wk" => "1w",
        "day" | "days" | "dd" => "1d",
        "hour" | "hours" => "1h",
        "minute" | "minutes" | "min" => "1m",
        "second" | "seconds" | "sec" => "1s",
        "millisecond" | "milliseconds" => "1ms",
        "microsecond" | "microseconds" => "1us",
        // Already Polars-style (e.g. 1d) or unknown: pass through untouched.
        _ => return format.to_string(),
    };
    duration.to_string()
}
64
/// Deferred random column: when added via with_column, we generate a full-length series in one go (PySpark-like).
///
/// Each variant carries the optional seed that was handed to the matching
/// `Column::from_rand` / `Column::from_randn` constructor.
#[derive(Debug, Clone, Copy)]
pub enum DeferredRandom {
    /// Marker set by `Column::from_rand`; payload is the optional seed.
    Rand(Option<u64>),
    /// Marker set by `Column::from_randn`; payload is the optional seed.
    Randn(Option<u64>),
}
71
/// Marker for order-sensitive first_value/last_value (PySpark semantics: first/last by window order).
///
/// Stored on `Column::first_last_value`; `Column::from_last_agg` creates it with
/// `is_last: true`.
#[derive(Debug, Clone)]
pub struct FirstLastValue {
    /// The expression whose value we take (e.g. col("salary")).
    pub value_expr: Expr,
    /// false = first in order, true = last in order.
    pub is_last: bool,
}
80
/// Column - represents a column in a DataFrame, used for building expressions
/// Thin wrapper around Polars `Expr`. May carry a DeferredRandom for rand/randn so with_column can produce one value per row.
/// May carry UdfCall for Python UDFs (eager execution at with_column).
///
/// Besides the expression itself, the struct carries optional markers that the
/// field docs below describe as hints for `with_column` / `over_window`.
#[derive(Debug, Clone)]
pub struct Column {
    /// Display name; `into_expr` applies it as the alias of `expr`.
    name: String,
    /// The wrapped Polars expression, returned by `expr()`.
    expr: Expr, // Polars expression for lazy evaluation
    /// True when this Column was constructed via array() so DataFrame-level logic
    /// can apply PySpark-specific type strictness (e.g., mixed bool + non-bool arrays; issue #1115).
    pub(crate) is_array_expr: bool,
    /// When Some, with_column generates a full-length random series instead of using expr (PySpark-like per-row rand/randn).
    pub deferred: Option<DeferredRandom>,
    /// When Some, with_column executes Python UDF eagerly (name, arg columns).
    pub udf_call: Option<(String, Vec<Column>)>,
    /// When Some, this aggregate (e.g. sum) can use cum_sum for running window when orderBy differs from partitionBy.
    pub source_for_running: Option<String>,
    /// When Some, over_window uses running mean (cum_sum/cum_count) when orderBy is present (#1241).
    pub source_for_running_mean: Option<String>,
    /// When Some, over_window uses order-sensitive first/last (PySpark first_value/last_value semantics; #1145).
    pub first_last_value: Option<FirstLastValue>,
    /// When Some, over_window uses cum_count for running count semantics (PySpark count().over(order); #1218).
    pub source_for_running_count: Option<String>,
}
104
105/// True if the expression is or contains a count_distinct/n_unique aggregate (PySpark rejects distinct window functions).
106fn expr_is_or_contains_n_unique(expr: &Expr) -> bool {
107    match expr {
108        Expr::Agg(AggExpr::NUnique(_)) => true,
109        Expr::Cast { expr: inner, .. } => expr_is_or_contains_n_unique(inner.as_ref()),
110        Expr::Alias(inner, _) => expr_is_or_contains_n_unique(inner.as_ref()),
111        _ => false,
112    }
113}
114
115impl Column {
116    /// Create a new Column from a column name
117    pub fn new(name: String) -> Self {
118        Column {
119            name: name.clone(),
120            expr: col(&name),
121            is_array_expr: false,
122            deferred: None,
123            udf_call: None,
124            source_for_running: None,
125            source_for_running_mean: None,
126            first_last_value: None,
127            source_for_running_count: None,
128        }
129    }
130
131    /// Create a Column from a Polars Expr
132    pub fn from_expr(expr: Expr, name: Option<String>) -> Self {
133        let display_name = name.unwrap_or_else(|| "<expr>".to_string());
134        Column {
135            name: display_name,
136            expr,
137            is_array_expr: false,
138            deferred: None,
139            udf_call: None,
140            source_for_running: None,
141            source_for_running_mean: None,
142            first_last_value: None,
143            source_for_running_count: None,
144        }
145    }
146
147    /// Last aggregate for groupBy.agg() and for .over(window). When used with .over() and orderBy,
148    /// over_window uses first_last_value so "last" = current row (PySpark default frame parity).
149    pub fn from_last_agg(col: &Column) -> Self {
150        let value_expr = col.expr().clone();
151        let expr = value_expr.clone().last();
152        Column {
153            name: "last".to_string(),
154            expr,
155            is_array_expr: false,
156            deferred: None,
157            udf_call: None,
158            source_for_running: None,
159            source_for_running_mean: None,
160            first_last_value: Some(FirstLastValue {
161                value_expr,
162                is_last: true,
163            }),
164            source_for_running_count: None,
165        }
166    }
167
168    /// Create a Column for Python UDF call (eager execution at with_column).
169    pub fn from_udf_call(name: String, args: Vec<Column>) -> Self {
170        Column {
171            name: format!("{name}()"),
172            expr: lit(0i32), // dummy, never used
173            is_array_expr: false,
174            deferred: None,
175            udf_call: Some((name, args)),
176            source_for_running: None,
177            source_for_running_mean: None,
178            first_last_value: None,
179            source_for_running_count: None,
180        }
181    }
182
183    /// Create a Column for rand(seed). When used in with_column, generates one value per row (PySpark-like).
184    pub fn from_rand(seed: Option<u64>) -> Self {
185        let expr = lit(1i64).cum_sum(false).map(
186            move |c| expect_col(crate::udfs::apply_rand_with_seed(c, seed)),
187            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
188        );
189        Column {
190            name: "rand".to_string(),
191            expr,
192            is_array_expr: false,
193            deferred: Some(DeferredRandom::Rand(seed)),
194            udf_call: None,
195            source_for_running: None,
196            source_for_running_mean: None,
197            first_last_value: None,
198            source_for_running_count: None,
199        }
200    }
201
202    /// Create a Column for randn(seed). When used in with_column, generates one value per row (PySpark-like).
203    pub fn from_randn(seed: Option<u64>) -> Self {
204        let expr = lit(1i64).cum_sum(false).map(
205            move |c| expect_col(crate::udfs::apply_randn_with_seed(c, seed)),
206            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
207        );
208        Column {
209            name: "randn".to_string(),
210            expr,
211            is_array_expr: false,
212            deferred: Some(DeferredRandom::Randn(seed)),
213            udf_call: None,
214            source_for_running: None,
215            source_for_running_mean: None,
216            first_last_value: None,
217            source_for_running_count: None,
218        }
219    }
220
221    /// Get the underlying Polars Expr
222    pub fn expr(&self) -> &Expr {
223        &self.expr
224    }
225
226    /// Convert to Polars Expr (consumes self).
227    /// Applies the Column's display name as alias so row keys match (PySpark parity #1014, #1017, #1022).
228    pub fn into_expr(self) -> Expr {
229        self.expr.alias(&self.name)
230    }
231
232    /// Get the column name
233    pub fn name(&self) -> &str {
234        &self.name
235    }
236
237    /// If this column is a Python UDF call, return (udf_name, arg_column_names). Used by Python to run UDFs at with_column.
238    pub fn udf_call_info(&self) -> Option<(String, Vec<String>)> {
239        self.udf_call.as_ref().map(|(name, args)| {
240            (
241                name.clone(),
242                args.iter().map(|c| c.name().to_string()).collect(),
243            )
244        })
245    }
246
247    /// If this column is a Python UDF call, return (udf_name, args slice). Used to materialize expression args before calling the executor.
248    pub fn udf_call_with_args(&self) -> Option<(&str, &[Column])> {
249        self.udf_call
250            .as_ref()
251            .map(|(name, args)| (name.as_str(), args.as_slice()))
252    }
253
254    /// If this column is a literal expression, return its value as JSON string for Python UDF executor (literal args).
255    pub fn literal_as_json_string(&self) -> Option<String> {
256        match &self.expr {
257            Expr::Literal(lv) => crate::dataframe::literal_value_to_serde_value(lv)
258                .and_then(|v| serde_json::to_string(&v).ok()),
259            _ => None,
260        }
261    }
262
263    /// If this column is a Python UDF call, return (udf_name, arg_names, arg_literal_json_strings).
264    /// For each arg: None = use row[arg_name]; Some(json) = literal value (not in row).
265    pub fn udf_call_info_with_literals(
266        &self,
267    ) -> Option<(String, Vec<String>, Vec<Option<String>>)> {
268        self.udf_call.as_ref().map(|(name, args)| {
269            let arg_names: Vec<String> = args.iter().map(|c| c.name().to_string()).collect();
270            let literals: Vec<Option<String>> =
271                args.iter().map(|c| c.literal_as_json_string()).collect();
272            (name.clone(), arg_names, literals)
273        })
274    }
275
276    /// Alias the column
277    pub fn alias(&self, name: &str) -> Column {
278        Column {
279            name: name.to_string(),
280            expr: self.expr.clone().alias(name),
281            is_array_expr: self.is_array_expr,
282            deferred: self.deferred,
283            udf_call: self.udf_call.clone(),
284            source_for_running: self.source_for_running.clone(),
285            source_for_running_mean: self.source_for_running_mean.clone(),
286            first_last_value: self.first_last_value.clone(),
287            source_for_running_count: self.source_for_running_count.clone(),
288        }
289    }
290
291    /// Ascending sort, nulls first (Spark default for ASC). PySpark asc.
292    pub fn asc(&self) -> crate::functions::SortOrder {
293        crate::functions::asc(self)
294    }
295
296    /// Ascending sort, nulls first. PySpark asc_nulls_first.
297    pub fn asc_nulls_first(&self) -> crate::functions::SortOrder {
298        crate::functions::asc_nulls_first(self)
299    }
300
301    /// Ascending sort, nulls last. PySpark asc_nulls_last.
302    pub fn asc_nulls_last(&self) -> crate::functions::SortOrder {
303        crate::functions::asc_nulls_last(self)
304    }
305
306    /// Descending sort, nulls last (Spark default for DESC). PySpark desc.
307    pub fn desc(&self) -> crate::functions::SortOrder {
308        crate::functions::desc(self)
309    }
310
311    /// Descending sort, nulls first. PySpark desc_nulls_first.
312    pub fn desc_nulls_first(&self) -> crate::functions::SortOrder {
313        crate::functions::desc_nulls_first(self)
314    }
315
316    /// Descending sort, nulls last. PySpark desc_nulls_last.
317    pub fn desc_nulls_last(&self) -> crate::functions::SortOrder {
318        crate::functions::desc_nulls_last(self)
319    }
320
321    /// Check if column is null
322    pub fn is_null(&self) -> Column {
323        Column {
324            name: format!("({} IS NULL)", self.name),
325            expr: self.expr.clone().is_null(),
326            is_array_expr: false,
327            deferred: None,
328            udf_call: None,
329            source_for_running: None,
330            source_for_running_mean: None,
331            first_last_value: None,
332            source_for_running_count: None,
333        }
334    }
335
336    /// Check if column is not null
337    pub fn is_not_null(&self) -> Column {
338        Column {
339            name: format!("({} IS NOT NULL)", self.name),
340            expr: self.expr.clone().is_not_null(),
341            is_array_expr: false,
342            deferred: None,
343            udf_call: None,
344            source_for_running: None,
345            source_for_running_mean: None,
346            first_last_value: None,
347            source_for_running_count: None,
348        }
349    }
350
351    /// Alias for is_null. PySpark isnull.
352    pub fn isnull(&self) -> Column {
353        self.is_null()
354    }
355
356    /// Alias for is_not_null. PySpark isnotnull.
357    pub fn isnotnull(&self) -> Column {
358        self.is_not_null()
359    }
360
361    /// Create a null boolean expression
362    fn null_boolean_expr() -> Expr {
363        use polars::prelude::*;
364        // Create an expression that is always a null boolean
365        lit(NULL).cast(DataType::Boolean)
366    }
367
368    /// SQL LIKE pattern matching (% = any chars, _ = one char). PySpark like.
369    /// When escape_char is Some(esc), esc + char treats that char as literal (e.g. \\% = literal %).
370    pub fn like(&self, pattern: &str, escape_char: Option<char>) -> Column {
371        let regex = like_pattern_to_regex(pattern, escape_char);
372        self.regexp_like(&regex)
373    }
374
375    /// Case-insensitive LIKE. PySpark ilike.
376    /// When escape_char is Some(esc), esc + char treats that char as literal.
377    pub fn ilike(&self, pattern: &str, escape_char: Option<char>) -> Column {
378        use polars::prelude::*;
379        let regex = format!("(?i){}", like_pattern_to_regex(pattern, escape_char));
380        Self::from_expr(self.expr().clone().str().contains(lit(regex), false), None)
381    }
382
383    /// PySpark-style equality comparison (NULL == NULL returns NULL, not True)
384    /// Any comparison involving NULL returns NULL
385    ///
386    /// Explicitly wraps comparisons with null checks to ensure PySpark semantics.
387    /// If either side is NULL, the result is NULL.
388    pub fn eq_pyspark(&self, other: &Column) -> Column {
389        // Check if either side is NULL
390        let left_null = self.expr().clone().is_null();
391        let right_null = other.expr().clone().is_null();
392        let either_null = left_null.clone().or(right_null.clone());
393
394        // Standard equality comparison
395        let eq_result = self.expr().clone().eq(other.expr().clone());
396
397        // Wrap: if either is null, return null boolean, else return comparison result
398        let null_boolean = Self::null_boolean_expr();
399        let null_aware_expr = crate::functions::when(&Self::from_expr(either_null, None))
400            .then(&Self::from_expr(null_boolean, None))
401            .otherwise(&Self::from_expr(eq_result, None));
402
403        Self::from_expr(null_aware_expr.into_expr(), None)
404    }
405
406    /// PySpark-style inequality comparison (NULL != NULL returns NULL, not False)
407    /// Any comparison involving NULL returns NULL
408    pub fn ne_pyspark(&self, other: &Column) -> Column {
409        // Check if either side is NULL
410        let left_null = self.expr().clone().is_null();
411        let right_null = other.expr().clone().is_null();
412        let either_null = left_null.clone().or(right_null.clone());
413
414        // Standard inequality comparison
415        let ne_result = self.expr().clone().neq(other.expr().clone());
416
417        // Wrap: if either is null, return null boolean, else return comparison result
418        let null_boolean = Self::null_boolean_expr();
419        let null_aware_expr = crate::functions::when(&Self::from_expr(either_null, None))
420            .then(&Self::from_expr(null_boolean, None))
421            .otherwise(&Self::from_expr(ne_result, None));
422
423        Self::from_expr(null_aware_expr.into_expr(), None)
424    }
425
426    /// Null-safe equality (NULL <=> NULL returns True)
427    /// PySpark's eqNullSafe() method. Applies type coercion (e.g. string vs int) for PySpark parity (#266).
428    pub fn eq_null_safe(&self, other: &Column) -> Column {
429        use crate::functions::{lit_bool, when};
430
431        let (left_c, right_c) = crate::type_coercion::coerce_for_pyspark_eq_null_safe(
432            self.expr().clone(),
433            other.expr().clone(),
434        )
435        .unwrap_or_else(|_| (self.expr().clone(), other.expr().clone()));
436
437        let left_null = left_c.clone().is_null();
438        let right_null = right_c.clone().is_null();
439        let both_null = left_null.clone().and(right_null.clone());
440        let either_null = left_null.clone().or(right_null.clone());
441
442        // Standard equality (on coerced exprs)
443        let eq_result = left_c.eq(right_c);
444
445        // If both are null, return True
446        // If either is null (but not both), return False
447        // Otherwise, return standard equality result
448        when(&Self::from_expr(both_null, None))
449            .then(&lit_bool(true))
450            .otherwise(
451                &when(&Self::from_expr(either_null, None))
452                    .then(&lit_bool(false))
453                    .otherwise(&Self::from_expr(eq_result, None)),
454            )
455    }
456
457    /// Create a Column that is always a null boolean.
458    /// This is useful for downstream bindings (e.g. PyO3) that need a null literal
459    /// without depending directly on Polars types like `Expr` or `LiteralValue`.
460    pub fn null_boolean() -> Column {
461        Column::from_expr(Self::null_boolean_expr(), None)
462    }
463
464    /// Create a Column that is always a null value of the given type.
465    /// `dtype` is a type name string (e.g. `"boolean"`, `"string"`, `"bigint"`, `"double"`).
466    /// See [`crate::functions::parse_type_name`] for supported names.
467    /// Returns `Err` on unknown type name so bindings get a clear error.
468    pub fn lit_null(dtype: &str) -> Result<Column, String> {
469        use polars::prelude::{NULL, lit};
470        let dt = crate::functions::parse_type_name(dtype)?;
471        Ok(Column::from_expr(lit(NULL).cast(dt), None))
472    }
473
474    /// Create a Column from a boolean literal. Convenience for bindings that prefer method form.
475    pub fn from_bool(b: bool) -> Column {
476        crate::functions::lit_bool(b)
477    }
478
479    /// Create a Column from an i64 literal. Convenience for bindings that prefer method form.
480    pub fn from_i64(n: i64) -> Column {
481        crate::functions::lit_i64(n)
482    }
483
484    /// Create a Column from a string literal. Convenience for bindings that prefer method form.
485    pub fn from_string(s: &str) -> Column {
486        crate::functions::lit_str(s)
487    }
488
489    /// PySpark-style greater-than comparison (NULL > value returns NULL)
490    /// Any comparison involving NULL returns NULL
491    pub fn gt_pyspark(&self, other: &Column) -> Column {
492        // Check if either side is NULL
493        let left_null = self.expr().clone().is_null();
494        let right_null = other.expr().clone().is_null();
495        let either_null = left_null.clone().or(right_null.clone());
496
497        // Standard greater-than comparison
498        let gt_result = self.expr().clone().gt(other.expr().clone());
499
500        // Wrap: if either is null, return null boolean, else return comparison result
501        let null_boolean = Self::null_boolean_expr();
502        let null_aware_expr = crate::functions::when(&Self::from_expr(either_null, None))
503            .then(&Self::from_expr(null_boolean, None))
504            .otherwise(&Self::from_expr(gt_result, None));
505
506        Self::from_expr(null_aware_expr.into_expr(), None)
507    }
508
509    /// PySpark-style greater-than-or-equal comparison
510    /// Any comparison involving NULL returns NULL
511    pub fn ge_pyspark(&self, other: &Column) -> Column {
512        // Check if either side is NULL
513        let left_null = self.expr().clone().is_null();
514        let right_null = other.expr().clone().is_null();
515        let either_null = left_null.clone().or(right_null.clone());
516
517        // Standard greater-than-or-equal comparison
518        let ge_result = self.expr().clone().gt_eq(other.expr().clone());
519
520        // Wrap: if either is null, return null boolean, else return comparison result
521        let null_boolean = Self::null_boolean_expr();
522        let null_aware_expr = crate::functions::when(&Self::from_expr(either_null, None))
523            .then(&Self::from_expr(null_boolean, None))
524            .otherwise(&Self::from_expr(ge_result, None));
525
526        Self::from_expr(null_aware_expr.into_expr(), None)
527    }
528
529    /// PySpark-style less-than comparison
530    /// Any comparison involving NULL returns NULL
531    pub fn lt_pyspark(&self, other: &Column) -> Column {
532        // Check if either side is NULL
533        let left_null = self.expr().clone().is_null();
534        let right_null = other.expr().clone().is_null();
535        let either_null = left_null.clone().or(right_null.clone());
536
537        // Standard less-than comparison
538        let lt_result = self.expr().clone().lt(other.expr().clone());
539
540        // Wrap: if either is null, return null boolean, else return comparison result
541        let null_boolean = Self::null_boolean_expr();
542        let null_aware_expr = crate::functions::when(&Self::from_expr(either_null, None))
543            .then(&Self::from_expr(null_boolean, None))
544            .otherwise(&Self::from_expr(lt_result, None));
545
546        Self::from_expr(null_aware_expr.into_expr(), None)
547    }
548
549    /// PySpark-style less-than-or-equal comparison
550    /// Any comparison involving NULL returns NULL
551    pub fn le_pyspark(&self, other: &Column) -> Column {
552        // Check if either side is NULL
553        let left_null = self.expr().clone().is_null();
554        let right_null = other.expr().clone().is_null();
555        let either_null = left_null.clone().or(right_null.clone());
556
557        // Standard less-than-or-equal comparison
558        let le_result = self.expr().clone().lt_eq(other.expr().clone());
559
560        // Wrap: if either is null, return null boolean, else return comparison result
561        let null_boolean = Self::null_boolean_expr();
562        let null_aware_expr = crate::functions::when(&Self::from_expr(either_null, None))
563            .then(&Self::from_expr(null_boolean, None))
564            .otherwise(&Self::from_expr(le_result, None));
565
566        Self::from_expr(null_aware_expr.into_expr(), None)
567    }
568
569    // Standard comparison methods that work with Expr (for literals and columns)
570    // These delegate to Polars and may not match PySpark null semantics exactly.
571    // Use _pyspark variants for explicit PySpark semantics.
572
573    /// Greater than comparison
574    pub fn gt(&self, other: Expr) -> Column {
575        Self::from_expr(self.expr().clone().gt(other), None)
576    }
577
578    /// Greater than or equal comparison
579    pub fn gt_eq(&self, other: Expr) -> Column {
580        Self::from_expr(self.expr().clone().gt_eq(other), None)
581    }
582
583    /// Less than comparison
584    pub fn lt(&self, other: Expr) -> Column {
585        Self::from_expr(self.expr().clone().lt(other), None)
586    }
587
588    /// Less than or equal comparison
589    pub fn lt_eq(&self, other: Expr) -> Column {
590        Self::from_expr(self.expr().clone().lt_eq(other), None)
591    }
592
593    /// True if column value is between lower and upper (inclusive). PySpark between(low, high).
594    /// Applies string–numeric coercion so col("val").between(1, 10) works when val is string (#628).
595    pub fn between(&self, lower: &Column, upper: &Column) -> Column {
596        use crate::type_coercion::{CompareOp, coerce_for_pyspark_comparison};
597        use polars::prelude::*;
598
599        let left = self.expr().clone();
600        let lower_expr = lower.expr().clone();
601        let upper_expr = upper.expr().clone();
602
603        let infer_lit_type = |e: &Expr| -> Option<DataType> {
604            if let Expr::Literal(lv) = e {
605                let dt = lv.get_datatype();
606                if matches!(dt, DataType::Unknown(_)) {
607                    None
608                } else {
609                    Some(dt)
610                }
611            } else {
612                None
613            }
614        };
615
616        let lower_ty = infer_lit_type(&lower_expr).unwrap_or(DataType::String);
617        let upper_ty = infer_lit_type(&upper_expr).unwrap_or(DataType::String);
618        let lt = DataType::String;
619
620        let (left_c, lower_c) = match coerce_for_pyspark_comparison(
621            left.clone(),
622            lower_expr.clone(),
623            &lt,
624            &lower_ty,
625            &CompareOp::GtEq,
626        ) {
627            Ok((a, b)) => (a, b),
628            Err(_) => (left.clone(), lower_expr),
629        };
630
631        let upper_clone = upper.expr().clone();
632        let (left_cc, upper_c) = match coerce_for_pyspark_comparison(
633            left_c.clone(),
634            upper_expr,
635            &lt,
636            &upper_ty,
637            &CompareOp::LtEq,
638        ) {
639            Ok((a, b)) => (a, b),
640            Err(_) => (left_c.clone(), upper_clone),
641        };
642
643        let ge = left_cc.clone().gt_eq(lower_c);
644        let le = left_cc.lt_eq(upper_c);
645        Self::from_expr(ge.and(le), None)
646    }
647
648    /// Equality comparison
649    pub fn eq(&self, other: Expr) -> Column {
650        Self::from_expr(self.expr().clone().eq(other), None)
651    }
652
653    /// Inequality comparison
654    pub fn neq(&self, other: Expr) -> Column {
655        Self::from_expr(self.expr().clone().neq(other), None)
656    }
657
658    /// Logical AND of two boolean columns (PySpark and_).
659    pub fn and_(&self, other: &Column) -> Column {
660        Self::from_expr(self.expr().clone().and(other.expr().clone()), None)
661    }
662
663    /// Logical OR of two boolean columns (PySpark or_).
664    pub fn or_(&self, other: &Column) -> Column {
665        Self::from_expr(self.expr().clone().or(other.expr().clone()), None)
666    }
667
668    // Equality comparison with special handling for string-vs-numeric literals (issue #235).
669    //
670    // When comparing a column to a numeric literal (e.g. col("s") == lit(123)), Polars
671    // normally raises `cannot compare string with numeric type` if the column is a
672    // string column. PySpark, however, coerces types (string → numeric) and performs
673    // the comparison, treating invalid strings as null (non-matching in filters).
674
675    // --- String functions ---
676
677    /// Convert string column to uppercase (PySpark upper)
678    pub fn upper(&self) -> Column {
679        Self::from_expr(self.expr().clone().str().to_uppercase(), None)
680    }
681
682    /// Convert string column to lowercase (PySpark lower)
683    pub fn lower(&self) -> Column {
684        Self::from_expr(self.expr().clone().str().to_lowercase(), None)
685    }
686
687    /// Alias for lower. PySpark lcase.
688    pub fn lcase(&self) -> Column {
689        self.lower()
690    }
691
692    /// Alias for upper. PySpark ucase.
693    pub fn ucase(&self) -> Column {
694        self.upper()
695    }
696
697    /// Substring with 1-based start (PySpark substring/substr semantics).
698    /// - Start 0: 0-based (first char), for Sparkless/Python parity (#875).
699    /// - Positive start: 1-based index (1 = first char).
700    /// - Negative start: count from end (e.g. -3 = third char from end).
701    /// - Length less than 1: empty string; null input yields null (Phase 7 / PySpark parity).
702    pub fn substr(&self, start: i64, length: Option<i64>) -> Column {
703        use polars::prelude::*;
704        // PySpark: len < 1 -> empty string; null input must stay null (test_substr_pyspark_parity_comprehensive)
705        if length.map(|l| l < 1).unwrap_or(false) {
706            let expr = when(self.expr().clone().is_null())
707                .then(lit(NULL))
708                .otherwise(lit(""));
709            return Self::from_expr(expr, None);
710        }
711        let len_chars = self.expr().clone().str().len_chars();
712        // Start 0: 0-based (substr(0, 3) = "Hel"); start >= 1: 1-based; negative: from end.
713        let offset_expr = if start == 0 {
714            lit(0i64)
715        } else if start >= 1 {
716            lit((start - 1).max(0))
717        } else {
718            let from_end = len_chars + lit(start);
719            when(from_end.clone().lt(lit(0i64)))
720                .then(lit(0i64))
721                .otherwise(from_end)
722        };
723        let length_expr = length.map(lit).unwrap_or_else(|| lit(i64::MAX));
724        Self::from_expr(
725            self.expr().clone().str().slice(offset_expr, length_expr),
726            None,
727        )
728    }
729
730    /// String length in characters (PySpark length)
731    pub fn length(&self) -> Column {
732        Self::from_expr(self.expr().clone().str().len_chars(), None)
733    }
734
735    /// Bit length of string in bytes * 8 (PySpark bit_length).
736    pub fn bit_length(&self) -> Column {
737        use polars::prelude::*;
738        let len_bytes = self.expr().clone().str().len_bytes().cast(DataType::Int32);
739        Self::from_expr((len_bytes * lit(8i32)).cast(DataType::Int32), None)
740    }
741
742    /// Length of string in bytes (PySpark octet_length).
743    pub fn octet_length(&self) -> Column {
744        use polars::prelude::*;
745        Self::from_expr(
746            self.expr().clone().str().len_bytes().cast(DataType::Int32),
747            None,
748        )
749    }
750
751    /// Length of string in characters (PySpark char_length). Alias of length().
752    pub fn char_length(&self) -> Column {
753        self.length()
754    }
755
756    /// Length of string in characters (PySpark character_length). Alias of length().
757    pub fn character_length(&self) -> Column {
758        self.length()
759    }
760
761    /// Encode string to binary (PySpark encode). Charset: UTF-8. Returns hex string.
762    pub fn encode(&self, charset: &str) -> Column {
763        let charset = charset.to_string();
764        let expr = self.expr().clone().map(
765            move |s| expect_col(crate::udfs::apply_encode(s, &charset)),
766            |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
767        );
768        Self::from_expr(expr, None)
769    }
770
771    /// Decode binary (hex string) to string (PySpark decode). Charset: UTF-8.
772    pub fn decode(&self, charset: &str) -> Column {
773        let charset = charset.to_string();
774        let expr = self.expr().clone().map(
775            move |s| expect_col(crate::udfs::apply_decode(s, &charset)),
776            |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
777        );
778        Self::from_expr(expr, None)
779    }
780
781    /// Convert to binary (PySpark to_binary). fmt: 'utf-8', 'hex'. Returns hex string.
782    pub fn to_binary(&self, fmt: &str) -> Column {
783        let fmt = fmt.to_string();
784        let expr = self.expr().clone().map(
785            move |s| expect_col(crate::udfs::apply_to_binary(s, &fmt)),
786            |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
787        );
788        Self::from_expr(expr, None)
789    }
790
791    /// Try convert to binary; null on failure (PySpark try_to_binary).
792    pub fn try_to_binary(&self, fmt: &str) -> Column {
793        let fmt = fmt.to_string();
794        let expr = self.expr().clone().map(
795            move |s| expect_col(crate::udfs::apply_try_to_binary(s, &fmt)),
796            |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
797        );
798        Self::from_expr(expr, None)
799    }
800
801    /// AES encrypt (PySpark aes_encrypt). Key as string; AES-128-GCM. Output hex(nonce||ciphertext).
802    pub fn aes_encrypt(&self, key: &str) -> Column {
803        let key = key.to_string();
804        let expr = self.expr().clone().map(
805            move |s| expect_col(crate::udfs::apply_aes_encrypt(s, &key)),
806            |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
807        );
808        Self::from_expr(expr, None)
809    }
810
811    /// AES decrypt (PySpark aes_decrypt). Input hex(nonce||ciphertext). Null on failure.
812    pub fn aes_decrypt(&self, key: &str) -> Column {
813        let key = key.to_string();
814        let expr = self.expr().clone().map(
815            move |s| expect_col(crate::udfs::apply_aes_decrypt(s, &key)),
816            |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
817        );
818        Self::from_expr(expr, None)
819    }
820
821    /// Try AES decrypt (PySpark try_aes_decrypt). Returns null on failure.
822    pub fn try_aes_decrypt(&self, key: &str) -> Column {
823        let key = key.to_string();
824        let expr = self.expr().clone().map(
825            move |s| expect_col(crate::udfs::apply_try_aes_decrypt(s, &key)),
826            |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
827        );
828        Self::from_expr(expr, None)
829    }
830
831    /// Data type as string (PySpark typeof). Uses dtype from schema.
832    pub fn typeof_(&self) -> Column {
833        Self::from_expr(
834            self.expr().clone().map(
835                |s| expect_col(crate::udfs::apply_typeof(s)),
836                |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
837            ),
838            None,
839        )
840    }
841
842    /// Trim leading and trailing whitespace (PySpark trim). Default behavior trims ASCII space only
843    /// so tabs are preserved when using nested LTRIM/RTRIM (issue #434 / #1078).
844    pub fn trim(&self) -> Column {
845        use polars::prelude::*;
846        Self::from_expr(self.expr().clone().str().strip_chars(lit(" ")), None)
847    }
848
849    /// Trim leading whitespace (PySpark ltrim)
850    pub fn ltrim(&self) -> Column {
851        use polars::prelude::*;
852        Self::from_expr(self.expr().clone().str().strip_chars_start(lit(" ")), None)
853    }
854
855    /// Trim trailing whitespace (PySpark rtrim)
856    pub fn rtrim(&self) -> Column {
857        use polars::prelude::*;
858        Self::from_expr(self.expr().clone().str().strip_chars_end(lit(" ")), None)
859    }
860
861    /// Trim leading and trailing characters (PySpark btrim). trim_str defaults to whitespace.
862    pub fn btrim(&self, trim_str: Option<&str>) -> Column {
863        use polars::prelude::*;
864        let chars = trim_str.unwrap_or(" ");
865        Self::from_expr(self.expr().clone().str().strip_chars(lit(chars)), None)
866    }
867
868    /// Find substring position 1-based, starting at pos (PySpark locate). 0 if not found.
869    pub fn locate(&self, substr: &str, pos: i64) -> Column {
870        use polars::prelude::*;
871        if substr.is_empty() {
872            return Self::from_expr(lit(1i64), None);
873        }
874        let start = (pos - 1).max(0);
875        let slice_expr = self.expr().clone().str().slice(lit(start), lit(i64::MAX));
876        let found = slice_expr.str().find_literal(lit(substr.to_string()));
877        let expr = (found.cast(DataType::Int64) + lit(start + 1))
878            .fill_null(lit(0i64))
879            .cast(DataType::Int64);
880        Self::from_expr(expr, None)
881    }
882
883    /// Base conversion (PySpark conv). num_str from from_base to to_base.
884    pub fn conv(&self, from_base: i32, to_base: i32) -> Column {
885        let expr = self.expr().clone().map(
886            move |s| expect_col(crate::udfs::apply_conv(s, from_base, to_base)),
887            |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
888        );
889        Self::from_expr(expr, None)
890    }
891
892    /// Convert to hex string (PySpark hex). Int or string input.
893    pub fn hex(&self) -> Column {
894        let expr = self.expr().clone().map(
895            |s| expect_col(crate::udfs::apply_hex(s)),
896            |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
897        );
898        Self::from_expr(expr, None)
899    }
900
901    /// Convert hex string to binary/string (PySpark unhex).
902    pub fn unhex(&self) -> Column {
903        let expr = self.expr().clone().map(
904            |s| expect_col(crate::udfs::apply_unhex(s)),
905            |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
906        );
907        Self::from_expr(expr, None)
908    }
909
910    /// Convert integer to binary string (PySpark bin).
911    pub fn bin(&self) -> Column {
912        let expr = self.expr().clone().map(
913            |s| expect_col(crate::udfs::apply_bin(s)),
914            |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
915        );
916        Self::from_expr(expr, None)
917    }
918
919    /// Get bit at 0-based position (PySpark getbit).
920    pub fn getbit(&self, pos: i64) -> Column {
921        let expr = self.expr().clone().map(
922            move |s| expect_col(crate::udfs::apply_getbit(s, pos)),
923            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
924        );
925        Self::from_expr(expr, None)
926    }
927
928    /// Bitwise AND of two integer/boolean columns (PySpark bit_and).
929    pub fn bit_and(&self, other: &Column) -> Column {
930        let args = [other.expr().clone()];
931        let expr = self.expr().clone().cast(DataType::Int64).map_many(
932            |cols| expect_col(crate::udfs::apply_bit_and(cols)),
933            &args,
934            |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Int64)),
935        );
936        Self::from_expr(expr, None)
937    }
938
939    /// Bitwise OR of two integer/boolean columns (PySpark bit_or).
940    pub fn bit_or(&self, other: &Column) -> Column {
941        let args = [other.expr().clone()];
942        let expr = self.expr().clone().cast(DataType::Int64).map_many(
943            |cols| expect_col(crate::udfs::apply_bit_or(cols)),
944            &args,
945            |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Int64)),
946        );
947        Self::from_expr(expr, None)
948    }
949
950    /// Bitwise XOR of two integer/boolean columns (PySpark bit_xor).
951    pub fn bit_xor(&self, other: &Column) -> Column {
952        let args = [other.expr().clone()];
953        let expr = self.expr().clone().cast(DataType::Int64).map_many(
954            |cols| expect_col(crate::udfs::apply_bit_xor(cols)),
955            &args,
956            |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Int64)),
957        );
958        Self::from_expr(expr, None)
959    }
960
961    /// Count of set bits in the integer representation (PySpark bit_count).
962    pub fn bit_count(&self) -> Column {
963        let expr = self.expr().clone().map(
964            |s| expect_col(crate::udfs::apply_bit_count(s)),
965            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
966        );
967        Self::from_expr(expr, None)
968    }
969
970    /// Assert that all boolean values are true; errors otherwise (PySpark assert_true).
971    /// When err_msg is Some, it is used in the error message when assertion fails.
972    pub fn assert_true(&self, err_msg: Option<&str>) -> Column {
973        let msg = err_msg.map(String::from);
974        let expr = self.expr().clone().map(
975            move |c| expect_col(crate::udfs::apply_assert_true(c, msg.as_deref())),
976            |_schema, field| Ok(field.clone()),
977        );
978        Self::from_expr(expr, None)
979    }
980
981    /// Bitwise NOT of an integer/boolean column (PySpark bitwise_not / bitwiseNOT).
982    /// #859: Coerce via map so Unknown(Any) from when/otherwise is cast to Int64 at execution time.
983    pub fn bitwise_not(&self) -> Column {
984        use polars::prelude::Field;
985        let expr = self.expr().clone().map(
986            move |col| expect_col(crate::udfs::apply_coerce_to_int64_for_bitwise(col)),
987            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
988        );
989        let expr = (lit(-1i64) - expr).cast(DataType::Int64);
990        Self::from_expr(expr, None)
991    }
992
993    /// Logical NOT of a boolean column (PySpark `~` on Column = boolean NOT).
994    /// Fails at execution time if column is not Boolean (#405, #1236); use F.expr("~x") for bitwise NOT on integers.
995    pub fn logical_not(&self) -> Column {
996        let expr = self.expr().clone().map(
997            move |col| expect_col(crate::udfs::apply_logical_not_boolean_only(col)),
998            |_schema, field| {
999                if field.dtype() == &DataType::Boolean {
1000                    Ok(field.clone())
1001                } else {
1002                    Err(PolarsError::ComputeError(
1003                        "logical NOT (~) requires boolean type".into(),
1004                    ))
1005                }
1006            },
1007        );
1008        Self::from_expr(expr, None)
1009    }
1010
1011    /// Parse string to map (PySpark str_to_map). "k1:v1,k2:v2" -> map.
1012    pub fn str_to_map(&self, pair_delim: &str, key_value_delim: &str) -> Column {
1013        let pair_delim = pair_delim.to_string();
1014        let key_value_delim = key_value_delim.to_string();
1015        let expr = self.expr().clone().map(
1016            move |s| {
1017                expect_col(crate::udfs::apply_str_to_map(
1018                    s,
1019                    &pair_delim,
1020                    &key_value_delim,
1021                ))
1022            },
1023            |_schema, field| Ok(field.clone()),
1024        );
1025        Self::from_expr(expr, None)
1026    }
1027
1028    /// True if regex pattern contains lookahead/lookbehind (Polars regex does not support these).
1029    fn pattern_has_lookaround(pattern: &str) -> bool {
1030        let p = pattern.as_bytes();
1031        let n = p.len();
1032        let mut i = 0;
1033        while i + 2 < n {
1034            if p[i] == b'(' && p[i + 1] == b'?' {
1035                match p[i + 2] {
1036                    b'=' | b'!' => return true, // (?= (?! lookahead
1037                    b'<' if i + 4 <= n && (p[i + 3] == b'=' || p[i + 3] == b'!') => return true, // (?<= (?<! lookbehind
1038                    _ => {}
1039                }
1040            }
1041            i += 1;
1042        }
1043        false
1044    }
1045
1046    /// Extract first match of regex pattern (PySpark regexp_extract). Group 0 = full match.
1047    /// When pattern contains lookahead/lookbehind, uses fancy-regex (Polars regex does not support them).
1048    pub fn regexp_extract(&self, pattern: &str, group_index: usize) -> Column {
1049        use polars::prelude::*;
1050        if Self::pattern_has_lookaround(pattern) {
1051            let pat = pattern.to_string();
1052            let group = group_index;
1053            Self::from_expr(
1054                self.expr().clone().map(
1055                    move |s| {
1056                        expect_col(crate::udfs::apply_regexp_extract_lookaround(s, &pat, group))
1057                    },
1058                    |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
1059                ),
1060                None,
1061            )
1062        } else {
1063            let pat = pattern.to_string();
1064            Self::from_expr(
1065                // PySpark implicitly casts non-string inputs to string for regex functions,
1066                // returns empty string for no-match, and null only for null input.
1067                self.expr().clone().map(
1068                    move |s| expect_col(crate::udfs::apply_regexp_extract(s, &pat, group_index)),
1069                    |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
1070                ),
1071                None,
1072            )
1073        }
1074    }
1075
1076    /// Replace all matches of regex pattern (PySpark regexp_replace: global replace).
1077    pub fn regexp_replace(&self, pattern: &str, replacement: &str) -> Column {
1078        use polars::prelude::*;
1079        let pat = pattern.to_string();
1080        let rep = replacement.to_string();
1081        Self::from_expr(
1082            // PySpark implicitly casts non-string inputs to string for regex functions.
1083            self.expr()
1084                .clone()
1085                .cast(DataType::String)
1086                .str()
1087                .replace_all(lit(pat), lit(rep), false),
1088            None,
1089        )
1090    }
1091
1092    /// Leftmost n characters (PySpark left).
1093    pub fn left(&self, n: i64) -> Column {
1094        use polars::prelude::*;
1095        let len = n.max(0) as u32;
1096        Self::from_expr(
1097            self.expr().clone().str().slice(lit(0i64), lit(len as i64)),
1098            None,
1099        )
1100    }
1101
1102    /// Rightmost n characters (PySpark right).
1103    pub fn right(&self, n: i64) -> Column {
1104        use polars::prelude::*;
1105        let n_val = n.max(0);
1106        let n_expr = lit(n_val);
1107        let len_chars = self.expr().clone().str().len_chars().cast(DataType::Int64);
1108        let start = when((len_chars.clone() - n_expr.clone()).lt_eq(lit(0i64)))
1109            .then(lit(0i64))
1110            .otherwise(len_chars - n_expr.clone());
1111        Self::from_expr(self.expr().clone().str().slice(start, n_expr), None)
1112    }
1113
1114    /// Replace all occurrences of literal search string with replacement (PySpark replace for literal).
1115    pub fn replace(&self, search: &str, replacement: &str) -> Column {
1116        use polars::prelude::*;
1117        Self::from_expr(
1118            self.expr().clone().str().replace_all(
1119                lit(search.to_string()),
1120                lit(replacement.to_string()),
1121                true,
1122            ),
1123            None,
1124        )
1125    }
1126
1127    /// Replace multiple (search, replacement) pairs in order (PySpark replace with dict/list).
1128    pub fn replace_many(&self, pairs: &[(String, String)]) -> Column {
1129        let mut out = self.clone();
1130        for (search, replacement) in pairs {
1131            out = out.replace(search, replacement);
1132        }
1133        out
1134    }
1135
1136    /// True if string starts with prefix (PySpark startswith).
1137    pub fn startswith(&self, prefix: &str) -> Column {
1138        use polars::prelude::*;
1139        Self::from_expr(
1140            self.expr()
1141                .clone()
1142                .str()
1143                .starts_with(lit(prefix.to_string())),
1144            None,
1145        )
1146    }
1147
1148    /// True if string ends with suffix (PySpark endswith).
1149    pub fn endswith(&self, suffix: &str) -> Column {
1150        use polars::prelude::*;
1151        Self::from_expr(
1152            self.expr().clone().str().ends_with(lit(suffix.to_string())),
1153            None,
1154        )
1155    }
1156
1157    /// True if string contains substring (literal, not regex). PySpark contains.
1158    pub fn contains(&self, substring: &str) -> Column {
1159        use polars::prelude::*;
1160        Self::from_expr(
1161            self.expr()
1162                .clone()
1163                .str()
1164                .contains(lit(substring.to_string()), true),
1165            None,
1166        )
1167    }
1168
1169    /// Split string by delimiter (PySpark split). Returns list of strings.
1170    /// When limit is Some(n) with n > 0, at most n parts; remainder in last part. None or <= 0: no limit.
1171    /// Uses literal split so "|" is not interpreted as regex alternation.
1172    pub fn split(&self, delimiter: &str, limit: Option<i32>) -> Column {
1173        use polars::prelude::*;
1174        let use_limit = limit.is_some_and(|l| l > 0);
1175        if use_limit {
1176            let delim = delimiter.to_string();
1177            let lim = limit.unwrap_or(0);
1178            let expr = self.expr().clone().map(
1179                move |col| expect_col(crate::udfs::apply_split_with_limit(col, &delim, lim)),
1180                |_schema, field| {
1181                    Ok(Field::new(
1182                        field.name().clone(),
1183                        DataType::List(Box::new(DataType::String)),
1184                    ))
1185                },
1186            );
1187            Self::from_expr(expr, None)
1188        } else {
1189            Self::from_expr(
1190                self.expr().clone().str().split(lit(delimiter.to_string())),
1191                None,
1192            )
1193        }
1194    }
1195
1196    /// Title case: first letter of each word uppercase (PySpark initcap). Uses UDF; Polars has no to_titlecase in 0.53.
1197    pub fn initcap(&self) -> Column {
1198        let expr = self.expr().clone().map(
1199            |s| expect_col(crate::udfs::apply_initcap(s)),
1200            |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
1201        );
1202        Self::from_expr(expr, None)
1203    }
1204
1205    /// Extract all matches of regex (PySpark regexp_extract_all). Returns list of strings.
1206    pub fn regexp_extract_all(&self, pattern: &str) -> Column {
1207        use polars::prelude::*;
1208        Self::from_expr(
1209            self.expr()
1210                .clone()
1211                .str()
1212                .extract_all(lit(pattern.to_string())),
1213            None,
1214        )
1215    }
1216
1217    /// Extract all matches of regex with capture group index (PySpark regexp_extract_all(col, pattern, idx)).
1218    /// idx=0 returns whole match; idx>0 returns capture group.
1219    pub fn regexp_extract_all_group(&self, pattern: &str, group_index: usize) -> Column {
1220        if group_index == 0 {
1221            return self.regexp_extract_all(pattern);
1222        }
1223        use polars::prelude::*;
1224        let pat = pattern.to_string();
1225        let idx = group_index;
1226        let expr = self.expr().clone().map(
1227            move |s| expect_col(crate::udfs::apply_regexp_extract_all_group(s, &pat, idx)),
1228            |_schema, field| {
1229                Ok(Field::new(
1230                    field.name().clone(),
1231                    DataType::List(Box::new(DataType::String)),
1232                ))
1233            },
1234        );
1235        Self::from_expr(expr, None)
1236    }
1237
1238    /// Check if string matches regex (PySpark regexp_like / rlike).
1239    pub fn regexp_like(&self, pattern: &str) -> Column {
1240        use polars::prelude::*;
1241        // Polars regex engine (regex crate) does not support lookaround; use fancy-regex fallback.
1242        if pattern.contains("(?=")
1243            || pattern.contains("(?!")
1244            || pattern.contains("(?<=")
1245            || pattern.contains("(?<!")
1246        {
1247            let pat = pattern.to_string();
1248            let expr = self.expr().clone().map(
1249                move |s| expect_col(crate::udfs::apply_regexp_like_lookaround(s, &pat)),
1250                |_schema, field| Ok(Field::new(field.name().clone(), DataType::Boolean)),
1251            );
1252            return Self::from_expr(expr, None);
1253        }
1254        Self::from_expr(
1255            self.expr()
1256                .clone()
1257                .str()
1258                .contains(lit(pattern.to_string()), false),
1259            None,
1260        )
1261    }
1262
1263    /// Count of non-overlapping regex matches (PySpark regexp_count).
1264    pub fn regexp_count(&self, pattern: &str) -> Column {
1265        use polars::prelude::*;
1266        Self::from_expr(
1267            self.expr()
1268                .clone()
1269                .str()
1270                .count_matches(lit(pattern.to_string()), false)
1271                .cast(DataType::Int64),
1272            None,
1273        )
1274    }
1275
1276    /// First substring matching regex (PySpark regexp_substr). Null if no match.
1277    pub fn regexp_substr(&self, pattern: &str) -> Column {
1278        self.regexp_extract(pattern, 0)
1279    }
1280
1281    /// 1-based position of first regex match (PySpark regexp_instr). group_idx 0 = full match; null if no match.
1282    pub fn regexp_instr(&self, pattern: &str, group_idx: Option<usize>) -> Column {
1283        let idx = group_idx.unwrap_or(0);
1284        let pattern = pattern.to_string();
1285        let expr = self.expr().clone().map(
1286            move |s| expect_col(crate::udfs::apply_regexp_instr(s, pattern.clone(), idx)),
1287            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
1288        );
1289        Self::from_expr(expr, None)
1290    }
1291
1292    /// 1-based index of self in comma-delimited set column (PySpark find_in_set). 0 if not found or self contains comma.
1293    pub fn find_in_set(&self, set_column: &Column) -> Column {
1294        let args = [set_column.expr().clone()];
1295        let expr = self.expr().clone().map_many(
1296            |cols| expect_col(crate::udfs::apply_find_in_set(cols)),
1297            &args,
1298            |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Int64)),
1299        );
1300        Self::from_expr(expr, None)
1301    }
1302
1303    /// Repeat string column n times (PySpark repeat). Each element repeated n times.
1304    pub fn repeat(&self, n: i32) -> Column {
1305        use polars::prelude::*;
1306        // repeat_by yields List[str]; join to get a single string per row.
1307        Self::from_expr(
1308            self.expr()
1309                .clone()
1310                .repeat_by(lit(n as u32))
1311                .list()
1312                .join(lit(""), false),
1313            None,
1314        )
1315    }
1316
1317    /// Reverse string (PySpark reverse).
1318    pub fn reverse(&self) -> Column {
1319        Self::from_expr(self.expr().clone().str().reverse(), None)
1320    }
1321
1322    /// Find substring position (1-based; 0 if not found). PySpark instr(col, substr).
1323    pub fn instr(&self, substr: &str) -> Column {
1324        use polars::prelude::*;
1325        let found = self
1326            .expr()
1327            .clone()
1328            .str()
1329            .find_literal(lit(substr.to_string()));
1330        // Polars find_literal returns 0-based index (null if not found); PySpark is 1-based, 0 when not found.
1331        Self::from_expr(
1332            (found.cast(DataType::Int64) + lit(1i64)).fill_null(lit(0i64)),
1333            None,
1334        )
1335    }
1336
1337    /// Left-pad string to length with pad character (PySpark lpad).
1338    pub fn lpad(&self, length: i32, pad: &str) -> Column {
1339        let pad_str = if pad.is_empty() { " " } else { pad };
1340        let fill = pad_str.chars().next().unwrap_or(' ');
1341        Self::from_expr(
1342            self.expr()
1343                .clone()
1344                .str()
1345                .pad_start(lit(length as i64), fill),
1346            None,
1347        )
1348    }
1349
1350    /// Right-pad string to length with pad character (PySpark rpad).
1351    pub fn rpad(&self, length: i32, pad: &str) -> Column {
1352        let pad_str = if pad.is_empty() { " " } else { pad };
1353        let fill = pad_str.chars().next().unwrap_or(' ');
1354        Self::from_expr(
1355            self.expr().clone().str().pad_end(lit(length as i64), fill),
1356            None,
1357        )
1358    }
1359
1360    /// Character-by-character translation (PySpark translate). Replaces each char in from_str with corresponding in to_str; if to_str is shorter, extra from chars are removed.
1361    pub fn translate(&self, from_str: &str, to_str: &str) -> Column {
1362        use polars::prelude::*;
1363        let mut e = self.expr().clone();
1364        let from_chars: Vec<char> = from_str.chars().collect();
1365        let to_chars: Vec<char> = to_str.chars().collect();
1366        for (i, fc) in from_chars.iter().enumerate() {
1367            let f = fc.to_string();
1368            let t = to_chars
1369                .get(i)
1370                .map(|c| c.to_string())
1371                .unwrap_or_else(String::new); // PySpark: no replacement = drop char
1372            e = e.str().replace_all(lit(f), lit(t), true);
1373        }
1374        Self::from_expr(e, None)
1375    }
1376
1377    /// Mask string: replace uppercase with upper_char, lowercase with lower_char, digits with digit_char (PySpark mask).
1378    /// Defaults: upper 'X', lower 'x', digit 'n'; other chars unchanged.
1379    pub fn mask(
1380        &self,
1381        upper_char: Option<char>,
1382        lower_char: Option<char>,
1383        digit_char: Option<char>,
1384        other_char: Option<char>,
1385    ) -> Column {
1386        use polars::prelude::*;
1387        let upper = upper_char.unwrap_or('X').to_string();
1388        let lower = lower_char.unwrap_or('x').to_string();
1389        let digit = digit_char.unwrap_or('n').to_string();
1390        let other = other_char.map(|c| c.to_string());
1391        let mut e = self
1392            .expr()
1393            .clone()
1394            .str()
1395            .replace_all(lit("[A-Z]".to_string()), lit(upper), false)
1396            .str()
1397            .replace_all(lit("[a-z]".to_string()), lit(lower), false)
1398            .str()
1399            .replace_all(lit(r"\d".to_string()), lit(digit), false);
1400        if let Some(o) = other {
1401            e = e
1402                .str()
1403                .replace_all(lit("[^A-Za-z0-9]".to_string()), lit(o), false);
1404        }
1405        Self::from_expr(e, None)
1406    }
1407
1408    /// Split by delimiter and return 1-based part (PySpark split_part).
1409    /// part_num > 0: from left; part_num < 0: from right; part_num = 0: null; out-of-range: empty string.
1410    pub fn split_part(&self, delimiter: &str, part_num: i64) -> Column {
1411        use polars::prelude::*;
1412        if part_num == 0 {
1413            return Self::from_expr(lit(NULL), None);
1414        }
1415        let use_regex = delimiter == "|";
1416        if use_regex {
1417            let pattern = delimiter.to_string();
1418            let part = part_num;
1419            let get_expr = self.expr().clone().map(
1420                move |col| expect_col(crate::udfs::apply_split_part_regex(col, &pattern, part)),
1421                |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
1422            );
1423            let expr = when(self.expr().clone().is_null())
1424                .then(lit(NULL))
1425                .otherwise(get_expr.fill_null(lit("")));
1426            return Self::from_expr(expr, None);
1427        }
1428        let delim = delimiter.to_string();
1429        let split_expr = self.expr().clone().str().split(lit(delim));
1430        let index = if part_num > 0 {
1431            lit(part_num - 1)
1432        } else {
1433            lit(part_num)
1434        };
1435        let get_expr = split_expr.list().get(index, true).fill_null(lit(""));
1436        let expr = when(self.expr().clone().is_null())
1437            .then(lit(NULL))
1438            .otherwise(get_expr);
1439        Self::from_expr(expr, None)
1440    }
1441
1442    /// Substring before/after nth delimiter (PySpark substring_index). count > 0: before nth from left; count < 0: after nth from right.
1443    pub fn substring_index(&self, delimiter: &str, count: i64) -> Column {
1444        use polars::prelude::*;
1445        // PySpark edge case: empty delimiter always yields empty string for non-null input,
1446        // and null when the input is null.
1447        if delimiter.is_empty() {
1448            let expr = when(self.expr().clone().is_null())
1449                .then(lit(NULL))
1450                .otherwise(lit("").cast(DataType::String));
1451            return Self::from_expr(expr, None);
1452        }
1453        let delim = delimiter.to_string();
1454        let split_expr = self.expr().clone().str().split(lit(delim.clone()));
1455        let n = count.unsigned_abs() as i64;
1456        let expr = if count > 0 {
1457            split_expr
1458                .clone()
1459                .list()
1460                .slice(lit(0i64), lit(n))
1461                .list()
1462                .join(lit(delim), false)
1463        } else {
1464            let len = split_expr.clone().list().len();
1465            let start = when(len.clone().gt(lit(n)))
1466                .then(len.clone() - lit(n))
1467                .otherwise(lit(0i64));
1468            let slice_len = when(len.clone().gt(lit(n))).then(lit(n)).otherwise(len);
1469            split_expr
1470                .list()
1471                .slice(start, slice_len)
1472                .list()
1473                .join(lit(delim), false)
1474        };
1475        Self::from_expr(expr, None)
1476    }
1477
1478    /// Soundex code (PySpark soundex). Implemented via map UDF (strsim/soundex crates).
1479    pub fn soundex(&self) -> Column {
1480        let expr = self.expr().clone().map(
1481            |s| expect_col(crate::udfs::apply_soundex(s)),
1482            |_schema, field| Ok(field.clone()),
1483        );
1484        Self::from_expr(expr, None)
1485    }
1486
1487    /// Levenshtein distance to another string (PySpark levenshtein). Implemented via map_many UDF (strsim).
1488    pub fn levenshtein(&self, other: &Column) -> Column {
1489        let args = [other.expr().clone()];
1490        let expr = self.expr().clone().map_many(
1491            |cols| expect_col(crate::udfs::apply_levenshtein(cols)),
1492            &args,
1493            |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Int64)),
1494        );
1495        Self::from_expr(expr, None)
1496    }
1497
1498    /// CRC32 checksum of string bytes (PySpark crc32). Implemented via map UDF (crc32fast).
1499    pub fn crc32(&self) -> Column {
1500        let expr = self.expr().clone().map(
1501            |s| expect_col(crate::udfs::apply_crc32(s)),
1502            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
1503        );
1504        Self::from_expr(expr, None)
1505    }
1506
1507    /// XXH64 hash of string (PySpark xxhash64). Implemented via map UDF (twox-hash).
1508    pub fn xxhash64(&self) -> Column {
1509        let expr = self.expr().clone().map(
1510            |s| expect_col(crate::udfs::apply_xxhash64(s)),
1511            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
1512        );
1513        Self::from_expr(expr, None)
1514    }
1515
1516    /// ASCII value of first character (PySpark ascii). Returns Int32.
1517    pub fn ascii(&self) -> Column {
1518        let expr = self.expr().clone().map(
1519            |s| expect_col(crate::udfs::apply_ascii(s)),
1520            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int32)),
1521        );
1522        Self::from_expr(expr, None)
1523    }
1524
1525    /// Format numeric as string with fixed decimal places (PySpark format_number).
1526    pub fn format_number(&self, decimals: u32) -> Column {
1527        let expr = self.expr().clone().map(
1528            move |s| expect_col(crate::udfs::apply_format_number(s, decimals)),
1529            |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
1530        );
1531        Self::from_expr(expr, None)
1532    }
1533
1534    /// Int to single-character string (PySpark char / chr). Valid codepoint only.
1535    pub fn char(&self) -> Column {
1536        let expr = self.expr().clone().map(
1537            |s| expect_col(crate::udfs::apply_char(s)),
1538            |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
1539        );
1540        Self::from_expr(expr, None)
1541    }
1542
1543    /// Alias for char (PySpark chr).
1544    pub fn chr(&self) -> Column {
1545        self.char()
1546    }
1547
1548    /// Base64 encode string bytes (PySpark base64).
1549    pub fn base64(&self) -> Column {
1550        let expr = self.expr().clone().map(
1551            |s| expect_col(crate::udfs::apply_base64(s)),
1552            |_schema, field| Ok(field.clone()),
1553        );
1554        Self::from_expr(expr, None)
1555    }
1556
1557    /// Base64 decode to string (PySpark unbase64). Invalid decode → null.
1558    pub fn unbase64(&self) -> Column {
1559        let expr = self.expr().clone().map(
1560            |s| expect_col(crate::udfs::apply_unbase64(s)),
1561            |_schema, field| Ok(field.clone()),
1562        );
1563        Self::from_expr(expr, None)
1564    }
1565
1566    /// SHA1 hash of string bytes, return hex string (PySpark sha1).
1567    pub fn sha1(&self) -> Column {
1568        let expr = self.expr().clone().map(
1569            |s| expect_col(crate::udfs::apply_sha1(s)),
1570            |_schema, field| Ok(field.clone()),
1571        );
1572        Self::from_expr(expr, None)
1573    }
1574
1575    /// SHA2 hash; bit_length 256, 384, or 512 (PySpark sha2). Default 256.
1576    pub fn sha2(&self, bit_length: i32) -> Column {
1577        let expr = self.expr().clone().map(
1578            move |s| expect_col(crate::udfs::apply_sha2(s, bit_length)),
1579            |_schema, field| Ok(field.clone()),
1580        );
1581        Self::from_expr(expr, None)
1582    }
1583
1584    /// MD5 hash of string bytes, return hex string (PySpark md5).
1585    pub fn md5(&self) -> Column {
1586        let expr = self.expr().clone().map(
1587            |s| expect_col(crate::udfs::apply_md5(s)),
1588            |_schema, field| Ok(field.clone()),
1589        );
1590        Self::from_expr(expr, None)
1591    }
1592
1593    /// Replace substring at 1-based position (PySpark overlay). replace is literal string.
1594    pub fn overlay(&self, replace: &str, pos: i64, length: i64) -> Column {
1595        use polars::prelude::*;
1596        let pos = pos.max(1);
1597        let replace_len = length.max(0);
1598        let start_left = 0i64;
1599        let len_left = (pos - 1).max(0);
1600        let start_right = (pos - 1 + replace_len).max(0);
1601        let len_right = 1_000_000i64; // "rest of string"
1602        let left = self
1603            .expr()
1604            .clone()
1605            .str()
1606            .slice(lit(start_left), lit(len_left));
1607        let mid = lit(replace.to_string());
1608        let right = self
1609            .expr()
1610            .clone()
1611            .str()
1612            .slice(lit(start_right), lit(len_right));
1613        let exprs = [left, mid, right];
1614        let concat_expr = polars::prelude::concat_str(&exprs, "", false);
1615        Self::from_expr(concat_expr, None)
1616    }
1617
1618    // --- Math functions ---
1619
1620    /// Absolute value (PySpark abs)
1621    pub fn abs(&self) -> Column {
1622        Self::from_expr(self.expr().clone().abs(), None)
1623    }
1624
1625    /// Ceiling (PySpark ceil)
1626    pub fn ceil(&self) -> Column {
1627        use polars::prelude::*;
1628        // PySpark ceil returns an integral (bigint) type. Cast the ceil result
1629        // to Int64 so the schema reflects LongType/bigint instead of double.
1630        let expr = self.expr().clone().ceil().cast(DataType::Int64);
1631        Self::from_expr(expr, None)
1632    }
1633
1634    /// Alias for ceil. PySpark ceiling.
1635    pub fn ceiling(&self) -> Column {
1636        self.ceil()
1637    }
1638
1639    /// Floor (PySpark floor)
1640    pub fn floor(&self) -> Column {
1641        use polars::prelude::*;
1642        // PySpark floor returns an integral (bigint) type. Cast the floor result
1643        // to Int64 so the schema reflects LongType/bigint instead of double.
1644        let expr = self.expr().clone().floor().cast(DataType::Int64);
1645        Self::from_expr(expr, None)
1646    }
1647
1648    /// Round to given decimal places (PySpark round). scale can be negative (e.g. -3 rounds to thousands).
1649    /// Supports string columns containing numeric values (implicit cast to double then round; parity with PySpark).
1650    pub fn round(&self, scale: i32) -> Column {
1651        let expr = self.expr().clone().map(
1652            move |s| expect_col(crate::udfs::apply_round(s, scale)),
1653            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1654        );
1655        Self::from_expr(expr, None)
1656    }
1657
1658    /// Banker's rounding - round half to even (PySpark bround).
1659    pub fn bround(&self, scale: i32) -> Column {
1660        let expr = self.expr().clone().map(
1661            move |s| expect_col(crate::udfs::apply_bround(s, scale)),
1662            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1663        );
1664        Self::from_expr(expr, None)
1665    }
1666
1667    /// Unary minus (PySpark negate, negative).
1668    pub fn negate(&self) -> Column {
1669        use polars::prelude::*;
1670        Self::from_expr(self.expr().clone() * lit(-1), None)
1671    }
1672
1673    /// Multiply with PySpark-style string/number coercion (used by Python Column operators).
1674    ///
1675    /// Both operands are coerced to Double when used from Python; string columns are parsed
1676    /// as doubles where possible, invalid strings become null.
1677    pub fn multiply_pyspark(&self, other: &Column) -> Column {
1678        let args = [other.expr().clone()];
1679        let expr = self.expr().clone().map_many(
1680            |cols| expect_col(crate::udfs::apply_pyspark_multiply(cols)),
1681            &args,
1682            |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Float64)),
1683        );
1684        Self::from_expr(expr, None)
1685    }
1686
1687    /// Add with PySpark-style string/number coercion (used by Python Column operators).
1688    /// For string columns, + attempts numeric addition so non-numeric strings become null (issue #1138).
1689    pub fn add_pyspark(&self, other: &Column) -> Column {
1690        let args = [other.expr().clone()];
1691        let expr = self.expr().clone().map_many(
1692            |cols| expect_col(crate::udfs::apply_pyspark_add(cols)),
1693            &args,
1694            |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Float64)),
1695        );
1696        Self::from_expr(expr, None)
1697    }
1698
1699    /// Subtract with PySpark-style string/number coercion (used by Python Column operators).
1700    pub fn subtract_pyspark(&self, other: &Column) -> Column {
1701        let args = [other.expr().clone()];
1702        let expr = self.expr().clone().map_many(
1703            |cols| expect_col(crate::udfs::apply_pyspark_subtract(cols)),
1704            &args,
1705            |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Float64)),
1706        );
1707        Self::from_expr(expr, None)
1708    }
1709
1710    /// Divide with PySpark-style string/number coercion (used by Python Column operators).
1711    pub fn divide_pyspark(&self, other: &Column) -> Column {
1712        let args = [other.expr().clone()];
1713        let expr = self.expr().clone().map_many(
1714            |cols| expect_col(crate::udfs::apply_pyspark_divide(cols)),
1715            &args,
1716            |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Float64)),
1717        );
1718        Self::from_expr(expr, None)
1719    }
1720
1721    /// Modulo with PySpark-style string/number coercion (used by Python Column operators).
1722    pub fn mod_pyspark(&self, other: &Column) -> Column {
1723        let args = [other.expr().clone()];
1724        let expr = self.expr().clone().map_many(
1725            |cols| expect_col(crate::udfs::apply_pyspark_mod(cols)),
1726            &args,
1727            |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Float64)),
1728        );
1729        Self::from_expr(expr, None)
1730    }
1731
1732    /// Multiply by another column or literal (PySpark multiply). Broadcasts scalars.
1733    pub fn multiply(&self, other: &Column) -> Column {
1734        Self::from_expr(self.expr().clone() * other.expr().clone(), None)
1735    }
1736
1737    /// Add another column or literal (PySpark +). Broadcasts scalars.
1738    pub fn add(&self, other: &Column) -> Column {
1739        Self::from_expr(self.expr().clone() + other.expr().clone(), None)
1740    }
1741
1742    /// Subtract another column or literal (PySpark -). Broadcasts scalars.
1743    pub fn subtract(&self, other: &Column) -> Column {
1744        Self::from_expr(self.expr().clone() - other.expr().clone(), None)
1745    }
1746
1747    /// Divide by another column or literal (PySpark /). Broadcasts scalars.
1748    pub fn divide(&self, other: &Column) -> Column {
1749        Self::from_expr(self.expr().clone() / other.expr().clone(), None)
1750    }
1751
1752    /// Modulo (PySpark %). Broadcasts scalars.
1753    pub fn mod_(&self, other: &Column) -> Column {
1754        Self::from_expr(self.expr().clone() % other.expr().clone(), None)
1755    }
1756
1757    /// Square root (PySpark sqrt)
1758    pub fn sqrt(&self) -> Column {
1759        Self::from_expr(self.expr().clone().sqrt(), None)
1760    }
1761
1762    /// Power (PySpark pow). Exponent can be literal or expression.
1763    pub fn pow(&self, exp: i64) -> Column {
1764        use polars::prelude::*;
1765        Self::from_expr(self.expr().clone().pow(lit(exp)), None)
1766    }
1767
1768    /// Power with column or scalar exponent (for __pow__ / col ** other).
1769    /// Uses float pow so fractional exponents work (#817); handles 0^positive=0 (#863).
1770    pub fn pow_with(&self, exponent: &Column) -> Column {
1771        let args = [exponent.expr().clone()];
1772        let expr = self.expr().clone().map_many(
1773            move |cols| expect_col(crate::udfs::apply_pow_pyspark(cols)),
1774            &args,
1775            |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Float64)),
1776        );
1777        Self::from_expr(expr, None)
1778    }
1779
1780    /// Alias for pow. PySpark power.
1781    pub fn power(&self, exp: i64) -> Column {
1782        self.pow(exp)
1783    }
1784
1785    /// Exponential (PySpark exp)
1786    pub fn exp(&self) -> Column {
1787        Self::from_expr(self.expr().clone().exp(), None)
1788    }
1789
1790    /// Natural logarithm (PySpark log)
1791    pub fn log(&self) -> Column {
1792        Self::from_expr(self.expr().clone().log(lit(std::f64::consts::E)), None)
1793    }
1794
1795    /// Alias for log. PySpark ln.
1796    pub fn ln(&self) -> Column {
1797        self.log()
1798    }
1799
1800    /// Sine (radians). PySpark sin.
1801    pub fn sin(&self) -> Column {
1802        let expr = self.expr().clone().map(
1803            |s| expect_col(crate::udfs::apply_sin(s)),
1804            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1805        );
1806        Self::from_expr(expr, None)
1807    }
1808
1809    /// Cosine (radians). PySpark cos.
1810    pub fn cos(&self) -> Column {
1811        let expr = self.expr().clone().map(
1812            |s| expect_col(crate::udfs::apply_cos(s)),
1813            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1814        );
1815        Self::from_expr(expr, None)
1816    }
1817
1818    /// Tangent (radians). PySpark tan.
1819    pub fn tan(&self) -> Column {
1820        let expr = self.expr().clone().map(
1821            |s| expect_col(crate::udfs::apply_tan(s)),
1822            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1823        );
1824        Self::from_expr(expr, None)
1825    }
1826
1827    /// Cotangent: 1/tan (PySpark cot).
1828    pub fn cot(&self) -> Column {
1829        let expr = self.expr().clone().map(
1830            |s| expect_col(crate::udfs::apply_cot(s)),
1831            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1832        );
1833        Self::from_expr(expr, None)
1834    }
1835
1836    /// Cosecant: 1/sin (PySpark csc).
1837    pub fn csc(&self) -> Column {
1838        let expr = self.expr().clone().map(
1839            |s| expect_col(crate::udfs::apply_csc(s)),
1840            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1841        );
1842        Self::from_expr(expr, None)
1843    }
1844
1845    /// Secant: 1/cos (PySpark sec).
1846    pub fn sec(&self) -> Column {
1847        let expr = self.expr().clone().map(
1848            |s| expect_col(crate::udfs::apply_sec(s)),
1849            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1850        );
1851        Self::from_expr(expr, None)
1852    }
1853
1854    /// Arc sine. PySpark asin.
1855    pub fn asin(&self) -> Column {
1856        let expr = self.expr().clone().map(
1857            |s| expect_col(crate::udfs::apply_asin(s)),
1858            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1859        );
1860        Self::from_expr(expr, None)
1861    }
1862
1863    /// Arc cosine. PySpark acos.
1864    pub fn acos(&self) -> Column {
1865        let expr = self.expr().clone().map(
1866            |s| expect_col(crate::udfs::apply_acos(s)),
1867            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1868        );
1869        Self::from_expr(expr, None)
1870    }
1871
1872    /// Arc tangent. PySpark atan.
1873    pub fn atan(&self) -> Column {
1874        let expr = self.expr().clone().map(
1875            |s| expect_col(crate::udfs::apply_atan(s)),
1876            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1877        );
1878        Self::from_expr(expr, None)
1879    }
1880
1881    /// Two-argument arc tangent (y, x) -> angle in radians. PySpark atan2.
1882    pub fn atan2(&self, x: &Column) -> Column {
1883        let args = [x.expr().clone()];
1884        let expr = self.expr().clone().map_many(
1885            |cols| expect_col(crate::udfs::apply_atan2(cols)),
1886            &args,
1887            |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Float64)),
1888        );
1889        Self::from_expr(expr, None)
1890    }
1891
1892    /// Convert radians to degrees. PySpark degrees.
1893    pub fn degrees(&self) -> Column {
1894        let expr = self.expr().clone().map(
1895            |s| expect_col(crate::udfs::apply_degrees(s)),
1896            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1897        );
1898        Self::from_expr(expr, None)
1899    }
1900
1901    /// Alias for degrees. PySpark toDegrees.
1902    pub fn to_degrees(&self) -> Column {
1903        self.degrees()
1904    }
1905
1906    /// Convert degrees to radians. PySpark radians.
1907    pub fn radians(&self) -> Column {
1908        let expr = self.expr().clone().map(
1909            |s| expect_col(crate::udfs::apply_radians(s)),
1910            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1911        );
1912        Self::from_expr(expr, None)
1913    }
1914
1915    /// Alias for radians. PySpark toRadians.
1916    pub fn to_radians(&self) -> Column {
1917        self.radians()
1918    }
1919
1920    /// Sign of the number (-1, 0, or 1). PySpark signum.
1921    pub fn signum(&self) -> Column {
1922        let expr = self.expr().clone().map(
1923            |s| expect_col(crate::udfs::apply_signum(s)),
1924            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1925        );
1926        Self::from_expr(expr, None)
1927    }
1928
1929    /// Hyperbolic cosine. PySpark cosh.
1930    pub fn cosh(&self) -> Column {
1931        let expr = self.expr().clone().map(
1932            |s| expect_col(crate::udfs::apply_cosh(s)),
1933            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1934        );
1935        Self::from_expr(expr, None)
1936    }
1937    /// Hyperbolic sine. PySpark sinh.
1938    pub fn sinh(&self) -> Column {
1939        let expr = self.expr().clone().map(
1940            |s| expect_col(crate::udfs::apply_sinh(s)),
1941            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1942        );
1943        Self::from_expr(expr, None)
1944    }
1945    /// Hyperbolic tangent. PySpark tanh.
1946    pub fn tanh(&self) -> Column {
1947        let expr = self.expr().clone().map(
1948            |s| expect_col(crate::udfs::apply_tanh(s)),
1949            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1950        );
1951        Self::from_expr(expr, None)
1952    }
1953    /// Inverse hyperbolic cosine. PySpark acosh.
1954    pub fn acosh(&self) -> Column {
1955        let expr = self.expr().clone().map(
1956            |s| expect_col(crate::udfs::apply_acosh(s)),
1957            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1958        );
1959        Self::from_expr(expr, None)
1960    }
1961    /// Inverse hyperbolic sine. PySpark asinh.
1962    pub fn asinh(&self) -> Column {
1963        let expr = self.expr().clone().map(
1964            |s| expect_col(crate::udfs::apply_asinh(s)),
1965            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1966        );
1967        Self::from_expr(expr, None)
1968    }
1969    /// Inverse hyperbolic tangent. PySpark atanh.
1970    pub fn atanh(&self) -> Column {
1971        let expr = self.expr().clone().map(
1972            |s| expect_col(crate::udfs::apply_atanh(s)),
1973            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1974        );
1975        Self::from_expr(expr, None)
1976    }
1977    /// Cube root. PySpark cbrt.
1978    pub fn cbrt(&self) -> Column {
1979        let expr = self.expr().clone().map(
1980            |s| expect_col(crate::udfs::apply_cbrt(s)),
1981            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1982        );
1983        Self::from_expr(expr, None)
1984    }
1985    /// exp(x) - 1. PySpark expm1.
1986    pub fn expm1(&self) -> Column {
1987        let expr = self.expr().clone().map(
1988            |s| expect_col(crate::udfs::apply_expm1(s)),
1989            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1990        );
1991        Self::from_expr(expr, None)
1992    }
1993    /// log(1 + x). PySpark log1p.
1994    pub fn log1p(&self) -> Column {
1995        let expr = self.expr().clone().map(
1996            |s| expect_col(crate::udfs::apply_log1p(s)),
1997            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1998        );
1999        Self::from_expr(expr, None)
2000    }
2001    /// Base-10 logarithm. PySpark log10.
2002    pub fn log10(&self) -> Column {
2003        let expr = self.expr().clone().map(
2004            |s| expect_col(crate::udfs::apply_log10(s)),
2005            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
2006        );
2007        Self::from_expr(expr, None)
2008    }
2009    /// Base-2 logarithm. PySpark log2.
2010    pub fn log2(&self) -> Column {
2011        let expr = self.expr().clone().map(
2012            |s| expect_col(crate::udfs::apply_log2(s)),
2013            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
2014        );
2015        Self::from_expr(expr, None)
2016    }
2017    /// Round to nearest integer. PySpark rint.
2018    pub fn rint(&self) -> Column {
2019        let expr = self.expr().clone().map(
2020            |s| expect_col(crate::udfs::apply_rint(s)),
2021            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
2022        );
2023        Self::from_expr(expr, None)
2024    }
2025
2026    /// sqrt(x^2 + y^2). PySpark hypot.
2027    pub fn hypot(&self, other: &Column) -> Column {
2028        let xx = self.expr().clone() * self.expr().clone();
2029        let yy = other.expr().clone() * other.expr().clone();
2030        Self::from_expr((xx + yy).sqrt(), None)
2031    }
2032
2033    /// Cast to the given type (PySpark cast). Fails on invalid conversion.
2034    pub fn cast_to(&self, type_name: &str) -> Result<Column, String> {
2035        crate::functions::cast(self, type_name)
2036    }
2037
2038    /// Cast to the given type, null on invalid conversion (PySpark try_cast).
2039    pub fn try_cast_to(&self, type_name: &str) -> Result<Column, String> {
2040        crate::functions::try_cast(self, type_name)
2041    }
2042
2043    /// True where the float value is NaN (PySpark isnan). Non-float columns (e.g. string) return all False.
2044    pub fn is_nan(&self) -> Column {
2045        let expr = self.expr().clone().map(
2046            |s| expect_col(crate::udfs::apply_isnan_pyspark_parity(s)),
2047            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Boolean)),
2048        );
2049        Self::from_expr(expr, None)
2050    }
2051
2052    // --- Datetime functions ---
2053
2054    /// Extract year from datetime column (PySpark year)
2055    pub fn year(&self) -> Column {
2056        let name = format!("year({})", self.name());
2057        use polars::prelude::*;
2058        let parsed = self.expr().clone().map(
2059            |s| expect_col(crate::udfs::apply_string_to_date_format(s, None, false)),
2060            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Date)),
2061        );
2062        Self::from_expr(parsed.dt().year().alias(&name), Some(name))
2063    }
2064
2065    /// Extract month from datetime column (PySpark month). Returns IntegerType (int) for schema parity (#1402).
2066    pub fn month(&self) -> Column {
2067        let name = format!("month({})", self.name());
2068        use polars::prelude::*;
2069        let parsed = self.expr().clone().map(
2070            |s| expect_col(crate::udfs::apply_string_to_date_format(s, None, false)),
2071            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Date)),
2072        );
2073        let month_expr = parsed.dt().month().cast(DataType::Int32);
2074        Self::from_expr(month_expr.alias(&name), Some(name))
2075    }
2076
2077    /// Extract day of month from datetime column (PySpark day)
2078    pub fn day(&self) -> Column {
2079        Self::from_expr(self.expr().clone().dt().day(), None)
2080    }
2081
2082    /// Alias for day. PySpark dayofmonth. Returns IntegerType (int) for schema parity (#1403).
2083    pub fn dayofmonth(&self) -> Column {
2084        let name = format!("dayofmonth({})", self.name());
2085        use polars::prelude::*;
2086        let parsed = self.expr().clone().map(
2087            |s| expect_col(crate::udfs::apply_string_to_date_format(s, None, false)),
2088            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Date)),
2089        );
2090        let day_expr = parsed.dt().day().cast(DataType::Int32);
2091        Self::from_expr(day_expr.alias(&name), Some(name))
2092    }
2093
2094    /// Extract quarter (1-4) from date/datetime column (PySpark quarter).
2095    pub fn quarter(&self) -> Column {
2096        Self::from_expr(self.expr().clone().dt().quarter(), None)
2097    }
2098
2099    /// Extract ISO week of year (1-53) (PySpark weekofyear / week).
2100    pub fn weekofyear(&self) -> Column {
2101        Self::from_expr(self.expr().clone().dt().week(), None)
2102    }
2103
2104    /// Alias for weekofyear (PySpark week).
2105    pub fn week(&self) -> Column {
2106        self.weekofyear()
2107    }
2108
2109    /// Day of week: 1 = Sunday, 2 = Monday, ..., 7 = Saturday (PySpark dayofweek).
2110    /// Polars weekday is Mon=1..Sun=7; we convert to Sun=1..Sat=7.
2111    pub fn dayofweek(&self) -> Column {
2112        use polars::prelude::*;
2113        let parsed = self.expr().clone().map(
2114            |s| expect_col(crate::udfs::apply_string_to_date_format(s, None, false)),
2115            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Date)),
2116        );
2117        let w = parsed.dt().weekday().cast(DataType::Int32);
2118        let dayofweek = ((w % lit(7i32)) + lit(1i32)).cast(DataType::Int32);
2119        let name = format!("dayofweek({})", self.name());
2120        Self::from_expr(dayofweek.alias(&name), Some(name))
2121    }
2122
2123    /// Day of year (1-366) (PySpark dayofyear).
2124    pub fn dayofyear(&self) -> Column {
2125        Self::from_expr(
2126            self.expr().clone().dt().ordinal_day().cast(DataType::Int32),
2127            None,
2128        )
2129    }
2130
2131    /// Cast to date (PySpark to_date). Drops time component from datetime/timestamp.
2132    pub fn to_date(&self) -> Column {
2133        use polars::prelude::DataType;
2134        Self::from_expr(self.expr().clone().cast(DataType::Date), None)
2135    }
2136
2137    /// Format date/datetime as string (PySpark date_format). Uses chrono strftime format.
2138    pub fn date_format(&self, format: &str) -> Column {
2139        Self::from_expr(self.expr().clone().dt().strftime(format), None)
2140    }
2141
2142    /// Extract hour from datetime column (PySpark hour). Accepts string timestamp (#403).
2143    pub fn hour(&self) -> Column {
2144        let expr = self.expr().clone().map(
2145            |s| expect_col(crate::udfs::apply_hour(s)),
2146            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int32)),
2147        );
2148        Self::from_expr(expr, None)
2149    }
2150
2151    /// Extract minute from datetime column (PySpark minute). Accepts string timestamp (#403).
2152    pub fn minute(&self) -> Column {
2153        let expr = self.expr().clone().map(
2154            |s| expect_col(crate::udfs::apply_minute(s)),
2155            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int32)),
2156        );
2157        Self::from_expr(expr, None)
2158    }
2159
2160    /// Extract second from datetime column (PySpark second). Accepts string timestamp (#403).
2161    pub fn second(&self) -> Column {
2162        let expr = self.expr().clone().map(
2163            |s| expect_col(crate::udfs::apply_second(s)),
2164            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int32)),
2165        );
2166        Self::from_expr(expr, None)
2167    }
2168
2169    /// Extract field from date/datetime (PySpark extract). field: "year","month","day","hour","minute","second","quarter","week","dayofweek","dayofyear".
2170    pub fn extract(&self, field: &str) -> Column {
2171        use polars::prelude::*;
2172        let e = self.expr().clone();
2173        let expr = match field.trim().to_lowercase().as_str() {
2174            "year" => e.dt().year(),
2175            "month" => e.dt().month(),
2176            "day" => e.dt().day(),
2177            "hour" => e.dt().hour(),
2178            "minute" => e.dt().minute(),
2179            "second" => e.dt().second(),
2180            "quarter" => e.dt().quarter(),
2181            "week" | "weekofyear" => e.dt().week(),
2182            "dayofweek" | "dow" => {
2183                let w = e.dt().weekday();
2184                (w % lit(7i32)) + lit(1i32)
2185            }
2186            "dayofyear" | "doy" => e.dt().ordinal_day().cast(DataType::Int32),
2187            _ => e.dt().year(), // fallback
2188        };
2189        Self::from_expr(expr, None)
2190    }
2191
2192    /// Timestamp to microseconds since epoch (PySpark unix_micros).
2193    pub fn unix_micros(&self) -> Column {
2194        use polars::prelude::*;
2195        Self::from_expr(self.expr().clone().cast(DataType::Int64), None)
2196    }
2197
2198    /// Timestamp to milliseconds since epoch (PySpark unix_millis).
2199    pub fn unix_millis(&self) -> Column {
2200        use polars::prelude::*;
2201        let micros = self.expr().clone().cast(DataType::Int64);
2202        Self::from_expr(micros / lit(1000i64), None)
2203    }
2204
2205    /// Timestamp to seconds since epoch (PySpark unix_seconds).
2206    pub fn unix_seconds(&self) -> Column {
2207        use polars::prelude::*;
2208        let micros = self.expr().clone().cast(DataType::Int64);
2209        Self::from_expr(micros / lit(1_000_000i64), None)
2210    }
2211
2212    /// Weekday name "Mon","Tue",... (PySpark dayname).
2213    pub fn dayname(&self) -> Column {
2214        let expr = self.expr().clone().map(
2215            |s| expect_col(crate::udfs::apply_dayname(s)),
2216            |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
2217        );
2218        Self::from_expr(expr, None)
2219    }
2220
2221    /// Weekday 0=Mon, 6=Sun (PySpark weekday).
2222    pub fn weekday(&self) -> Column {
2223        let expr = self.expr().clone().map(
2224            |s| expect_col(crate::udfs::apply_weekday(s)),
2225            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int32)),
2226        );
2227        Self::from_expr(expr, None)
2228    }
2229
2230    /// Add n days to date/datetime column (PySpark date_add).
2231    pub fn date_add(&self, n: i32) -> Column {
2232        use polars::prelude::*;
2233        let date_expr = self.expr().clone().map(
2234            |s| expect_col(crate::udfs::apply_string_to_date_format(s, None, false)),
2235            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Date)),
2236        );
2237        let dur = duration(DurationArgs::new().with_days(lit(n as i64)));
2238        let name = format!("date_add({}, {n})", self.name());
2239        Self::from_expr((date_expr + dur).alias(&name), Some(name))
2240    }
2241
2242    /// Subtract n days from date/datetime column (PySpark date_sub).
2243    pub fn date_sub(&self, n: i32) -> Column {
2244        use polars::prelude::*;
2245        let date_expr = self.expr().clone().map(
2246            |s| expect_col(crate::udfs::apply_string_to_date_format(s, None, false)),
2247            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Date)),
2248        );
2249        let dur = duration(DurationArgs::new().with_days(lit(n as i64)));
2250        let name = format!("date_sub({}, {n})", self.name());
2251        Self::from_expr((date_expr - dur).alias(&name), Some(name))
2252    }
2253
2254    /// Number of days between two date/datetime columns (PySpark datediff). (end - start).
2255    pub fn datediff(&self, other: &Column) -> Column {
2256        use polars::prelude::*;
2257        let start = self.expr().clone().cast(DataType::Date);
2258        let end = other.expr().clone().cast(DataType::Date);
2259        // Cast to Int32 so schema maps to PySpark IntegerType instead of LongType.
2260        let expr = (end - start).dt().total_days(false).cast(DataType::Int32);
2261        Self::from_expr(expr, None)
2262    }
2263
2264    /// Last day of the month for date/datetime column (PySpark last_day).
2265    pub fn last_day(&self) -> Column {
2266        Self::from_expr(self.expr().clone().dt().month_end(), None)
2267    }
2268
2269    /// Add amount of unit to timestamp (PySpark timestampadd). unit: DAY, HOUR, MINUTE, SECOND, etc.
2270    pub fn timestampadd(&self, unit: &str, amount: &Column) -> Column {
2271        use polars::prelude::*;
2272        let ts = self.expr().clone();
2273        let amt = amount.expr().clone().cast(DataType::Int64);
2274        let dur = match unit.trim().to_uppercase().as_str() {
2275            "DAY" | "DAYS" => duration(DurationArgs::new().with_days(amt)),
2276            "HOUR" | "HOURS" => duration(DurationArgs::new().with_hours(amt)),
2277            "MINUTE" | "MINUTES" => duration(DurationArgs::new().with_minutes(amt)),
2278            "SECOND" | "SECONDS" => duration(DurationArgs::new().with_seconds(amt)),
2279            "WEEK" | "WEEKS" => duration(DurationArgs::new().with_weeks(amt)),
2280            _ => duration(DurationArgs::new().with_days(amt)),
2281        };
2282        Self::from_expr(ts + dur, None)
2283    }
2284
2285    /// Difference between timestamps in given unit (PySpark timestampdiff). unit: DAY, HOUR, MINUTE, SECOND.
2286    pub fn timestampdiff(&self, unit: &str, other: &Column) -> Column {
2287        let start = self.expr().clone();
2288        let end = other.expr().clone();
2289        let diff = end - start;
2290        let expr = match unit.trim().to_uppercase().as_str() {
2291            "HOUR" | "HOURS" => diff.dt().total_hours(false),
2292            "MINUTE" | "MINUTES" => diff.dt().total_minutes(false),
2293            "SECOND" | "SECONDS" => diff.dt().total_seconds(false),
2294            "DAY" | "DAYS" => diff.dt().total_days(false),
2295            _ => diff.dt().total_days(false),
2296        };
2297        Self::from_expr(expr, None)
2298    }
2299
2300    /// Interpret timestamp as UTC, convert to target timezone (PySpark from_utc_timestamp).
2301    pub fn from_utc_timestamp(&self, tz: &str) -> Column {
2302        let tz = tz.to_string();
2303        let expr = self.expr().clone().map(
2304            move |s| expect_col(crate::udfs::apply_from_utc_timestamp(s, &tz)),
2305            |_schema, field| Ok(field.clone()),
2306        );
2307        Self::from_expr(expr, None)
2308    }
2309
2310    /// Interpret timestamp as in tz, convert to UTC (PySpark to_utc_timestamp).
2311    pub fn to_utc_timestamp(&self, tz: &str) -> Column {
2312        let tz = tz.to_string();
2313        let expr = self.expr().clone().map(
2314            move |s| expect_col(crate::udfs::apply_to_utc_timestamp(s, &tz)),
2315            |_schema, field| Ok(field.clone()),
2316        );
2317        Self::from_expr(expr, None)
2318    }
2319
2320    /// Truncate date/datetime to unit. PySpark date_trunc/trunc use "year", "month", "day", "hour";
2321    /// Polars dt.truncate expects duration strings like "1y", "1mo", "1d", "1h". We map PySpark names.
2322    /// Uses a UDF path so string columns (e.g. from createDataFrame with Python datetime) are coerced to datetime then truncated.
2323    pub fn trunc(&self, format: &str) -> Column {
2324        use polars::prelude::*;
2325        let polars_duration = pyspark_trunc_format_to_polars_duration(format);
2326        let duration = polars_duration.clone();
2327        let expr = self.expr().clone().map(
2328            move |c| expect_col(crate::udfs::apply_date_trunc(c, &duration)),
2329            |_schema, field| {
2330                Ok(Field::new(
2331                    field.name().clone(),
2332                    DataType::Datetime(TimeUnit::Microseconds, None),
2333                ))
2334            },
2335        );
2336        Self::from_expr(expr, Some(self.name().to_string()))
2337    }
2338
2339    /// Add n months to date/datetime column (PySpark add_months). Month-aware.
2340    pub fn add_months(&self, n: i32) -> Column {
2341        let expr = self.expr().clone().map(
2342            move |col| expect_col(crate::udfs::apply_add_months(col, n)),
2343            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Date)),
2344        );
2345        Self::from_expr(expr, None)
2346    }
2347
2348    /// Number of months between end and start dates, as fractional (PySpark months_between).
2349    /// When round_off is true, rounds to 8 decimal places (PySpark default).
2350    pub fn months_between(&self, start: &Column, round_off: bool) -> Column {
2351        let args = [start.expr().clone()];
2352        let expr = self.expr().clone().map_many(
2353            move |cols| expect_col(crate::udfs::apply_months_between(cols, round_off)),
2354            &args,
2355            |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Float64)),
2356        );
2357        Self::from_expr(expr, None)
2358    }
2359
2360    /// Next date that is the given day of week (e.g. "Mon", "Tue") (PySpark next_day).
2361    pub fn next_day(&self, day_of_week: &str) -> Column {
2362        let day = day_of_week.to_string();
2363        let expr = self.expr().clone().map(
2364            move |col| expect_col(crate::udfs::apply_next_day(col, &day)),
2365            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Date)),
2366        );
2367        Self::from_expr(expr, None)
2368    }
2369
2370    /// Parse string timestamp to seconds since epoch (PySpark unix_timestamp).
2371    pub fn unix_timestamp(&self, format: Option<&str>) -> Column {
2372        let fmt = format.map(String::from);
2373        let expr = self.expr().clone().map(
2374            move |col| expect_col(crate::udfs::apply_unix_timestamp(col, fmt.as_deref())),
2375            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
2376        );
2377        Self::from_expr(expr, None)
2378    }
2379
2380    /// Convert seconds since epoch to formatted string (PySpark from_unixtime).
2381    pub fn from_unixtime(&self, format: Option<&str>) -> Column {
2382        let fmt = format.map(String::from);
2383        let expr = self.expr().clone().map(
2384            move |col| expect_col(crate::udfs::apply_from_unixtime(col, fmt.as_deref())),
2385            |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
2386        );
2387        Self::from_expr(expr, None)
2388    }
2389
2390    /// Convert seconds since epoch to timestamp (PySpark timestamp_seconds).
2391    pub fn timestamp_seconds(&self) -> Column {
2392        let expr = (self.expr().clone().cast(DataType::Int64) * lit(1_000_000i64))
2393            .cast(DataType::Datetime(TimeUnit::Microseconds, None));
2394        Self::from_expr(expr, None)
2395    }
2396
2397    /// Convert milliseconds since epoch to timestamp (PySpark timestamp_millis).
2398    pub fn timestamp_millis(&self) -> Column {
2399        let expr = (self.expr().clone().cast(DataType::Int64) * lit(1000i64))
2400            .cast(DataType::Datetime(TimeUnit::Microseconds, None));
2401        Self::from_expr(expr, None)
2402    }
2403
2404    /// Convert microseconds since epoch to timestamp (PySpark timestamp_micros).
2405    pub fn timestamp_micros(&self) -> Column {
2406        let expr = self
2407            .expr()
2408            .clone()
2409            .cast(DataType::Int64)
2410            .cast(DataType::Datetime(TimeUnit::Microseconds, None));
2411        Self::from_expr(expr, None)
2412    }
2413
2414    /// Date to days since 1970-01-01 (PySpark unix_date).
2415    pub fn unix_date(&self) -> Column {
2416        let expr = self.expr().clone().map(
2417            |s| expect_col(crate::udfs::apply_unix_date(s)),
2418            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int32)),
2419        );
2420        Self::from_expr(expr, None)
2421    }
2422
2423    /// Days since epoch to date (PySpark date_from_unix_date).
2424    pub fn date_from_unix_date(&self) -> Column {
2425        let expr = self.expr().clone().map(
2426            |s| expect_col(crate::udfs::apply_date_from_unix_date(s)),
2427            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Date)),
2428        );
2429        Self::from_expr(expr, None)
2430    }
2431
2432    /// Positive modulus (PySpark pmod). Column method: pmod(self, other).
2433    pub fn pmod(&self, divisor: &Column) -> Column {
2434        let args = [divisor.expr().clone()];
2435        let expr = self.expr().clone().map_many(
2436            |cols| expect_col(crate::udfs::apply_pmod(cols)),
2437            &args,
2438            |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Float64)),
2439        );
2440        Self::from_expr(expr, None)
2441    }
2442
2443    /// Factorial n! for n in 0..=20 (PySpark factorial).
2444    pub fn factorial(&self) -> Column {
2445        let expr = self.expr().clone().map(
2446            |s| expect_col(crate::udfs::apply_factorial(s)),
2447            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
2448        );
2449        Self::from_expr(expr, None)
2450    }
2451
2452    // --- Window functions ---
2453
2454    /// Apply window partitioning. Returns a new Column with `.over(partition_by)`.
2455    /// Use after rank(), dense_rank(), row_number(), lag(), lead().
2456    pub fn over(&self, partition_by: &[&str]) -> Column {
2457        let partition_exprs: Vec<Expr> = if partition_by.is_empty() {
2458            vec![lit(1i32)]
2459        } else {
2460            partition_by.iter().map(|s| col(*s)).collect()
2461        };
2462        Self::from_expr(self.expr().clone().over(partition_exprs), None)
2463    }
2464
2465    /// Apply window with optional order-by for running aggregates (e.g. sum, count).
2466    /// `order_by_encoded`: e.g. ["value"] for asc, ["-value"] for desc.
2467    /// When `use_running_aggregate` is true and we have `source_for_running`, use cum_sum for running semantics.
2468    /// When `first_last_value` is Some and order is present, use order-sensitive first/last (PySpark #1145).
2469    /// When `is_full_partition_frame` is false and we have last_value (first_last_value.is_last), use current-row semantics (PySpark default frame).
2470    pub fn over_window(
2471        &self,
2472        partition_by: &[&str],
2473        order_by_encoded: &[String],
2474        use_running_aggregate: bool,
2475        is_full_partition_frame: bool,
2476    ) -> Result<Column, PolarsError> {
2477        // PySpark does not support countDistinct().over(); approx_count_distinct().over() is allowed (#1218).
2478        if expr_is_or_contains_n_unique(self.expr()) && self.name.starts_with("count_distinct(") {
2479            return Err(PolarsError::InvalidOperation(
2480                "Distinct window functions are not supported".into(),
2481            ));
2482        }
2483        let partition_exprs: Vec<Expr> = if partition_by.is_empty() {
2484            vec![lit(1i32)]
2485        } else {
2486            partition_by.iter().map(|s| col(*s)).collect()
2487        };
2488
2489        // last_value (or F.last().over()) with orderBy and default frame: "last" = current row (PySpark default frame).
2490        if let Some(ref fl) = self.first_last_value {
2491            if fl.is_last && !order_by_encoded.is_empty() && !is_full_partition_frame {
2492                let mut order_exprs: Vec<Expr> = Vec::with_capacity(order_by_encoded.len());
2493                let mut descending_multi: Vec<bool> = Vec::with_capacity(order_by_encoded.len());
2494                for s in order_by_encoded.iter() {
2495                    let s = s.trim();
2496                    let (name, descending) = if let Some(stripped) = s.strip_prefix('-') {
2497                        (stripped.trim(), true)
2498                    } else {
2499                        (s.trim(), false)
2500                    };
2501                    order_exprs.push(col(name));
2502                    descending_multi.push(descending);
2503                }
2504                let default_opts = SortOptions {
2505                    descending: descending_multi.first().copied().unwrap_or(false),
2506                    nulls_last: descending_multi.first().copied().unwrap_or(false),
2507                    ..Default::default()
2508                };
2509                let expr = fl.value_expr.clone().over_with_options(
2510                    Some(partition_exprs),
2511                    Some((order_exprs, default_opts)),
2512                    WindowMapping::default(),
2513                )?;
2514                return Ok(Self::from_expr(expr, None));
2515            }
2516        }
2517
2518        let base_expr = if use_running_aggregate {
2519            if let Some(ref src) = self.source_for_running_mean {
2520                // Running mean in window order: cum_sum / cum_count on same column so order applies (#1241).
2521                let sum_expr = col(src).cast(DataType::Float64).cum_sum(false);
2522                let count_expr = col(src).cum_count(false).cast(DataType::Float64);
2523                sum_expr / count_expr
2524            } else if let Some(ref src) = self.source_for_running {
2525                // Running sum in window order. Cast to Float64 so string columns work
2526                // (PySpark parity, issue #393). Non-running sums still use native dtype.
2527                col(src).cast(DataType::Float64).cum_sum(false)
2528            } else if let Some(ref src) = self.source_for_running_count {
2529                // Running count in window order (PySpark count().over(order); #1218).
2530                col(src).cum_count(false).cast(DataType::Int64)
2531            } else {
2532                self.expr().clone()
2533            }
2534        } else {
2535            // Non-running aggregates over a window: use the aggregate expression as-is.
2536            // For sum() on string columns, this expression already includes a cast to Float64
2537            // (PySpark parity, issue #393).
2538            self.expr().clone()
2539        };
2540        let expr = if order_by_encoded.is_empty() {
2541            base_expr.over(partition_exprs)
2542        } else {
2543            // Build order exprs and sort options. Polars over_with_options uses (order_exprs, sort_options).
2544            // Use column as-is (no String cast) so numeric columns sort 80,90,100 (issue #1052).
2545            let mut order_exprs: Vec<Expr> = Vec::with_capacity(order_by_encoded.len());
2546            let mut descending_multi: Vec<bool> = Vec::with_capacity(order_by_encoded.len());
2547            for s in order_by_encoded.iter() {
2548                let s = s.trim();
2549                let (name, descending) = if let Some(stripped) = s.strip_prefix('-') {
2550                    (stripped.trim(), true)
2551                } else {
2552                    (s, false)
2553                };
2554                order_exprs.push(col(name));
2555                descending_multi.push(descending);
2556            }
2557            // Single sort_options for the window: use first column's direction. Polars may use this for the whole order.
2558            let default_opts = SortOptions {
2559                descending: descending_multi.first().copied().unwrap_or(false),
2560                nulls_last: descending_multi.first().copied().unwrap_or(false),
2561                ..Default::default()
2562            };
2563            base_expr.over_with_options(
2564                Some(partition_exprs),
2565                Some((order_exprs, default_opts)),
2566                WindowMapping::default(),
2567            )?
2568        };
2569        Ok(Self::from_expr(expr, None))
2570    }
2571
2572    /// Rank (with ties, gaps). Use with `.over(partition_by)`.
2573    pub fn rank(&self, descending: bool) -> Column {
2574        let opts = RankOptions {
2575            method: RankMethod::Min,
2576            descending,
2577        };
2578        Self::from_expr(self.expr().clone().rank(opts, None), None)
2579    }
2580
2581    /// Dense rank (no gaps). Use with `.over(partition_by)`.
2582    pub fn dense_rank(&self, descending: bool) -> Column {
2583        let opts = RankOptions {
2584            method: RankMethod::Dense,
2585            descending,
2586        };
2587        Self::from_expr(self.expr().clone().rank(opts, None), None)
2588    }
2589
2590    /// Row number (1, 2, 3 by this column's order). Use with `.over(partition_by)`.
2591    /// Nulls in the order column get a rank (PySpark parity: nulls last for asc, nulls first for desc).
2592    pub fn row_number(&self, descending: bool) -> Column {
2593        use polars::prelude::*;
2594        let opts = RankOptions {
2595            method: RankMethod::Ordinal,
2596            descending,
2597        };
2598        // Fill nulls so rank returns a value: ascending -> nulls last (fill with inf); descending -> nulls first (fill with -inf)
2599        let rank_expr = self
2600            .expr()
2601            .clone()
2602            .cast(DataType::Float64)
2603            .fill_null(lit(if descending {
2604                f64::NEG_INFINITY
2605            } else {
2606                f64::INFINITY
2607            }))
2608            .rank(opts, None);
2609        Self::from_expr(rank_expr, None)
2610    }
2611
2612    /// Row number with explicit multi-column order (PySpark Window.orderBy([...]) parity).
2613    /// Uses over_with_options; multi-column order is done via a struct sort so Type then Score then Name is respected.
2614    pub fn row_number_over(
2615        partition_by: &[&str],
2616        order_by_encoded: &[String],
2617    ) -> Result<Column, PolarsError> {
2618        use polars::prelude::*;
2619        if order_by_encoded.is_empty() {
2620            return Err(PolarsError::InvalidOperation(
2621                "row_number_over: order_by_encoded cannot be empty".into(),
2622            ));
2623        }
2624        let partition_exprs: Vec<Expr> = if partition_by.is_empty() {
2625            vec![lit(1i32)]
2626        } else {
2627            partition_by.iter().map(|s| col(*s)).collect()
2628        };
2629        // Parse encoded order: "-col" = descending, "col" = ascending. Trim whitespace (#1241).
2630        fn parse_order_key(s: &str) -> (&str, bool) {
2631            let s = s.trim();
2632            let descending = s.starts_with('-');
2633            let name = if descending {
2634                s.trim_start_matches('-').trim()
2635            } else {
2636                s
2637            };
2638            (name, descending)
2639        }
2640        let all_asc = order_by_encoded.iter().all(|s| !s.trim().starts_with('-'));
2641        // Row number = ordinal rank of the order key within partition. For multi-column mixed asc/desc, use struct with negated desc columns (#1241).
2642        let rank_expr = if order_by_encoded.len() == 1 {
2643            let (first_name, first_desc) = parse_order_key(order_by_encoded[0].trim());
2644            // For descending, rank by -col so ascending rank gives high values rank 1 (#1241).
2645            let order_col = col(first_name)
2646                .cast(DataType::Float64)
2647                .fill_null(lit(if first_desc {
2648                    f64::NEG_INFINITY
2649                } else {
2650                    f64::INFINITY
2651                }));
2652            let rank_input = if first_desc {
2653                order_col.neg()
2654            } else {
2655                order_col
2656            };
2657            let opts = RankOptions {
2658                method: RankMethod::Ordinal,
2659                descending: false,
2660            };
2661            rank_input.rank(opts, None)
2662        } else if all_asc {
2663            let struct_fields: Vec<Expr> = order_by_encoded
2664                .iter()
2665                .map(|s| col(parse_order_key(s).0))
2666                .collect();
2667            let opts = RankOptions {
2668                method: RankMethod::Ordinal,
2669                descending: false,
2670            };
2671            as_struct(struct_fields).rank(opts, None)
2672        } else {
2673            // Mixed asc/desc: rank by struct with desc columns negated so ascending struct sort gives correct order (#1241).
2674            let struct_fields: Vec<Expr> = order_by_encoded
2675                .iter()
2676                .map(|s| {
2677                    let (name, desc) = parse_order_key(s);
2678                    if desc {
2679                        (col(name)
2680                            .cast(DataType::Float64)
2681                            .fill_null(lit(f64::NEG_INFINITY)))
2682                        .neg()
2683                    } else {
2684                        col(name)
2685                            .cast(DataType::Float64)
2686                            .fill_null(lit(f64::INFINITY))
2687                    }
2688                })
2689                .collect();
2690            let opts = RankOptions {
2691                method: RankMethod::Ordinal,
2692                descending: false,
2693            };
2694            as_struct(struct_fields).rank(opts, None)
2695        };
2696        let expr = rank_expr.over(partition_exprs);
2697        Ok(Self::from_expr(expr, None))
2698    }
2699
2700    /// Lag: value from n rows before. Use with `.over(partition_by)`.
2701    pub fn lag(&self, n: i64) -> Column {
2702        Self::from_expr(self.expr().clone().shift(polars::prelude::lit(n)), None)
2703    }
2704
2705    /// Lead: value from n rows after. Use with `.over(partition_by)`.
2706    pub fn lead(&self, n: i64) -> Column {
2707        Self::from_expr(self.expr().clone().shift(polars::prelude::lit(-n)), None)
2708    }
2709
2710    /// First value in partition (PySpark first_value). Use with `.over(partition_by)`.
2711    /// When the window has orderBy, over_window uses order-sensitive semantics (#1145).
2712    pub fn first_value(&self) -> Column {
2713        let value_expr = self.expr().clone();
2714        Column {
2715            name: "first_value".to_string(),
2716            expr: value_expr.clone().first(),
2717            is_array_expr: false,
2718            deferred: None,
2719            udf_call: None,
2720            source_for_running: None,
2721            source_for_running_mean: None,
2722            first_last_value: Some(FirstLastValue {
2723                value_expr,
2724                is_last: false,
2725            }),
2726            source_for_running_count: None,
2727        }
2728    }
2729
2730    /// Last value in partition (PySpark last_value). Use with `.over(partition_by)`.
2731    /// When the window has orderBy, over_window uses order-sensitive semantics (#1145).
2732    pub fn last_value(&self) -> Column {
2733        let value_expr = self.expr().clone();
2734        Column {
2735            name: "last_value".to_string(),
2736            expr: value_expr.clone().last(),
2737            is_array_expr: false,
2738            deferred: None,
2739            udf_call: None,
2740            source_for_running: None,
2741            source_for_running_mean: None,
2742            first_last_value: Some(FirstLastValue {
2743                value_expr,
2744                is_last: true,
2745            }),
2746            source_for_running_count: None,
2747        }
2748    }
2749
2750    /// Percent rank in partition: (rank - 1) / (count - 1). Window is applied; do not call .over() again.
2751    pub fn percent_rank(&self, partition_by: &[&str], descending: bool) -> Column {
2752        use polars::prelude::*;
2753        let partition_exprs: Vec<Expr> = partition_by.iter().map(|s| col(*s)).collect();
2754        let opts = RankOptions {
2755            method: RankMethod::Min,
2756            descending,
2757        };
2758        let rank_expr = self
2759            .expr()
2760            .clone()
2761            .rank(opts, None)
2762            .over(partition_exprs.clone());
2763        let count_expr = self.expr().clone().count().over(partition_exprs.clone());
2764        let rank_f = (rank_expr - lit(1i64)).cast(DataType::Float64);
2765        let count_f = (count_expr - lit(1i64)).cast(DataType::Float64);
2766        // Avoid division by zero: single-row partition -> 0.0 (PySpark parity)
2767        // When count=2, count_f=1.0; use gt(0) not gt(1) so we compute (rank-1)/1 correctly
2768        let pct = when(count_f.clone().gt(lit(0.0)))
2769            .then(rank_f / count_f)
2770            .otherwise(lit(0.0));
2771        Self::from_expr(pct, None)
2772    }
2773
2774    /// Cumulative distribution in partition: row_number / count. Window is applied; do not call .over() again.
2775    pub fn cume_dist(&self, partition_by: &[&str], descending: bool) -> Column {
2776        use polars::prelude::*;
2777        let partition_exprs: Vec<Expr> = partition_by.iter().map(|s| col(*s)).collect();
2778        let opts = RankOptions {
2779            method: RankMethod::Ordinal,
2780            descending,
2781        };
2782        let row_num = self
2783            .expr()
2784            .clone()
2785            .rank(opts, None)
2786            .over(partition_exprs.clone());
2787        let count_expr = self.expr().clone().count().over(partition_exprs.clone());
2788        // Avoid division by zero when partition is empty
2789        let count_f = count_expr.clone().cast(DataType::Float64);
2790        let cume = when(count_f.clone().eq(lit(0.0)))
2791            .then(lit(0.0))
2792            .otherwise(row_num.cast(DataType::Float64) / count_f);
2793        Self::from_expr(cume, None)
2794    }
2795
2796    /// Ntile: bucket 1..n by rank within partition (ceil(rank * n / count)). Window is applied; do not call .over() again.
2797    pub fn ntile(&self, n: u32, partition_by: &[&str], descending: bool) -> Column {
2798        use polars::prelude::*;
2799        let partition_exprs: Vec<Expr> = if partition_by.is_empty() {
2800            vec![lit(1i32)]
2801        } else {
2802            partition_by.iter().map(|s| col(*s)).collect()
2803        };
2804        let opts = RankOptions {
2805            method: RankMethod::Ordinal,
2806            descending,
2807        };
2808        let rank_expr = self
2809            .expr()
2810            .clone()
2811            .rank(opts, None)
2812            .over(partition_exprs.clone());
2813        let count_expr = self.expr().clone().count().over(partition_exprs.clone());
2814        let n_expr = lit(n as f64);
2815        let rank_f = rank_expr.cast(DataType::Float64);
2816        let count_f = count_expr.cast(DataType::Float64);
2817        // Avoid division by zero when partition is empty: use bucket 1.
2818        // PySpark parity: ntile(n) uses floor((rank - 1) * n / count) + 1 so that
2819        // the first buckets get the extra rows when count % n != 0.
2820        let bucket = when(count_f.clone().eq(lit(0.0))).then(lit(1.0)).otherwise(
2821            ((rank_f.clone() - lit(1.0)) * n_expr.clone() / count_f.clone()).floor() + lit(1.0),
2822        );
2823        let clamped = bucket.clip(lit(1.0), lit(n as f64));
2824        Self::from_expr(clamped.cast(DataType::Int32), None)
2825    }
2826
2827    /// Nth value in partition by order (1-based n). Returns a Column with window already applied; do not call .over() again.
2828    pub fn nth_value(&self, n: i64, partition_by: &[&str], descending: bool) -> Column {
2829        use polars::prelude::*;
2830        let partition_exprs: Vec<Expr> = partition_by.iter().map(|s| col(*s)).collect();
2831        let opts = RankOptions {
2832            method: RankMethod::Ordinal,
2833            descending,
2834        };
2835        let rank_expr = self
2836            .expr()
2837            .clone()
2838            .rank(opts, None)
2839            .over(partition_exprs.clone());
2840        let cond_col = Self::from_expr(rank_expr.eq(lit(n)), None);
2841        let null_col = Self::from_expr(lit(NULL), None);
2842        let value_col = Self::from_expr(self.expr().clone(), None);
2843        let when_expr = crate::functions::when(&cond_col)
2844            .then(&value_col)
2845            .otherwise(&null_col)
2846            .into_expr();
2847        let windowed = when_expr.max().over(partition_exprs);
2848        Self::from_expr(windowed, None)
2849    }
2850
2851    /// Number of elements in list (PySpark size / array_size). Returns Int32.
2852    pub fn array_size(&self) -> Column {
2853        use polars::prelude::*;
2854        Self::from_expr(
2855            self.expr().clone().list().len().cast(DataType::Int32),
2856            Some("size".to_string()),
2857        )
2858    }
2859
2860    /// Cardinality: number of elements in array/list (PySpark cardinality). Alias for array_size.
2861    pub fn cardinality(&self) -> Column {
2862        self.array_size()
2863    }
2864
2865    /// Check if list contains value (PySpark array_contains).
2866    pub fn array_contains(&self, value: Expr) -> Column {
2867        use polars::prelude::*;
2868        let args = [value];
2869        let base_expr = self.expr().clone().map_many(
2870            |cols| expect_col(crate::udfs::apply_array_contains(cols)),
2871            &args,
2872            |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Boolean)),
2873        );
2874        // Ensure PySpark parity for null arrays: array_contains(null, x) -> null.
2875        let is_null = self.expr().clone().is_null();
2876        let expr = when(is_null)
2877            .then(lit(NULL))
2878            .otherwise(base_expr)
2879            .cast(DataType::Boolean);
2880        Self::from_expr(expr, None)
2881    }
2882
2883    /// Join list of strings with separator (PySpark array_join).
2884    pub fn array_join(&self, separator: &str) -> Column {
2885        use polars::prelude::*;
2886        // PySpark array_join accepts arrays of any element type and stringifies elements.
2887        // Cast elements to String via list.eval before joining.
2888        let elem_to_str = col("").cast(DataType::String);
2889        let list_expr = self.expr().clone().list().eval(elem_to_str);
2890        let joined = list_expr.list().join(lit(separator.to_string()), false);
2891        Self::from_expr(joined, None)
2892    }
2893
2894    /// Maximum element in list (PySpark array_max).
2895    pub fn array_max(&self) -> Column {
2896        Self::from_expr(self.expr().clone().list().max(), None)
2897    }
2898
2899    /// Minimum element in list (PySpark array_min).
2900    pub fn array_min(&self) -> Column {
2901        Self::from_expr(self.expr().clone().list().min(), None)
2902    }
2903
2904    /// Get element at 1-based index (PySpark element_at). Returns null if out of bounds.
2905    pub fn element_at(&self, index: i64) -> Column {
2906        use polars::prelude::*;
2907        // PySpark uses 1-based indexing; Polars uses 0-based. index 1 -> get(0).
2908        let idx = if index >= 1 { index - 1 } else { index };
2909        Self::from_expr(self.expr().clone().list().get(lit(idx), true), None)
2910    }
2911
2912    /// Get element at 0-based index (PySpark Column.getItem). Returns null if out of bounds.
2913    pub fn get_item(&self, index: i64) -> Column {
2914        use polars::prelude::*;
2915        Self::from_expr(self.expr().clone().list().get(lit(index), true), None)
2916    }
2917
2918    /// Get struct field by name (PySpark Column.getField).
2919    pub fn get_field(&self, name: &str) -> Column {
2920        Self::from_expr(
2921            self.expr().clone().struct_().field_by_name(name),
2922            Some(name.to_string()),
2923        )
2924    }
2925
2926    /// Add or replace a struct field (PySpark Column.withField).
2927    ///
2928    /// Panics if the column is not a struct type. If you need error handling, use
2929    /// [`Column::try_with_field`].
2930    pub fn with_field(&self, name: &str, value: &Column) -> Column {
2931        self.try_with_field(name, value)
2932            .expect("with_field: column must be struct type")
2933    }
2934
2935    /// Add or replace a struct field (PySpark Column.withField), returning an error if the
2936    /// column is not a struct type. Uses a map_many UDF so we don't rely on Polars "*" wildcard
2937    /// (removed in 0.53). Schema callback returns the extended struct so collected rows include
2938    /// the new field (issue #1066).
2939    pub fn try_with_field(
2940        &self,
2941        name: &str,
2942        value: &Column,
2943    ) -> Result<Column, polars::error::PolarsError> {
2944        use polars::prelude::PlSmallStr;
2945        let field_name = name.to_string();
2946        let field_name_schema = field_name.clone();
2947        let args = [value.expr().clone()];
2948        let expr = self.expr().clone().map_many(
2949            move |cols| {
2950                // map_many passes [self, ...args]: self is struct, args[0] is value
2951                expect_col(crate::udfs::apply_struct_with_field(
2952                    cols[0].clone(),
2953                    cols[1].clone(),
2954                    &field_name,
2955                ))
2956            },
2957            &args,
2958            move |_schema, fields| {
2959                let struct_field = &fields[0];
2960                let struct_dtype = struct_field.dtype();
2961                let inner: &[Field] = match struct_dtype {
2962                    DataType::Struct(f) => f.as_ref(),
2963                    _ => return Ok(struct_field.clone()),
2964                };
2965                let value_dtype = fields[1].dtype().clone();
2966                let known_value_dtype = if value_dtype.is_known() {
2967                    value_dtype
2968                } else if let DataType::Unknown(uk) = &value_dtype {
2969                    uk.materialize().unwrap_or(DataType::String)
2970                } else {
2971                    DataType::String
2972                };
2973                let mut new_fields: Vec<Field> = inner.to_vec();
2974                let mut replaced = false;
2975                for f in &mut new_fields {
2976                    if f.name.as_str() == field_name_schema {
2977                        // When replacing, use value's dtype so type changes (e.g. int -> string) and
2978                        // new complex types (e.g. array) are reflected in schema (Issue #1263).
2979                        let dtype = if known_value_dtype.is_known() {
2980                            known_value_dtype.clone()
2981                        } else if f.dtype.is_known() {
2982                            f.dtype.clone()
2983                        } else {
2984                            DataType::String
2985                        };
2986                        *f = Field::new(PlSmallStr::from(f.name.as_str()), dtype);
2987                        replaced = true;
2988                        break;
2989                    }
2990                }
2991                if !replaced {
2992                    new_fields.push(Field::new(
2993                        PlSmallStr::from(field_name_schema.as_str()),
2994                        known_value_dtype,
2995                    ));
2996                }
2997                let out_dtype = DataType::Struct(new_fields);
2998                Ok(Field::new(struct_field.name().clone(), out_dtype))
2999            },
3000        );
3001        Ok(Self::from_expr(expr, None))
3002    }
3003
3004    /// Sort list elements (PySpark array_sort). Ascending, nulls last.
3005    pub fn array_sort(&self) -> Column {
3006        use polars::prelude::SortOptions;
3007        let opts = SortOptions {
3008            descending: false,
3009            nulls_last: true,
3010            ..Default::default()
3011        };
3012        Self::from_expr(self.expr().clone().list().sort(opts), None)
3013    }
3014
3015    /// Distinct elements in list (PySpark array_distinct). Preserves first-occurrence order.
3016    pub fn array_distinct(&self) -> Column {
3017        let expr = self.expr().clone().map(
3018            |s| expect_col(crate::udfs::apply_array_distinct_first_order(s)),
3019            |_schema, field| {
3020                let new_name = format!("array_distinct({})", field.name());
3021                Ok(Field::new(new_name.into(), field.dtype().clone()))
3022            },
3023        );
3024        Self::from_expr(expr, None)
3025    }
3026
3027    /// Mode aggregation - most frequent value (PySpark mode).
3028    /// Uses value_counts sorted by count descending, then first.
3029    pub fn mode(&self) -> Column {
3030        // value_counts(sort=true, parallel=false, name="count", normalize=false)
3031        // puts highest count first; first() gives the mode
3032        // Struct has "count" and value field; field 0 is typically the value
3033        let vc = self
3034            .expr()
3035            .clone()
3036            .value_counts(true, false, "count", false);
3037        let first_struct = vc.first();
3038        let val_expr = first_struct.struct_().field_by_index(0);
3039        Self::from_expr(val_expr, Some("mode".to_string()))
3040    }
3041
3042    /// Slice list from start with optional length (PySpark slice). 1-based start.
3043    pub fn array_slice(&self, start: i64, length: Option<i64>) -> Column {
3044        use polars::prelude::*;
3045        let start_expr = lit((start - 1).max(0)); // 1-based to 0-based
3046        let length_expr = length.map(lit).unwrap_or_else(|| lit(i64::MAX));
3047        Self::from_expr(
3048            self.expr().clone().list().slice(start_expr, length_expr),
3049            None,
3050        )
3051    }
3052
3053    /// Explode list into one row per element (PySpark explode).
3054    pub fn explode(&self) -> Column {
3055        use polars::prelude::ExplodeOptions;
3056        Self::from_expr(
3057            self.expr().clone().explode(ExplodeOptions {
3058                empty_as_null: false,
3059                keep_nulls: false,
3060            }),
3061            None,
3062        )
3063    }
3064
3065    /// Explode list; null/empty produces one row with null (PySpark explode_outer).
3066    pub fn explode_outer(&self) -> Column {
3067        use polars::prelude::ExplodeOptions;
3068        Self::from_expr(
3069            self.expr().clone().explode(ExplodeOptions {
3070                empty_as_null: true,
3071                keep_nulls: true,
3072            }),
3073            None,
3074        )
3075    }
3076
3077    /// Posexplode with null preservation (PySpark posexplode_outer).
3078    ///
3079    /// Implementation detail:
3080    /// - Build a list of structs per row: [{pos, col}, {pos, col}, ...].
3081    /// - Explode that single list column once, then project struct fields "pos" and "col".
3082    ///   This avoids applying two separate explode() expressions on the same list column,
3083    ///   which can lead to length mismatches when both position and value are selected
3084    ///   in the same DataFrame.select call.
3085    pub fn posexplode_outer(&self) -> (Column, Column) {
3086        use polars::prelude::{ExplodeOptions, as_struct};
3087
3088        let opts = ExplodeOptions {
3089            empty_as_null: true,
3090            keep_nulls: true,
3091        };
3092
3093        // In list.eval context, col("") is the current list element. cum_count(false)
3094        // yields 1-based positions, so subtract 1 for 0-based PySpark posexplode parity.
3095        let pos_inner = (col("").cum_count(false) - lit(1i64)).alias("pos");
3096        let val_inner = col("").alias("col");
3097        let struct_expr = as_struct(vec![pos_inner, val_inner]);
3098
3099        // list of structs [{pos, col}, ...] per input row
3100        let list_struct_expr = self.expr().clone().list().eval(struct_expr);
3101        // explode once to get a struct column with one row per element (or null/empty handling via opts)
3102        let struct_exploded = list_struct_expr.explode(opts);
3103
3104        let pos_expr = struct_exploded.clone().struct_().field_by_name("pos");
3105        let val_expr = struct_exploded.struct_().field_by_name("col");
3106
3107        (
3108            Self::from_expr(pos_expr, Some("pos".to_string())),
3109            Self::from_expr(val_expr, Some("col".to_string())),
3110        )
3111    }
3112
3113    /// Zip two arrays element-wise into array of structs (PySpark arrays_zip).
3114    pub fn arrays_zip(&self, other: &Column) -> Column {
3115        let args = [other.expr().clone()];
3116        let expr = self.expr().clone().map_many(
3117            |cols| expect_col(crate::udfs::apply_arrays_zip(cols)),
3118            &args,
3119            |_schema, fields| Ok(fields[0].clone()),
3120        );
3121        Self::from_expr(expr, None)
3122    }
3123
3124    /// True if two arrays have any element in common (PySpark arrays_overlap).
3125    pub fn arrays_overlap(&self, other: &Column) -> Column {
3126        use polars::prelude::*;
3127
3128        let args = [other.expr().clone()];
3129        let base_expr = self.expr().clone().map_many(
3130            |cols| expect_col(crate::udfs::apply_arrays_overlap(cols)),
3131            &args,
3132            |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Boolean)),
3133        );
3134
3135        // PySpark parity: arrays_overlap(null_array, other) and arrays_overlap(array, null_array)
3136        // both yield null, not false. This also drives Column-arg array_contains.
3137        let is_null = self
3138            .expr()
3139            .clone()
3140            .is_null()
3141            .or(other.expr().clone().is_null());
3142        let expr = polars::prelude::when(is_null)
3143            .then(lit(NULL))
3144            .otherwise(base_expr)
3145            .cast(DataType::Boolean);
3146
3147        Self::from_expr(expr, None)
3148    }
3149
3150    /// Collect to array (PySpark array_agg). Alias for implode in group context.
3151    pub fn array_agg(&self) -> Column {
3152        Self::from_expr(self.expr().clone().implode(), None)
3153    }
3154
3155    /// 1-based index of first occurrence of value in list, or 0 if not found (PySpark array_position).
3156    /// Uses Polars list.eval with col("") as element (requires polars list_eval feature).
3157    pub fn array_position(&self, value: Expr) -> Column {
3158        use polars::prelude::{DataType, NULL};
3159        // In list.eval context, col("") refers to the current list element.
3160        let cond = Self::from_expr(col("").eq(value), None);
3161        let then_val = Self::from_expr(col("").cum_count(false), None);
3162        let else_val = Self::from_expr(lit(NULL), None);
3163        let idx_expr = crate::functions::when(&cond)
3164            .then(&then_val)
3165            .otherwise(&else_val)
3166            .into_expr();
3167        let list_expr = self
3168            .expr()
3169            .clone()
3170            .list()
3171            .eval(idx_expr)
3172            .list()
3173            .min()
3174            .fill_null(lit(0i64))
3175            .cast(DataType::Int64);
3176        Self::from_expr(list_expr, Some("array_position".to_string()))
3177    }
3178
3179    /// Remove null elements from list (PySpark array_compact). Preserves order.
3180    pub fn array_compact(&self) -> Column {
3181        let list_expr = self.expr().clone().list().drop_nulls();
3182        Self::from_expr(list_expr, None)
3183    }
3184
3185    /// New list with all elements equal to value removed (PySpark array_remove).
3186    /// Uses list.eval + drop_nulls (requires polars list_eval and list_drop_nulls).
3187    pub fn array_remove(&self, value: Expr) -> Column {
3188        use polars::prelude::NULL;
3189        // when(element != value) then element else null; then drop_nulls.
3190        let cond = Self::from_expr(col("").neq(value), None);
3191        let then_val = Self::from_expr(col(""), None);
3192        let else_val = Self::from_expr(lit(NULL), None);
3193        let elem_neq = crate::functions::when(&cond)
3194            .then(&then_val)
3195            .otherwise(&else_val)
3196            .into_expr();
3197        let list_expr = self
3198            .expr()
3199            .clone()
3200            .list()
3201            .eval(elem_neq)
3202            .list()
3203            .drop_nulls();
3204        Self::from_expr(list_expr, None)
3205    }
3206
3207    /// Repeat each element n times (PySpark array_repeat). Implemented via map UDF.
3208    pub fn array_repeat(&self, n: i64) -> Column {
3209        let expr = self.expr().clone().map(
3210            move |c| expect_col(crate::udfs::apply_array_repeat(c, n)),
3211            |_schema, field| Ok(field.clone()),
3212        );
3213        Self::from_expr(expr, None)
3214    }
3215
3216    /// Flatten list of lists to one list (PySpark flatten). Implemented via map UDF.
3217    pub fn array_flatten(&self) -> Column {
3218        let expr = self.expr().clone().map(
3219            |s| expect_col(crate::udfs::apply_array_flatten(s)),
3220            |_schema, field| Ok(field.clone()),
3221        );
3222        Self::from_expr(expr, None)
3223    }
3224
3225    /// Append element to end of list (PySpark array_append).
3226    pub fn array_append(&self, elem: &Column) -> Column {
3227        let args = [elem.expr().clone()];
3228        let expr = self.expr().clone().map_many(
3229            |cols| expect_col(crate::udfs::apply_array_append(cols)),
3230            &args,
3231            |_schema, fields| Ok(fields[0].clone()),
3232        );
3233        Self::from_expr(expr, None)
3234    }
3235
3236    /// Prepend element to start of list (PySpark array_prepend).
3237    pub fn array_prepend(&self, elem: &Column) -> Column {
3238        let args = [elem.expr().clone()];
3239        let expr = self.expr().clone().map_many(
3240            |cols| expect_col(crate::udfs::apply_array_prepend(cols)),
3241            &args,
3242            |_schema, fields| Ok(fields[0].clone()),
3243        );
3244        Self::from_expr(expr, None)
3245    }
3246
3247    /// Insert element at 1-based position (PySpark array_insert).
3248    pub fn array_insert(&self, pos: &Column, elem: &Column) -> Column {
3249        let args = [pos.expr().clone(), elem.expr().clone()];
3250        let expr = self.expr().clone().map_many(
3251            |cols| expect_col(crate::udfs::apply_array_insert(cols)),
3252            &args,
3253            |_schema, fields| Ok(fields[0].clone()),
3254        );
3255        Self::from_expr(expr, None)
3256    }
3257
3258    /// Elements in first array not in second (PySpark array_except).
3259    pub fn array_except(&self, other: &Column) -> Column {
3260        let args = [other.expr().clone()];
3261        let expr = self.expr().clone().map_many(
3262            |cols| expect_col(crate::udfs::apply_array_except(cols)),
3263            &args,
3264            |_schema, fields| Ok(fields[0].clone()),
3265        );
3266        Self::from_expr(expr, None)
3267    }
3268
3269    /// Elements in both arrays (PySpark array_intersect).
3270    pub fn array_intersect(&self, other: &Column) -> Column {
3271        let args = [other.expr().clone()];
3272        let expr = self.expr().clone().map_many(
3273            |cols| expect_col(crate::udfs::apply_array_intersect(cols)),
3274            &args,
3275            |_schema, fields| Ok(fields[0].clone()),
3276        );
3277        Self::from_expr(expr, None)
3278    }
3279
3280    /// Distinct elements from both arrays (PySpark array_union).
3281    pub fn array_union(&self, other: &Column) -> Column {
3282        let args = [other.expr().clone()];
3283        let expr = self.expr().clone().map_many(
3284            |cols| expect_col(crate::udfs::apply_array_union(cols)),
3285            &args,
3286            |_schema, fields| Ok(fields[0].clone()),
3287        );
3288        Self::from_expr(expr, None)
3289    }
3290
3291    /// Zip two arrays element-wise with merge function (PySpark zip_with). Shorter array padded with null.
3292    /// Merge Expr uses col("").struct_().field_by_name("left") and field_by_name("right").
3293    pub fn zip_with(&self, other: &Column, merge: Expr) -> Column {
3294        let args = [other.expr().clone()];
3295        let zip_expr = self.expr().clone().map_many(
3296            |cols| expect_col(crate::udfs::apply_zip_arrays_to_struct(cols)),
3297            &args,
3298            |_schema, fields| {
3299                let left_inner = match &fields[0].dtype {
3300                    DataType::List(inner) => *inner.clone(),
3301                    _ => DataType::Unknown(Default::default()),
3302                };
3303                let right_inner = match fields.get(1).map(|f| &f.dtype) {
3304                    Some(DataType::List(inner)) => *inner.clone(),
3305                    _ => DataType::Unknown(Default::default()),
3306                };
3307                let struct_dtype = DataType::Struct(vec![
3308                    Field::new("left".into(), left_inner),
3309                    Field::new("right".into(), right_inner),
3310                ]);
3311                Ok(Field::new(
3312                    fields[0].name().clone(),
3313                    DataType::List(Box::new(struct_dtype)),
3314                ))
3315            },
3316        );
3317        let list_expr = zip_expr.list().eval(merge);
3318        Self::from_expr(list_expr, None)
3319    }
3320
3321    /// True if any list element satisfies the predicate (PySpark exists). Uses list.eval(pred).list().any().
3322    pub fn array_exists(&self, predicate: Expr) -> Column {
3323        let pred_expr = self.expr().clone().list().eval(predicate).list().any();
3324        Self::from_expr(pred_expr, Some("exists".to_string()))
3325    }
3326
3327    /// True if all list elements satisfy the predicate (PySpark forall). Uses list.eval(pred).list().all().
3328    pub fn array_forall(&self, predicate: Expr) -> Column {
3329        let pred_expr = self.expr().clone().list().eval(predicate).list().all();
3330        Self::from_expr(pred_expr, Some("forall".to_string()))
3331    }
3332
3333    /// Filter list elements by predicate (PySpark filter). Keeps elements where predicate is true.
3334    pub fn array_filter(&self, predicate: Expr) -> Column {
3335        use polars::prelude::NULL;
3336        let then_val = Self::from_expr(col(""), None);
3337        let else_val = Self::from_expr(lit(NULL), None);
3338        let elem_expr = crate::functions::when(&Self::from_expr(predicate, None))
3339            .then(&then_val)
3340            .otherwise(&else_val)
3341            .into_expr();
3342        let list_expr = self
3343            .expr()
3344            .clone()
3345            .list()
3346            .eval(elem_expr)
3347            .list()
3348            .drop_nulls();
3349        Self::from_expr(list_expr, None)
3350    }
3351
3352    /// Transform list elements by expression (PySpark transform). list.eval(expr).
3353    pub fn array_transform(&self, f: Expr) -> Column {
3354        let list_expr = self.expr().clone().list().eval(f);
3355        Self::from_expr(list_expr, None)
3356    }
3357
3358    /// Sum of list elements (PySpark aggregate with sum). Uses list.sum().
3359    pub fn array_sum(&self) -> Column {
3360        Self::from_expr(self.expr().clone().list().sum(), None)
3361    }
3362
3363    /// Array fold/aggregate (PySpark aggregate). Simplified: zero + sum(list). Full (zero, merge, finish) deferred.
3364    pub fn array_aggregate(&self, zero: &Column) -> Column {
3365        let sum_expr = self.expr().clone().list().sum();
3366        Self::from_expr(sum_expr + zero.expr().clone(), None)
3367    }
3368
3369    /// Mean of list elements (PySpark aggregate with avg). Uses list.mean().
3370    pub fn array_mean(&self) -> Column {
3371        Self::from_expr(self.expr().clone().list().mean(), None)
3372    }
3373
3374    /// Explode list with position (PySpark posexplode). Returns (pos_col, value_col).
3375    ///
3376    /// Implementation detail:
3377    /// - Build a list of structs per row: [{pos, col}, {pos, col}, ...].
3378    /// - Explode that single list column once, then project struct fields "pos" and "col".
3379    ///   This ensures that selecting both position and value in the same DataFrame.select
3380    ///   call yields a single exploded DataFrame (matching PySpark posexplode semantics)
3381    ///   instead of attempting two independent explode() calls on the same list column.
3382    pub fn posexplode(&self) -> (Column, Column) {
3383        use polars::prelude::{ExplodeOptions, as_struct};
3384
3385        let opts = ExplodeOptions {
3386            empty_as_null: false,
3387            keep_nulls: false,
3388        };
3389
3390        // In list.eval context, col("") is the current list element. cum_count(false)
3391        // yields 1-based positions, so subtract 1 for 0-based PySpark posexplode parity.
3392        let pos_inner = (col("").cum_count(false) - lit(1i64)).alias("pos");
3393        let val_inner = col("").alias("col");
3394        let struct_expr = as_struct(vec![pos_inner, val_inner]);
3395
3396        // list of structs [{pos, col}, ...] per input row
3397        let list_struct_expr = self.expr().clone().list().eval(struct_expr);
3398        // explode once to get a struct column with one row per element
3399        let struct_exploded = list_struct_expr.explode(opts);
3400
3401        let pos_expr = struct_exploded.clone().struct_().field_by_name("pos");
3402        let val_expr = struct_exploded.struct_().field_by_name("col");
3403
3404        (
3405            Self::from_expr(pos_expr, Some("pos".to_string())),
3406            Self::from_expr(val_expr, Some("col".to_string())),
3407        )
3408    }
3409
3410    /// Extract keys from a map column (PySpark map_keys). Map column is List(Struct{key, value}).
3411    pub fn map_keys(&self) -> Column {
3412        let elem_key = col("").struct_().field_by_name("key");
3413        let list_expr = self.expr().clone().list().eval(elem_key);
3414        Self::from_expr(list_expr, None)
3415    }
3416
3417    /// Extract values from a map column (PySpark map_values). Map column is List(Struct{key, value}).
3418    pub fn map_values(&self) -> Column {
3419        let elem_val = col("").struct_().field_by_name("value");
3420        let list_expr = self.expr().clone().list().eval(elem_val);
3421        Self::from_expr(list_expr, None)
3422    }
3423
3424    /// Return map as list of structs {key, value} (PySpark map_entries). Identity for List(Struct) column.
3425    pub fn map_entries(&self) -> Column {
3426        Self::from_expr(self.expr().clone(), None)
3427    }
3428
3429    /// Build map from two array columns (keys, values) (PySpark map_from_arrays). Implemented via map_many UDF.
3430    pub fn map_from_arrays(&self, values: &Column) -> Column {
3431        let args = [values.expr().clone()];
3432        let expr = self.expr().clone().map_many(
3433            |cols| expect_col(crate::udfs::apply_map_from_arrays(cols)),
3434            &args,
3435            |_schema, fields| Ok(fields[0].clone()),
3436        );
3437        Self::from_expr(expr, None)
3438    }
3439
3440    /// Merge two map columns (PySpark map_concat). Last value wins for duplicate keys.
3441    pub fn map_concat(&self, other: &Column) -> Column {
3442        let args = [other.expr().clone()];
3443        let expr = self.expr().clone().map_many(
3444            |cols| expect_col(crate::udfs::apply_map_concat(cols)),
3445            &args,
3446            |_schema, fields| Ok(fields[0].clone()),
3447        );
3448        Self::from_expr(expr, None)
3449    }
3450
3451    /// Transform each map key by expr (PySpark transform_keys). key_expr should use col("").struct_().field_by_name("key").
3452    pub fn transform_keys(&self, key_expr: Expr) -> Column {
3453        use polars::prelude::as_struct;
3454        let value = col("").struct_().field_by_name("value");
3455        let new_struct = as_struct(vec![key_expr.alias("key"), value.alias("value")]);
3456        let list_expr = self.expr().clone().list().eval(new_struct);
3457        Self::from_expr(list_expr, None)
3458    }
3459
3460    /// Transform each map value by expr (PySpark transform_values). value_expr should use col("").struct_().field_by_name("value").
3461    pub fn transform_values(&self, value_expr: Expr) -> Column {
3462        use polars::prelude::as_struct;
3463        let key = col("").struct_().field_by_name("key");
3464        let new_struct = as_struct(vec![key.alias("key"), value_expr.alias("value")]);
3465        let list_expr = self.expr().clone().list().eval(new_struct);
3466        Self::from_expr(list_expr, None)
3467    }
3468
3469    /// Merge two maps by key with merge function (PySpark map_zip_with).
3470    /// Merge Expr uses col("").struct_().field_by_name("value1") and field_by_name("value2").
3471    pub fn map_zip_with(&self, other: &Column, merge: Expr) -> Column {
3472        use polars::prelude::as_struct;
3473        let args = [other.expr().clone()];
3474        let zip_expr = self.expr().clone().map_many(
3475            |cols| expect_col(crate::udfs::apply_map_zip_to_struct(cols)),
3476            &args,
3477            |_schema, fields| {
3478                let list_inner = match &fields[0].dtype {
3479                    DataType::List(inner) => *inner.clone(),
3480                    _ => return Ok(fields[0].clone()),
3481                };
3482                let (key_dtype, value_dtype) = match &list_inner {
3483                    DataType::Struct(struct_fields) => {
3484                        let k = struct_fields
3485                            .iter()
3486                            .find(|f| f.name.as_str() == "key")
3487                            .map(|f| f.dtype.clone())
3488                            .unwrap_or(DataType::String);
3489                        let v = struct_fields
3490                            .iter()
3491                            .find(|f| f.name.as_str() == "value")
3492                            .map(|f| f.dtype.clone())
3493                            .unwrap_or(DataType::String);
3494                        (k, v)
3495                    }
3496                    _ => (DataType::String, DataType::String),
3497                };
3498                let out_struct = DataType::Struct(vec![
3499                    Field::new("key".into(), key_dtype),
3500                    Field::new("value1".into(), value_dtype.clone()),
3501                    Field::new("value2".into(), value_dtype),
3502                ]);
3503                Ok(Field::new(
3504                    fields[0].name().clone(),
3505                    DataType::List(Box::new(out_struct)),
3506                ))
3507            },
3508        );
3509        let key_field = col("").struct_().field_by_name("key").alias("key");
3510        let value_field = merge.alias("value");
3511        let merge_expr = as_struct(vec![key_field, value_field]);
3512        let list_expr = zip_expr.list().eval(merge_expr);
3513        Self::from_expr(list_expr, None)
3514    }
3515
3516    /// Filter map entries by predicate (PySpark map_filter). Keeps key-value pairs where predicate is true.
3517    /// Predicate uses col("").struct_().field_by_name("key") and field_by_name("value") to reference key/value.
3518    pub fn map_filter(&self, predicate: Expr) -> Column {
3519        use polars::prelude::NULL;
3520        let then_val = Self::from_expr(col(""), None);
3521        let else_val = Self::from_expr(lit(NULL), None);
3522        let elem_expr = crate::functions::when(&Self::from_expr(predicate, None))
3523            .then(&then_val)
3524            .otherwise(&else_val)
3525            .into_expr();
3526        let list_expr = self
3527            .expr()
3528            .clone()
3529            .list()
3530            .eval(elem_expr)
3531            .list()
3532            .drop_nulls();
3533        Self::from_expr(list_expr, None)
3534    }
3535
3536    /// Array of structs {key, value} to map (PySpark map_from_entries). Identity for List(Struct) format.
3537    pub fn map_from_entries(&self) -> Column {
3538        Self::from_expr(self.expr().clone(), None)
3539    }
3540
3541    /// True if map contains key (PySpark map_contains_key).
3542    pub fn map_contains_key(&self, key: &Column) -> Column {
3543        let args = [key.expr().clone()];
3544        let expr = self.expr().clone().map_many(
3545            |cols| expect_col(crate::udfs::apply_map_contains_key(cols)),
3546            &args,
3547            |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Boolean)),
3548        );
3549        Self::from_expr(expr, None)
3550    }
3551
3552    /// Get value for key from map, or null (PySpark get).
3553    pub fn get(&self, key: &Column) -> Column {
3554        let args = [key.expr().clone()];
3555        let expr = self.expr().clone().map_many(
3556            |cols| expect_col(crate::udfs::apply_get(cols)),
3557            &args,
3558            |_schema, fields| {
3559                let dtype = &fields[0].dtype;
3560                let value_dtype = match dtype {
3561                    DataType::List(inner) => match inner.as_ref() {
3562                        DataType::Struct(struct_fields) => struct_fields
3563                            .iter()
3564                            .find(|f| f.name == "value")
3565                            .map(|f| f.dtype.clone())
3566                            .unwrap_or(DataType::String),
3567                        _ => DataType::String,
3568                    },
3569                    DataType::Struct(struct_fields) => struct_fields
3570                        .first()
3571                        .map(|f| f.dtype.clone())
3572                        .unwrap_or(DataType::String),
3573                    _ => DataType::String,
3574                };
3575                Ok(Field::new(fields[0].name().clone(), value_dtype))
3576            },
3577        );
3578        Self::from_expr(expr, None)
3579    }
3580
3581    /// Extract JSON path from string column (PySpark get_json_object). UDF returns string always (#1146).
3582    pub fn get_json_object(&self, path: &str) -> Column {
3583        let path = path.to_string();
3584        let expr = self
3585            .expr()
3586            .clone()
3587            .map(
3588                move |s| expect_col(crate::udfs::apply_get_json_object(s, &path)),
3589                |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
3590            )
3591            .cast(DataType::String);
3592        Self::from_expr(expr, None)
3593    }
3594
3595    /// Parse string column as JSON into struct (PySpark from_json). Uses Polars str().json_decode.
3596    pub fn from_json(&self, schema: Option<polars::datatypes::DataType>) -> Column {
3597        use polars::prelude::DataType;
3598        let dtype = schema.unwrap_or(DataType::String);
3599        let out = self.expr().clone().str().json_decode(dtype);
3600        Self::from_expr(out, None)
3601    }
3602
3603    /// Serialize struct column to JSON string (PySpark to_json). Uses Polars struct().json_encode.
3604    pub fn to_json(&self) -> Column {
3605        let out = self.expr().clone().struct_().json_encode();
3606        Self::from_expr(out, None)
3607    }
3608
3609    /// Length of JSON array at path (PySpark json_array_length). UDF.
3610    pub fn json_array_length(&self, path: &str) -> Column {
3611        let path = path.to_string();
3612        let expr = self.expr().clone().map(
3613            move |s| expect_col(crate::udfs::apply_json_array_length(s, &path)),
3614            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
3615        );
3616        Self::from_expr(expr, None)
3617    }
3618
3619    /// Keys of JSON object (PySpark json_object_keys). Returns list of strings. UDF.
3620    pub fn json_object_keys(&self) -> Column {
3621        let expr = self.expr().clone().map(
3622            |s| expect_col(crate::udfs::apply_json_object_keys(s)),
3623            |_schema, field| {
3624                Ok(Field::new(
3625                    field.name().clone(),
3626                    DataType::List(Box::new(DataType::String)),
3627                ))
3628            },
3629        );
3630        Self::from_expr(expr, None)
3631    }
3632
3633    /// Extract keys from JSON as struct (PySpark json_tuple). UDF. Returns struct with one string field per key.
3634    pub fn json_tuple(&self, keys: &[&str]) -> Column {
3635        let keys_vec: Vec<String> = keys.iter().map(|s| (*s).to_string()).collect();
3636        let struct_fields: Vec<polars::datatypes::Field> = keys_vec
3637            .iter()
3638            .map(|k| polars::datatypes::Field::new(k.as_str().into(), DataType::String))
3639            .collect();
3640        let expr = self.expr().clone().map(
3641            move |s| expect_col(crate::udfs::apply_json_tuple(s, &keys_vec)),
3642            move |_schema, field| {
3643                Ok(Field::new(
3644                    field.name().clone(),
3645                    DataType::Struct(struct_fields.clone()),
3646                ))
3647            },
3648        );
3649        Self::from_expr(expr, None)
3650    }
3651
3652    /// Parse CSV string to struct (PySpark from_csv). Minimal: split by comma, up to 32 columns. UDF.
3653    pub fn from_csv(&self) -> Column {
3654        let expr = self.expr().clone().map(
3655            |s| expect_col(crate::udfs::apply_from_csv(s)),
3656            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Struct(vec![]))),
3657        );
3658        Self::from_expr(expr, None)
3659    }
3660
3661    /// Format struct as CSV string (PySpark to_csv). Minimal. UDF.
3662    pub fn to_csv(&self) -> Column {
3663        let expr = self.expr().clone().map(
3664            |s| expect_col(crate::udfs::apply_to_csv(s)),
3665            |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
3666        );
3667        Self::from_expr(expr, None)
3668    }
3669
3670    /// Parse URL and extract part (PySpark parse_url). UDF.
3671    /// When part is QUERY/QUERYSTRING and key is Some(k), returns the value for that query parameter only.
3672    pub fn parse_url(&self, part: &str, key: Option<&str>) -> Column {
3673        let part = part.to_string();
3674        let key_owned = key.map(String::from);
3675        let expr = self.expr().clone().map(
3676            move |s| expect_col(crate::udfs::apply_parse_url(s, &part, key_owned.as_deref())),
3677            |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
3678        );
3679        Self::from_expr(expr, None)
3680    }
3681
3682    /// Hash of column value (PySpark hash). Single-column version.
3683    pub fn hash(&self) -> Column {
3684        let expr = self.expr().clone().map(
3685            |s| expect_col(crate::udfs::apply_hash_one(s)),
3686            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
3687        );
3688        Self::from_expr(expr, None)
3689    }
3690
3691    /// Check if column values are in the other column's list/series (PySpark isin).
3692    pub fn isin(&self, other: &Column) -> Column {
3693        let out = self.expr().clone().is_in(other.expr().clone(), false);
3694        Self::from_expr(out, None)
3695    }
3696
3697    /// Percent-decode URL-encoded string (PySpark url_decode). Uses UDF.
3698    pub fn url_decode(&self) -> Column {
3699        let expr = self.expr().clone().map(
3700            |s| expect_col(crate::udfs::apply_url_decode(s)),
3701            |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
3702        );
3703        Self::from_expr(expr, None)
3704    }
3705
3706    /// Percent-encode string for URL (PySpark url_encode). Uses UDF.
3707    pub fn url_encode(&self) -> Column {
3708        let expr = self.expr().clone().map(
3709            |s| expect_col(crate::udfs::apply_url_encode(s)),
3710            |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
3711        );
3712        Self::from_expr(expr, None)
3713    }
3714
3715    /// Bitwise left shift (PySpark shiftLeft). col << n = col * 2^n.
3716    pub fn shift_left(&self, n: i32) -> Column {
3717        use polars::prelude::*;
3718        let pow = lit(2i64).pow(lit(n as i64));
3719        Self::from_expr(
3720            (self.expr().clone().cast(DataType::Int64) * pow).cast(DataType::Int64),
3721            None,
3722        )
3723    }
3724
3725    /// Bitwise signed right shift (PySpark shiftRight). col >> n = col / 2^n.
3726    pub fn shift_right(&self, n: i32) -> Column {
3727        use polars::prelude::*;
3728        let pow = lit(2i64).pow(lit(n as i64));
3729        Self::from_expr(
3730            (self.expr().clone().cast(DataType::Int64) / pow).cast(DataType::Int64),
3731            None,
3732        )
3733    }
3734
3735    /// Bitwise unsigned right shift (PySpark shiftRightUnsigned). Logical shift.
3736    pub fn shift_right_unsigned(&self, n: i32) -> Column {
3737        let expr = self.expr().clone().map(
3738            move |s| expect_col(crate::udfs::apply_shift_right_unsigned(s, n)),
3739            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
3740        );
3741        Self::from_expr(expr, None)
3742    }
3743}
3744
#[cfg(test)]
mod tests {
    use super::Column;
    use polars::prelude::{IntoLazy, col, df, lit};

    /// Five-row frame with two fully populated integer columns, `a` and `b`.
    fn test_df() -> polars::prelude::DataFrame {
        let built = df!(
            "a" => &[1, 2, 3, 4, 5],
            "b" => &[10, 20, 30, 40, 50]
        );
        built.unwrap()
    }

    /// Five-row frame whose columns `a` and `b` both contain nulls.
    fn test_df_with_nulls() -> polars::prelude::DataFrame {
        let built = df!(
            "a" => &[Some(1), Some(2), None, Some(4), None],
            "b" => &[Some(10), None, Some(30), None, None]
        );
        built.unwrap()
    }

    /// Filter `frame` by `predicate` and return how many rows survive.
    fn rows_matching(frame: polars::prelude::DataFrame, predicate: Column) -> usize {
        let kept = frame.lazy().filter(predicate.into_expr()).collect().unwrap();
        kept.height()
    }

    /// Evaluate `a <=> b` (eq_null_safe) on `frame` and return the per-row results.
    fn eq_null_safe_values(frame: polars::prelude::DataFrame) -> Vec<Option<bool>> {
        let lhs = Column::new("a".to_string());
        let rhs = Column::new("b".to_string());
        let comparison = lhs.eq_null_safe(&rhs).into_expr().alias("eq_null_safe");
        let evaluated = frame.lazy().with_column(comparison).collect().unwrap();
        let flags = evaluated.column("eq_null_safe").unwrap().bool().unwrap();
        // Bind before returning so the iterator borrowing `evaluated` is dropped first.
        let values: Vec<Option<bool>> = flags.into_iter().collect();
        values
    }

    #[test]
    fn test_column_new() {
        let age = Column::new("age".to_string());
        assert_eq!(age.name(), "age");
    }

    #[test]
    fn test_column_from_expr() {
        let named = Column::from_expr(col("test"), Some("test".to_string()));
        assert_eq!(named.name(), "test");
    }

    #[test]
    fn test_column_from_expr_default_name() {
        // Without an explicit name the wrapper reports the placeholder "<expr>".
        let unnamed = Column::from_expr(col("test").gt(lit(5)), None);
        assert_eq!(unnamed.name(), "<expr>");
    }

    #[test]
    fn test_column_alias() {
        let original = Column::new("original".to_string());
        let renamed = original.alias("new_name");
        assert_eq!(renamed.name(), "new_name");
    }

    #[test]
    fn test_column_gt() {
        let predicate = Column::new("a".to_string()).gt(lit(3));
        // a > 3 holds for 4 and 5.
        assert_eq!(rows_matching(test_df(), predicate), 2);
    }

    #[test]
    fn test_column_lt() {
        let predicate = Column::new("a".to_string()).lt(lit(3));
        // a < 3 holds for 1 and 2.
        assert_eq!(rows_matching(test_df(), predicate), 2);
    }

    #[test]
    fn test_column_eq() {
        let predicate = Column::new("a".to_string()).eq(lit(3));
        // Only the row with a == 3.
        assert_eq!(rows_matching(test_df(), predicate), 1);
    }

    #[test]
    fn test_column_neq() {
        let predicate = Column::new("a".to_string()).neq(lit(3));
        // Every row except a == 3.
        assert_eq!(rows_matching(test_df(), predicate), 4);
    }

    #[test]
    fn test_column_gt_eq() {
        let predicate = Column::new("a".to_string()).gt_eq(lit(3));
        // a >= 3 holds for 3, 4 and 5.
        assert_eq!(rows_matching(test_df(), predicate), 3);
    }

    #[test]
    fn test_column_lt_eq() {
        let predicate = Column::new("a".to_string()).lt_eq(lit(3));
        // a <= 3 holds for 1, 2 and 3.
        assert_eq!(rows_matching(test_df(), predicate), 3);
    }

    #[test]
    fn test_column_is_null() {
        let predicate = Column::new("a".to_string()).is_null();
        // Column 'a' has two nulls.
        assert_eq!(rows_matching(test_df_with_nulls(), predicate), 2);
    }

    #[test]
    fn test_column_is_not_null() {
        let predicate = Column::new("a".to_string()).is_not_null();
        // Column 'a' has three non-null values.
        assert_eq!(rows_matching(test_df_with_nulls(), predicate), 3);
    }

    #[test]
    fn test_null_boolean_column_produces_null_bool_series() {
        let null_expr = Column::null_boolean().into_expr();
        let projected = test_df()
            .lazy()
            .select([null_expr.alias("null_bool")])
            .collect()
            .unwrap();
        let series = projected.column("null_bool").unwrap();
        // Boolean dtype, and every entry is null.
        assert_eq!(series.dtype(), &polars::prelude::DataType::Boolean);
        assert_eq!(series.null_count(), series.len());
    }

    #[test]
    fn test_eq_null_safe_both_null() {
        // Both columns are NULL in the same (middle) row.
        let frame = df!(
            "a" => &[Some(1), None, Some(3)],
            "b" => &[Some(1), None, Some(4)]
        )
        .unwrap();

        let values = eq_null_safe_values(frame);

        // Row 0: 1 == 1 -> true
        // Row 1: NULL <=> NULL -> true (null-safe: both NULL compare equal)
        // Row 2: 3 == 4 -> false
        let expected = [Some(true), Some(true), Some(false)];
        for (row, want) in expected.into_iter().enumerate() {
            assert_eq!(values[row], want);
        }
    }

    #[test]
    fn test_eq_null_safe_one_null() {
        // Each NULL is paired with a non-null value on the other side.
        let frame = df!(
            "a" => &[Some(1), None, Some(3)],
            "b" => &[Some(1), Some(2), None]
        )
        .unwrap();

        let values = eq_null_safe_values(frame);

        // Row 0: 1 == 1 -> true
        // Row 1: NULL <=> 2 -> false (one is null, not both)
        // Row 2: 3 <=> NULL -> false (one is null, not both)
        let expected = [Some(true), Some(false), Some(false)];
        for (row, want) in expected.into_iter().enumerate() {
            assert_eq!(values[row], want);
        }
    }
}