// robin_sparkless_polars/column.rs

use polars::prelude::{
    DataType, Expr, Field, PolarsError, PolarsResult, RankMethod, RankOptions, TimeUnit, col, lit,
};

/// Unwrap UDF result to Column (map() expects Result<Column>, UDFs return Result<Option<Column>>).
#[inline]
pub(crate) fn expect_col(
    r: PolarsResult<Option<polars::prelude::Column>>,
) -> PolarsResult<polars::prelude::Column> {
    r.and_then(|o| o.ok_or_else(|| PolarsError::ComputeError("expected column".into())))
}

/// Convert SQL LIKE pattern (% = any sequence, _ = one char) to regex. Escapes regex specials.
/// When escape_char is Some(esc), esc + any char treats that char as literal (no %/_ expansion).
fn like_pattern_to_regex(pattern: &str, escape_char: Option<char>) -> String {
    let mut out = String::with_capacity(pattern.len() * 2);
    let mut it = pattern.chars();
    while let Some(c) = it.next() {
        if escape_char == Some(c) {
            if let Some(next) = it.next() {
                // Literal: escape for regex
                if "\\.*+?[](){}^$|".contains(next) {
                    out.push('\\');
                }
                out.push(next);
            } else {
                out.push('\\');
                out.push(c);
            }
        } else {
            match c {
                '%' => out.push_str(".*"),
                '_' => out.push('.'),
                '\\' | '.' | '+' | '*' | '?' | '[' | ']' | '(' | ')' | '{' | '}' | '^' | '$'
                | '|' => {
                    out.push('\\');
                    out.push(c);
                }
                _ => out.push(c),
            }
        }
    }
    format!("^{out}$")
}
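
// Illustrative sketch (not part of the original file): a small test of the LIKE-to-regex
// translation above, derived only from the mapping implemented in like_pattern_to_regex.
#[cfg(test)]
mod like_pattern_to_regex_examples {
    use super::like_pattern_to_regex;

    #[test]
    fn translates_wildcards_escapes_and_specials() {
        // % -> .*, _ -> .
        assert_eq!(like_pattern_to_regex("a%b_c", None), "^a.*b.c$");
        // Escaped % is kept literal (no wildcard expansion)
        assert_eq!(like_pattern_to_regex(r"100\%", Some('\\')), "^100%$");
        // Regex metacharacters in the pattern are escaped
        assert_eq!(like_pattern_to_regex("50+%", None), r"^50\+.*$");
    }
}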

/// Deferred random column: when added via with_column, we generate a full-length series in one go (PySpark-like).
#[derive(Debug, Clone, Copy)]
pub enum DeferredRandom {
    Rand(Option<u64>),
    Randn(Option<u64>),
}

/// Column - represents a column in a DataFrame, used for building expressions
/// Thin wrapper around Polars `Expr`. May carry a DeferredRandom for rand/randn so with_column can produce one value per row.
/// May carry UdfCall for Python UDFs (eager execution at with_column).
#[derive(Debug, Clone)]
pub struct Column {
    name: String,
    expr: Expr, // Polars expression for lazy evaluation
    /// When Some, with_column generates a full-length random series instead of using expr (PySpark-like per-row rand/randn).
    pub deferred: Option<DeferredRandom>,
    /// When Some, with_column executes Python UDF eagerly (name, arg columns).
    pub udf_call: Option<(String, Vec<Column>)>,
}

impl Column {
    /// Create a new Column from a column name
    pub fn new(name: String) -> Self {
        Column {
            name: name.clone(),
            expr: col(&name),
            deferred: None,
            udf_call: None,
        }
    }

    /// Create a Column from a Polars Expr
    pub fn from_expr(expr: Expr, name: Option<String>) -> Self {
        let display_name = name.unwrap_or_else(|| "<expr>".to_string());
        Column {
            name: display_name,
            expr,
            deferred: None,
            udf_call: None,
        }
    }

    /// Create a Column for Python UDF call (eager execution at with_column).
    pub fn from_udf_call(name: String, args: Vec<Column>) -> Self {
        Column {
            name: format!("{name}()"),
            expr: lit(0i32), // dummy, never used
            deferred: None,
            udf_call: Some((name, args)),
        }
    }

    /// Create a Column for rand(seed). When used in with_column, generates one value per row (PySpark-like).
    pub fn from_rand(seed: Option<u64>) -> Self {
        let expr = lit(1i64).cum_sum(false).map(
            move |c| expect_col(crate::udfs::apply_rand_with_seed(c, seed)),
            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
        );
        Column {
            name: "rand".to_string(),
            expr,
            deferred: Some(DeferredRandom::Rand(seed)),
            udf_call: None,
        }
    }

    /// Create a Column for randn(seed). When used in with_column, generates one value per row (PySpark-like).
    pub fn from_randn(seed: Option<u64>) -> Self {
        let expr = lit(1i64).cum_sum(false).map(
            move |c| expect_col(crate::udfs::apply_randn_with_seed(c, seed)),
            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
        );
        Column {
            name: "randn".to_string(),
            expr,
            deferred: Some(DeferredRandom::Randn(seed)),
            udf_call: None,
        }
    }
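
    // Hypothetical usage sketch: the DataFrame `with_column` API referenced by the doc
    // comments lives outside this file, so its exact signature is an assumption here:
    //
    //     let u = Column::from_rand(Some(42)); // seeded uniform [0, 1)
    //     let z = Column::from_randn(None);    // standard normal
    //     // df.with_column("u", u); df.with_column("z", z); // one value per row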

    /// Get the underlying Polars Expr
    pub fn expr(&self) -> &Expr {
        &self.expr
    }

    /// Convert to Polars Expr (consumes self)
    pub fn into_expr(self) -> Expr {
        self.expr
    }

    /// Get the column name
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Alias the column
    pub fn alias(&self, name: &str) -> Column {
        Column {
            name: name.to_string(),
            expr: self.expr.clone().alias(name),
            deferred: self.deferred,
            udf_call: self.udf_call.clone(),
        }
    }

    /// Ascending sort, nulls first (Spark default for ASC). PySpark asc.
    pub fn asc(&self) -> crate::functions::SortOrder {
        crate::functions::asc(self)
    }

    /// Ascending sort, nulls first. PySpark asc_nulls_first.
    pub fn asc_nulls_first(&self) -> crate::functions::SortOrder {
        crate::functions::asc_nulls_first(self)
    }

    /// Ascending sort, nulls last. PySpark asc_nulls_last.
    pub fn asc_nulls_last(&self) -> crate::functions::SortOrder {
        crate::functions::asc_nulls_last(self)
    }

    /// Descending sort, nulls last (Spark default for DESC). PySpark desc.
    pub fn desc(&self) -> crate::functions::SortOrder {
        crate::functions::desc(self)
    }

    /// Descending sort, nulls first. PySpark desc_nulls_first.
    pub fn desc_nulls_first(&self) -> crate::functions::SortOrder {
        crate::functions::desc_nulls_first(self)
    }

    /// Descending sort, nulls last. PySpark desc_nulls_last.
    pub fn desc_nulls_last(&self) -> crate::functions::SortOrder {
        crate::functions::desc_nulls_last(self)
    }

    /// Check if column is null
    pub fn is_null(&self) -> Column {
        Column {
            name: format!("({} IS NULL)", self.name),
            expr: self.expr.clone().is_null(),
            deferred: None,
            udf_call: None,
        }
    }

    /// Check if column is not null
    pub fn is_not_null(&self) -> Column {
        Column {
            name: format!("({} IS NOT NULL)", self.name),
            expr: self.expr.clone().is_not_null(),
            deferred: None,
            udf_call: None,
        }
    }

    /// Alias for is_null. PySpark isnull.
    pub fn isnull(&self) -> Column {
        self.is_null()
    }

    /// Alias for is_not_null. PySpark isnotnull.
    pub fn isnotnull(&self) -> Column {
        self.is_not_null()
    }

    /// Create a null boolean expression
    fn null_boolean_expr() -> Expr {
        use polars::prelude::*;
        // Create an expression that is always a null boolean
        lit(NULL).cast(DataType::Boolean)
    }

    /// SQL LIKE pattern matching (% = any chars, _ = one char). PySpark like.
    /// When escape_char is Some(esc), esc + char treats that char as literal (e.g. \\% = literal %).
    pub fn like(&self, pattern: &str, escape_char: Option<char>) -> Column {
        let regex = like_pattern_to_regex(pattern, escape_char);
        self.regexp_like(&regex)
    }

    /// Case-insensitive LIKE. PySpark ilike.
    /// When escape_char is Some(esc), esc + char treats that char as literal.
    pub fn ilike(&self, pattern: &str, escape_char: Option<char>) -> Column {
        use polars::prelude::*;
        let regex = format!("(?i){}", like_pattern_to_regex(pattern, escape_char));
        Self::from_expr(self.expr().clone().str().contains(lit(regex), false), None)
    }

    /// PySpark-style equality comparison (NULL == NULL returns NULL, not True)
    /// Any comparison involving NULL returns NULL
    ///
    /// Explicitly wraps comparisons with null checks to ensure PySpark semantics.
    /// If either side is NULL, the result is NULL.
    pub fn eq_pyspark(&self, other: &Column) -> Column {
        // Check if either side is NULL
        let left_null = self.expr().clone().is_null();
        let right_null = other.expr().clone().is_null();
        let either_null = left_null.clone().or(right_null.clone());

        // Standard equality comparison
        let eq_result = self.expr().clone().eq(other.expr().clone());

        // Wrap: if either is null, return null boolean, else return comparison result
        let null_boolean = Self::null_boolean_expr();
        let null_aware_expr = crate::functions::when(&Self::from_expr(either_null, None))
            .then(&Self::from_expr(null_boolean, None))
            .otherwise(&Self::from_expr(eq_result, None));

        Self::from_expr(null_aware_expr.into_expr(), None)
    }
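
    // Illustrative eq_pyspark results (PySpark `==` parity):
    //   1 == 1       -> true
    //   1 == NULL    -> NULL
    //   NULL == NULL -> NULL   (see eq_null_safe for NULL <=> NULL = true)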

    /// PySpark-style inequality comparison (NULL != NULL returns NULL, not False)
    /// Any comparison involving NULL returns NULL
    pub fn ne_pyspark(&self, other: &Column) -> Column {
        // Check if either side is NULL
        let left_null = self.expr().clone().is_null();
        let right_null = other.expr().clone().is_null();
        let either_null = left_null.clone().or(right_null.clone());

        // Standard inequality comparison
        let ne_result = self.expr().clone().neq(other.expr().clone());

        // Wrap: if either is null, return null boolean, else return comparison result
        let null_boolean = Self::null_boolean_expr();
        let null_aware_expr = crate::functions::when(&Self::from_expr(either_null, None))
            .then(&Self::from_expr(null_boolean, None))
            .otherwise(&Self::from_expr(ne_result, None));

        Self::from_expr(null_aware_expr.into_expr(), None)
    }

    /// Null-safe equality (NULL <=> NULL returns True)
    /// PySpark's eqNullSafe() method. Applies type coercion (e.g. string vs int) for PySpark parity (#266).
    pub fn eq_null_safe(&self, other: &Column) -> Column {
        use crate::functions::{lit_bool, when};

        let (left_c, right_c) = crate::type_coercion::coerce_for_pyspark_eq_null_safe(
            self.expr().clone(),
            other.expr().clone(),
        )
        .unwrap_or_else(|_| (self.expr().clone(), other.expr().clone()));

        let left_null = left_c.clone().is_null();
        let right_null = right_c.clone().is_null();
        let both_null = left_null.clone().and(right_null.clone());
        let either_null = left_null.clone().or(right_null.clone());

        // Standard equality (on coerced exprs)
        let eq_result = left_c.eq(right_c);

        // If both are null, return True
        // If either is null (but not both), return False
        // Otherwise, return standard equality result
        when(&Self::from_expr(both_null, None))
            .then(&lit_bool(true))
            .otherwise(
                &when(&Self::from_expr(either_null, None))
                    .then(&lit_bool(false))
                    .otherwise(&Self::from_expr(eq_result, None)),
            )
    }
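
    // Illustrative eq_null_safe truth table (PySpark eqNullSafe parity):
    //   1 <=> 1       -> true
    //   1 <=> 2       -> false
    //   1 <=> NULL    -> false
    //   NULL <=> NULL -> true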

    /// Create a Column that is always a null boolean.
    /// This is useful for downstream bindings (e.g. PyO3) that need a null literal
    /// without depending directly on Polars types like `Expr` or `LiteralValue`.
    pub fn null_boolean() -> Column {
        Column::from_expr(Self::null_boolean_expr(), None)
    }

    /// Create a Column that is always a null value of the given type.
    /// `dtype` is a type name string (e.g. `"boolean"`, `"string"`, `"bigint"`, `"double"`).
    /// See [`crate::functions::parse_type_name`] for supported names.
    /// Returns `Err` on unknown type name so bindings get a clear error.
    pub fn lit_null(dtype: &str) -> Result<Column, String> {
        use polars::prelude::{NULL, lit};
        let dt = crate::functions::parse_type_name(dtype)?;
        Ok(Column::from_expr(lit(NULL).cast(dt), None))
    }

    /// Create a Column from a boolean literal. Convenience for bindings that prefer method form.
    pub fn from_bool(b: bool) -> Column {
        crate::functions::lit_bool(b)
    }

    /// Create a Column from an i64 literal. Convenience for bindings that prefer method form.
    pub fn from_i64(n: i64) -> Column {
        crate::functions::lit_i64(n)
    }

    /// Create a Column from a string literal. Convenience for bindings that prefer method form.
    pub fn from_string(s: &str) -> Column {
        crate::functions::lit_str(s)
    }

    /// PySpark-style greater-than comparison (NULL > value returns NULL)
    /// Any comparison involving NULL returns NULL
    pub fn gt_pyspark(&self, other: &Column) -> Column {
        // Check if either side is NULL
        let left_null = self.expr().clone().is_null();
        let right_null = other.expr().clone().is_null();
        let either_null = left_null.clone().or(right_null.clone());

        // Standard greater-than comparison
        let gt_result = self.expr().clone().gt(other.expr().clone());

        // Wrap: if either is null, return null boolean, else return comparison result
        let null_boolean = Self::null_boolean_expr();
        let null_aware_expr = crate::functions::when(&Self::from_expr(either_null, None))
            .then(&Self::from_expr(null_boolean, None))
            .otherwise(&Self::from_expr(gt_result, None));

        Self::from_expr(null_aware_expr.into_expr(), None)
    }

    /// PySpark-style greater-than-or-equal comparison
    /// Any comparison involving NULL returns NULL
    pub fn ge_pyspark(&self, other: &Column) -> Column {
        // Check if either side is NULL
        let left_null = self.expr().clone().is_null();
        let right_null = other.expr().clone().is_null();
        let either_null = left_null.clone().or(right_null.clone());

        // Standard greater-than-or-equal comparison
        let ge_result = self.expr().clone().gt_eq(other.expr().clone());

        // Wrap: if either is null, return null boolean, else return comparison result
        let null_boolean = Self::null_boolean_expr();
        let null_aware_expr = crate::functions::when(&Self::from_expr(either_null, None))
            .then(&Self::from_expr(null_boolean, None))
            .otherwise(&Self::from_expr(ge_result, None));

        Self::from_expr(null_aware_expr.into_expr(), None)
    }

    /// PySpark-style less-than comparison
    /// Any comparison involving NULL returns NULL
    pub fn lt_pyspark(&self, other: &Column) -> Column {
        // Check if either side is NULL
        let left_null = self.expr().clone().is_null();
        let right_null = other.expr().clone().is_null();
        let either_null = left_null.clone().or(right_null.clone());

        // Standard less-than comparison
        let lt_result = self.expr().clone().lt(other.expr().clone());

        // Wrap: if either is null, return null boolean, else return comparison result
        let null_boolean = Self::null_boolean_expr();
        let null_aware_expr = crate::functions::when(&Self::from_expr(either_null, None))
            .then(&Self::from_expr(null_boolean, None))
            .otherwise(&Self::from_expr(lt_result, None));

        Self::from_expr(null_aware_expr.into_expr(), None)
    }

    /// PySpark-style less-than-or-equal comparison
    /// Any comparison involving NULL returns NULL
    pub fn le_pyspark(&self, other: &Column) -> Column {
        // Check if either side is NULL
        let left_null = self.expr().clone().is_null();
        let right_null = other.expr().clone().is_null();
        let either_null = left_null.clone().or(right_null.clone());

        // Standard less-than-or-equal comparison
        let le_result = self.expr().clone().lt_eq(other.expr().clone());

        // Wrap: if either is null, return null boolean, else return comparison result
        let null_boolean = Self::null_boolean_expr();
        let null_aware_expr = crate::functions::when(&Self::from_expr(either_null, None))
            .then(&Self::from_expr(null_boolean, None))
            .otherwise(&Self::from_expr(le_result, None));

        Self::from_expr(null_aware_expr.into_expr(), None)
    }

    // Standard comparison methods that work with Expr (for literals and columns)
    // These delegate to Polars and may not match PySpark null semantics exactly.
    // Use _pyspark variants for explicit PySpark semantics.

    /// Greater than comparison
    pub fn gt(&self, other: Expr) -> Column {
        Self::from_expr(self.expr().clone().gt(other), None)
    }

    /// Greater than or equal comparison
    pub fn gt_eq(&self, other: Expr) -> Column {
        Self::from_expr(self.expr().clone().gt_eq(other), None)
    }

    /// Less than comparison
    pub fn lt(&self, other: Expr) -> Column {
        Self::from_expr(self.expr().clone().lt(other), None)
    }

    /// Less than or equal comparison
    pub fn lt_eq(&self, other: Expr) -> Column {
        Self::from_expr(self.expr().clone().lt_eq(other), None)
    }

    /// Equality comparison
    pub fn eq(&self, other: Expr) -> Column {
        Self::from_expr(self.expr().clone().eq(other), None)
    }

    /// Inequality comparison
    pub fn neq(&self, other: Expr) -> Column {
        Self::from_expr(self.expr().clone().neq(other), None)
    }

    // Equality comparison with special handling for string-vs-numeric literals (issue #235).
    //
    // When comparing a column to a numeric literal (e.g. col("s") == lit(123)), Polars
    // normally raises `cannot compare string with numeric type` if the column is a
    // string column. PySpark, however, coerces types (string → numeric) and performs
    // the comparison, treating invalid strings as null (non-matching in filters).

    // --- String functions ---

    /// Convert string column to uppercase (PySpark upper)
    pub fn upper(&self) -> Column {
        Self::from_expr(self.expr().clone().str().to_uppercase(), None)
    }

    /// Convert string column to lowercase (PySpark lower)
    pub fn lower(&self) -> Column {
        Self::from_expr(self.expr().clone().str().to_lowercase(), None)
    }

    /// Alias for lower. PySpark lcase.
    pub fn lcase(&self) -> Column {
        self.lower()
    }

    /// Alias for upper. PySpark ucase.
    pub fn ucase(&self) -> Column {
        self.upper()
    }

    /// Substring with 1-based start (PySpark substring/substr semantics).
    /// - Positive start: 1-based index (1 = first char).
    /// - Negative start: count from end (e.g. -3 = third char from end).
    /// - Length less than 1: empty string.
    pub fn substr(&self, start: i64, length: Option<i64>) -> Column {
        use polars::prelude::*;
        // PySpark: len < 1 -> empty string
        if length.map(|l| l < 1).unwrap_or(false) {
            return Self::from_expr(lit(""), None);
        }
        let len_chars = self.expr().clone().str().len_chars();
        // 1-based start: positive -> 0-based offset = (start - 1).max(0); negative -> from end: len + start (clamped to 0)
        let offset_expr = if start >= 1 {
            lit((start - 1).max(0))
        } else {
            let from_end = len_chars + lit(start);
            when(from_end.clone().lt(lit(0i64)))
                .then(lit(0i64))
                .otherwise(from_end)
        };
        let length_expr = length.map(lit).unwrap_or_else(|| lit(i64::MAX));
        Self::from_expr(
            self.expr().clone().str().slice(offset_expr, length_expr),
            None,
        )
    }
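
    // Illustrative substr behavior on the value "hello" (PySpark substring parity):
    //   substr(2, Some(3)) -> "ell"   (1-based start)
    //   substr(-3, None)   -> "llo"   (negative start counts from the end)
    //   substr(1, Some(0)) -> ""      (length < 1 yields an empty string)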

    /// String length in characters (PySpark length)
    pub fn length(&self) -> Column {
        Self::from_expr(self.expr().clone().str().len_chars(), None)
    }

    /// Bit length of string in bytes * 8 (PySpark bit_length).
    pub fn bit_length(&self) -> Column {
        use polars::prelude::*;
        let len_bytes = self.expr().clone().str().len_bytes().cast(DataType::Int32);
        Self::from_expr(len_bytes * lit(8i32), None)
    }

    /// Length of string in bytes (PySpark octet_length).
    pub fn octet_length(&self) -> Column {
        use polars::prelude::*;
        Self::from_expr(
            self.expr().clone().str().len_bytes().cast(DataType::Int32),
            None,
        )
    }

    /// Length of string in characters (PySpark char_length). Alias of length().
    pub fn char_length(&self) -> Column {
        self.length()
    }

    /// Length of string in characters (PySpark character_length). Alias of length().
    pub fn character_length(&self) -> Column {
        self.length()
    }

    /// Encode string to binary (PySpark encode). Charset: UTF-8. Returns hex string.
    pub fn encode(&self, charset: &str) -> Column {
        let charset = charset.to_string();
        let expr = self.expr().clone().map(
            move |s| expect_col(crate::udfs::apply_encode(s, &charset)),
            |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
        );
        Self::from_expr(expr, None)
    }

    /// Decode binary (hex string) to string (PySpark decode). Charset: UTF-8.
    pub fn decode(&self, charset: &str) -> Column {
        let charset = charset.to_string();
        let expr = self.expr().clone().map(
            move |s| expect_col(crate::udfs::apply_decode(s, &charset)),
            |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
        );
        Self::from_expr(expr, None)
    }

    /// Convert to binary (PySpark to_binary). fmt: 'utf-8', 'hex'. Returns hex string.
    pub fn to_binary(&self, fmt: &str) -> Column {
        let fmt = fmt.to_string();
        let expr = self.expr().clone().map(
            move |s| expect_col(crate::udfs::apply_to_binary(s, &fmt)),
            |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
        );
        Self::from_expr(expr, None)
    }

    /// Try convert to binary; null on failure (PySpark try_to_binary).
    pub fn try_to_binary(&self, fmt: &str) -> Column {
        let fmt = fmt.to_string();
        let expr = self.expr().clone().map(
            move |s| expect_col(crate::udfs::apply_try_to_binary(s, &fmt)),
            |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
        );
        Self::from_expr(expr, None)
    }

    /// AES encrypt (PySpark aes_encrypt). Key as string; AES-128-GCM. Output hex(nonce||ciphertext).
    pub fn aes_encrypt(&self, key: &str) -> Column {
        let key = key.to_string();
        let expr = self.expr().clone().map(
            move |s| expect_col(crate::udfs::apply_aes_encrypt(s, &key)),
            |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
        );
        Self::from_expr(expr, None)
    }

    /// AES decrypt (PySpark aes_decrypt). Input hex(nonce||ciphertext). Null on failure.
    pub fn aes_decrypt(&self, key: &str) -> Column {
        let key = key.to_string();
        let expr = self.expr().clone().map(
            move |s| expect_col(crate::udfs::apply_aes_decrypt(s, &key)),
            |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
        );
        Self::from_expr(expr, None)
    }

    /// Try AES decrypt (PySpark try_aes_decrypt). Returns null on failure.
    pub fn try_aes_decrypt(&self, key: &str) -> Column {
        let key = key.to_string();
        let expr = self.expr().clone().map(
            move |s| expect_col(crate::udfs::apply_try_aes_decrypt(s, &key)),
            |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
        );
        Self::from_expr(expr, None)
    }

    /// Data type as string (PySpark typeof). Uses dtype from schema.
    pub fn typeof_(&self) -> Column {
        Self::from_expr(
            self.expr().clone().map(
                |s| expect_col(crate::udfs::apply_typeof(s)),
                |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
            ),
            None,
        )
    }

    /// Trim leading and trailing whitespace (PySpark trim)
    pub fn trim(&self) -> Column {
        use polars::prelude::*;
        Self::from_expr(self.expr().clone().str().strip_chars(lit(" \t\n\r")), None)
    }

    /// Trim leading whitespace (PySpark ltrim)
    pub fn ltrim(&self) -> Column {
        use polars::prelude::*;
        Self::from_expr(
            self.expr().clone().str().strip_chars_start(lit(" \t\n\r")),
            None,
        )
    }

    /// Trim trailing whitespace (PySpark rtrim)
    pub fn rtrim(&self) -> Column {
        use polars::prelude::*;
        Self::from_expr(
            self.expr().clone().str().strip_chars_end(lit(" \t\n\r")),
            None,
        )
    }

    /// Trim leading and trailing characters (PySpark btrim). trim_str defaults to whitespace.
    pub fn btrim(&self, trim_str: Option<&str>) -> Column {
        use polars::prelude::*;
        let chars = trim_str.unwrap_or(" \t\n\r");
        Self::from_expr(self.expr().clone().str().strip_chars(lit(chars)), None)
    }

    /// Find substring position 1-based, starting at pos (PySpark locate). 0 if not found.
    pub fn locate(&self, substr: &str, pos: i64) -> Column {
        use polars::prelude::*;
        if substr.is_empty() {
            return Self::from_expr(lit(1i64), None);
        }
        let start = (pos - 1).max(0);
        let slice_expr = self.expr().clone().str().slice(lit(start), lit(i64::MAX));
        let found = slice_expr.str().find_literal(lit(substr.to_string()));
        Self::from_expr(
            (found.cast(DataType::Int64) + lit(start + 1)).fill_null(lit(0i64)),
            None,
        )
    }
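
    // Illustrative locate behavior on the value "hello" (PySpark locate parity):
    //   locate("l", 1) -> 3   (1-based position of the first match)
    //   locate("l", 4) -> 4   (search starts at position 4)
    //   locate("z", 1) -> 0   (not found)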

    /// Base conversion (PySpark conv). num_str from from_base to to_base.
    pub fn conv(&self, from_base: i32, to_base: i32) -> Column {
        let expr = self.expr().clone().map(
            move |s| expect_col(crate::udfs::apply_conv(s, from_base, to_base)),
            |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
        );
        Self::from_expr(expr, None)
    }

    /// Convert to hex string (PySpark hex). Int or string input.
    pub fn hex(&self) -> Column {
        let expr = self.expr().clone().map(
            |s| expect_col(crate::udfs::apply_hex(s)),
            |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
        );
        Self::from_expr(expr, None)
    }

    /// Convert hex string to binary/string (PySpark unhex).
    pub fn unhex(&self) -> Column {
        let expr = self.expr().clone().map(
            |s| expect_col(crate::udfs::apply_unhex(s)),
            |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
        );
        Self::from_expr(expr, None)
    }

    /// Convert integer to binary string (PySpark bin).
    pub fn bin(&self) -> Column {
        let expr = self.expr().clone().map(
            |s| expect_col(crate::udfs::apply_bin(s)),
            |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
        );
        Self::from_expr(expr, None)
    }

    /// Get bit at 0-based position (PySpark getbit).
    pub fn getbit(&self, pos: i64) -> Column {
        let expr = self.expr().clone().map(
            move |s| expect_col(crate::udfs::apply_getbit(s, pos)),
            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
        );
        Self::from_expr(expr, None)
    }

    /// Bitwise AND of two integer/boolean columns (PySpark bit_and).
    pub fn bit_and(&self, other: &Column) -> Column {
        let args = [other.expr().clone()];
        let expr = self.expr().clone().cast(DataType::Int64).map_many(
            |cols| expect_col(crate::udfs::apply_bit_and(cols)),
            &args,
            |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Int64)),
        );
        Self::from_expr(expr, None)
    }

    /// Bitwise OR of two integer/boolean columns (PySpark bit_or).
    pub fn bit_or(&self, other: &Column) -> Column {
        let args = [other.expr().clone()];
        let expr = self.expr().clone().cast(DataType::Int64).map_many(
            |cols| expect_col(crate::udfs::apply_bit_or(cols)),
            &args,
            |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Int64)),
        );
        Self::from_expr(expr, None)
    }

    /// Bitwise XOR of two integer/boolean columns (PySpark bit_xor).
    pub fn bit_xor(&self, other: &Column) -> Column {
        let args = [other.expr().clone()];
        let expr = self.expr().clone().cast(DataType::Int64).map_many(
            |cols| expect_col(crate::udfs::apply_bit_xor(cols)),
            &args,
            |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Int64)),
        );
        Self::from_expr(expr, None)
    }

    /// Count of set bits in the integer representation (PySpark bit_count).
    pub fn bit_count(&self) -> Column {
        let expr = self.expr().clone().map(
            |s| expect_col(crate::udfs::apply_bit_count(s)),
            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
        );
        Self::from_expr(expr, None)
    }

    /// Assert that all boolean values are true; errors otherwise (PySpark assert_true).
    /// When err_msg is Some, it is used in the error message when assertion fails.
    pub fn assert_true(&self, err_msg: Option<&str>) -> Column {
        let msg = err_msg.map(String::from);
        let expr = self.expr().clone().map(
            move |c| expect_col(crate::udfs::apply_assert_true(c, msg.as_deref())),
            |_schema, field| Ok(field.clone()),
        );
        Self::from_expr(expr, None)
    }

    /// Bitwise NOT of an integer/boolean column (PySpark bitwise_not / bitwiseNOT).
    pub fn bitwise_not(&self) -> Column {
        // Use arithmetic identity: !n == -1 - n for two's-complement integers.
        let expr = (lit(-1i64) - self.expr().clone().cast(DataType::Int64)).cast(DataType::Int64);
        Self::from_expr(expr, None)
    }

    /// Parse string to map (PySpark str_to_map). "k1:v1,k2:v2" -> map.
    pub fn str_to_map(&self, pair_delim: &str, key_value_delim: &str) -> Column {
        let pair_delim = pair_delim.to_string();
        let key_value_delim = key_value_delim.to_string();
        let expr = self.expr().clone().map(
            move |s| {
                expect_col(crate::udfs::apply_str_to_map(
                    s,
                    &pair_delim,
                    &key_value_delim,
                ))
            },
            |_schema, field| Ok(field.clone()),
        );
        Self::from_expr(expr, None)
    }

    /// True if regex pattern contains lookahead/lookbehind (Polars regex does not support these).
    fn pattern_has_lookaround(pattern: &str) -> bool {
        let p = pattern.as_bytes();
        let n = p.len();
        let mut i = 0;
        while i + 2 < n {
            if p[i] == b'(' && p[i + 1] == b'?' {
                match p[i + 2] {
                    b'=' | b'!' => return true, // (?= (?! lookahead
                    b'<' if i + 4 <= n && (p[i + 3] == b'=' || p[i + 3] == b'!') => return true, // (?<= (?<! lookbehind
                    _ => {}
                }
            }
            i += 1;
        }
        false
    }
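
    // Illustrative results (lookaround detection only; other (?...) groups are not flagged):
    //   pattern_has_lookaround(r"(?=foo)bar")  -> true
    //   pattern_has_lookaround(r"(?<!x)y")     -> true
    //   pattern_has_lookaround(r"(?i)abc|def") -> false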

    /// Extract first match of regex pattern (PySpark regexp_extract). Group 0 = full match.
    /// When pattern contains lookahead/lookbehind, uses fancy-regex (Polars regex does not support them).
    pub fn regexp_extract(&self, pattern: &str, group_index: usize) -> Column {
        use polars::prelude::*;
        if Self::pattern_has_lookaround(pattern) {
            let pat = pattern.to_string();
            let group = group_index;
            Self::from_expr(
                self.expr().clone().map(
                    move |s| {
                        expect_col(crate::udfs::apply_regexp_extract_lookaround(s, &pat, group))
                    },
                    |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
                ),
                None,
            )
        } else {
            let pat = pattern.to_string();
            Self::from_expr(
                self.expr().clone().str().extract(lit(pat), group_index),
                None,
            )
        }
    }
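
    // Illustrative behavior on the value "100-200" (PySpark regexp_extract parity):
    //   regexp_extract(r"(\d+)-(\d+)", 1) -> "100"
    //   regexp_extract(r"(\d+)-(\d+)", 2) -> "200"
    // A non-matching pattern yields a null here (Polars `extract` semantics).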

    /// Replace first match of regex pattern (PySpark regexp_replace). literal=false for regex.
    pub fn regexp_replace(&self, pattern: &str, replacement: &str) -> Column {
        use polars::prelude::*;
        let pat = pattern.to_string();
        let rep = replacement.to_string();
        Self::from_expr(
            self.expr().clone().str().replace(lit(pat), lit(rep), false),
            None,
        )
    }

    /// Leftmost n characters (PySpark left).
    pub fn left(&self, n: i64) -> Column {
        use polars::prelude::*;
        let len = n.max(0) as u32;
        Self::from_expr(
            self.expr().clone().str().slice(lit(0i64), lit(len as i64)),
            None,
        )
    }

    /// Rightmost n characters (PySpark right).
    pub fn right(&self, n: i64) -> Column {
        use polars::prelude::*;
        let n_val = n.max(0);
        let n_expr = lit(n_val);
        let len_chars = self.expr().clone().str().len_chars().cast(DataType::Int64);
        let start = when((len_chars.clone() - n_expr.clone()).lt_eq(lit(0i64)))
            .then(lit(0i64))
            .otherwise(len_chars - n_expr.clone());
        Self::from_expr(self.expr().clone().str().slice(start, n_expr), None)
    }

    /// Replace all occurrences of literal search string with replacement (PySpark replace for literal).
    pub fn replace(&self, search: &str, replacement: &str) -> Column {
        use polars::prelude::*;
        Self::from_expr(
            self.expr().clone().str().replace_all(
                lit(search.to_string()),
                lit(replacement.to_string()),
                true,
            ),
            None,
        )
    }

    /// Replace multiple (search, replacement) pairs in order (PySpark replace with dict/list).
    pub fn replace_many(&self, pairs: &[(String, String)]) -> Column {
        let mut out = self.clone();
        for (search, replacement) in pairs {
            out = out.replace(search, replacement);
        }
        out
    }

    /// True if string starts with prefix (PySpark startswith).
    pub fn startswith(&self, prefix: &str) -> Column {
        use polars::prelude::*;
        Self::from_expr(
            self.expr()
                .clone()
                .str()
                .starts_with(lit(prefix.to_string())),
            None,
        )
    }

    /// True if string ends with suffix (PySpark endswith).
    pub fn endswith(&self, suffix: &str) -> Column {
        use polars::prelude::*;
        Self::from_expr(
            self.expr().clone().str().ends_with(lit(suffix.to_string())),
            None,
        )
    }

    /// True if string contains substring (literal, not regex). PySpark contains.
    pub fn contains(&self, substring: &str) -> Column {
        use polars::prelude::*;
        // Use literal matching so regex metacharacters in `substring` are not interpreted.
        Self::from_expr(
            self.expr()
                .clone()
                .str()
                .contains_literal(lit(substring.to_string())),
            None,
        )
    }

    /// Split string by delimiter (PySpark split). Returns list of strings.
    /// When limit is Some(n) with n > 0, at most n parts; remainder in last part. None or <= 0: no limit.
    /// Uses literal split so "|" is not interpreted as regex alternation.
    pub fn split(&self, delimiter: &str, limit: Option<i32>) -> Column {
        use polars::prelude::*;
        let use_limit = limit.is_some_and(|l| l > 0);
        if use_limit {
            let delim = delimiter.to_string();
            let lim = limit.unwrap_or(0);
            let expr = self.expr().clone().map(
                move |col| expect_col(crate::udfs::apply_split_with_limit(col, &delim, lim)),
                |_schema, field| {
                    Ok(Field::new(
                        field.name().clone(),
                        DataType::List(Box::new(DataType::String)),
                    ))
                },
            );
            Self::from_expr(expr, None)
        } else {
            Self::from_expr(
                self.expr().clone().str().split(lit(delimiter.to_string())),
                None,
            )
        }
    }
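
    // Illustrative split behavior on the value "a,b,c,d", per the limit semantics documented
    // above (the limited path is delegated to apply_split_with_limit, defined elsewhere):
    //   split(",", None)    -> ["a", "b", "c", "d"]
    //   split(",", Some(2)) -> ["a", "b,c,d"]   (remainder kept in the last part)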

    /// Title case: first letter of each word uppercase (PySpark initcap).
    /// Approximates with lowercase when Polars to_titlecase is not enabled.
    pub fn initcap(&self) -> Column {
        Self::from_expr(self.expr().clone().str().to_lowercase(), None)
    }

    /// Extract all matches of regex (PySpark regexp_extract_all). Returns list of strings.
    pub fn regexp_extract_all(&self, pattern: &str) -> Column {
        use polars::prelude::*;
        Self::from_expr(
            self.expr()
                .clone()
                .str()
                .extract_all(lit(pattern.to_string())),
            None,
        )
    }

    /// Check if string matches regex (PySpark regexp_like / rlike).
    pub fn regexp_like(&self, pattern: &str) -> Column {
        use polars::prelude::*;
        Self::from_expr(
            self.expr()
                .clone()
                .str()
                .contains(lit(pattern.to_string()), false),
            None,
        )
    }

    /// Count of non-overlapping regex matches (PySpark regexp_count).
    pub fn regexp_count(&self, pattern: &str) -> Column {
        use polars::prelude::*;
        Self::from_expr(
            self.expr()
                .clone()
                .str()
                .count_matches(lit(pattern.to_string()), false)
                .cast(DataType::Int64),
            None,
        )
    }

    /// First substring matching regex (PySpark regexp_substr). Null if no match.
    pub fn regexp_substr(&self, pattern: &str) -> Column {
        self.regexp_extract(pattern, 0)
    }

    /// 1-based position of first regex match (PySpark regexp_instr). group_idx 0 = full match; null if no match.
    pub fn regexp_instr(&self, pattern: &str, group_idx: Option<usize>) -> Column {
        let idx = group_idx.unwrap_or(0);
        let pattern = pattern.to_string();
        let expr = self.expr().clone().map(
            move |s| expect_col(crate::udfs::apply_regexp_instr(s, pattern.clone(), idx)),
            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
        );
        Self::from_expr(expr, None)
    }

    /// 1-based index of self in comma-delimited set column (PySpark find_in_set). 0 if not found or self contains comma.
    pub fn find_in_set(&self, set_column: &Column) -> Column {
        let args = [set_column.expr().clone()];
        let expr = self.expr().clone().map_many(
            |cols| expect_col(crate::udfs::apply_find_in_set(cols)),
            &args,
            |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Int64)),
        );
        Self::from_expr(expr, None)
    }

    /// Repeat string column n times (PySpark repeat). Each element repeated n times.
    pub fn repeat(&self, n: i32) -> Column {
        use polars::prelude::*;
        // repeat_by yields List[str]; join to get a single string per row.
        Self::from_expr(
            self.expr()
                .clone()
                .repeat_by(lit(n as u32))
                .list()
                .join(lit(""), false),
            None,
        )
    }

    /// Reverse string (PySpark reverse).
    pub fn reverse(&self) -> Column {
        Self::from_expr(self.expr().clone().str().reverse(), None)
    }

    /// Find substring position (1-based; 0 if not found). PySpark instr(col, substr).
    pub fn instr(&self, substr: &str) -> Column {
        use polars::prelude::*;
        let found = self
            .expr()
            .clone()
            .str()
            .find_literal(lit(substr.to_string()));
        // Polars find_literal returns 0-based index (null if not found); PySpark is 1-based, 0 when not found.
        Self::from_expr(
            (found.cast(DataType::Int64) + lit(1i64)).fill_null(lit(0i64)),
            None,
        )
    }

    /// Left-pad string to length with pad character (PySpark lpad).
    pub fn lpad(&self, length: i32, pad: &str) -> Column {
        let pad_str = if pad.is_empty() { " " } else { pad };
        let fill = pad_str.chars().next().unwrap_or(' ');
        Self::from_expr(
            self.expr()
                .clone()
                .str()
                .pad_start(lit(length as i64), fill),
            None,
        )
    }

    /// Right-pad string to length with pad character (PySpark rpad).
    pub fn rpad(&self, length: i32, pad: &str) -> Column {
        let pad_str = if pad.is_empty() { " " } else { pad };
        let fill = pad_str.chars().next().unwrap_or(' ');
        Self::from_expr(
            self.expr().clone().str().pad_end(lit(length as i64), fill),
            None,
        )
    }

    /// Character-by-character translation (PySpark translate). Replaces each char in from_str with corresponding in to_str; if to_str is shorter, extra from chars are removed.
    pub fn translate(&self, from_str: &str, to_str: &str) -> Column {
        use polars::prelude::*;
        let mut e = self.expr().clone();
        let from_chars: Vec<char> = from_str.chars().collect();
        let to_chars: Vec<char> = to_str.chars().collect();
        for (i, fc) in from_chars.iter().enumerate() {
            let f = fc.to_string();
            let t = to_chars
                .get(i)
                .map(|c| c.to_string())
                .unwrap_or_else(String::new); // PySpark: no replacement = drop char
            e = e.str().replace_all(lit(f), lit(t), true);
        }
        Self::from_expr(e, None)
    }

    /// Mask string: replace uppercase with upper_char, lowercase with lower_char, digits with digit_char (PySpark mask).
    /// Defaults: upper 'X', lower 'x', digit 'n'; other chars unchanged.
    pub fn mask(
        &self,
        upper_char: Option<char>,
        lower_char: Option<char>,
        digit_char: Option<char>,
        other_char: Option<char>,
    ) -> Column {
        use polars::prelude::*;
        let upper = upper_char.unwrap_or('X').to_string();
        let lower = lower_char.unwrap_or('x').to_string();
        let digit = digit_char.unwrap_or('n').to_string();
        let other = other_char.map(|c| c.to_string());
        let mut e = self
            .expr()
            .clone()
            .str()
            .replace_all(lit("[A-Z]".to_string()), lit(upper), false)
            .str()
            .replace_all(lit("[a-z]".to_string()), lit(lower), false)
            .str()
            .replace_all(lit(r"\d".to_string()), lit(digit), false);
        if let Some(o) = other {
            e = e
                .str()
                .replace_all(lit("[^A-Za-z0-9]".to_string()), lit(o), false);
        }
        Self::from_expr(e, None)
    }
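
    // Illustrative mask behavior with default replacement characters (PySpark parity):
    //   mask(None, None, None, None) on "AbCD123-@$#" -> "XxXXnnn-@$#"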

    /// Split by delimiter and return 1-based part (PySpark split_part).
    /// part_num > 0: from left; part_num < 0: from right; part_num = 0: null; out-of-range: empty string.
    pub fn split_part(&self, delimiter: &str, part_num: i64) -> Column {
        use polars::prelude::*;
        if part_num == 0 {
            return Self::from_expr(lit(NULL), None);
        }
        let use_regex = delimiter == "|";
        if use_regex {
            let pattern = delimiter.to_string();
            let part = part_num;
            let get_expr = self.expr().clone().map(
                move |col| expect_col(crate::udfs::apply_split_part_regex(col, &pattern, part)),
                |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
            );
            let expr = when(self.expr().clone().is_null())
                .then(lit(NULL))
                .otherwise(get_expr.fill_null(lit("")));
            return Self::from_expr(expr, None);
        }
        let delim = delimiter.to_string();
        let split_expr = self.expr().clone().str().split(lit(delim));
        let index = if part_num > 0 {
            lit(part_num - 1)
        } else {
            lit(part_num)
        };
        let get_expr = split_expr.list().get(index, true).fill_null(lit(""));
        let expr = when(self.expr().clone().is_null())
            .then(lit(NULL))
            .otherwise(get_expr);
        Self::from_expr(expr, None)
    }
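
    // Illustrative split_part behavior on the value "a,b,c" (PySpark split_part parity):
    //   split_part(",", 2)  -> "b"
    //   split_part(",", -1) -> "c"   (negative counts from the right)
    //   split_part(",", 0)  -> null
    //   split_part(",", 9)  -> ""    (out of range)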

    /// Substring before/after nth delimiter (PySpark substring_index). count > 0: before nth from left; count < 0: after nth from right.
    pub fn substring_index(&self, delimiter: &str, count: i64) -> Column {
        use polars::prelude::*;
        let delim = delimiter.to_string();
        let split_expr = self.expr().clone().str().split(lit(delim.clone()));
        let n = count.unsigned_abs() as i64;
        let expr = if count > 0 {
            split_expr
                .clone()
                .list()
                .slice(lit(0i64), lit(n))
                .list()
                .join(lit(delim), false)
        } else {
            let len = split_expr.clone().list().len();
            let start = when(len.clone().gt(lit(n)))
                .then(len.clone() - lit(n))
                .otherwise(lit(0i64));
            let slice_len = when(len.clone().gt(lit(n))).then(lit(n)).otherwise(len);
            split_expr
                .list()
                .slice(start, slice_len)
                .list()
                .join(lit(delim), false)
        };
        Self::from_expr(expr, None)
    }
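
    // Illustrative substring_index behavior on "www.apache.org" (PySpark parity):
    //   substring_index(".", 2)  -> "www.apache"
    //   substring_index(".", -2) -> "apache.org"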

    /// Soundex code (PySpark soundex). Implemented via map UDF (strsim/soundex crates).
    pub fn soundex(&self) -> Column {
        let expr = self.expr().clone().map(
            |s| expect_col(crate::udfs::apply_soundex(s)),
            |_schema, field| Ok(field.clone()),
        );
        Self::from_expr(expr, None)
    }

    /// Levenshtein distance to another string (PySpark levenshtein). Implemented via map_many UDF (strsim).
    pub fn levenshtein(&self, other: &Column) -> Column {
        let args = [other.expr().clone()];
        let expr = self.expr().clone().map_many(
            |cols| expect_col(crate::udfs::apply_levenshtein(cols)),
            &args,
            |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Int64)),
        );
        Self::from_expr(expr, None)
    }

    /// CRC32 checksum of string bytes (PySpark crc32). Implemented via map UDF (crc32fast).
    pub fn crc32(&self) -> Column {
        let expr = self.expr().clone().map(
            |s| expect_col(crate::udfs::apply_crc32(s)),
            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
        );
        Self::from_expr(expr, None)
    }

    /// XXH64 hash of string (PySpark xxhash64). Implemented via map UDF (twox-hash).
    pub fn xxhash64(&self) -> Column {
        let expr = self.expr().clone().map(
            |s| expect_col(crate::udfs::apply_xxhash64(s)),
            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
        );
        Self::from_expr(expr, None)
    }

    /// ASCII value of first character (PySpark ascii). Returns Int32.
    pub fn ascii(&self) -> Column {
        let expr = self.expr().clone().map(
            |s| expect_col(crate::udfs::apply_ascii(s)),
            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int32)),
        );
        Self::from_expr(expr, None)
    }

    /// Format numeric as string with fixed decimal places (PySpark format_number).
    pub fn format_number(&self, decimals: u32) -> Column {
        let expr = self.expr().clone().map(
            move |s| expect_col(crate::udfs::apply_format_number(s, decimals)),
            |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
        );
        Self::from_expr(expr, None)
    }

    /// Int to single-character string (PySpark char / chr). Valid codepoint only.
    pub fn char(&self) -> Column {
        let expr = self.expr().clone().map(
            |s| expect_col(crate::udfs::apply_char(s)),
            |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
        );
        Self::from_expr(expr, None)
    }

    /// Alias for char (PySpark chr).
    pub fn chr(&self) -> Column {
        self.char()
    }

    /// Base64 encode string bytes (PySpark base64).
    pub fn base64(&self) -> Column {
        let expr = self.expr().clone().map(
            |s| expect_col(crate::udfs::apply_base64(s)),
            |_schema, field| Ok(field.clone()),
        );
        Self::from_expr(expr, None)
    }

    /// Base64 decode to string (PySpark unbase64). Invalid decode → null.
    pub fn unbase64(&self) -> Column {
        let expr = self.expr().clone().map(
            |s| expect_col(crate::udfs::apply_unbase64(s)),
            |_schema, field| Ok(field.clone()),
        );
        Self::from_expr(expr, None)
    }

    /// SHA1 hash of string bytes, return hex string (PySpark sha1).
    pub fn sha1(&self) -> Column {
        let expr = self.expr().clone().map(
            |s| expect_col(crate::udfs::apply_sha1(s)),
            |_schema, field| Ok(field.clone()),
        );
        Self::from_expr(expr, None)
    }

    /// SHA2 hash; bit_length 256, 384, or 512 (PySpark sha2). Default 256.
    pub fn sha2(&self, bit_length: i32) -> Column {
        let expr = self.expr().clone().map(
            move |s| expect_col(crate::udfs::apply_sha2(s, bit_length)),
            |_schema, field| Ok(field.clone()),
        );
        Self::from_expr(expr, None)
    }

    /// MD5 hash of string bytes, return hex string (PySpark md5).
    pub fn md5(&self) -> Column {
        let expr = self.expr().clone().map(
            |s| expect_col(crate::udfs::apply_md5(s)),
            |_schema, field| Ok(field.clone()),
        );
        Self::from_expr(expr, None)
    }

    /// Replace substring at 1-based position (PySpark overlay). replace is a literal string.
    pub fn overlay(&self, replace: &str, pos: i64, length: i64) -> Column {
        use polars::prelude::*;
        let pos = pos.max(1);
        let replace_len = length.max(0);
        let start_left = 0i64;
        let len_left = (pos - 1).max(0);
        let start_right = (pos - 1 + replace_len).max(0);
        let len_right = 1_000_000i64; // "rest of string"
        let left = self
            .expr()
            .clone()
            .str()
            .slice(lit(start_left), lit(len_left));
        let mid = lit(replace.to_string());
        let right = self
            .expr()
            .clone()
            .str()
            .slice(lit(start_right), lit(len_right));
        let exprs = [left, mid, right];
        let concat_expr = polars::prelude::concat_str(&exprs, "", false);
        Self::from_expr(concat_expr, None)
    }
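
    // Illustrative overlay behavior on the value "SPARK_SQL" (PySpark overlay parity):
    //   overlay("CORE", 7, 3)  -> "SPARK_CORE"     (replace 3 chars from position 7)
    //   overlay("ANSI ", 7, 0) -> "SPARK_ANSI SQL" (length 0 inserts without removing)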

    // --- Math functions ---

    /// Absolute value (PySpark abs)
    pub fn abs(&self) -> Column {
        Self::from_expr(self.expr().clone().abs(), None)
    }

    /// Ceiling (PySpark ceil)
    pub fn ceil(&self) -> Column {
        Self::from_expr(self.expr().clone().ceil(), None)
    }

    /// Alias for ceil. PySpark ceiling.
    pub fn ceiling(&self) -> Column {
        self.ceil()
    }

    /// Floor (PySpark floor)
    pub fn floor(&self) -> Column {
        Self::from_expr(self.expr().clone().floor(), None)
    }

    /// Round to given decimal places (PySpark round). Supports string columns containing
    /// numeric values (implicit cast to double then round; parity with PySpark).
    pub fn round(&self, decimals: u32) -> Column {
        let expr = self.expr().clone().map(
            move |s| expect_col(crate::udfs::apply_round(s, decimals)),
            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
        );
        Self::from_expr(expr, None)
    }

    /// Banker's rounding - round half to even (PySpark bround).
    pub fn bround(&self, scale: i32) -> Column {
        let expr = self.expr().clone().map(
            move |s| expect_col(crate::udfs::apply_bround(s, scale)),
            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
        );
        Self::from_expr(expr, None)
    }

    /// Unary minus (PySpark negate, negative).
    pub fn negate(&self) -> Column {
        use polars::prelude::*;
        Self::from_expr(self.expr().clone() * lit(-1), None)
    }

    /// Multiply with PySpark-style string/number coercion (used by Python Column operators).
    ///
    /// Both operands are coerced to Double when used from Python; string columns are parsed
    /// as doubles where possible, invalid strings become null.
    pub fn multiply_pyspark(&self, other: &Column) -> Column {
        let args = [other.expr().clone()];
        let expr = self.expr().clone().map_many(
            |cols| expect_col(crate::udfs::apply_pyspark_multiply(cols)),
            &args,
            |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Float64)),
        );
        Self::from_expr(expr, None)
    }

    /// Add with PySpark-style string/number coercion (used by Python Column operators).
    pub fn add_pyspark(&self, other: &Column) -> Column {
        let args = [other.expr().clone()];
        let expr = self.expr().clone().map_many(
            |cols| expect_col(crate::udfs::apply_pyspark_add(cols)),
            &args,
            |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Float64)),
        );
        Self::from_expr(expr, None)
    }

    /// Subtract with PySpark-style string/number coercion (used by Python Column operators).
    pub fn subtract_pyspark(&self, other: &Column) -> Column {
        let args = [other.expr().clone()];
        let expr = self.expr().clone().map_many(
            |cols| expect_col(crate::udfs::apply_pyspark_subtract(cols)),
            &args,
            |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Float64)),
        );
        Self::from_expr(expr, None)
    }

    /// Divide with PySpark-style string/number coercion (used by Python Column operators).
    pub fn divide_pyspark(&self, other: &Column) -> Column {
        let args = [other.expr().clone()];
        let expr = self.expr().clone().map_many(
            |cols| expect_col(crate::udfs::apply_pyspark_divide(cols)),
            &args,
            |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Float64)),
        );
        Self::from_expr(expr, None)
    }

    /// Modulo with PySpark-style string/number coercion (used by Python Column operators).
    pub fn mod_pyspark(&self, other: &Column) -> Column {
        let args = [other.expr().clone()];
        let expr = self.expr().clone().map_many(
            |cols| expect_col(crate::udfs::apply_pyspark_mod(cols)),
            &args,
            |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Float64)),
        );
        Self::from_expr(expr, None)
    }

    /// Multiply by another column or literal (PySpark multiply). Broadcasts scalars.
    pub fn multiply(&self, other: &Column) -> Column {
        Self::from_expr(self.expr().clone() * other.expr().clone(), None)
    }

    /// Add another column or literal (PySpark +). Broadcasts scalars.
    pub fn add(&self, other: &Column) -> Column {
        Self::from_expr(self.expr().clone() + other.expr().clone(), None)
    }

    /// Subtract another column or literal (PySpark -). Broadcasts scalars.
    pub fn subtract(&self, other: &Column) -> Column {
1441        Self::from_expr(self.expr().clone() - other.expr().clone(), None)
1442    }
1443
1444    /// Divide by another column or literal (PySpark /). Broadcasts scalars.
1445    pub fn divide(&self, other: &Column) -> Column {
1446        Self::from_expr(self.expr().clone() / other.expr().clone(), None)
1447    }
1448
1449    /// Modulo (PySpark %). Broadcasts scalars.
1450    pub fn mod_(&self, other: &Column) -> Column {
1451        Self::from_expr(self.expr().clone() % other.expr().clone(), None)
1452    }
1453
1454    /// Square root (PySpark sqrt)
1455    pub fn sqrt(&self) -> Column {
1456        Self::from_expr(self.expr().clone().sqrt(), None)
1457    }
1458
1459    /// Power with an integer literal exponent (PySpark pow). For a column or expression exponent, use pow_with.
1460    pub fn pow(&self, exp: i64) -> Column {
1461        use polars::prelude::*;
1462        Self::from_expr(self.expr().clone().pow(lit(exp)), None)
1463    }
1464
1465    /// Power with column or scalar exponent (for __pow__ / col ** other).
1466    pub fn pow_with(&self, exponent: &Column) -> Column {
1467        Self::from_expr(self.expr().clone().pow(exponent.expr().clone()), None)
1468    }
1469
1470    /// Alias for pow. PySpark power.
1471    pub fn power(&self, exp: i64) -> Column {
1472        self.pow(exp)
1473    }
1474
1475    /// Exponential (PySpark exp)
1476    pub fn exp(&self) -> Column {
1477        Self::from_expr(self.expr().clone().exp(), None)
1478    }
1479
1480    /// Natural logarithm (PySpark log)
1481    pub fn log(&self) -> Column {
1482        Self::from_expr(self.expr().clone().log(lit(std::f64::consts::E)), None)
1483    }
1484
1485    /// Alias for log. PySpark ln.
1486    pub fn ln(&self) -> Column {
1487        self.log()
1488    }
1489
1490    /// Sine (radians). PySpark sin.
1491    pub fn sin(&self) -> Column {
1492        let expr = self.expr().clone().map(
1493            |s| expect_col(crate::udfs::apply_sin(s)),
1494            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1495        );
1496        Self::from_expr(expr, None)
1497    }
1498
1499    /// Cosine (radians). PySpark cos.
1500    pub fn cos(&self) -> Column {
1501        let expr = self.expr().clone().map(
1502            |s| expect_col(crate::udfs::apply_cos(s)),
1503            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1504        );
1505        Self::from_expr(expr, None)
1506    }
1507
1508    /// Tangent (radians). PySpark tan.
1509    pub fn tan(&self) -> Column {
1510        let expr = self.expr().clone().map(
1511            |s| expect_col(crate::udfs::apply_tan(s)),
1512            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1513        );
1514        Self::from_expr(expr, None)
1515    }
1516
1517    /// Cotangent: 1/tan (PySpark cot).
1518    pub fn cot(&self) -> Column {
1519        let expr = self.expr().clone().map(
1520            |s| expect_col(crate::udfs::apply_cot(s)),
1521            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1522        );
1523        Self::from_expr(expr, None)
1524    }
1525
1526    /// Cosecant: 1/sin (PySpark csc).
1527    pub fn csc(&self) -> Column {
1528        let expr = self.expr().clone().map(
1529            |s| expect_col(crate::udfs::apply_csc(s)),
1530            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1531        );
1532        Self::from_expr(expr, None)
1533    }
1534
1535    /// Secant: 1/cos (PySpark sec).
1536    pub fn sec(&self) -> Column {
1537        let expr = self.expr().clone().map(
1538            |s| expect_col(crate::udfs::apply_sec(s)),
1539            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1540        );
1541        Self::from_expr(expr, None)
1542    }
1543
1544    /// Arc sine. PySpark asin.
1545    pub fn asin(&self) -> Column {
1546        let expr = self.expr().clone().map(
1547            |s| expect_col(crate::udfs::apply_asin(s)),
1548            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1549        );
1550        Self::from_expr(expr, None)
1551    }
1552
1553    /// Arc cosine. PySpark acos.
1554    pub fn acos(&self) -> Column {
1555        let expr = self.expr().clone().map(
1556            |s| expect_col(crate::udfs::apply_acos(s)),
1557            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1558        );
1559        Self::from_expr(expr, None)
1560    }
1561
1562    /// Arc tangent. PySpark atan.
1563    pub fn atan(&self) -> Column {
1564        let expr = self.expr().clone().map(
1565            |s| expect_col(crate::udfs::apply_atan(s)),
1566            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1567        );
1568        Self::from_expr(expr, None)
1569    }
1570
1571    /// Two-argument arc tangent (y, x) -> angle in radians. PySpark atan2.
1572    pub fn atan2(&self, x: &Column) -> Column {
1573        let args = [x.expr().clone()];
1574        let expr = self.expr().clone().map_many(
1575            |cols| expect_col(crate::udfs::apply_atan2(cols)),
1576            &args,
1577            |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Float64)),
1578        );
1579        Self::from_expr(expr, None)
1580    }
1581
1582    /// Convert radians to degrees. PySpark degrees.
1583    pub fn degrees(&self) -> Column {
1584        let expr = self.expr().clone().map(
1585            |s| expect_col(crate::udfs::apply_degrees(s)),
1586            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1587        );
1588        Self::from_expr(expr, None)
1589    }
1590
1591    /// Alias for degrees. PySpark toDegrees.
1592    pub fn to_degrees(&self) -> Column {
1593        self.degrees()
1594    }
1595
1596    /// Convert degrees to radians. PySpark radians.
1597    pub fn radians(&self) -> Column {
1598        let expr = self.expr().clone().map(
1599            |s| expect_col(crate::udfs::apply_radians(s)),
1600            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1601        );
1602        Self::from_expr(expr, None)
1603    }
1604
1605    /// Alias for radians. PySpark toRadians.
1606    pub fn to_radians(&self) -> Column {
1607        self.radians()
1608    }
1609
1610    /// Sign of the number (-1, 0, or 1). PySpark signum.
1611    pub fn signum(&self) -> Column {
1612        let expr = self.expr().clone().map(
1613            |s| expect_col(crate::udfs::apply_signum(s)),
1614            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1615        );
1616        Self::from_expr(expr, None)
1617    }
1618
1619    /// Hyperbolic cosine. PySpark cosh.
1620    pub fn cosh(&self) -> Column {
1621        let expr = self.expr().clone().map(
1622            |s| expect_col(crate::udfs::apply_cosh(s)),
1623            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1624        );
1625        Self::from_expr(expr, None)
1626    }
1627    /// Hyperbolic sine. PySpark sinh.
1628    pub fn sinh(&self) -> Column {
1629        let expr = self.expr().clone().map(
1630            |s| expect_col(crate::udfs::apply_sinh(s)),
1631            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1632        );
1633        Self::from_expr(expr, None)
1634    }
1635    /// Hyperbolic tangent. PySpark tanh.
1636    pub fn tanh(&self) -> Column {
1637        let expr = self.expr().clone().map(
1638            |s| expect_col(crate::udfs::apply_tanh(s)),
1639            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1640        );
1641        Self::from_expr(expr, None)
1642    }
1643    /// Inverse hyperbolic cosine. PySpark acosh.
1644    pub fn acosh(&self) -> Column {
1645        let expr = self.expr().clone().map(
1646            |s| expect_col(crate::udfs::apply_acosh(s)),
1647            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1648        );
1649        Self::from_expr(expr, None)
1650    }
1651    /// Inverse hyperbolic sine. PySpark asinh.
1652    pub fn asinh(&self) -> Column {
1653        let expr = self.expr().clone().map(
1654            |s| expect_col(crate::udfs::apply_asinh(s)),
1655            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1656        );
1657        Self::from_expr(expr, None)
1658    }
1659    /// Inverse hyperbolic tangent. PySpark atanh.
1660    pub fn atanh(&self) -> Column {
1661        let expr = self.expr().clone().map(
1662            |s| expect_col(crate::udfs::apply_atanh(s)),
1663            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1664        );
1665        Self::from_expr(expr, None)
1666    }
1667    /// Cube root. PySpark cbrt.
1668    pub fn cbrt(&self) -> Column {
1669        let expr = self.expr().clone().map(
1670            |s| expect_col(crate::udfs::apply_cbrt(s)),
1671            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1672        );
1673        Self::from_expr(expr, None)
1674    }
1675    /// exp(x) - 1. PySpark expm1.
1676    pub fn expm1(&self) -> Column {
1677        let expr = self.expr().clone().map(
1678            |s| expect_col(crate::udfs::apply_expm1(s)),
1679            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1680        );
1681        Self::from_expr(expr, None)
1682    }
1683    /// log(1 + x). PySpark log1p.
1684    pub fn log1p(&self) -> Column {
1685        let expr = self.expr().clone().map(
1686            |s| expect_col(crate::udfs::apply_log1p(s)),
1687            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1688        );
1689        Self::from_expr(expr, None)
1690    }
1691    /// Base-10 logarithm. PySpark log10.
1692    pub fn log10(&self) -> Column {
1693        let expr = self.expr().clone().map(
1694            |s| expect_col(crate::udfs::apply_log10(s)),
1695            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1696        );
1697        Self::from_expr(expr, None)
1698    }
1699    /// Base-2 logarithm. PySpark log2.
1700    pub fn log2(&self) -> Column {
1701        let expr = self.expr().clone().map(
1702            |s| expect_col(crate::udfs::apply_log2(s)),
1703            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1704        );
1705        Self::from_expr(expr, None)
1706    }
1707    /// Round to nearest integer. PySpark rint.
1708    pub fn rint(&self) -> Column {
1709        let expr = self.expr().clone().map(
1710            |s| expect_col(crate::udfs::apply_rint(s)),
1711            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1712        );
1713        Self::from_expr(expr, None)
1714    }
1715
1716    /// sqrt(x^2 + y^2). PySpark hypot.
1717    pub fn hypot(&self, other: &Column) -> Column {
1718        let xx = self.expr().clone() * self.expr().clone();
1719        let yy = other.expr().clone() * other.expr().clone();
1720        Self::from_expr((xx + yy).sqrt(), None)
1721    }
1722
1723    /// Cast to the given type (PySpark cast). Fails on invalid conversion.
1724    pub fn cast_to(&self, type_name: &str) -> Result<Column, String> {
1725        crate::functions::cast(self, type_name)
1726    }
1727
1728    /// Cast to the given type, null on invalid conversion (PySpark try_cast).
1729    pub fn try_cast_to(&self, type_name: &str) -> Result<Column, String> {
1730        crate::functions::try_cast(self, type_name)
1731    }
1732
1733    /// True where the float value is NaN (PySpark isnan).
1734    pub fn is_nan(&self) -> Column {
1735        Self::from_expr(self.expr().clone().is_nan(), None)
1736    }
1737
1738    // --- Datetime functions ---
1739
1740    /// Extract year from datetime column (PySpark year)
1741    pub fn year(&self) -> Column {
1742        Self::from_expr(self.expr().clone().dt().year(), None)
1743    }
1744
1745    /// Extract month from datetime column (PySpark month)
1746    pub fn month(&self) -> Column {
1747        Self::from_expr(self.expr().clone().dt().month(), None)
1748    }
1749
1750    /// Extract day of month from datetime column (PySpark day)
1751    pub fn day(&self) -> Column {
1752        Self::from_expr(self.expr().clone().dt().day(), None)
1753    }
1754
1755    /// Alias for day. PySpark dayofmonth.
1756    pub fn dayofmonth(&self) -> Column {
1757        self.day()
1758    }
1759
1760    /// Extract quarter (1-4) from date/datetime column (PySpark quarter).
1761    pub fn quarter(&self) -> Column {
1762        Self::from_expr(self.expr().clone().dt().quarter(), None)
1763    }
1764
1765    /// Extract ISO week of year (1-53) (PySpark weekofyear / week).
1766    pub fn weekofyear(&self) -> Column {
1767        Self::from_expr(self.expr().clone().dt().week(), None)
1768    }
1769
1770    /// Alias for weekofyear (PySpark week).
1771    pub fn week(&self) -> Column {
1772        self.weekofyear()
1773    }
1774
1775    /// Day of week: 1 = Sunday, 2 = Monday, ..., 7 = Saturday (PySpark dayofweek).
1776    /// Polars weekday is Mon=1..Sun=7; we convert to Sun=1..Sat=7.
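    /// Sketch (assumes a Date column "d"):
    /// ```ignore
    /// // Polars weekday: Sun=7 -> (7 % 7) + 1 = 1, Mon=1 -> 2, ..., Sat=6 -> 7
    /// let dow = Column::new("d".to_string()).dayofweek();
    /// ```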
1777    pub fn dayofweek(&self) -> Column {
1778        let w = self.expr().clone().dt().weekday();
1779        let dayofweek = (w % lit(7i32)) + lit(1i32); // 7->1 (Sun), 1->2 (Mon), ..., 6->7 (Sat)
1780        Self::from_expr(dayofweek, None)
1781    }
1782
1783    /// Day of year (1-366) (PySpark dayofyear).
1784    pub fn dayofyear(&self) -> Column {
1785        Self::from_expr(
1786            self.expr().clone().dt().ordinal_day().cast(DataType::Int32),
1787            None,
1788        )
1789    }
1790
1791    /// Cast to date (PySpark to_date). Drops time component from datetime/timestamp.
1792    pub fn to_date(&self) -> Column {
1793        use polars::prelude::DataType;
1794        Self::from_expr(self.expr().clone().cast(DataType::Date), None)
1795    }
1796
1797    /// Format date/datetime as string (PySpark date_format). Uses chrono strftime format.
1798    pub fn date_format(&self, format: &str) -> Column {
1799        Self::from_expr(self.expr().clone().dt().strftime(format), None)
1800    }
1801
1802    /// Extract hour from datetime column (PySpark hour). Accepts string timestamp (#403).
1803    pub fn hour(&self) -> Column {
1804        let expr = self.expr().clone().map(
1805            |s| expect_col(crate::udfs::apply_hour(s)),
1806            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int32)),
1807        );
1808        Self::from_expr(expr, None)
1809    }
1810
1811    /// Extract minute from datetime column (PySpark minute). Accepts string timestamp (#403).
1812    pub fn minute(&self) -> Column {
1813        let expr = self.expr().clone().map(
1814            |s| expect_col(crate::udfs::apply_minute(s)),
1815            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int32)),
1816        );
1817        Self::from_expr(expr, None)
1818    }
1819
1820    /// Extract second from datetime column (PySpark second). Accepts string timestamp (#403).
1821    pub fn second(&self) -> Column {
1822        let expr = self.expr().clone().map(
1823            |s| expect_col(crate::udfs::apply_second(s)),
1824            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int32)),
1825        );
1826        Self::from_expr(expr, None)
1827    }
1828
1829    /// Extract field from date/datetime (PySpark extract). field: "year","month","day","hour","minute","second","quarter","week","dayofweek","dayofyear".
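    /// Usage sketch (assumes a Datetime column "ts"):
    /// ```ignore
    /// let quarter = Column::new("ts".to_string()).extract("quarter"); // 1..=4
    /// let dow = Column::new("ts".to_string()).extract("dow"); // 1 = Sunday .. 7 = Saturday
    /// ```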
1830    pub fn extract(&self, field: &str) -> Column {
1831        use polars::prelude::*;
1832        let e = self.expr().clone();
1833        let expr = match field.trim().to_lowercase().as_str() {
1834            "year" => e.dt().year(),
1835            "month" => e.dt().month(),
1836            "day" => e.dt().day(),
1837            "hour" => e.dt().hour(),
1838            "minute" => e.dt().minute(),
1839            "second" => e.dt().second(),
1840            "quarter" => e.dt().quarter(),
1841            "week" | "weekofyear" => e.dt().week(),
1842            "dayofweek" | "dow" => {
1843                let w = e.dt().weekday();
1844                (w % lit(7i32)) + lit(1i32)
1845            }
1846            "dayofyear" | "doy" => e.dt().ordinal_day().cast(DataType::Int32),
1847            _ => e.dt().year(), // fallback
1848        };
1849        Self::from_expr(expr, None)
1850    }
1851
1852    /// Timestamp to microseconds since epoch (PySpark unix_micros).
1853    pub fn unix_micros(&self) -> Column {
1854        use polars::prelude::*;
1855        Self::from_expr(self.expr().clone().cast(DataType::Int64), None)
1856    }
1857
1858    /// Timestamp to milliseconds since epoch (PySpark unix_millis). Assumes a microsecond-precision timestamp.
1859    pub fn unix_millis(&self) -> Column {
1860        use polars::prelude::*;
1861        let micros = self.expr().clone().cast(DataType::Int64);
1862        Self::from_expr(micros / lit(1000i64), None)
1863    }
1864
1865    /// Timestamp to seconds since epoch (PySpark unix_seconds). Assumes a microsecond-precision timestamp.
1866    pub fn unix_seconds(&self) -> Column {
1867        use polars::prelude::*;
1868        let micros = self.expr().clone().cast(DataType::Int64);
1869        Self::from_expr(micros / lit(1_000_000i64), None)
1870    }
1871
1872    /// Weekday name "Mon","Tue",... (PySpark dayname).
1873    pub fn dayname(&self) -> Column {
1874        let expr = self.expr().clone().map(
1875            |s| expect_col(crate::udfs::apply_dayname(s)),
1876            |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
1877        );
1878        Self::from_expr(expr, None)
1879    }
1880
1881    /// Weekday 0=Mon, 6=Sun (PySpark weekday).
1882    pub fn weekday(&self) -> Column {
1883        let expr = self.expr().clone().map(
1884            |s| expect_col(crate::udfs::apply_weekday(s)),
1885            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int32)),
1886        );
1887        Self::from_expr(expr, None)
1888    }
1889
1890    /// Add n days to date/datetime column (PySpark date_add).
1891    pub fn date_add(&self, n: i32) -> Column {
1892        use polars::prelude::*;
1893        let date_expr = self.expr().clone().cast(DataType::Date);
1894        let dur = duration(DurationArgs::new().with_days(lit(n as i64)));
1895        Self::from_expr(date_expr + dur, None)
1896    }
1897
1898    /// Subtract n days from date/datetime column (PySpark date_sub).
1899    pub fn date_sub(&self, n: i32) -> Column {
1900        use polars::prelude::*;
1901        let date_expr = self.expr().clone().cast(DataType::Date);
1902        let dur = duration(DurationArgs::new().with_days(lit(n as i64)));
1903        Self::from_expr(date_expr - dur, None)
1904    }
1905
1906    /// Number of whole days between two date/datetime columns (PySpark datediff): self is the start date, other is the end, returning end - start.
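    /// Sketch (assumes Date columns "start" and "end"):
    /// ```ignore
    /// // start = 2024-01-01, end = 2024-01-10 -> 9
    /// let days = Column::new("start".to_string()).datediff(&Column::new("end".to_string()));
    /// ```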
1907    pub fn datediff(&self, other: &Column) -> Column {
1908        use polars::prelude::*;
1909        let start = self.expr().clone().cast(DataType::Date);
1910        let end = other.expr().clone().cast(DataType::Date);
1911        Self::from_expr((end - start).dt().total_days(false), None)
1912    }
1913
1914    /// Last day of the month for date/datetime column (PySpark last_day).
1915    pub fn last_day(&self) -> Column {
1916        Self::from_expr(self.expr().clone().dt().month_end(), None)
1917    }
1918
1919    /// Add amount of unit to timestamp (PySpark timestampadd). unit: DAY, HOUR, MINUTE, SECOND, etc.
1920    pub fn timestampadd(&self, unit: &str, amount: &Column) -> Column {
1921        use polars::prelude::*;
1922        let ts = self.expr().clone();
1923        let amt = amount.expr().clone().cast(DataType::Int64);
1924        let dur = match unit.trim().to_uppercase().as_str() {
1925            "DAY" | "DAYS" => duration(DurationArgs::new().with_days(amt)),
1926            "HOUR" | "HOURS" => duration(DurationArgs::new().with_hours(amt)),
1927            "MINUTE" | "MINUTES" => duration(DurationArgs::new().with_minutes(amt)),
1928            "SECOND" | "SECONDS" => duration(DurationArgs::new().with_seconds(amt)),
1929            "WEEK" | "WEEKS" => duration(DurationArgs::new().with_weeks(amt)),
1930            _ => duration(DurationArgs::new().with_days(amt)),
1931        };
1932        Self::from_expr(ts + dur, None)
1933    }
1934
1935    /// Difference between timestamps in the given unit (PySpark timestampdiff): self is the start, other the end. unit: DAY, HOUR, MINUTE, SECOND.
1936    pub fn timestampdiff(&self, unit: &str, other: &Column) -> Column {
1937        let start = self.expr().clone();
1938        let end = other.expr().clone();
1939        let diff = end - start;
1940        let expr = match unit.trim().to_uppercase().as_str() {
1941            "HOUR" | "HOURS" => diff.dt().total_hours(false),
1942            "MINUTE" | "MINUTES" => diff.dt().total_minutes(false),
1943            "SECOND" | "SECONDS" => diff.dt().total_seconds(false),
1944            "DAY" | "DAYS" => diff.dt().total_days(false),
1945            _ => diff.dt().total_days(false),
1946        };
1947        Self::from_expr(expr, None)
1948    }
1949
1950    /// Interpret timestamp as UTC, convert to target timezone (PySpark from_utc_timestamp).
1951    pub fn from_utc_timestamp(&self, tz: &str) -> Column {
1952        let tz = tz.to_string();
1953        let expr = self.expr().clone().map(
1954            move |s| expect_col(crate::udfs::apply_from_utc_timestamp(s, &tz)),
1955            |_schema, field| Ok(field.clone()),
1956        );
1957        Self::from_expr(expr, None)
1958    }
1959
1960    /// Interpret timestamp as in tz, convert to UTC (PySpark to_utc_timestamp).
1961    pub fn to_utc_timestamp(&self, tz: &str) -> Column {
1962        let tz = tz.to_string();
1963        let expr = self.expr().clone().map(
1964            move |s| expect_col(crate::udfs::apply_to_utc_timestamp(s, &tz)),
1965            |_schema, field| Ok(field.clone()),
1966        );
1967        Self::from_expr(expr, None)
1968    }
1969
1970    /// Truncate date/datetime to a unit; the format string is passed straight to Polars dt().truncate (e.g. "1mo", "1w", "1d"). PySpark trunc.
1971    pub fn trunc(&self, format: &str) -> Column {
1972        use polars::prelude::*;
1973        Self::from_expr(
1974            self.expr().clone().dt().truncate(lit(format.to_string())),
1975            None,
1976        )
1977    }
1978
1979    /// Add n months to date/datetime column (PySpark add_months). Month-aware.
1980    pub fn add_months(&self, n: i32) -> Column {
1981        let expr = self.expr().clone().map(
1982            move |col| expect_col(crate::udfs::apply_add_months(col, n)),
1983            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Date)),
1984        );
1985        Self::from_expr(expr, None)
1986    }
1987
1988    /// Number of months between end and start dates, as fractional (PySpark months_between).
1989    /// When round_off is true, rounds to 8 decimal places (PySpark default).
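    /// Sketch (assumes Date columns "end" and "start"; with PySpark semantics two dates on the
    /// same day of month give a whole number, e.g. 2024-03-15 vs 2024-01-15 -> 2.0):
    /// ```ignore
    /// let m = Column::new("end".to_string()).months_between(&Column::new("start".to_string()), true);
    /// ```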
1990    pub fn months_between(&self, start: &Column, round_off: bool) -> Column {
1991        let args = [start.expr().clone()];
1992        let expr = self.expr().clone().map_many(
1993            move |cols| expect_col(crate::udfs::apply_months_between(cols, round_off)),
1994            &args,
1995            |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Float64)),
1996        );
1997        Self::from_expr(expr, None)
1998    }
1999
2000    /// Next date that is the given day of week (e.g. "Mon", "Tue") (PySpark next_day).
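    /// Sketch (assumes a Date column "d"; with PySpark semantics the result is strictly after the
    /// input, e.g. 2024-01-01, a Monday, with "Sun" -> 2024-01-07):
    /// ```ignore
    /// let n = Column::new("d".to_string()).next_day("Sun");
    /// ```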
2001    pub fn next_day(&self, day_of_week: &str) -> Column {
2002        let day = day_of_week.to_string();
2003        let expr = self.expr().clone().map(
2004            move |col| expect_col(crate::udfs::apply_next_day(col, &day)),
2005            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Date)),
2006        );
2007        Self::from_expr(expr, None)
2008    }
2009
2010    /// Parse string timestamp to seconds since epoch (PySpark unix_timestamp).
2011    pub fn unix_timestamp(&self, format: Option<&str>) -> Column {
2012        let fmt = format.map(String::from);
2013        let expr = self.expr().clone().map(
2014            move |col| expect_col(crate::udfs::apply_unix_timestamp(col, fmt.as_deref())),
2015            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
2016        );
2017        Self::from_expr(expr, None)
2018    }
2019
2020    /// Convert seconds since epoch to formatted string (PySpark from_unixtime).
2021    pub fn from_unixtime(&self, format: Option<&str>) -> Column {
2022        let fmt = format.map(String::from);
2023        let expr = self.expr().clone().map(
2024            move |col| expect_col(crate::udfs::apply_from_unixtime(col, fmt.as_deref())),
2025            |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
2026        );
2027        Self::from_expr(expr, None)
2028    }
2029
2030    /// Convert seconds since epoch to timestamp (PySpark timestamp_seconds).
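    /// Sketch (assumes an Int64 column "secs"):
    /// ```ignore
    /// // 1_700_000_000 s -> 1_700_000_000_000_000 us -> 2023-11-14 22:13:20 UTC
    /// let ts = Column::new("secs".to_string()).timestamp_seconds();
    /// ```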
2031    pub fn timestamp_seconds(&self) -> Column {
2032        let expr = (self.expr().clone().cast(DataType::Int64) * lit(1_000_000i64))
2033            .cast(DataType::Datetime(TimeUnit::Microseconds, None));
2034        Self::from_expr(expr, None)
2035    }
2036
2037    /// Convert milliseconds since epoch to timestamp (PySpark timestamp_millis).
2038    pub fn timestamp_millis(&self) -> Column {
2039        let expr = (self.expr().clone().cast(DataType::Int64) * lit(1000i64))
2040            .cast(DataType::Datetime(TimeUnit::Microseconds, None));
2041        Self::from_expr(expr, None)
2042    }
2043
2044    /// Convert microseconds since epoch to timestamp (PySpark timestamp_micros).
2045    pub fn timestamp_micros(&self) -> Column {
2046        let expr = self
2047            .expr()
2048            .clone()
2049            .cast(DataType::Int64)
2050            .cast(DataType::Datetime(TimeUnit::Microseconds, None));
2051        Self::from_expr(expr, None)
2052    }
2053
2054    /// Date to days since 1970-01-01 (PySpark unix_date).
2055    pub fn unix_date(&self) -> Column {
2056        let expr = self.expr().clone().map(
2057            |s| expect_col(crate::udfs::apply_unix_date(s)),
2058            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int32)),
2059        );
2060        Self::from_expr(expr, None)
2061    }
2062
2063    /// Days since epoch to date (PySpark date_from_unix_date).
2064    pub fn date_from_unix_date(&self) -> Column {
2065        let expr = self.expr().clone().map(
2066            |s| expect_col(crate::udfs::apply_date_from_unix_date(s)),
2067            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Date)),
2068        );
2069        Self::from_expr(expr, None)
2070    }
2071
2072    /// Positive modulus (PySpark pmod). Column method: pmod(self, other).
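    /// Sketch (assumes numeric columns "a" and "b"; positive modulus: pmod(-7, 3) = 2 whereas -7 % 3 = -1):
    /// ```ignore
    /// let p = Column::new("a".to_string()).pmod(&Column::new("b".to_string()));
    /// ```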
2073    pub fn pmod(&self, divisor: &Column) -> Column {
2074        let args = [divisor.expr().clone()];
2075        let expr = self.expr().clone().map_many(
2076            |cols| expect_col(crate::udfs::apply_pmod(cols)),
2077            &args,
2078            |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Float64)),
2079        );
2080        Self::from_expr(expr, None)
2081    }
2082
2083    /// Factorial n! for n in 0..=20 (PySpark factorial).
2084    pub fn factorial(&self) -> Column {
2085        let expr = self.expr().clone().map(
2086            |s| expect_col(crate::udfs::apply_factorial(s)),
2087            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
2088        );
2089        Self::from_expr(expr, None)
2090    }
2091
2092    // --- Window functions ---
2093
2094    /// Apply window partitioning. Returns a new Column with `.over(partition_by)`.
2095    /// Use after rank(), dense_rank(), row_number(), lag(), lead().
2096    pub fn over(&self, partition_by: &[&str]) -> Column {
2097        let partition_exprs: Vec<Expr> = partition_by.iter().map(|s| col(*s)).collect();
2098        Self::from_expr(self.expr().clone().over(partition_exprs), None)
2099    }
2100
2101    /// Rank (with ties, gaps). Use with `.over(partition_by)`.
2102    pub fn rank(&self, descending: bool) -> Column {
2103        let opts = RankOptions {
2104            method: RankMethod::Min,
2105            descending,
2106        };
2107        Self::from_expr(self.expr().clone().rank(opts, None), None)
2108    }
2109
2110    /// Dense rank (no gaps). Use with `.over(partition_by)`.
2111    pub fn dense_rank(&self, descending: bool) -> Column {
2112        let opts = RankOptions {
2113            method: RankMethod::Dense,
2114            descending,
2115        };
2116        Self::from_expr(self.expr().clone().rank(opts, None), None)
2117    }
2118
2119    /// Row number (1, 2, 3 by this column's order). Use with `.over(partition_by)`.
2120    pub fn row_number(&self, descending: bool) -> Column {
2121        let opts = RankOptions {
2122            method: RankMethod::Ordinal,
2123            descending,
2124        };
2125        Self::from_expr(self.expr().clone().rank(opts, None), None)
2126    }
2127
2128    /// Lag: value from n rows before. Use with `.over(partition_by)`.
2129    pub fn lag(&self, n: i64) -> Column {
2130        Self::from_expr(self.expr().clone().shift(polars::prelude::lit(n)), None)
2131    }
2132
2133    /// Lead: value from n rows after. Use with `.over(partition_by)`.
2134    pub fn lead(&self, n: i64) -> Column {
2135        Self::from_expr(self.expr().clone().shift(polars::prelude::lit(-n)), None)
2136    }
2137
2138    /// First value in partition (PySpark first_value). Use with `.over(partition_by)`.
2139    pub fn first_value(&self) -> Column {
2140        Self::from_expr(self.expr().clone().first(), None)
2141    }
2142
2143    /// Last value in partition (PySpark last_value). Use with `.over(partition_by)`.
2144    pub fn last_value(&self) -> Column {
2145        Self::from_expr(self.expr().clone().last(), None)
2146    }
2147
2148    /// Percent rank in partition: (rank - 1) / (count - 1). Window is applied; do not call .over() again.
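    /// Sketch: in a 4-row partition with distinct values, ascending ranks 1,2,3,4 map to
    /// (rank - 1) / (4 - 1) = 0.0, 0.333..., 0.666..., 1.0.
    /// ```ignore
    /// let pr = Column::new("score".to_string()).percent_rank(&["group"], false);
    /// ```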
2149    pub fn percent_rank(&self, partition_by: &[&str], descending: bool) -> Column {
2150        use polars::prelude::*;
2151        let partition_exprs: Vec<Expr> = partition_by.iter().map(|s| col(*s)).collect();
2152        let opts = RankOptions {
2153            method: RankMethod::Min,
2154            descending,
2155        };
2156        let rank_expr = self
2157            .expr()
2158            .clone()
2159            .rank(opts, None)
2160            .over(partition_exprs.clone());
2161        let count_expr = self.expr().clone().count().over(partition_exprs.clone());
2162        let rank_f = (rank_expr - lit(1i64)).cast(DataType::Float64);
2163        let count_f = (count_expr - lit(1i64)).cast(DataType::Float64);
2164        // Avoid division by zero: single-row partition -> 0.0 (PySpark parity)
2165        let pct = when(count_f.clone().gt(lit(1.0)))
2166            .then(rank_f / count_f)
2167            .otherwise(lit(0.0));
2168        Self::from_expr(pct, None)
2169    }
2170
2171    /// Cumulative distribution in partition: row_number / count. Window is applied; do not call .over() again.
2172    pub fn cume_dist(&self, partition_by: &[&str], descending: bool) -> Column {
2173        use polars::prelude::*;
2174        let partition_exprs: Vec<Expr> = partition_by.iter().map(|s| col(*s)).collect();
2175        let opts = RankOptions {
2176            method: RankMethod::Ordinal,
2177            descending,
2178        };
2179        let row_num = self
2180            .expr()
2181            .clone()
2182            .rank(opts, None)
2183            .over(partition_exprs.clone());
2184        let count_expr = self.expr().clone().count().over(partition_exprs.clone());
2185        // Avoid division by zero when partition is empty
2186        let count_f = count_expr.clone().cast(DataType::Float64);
2187        let cume = when(count_f.clone().eq(lit(0.0)))
2188            .then(lit(0.0))
2189            .otherwise(row_num.cast(DataType::Float64) / count_f);
2190        Self::from_expr(cume, None)
2191    }
2192
2193    /// Ntile: bucket 1..n by rank within partition (ceil(rank * n / count)). Window is applied; do not call .over() again.
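    /// Sketch: with 10 rows and n = 3, ceil(rank * 3 / 10) yields buckets 1,1,1,2,2,2,3,3,3,3 for ranks 1..=10.
    /// ```ignore
    /// let tile = Column::new("score".to_string()).ntile(3, &["group"], false);
    /// ```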
2194    pub fn ntile(&self, n: u32, partition_by: &[&str], descending: bool) -> Column {
2195        use polars::prelude::*;
2196        let partition_exprs: Vec<Expr> = partition_by.iter().map(|s| col(*s)).collect();
2197        let opts = RankOptions {
2198            method: RankMethod::Ordinal,
2199            descending,
2200        };
2201        let rank_expr = self
2202            .expr()
2203            .clone()
2204            .rank(opts, None)
2205            .over(partition_exprs.clone());
2206        let count_expr = self.expr().clone().count().over(partition_exprs.clone());
2207        let n_expr = lit(n as f64);
2208        let rank_f = rank_expr.cast(DataType::Float64);
2209        let count_f = count_expr.cast(DataType::Float64);
2210        // Avoid division by zero when partition is empty: use bucket 1
2211        let bucket = when(count_f.clone().eq(lit(0.0)))
2212            .then(lit(1.0))
2213            .otherwise((rank_f * n_expr / count_f).ceil());
2214        let clamped = bucket.clip(lit(1.0), lit(n as f64));
2215        Self::from_expr(clamped.cast(DataType::Int32), None)
2216    }
2217
2218    /// Nth value in partition by order (1-based n). Returns a Column with window already applied; do not call .over() again.
2219    pub fn nth_value(&self, n: i64, partition_by: &[&str], descending: bool) -> Column {
2220        use polars::prelude::*;
2221        let partition_exprs: Vec<Expr> = partition_by.iter().map(|s| col(*s)).collect();
2222        let opts = RankOptions {
2223            method: RankMethod::Ordinal,
2224            descending,
2225        };
2226        let rank_expr = self
2227            .expr()
2228            .clone()
2229            .rank(opts, None)
2230            .over(partition_exprs.clone());
2231        let cond_col = Self::from_expr(rank_expr.eq(lit(n)), None);
2232        let null_col = Self::from_expr(lit(NULL), None);
2233        let value_col = Self::from_expr(self.expr().clone(), None);
2234        let when_expr = crate::functions::when(&cond_col)
2235            .then(&value_col)
2236            .otherwise(&null_col)
2237            .into_expr();
2238        let windowed = when_expr.max().over(partition_exprs);
2239        Self::from_expr(windowed, None)
2240    }
2241
2242    /// Number of elements in list (PySpark size / array_size). Returns Int32.
2243    pub fn array_size(&self) -> Column {
2244        use polars::prelude::*;
2245        Self::from_expr(
2246            self.expr().clone().list().len().cast(DataType::Int32),
2247            Some("size".to_string()),
2248        )
2249    }
2250
2251    /// Cardinality: number of elements in array/list (PySpark cardinality). Alias for array_size.
2252    pub fn cardinality(&self) -> Column {
2253        self.array_size()
2254    }
2255
2256    /// Check if list contains value (PySpark array_contains).
2257    pub fn array_contains(&self, value: Expr) -> Column {
2258        Self::from_expr(self.expr().clone().list().contains(value, false), None)
2259    }
2260
2261    /// Join list of strings with separator (PySpark array_join).
2262    pub fn array_join(&self, separator: &str) -> Column {
2263        use polars::prelude::*;
2264        Self::from_expr(
2265            self.expr()
2266                .clone()
2267                .list()
2268                .join(lit(separator.to_string()), false),
2269            None,
2270        )
2271    }
2272
2273    /// Maximum element in list (PySpark array_max).
2274    pub fn array_max(&self) -> Column {
2275        Self::from_expr(self.expr().clone().list().max(), None)
2276    }
2277
2278    /// Minimum element in list (PySpark array_min).
2279    pub fn array_min(&self) -> Column {
2280        Self::from_expr(self.expr().clone().list().min(), None)
2281    }
2282
2283    /// Get element at 1-based index (PySpark element_at). Returns null if out of bounds.
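    /// Sketch (assumes a list column "xs" holding [10, 20, 30]):
    /// ```ignore
    /// let second = Column::new("xs".to_string()).element_at(2); // 20
    /// let missing = Column::new("xs".to_string()).element_at(9); // null (out of bounds)
    /// ```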
2284    pub fn element_at(&self, index: i64) -> Column {
2285        use polars::prelude::*;
2286        // PySpark uses 1-based indexing; Polars uses 0-based. index 1 -> get(0).
2287        let idx = if index >= 1 { index - 1 } else { index };
2288        Self::from_expr(self.expr().clone().list().get(lit(idx), true), None)
2289    }
2290
2291    /// Get element at 0-based index (PySpark Column.getItem). Returns null if out of bounds.
2292    pub fn get_item(&self, index: i64) -> Column {
2293        use polars::prelude::*;
2294        Self::from_expr(self.expr().clone().list().get(lit(index), true), None)
2295    }
2296
2297    /// Get struct field by name (PySpark Column.getField).
2298    pub fn get_field(&self, name: &str) -> Column {
2299        Self::from_expr(
2300            self.expr().clone().struct_().field_by_name(name),
2301            Some(name.to_string()),
2302        )
2303    }
2304
2305    /// Add or replace a struct field (PySpark Column.withField).
2306    ///
2307    /// Panics if the column is not a struct type. If you need error handling, use
2308    /// [`Column::try_with_field`].
2309    pub fn with_field(&self, name: &str, value: &Column) -> Column {
2310        self.try_with_field(name, value)
2311            .expect("with_field: column must be struct type")
2312    }
2313
2314    /// Add or replace a struct field (PySpark Column.withField), returning an error if the
2315    /// column is not a struct type. Uses a map_many UDF so we don't rely on Polars "*" wildcard
2316    /// (removed in 0.53).
2317    pub fn try_with_field(
2318        &self,
2319        name: &str,
2320        value: &Column,
2321    ) -> Result<Column, polars::error::PolarsError> {
2322        let name = name.to_string();
2323        let args = [value.expr().clone()];
2324        let expr = self.expr().clone().map_many(
2325            move |cols| {
2326                // map_many passes [self, ...args]: self is struct, args[0] is value
2327                expect_col(crate::udfs::apply_struct_with_field(
2328                    cols[0].clone(),
2329                    cols[1].clone(),
2330                    &name,
2331                ))
2332            },
2333            &args,
2334            |_schema, fields| Ok(fields[0].clone()),
2335        );
2336        Ok(Self::from_expr(expr, None))
2337    }
2338
2339    /// Sort list elements (PySpark array_sort). Ascending, nulls last.
2340    pub fn array_sort(&self) -> Column {
2341        use polars::prelude::SortOptions;
2342        let opts = SortOptions {
2343            descending: false,
2344            nulls_last: true,
2345            ..Default::default()
2346        };
2347        Self::from_expr(self.expr().clone().list().sort(opts), None)
2348    }
2349
2350    /// Distinct elements in list (PySpark array_distinct). Preserves first-occurrence order.
2351    pub fn array_distinct(&self) -> Column {
2352        let expr = self.expr().clone().map(
2353            |s| expect_col(crate::udfs::apply_array_distinct_first_order(s)),
2354            |_schema, field| Ok(field.clone()),
2355        );
2356        Self::from_expr(expr, None)
2357    }
2358
2359    /// Mode aggregation - most frequent value (PySpark mode).
2360    /// Uses value_counts sorted by count descending, then first.
2361    pub fn mode(&self) -> Column {
2362        // value_counts(sort=true, parallel=false, name="count", normalize=false)
2363        // puts highest count first; first() gives the mode
2364        // Struct has "count" and value field; field 0 is typically the value
2365        let vc = self
2366            .expr()
2367            .clone()
2368            .value_counts(true, false, "count", false);
2369        let first_struct = vc.first();
2370        let val_expr = first_struct.struct_().field_by_index(0);
2371        Self::from_expr(val_expr, Some("mode".to_string()))
2372    }
2373
2374    /// Slice list from start with optional length (PySpark slice). 1-based start; a non-positive start is clamped to the first element.
2375    pub fn array_slice(&self, start: i64, length: Option<i64>) -> Column {
2376        use polars::prelude::*;
2377        let start_expr = lit((start - 1).max(0)); // 1-based to 0-based
2378        let length_expr = length.map(lit).unwrap_or_else(|| lit(i64::MAX));
2379        Self::from_expr(
2380            self.expr().clone().list().slice(start_expr, length_expr),
2381            None,
2382        )
2383    }
2384
2385    /// Explode list into one row per element (PySpark explode).
2386    pub fn explode(&self) -> Column {
2387        use polars::prelude::ExplodeOptions;
2388        Self::from_expr(
2389            self.expr().clone().explode(ExplodeOptions {
2390                empty_as_null: false,
2391                keep_nulls: false,
2392            }),
2393            None,
2394        )
2395    }
2396
2397    /// Explode list; null/empty produces one row with null (PySpark explode_outer).
2398    pub fn explode_outer(&self) -> Column {
2399        use polars::prelude::ExplodeOptions;
2400        Self::from_expr(
2401            self.expr().clone().explode(ExplodeOptions {
2402                empty_as_null: true,
2403                keep_nulls: true,
2404            }),
2405            None,
2406        )
2407    }
2408
2409    /// Posexplode variant (PySpark posexplode_outer). Currently delegates to posexplode, so empty/null lists are dropped rather than emitted as a null row.
2410    pub fn posexplode_outer(&self) -> (Column, Column) {
2411        self.posexplode()
2412    }
2413
2414    /// Zip two arrays element-wise into array of structs (PySpark arrays_zip).
2415    pub fn arrays_zip(&self, other: &Column) -> Column {
2416        let args = [other.expr().clone()];
2417        let expr = self.expr().clone().map_many(
2418            |cols| expect_col(crate::udfs::apply_arrays_zip(cols)),
2419            &args,
2420            |_schema, fields| Ok(fields[0].clone()),
2421        );
2422        Self::from_expr(expr, None)
2423    }
2424
2425    /// True if two arrays have any element in common (PySpark arrays_overlap).
2426    pub fn arrays_overlap(&self, other: &Column) -> Column {
2427        let args = [other.expr().clone()];
2428        let expr = self.expr().clone().map_many(
2429            |cols| expect_col(crate::udfs::apply_arrays_overlap(cols)),
2430            &args,
2431            |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Boolean)),
2432        );
2433        Self::from_expr(expr, None)
2434    }
2435
2436    /// Collect to array (PySpark array_agg). Alias for implode in group context.
2437    pub fn array_agg(&self) -> Column {
2438        Self::from_expr(self.expr().clone().implode(), None)
2439    }
2440
2441    /// 1-based index of first occurrence of value in list, or 0 if not found (PySpark array_position).
2442    /// Uses Polars list.eval with col("") as element (requires polars list_eval feature).
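    /// Sketch (assumes a list column "xs" holding [10, 20, 30] with no nulls):
    /// ```ignore
    /// use polars::prelude::lit;
    /// let pos = Column::new("xs".to_string()).array_position(lit(20)); // 2
    /// let none = Column::new("xs".to_string()).array_position(lit(99)); // 0
    /// ```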
2443    pub fn array_position(&self, value: Expr) -> Column {
2444        use polars::prelude::{DataType, NULL};
2445        // In list.eval context, col("") refers to the current list element.
2446        let cond = Self::from_expr(col("").eq(value), None);
2447        let then_val = Self::from_expr(col("").cum_count(false), None);
2448        let else_val = Self::from_expr(lit(NULL), None);
2449        let idx_expr = crate::functions::when(&cond)
2450            .then(&then_val)
2451            .otherwise(&else_val)
2452            .into_expr();
2453        let list_expr = self
2454            .expr()
2455            .clone()
2456            .list()
2457            .eval(idx_expr)
2458            .list()
2459            .min()
2460            .fill_null(lit(0i64))
2461            .cast(DataType::Int64);
2462        Self::from_expr(list_expr, Some("array_position".to_string()))
2463    }
2464
2465    /// Remove null elements from list (PySpark array_compact). Preserves order.
2466    pub fn array_compact(&self) -> Column {
2467        let list_expr = self.expr().clone().list().drop_nulls();
2468        Self::from_expr(list_expr, None)
2469    }
2470
2471    /// New list with all elements equal to value removed (PySpark array_remove).
2472    /// Uses list.eval + drop_nulls (requires polars list_eval and list_drop_nulls).
2473    pub fn array_remove(&self, value: Expr) -> Column {
2474        use polars::prelude::NULL;
2475        // when(element != value) then element else null; then drop_nulls.
2476        let cond = Self::from_expr(col("").neq(value), None);
2477        let then_val = Self::from_expr(col(""), None);
2478        let else_val = Self::from_expr(lit(NULL), None);
2479        let elem_neq = crate::functions::when(&cond)
2480            .then(&then_val)
2481            .otherwise(&else_val)
2482            .into_expr();
2483        let list_expr = self
2484            .expr()
2485            .clone()
2486            .list()
2487            .eval(elem_neq)
2488            .list()
2489            .drop_nulls();
2490        Self::from_expr(list_expr, None)
2491    }
2492
2493    /// Repeat each element n times (PySpark array_repeat). Implemented via map UDF.
2494    pub fn array_repeat(&self, n: i64) -> Column {
2495        let expr = self.expr().clone().map(
2496            move |c| expect_col(crate::udfs::apply_array_repeat(c, n)),
2497            |_schema, field| Ok(field.clone()),
2498        );
2499        Self::from_expr(expr, None)
2500    }
2501
2502    /// Flatten list of lists to one list (PySpark flatten). Implemented via map UDF.
2503    pub fn array_flatten(&self) -> Column {
2504        let expr = self.expr().clone().map(
2505            |s| expect_col(crate::udfs::apply_array_flatten(s)),
2506            |_schema, field| Ok(field.clone()),
2507        );
2508        Self::from_expr(expr, None)
2509    }
2510
2511    /// Append element to end of list (PySpark array_append).
2512    pub fn array_append(&self, elem: &Column) -> Column {
2513        let args = [elem.expr().clone()];
2514        let expr = self.expr().clone().map_many(
2515            |cols| expect_col(crate::udfs::apply_array_append(cols)),
2516            &args,
2517            |_schema, fields| Ok(fields[0].clone()),
2518        );
2519        Self::from_expr(expr, None)
2520    }
2521
2522    /// Prepend element to start of list (PySpark array_prepend).
2523    pub fn array_prepend(&self, elem: &Column) -> Column {
2524        let args = [elem.expr().clone()];
2525        let expr = self.expr().clone().map_many(
2526            |cols| expect_col(crate::udfs::apply_array_prepend(cols)),
2527            &args,
2528            |_schema, fields| Ok(fields[0].clone()),
2529        );
2530        Self::from_expr(expr, None)
2531    }
2532
2533    /// Insert element at 1-based position (PySpark array_insert).
2534    pub fn array_insert(&self, pos: &Column, elem: &Column) -> Column {
2535        let args = [pos.expr().clone(), elem.expr().clone()];
2536        let expr = self.expr().clone().map_many(
2537            |cols| expect_col(crate::udfs::apply_array_insert(cols)),
2538            &args,
2539            |_schema, fields| Ok(fields[0].clone()),
2540        );
2541        Self::from_expr(expr, None)
2542    }
2543
2544    /// Elements in first array not in second (PySpark array_except).
2545    pub fn array_except(&self, other: &Column) -> Column {
2546        let args = [other.expr().clone()];
2547        let expr = self.expr().clone().map_many(
2548            |cols| expect_col(crate::udfs::apply_array_except(cols)),
2549            &args,
2550            |_schema, fields| Ok(fields[0].clone()),
2551        );
2552        Self::from_expr(expr, None)
2553    }
2554
2555    /// Elements in both arrays (PySpark array_intersect).
2556    pub fn array_intersect(&self, other: &Column) -> Column {
2557        let args = [other.expr().clone()];
2558        let expr = self.expr().clone().map_many(
2559            |cols| expect_col(crate::udfs::apply_array_intersect(cols)),
2560            &args,
2561            |_schema, fields| Ok(fields[0].clone()),
2562        );
2563        Self::from_expr(expr, None)
2564    }
2565
2566    /// Distinct elements from both arrays (PySpark array_union).
2567    pub fn array_union(&self, other: &Column) -> Column {
2568        let args = [other.expr().clone()];
2569        let expr = self.expr().clone().map_many(
2570            |cols| expect_col(crate::udfs::apply_array_union(cols)),
2571            &args,
2572            |_schema, fields| Ok(fields[0].clone()),
2573        );
2574        Self::from_expr(expr, None)
2575    }
2576
2577    /// Zip two arrays element-wise with merge function (PySpark zip_with). Shorter array padded with null.
2578    /// Merge Expr uses col("").struct_().field_by_name("left") and field_by_name("right").
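    /// Sketch (assumes numeric list columns "a" and "b"):
    /// ```ignore
    /// use polars::prelude::col;
    /// // element-wise sum of the two lists
    /// let merge = col("").struct_().field_by_name("left") + col("").struct_().field_by_name("right");
    /// let summed = Column::new("a".to_string()).zip_with(&Column::new("b".to_string()), merge);
    /// ```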
2579    pub fn zip_with(&self, other: &Column, merge: Expr) -> Column {
2580        let args = [other.expr().clone()];
2581        let zip_expr = self.expr().clone().map_many(
2582            |cols| expect_col(crate::udfs::apply_zip_arrays_to_struct(cols)),
2583            &args,
2584            |_schema, fields| {
2585                let left_inner = match &fields[0].dtype {
2586                    DataType::List(inner) => *inner.clone(),
2587                    _ => DataType::Unknown(Default::default()),
2588                };
2589                let right_inner = match fields.get(1).map(|f| &f.dtype) {
2590                    Some(DataType::List(inner)) => *inner.clone(),
2591                    _ => DataType::Unknown(Default::default()),
2592                };
2593                let struct_dtype = DataType::Struct(vec![
2594                    Field::new("left".into(), left_inner),
2595                    Field::new("right".into(), right_inner),
2596                ]);
2597                Ok(Field::new(
2598                    fields[0].name().clone(),
2599                    DataType::List(Box::new(struct_dtype)),
2600                ))
2601            },
2602        );
2603        let list_expr = zip_expr.list().eval(merge);
2604        Self::from_expr(list_expr, None)
2605    }
2606
2607    /// True if any list element satisfies the predicate (PySpark exists). Uses list.eval(pred).list().any().
2608    pub fn array_exists(&self, predicate: Expr) -> Column {
2609        let pred_expr = self.expr().clone().list().eval(predicate).list().any();
2610        Self::from_expr(pred_expr, Some("exists".to_string()))
2611    }
2612
2613    /// True if all list elements satisfy the predicate (PySpark forall). Uses list.eval(pred).list().all().
2614    pub fn array_forall(&self, predicate: Expr) -> Column {
2615        let pred_expr = self.expr().clone().list().eval(predicate).list().all();
2616        Self::from_expr(pred_expr, Some("forall".to_string()))
2617    }
2618
2619    /// Filter list elements by predicate (PySpark filter). Keeps elements where predicate is true.
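    /// Sketch (assumes a numeric list column "xs"):
    /// ```ignore
    /// use polars::prelude::{col, lit};
    /// // keep only strictly positive elements
    /// let positives = Column::new("xs".to_string()).array_filter(col("").gt(lit(0)));
    /// ```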
2620    pub fn array_filter(&self, predicate: Expr) -> Column {
2621        use polars::prelude::NULL;
2622        let then_val = Self::from_expr(col(""), None);
2623        let else_val = Self::from_expr(lit(NULL), None);
2624        let elem_expr = crate::functions::when(&Self::from_expr(predicate, None))
2625            .then(&then_val)
2626            .otherwise(&else_val)
2627            .into_expr();
2628        let list_expr = self
2629            .expr()
2630            .clone()
2631            .list()
2632            .eval(elem_expr)
2633            .list()
2634            .drop_nulls();
2635        Self::from_expr(list_expr, None)
2636    }
2637
2638    /// Transform list elements by expression (PySpark transform). list.eval(expr).
2639    pub fn array_transform(&self, f: Expr) -> Column {
2640        let list_expr = self.expr().clone().list().eval(f);
2641        Self::from_expr(list_expr, None)
2642    }
2643
2644    /// Sum of list elements (PySpark aggregate with sum). Uses list.sum().
2645    pub fn array_sum(&self) -> Column {
2646        Self::from_expr(self.expr().clone().list().sum(), None)
2647    }
2648
2649    /// Array fold/aggregate (PySpark aggregate). Simplified: zero + sum(list). Full (zero, merge, finish) deferred.
2650    pub fn array_aggregate(&self, zero: &Column) -> Column {
2651        let sum_expr = self.expr().clone().list().sum();
2652        Self::from_expr(sum_expr + zero.expr().clone(), None)
2653    }
2654
2655    /// Mean of list elements (PySpark aggregate with avg). Uses list.mean().
2656    pub fn array_mean(&self) -> Column {
2657        Self::from_expr(self.expr().clone().list().mean(), None)
2658    }
2659
2660    /// Explode list with position (PySpark posexplode). Returns (pos_col, value_col).
2661    /// pos is 1-based (note: PySpark emits 0-based positions); uses list.eval(cum_count()).explode() and explode().
2662    pub fn posexplode(&self) -> (Column, Column) {
2663        use polars::prelude::ExplodeOptions;
2664        let opts = ExplodeOptions {
2665            empty_as_null: false,
2666            keep_nulls: false,
2667        };
2668        let pos_expr = self
2669            .expr()
2670            .clone()
2671            .list()
2672            .eval(col("").cum_count(false))
2673            .explode(opts);
2674        let val_expr = self.expr().clone().explode(opts);
2675        (
2676            Self::from_expr(pos_expr, Some("pos".to_string())),
2677            Self::from_expr(val_expr, Some("col".to_string())),
2678        )
2679    }
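
    // Usage sketch (illustrative; "xs" is a hypothetical list column). Select both outputs
    // together so the exploded positions and values stay aligned row by row:
    //
    //     let xs = Column::new("xs".to_string());
    //     let (pos, val) = xs.posexplode();
    //     // df.lazy().select([pos.into_expr(), val.into_expr()]).collect()?;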
2680
2681    /// Extract keys from a map column (PySpark map_keys). Map column is List(Struct{key, value}).
2682    pub fn map_keys(&self) -> Column {
2683        let elem_key = col("").struct_().field_by_name("key");
2684        let list_expr = self.expr().clone().list().eval(elem_key);
2685        Self::from_expr(list_expr, None)
2686    }
2687
2688    /// Extract values from a map column (PySpark map_values). Map column is List(Struct{key, value}).
2689    pub fn map_values(&self) -> Column {
2690        let elem_val = col("").struct_().field_by_name("value");
2691        let list_expr = self.expr().clone().list().eval(elem_val);
2692        Self::from_expr(list_expr, None)
2693    }
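
    // Usage sketch (illustrative; "m" is a hypothetical map column stored in the crate's
    // List(Struct{key, value}) representation):
    //
    //     let m = Column::new("m".to_string());
    //     let ks = m.map_keys();    // list of the "key" fields per row
    //     let vs = m.map_values();  // list of the "value" fields per row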
2694
2695    /// Return map as list of structs {key, value} (PySpark map_entries). Identity for List(Struct) column.
2696    pub fn map_entries(&self) -> Column {
2697        Self::from_expr(self.expr().clone(), None)
2698    }
2699
2700    /// Build map from two array columns (keys, values) (PySpark map_from_arrays). Implemented via map_many UDF.
2701    pub fn map_from_arrays(&self, values: &Column) -> Column {
2702        let args = [values.expr().clone()];
2703        let expr = self.expr().clone().map_many(
2704            |cols| expect_col(crate::udfs::apply_map_from_arrays(cols)),
2705            &args,
2706            |_schema, fields| Ok(fields[0].clone()),
2707        );
2708        Self::from_expr(expr, None)
2709    }
2710
2711    /// Merge two map columns (PySpark map_concat). Last value wins for duplicate keys.
2712    pub fn map_concat(&self, other: &Column) -> Column {
2713        let args = [other.expr().clone()];
2714        let expr = self.expr().clone().map_many(
2715            |cols| expect_col(crate::udfs::apply_map_concat(cols)),
2716            &args,
2717            |_schema, fields| Ok(fields[0].clone()),
2718        );
2719        Self::from_expr(expr, None)
2720    }
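
    // Usage sketch (illustrative; "ks", "vs", "m1" and "m2" are hypothetical columns):
    //
    //     let built = Column::new("ks".to_string()).map_from_arrays(&Column::new("vs".to_string()));
    //     let merged = Column::new("m1".to_string()).map_concat(&Column::new("m2".to_string()));
    //     // Per the doc comment above, the last value wins when both maps share a key.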
2721
2722    /// Transform each map key by expr (PySpark transform_keys). key_expr should use col("").struct_().field_by_name("key").
2723    pub fn transform_keys(&self, key_expr: Expr) -> Column {
2724        use polars::prelude::as_struct;
2725        let value = col("").struct_().field_by_name("value");
2726        let new_struct = as_struct(vec![key_expr.alias("key"), value.alias("value")]);
2727        let list_expr = self.expr().clone().list().eval(new_struct);
2728        Self::from_expr(list_expr, None)
2729    }
2730
2731    /// Transform each map value by expr (PySpark transform_values). value_expr should use col("").struct_().field_by_name("value").
2732    pub fn transform_values(&self, value_expr: Expr) -> Column {
2733        use polars::prelude::as_struct;
2734        let key = col("").struct_().field_by_name("key");
2735        let new_struct = as_struct(vec![key.alias("key"), value_expr.alias("value")]);
2736        let list_expr = self.expr().clone().list().eval(new_struct);
2737        Self::from_expr(list_expr, None)
2738    }
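
    // Usage sketch (illustrative; "m" is a hypothetical map column). The transform expressions
    // reference the current entry through col("").struct_().field_by_name(...), as documented
    // above; here the values are doubled while the keys are left untouched:
    //
    //     let m = Column::new("m".to_string());
    //     let doubled = m.transform_values(col("").struct_().field_by_name("value") * lit(2));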
2739
2740    /// Merge two maps by key with merge function (PySpark map_zip_with).
2741    /// Merge Expr uses col("").struct_().field_by_name("value1") and field_by_name("value2").
2742    pub fn map_zip_with(&self, other: &Column, merge: Expr) -> Column {
2743        use polars::prelude::as_struct;
2744        let args = [other.expr().clone()];
2745        let zip_expr = self.expr().clone().map_many(
2746            |cols| expect_col(crate::udfs::apply_map_zip_to_struct(cols)),
2747            &args,
2748            |_schema, fields| {
2749                let list_inner = match &fields[0].dtype {
2750                    DataType::List(inner) => *inner.clone(),
2751                    _ => return Ok(fields[0].clone()),
2752                };
2753                let (key_dtype, value_dtype) = match &list_inner {
2754                    DataType::Struct(struct_fields) => {
2755                        let k = struct_fields
2756                            .iter()
2757                            .find(|f| f.name.as_str() == "key")
2758                            .map(|f| f.dtype.clone())
2759                            .unwrap_or(DataType::String);
2760                        let v = struct_fields
2761                            .iter()
2762                            .find(|f| f.name.as_str() == "value")
2763                            .map(|f| f.dtype.clone())
2764                            .unwrap_or(DataType::String);
2765                        (k, v)
2766                    }
2767                    _ => (DataType::String, DataType::String),
2768                };
2769                let out_struct = DataType::Struct(vec![
2770                    Field::new("key".into(), key_dtype),
2771                    Field::new("value1".into(), value_dtype.clone()),
2772                    Field::new("value2".into(), value_dtype),
2773                ]);
2774                Ok(Field::new(
2775                    fields[0].name().clone(),
2776                    DataType::List(Box::new(out_struct)),
2777                ))
2778            },
2779        );
2780        let key_field = col("").struct_().field_by_name("key").alias("key");
2781        let value_field = merge.alias("value");
2782        let merge_expr = as_struct(vec![key_field, value_field]);
2783        let list_expr = zip_expr.list().eval(merge_expr);
2784        Self::from_expr(list_expr, None)
2785    }
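
    // Usage sketch (illustrative; "m1" and "m2" are hypothetical map columns). After the zip UDF
    // each entry is a struct {key, value1, value2}, so the merge expression combines the two
    // value fields:
    //
    //     let summed = Column::new("m1".to_string()).map_zip_with(
    //         &Column::new("m2".to_string()),
    //         col("").struct_().field_by_name("value1") + col("").struct_().field_by_name("value2"),
    //     );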
2786
2787    /// Filter map entries by predicate (PySpark map_filter). Keeps key-value pairs where predicate is true.
2788    /// Predicate uses col("").struct_().field_by_name("key") and field_by_name("value") to reference key/value.
2789    pub fn map_filter(&self, predicate: Expr) -> Column {
2790        use polars::prelude::NULL;
2791        let then_val = Self::from_expr(col(""), None);
2792        let else_val = Self::from_expr(lit(NULL), None);
2793        let elem_expr = crate::functions::when(&Self::from_expr(predicate, None))
2794            .then(&then_val)
2795            .otherwise(&else_val)
2796            .into_expr();
2797        let list_expr = self
2798            .expr()
2799            .clone()
2800            .list()
2801            .eval(elem_expr)
2802            .list()
2803            .drop_nulls();
2804        Self::from_expr(list_expr, None)
2805    }
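
    // Usage sketch (illustrative; "m" is a hypothetical map column). Keep only entries whose
    // value is greater than zero:
    //
    //     let m = Column::new("m".to_string());
    //     let kept = m.map_filter(col("").struct_().field_by_name("value").gt(lit(0)));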
2806
2807    /// Array of structs {key, value} to map (PySpark map_from_entries). Identity for List(Struct) format.
2808    pub fn map_from_entries(&self) -> Column {
2809        Self::from_expr(self.expr().clone(), None)
2810    }
2811
2812    /// True if map contains key (PySpark map_contains_key).
2813    pub fn map_contains_key(&self, key: &Column) -> Column {
2814        let args = [key.expr().clone()];
2815        let expr = self.expr().clone().map_many(
2816            |cols| expect_col(crate::udfs::apply_map_contains_key(cols)),
2817            &args,
2818            |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Boolean)),
2819        );
2820        Self::from_expr(expr, None)
2821    }
2822
2823    /// Get value for key from map, or null (PySpark get).
2824    pub fn get(&self, key: &Column) -> Column {
2825        let args = [key.expr().clone()];
2826        let expr = self.expr().clone().map_many(
2827            |cols| expect_col(crate::udfs::apply_get(cols)),
2828            &args,
2829            |_schema, fields| Ok(fields[0].clone()),
2830        );
2831        Self::from_expr(expr, None)
2832    }
2833
2834    /// Extract JSON path from string column (PySpark get_json_object). Uses Polars str().json_path_match.
2835    pub fn get_json_object(&self, path: &str) -> Column {
2836        let path_expr = polars::prelude::lit(path.to_string());
2837        let out = self.expr().clone().str().json_path_match(path_expr);
2838        Self::from_expr(out, None)
2839    }
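
    // Usage sketch (illustrative; "payload" is a hypothetical string column holding JSON, and the
    // path is an example). Paths use the "$.field" JSONPath form, as in PySpark:
    //
    //     let name = Column::new("payload".to_string()).get_json_object("$.user.name");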
2840
2841    /// Parse string column as JSON into struct (PySpark from_json). Uses Polars str().json_decode.
2842    pub fn from_json(&self, schema: Option<polars::datatypes::DataType>) -> Column {
2843        use polars::prelude::DataType;
2844        let dtype = schema.unwrap_or(DataType::String);
2845        let out = self.expr().clone().str().json_decode(dtype);
2846        Self::from_expr(out, None)
2847    }
2848
2849    /// Serialize struct column to JSON string (PySpark to_json). Uses Polars struct().json_encode.
2850    pub fn to_json(&self) -> Column {
2851        let out = self.expr().clone().struct_().json_encode();
2852        Self::from_expr(out, None)
2853    }
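
    // Usage sketch (illustrative; "payload" and "s" are hypothetical columns, and the struct
    // schema is made up for the example):
    //
    //     let parsed = Column::new("payload".to_string()).from_json(Some(DataType::Struct(vec![
    //         Field::new("id".into(), DataType::Int64),
    //         Field::new("name".into(), DataType::String),
    //     ])));
    //     let encoded = Column::new("s".to_string()).to_json();  // struct column -> JSON string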
2854
2855    /// Length of JSON array at path (PySpark json_array_length). UDF.
2856    pub fn json_array_length(&self, path: &str) -> Column {
2857        let path = path.to_string();
2858        let expr = self.expr().clone().map(
2859            move |s| expect_col(crate::udfs::apply_json_array_length(s, &path)),
2860            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
2861        );
2862        Self::from_expr(expr, None)
2863    }
2864
2865    /// Keys of JSON object (PySpark json_object_keys). Returns list of strings. UDF.
2866    pub fn json_object_keys(&self) -> Column {
2867        let expr = self.expr().clone().map(
2868            |s| expect_col(crate::udfs::apply_json_object_keys(s)),
2869            |_schema, field| {
2870                Ok(Field::new(
2871                    field.name().clone(),
2872                    DataType::List(Box::new(DataType::String)),
2873                ))
2874            },
2875        );
2876        Self::from_expr(expr, None)
2877    }
2878
2879    /// Extract keys from JSON as struct (PySpark json_tuple). UDF. Returns struct with one string field per key.
2880    pub fn json_tuple(&self, keys: &[&str]) -> Column {
2881        let keys_vec: Vec<String> = keys.iter().map(|s| (*s).to_string()).collect();
2882        let struct_fields: Vec<polars::datatypes::Field> = keys_vec
2883            .iter()
2884            .map(|k| polars::datatypes::Field::new(k.as_str().into(), DataType::String))
2885            .collect();
2886        let expr = self.expr().clone().map(
2887            move |s| expect_col(crate::udfs::apply_json_tuple(s, &keys_vec)),
2888            move |_schema, field| {
2889                Ok(Field::new(
2890                    field.name().clone(),
2891                    DataType::Struct(struct_fields.clone()),
2892                ))
2893            },
2894        );
2895        Self::from_expr(expr, None)
2896    }
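
    // Usage sketch (illustrative; "payload" is a hypothetical JSON string column). The result is
    // a struct with one String field per requested key:
    //
    //     let pair = Column::new("payload".to_string()).json_tuple(&["id", "name"]);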
2897
2898    /// Parse CSV string to struct (PySpark from_csv). Minimal: split by comma, up to 32 columns. UDF.
2899    pub fn from_csv(&self) -> Column {
2900        let expr = self.expr().clone().map(
2901            |s| expect_col(crate::udfs::apply_from_csv(s)),
2902            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Struct(vec![]))),
2903        );
2904        Self::from_expr(expr, None)
2905    }
2906
2907    /// Format struct as CSV string (PySpark to_csv). Minimal. UDF.
2908    pub fn to_csv(&self) -> Column {
2909        let expr = self.expr().clone().map(
2910            |s| expect_col(crate::udfs::apply_to_csv(s)),
2911            |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
2912        );
2913        Self::from_expr(expr, None)
2914    }
2915
2916    /// Parse URL and extract part (PySpark parse_url). UDF.
2917    /// When part is QUERY/QUERYSTRING and key is Some(k), returns the value for that query parameter only.
2918    pub fn parse_url(&self, part: &str, key: Option<&str>) -> Column {
2919        let part = part.to_string();
2920        let key_owned = key.map(String::from);
2921        let expr = self.expr().clone().map(
2922            move |s| expect_col(crate::udfs::apply_parse_url(s, &part, key_owned.as_deref())),
2923            |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
2924        );
2925        Self::from_expr(expr, None)
2926    }
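
    // Usage sketch (illustrative; "url" is a hypothetical string column and the part names follow
    // PySpark's parse_url convention):
    //
    //     let u = Column::new("url".to_string());
    //     let host = u.parse_url("HOST", None);
    //     let page = u.parse_url("QUERY", Some("page"));  // value of the ?page=... parameter only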
2927
2928    /// Hash of column value (PySpark hash). Single-column version.
2929    pub fn hash(&self) -> Column {
2930        let expr = self.expr().clone().map(
2931            |s| expect_col(crate::udfs::apply_hash_one(s)),
2932            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
2933        );
2934        Self::from_expr(expr, None)
2935    }
2936
2937    /// Check if column values are in the other column's list/series (PySpark isin).
2938    pub fn isin(&self, other: &Column) -> Column {
2939        let out = self.expr().clone().is_in(other.expr().clone(), false);
2940        Self::from_expr(out, None)
2941    }
2942
2943    /// Percent-decode URL-encoded string (PySpark url_decode). Uses UDF.
2944    pub fn url_decode(&self) -> Column {
2945        let expr = self.expr().clone().map(
2946            |s| expect_col(crate::udfs::apply_url_decode(s)),
2947            |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
2948        );
2949        Self::from_expr(expr, None)
2950    }
2951
2952    /// Percent-encode string for URL (PySpark url_encode). Uses UDF.
2953    pub fn url_encode(&self) -> Column {
2954        let expr = self.expr().clone().map(
2955            |s| expect_col(crate::udfs::apply_url_encode(s)),
2956            |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
2957        );
2958        Self::from_expr(expr, None)
2959    }
2960
2961    /// Bitwise left shift (PySpark shiftLeft). col << n = col * 2^n.
2962    pub fn shift_left(&self, n: i32) -> Column {
2963        use polars::prelude::*;
2964        let pow = lit(2i64).pow(lit(n as i64));
2965        Self::from_expr(
2966            (self.expr().clone().cast(DataType::Int64) * pow).cast(DataType::Int64),
2967            None,
2968        )
2969    }
2970
2971    /// Bitwise signed right shift (PySpark shiftRight). col >> n = col / 2^n (integer division truncates toward zero, so results for negative inputs can differ from a true arithmetic shift).
2972    pub fn shift_right(&self, n: i32) -> Column {
2973        use polars::prelude::*;
2974        let pow = lit(2i64).pow(lit(n as i64));
2975        Self::from_expr(
2976            (self.expr().clone().cast(DataType::Int64) / pow).cast(DataType::Int64),
2977            None,
2978        )
2979    }
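
    // Worked example (illustrative; "n" is a hypothetical integer column):
    //
    //     let c = Column::new("n".to_string());
    //     let left = c.shift_left(2);    // e.g. 5  -> 5 * 2^2 = 20
    //     let right = c.shift_right(2);  // e.g. 20 -> 20 / 2^2 = 5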
2980
2981    /// Bitwise unsigned right shift (PySpark shiftRightUnsigned). Logical shift.
2982    pub fn shift_right_unsigned(&self, n: i32) -> Column {
2983        let expr = self.expr().clone().map(
2984            move |s| expect_col(crate::udfs::apply_shift_right_unsigned(s, n)),
2985            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
2986        );
2987        Self::from_expr(expr, None)
2988    }
2989}
2990
2991#[cfg(test)]
2992mod tests {
2993    use super::Column;
2994    use polars::prelude::{IntoLazy, col, df, lit};
2995
2996    /// Helper to create a simple DataFrame for testing
2997    fn test_df() -> polars::prelude::DataFrame {
2998        df!(
2999            "a" => &[1, 2, 3, 4, 5],
3000            "b" => &[10, 20, 30, 40, 50]
3001        )
3002        .unwrap()
3003    }
3004
3005    /// Helper to create a DataFrame with nulls for testing
3006    fn test_df_with_nulls() -> polars::prelude::DataFrame {
3007        df!(
3008            "a" => &[Some(1), Some(2), None, Some(4), None],
3009            "b" => &[Some(10), None, Some(30), None, None]
3010        )
3011        .unwrap()
3012    }
3013
3014    #[test]
3015    fn test_column_new() {
3016        let column = Column::new("age".to_string());
3017        assert_eq!(column.name(), "age");
3018    }
3019
3020    #[test]
3021    fn test_column_from_expr() {
3022        let expr = col("test");
3023        let column = Column::from_expr(expr, Some("test".to_string()));
3024        assert_eq!(column.name(), "test");
3025    }
3026
3027    #[test]
3028    fn test_column_from_expr_default_name() {
3029        let expr = col("test").gt(lit(5));
3030        let column = Column::from_expr(expr, None);
3031        assert_eq!(column.name(), "<expr>");
3032    }
3033
3034    #[test]
3035    fn test_column_alias() {
3036        let column = Column::new("original".to_string());
3037        let aliased = column.alias("new_name");
3038        assert_eq!(aliased.name(), "new_name");
3039    }
3040
3041    #[test]
3042    fn test_column_gt() {
3043        let df = test_df();
3044        let column = Column::new("a".to_string());
3045        let result = column.gt(lit(3));
3046
3047        // Apply the expression to filter the DataFrame
3048        let filtered = df.lazy().filter(result.into_expr()).collect().unwrap();
3049        assert_eq!(filtered.height(), 2); // rows with a > 3: 4, 5
3050    }
3051
3052    #[test]
3053    fn test_column_lt() {
3054        let df = test_df();
3055        let column = Column::new("a".to_string());
3056        let result = column.lt(lit(3));
3057
3058        let filtered = df.lazy().filter(result.into_expr()).collect().unwrap();
3059        assert_eq!(filtered.height(), 2); // rows with a < 3: 1, 2
3060    }
3061
3062    #[test]
3063    fn test_column_eq() {
3064        let df = test_df();
3065        let column = Column::new("a".to_string());
3066        let result = column.eq(lit(3));
3067
3068        let filtered = df.lazy().filter(result.into_expr()).collect().unwrap();
3069        assert_eq!(filtered.height(), 1); // only row with a == 3
3070    }
3071
3072    #[test]
3073    fn test_column_neq() {
3074        let df = test_df();
3075        let column = Column::new("a".to_string());
3076        let result = column.neq(lit(3));
3077
3078        let filtered = df.lazy().filter(result.into_expr()).collect().unwrap();
3079        assert_eq!(filtered.height(), 4); // rows with a != 3
3080    }
3081
3082    #[test]
3083    fn test_column_gt_eq() {
3084        let df = test_df();
3085        let column = Column::new("a".to_string());
3086        let result = column.gt_eq(lit(3));
3087
3088        let filtered = df.lazy().filter(result.into_expr()).collect().unwrap();
3089        assert_eq!(filtered.height(), 3); // rows with a >= 3: 3, 4, 5
3090    }
3091
3092    #[test]
3093    fn test_column_lt_eq() {
3094        let df = test_df();
3095        let column = Column::new("a".to_string());
3096        let result = column.lt_eq(lit(3));
3097
3098        let filtered = df.lazy().filter(result.into_expr()).collect().unwrap();
3099        assert_eq!(filtered.height(), 3); // rows with a <= 3: 1, 2, 3
3100    }
3101
3102    #[test]
3103    fn test_column_is_null() {
3104        let df = test_df_with_nulls();
3105        let column = Column::new("a".to_string());
3106        let result = column.is_null();
3107
3108        let filtered = df.lazy().filter(result.into_expr()).collect().unwrap();
3109        assert_eq!(filtered.height(), 2); // 2 null values in column 'a'
3110    }
3111
3112    #[test]
3113    fn test_column_is_not_null() {
3114        let df = test_df_with_nulls();
3115        let column = Column::new("a".to_string());
3116        let result = column.is_not_null();
3117
3118        let filtered = df.lazy().filter(result.into_expr()).collect().unwrap();
3119        assert_eq!(filtered.height(), 3); // 3 non-null values in column 'a'
3120    }
3121
3122    #[test]
3123    fn test_null_boolean_column_produces_null_bool_series() {
3124        let df = test_df();
3125        let expr = Column::null_boolean().into_expr();
3126        let out = df
3127            .lazy()
3128            .select([expr.alias("null_bool")])
3129            .collect()
3130            .unwrap();
3131        let s = out.column("null_bool").unwrap();
3132        assert_eq!(s.dtype(), &polars::prelude::DataType::Boolean);
3133        assert_eq!(s.null_count(), s.len());
3134    }
3135
3136    #[test]
3137    fn test_eq_null_safe_both_null() {
3138        // Create a DataFrame where both columns have NULL at the same row
3139        let df = df!(
3140            "a" => &[Some(1), None, Some(3)],
3141            "b" => &[Some(1), None, Some(4)]
3142        )
3143        .unwrap();
3144
3145        let col_a = Column::new("a".to_string());
3146        let col_b = Column::new("b".to_string());
3147        let result = col_a.eq_null_safe(&col_b);
3148
3149        // Apply the expression and collect
3150        let result_df = df
3151            .lazy()
3152            .with_column(result.into_expr().alias("eq_null_safe"))
3153            .collect()
3154            .unwrap();
3155
3156        // Get the result column
3157        let eq_col = result_df.column("eq_null_safe").unwrap();
3158        let values: Vec<Option<bool>> = eq_col.bool().unwrap().into_iter().collect();
3159
3160        // Row 0: 1 == 1 -> true
3161        // Row 1: NULL <=> NULL -> true
3162        // Row 2: 3 == 4 -> false
3163        assert_eq!(values[0], Some(true));
3164        assert_eq!(values[1], Some(true)); // NULL-safe: both NULL = true
3165        assert_eq!(values[2], Some(false));
3166    }
3167
3168    #[test]
3169    fn test_eq_null_safe_one_null() {
3170        // Create a DataFrame where only one column has NULL
3171        let df = df!(
3172            "a" => &[Some(1), None, Some(3)],
3173            "b" => &[Some(1), Some(2), None]
3174        )
3175        .unwrap();
3176
3177        let col_a = Column::new("a".to_string());
3178        let col_b = Column::new("b".to_string());
3179        let result = col_a.eq_null_safe(&col_b);
3180
3181        let result_df = df
3182            .lazy()
3183            .with_column(result.into_expr().alias("eq_null_safe"))
3184            .collect()
3185            .unwrap();
3186
3187        let eq_col = result_df.column("eq_null_safe").unwrap();
3188        let values: Vec<Option<bool>> = eq_col.bool().unwrap().into_iter().collect();
3189
3190        // Row 0: 1 == 1 -> true
3191        // Row 1: NULL <=> 2 -> false (one is null, not both)
3192        // Row 2: 3 <=> NULL -> false (one is null, not both)
3193        assert_eq!(values[0], Some(true));
3194        assert_eq!(values[1], Some(false));
3195        assert_eq!(values[2], Some(false));
3196    }
3197}