Skip to main content

robin_sparkless/
column.rs

1use polars::prelude::{
2    col, lit, DataType, Expr, GetOutput, ListNameSpaceExtension, RankMethod, RankOptions, TimeUnit,
3};
4
5/// Convert SQL LIKE pattern (% = any sequence, _ = one char) to regex. Escapes regex specials.
6/// When escape_char is Some(esc), esc + any char treats that char as literal (no %/_ expansion).
7fn like_pattern_to_regex(pattern: &str, escape_char: Option<char>) -> String {
8    let mut out = String::with_capacity(pattern.len() * 2);
9    let mut it = pattern.chars();
10    while let Some(c) = it.next() {
11        if escape_char == Some(c) {
12            if let Some(next) = it.next() {
13                // Literal: escape for regex
14                if "\\.*+?[](){}^$|".contains(next) {
15                    out.push('\\');
16                }
17                out.push(next);
18            } else {
19                out.push('\\');
20                out.push(c);
21            }
22        } else {
23            match c {
24                '%' => out.push_str(".*"),
25                '_' => out.push('.'),
26                '\\' | '.' | '+' | '*' | '?' | '[' | ']' | '(' | ')' | '{' | '}' | '^' | '$'
27                | '|' => {
28                    out.push('\\');
29                    out.push(c);
30                }
31                _ => out.push(c),
32            }
33        }
34    }
35    format!("^{out}$")
36}
37
38/// Deferred random column: when added via with_column, we generate a full-length series in one go (PySpark-like).
39#[derive(Debug, Clone, Copy)]
40pub enum DeferredRandom {
41    Rand(Option<u64>),
42    Randn(Option<u64>),
43}
44
45/// Column - represents a column in a DataFrame, used for building expressions
46/// Thin wrapper around Polars `Expr`. May carry a DeferredRandom for rand/randn so with_column can produce one value per row.
47/// May carry UdfCall for Python UDFs (eager execution at with_column).
48#[derive(Debug, Clone)]
49pub struct Column {
50    name: String,
51    expr: Expr, // Polars expression for lazy evaluation
52    /// When Some, with_column generates a full-length random series instead of using expr (PySpark-like per-row rand/randn).
53    pub(crate) deferred: Option<DeferredRandom>,
54    /// When Some, with_column executes Python UDF eagerly (name, arg columns).
55    pub(crate) udf_call: Option<(String, Vec<Column>)>,
56}
57
58impl Column {
59    /// Create a new Column from a column name
60    pub fn new(name: String) -> Self {
61        Column {
62            name: name.clone(),
63            expr: col(&name),
64            deferred: None,
65            udf_call: None,
66        }
67    }
68
69    /// Create a Column from a Polars Expr
70    pub fn from_expr(expr: Expr, name: Option<String>) -> Self {
71        let display_name = name.unwrap_or_else(|| "<expr>".to_string());
72        Column {
73            name: display_name,
74            expr,
75            deferred: None,
76            udf_call: None,
77        }
78    }
79
80    /// Create a Column for Python UDF call (eager execution at with_column).
81    pub fn from_udf_call(name: String, args: Vec<Column>) -> Self {
82        Column {
83            name: format!("{name}()"),
84            expr: lit(0i32), // dummy, never used
85            deferred: None,
86            udf_call: Some((name, args)),
87        }
88    }
89
90    /// Create a Column for rand(seed). When used in with_column, generates one value per row (PySpark-like).
91    pub fn from_rand(seed: Option<u64>) -> Self {
92        let expr = lit(1i64).cum_sum(false).map(
93            move |c| crate::udfs::apply_rand_with_seed(c, seed),
94            GetOutput::from_type(DataType::Float64),
95        );
96        Column {
97            name: "rand".to_string(),
98            expr,
99            deferred: Some(DeferredRandom::Rand(seed)),
100            udf_call: None,
101        }
102    }
103
104    /// Create a Column for randn(seed). When used in with_column, generates one value per row (PySpark-like).
105    pub fn from_randn(seed: Option<u64>) -> Self {
106        let expr = lit(1i64).cum_sum(false).map(
107            move |c| crate::udfs::apply_randn_with_seed(c, seed),
108            GetOutput::from_type(DataType::Float64),
109        );
110        Column {
111            name: "randn".to_string(),
112            expr,
113            deferred: Some(DeferredRandom::Randn(seed)),
114            udf_call: None,
115        }
116    }
117
118    /// Get the underlying Polars Expr
119    pub fn expr(&self) -> &Expr {
120        &self.expr
121    }
122
123    /// Convert to Polars Expr (consumes self)
124    pub fn into_expr(self) -> Expr {
125        self.expr
126    }
127
128    /// Get the column name
129    pub fn name(&self) -> &str {
130        &self.name
131    }
132
133    /// Alias the column
134    pub fn alias(&self, name: &str) -> Column {
135        Column {
136            name: name.to_string(),
137            expr: self.expr.clone().alias(name),
138            deferred: self.deferred,
139            udf_call: self.udf_call.clone(),
140        }
141    }
142
143    /// Ascending sort, nulls first (Spark default for ASC). PySpark asc.
144    pub fn asc(&self) -> crate::functions::SortOrder {
145        crate::functions::asc(self)
146    }
147
148    /// Ascending sort, nulls first. PySpark asc_nulls_first.
149    pub fn asc_nulls_first(&self) -> crate::functions::SortOrder {
150        crate::functions::asc_nulls_first(self)
151    }
152
153    /// Ascending sort, nulls last. PySpark asc_nulls_last.
154    pub fn asc_nulls_last(&self) -> crate::functions::SortOrder {
155        crate::functions::asc_nulls_last(self)
156    }
157
158    /// Descending sort, nulls last (Spark default for DESC). PySpark desc.
159    pub fn desc(&self) -> crate::functions::SortOrder {
160        crate::functions::desc(self)
161    }
162
163    /// Descending sort, nulls first. PySpark desc_nulls_first.
164    pub fn desc_nulls_first(&self) -> crate::functions::SortOrder {
165        crate::functions::desc_nulls_first(self)
166    }
167
168    /// Descending sort, nulls last. PySpark desc_nulls_last.
169    pub fn desc_nulls_last(&self) -> crate::functions::SortOrder {
170        crate::functions::desc_nulls_last(self)
171    }
172
173    /// Check if column is null
174    pub fn is_null(&self) -> Column {
175        Column {
176            name: format!("({} IS NULL)", self.name),
177            expr: self.expr.clone().is_null(),
178            deferred: None,
179            udf_call: None,
180        }
181    }
182
183    /// Check if column is not null
184    pub fn is_not_null(&self) -> Column {
185        Column {
186            name: format!("({} IS NOT NULL)", self.name),
187            expr: self.expr.clone().is_not_null(),
188            deferred: None,
189            udf_call: None,
190        }
191    }
192
193    /// Alias for is_null. PySpark isnull.
194    pub fn isnull(&self) -> Column {
195        self.is_null()
196    }
197
198    /// Alias for is_not_null. PySpark isnotnull.
199    pub fn isnotnull(&self) -> Column {
200        self.is_not_null()
201    }
202
203    /// Create a null boolean expression
204    fn null_boolean_expr() -> Expr {
205        use polars::prelude::*;
206        // Create an expression that is always a null boolean
207        lit(NULL).cast(DataType::Boolean)
208    }
209
210    /// SQL LIKE pattern matching (% = any chars, _ = one char). PySpark like.
211    /// When escape_char is Some(esc), esc + char treats that char as literal (e.g. \\% = literal %).
212    pub fn like(&self, pattern: &str, escape_char: Option<char>) -> Column {
213        let regex = like_pattern_to_regex(pattern, escape_char);
214        self.regexp_like(&regex)
215    }
216
217    /// Case-insensitive LIKE. PySpark ilike.
218    /// When escape_char is Some(esc), esc + char treats that char as literal.
219    pub fn ilike(&self, pattern: &str, escape_char: Option<char>) -> Column {
220        use polars::prelude::*;
221        let regex = format!("(?i){}", like_pattern_to_regex(pattern, escape_char));
222        Self::from_expr(self.expr().clone().str().contains(lit(regex), false), None)
223    }
224
225    /// PySpark-style equality comparison (NULL == NULL returns NULL, not True)
226    /// Any comparison involving NULL returns NULL
227    ///
228    /// Explicitly wraps comparisons with null checks to ensure PySpark semantics.
229    /// If either side is NULL, the result is NULL.
230    pub fn eq_pyspark(&self, other: &Column) -> Column {
231        // Check if either side is NULL
232        let left_null = self.expr().clone().is_null();
233        let right_null = other.expr().clone().is_null();
234        let either_null = left_null.clone().or(right_null.clone());
235
236        // Standard equality comparison
237        let eq_result = self.expr().clone().eq(other.expr().clone());
238
239        // Wrap: if either is null, return null boolean, else return comparison result
240        let null_boolean = Self::null_boolean_expr();
241        let null_aware_expr = crate::functions::when(&Self::from_expr(either_null, None))
242            .then(&Self::from_expr(null_boolean, None))
243            .otherwise(&Self::from_expr(eq_result, None));
244
245        Self::from_expr(null_aware_expr.into_expr(), None)
246    }
247
248    /// PySpark-style inequality comparison (NULL != NULL returns NULL, not False)
249    /// Any comparison involving NULL returns NULL
250    pub fn ne_pyspark(&self, other: &Column) -> Column {
251        // Check if either side is NULL
252        let left_null = self.expr().clone().is_null();
253        let right_null = other.expr().clone().is_null();
254        let either_null = left_null.clone().or(right_null.clone());
255
256        // Standard inequality comparison
257        let ne_result = self.expr().clone().neq(other.expr().clone());
258
259        // Wrap: if either is null, return null boolean, else return comparison result
260        let null_boolean = Self::null_boolean_expr();
261        let null_aware_expr = crate::functions::when(&Self::from_expr(either_null, None))
262            .then(&Self::from_expr(null_boolean, None))
263            .otherwise(&Self::from_expr(ne_result, None));
264
265        Self::from_expr(null_aware_expr.into_expr(), None)
266    }
267
268    /// Null-safe equality (NULL <=> NULL returns True)
269    /// PySpark's eqNullSafe() method
270    pub fn eq_null_safe(&self, other: &Column) -> Column {
271        use crate::functions::{lit_bool, when};
272
273        let left_null = self.expr().clone().is_null();
274        let right_null = other.expr().clone().is_null();
275        let both_null = left_null.clone().and(right_null.clone());
276        let either_null = left_null.clone().or(right_null.clone());
277
278        // Standard equality
279        let eq_result = self.expr().clone().eq(other.expr().clone());
280
281        // If both are null, return True
282        // If either is null (but not both), return False
283        // Otherwise, return standard equality result
284        when(&Self::from_expr(both_null, None))
285            .then(&lit_bool(true))
286            .otherwise(
287                &when(&Self::from_expr(either_null, None))
288                    .then(&lit_bool(false))
289                    .otherwise(&Self::from_expr(eq_result, None)),
290            )
291    }
292
293    /// PySpark-style greater-than comparison (NULL > value returns NULL)
294    /// Any comparison involving NULL returns NULL
295    pub fn gt_pyspark(&self, other: &Column) -> Column {
296        // Check if either side is NULL
297        let left_null = self.expr().clone().is_null();
298        let right_null = other.expr().clone().is_null();
299        let either_null = left_null.clone().or(right_null.clone());
300
301        // Standard greater-than comparison
302        let gt_result = self.expr().clone().gt(other.expr().clone());
303
304        // Wrap: if either is null, return null boolean, else return comparison result
305        let null_boolean = Self::null_boolean_expr();
306        let null_aware_expr = crate::functions::when(&Self::from_expr(either_null, None))
307            .then(&Self::from_expr(null_boolean, None))
308            .otherwise(&Self::from_expr(gt_result, None));
309
310        Self::from_expr(null_aware_expr.into_expr(), None)
311    }
312
313    /// PySpark-style greater-than-or-equal comparison
314    /// Any comparison involving NULL returns NULL
315    pub fn ge_pyspark(&self, other: &Column) -> Column {
316        // Check if either side is NULL
317        let left_null = self.expr().clone().is_null();
318        let right_null = other.expr().clone().is_null();
319        let either_null = left_null.clone().or(right_null.clone());
320
321        // Standard greater-than-or-equal comparison
322        let ge_result = self.expr().clone().gt_eq(other.expr().clone());
323
324        // Wrap: if either is null, return null boolean, else return comparison result
325        let null_boolean = Self::null_boolean_expr();
326        let null_aware_expr = crate::functions::when(&Self::from_expr(either_null, None))
327            .then(&Self::from_expr(null_boolean, None))
328            .otherwise(&Self::from_expr(ge_result, None));
329
330        Self::from_expr(null_aware_expr.into_expr(), None)
331    }
332
333    /// PySpark-style less-than comparison
334    /// Any comparison involving NULL returns NULL
335    pub fn lt_pyspark(&self, other: &Column) -> Column {
336        // Check if either side is NULL
337        let left_null = self.expr().clone().is_null();
338        let right_null = other.expr().clone().is_null();
339        let either_null = left_null.clone().or(right_null.clone());
340
341        // Standard less-than comparison
342        let lt_result = self.expr().clone().lt(other.expr().clone());
343
344        // Wrap: if either is null, return null boolean, else return comparison result
345        let null_boolean = Self::null_boolean_expr();
346        let null_aware_expr = crate::functions::when(&Self::from_expr(either_null, None))
347            .then(&Self::from_expr(null_boolean, None))
348            .otherwise(&Self::from_expr(lt_result, None));
349
350        Self::from_expr(null_aware_expr.into_expr(), None)
351    }
352
353    /// PySpark-style less-than-or-equal comparison
354    /// Any comparison involving NULL returns NULL
355    pub fn le_pyspark(&self, other: &Column) -> Column {
356        // Check if either side is NULL
357        let left_null = self.expr().clone().is_null();
358        let right_null = other.expr().clone().is_null();
359        let either_null = left_null.clone().or(right_null.clone());
360
361        // Standard less-than-or-equal comparison
362        let le_result = self.expr().clone().lt_eq(other.expr().clone());
363
364        // Wrap: if either is null, return null boolean, else return comparison result
365        let null_boolean = Self::null_boolean_expr();
366        let null_aware_expr = crate::functions::when(&Self::from_expr(either_null, None))
367            .then(&Self::from_expr(null_boolean, None))
368            .otherwise(&Self::from_expr(le_result, None));
369
370        Self::from_expr(null_aware_expr.into_expr(), None)
371    }
372
373    // Standard comparison methods that work with Expr (for literals and columns)
374    // These delegate to Polars and may not match PySpark null semantics exactly.
375    // Use _pyspark variants for explicit PySpark semantics.
376
377    /// Greater than comparison
378    pub fn gt(&self, other: Expr) -> Column {
379        Self::from_expr(self.expr().clone().gt(other), None)
380    }
381
382    /// Greater than or equal comparison
383    pub fn gt_eq(&self, other: Expr) -> Column {
384        Self::from_expr(self.expr().clone().gt_eq(other), None)
385    }
386
387    /// Less than comparison
388    pub fn lt(&self, other: Expr) -> Column {
389        Self::from_expr(self.expr().clone().lt(other), None)
390    }
391
392    /// Less than or equal comparison
393    pub fn lt_eq(&self, other: Expr) -> Column {
394        Self::from_expr(self.expr().clone().lt_eq(other), None)
395    }
396
397    /// Equality comparison
398    pub fn eq(&self, other: Expr) -> Column {
399        Self::from_expr(self.expr().clone().eq(other), None)
400    }
401
402    /// Inequality comparison
403    pub fn neq(&self, other: Expr) -> Column {
404        Self::from_expr(self.expr().clone().neq(other), None)
405    }
406
407    // Equality comparison with special handling for string-vs-numeric literals (issue #235).
408    //
409    // When comparing a column to a numeric literal (e.g. col("s") == lit(123)), Polars
410    // normally raises `cannot compare string with numeric type` if the column is a
411    // string column. PySpark, however, coerces types (string → numeric) and performs
412    // the comparison, treating invalid strings as null (non-matching in filters).
413
414    // --- String functions ---
415
416    /// Convert string column to uppercase (PySpark upper)
417    pub fn upper(&self) -> Column {
418        Self::from_expr(self.expr().clone().str().to_uppercase(), None)
419    }
420
421    /// Convert string column to lowercase (PySpark lower)
422    pub fn lower(&self) -> Column {
423        Self::from_expr(self.expr().clone().str().to_lowercase(), None)
424    }
425
426    /// Alias for lower. PySpark lcase.
427    pub fn lcase(&self) -> Column {
428        self.lower()
429    }
430
431    /// Alias for upper. PySpark ucase.
432    pub fn ucase(&self) -> Column {
433        self.upper()
434    }
435
436    /// Substring with 1-based start (PySpark substring semantics)
437    pub fn substr(&self, start: i64, length: Option<i64>) -> Column {
438        use polars::prelude::*;
439        let offset = (start - 1).max(0);
440        let offset_expr = lit(offset);
441        let length_expr = length.map(lit).unwrap_or_else(|| lit(i64::MAX)); // No length = rest of string
442        Self::from_expr(
443            self.expr().clone().str().slice(offset_expr, length_expr),
444            None,
445        )
446    }
447
448    /// String length in characters (PySpark length)
449    pub fn length(&self) -> Column {
450        Self::from_expr(self.expr().clone().str().len_chars(), None)
451    }
452
453    /// Bit length of string in bytes * 8 (PySpark bit_length).
454    pub fn bit_length(&self) -> Column {
455        use polars::prelude::*;
456        let len_bytes = self.expr().clone().str().len_bytes().cast(DataType::Int32);
457        Self::from_expr(len_bytes * lit(8i32), None)
458    }
459
460    /// Length of string in bytes (PySpark octet_length).
461    pub fn octet_length(&self) -> Column {
462        use polars::prelude::*;
463        Self::from_expr(
464            self.expr().clone().str().len_bytes().cast(DataType::Int32),
465            None,
466        )
467    }
468
469    /// Length of string in characters (PySpark char_length). Alias of length().
470    pub fn char_length(&self) -> Column {
471        self.length()
472    }
473
474    /// Length of string in characters (PySpark character_length). Alias of length().
475    pub fn character_length(&self) -> Column {
476        self.length()
477    }
478
479    /// Encode string to binary (PySpark encode). Charset: UTF-8. Returns hex string.
480    pub fn encode(&self, charset: &str) -> Column {
481        let charset = charset.to_string();
482        let expr = self.expr().clone().map(
483            move |s| crate::udfs::apply_encode(s, &charset),
484            GetOutput::from_type(DataType::String),
485        );
486        Self::from_expr(expr, None)
487    }
488
489    /// Decode binary (hex string) to string (PySpark decode). Charset: UTF-8.
490    pub fn decode(&self, charset: &str) -> Column {
491        let charset = charset.to_string();
492        let expr = self.expr().clone().map(
493            move |s| crate::udfs::apply_decode(s, &charset),
494            GetOutput::from_type(DataType::String),
495        );
496        Self::from_expr(expr, None)
497    }
498
499    /// Convert to binary (PySpark to_binary). fmt: 'utf-8', 'hex'. Returns hex string.
500    pub fn to_binary(&self, fmt: &str) -> Column {
501        let fmt = fmt.to_string();
502        let expr = self.expr().clone().map(
503            move |s| crate::udfs::apply_to_binary(s, &fmt),
504            GetOutput::from_type(DataType::String),
505        );
506        Self::from_expr(expr, None)
507    }
508
509    /// Try convert to binary; null on failure (PySpark try_to_binary).
510    pub fn try_to_binary(&self, fmt: &str) -> Column {
511        let fmt = fmt.to_string();
512        let expr = self.expr().clone().map(
513            move |s| crate::udfs::apply_try_to_binary(s, &fmt),
514            GetOutput::from_type(DataType::String),
515        );
516        Self::from_expr(expr, None)
517    }
518
519    /// AES encrypt (PySpark aes_encrypt). Key as string; AES-128-GCM. Output hex(nonce||ciphertext).
520    pub fn aes_encrypt(&self, key: &str) -> Column {
521        let key = key.to_string();
522        let expr = self.expr().clone().map(
523            move |s| crate::udfs::apply_aes_encrypt(s, &key),
524            GetOutput::from_type(DataType::String),
525        );
526        Self::from_expr(expr, None)
527    }
528
529    /// AES decrypt (PySpark aes_decrypt). Input hex(nonce||ciphertext). Null on failure.
530    pub fn aes_decrypt(&self, key: &str) -> Column {
531        let key = key.to_string();
532        let expr = self.expr().clone().map(
533            move |s| crate::udfs::apply_aes_decrypt(s, &key),
534            GetOutput::from_type(DataType::String),
535        );
536        Self::from_expr(expr, None)
537    }
538
539    /// Try AES decrypt (PySpark try_aes_decrypt). Returns null on failure.
540    pub fn try_aes_decrypt(&self, key: &str) -> Column {
541        let key = key.to_string();
542        let expr = self.expr().clone().map(
543            move |s| crate::udfs::apply_try_aes_decrypt(s, &key),
544            GetOutput::from_type(DataType::String),
545        );
546        Self::from_expr(expr, None)
547    }
548
549    /// Data type as string (PySpark typeof). Uses dtype from schema.
550    pub fn typeof_(&self) -> Column {
551        Self::from_expr(
552            self.expr().clone().map(
553                crate::udfs::apply_typeof,
554                GetOutput::from_type(DataType::String),
555            ),
556            None,
557        )
558    }
559
560    /// Trim leading and trailing whitespace (PySpark trim)
561    pub fn trim(&self) -> Column {
562        use polars::prelude::*;
563        Self::from_expr(self.expr().clone().str().strip_chars(lit(" \t\n\r")), None)
564    }
565
566    /// Trim leading whitespace (PySpark ltrim)
567    pub fn ltrim(&self) -> Column {
568        use polars::prelude::*;
569        Self::from_expr(
570            self.expr().clone().str().strip_chars_start(lit(" \t\n\r")),
571            None,
572        )
573    }
574
575    /// Trim trailing whitespace (PySpark rtrim)
576    pub fn rtrim(&self) -> Column {
577        use polars::prelude::*;
578        Self::from_expr(
579            self.expr().clone().str().strip_chars_end(lit(" \t\n\r")),
580            None,
581        )
582    }
583
584    /// Trim leading and trailing characters (PySpark btrim). trim_str defaults to whitespace.
585    pub fn btrim(&self, trim_str: Option<&str>) -> Column {
586        use polars::prelude::*;
587        let chars = trim_str.unwrap_or(" \t\n\r");
588        Self::from_expr(self.expr().clone().str().strip_chars(lit(chars)), None)
589    }
590
591    /// Find substring position 1-based, starting at pos (PySpark locate). 0 if not found.
592    pub fn locate(&self, substr: &str, pos: i64) -> Column {
593        use polars::prelude::*;
594        if substr.is_empty() {
595            return Self::from_expr(lit(1i64), None);
596        }
597        let start = (pos - 1).max(0);
598        let slice_expr = self.expr().clone().str().slice(lit(start), lit(i64::MAX));
599        let found = slice_expr.str().find_literal(lit(substr.to_string()));
600        Self::from_expr(
601            (found.cast(DataType::Int64) + lit(start + 1)).fill_null(lit(0i64)),
602            None,
603        )
604    }
605
606    /// Base conversion (PySpark conv). num_str from from_base to to_base.
607    pub fn conv(&self, from_base: i32, to_base: i32) -> Column {
608        let expr = self.expr().clone().map(
609            move |s| crate::udfs::apply_conv(s, from_base, to_base),
610            GetOutput::from_type(DataType::String),
611        );
612        Self::from_expr(expr, None)
613    }
614
615    /// Convert to hex string (PySpark hex). Int or string input.
616    pub fn hex(&self) -> Column {
617        let expr = self.expr().clone().map(
618            crate::udfs::apply_hex,
619            GetOutput::from_type(DataType::String),
620        );
621        Self::from_expr(expr, None)
622    }
623
624    /// Convert hex string to binary/string (PySpark unhex).
625    pub fn unhex(&self) -> Column {
626        let expr = self.expr().clone().map(
627            crate::udfs::apply_unhex,
628            GetOutput::from_type(DataType::String),
629        );
630        Self::from_expr(expr, None)
631    }
632
633    /// Convert integer to binary string (PySpark bin).
634    pub fn bin(&self) -> Column {
635        let expr = self.expr().clone().map(
636            crate::udfs::apply_bin,
637            GetOutput::from_type(DataType::String),
638        );
639        Self::from_expr(expr, None)
640    }
641
642    /// Get bit at 0-based position (PySpark getbit).
643    pub fn getbit(&self, pos: i64) -> Column {
644        let expr = self.expr().clone().map(
645            move |s| crate::udfs::apply_getbit(s, pos),
646            GetOutput::from_type(DataType::Int64),
647        );
648        Self::from_expr(expr, None)
649    }
650
651    /// Bitwise AND of two integer/boolean columns (PySpark bit_and).
652    pub fn bit_and(&self, other: &Column) -> Column {
653        let args = [other.expr().clone()];
654        let expr = self.expr().clone().cast(DataType::Int64).map_many(
655            crate::udfs::apply_bit_and,
656            &args,
657            GetOutput::from_type(DataType::Int64),
658        );
659        Self::from_expr(expr, None)
660    }
661
662    /// Bitwise OR of two integer/boolean columns (PySpark bit_or).
663    pub fn bit_or(&self, other: &Column) -> Column {
664        let args = [other.expr().clone()];
665        let expr = self.expr().clone().cast(DataType::Int64).map_many(
666            crate::udfs::apply_bit_or,
667            &args,
668            GetOutput::from_type(DataType::Int64),
669        );
670        Self::from_expr(expr, None)
671    }
672
673    /// Bitwise XOR of two integer/boolean columns (PySpark bit_xor).
674    pub fn bit_xor(&self, other: &Column) -> Column {
675        let args = [other.expr().clone()];
676        let expr = self.expr().clone().cast(DataType::Int64).map_many(
677            crate::udfs::apply_bit_xor,
678            &args,
679            GetOutput::from_type(DataType::Int64),
680        );
681        Self::from_expr(expr, None)
682    }
683
684    /// Count of set bits in the integer representation (PySpark bit_count).
685    pub fn bit_count(&self) -> Column {
686        let expr = self.expr().clone().map(
687            crate::udfs::apply_bit_count,
688            GetOutput::from_type(DataType::Int64),
689        );
690        Self::from_expr(expr, None)
691    }
692
693    /// Assert that all boolean values are true; errors otherwise (PySpark assert_true).
694    /// When err_msg is Some, it is used in the error message when assertion fails.
695    pub fn assert_true(&self, err_msg: Option<&str>) -> Column {
696        let msg = err_msg.map(String::from);
697        let expr = self.expr().clone().map(
698            move |c| crate::udfs::apply_assert_true(c, msg.as_deref()),
699            GetOutput::same_type(),
700        );
701        Self::from_expr(expr, None)
702    }
703
704    /// Bitwise NOT of an integer/boolean column (PySpark bitwise_not / bitwiseNOT).
705    pub fn bitwise_not(&self) -> Column {
706        // Use arithmetic identity: !n == -1 - n for two's-complement integers.
707        let expr = (lit(-1i64) - self.expr().clone().cast(DataType::Int64)).cast(DataType::Int64);
708        Self::from_expr(expr, None)
709    }
710
711    /// Parse string to map (PySpark str_to_map). "k1:v1,k2:v2" -> map.
712    pub fn str_to_map(&self, pair_delim: &str, key_value_delim: &str) -> Column {
713        let pair_delim = pair_delim.to_string();
714        let key_value_delim = key_value_delim.to_string();
715        let expr = self.expr().clone().map(
716            move |s| crate::udfs::apply_str_to_map(s, &pair_delim, &key_value_delim),
717            GetOutput::same_type(),
718        );
719        Self::from_expr(expr, None)
720    }
721
722    /// Extract first match of regex pattern (PySpark regexp_extract). Group 0 = full match.
723    pub fn regexp_extract(&self, pattern: &str, group_index: usize) -> Column {
724        use polars::prelude::*;
725        let pat = pattern.to_string();
726        Self::from_expr(
727            self.expr().clone().str().extract(lit(pat), group_index),
728            None,
729        )
730    }
731
732    /// Replace first match of regex pattern (PySpark regexp_replace). literal=false for regex.
733    pub fn regexp_replace(&self, pattern: &str, replacement: &str) -> Column {
734        use polars::prelude::*;
735        let pat = pattern.to_string();
736        let rep = replacement.to_string();
737        Self::from_expr(
738            self.expr().clone().str().replace(lit(pat), lit(rep), false),
739            None,
740        )
741    }
742
743    /// Leftmost n characters (PySpark left).
744    pub fn left(&self, n: i64) -> Column {
745        use polars::prelude::*;
746        let len = n.max(0) as u32;
747        Self::from_expr(
748            self.expr().clone().str().slice(lit(0i64), lit(len as i64)),
749            None,
750        )
751    }
752
753    /// Rightmost n characters (PySpark right).
754    pub fn right(&self, n: i64) -> Column {
755        use polars::prelude::*;
756        let n_val = n.max(0);
757        let n_expr = lit(n_val);
758        let len_chars = self.expr().clone().str().len_chars().cast(DataType::Int64);
759        let start = when((len_chars.clone() - n_expr.clone()).lt_eq(lit(0i64)))
760            .then(lit(0i64))
761            .otherwise(len_chars - n_expr.clone());
762        Self::from_expr(self.expr().clone().str().slice(start, n_expr), None)
763    }
764
765    /// Replace all occurrences of literal search string with replacement (PySpark replace for literal).
766    pub fn replace(&self, search: &str, replacement: &str) -> Column {
767        use polars::prelude::*;
768        Self::from_expr(
769            self.expr().clone().str().replace_all(
770                lit(search.to_string()),
771                lit(replacement.to_string()),
772                true,
773            ),
774            None,
775        )
776    }
777
778    /// True if string starts with prefix (PySpark startswith).
779    pub fn startswith(&self, prefix: &str) -> Column {
780        use polars::prelude::*;
781        Self::from_expr(
782            self.expr()
783                .clone()
784                .str()
785                .starts_with(lit(prefix.to_string())),
786            None,
787        )
788    }
789
790    /// True if string ends with suffix (PySpark endswith).
791    pub fn endswith(&self, suffix: &str) -> Column {
792        use polars::prelude::*;
793        Self::from_expr(
794            self.expr().clone().str().ends_with(lit(suffix.to_string())),
795            None,
796        )
797    }
798
799    /// True if string contains substring (literal, not regex). PySpark contains.
800    pub fn contains(&self, substring: &str) -> Column {
801        use polars::prelude::*;
802        Self::from_expr(
803            self.expr()
804                .clone()
805                .str()
806                .contains(lit(substring.to_string()), true),
807            None,
808        )
809    }
810
811    /// Split string by delimiter (PySpark split). Returns list of strings.
812    /// Uses literal split so "|" is not interpreted as regex alternation.
813    pub fn split(&self, delimiter: &str) -> Column {
814        use polars::prelude::*;
815        Self::from_expr(
816            self.expr().clone().str().split(lit(delimiter.to_string())),
817            None,
818        )
819    }
820
821    /// Title case: first letter of each word uppercase (PySpark initcap).
822    /// Approximates with lowercase when Polars to_titlecase is not enabled.
823    pub fn initcap(&self) -> Column {
824        Self::from_expr(self.expr().clone().str().to_lowercase(), None)
825    }
826
827    /// Extract all matches of regex (PySpark regexp_extract_all). Returns list of strings.
828    pub fn regexp_extract_all(&self, pattern: &str) -> Column {
829        use polars::prelude::*;
830        Self::from_expr(
831            self.expr()
832                .clone()
833                .str()
834                .extract_all(lit(pattern.to_string())),
835            None,
836        )
837    }
838
839    /// Check if string matches regex (PySpark regexp_like / rlike).
840    pub fn regexp_like(&self, pattern: &str) -> Column {
841        use polars::prelude::*;
842        Self::from_expr(
843            self.expr()
844                .clone()
845                .str()
846                .contains(lit(pattern.to_string()), false),
847            None,
848        )
849    }
850
851    /// Count of non-overlapping regex matches (PySpark regexp_count).
852    pub fn regexp_count(&self, pattern: &str) -> Column {
853        use polars::prelude::*;
854        Self::from_expr(
855            self.expr()
856                .clone()
857                .str()
858                .count_matches(lit(pattern.to_string()), false)
859                .cast(DataType::Int64),
860            None,
861        )
862    }
863
864    /// First substring matching regex (PySpark regexp_substr). Null if no match.
865    pub fn regexp_substr(&self, pattern: &str) -> Column {
866        self.regexp_extract(pattern, 0)
867    }
868
869    /// 1-based position of first regex match (PySpark regexp_instr). group_idx 0 = full match; null if no match.
870    pub fn regexp_instr(&self, pattern: &str, group_idx: Option<usize>) -> Column {
871        let idx = group_idx.unwrap_or(0);
872        let pattern = pattern.to_string();
873        let expr = self.expr().clone().map(
874            move |s| crate::udfs::apply_regexp_instr(s, pattern.clone(), idx),
875            GetOutput::from_type(DataType::Int64),
876        );
877        Self::from_expr(expr, None)
878    }
879
880    /// 1-based index of self in comma-delimited set column (PySpark find_in_set). 0 if not found or self contains comma.
881    pub fn find_in_set(&self, set_column: &Column) -> Column {
882        let args = [set_column.expr().clone()];
883        let expr = self.expr().clone().map_many(
884            crate::udfs::apply_find_in_set,
885            &args,
886            GetOutput::from_type(DataType::Int64),
887        );
888        Self::from_expr(expr, None)
889    }
890
891    /// Repeat string column n times (PySpark repeat). Each element repeated n times.
892    pub fn repeat(&self, n: i32) -> Column {
893        use polars::prelude::*;
894        // repeat_by yields List[str]; join to get a single string per row.
895        Self::from_expr(
896            self.expr()
897                .clone()
898                .repeat_by(lit(n as u32))
899                .list()
900                .join(lit(""), false),
901            None,
902        )
903    }
904
905    /// Reverse string (PySpark reverse).
906    pub fn reverse(&self) -> Column {
907        Self::from_expr(self.expr().clone().str().reverse(), None)
908    }
909
910    /// Find substring position (1-based; 0 if not found). PySpark instr(col, substr).
911    pub fn instr(&self, substr: &str) -> Column {
912        use polars::prelude::*;
913        let found = self
914            .expr()
915            .clone()
916            .str()
917            .find_literal(lit(substr.to_string()));
918        // Polars find_literal returns 0-based index (null if not found); PySpark is 1-based, 0 when not found.
919        Self::from_expr(
920            (found.cast(DataType::Int64) + lit(1i64)).fill_null(lit(0i64)),
921            None,
922        )
923    }
924
925    /// Left-pad string to length with pad character (PySpark lpad).
926    pub fn lpad(&self, length: i32, pad: &str) -> Column {
927        let pad_str = if pad.is_empty() { " " } else { pad };
928        let fill = pad_str.chars().next().unwrap_or(' ');
929        Self::from_expr(
930            self.expr().clone().str().pad_start(length as usize, fill),
931            None,
932        )
933    }
934
935    /// Right-pad string to length with pad character (PySpark rpad).
936    pub fn rpad(&self, length: i32, pad: &str) -> Column {
937        let pad_str = if pad.is_empty() { " " } else { pad };
938        let fill = pad_str.chars().next().unwrap_or(' ');
939        Self::from_expr(
940            self.expr().clone().str().pad_end(length as usize, fill),
941            None,
942        )
943    }
944
945    /// Character-by-character translation (PySpark translate). Replaces each char in from_str with corresponding in to_str; if to_str is shorter, extra from chars are removed.
946    pub fn translate(&self, from_str: &str, to_str: &str) -> Column {
947        use polars::prelude::*;
948        let mut e = self.expr().clone();
949        let from_chars: Vec<char> = from_str.chars().collect();
950        let to_chars: Vec<char> = to_str.chars().collect();
951        for (i, fc) in from_chars.iter().enumerate() {
952            let f = fc.to_string();
953            let t = to_chars
954                .get(i)
955                .map(|c| c.to_string())
956                .unwrap_or_else(String::new); // PySpark: no replacement = drop char
957            e = e.str().replace_all(lit(f), lit(t), true);
958        }
959        Self::from_expr(e, None)
960    }
961
962    /// Mask string: replace uppercase with upper_char, lowercase with lower_char, digits with digit_char (PySpark mask).
963    /// Defaults: upper 'X', lower 'x', digit 'n'; other chars unchanged.
964    pub fn mask(
965        &self,
966        upper_char: Option<char>,
967        lower_char: Option<char>,
968        digit_char: Option<char>,
969        other_char: Option<char>,
970    ) -> Column {
971        use polars::prelude::*;
972        let upper = upper_char.unwrap_or('X').to_string();
973        let lower = lower_char.unwrap_or('x').to_string();
974        let digit = digit_char.unwrap_or('n').to_string();
975        let other = other_char.map(|c| c.to_string());
976        let mut e = self
977            .expr()
978            .clone()
979            .str()
980            .replace_all(lit("[A-Z]".to_string()), lit(upper), false)
981            .str()
982            .replace_all(lit("[a-z]".to_string()), lit(lower), false)
983            .str()
984            .replace_all(lit(r"\d".to_string()), lit(digit), false);
985        if let Some(o) = other {
986            e = e
987                .str()
988                .replace_all(lit("[^A-Za-z0-9]".to_string()), lit(o), false);
989        }
990        Self::from_expr(e, None)
991    }
992
993    /// Split by delimiter and return 1-based part (PySpark split_part).
994    /// part_num > 0: from left; part_num < 0: from right; part_num = 0: null; out-of-range: empty string.
995    pub fn split_part(&self, delimiter: &str, part_num: i64) -> Column {
996        use polars::prelude::*;
997        if part_num == 0 {
998            return Self::from_expr(Expr::Literal(LiteralValue::Null), None);
999        }
1000        let use_regex = delimiter == "|";
1001        if use_regex {
1002            let pattern = delimiter.to_string();
1003            let part = part_num;
1004            let get_expr = self.expr().clone().map(
1005                move |col| crate::udfs::apply_split_part_regex(col, &pattern, part),
1006                GetOutput::from_type(DataType::String),
1007            );
1008            let expr = when(self.expr().clone().is_null())
1009                .then(Expr::Literal(LiteralValue::Null))
1010                .otherwise(get_expr.fill_null(lit("")));
1011            return Self::from_expr(expr, None);
1012        }
1013        let delim = delimiter.to_string();
1014        let split_expr = self.expr().clone().str().split(lit(delim));
1015        let index = if part_num > 0 {
1016            lit(part_num - 1)
1017        } else {
1018            lit(part_num)
1019        };
1020        let get_expr = split_expr.list().get(index, true).fill_null(lit(""));
1021        let expr = when(self.expr().clone().is_null())
1022            .then(Expr::Literal(LiteralValue::Null))
1023            .otherwise(get_expr);
1024        Self::from_expr(expr, None)
1025    }
1026
1027    /// Substring before/after nth delimiter (PySpark substring_index). count > 0: before nth from left; count < 0: after nth from right.
1028    pub fn substring_index(&self, delimiter: &str, count: i64) -> Column {
1029        use polars::prelude::*;
1030        let delim = delimiter.to_string();
1031        let split_expr = self.expr().clone().str().split(lit(delim.clone()));
1032        let n = count.unsigned_abs() as i64;
1033        let expr = if count > 0 {
1034            split_expr
1035                .clone()
1036                .list()
1037                .slice(lit(0i64), lit(n))
1038                .list()
1039                .join(lit(delim), false)
1040        } else {
1041            let len = split_expr.clone().list().len();
1042            let start = when(len.clone().gt(lit(n)))
1043                .then(len.clone() - lit(n))
1044                .otherwise(lit(0i64));
1045            let slice_len = when(len.clone().gt(lit(n))).then(lit(n)).otherwise(len);
1046            split_expr
1047                .list()
1048                .slice(start, slice_len)
1049                .list()
1050                .join(lit(delim), false)
1051        };
1052        Self::from_expr(expr, None)
1053    }
1054
1055    /// Soundex code (PySpark soundex). Implemented via map UDF (strsim/soundex crates).
1056    pub fn soundex(&self) -> Column {
1057        let expr = self
1058            .expr()
1059            .clone()
1060            .map(crate::udfs::apply_soundex, GetOutput::same_type());
1061        Self::from_expr(expr, None)
1062    }
1063
1064    /// Levenshtein distance to another string (PySpark levenshtein). Implemented via map_many UDF (strsim).
1065    pub fn levenshtein(&self, other: &Column) -> Column {
1066        let args = [other.expr().clone()];
1067        let expr = self.expr().clone().map_many(
1068            crate::udfs::apply_levenshtein,
1069            &args,
1070            GetOutput::from_type(DataType::Int64),
1071        );
1072        Self::from_expr(expr, None)
1073    }
1074
1075    /// CRC32 checksum of string bytes (PySpark crc32). Implemented via map UDF (crc32fast).
1076    pub fn crc32(&self) -> Column {
1077        let expr = self.expr().clone().map(
1078            crate::udfs::apply_crc32,
1079            GetOutput::from_type(DataType::Int64),
1080        );
1081        Self::from_expr(expr, None)
1082    }
1083
1084    /// XXH64 hash of string (PySpark xxhash64). Implemented via map UDF (twox-hash).
1085    pub fn xxhash64(&self) -> Column {
1086        let expr = self.expr().clone().map(
1087            crate::udfs::apply_xxhash64,
1088            GetOutput::from_type(DataType::Int64),
1089        );
1090        Self::from_expr(expr, None)
1091    }
1092
1093    /// ASCII value of first character (PySpark ascii). Returns Int32.
1094    pub fn ascii(&self) -> Column {
1095        let expr = self.expr().clone().map(
1096            crate::udfs::apply_ascii,
1097            GetOutput::from_type(DataType::Int32),
1098        );
1099        Self::from_expr(expr, None)
1100    }
1101
1102    /// Format numeric as string with fixed decimal places (PySpark format_number).
1103    pub fn format_number(&self, decimals: u32) -> Column {
1104        let expr = self.expr().clone().map(
1105            move |s| crate::udfs::apply_format_number(s, decimals),
1106            GetOutput::from_type(DataType::String),
1107        );
1108        Self::from_expr(expr, None)
1109    }
1110
1111    /// Int to single-character string (PySpark char / chr). Valid codepoint only.
1112    pub fn char(&self) -> Column {
1113        let expr = self.expr().clone().map(
1114            crate::udfs::apply_char,
1115            GetOutput::from_type(DataType::String),
1116        );
1117        Self::from_expr(expr, None)
1118    }
1119
1120    /// Alias for char (PySpark chr).
1121    pub fn chr(&self) -> Column {
1122        self.char()
1123    }
1124
1125    /// Base64 encode string bytes (PySpark base64).
1126    pub fn base64(&self) -> Column {
1127        let expr = self
1128            .expr()
1129            .clone()
1130            .map(crate::udfs::apply_base64, GetOutput::same_type());
1131        Self::from_expr(expr, None)
1132    }
1133
1134    /// Base64 decode to string (PySpark unbase64). Invalid decode → null.
1135    pub fn unbase64(&self) -> Column {
1136        let expr = self
1137            .expr()
1138            .clone()
1139            .map(crate::udfs::apply_unbase64, GetOutput::same_type());
1140        Self::from_expr(expr, None)
1141    }
1142
1143    /// SHA1 hash of string bytes, return hex string (PySpark sha1).
1144    pub fn sha1(&self) -> Column {
1145        let expr = self
1146            .expr()
1147            .clone()
1148            .map(crate::udfs::apply_sha1, GetOutput::same_type());
1149        Self::from_expr(expr, None)
1150    }
1151
1152    /// SHA2 hash; bit_length 256, 384, or 512 (PySpark sha2). Default 256.
1153    pub fn sha2(&self, bit_length: i32) -> Column {
1154        let expr = self.expr().clone().map(
1155            move |s| crate::udfs::apply_sha2(s, bit_length),
1156            GetOutput::same_type(),
1157        );
1158        Self::from_expr(expr, None)
1159    }
1160
1161    /// MD5 hash of string bytes, return hex string (PySpark md5).
1162    pub fn md5(&self) -> Column {
1163        let expr = self
1164            .expr()
1165            .clone()
1166            .map(crate::udfs::apply_md5, GetOutput::same_type());
1167        Self::from_expr(expr, None)
1168    }
1169
1170    /// Replace substring at 1-based position (PySpark overlay). replace is literal string.
1171    pub fn overlay(&self, replace: &str, pos: i64, length: i64) -> Column {
1172        use polars::prelude::*;
1173        let pos = pos.max(1);
1174        let replace_len = length.max(0);
1175        let start_left = 0i64;
1176        let len_left = (pos - 1).max(0);
1177        let start_right = (pos - 1 + replace_len).max(0);
1178        let len_right = 1_000_000i64; // "rest of string"
1179        let left = self
1180            .expr()
1181            .clone()
1182            .str()
1183            .slice(lit(start_left), lit(len_left));
1184        let mid = lit(replace.to_string());
1185        let right = self
1186            .expr()
1187            .clone()
1188            .str()
1189            .slice(lit(start_right), lit(len_right));
1190        let exprs = [left, mid, right];
1191        let concat_expr = polars::prelude::concat_str(&exprs, "", false);
1192        Self::from_expr(concat_expr, None)
1193    }
1194
1195    // --- Math functions ---
1196
1197    /// Absolute value (PySpark abs)
1198    pub fn abs(&self) -> Column {
1199        Self::from_expr(self.expr().clone().abs(), None)
1200    }
1201
1202    /// Ceiling (PySpark ceil)
1203    pub fn ceil(&self) -> Column {
1204        Self::from_expr(self.expr().clone().ceil(), None)
1205    }
1206
1207    /// Alias for ceil. PySpark ceiling.
1208    pub fn ceiling(&self) -> Column {
1209        self.ceil()
1210    }
1211
1212    /// Floor (PySpark floor)
1213    pub fn floor(&self) -> Column {
1214        Self::from_expr(self.expr().clone().floor(), None)
1215    }
1216
1217    /// Round to given decimal places (PySpark round)
1218    pub fn round(&self, decimals: u32) -> Column {
1219        Self::from_expr(self.expr().clone().round(decimals), None)
1220    }
1221
1222    /// Banker's rounding - round half to even (PySpark bround).
1223    pub fn bround(&self, scale: i32) -> Column {
1224        let expr = self.expr().clone().map(
1225            move |s| crate::udfs::apply_bround(s, scale),
1226            GetOutput::from_type(DataType::Float64),
1227        );
1228        Self::from_expr(expr, None)
1229    }
1230
1231    /// Unary minus (PySpark negate, negative).
1232    pub fn negate(&self) -> Column {
1233        use polars::prelude::*;
1234        Self::from_expr(self.expr().clone() * lit(-1), None)
1235    }
1236
1237    /// Multiply with PySpark-style string/number coercion (used by Python Column operators).
1238    ///
1239    /// Both operands are coerced to Double when used from Python; string columns are parsed
1240    /// as doubles where possible, invalid strings become null.
1241    pub fn multiply_pyspark(&self, other: &Column) -> Column {
1242        use polars::prelude::GetOutput;
1243        let args = [other.expr().clone()];
1244        let expr = self.expr().clone().map_many(
1245            crate::udfs::apply_pyspark_multiply,
1246            &args,
1247            GetOutput::from_type(DataType::Float64),
1248        );
1249        Self::from_expr(expr, None)
1250    }
1251
1252    /// Add with PySpark-style string/number coercion (used by Python Column operators).
1253    pub fn add_pyspark(&self, other: &Column) -> Column {
1254        use polars::prelude::GetOutput;
1255        let args = [other.expr().clone()];
1256        let expr = self.expr().clone().map_many(
1257            crate::udfs::apply_pyspark_add,
1258            &args,
1259            GetOutput::from_type(DataType::Float64),
1260        );
1261        Self::from_expr(expr, None)
1262    }
1263
1264    /// Subtract with PySpark-style string/number coercion (used by Python Column operators).
1265    pub fn subtract_pyspark(&self, other: &Column) -> Column {
1266        use polars::prelude::GetOutput;
1267        let args = [other.expr().clone()];
1268        let expr = self.expr().clone().map_many(
1269            crate::udfs::apply_pyspark_subtract,
1270            &args,
1271            GetOutput::from_type(DataType::Float64),
1272        );
1273        Self::from_expr(expr, None)
1274    }
1275
1276    /// Divide with PySpark-style string/number coercion (used by Python Column operators).
1277    pub fn divide_pyspark(&self, other: &Column) -> Column {
1278        use polars::prelude::GetOutput;
1279        let args = [other.expr().clone()];
1280        let expr = self.expr().clone().map_many(
1281            crate::udfs::apply_pyspark_divide,
1282            &args,
1283            GetOutput::from_type(DataType::Float64),
1284        );
1285        Self::from_expr(expr, None)
1286    }
1287
1288    /// Modulo with PySpark-style string/number coercion (used by Python Column operators).
1289    pub fn mod_pyspark(&self, other: &Column) -> Column {
1290        use polars::prelude::GetOutput;
1291        let args = [other.expr().clone()];
1292        let expr = self.expr().clone().map_many(
1293            crate::udfs::apply_pyspark_mod,
1294            &args,
1295            GetOutput::from_type(DataType::Float64),
1296        );
1297        Self::from_expr(expr, None)
1298    }
1299
1300    /// Multiply by another column or literal (PySpark multiply). Broadcasts scalars.
1301    pub fn multiply(&self, other: &Column) -> Column {
1302        Self::from_expr(self.expr().clone() * other.expr().clone(), None)
1303    }
1304
1305    /// Add another column or literal (PySpark +). Broadcasts scalars.
1306    pub fn add(&self, other: &Column) -> Column {
1307        Self::from_expr(self.expr().clone() + other.expr().clone(), None)
1308    }
1309
1310    /// Subtract another column or literal (PySpark -). Broadcasts scalars.
1311    pub fn subtract(&self, other: &Column) -> Column {
1312        Self::from_expr(self.expr().clone() - other.expr().clone(), None)
1313    }
1314
1315    /// Divide by another column or literal (PySpark /). Broadcasts scalars.
1316    pub fn divide(&self, other: &Column) -> Column {
1317        Self::from_expr(self.expr().clone() / other.expr().clone(), None)
1318    }
1319
1320    /// Modulo (PySpark %). Broadcasts scalars.
1321    pub fn mod_(&self, other: &Column) -> Column {
1322        Self::from_expr(self.expr().clone() % other.expr().clone(), None)
1323    }
1324
1325    /// Square root (PySpark sqrt)
1326    pub fn sqrt(&self) -> Column {
1327        Self::from_expr(self.expr().clone().sqrt(), None)
1328    }
1329
1330    /// Power (PySpark pow). Exponent can be literal or expression.
1331    pub fn pow(&self, exp: i64) -> Column {
1332        use polars::prelude::*;
1333        Self::from_expr(self.expr().clone().pow(lit(exp)), None)
1334    }
1335
1336    /// Alias for pow. PySpark power.
1337    pub fn power(&self, exp: i64) -> Column {
1338        self.pow(exp)
1339    }
1340
1341    /// Exponential (PySpark exp)
1342    pub fn exp(&self) -> Column {
1343        Self::from_expr(self.expr().clone().exp(), None)
1344    }
1345
1346    /// Natural logarithm (PySpark log)
1347    pub fn log(&self) -> Column {
1348        Self::from_expr(self.expr().clone().log(std::f64::consts::E), None)
1349    }
1350
1351    /// Alias for log. PySpark ln.
1352    pub fn ln(&self) -> Column {
1353        self.log()
1354    }
1355
1356    /// Sine (radians). PySpark sin.
1357    pub fn sin(&self) -> Column {
1358        let expr = self.expr().clone().map(
1359            crate::udfs::apply_sin,
1360            GetOutput::from_type(DataType::Float64),
1361        );
1362        Self::from_expr(expr, None)
1363    }
1364
1365    /// Cosine (radians). PySpark cos.
1366    pub fn cos(&self) -> Column {
1367        let expr = self.expr().clone().map(
1368            crate::udfs::apply_cos,
1369            GetOutput::from_type(DataType::Float64),
1370        );
1371        Self::from_expr(expr, None)
1372    }
1373
1374    /// Tangent (radians). PySpark tan.
1375    pub fn tan(&self) -> Column {
1376        let expr = self.expr().clone().map(
1377            crate::udfs::apply_tan,
1378            GetOutput::from_type(DataType::Float64),
1379        );
1380        Self::from_expr(expr, None)
1381    }
1382
1383    /// Cotangent: 1/tan (PySpark cot).
1384    pub fn cot(&self) -> Column {
1385        let expr = self.expr().clone().map(
1386            crate::udfs::apply_cot,
1387            GetOutput::from_type(DataType::Float64),
1388        );
1389        Self::from_expr(expr, None)
1390    }
1391
1392    /// Cosecant: 1/sin (PySpark csc).
1393    pub fn csc(&self) -> Column {
1394        let expr = self.expr().clone().map(
1395            crate::udfs::apply_csc,
1396            GetOutput::from_type(DataType::Float64),
1397        );
1398        Self::from_expr(expr, None)
1399    }
1400
1401    /// Secant: 1/cos (PySpark sec).
1402    pub fn sec(&self) -> Column {
1403        let expr = self.expr().clone().map(
1404            crate::udfs::apply_sec,
1405            GetOutput::from_type(DataType::Float64),
1406        );
1407        Self::from_expr(expr, None)
1408    }
1409
1410    /// Arc sine. PySpark asin.
1411    pub fn asin(&self) -> Column {
1412        let expr = self.expr().clone().map(
1413            crate::udfs::apply_asin,
1414            GetOutput::from_type(DataType::Float64),
1415        );
1416        Self::from_expr(expr, None)
1417    }
1418
1419    /// Arc cosine. PySpark acos.
1420    pub fn acos(&self) -> Column {
1421        let expr = self.expr().clone().map(
1422            crate::udfs::apply_acos,
1423            GetOutput::from_type(DataType::Float64),
1424        );
1425        Self::from_expr(expr, None)
1426    }
1427
1428    /// Arc tangent. PySpark atan.
1429    pub fn atan(&self) -> Column {
1430        let expr = self.expr().clone().map(
1431            crate::udfs::apply_atan,
1432            GetOutput::from_type(DataType::Float64),
1433        );
1434        Self::from_expr(expr, None)
1435    }
1436
1437    /// Two-argument arc tangent (y, x) -> angle in radians. PySpark atan2.
1438    pub fn atan2(&self, x: &Column) -> Column {
1439        let args = [x.expr().clone()];
1440        let expr = self.expr().clone().map_many(
1441            crate::udfs::apply_atan2,
1442            &args,
1443            GetOutput::from_type(DataType::Float64),
1444        );
1445        Self::from_expr(expr, None)
1446    }
1447
1448    /// Convert radians to degrees. PySpark degrees.
1449    pub fn degrees(&self) -> Column {
1450        let expr = self.expr().clone().map(
1451            crate::udfs::apply_degrees,
1452            GetOutput::from_type(DataType::Float64),
1453        );
1454        Self::from_expr(expr, None)
1455    }
1456
1457    /// Alias for degrees. PySpark toDegrees.
1458    pub fn to_degrees(&self) -> Column {
1459        self.degrees()
1460    }
1461
1462    /// Convert degrees to radians. PySpark radians.
1463    pub fn radians(&self) -> Column {
1464        let expr = self.expr().clone().map(
1465            crate::udfs::apply_radians,
1466            GetOutput::from_type(DataType::Float64),
1467        );
1468        Self::from_expr(expr, None)
1469    }
1470
1471    /// Alias for radians. PySpark toRadians.
1472    pub fn to_radians(&self) -> Column {
1473        self.radians()
1474    }
1475
1476    /// Sign of the number (-1, 0, or 1). PySpark signum.
1477    pub fn signum(&self) -> Column {
1478        let expr = self.expr().clone().map(
1479            crate::udfs::apply_signum,
1480            GetOutput::from_type(DataType::Float64),
1481        );
1482        Self::from_expr(expr, None)
1483    }
1484
1485    /// Hyperbolic cosine. PySpark cosh.
1486    pub fn cosh(&self) -> Column {
1487        let expr = self.expr().clone().map(
1488            crate::udfs::apply_cosh,
1489            GetOutput::from_type(DataType::Float64),
1490        );
1491        Self::from_expr(expr, None)
1492    }
1493    /// Hyperbolic sine. PySpark sinh.
1494    pub fn sinh(&self) -> Column {
1495        let expr = self.expr().clone().map(
1496            crate::udfs::apply_sinh,
1497            GetOutput::from_type(DataType::Float64),
1498        );
1499        Self::from_expr(expr, None)
1500    }
1501    /// Hyperbolic tangent. PySpark tanh.
1502    pub fn tanh(&self) -> Column {
1503        let expr = self.expr().clone().map(
1504            crate::udfs::apply_tanh,
1505            GetOutput::from_type(DataType::Float64),
1506        );
1507        Self::from_expr(expr, None)
1508    }
1509    /// Inverse hyperbolic cosine. PySpark acosh.
1510    pub fn acosh(&self) -> Column {
1511        let expr = self.expr().clone().map(
1512            crate::udfs::apply_acosh,
1513            GetOutput::from_type(DataType::Float64),
1514        );
1515        Self::from_expr(expr, None)
1516    }
1517    /// Inverse hyperbolic sine. PySpark asinh.
1518    pub fn asinh(&self) -> Column {
1519        let expr = self.expr().clone().map(
1520            crate::udfs::apply_asinh,
1521            GetOutput::from_type(DataType::Float64),
1522        );
1523        Self::from_expr(expr, None)
1524    }
1525    /// Inverse hyperbolic tangent. PySpark atanh.
1526    pub fn atanh(&self) -> Column {
1527        let expr = self.expr().clone().map(
1528            crate::udfs::apply_atanh,
1529            GetOutput::from_type(DataType::Float64),
1530        );
1531        Self::from_expr(expr, None)
1532    }
1533    /// Cube root. PySpark cbrt.
1534    pub fn cbrt(&self) -> Column {
1535        let expr = self.expr().clone().map(
1536            crate::udfs::apply_cbrt,
1537            GetOutput::from_type(DataType::Float64),
1538        );
1539        Self::from_expr(expr, None)
1540    }
1541    /// exp(x) - 1. PySpark expm1.
1542    pub fn expm1(&self) -> Column {
1543        let expr = self.expr().clone().map(
1544            crate::udfs::apply_expm1,
1545            GetOutput::from_type(DataType::Float64),
1546        );
1547        Self::from_expr(expr, None)
1548    }
1549    /// log(1 + x). PySpark log1p.
1550    pub fn log1p(&self) -> Column {
1551        let expr = self.expr().clone().map(
1552            crate::udfs::apply_log1p,
1553            GetOutput::from_type(DataType::Float64),
1554        );
1555        Self::from_expr(expr, None)
1556    }
1557    /// Base-10 logarithm. PySpark log10.
1558    pub fn log10(&self) -> Column {
1559        let expr = self.expr().clone().map(
1560            crate::udfs::apply_log10,
1561            GetOutput::from_type(DataType::Float64),
1562        );
1563        Self::from_expr(expr, None)
1564    }
1565    /// Base-2 logarithm. PySpark log2.
1566    pub fn log2(&self) -> Column {
1567        let expr = self.expr().clone().map(
1568            crate::udfs::apply_log2,
1569            GetOutput::from_type(DataType::Float64),
1570        );
1571        Self::from_expr(expr, None)
1572    }
1573    /// Round to nearest integer. PySpark rint.
1574    pub fn rint(&self) -> Column {
1575        let expr = self.expr().clone().map(
1576            crate::udfs::apply_rint,
1577            GetOutput::from_type(DataType::Float64),
1578        );
1579        Self::from_expr(expr, None)
1580    }
1581
1582    /// sqrt(x^2 + y^2). PySpark hypot.
1583    pub fn hypot(&self, other: &Column) -> Column {
1584        let xx = self.expr().clone() * self.expr().clone();
1585        let yy = other.expr().clone() * other.expr().clone();
1586        Self::from_expr((xx + yy).sqrt(), None)
1587    }
1588
1589    /// Cast to the given type (PySpark cast). Fails on invalid conversion.
1590    pub fn cast_to(&self, type_name: &str) -> Result<Column, String> {
1591        crate::functions::cast(self, type_name)
1592    }
1593
1594    /// Cast to the given type, null on invalid conversion (PySpark try_cast).
1595    pub fn try_cast_to(&self, type_name: &str) -> Result<Column, String> {
1596        crate::functions::try_cast(self, type_name)
1597    }
1598
1599    /// True where the float value is NaN (PySpark isnan).
1600    pub fn is_nan(&self) -> Column {
1601        Self::from_expr(self.expr().clone().is_nan(), None)
1602    }
1603
1604    // --- Datetime functions ---
1605
1606    /// Extract year from datetime column (PySpark year)
1607    pub fn year(&self) -> Column {
1608        Self::from_expr(self.expr().clone().dt().year(), None)
1609    }
1610
1611    /// Extract month from datetime column (PySpark month)
1612    pub fn month(&self) -> Column {
1613        Self::from_expr(self.expr().clone().dt().month(), None)
1614    }
1615
1616    /// Extract day of month from datetime column (PySpark day)
1617    pub fn day(&self) -> Column {
1618        Self::from_expr(self.expr().clone().dt().day(), None)
1619    }
1620
1621    /// Alias for day. PySpark dayofmonth.
1622    pub fn dayofmonth(&self) -> Column {
1623        self.day()
1624    }
1625
1626    /// Extract quarter (1-4) from date/datetime column (PySpark quarter).
1627    pub fn quarter(&self) -> Column {
1628        Self::from_expr(self.expr().clone().dt().quarter(), None)
1629    }
1630
1631    /// Extract ISO week of year (1-53) (PySpark weekofyear / week).
1632    pub fn weekofyear(&self) -> Column {
1633        Self::from_expr(self.expr().clone().dt().week(), None)
1634    }
1635
1636    /// Alias for weekofyear (PySpark week).
1637    pub fn week(&self) -> Column {
1638        self.weekofyear()
1639    }
1640
1641    /// Day of week: 1 = Sunday, 2 = Monday, ..., 7 = Saturday (PySpark dayofweek).
1642    /// Polars weekday is Mon=1..Sun=7; we convert to Sun=1..Sat=7.
1643    pub fn dayofweek(&self) -> Column {
1644        let w = self.expr().clone().dt().weekday();
1645        let dayofweek = (w % lit(7i32)) + lit(1i32); // 7->1 (Sun), 1->2 (Mon), ..., 6->7 (Sat)
1646        Self::from_expr(dayofweek, None)
1647    }
1648
1649    /// Day of year (1-366) (PySpark dayofyear).
1650    pub fn dayofyear(&self) -> Column {
1651        Self::from_expr(
1652            self.expr().clone().dt().ordinal_day().cast(DataType::Int32),
1653            None,
1654        )
1655    }
1656
1657    /// Cast to date (PySpark to_date). Drops time component from datetime/timestamp.
1658    pub fn to_date(&self) -> Column {
1659        use polars::prelude::DataType;
1660        Self::from_expr(self.expr().clone().cast(DataType::Date), None)
1661    }
1662
1663    /// Format date/datetime as string (PySpark date_format). Uses chrono strftime format.
1664    pub fn date_format(&self, format: &str) -> Column {
1665        Self::from_expr(self.expr().clone().dt().strftime(format), None)
1666    }
1667
1668    /// Extract hour from datetime column (PySpark hour).
1669    pub fn hour(&self) -> Column {
1670        Self::from_expr(self.expr().clone().dt().hour(), None)
1671    }
1672
1673    /// Extract minute from datetime column (PySpark minute).
1674    pub fn minute(&self) -> Column {
1675        Self::from_expr(self.expr().clone().dt().minute(), None)
1676    }
1677
1678    /// Extract second from datetime column (PySpark second).
1679    pub fn second(&self) -> Column {
1680        Self::from_expr(self.expr().clone().dt().second(), None)
1681    }
1682
1683    /// Extract field from date/datetime (PySpark extract). field: "year","month","day","hour","minute","second","quarter","week","dayofweek","dayofyear".
1684    pub fn extract(&self, field: &str) -> Column {
1685        use polars::prelude::*;
1686        let e = self.expr().clone();
1687        let expr = match field.trim().to_lowercase().as_str() {
1688            "year" => e.dt().year(),
1689            "month" => e.dt().month(),
1690            "day" => e.dt().day(),
1691            "hour" => e.dt().hour(),
1692            "minute" => e.dt().minute(),
1693            "second" => e.dt().second(),
1694            "quarter" => e.dt().quarter(),
1695            "week" | "weekofyear" => e.dt().week(),
1696            "dayofweek" | "dow" => {
1697                let w = e.dt().weekday();
1698                (w % lit(7i32)) + lit(1i32)
1699            }
1700            "dayofyear" | "doy" => e.dt().ordinal_day().cast(DataType::Int32),
1701            _ => e.dt().year(), // fallback
1702        };
1703        Self::from_expr(expr, None)
1704    }
1705
1706    /// Timestamp to microseconds since epoch (PySpark unix_micros).
1707    pub fn unix_micros(&self) -> Column {
1708        use polars::prelude::*;
1709        Self::from_expr(self.expr().clone().cast(DataType::Int64), None)
1710    }
1711
1712    /// Timestamp to milliseconds since epoch (PySpark unix_millis).
1713    pub fn unix_millis(&self) -> Column {
1714        use polars::prelude::*;
1715        let micros = self.expr().clone().cast(DataType::Int64);
1716        Self::from_expr(micros / lit(1000i64), None)
1717    }
1718
1719    /// Timestamp to seconds since epoch (PySpark unix_seconds).
1720    pub fn unix_seconds(&self) -> Column {
1721        use polars::prelude::*;
1722        let micros = self.expr().clone().cast(DataType::Int64);
1723        Self::from_expr(micros / lit(1_000_000i64), None)
1724    }
1725
1726    /// Weekday name "Mon","Tue",... (PySpark dayname).
1727    pub fn dayname(&self) -> Column {
1728        let expr = self.expr().clone().map(
1729            crate::udfs::apply_dayname,
1730            GetOutput::from_type(DataType::String),
1731        );
1732        Self::from_expr(expr, None)
1733    }
1734
1735    /// Weekday 0=Mon, 6=Sun (PySpark weekday).
1736    pub fn weekday(&self) -> Column {
1737        let expr = self.expr().clone().map(
1738            crate::udfs::apply_weekday,
1739            GetOutput::from_type(DataType::Int32),
1740        );
1741        Self::from_expr(expr, None)
1742    }
1743
1744    /// Add n days to date/datetime column (PySpark date_add).
1745    pub fn date_add(&self, n: i32) -> Column {
1746        use polars::prelude::*;
1747        let date_expr = self.expr().clone().cast(DataType::Date);
1748        let dur = duration(DurationArgs::new().with_days(lit(n as i64)));
1749        Self::from_expr(date_expr + dur, None)
1750    }
1751
1752    /// Subtract n days from date/datetime column (PySpark date_sub).
1753    pub fn date_sub(&self, n: i32) -> Column {
1754        use polars::prelude::*;
1755        let date_expr = self.expr().clone().cast(DataType::Date);
1756        let dur = duration(DurationArgs::new().with_days(lit(n as i64)));
1757        Self::from_expr(date_expr - dur, None)
1758    }
1759
1760    /// Number of days between two date/datetime columns (PySpark datediff). (end - start).
1761    pub fn datediff(&self, other: &Column) -> Column {
1762        use polars::prelude::*;
1763        let start = self.expr().clone().cast(DataType::Date);
1764        let end = other.expr().clone().cast(DataType::Date);
1765        Self::from_expr((end - start).dt().total_days(), None)
1766    }
1767
1768    /// Last day of the month for date/datetime column (PySpark last_day).
1769    pub fn last_day(&self) -> Column {
1770        Self::from_expr(self.expr().clone().dt().month_end(), None)
1771    }
1772
1773    /// Add amount of unit to timestamp (PySpark timestampadd). unit: DAY, HOUR, MINUTE, SECOND, etc.
1774    pub fn timestampadd(&self, unit: &str, amount: &Column) -> Column {
1775        use polars::prelude::*;
1776        let ts = self.expr().clone();
1777        let amt = amount.expr().clone().cast(DataType::Int64);
1778        let dur = match unit.trim().to_uppercase().as_str() {
1779            "DAY" | "DAYS" => duration(DurationArgs::new().with_days(amt)),
1780            "HOUR" | "HOURS" => duration(DurationArgs::new().with_hours(amt)),
1781            "MINUTE" | "MINUTES" => duration(DurationArgs::new().with_minutes(amt)),
1782            "SECOND" | "SECONDS" => duration(DurationArgs::new().with_seconds(amt)),
1783            "WEEK" | "WEEKS" => duration(DurationArgs::new().with_weeks(amt)),
1784            _ => duration(DurationArgs::new().with_days(amt)),
1785        };
1786        Self::from_expr(ts + dur, None)
1787    }
1788
1789    /// Difference between timestamps in given unit (PySpark timestampdiff). unit: DAY, HOUR, MINUTE, SECOND.
1790    pub fn timestampdiff(&self, unit: &str, other: &Column) -> Column {
1791        let start = self.expr().clone();
1792        let end = other.expr().clone();
1793        let diff = end - start;
1794        let expr = match unit.trim().to_uppercase().as_str() {
1795            "HOUR" | "HOURS" => diff.dt().total_hours(),
1796            "MINUTE" | "MINUTES" => diff.dt().total_minutes(),
1797            "SECOND" | "SECONDS" => diff.dt().total_seconds(),
1798            "DAY" | "DAYS" => diff.dt().total_days(),
1799            _ => diff.dt().total_days(),
1800        };
1801        Self::from_expr(expr, None)
1802    }
1803
1804    /// Interpret timestamp as UTC, convert to target timezone (PySpark from_utc_timestamp).
1805    pub fn from_utc_timestamp(&self, tz: &str) -> Column {
1806        let tz = tz.to_string();
1807        let expr = self.expr().clone().map(
1808            move |s| crate::udfs::apply_from_utc_timestamp(s, &tz),
1809            GetOutput::same_type(),
1810        );
1811        Self::from_expr(expr, None)
1812    }
1813
1814    /// Interpret timestamp as in tz, convert to UTC (PySpark to_utc_timestamp).
1815    pub fn to_utc_timestamp(&self, tz: &str) -> Column {
1816        let tz = tz.to_string();
1817        let expr = self.expr().clone().map(
1818            move |s| crate::udfs::apply_to_utc_timestamp(s, &tz),
1819            GetOutput::same_type(),
1820        );
1821        Self::from_expr(expr, None)
1822    }
1823
1824    /// Truncate date/datetime to unit (e.g. "mo", "wk", "day"). PySpark trunc.
1825    pub fn trunc(&self, format: &str) -> Column {
1826        use polars::prelude::*;
1827        Self::from_expr(
1828            self.expr().clone().dt().truncate(lit(format.to_string())),
1829            None,
1830        )
1831    }
1832
1833    /// Add n months to date/datetime column (PySpark add_months). Month-aware.
1834    pub fn add_months(&self, n: i32) -> Column {
1835        let expr = self.expr().clone().map(
1836            move |col| crate::udfs::apply_add_months(col, n),
1837            GetOutput::from_type(DataType::Date),
1838        );
1839        Self::from_expr(expr, None)
1840    }
1841
1842    /// Number of months between end and start dates, as fractional (PySpark months_between).
1843    /// When round_off is true, rounds to 8 decimal places (PySpark default).
1844    pub fn months_between(&self, start: &Column, round_off: bool) -> Column {
1845        let args = [start.expr().clone()];
1846        let expr = self.expr().clone().map_many(
1847            move |cols| crate::udfs::apply_months_between(cols, round_off),
1848            &args,
1849            GetOutput::from_type(DataType::Float64),
1850        );
1851        Self::from_expr(expr, None)
1852    }
1853
1854    /// Next date that is the given day of week (e.g. "Mon", "Tue") (PySpark next_day).
1855    pub fn next_day(&self, day_of_week: &str) -> Column {
1856        let day = day_of_week.to_string();
1857        let expr = self.expr().clone().map(
1858            move |col| crate::udfs::apply_next_day(col, &day),
1859            GetOutput::from_type(DataType::Date),
1860        );
1861        Self::from_expr(expr, None)
1862    }
1863
1864    /// Parse string timestamp to seconds since epoch (PySpark unix_timestamp).
1865    pub fn unix_timestamp(&self, format: Option<&str>) -> Column {
1866        let fmt = format.map(String::from);
1867        let expr = self.expr().clone().map(
1868            move |col| crate::udfs::apply_unix_timestamp(col, fmt.as_deref()),
1869            GetOutput::from_type(DataType::Int64),
1870        );
1871        Self::from_expr(expr, None)
1872    }
1873
1874    /// Convert seconds since epoch to formatted string (PySpark from_unixtime).
1875    pub fn from_unixtime(&self, format: Option<&str>) -> Column {
1876        let fmt = format.map(String::from);
1877        let expr = self.expr().clone().map(
1878            move |col| crate::udfs::apply_from_unixtime(col, fmt.as_deref()),
1879            GetOutput::from_type(DataType::String),
1880        );
1881        Self::from_expr(expr, None)
1882    }
1883
1884    /// Convert seconds since epoch to timestamp (PySpark timestamp_seconds).
1885    pub fn timestamp_seconds(&self) -> Column {
1886        let expr = (self.expr().clone().cast(DataType::Int64) * lit(1_000_000i64))
1887            .cast(DataType::Datetime(TimeUnit::Microseconds, None));
1888        Self::from_expr(expr, None)
1889    }
1890
1891    /// Convert milliseconds since epoch to timestamp (PySpark timestamp_millis).
1892    pub fn timestamp_millis(&self) -> Column {
1893        let expr = (self.expr().clone().cast(DataType::Int64) * lit(1000i64))
1894            .cast(DataType::Datetime(TimeUnit::Microseconds, None));
1895        Self::from_expr(expr, None)
1896    }
1897
1898    /// Convert microseconds since epoch to timestamp (PySpark timestamp_micros).
1899    pub fn timestamp_micros(&self) -> Column {
1900        let expr = self
1901            .expr()
1902            .clone()
1903            .cast(DataType::Int64)
1904            .cast(DataType::Datetime(TimeUnit::Microseconds, None));
1905        Self::from_expr(expr, None)
1906    }
1907
1908    /// Date to days since 1970-01-01 (PySpark unix_date).
1909    pub fn unix_date(&self) -> Column {
1910        let expr = self.expr().clone().map(
1911            crate::udfs::apply_unix_date,
1912            GetOutput::from_type(DataType::Int32),
1913        );
1914        Self::from_expr(expr, None)
1915    }
1916
1917    /// Days since epoch to date (PySpark date_from_unix_date).
1918    pub fn date_from_unix_date(&self) -> Column {
1919        let expr = self.expr().clone().map(
1920            crate::udfs::apply_date_from_unix_date,
1921            GetOutput::from_type(DataType::Date),
1922        );
1923        Self::from_expr(expr, None)
1924    }
1925
1926    /// Positive modulus (PySpark pmod). Column method: pmod(self, other).
1927    pub fn pmod(&self, divisor: &Column) -> Column {
1928        let args = [divisor.expr().clone()];
1929        let expr = self.expr().clone().map_many(
1930            crate::udfs::apply_pmod,
1931            &args,
1932            GetOutput::from_type(DataType::Float64),
1933        );
1934        Self::from_expr(expr, None)
1935    }
1936
1937    /// Factorial n! for n in 0..=20 (PySpark factorial).
1938    pub fn factorial(&self) -> Column {
1939        let expr = self.expr().clone().map(
1940            crate::udfs::apply_factorial,
1941            GetOutput::from_type(DataType::Int64),
1942        );
1943        Self::from_expr(expr, None)
1944    }
1945
1946    // --- Window functions ---
1947
1948    /// Apply window partitioning. Returns a new Column with `.over(partition_by)`.
1949    /// Use after rank(), dense_rank(), row_number(), lag(), lead().
1950    pub fn over(&self, partition_by: &[&str]) -> Column {
1951        let partition_exprs: Vec<Expr> = partition_by.iter().map(|s| col(*s)).collect();
1952        Self::from_expr(self.expr().clone().over(partition_exprs), None)
1953    }
1954
1955    /// Rank (with ties, gaps). Use with `.over(partition_by)`.
1956    pub fn rank(&self, descending: bool) -> Column {
1957        let opts = RankOptions {
1958            method: RankMethod::Min,
1959            descending,
1960        };
1961        Self::from_expr(self.expr().clone().rank(opts, None), None)
1962    }
1963
1964    /// Dense rank (no gaps). Use with `.over(partition_by)`.
1965    pub fn dense_rank(&self, descending: bool) -> Column {
1966        let opts = RankOptions {
1967            method: RankMethod::Dense,
1968            descending,
1969        };
1970        Self::from_expr(self.expr().clone().rank(opts, None), None)
1971    }
1972
1973    /// Row number (1, 2, 3 by this column's order). Use with `.over(partition_by)`.
1974    pub fn row_number(&self, descending: bool) -> Column {
1975        let opts = RankOptions {
1976            method: RankMethod::Ordinal,
1977            descending,
1978        };
1979        Self::from_expr(self.expr().clone().rank(opts, None), None)
1980    }
1981
1982    /// Lag: value from n rows before. Use with `.over(partition_by)`.
1983    pub fn lag(&self, n: i64) -> Column {
1984        Self::from_expr(self.expr().clone().shift(polars::prelude::lit(n)), None)
1985    }
1986
1987    /// Lead: value from n rows after. Use with `.over(partition_by)`.
1988    pub fn lead(&self, n: i64) -> Column {
1989        Self::from_expr(self.expr().clone().shift(polars::prelude::lit(-n)), None)
1990    }
1991
1992    /// First value in partition (PySpark first_value). Use with `.over(partition_by)`.
1993    pub fn first_value(&self) -> Column {
1994        Self::from_expr(self.expr().clone().first(), None)
1995    }
1996
1997    /// Last value in partition (PySpark last_value). Use with `.over(partition_by)`.
1998    pub fn last_value(&self) -> Column {
1999        Self::from_expr(self.expr().clone().last(), None)
2000    }
2001
2002    /// Percent rank in partition: (rank - 1) / (count - 1). Window is applied; do not call .over() again.
2003    pub fn percent_rank(&self, partition_by: &[&str], descending: bool) -> Column {
2004        use polars::prelude::*;
2005        let partition_exprs: Vec<Expr> = partition_by.iter().map(|s| col(*s)).collect();
2006        let opts = RankOptions {
2007            method: RankMethod::Min,
2008            descending,
2009        };
2010        let rank_expr = self
2011            .expr()
2012            .clone()
2013            .rank(opts, None)
2014            .over(partition_exprs.clone());
2015        let count_expr = self.expr().clone().count().over(partition_exprs.clone());
2016        let rank_f = (rank_expr - lit(1i64)).cast(DataType::Float64);
2017        let count_f = (count_expr - lit(1i64)).cast(DataType::Float64);
2018        let pct = rank_f / count_f;
2019        Self::from_expr(pct, None)
2020    }
2021
2022    /// Cumulative distribution in partition: row_number / count. Window is applied; do not call .over() again.
2023    pub fn cume_dist(&self, partition_by: &[&str], descending: bool) -> Column {
2024        use polars::prelude::*;
2025        let partition_exprs: Vec<Expr> = partition_by.iter().map(|s| col(*s)).collect();
2026        let opts = RankOptions {
2027            method: RankMethod::Ordinal,
2028            descending,
2029        };
2030        let row_num = self
2031            .expr()
2032            .clone()
2033            .rank(opts, None)
2034            .over(partition_exprs.clone());
2035        let count_expr = self.expr().clone().count().over(partition_exprs.clone());
2036        let cume = row_num / count_expr;
2037        Self::from_expr(cume.cast(DataType::Float64), None)
2038    }
2039
2040    /// Ntile: bucket 1..n by rank within partition (ceil(rank * n / count)). Window is applied; do not call .over() again.
2041    pub fn ntile(&self, n: u32, partition_by: &[&str], descending: bool) -> Column {
2042        use polars::prelude::*;
2043        let partition_exprs: Vec<Expr> = partition_by.iter().map(|s| col(*s)).collect();
2044        let opts = RankOptions {
2045            method: RankMethod::Ordinal,
2046            descending,
2047        };
2048        let rank_expr = self
2049            .expr()
2050            .clone()
2051            .rank(opts, None)
2052            .over(partition_exprs.clone());
2053        let count_expr = self.expr().clone().count().over(partition_exprs.clone());
2054        let n_expr = lit(n as f64);
2055        let rank_f = rank_expr.cast(DataType::Float64);
2056        let count_f = count_expr.cast(DataType::Float64);
2057        let bucket = (rank_f * n_expr / count_f).ceil();
2058        let clamped = bucket.clip(lit(1.0), lit(n as f64));
2059        Self::from_expr(clamped.cast(DataType::Int32), None)
2060    }
2061
2062    /// Nth value in partition by order (1-based n). Returns a Column with window already applied; do not call .over() again.
2063    pub fn nth_value(&self, n: i64, partition_by: &[&str], descending: bool) -> Column {
2064        use polars::prelude::*;
2065        let partition_exprs: Vec<Expr> = partition_by.iter().map(|s| col(*s)).collect();
2066        let opts = RankOptions {
2067            method: RankMethod::Ordinal,
2068            descending,
2069        };
2070        let rank_expr = self
2071            .expr()
2072            .clone()
2073            .rank(opts, None)
2074            .over(partition_exprs.clone());
2075        let cond_col = Self::from_expr(rank_expr.eq(lit(n)), None);
2076        let null_col = Self::from_expr(Expr::Literal(LiteralValue::Null), None);
2077        let value_col = Self::from_expr(self.expr().clone(), None);
2078        let when_expr = crate::functions::when(&cond_col)
2079            .then(&value_col)
2080            .otherwise(&null_col)
2081            .into_expr();
2082        let windowed = when_expr.max().over(partition_exprs);
2083        Self::from_expr(windowed, None)
2084    }
2085
2086    /// Number of elements in list (PySpark size / array_size). Returns Int32.
2087    pub fn array_size(&self) -> Column {
2088        use polars::prelude::*;
2089        Self::from_expr(
2090            self.expr().clone().list().len().cast(DataType::Int32),
2091            Some("size".to_string()),
2092        )
2093    }
2094
2095    /// Cardinality: number of elements in array/list (PySpark cardinality). Alias for array_size.
2096    pub fn cardinality(&self) -> Column {
2097        self.array_size()
2098    }
2099
2100    /// Check if list contains value (PySpark array_contains).
2101    pub fn array_contains(&self, value: Expr) -> Column {
2102        Self::from_expr(self.expr().clone().list().contains(value), None)
2103    }
2104
2105    /// Join list of strings with separator (PySpark array_join).
2106    pub fn array_join(&self, separator: &str) -> Column {
2107        use polars::prelude::*;
2108        Self::from_expr(
2109            self.expr()
2110                .clone()
2111                .list()
2112                .join(lit(separator.to_string()), false),
2113            None,
2114        )
2115    }
2116
2117    /// Maximum element in list (PySpark array_max).
2118    pub fn array_max(&self) -> Column {
2119        Self::from_expr(self.expr().clone().list().max(), None)
2120    }
2121
2122    /// Minimum element in list (PySpark array_min).
2123    pub fn array_min(&self) -> Column {
2124        Self::from_expr(self.expr().clone().list().min(), None)
2125    }
2126
2127    /// Get element at 1-based index (PySpark element_at). Returns null if out of bounds.
2128    pub fn element_at(&self, index: i64) -> Column {
2129        use polars::prelude::*;
2130        // PySpark uses 1-based indexing; Polars uses 0-based. index 1 -> get(0).
2131        let idx = if index >= 1 { index - 1 } else { index };
2132        Self::from_expr(self.expr().clone().list().get(lit(idx), true), None)
2133    }
2134
2135    /// Sort list elements (PySpark array_sort). Ascending, nulls last.
2136    pub fn array_sort(&self) -> Column {
2137        use polars::prelude::SortOptions;
2138        let opts = SortOptions {
2139            descending: false,
2140            nulls_last: true,
2141            ..Default::default()
2142        };
2143        Self::from_expr(self.expr().clone().list().sort(opts), None)
2144    }
2145
2146    /// Distinct elements in list (PySpark array_distinct). Preserves first-occurrence order.
2147    pub fn array_distinct(&self) -> Column {
2148        let expr = self.expr().clone().map(
2149            crate::udfs::apply_array_distinct_first_order,
2150            GetOutput::same_type(),
2151        );
2152        Self::from_expr(expr, None)
2153    }
2154
2155    /// Mode aggregation - most frequent value (PySpark mode).
2156    /// Uses value_counts sorted by count descending, then first.
2157    pub fn mode(&self) -> Column {
2158        // value_counts(sort=true, parallel=false, name="count", normalize=false)
2159        // puts highest count first; first() gives the mode
2160        // Struct has "count" and value field; field 0 is typically the value
2161        let vc = self
2162            .expr()
2163            .clone()
2164            .value_counts(true, false, "count", false);
2165        let first_struct = vc.first();
2166        let val_expr = first_struct.struct_().field_by_index(0);
2167        Self::from_expr(val_expr, Some("mode".to_string()))
2168    }
2169
2170    /// Slice list from start with optional length (PySpark slice). 1-based start.
2171    pub fn array_slice(&self, start: i64, length: Option<i64>) -> Column {
2172        use polars::prelude::*;
2173        let start_expr = lit((start - 1).max(0)); // 1-based to 0-based
2174        let length_expr = length.map(lit).unwrap_or_else(|| lit(i64::MAX));
2175        Self::from_expr(
2176            self.expr().clone().list().slice(start_expr, length_expr),
2177            None,
2178        )
2179    }
2180
2181    /// Explode list into one row per element (PySpark explode).
2182    pub fn explode(&self) -> Column {
2183        Self::from_expr(self.expr().clone().explode(), None)
2184    }
2185
2186    /// Explode list; null/empty produces one row with null (PySpark explode_outer).
2187    pub fn explode_outer(&self) -> Column {
2188        Self::from_expr(self.expr().clone().explode(), None)
2189    }
2190
2191    /// Posexplode with null preservation (PySpark posexplode_outer).
2192    pub fn posexplode_outer(&self) -> (Column, Column) {
2193        self.posexplode()
2194    }
2195
2196    /// Zip two arrays element-wise into array of structs (PySpark arrays_zip).
2197    pub fn arrays_zip(&self, other: &Column) -> Column {
2198        let args = [other.expr().clone()];
2199        let expr = self.expr().clone().map_many(
2200            crate::udfs::apply_arrays_zip,
2201            &args,
2202            GetOutput::same_type(),
2203        );
2204        Self::from_expr(expr, None)
2205    }
2206
2207    /// True if two arrays have any element in common (PySpark arrays_overlap).
2208    pub fn arrays_overlap(&self, other: &Column) -> Column {
2209        let args = [other.expr().clone()];
2210        let expr = self.expr().clone().map_many(
2211            crate::udfs::apply_arrays_overlap,
2212            &args,
2213            GetOutput::from_type(DataType::Boolean),
2214        );
2215        Self::from_expr(expr, None)
2216    }
2217
2218    /// Collect to array (PySpark array_agg). Alias for implode in group context.
2219    pub fn array_agg(&self) -> Column {
2220        Self::from_expr(self.expr().clone().implode(), None)
2221    }
2222
2223    /// 1-based index of first occurrence of value in list, or 0 if not found (PySpark array_position).
2224    /// Uses Polars list.eval with col("") as element (requires polars list_eval feature).
2225    pub fn array_position(&self, value: Expr) -> Column {
2226        use polars::prelude::{DataType, NULL};
2227        // In list.eval context, col("") refers to the current list element.
2228        let cond = Self::from_expr(col("").eq(value), None);
2229        let then_val = Self::from_expr(col("").cum_count(false), None);
2230        let else_val = Self::from_expr(lit(NULL), None);
2231        let idx_expr = crate::functions::when(&cond)
2232            .then(&then_val)
2233            .otherwise(&else_val)
2234            .into_expr();
2235        let list_expr = self
2236            .expr()
2237            .clone()
2238            .list()
2239            .eval(idx_expr, false)
2240            .list()
2241            .min()
2242            .fill_null(lit(0i64))
2243            .cast(DataType::Int64);
2244        Self::from_expr(list_expr, Some("array_position".to_string()))
2245    }
2246
2247    /// Remove null elements from list (PySpark array_compact). Preserves order.
2248    pub fn array_compact(&self) -> Column {
2249        let list_expr = self.expr().clone().list().drop_nulls();
2250        Self::from_expr(list_expr, None)
2251    }
2252
2253    /// New list with all elements equal to value removed (PySpark array_remove).
2254    /// Uses list.eval + drop_nulls (requires polars list_eval and list_drop_nulls).
2255    pub fn array_remove(&self, value: Expr) -> Column {
2256        use polars::prelude::NULL;
2257        // when(element != value) then element else null; then drop_nulls.
2258        let cond = Self::from_expr(col("").neq(value), None);
2259        let then_val = Self::from_expr(col(""), None);
2260        let else_val = Self::from_expr(lit(NULL), None);
2261        let elem_neq = crate::functions::when(&cond)
2262            .then(&then_val)
2263            .otherwise(&else_val)
2264            .into_expr();
2265        let list_expr = self
2266            .expr()
2267            .clone()
2268            .list()
2269            .eval(elem_neq, false)
2270            .list()
2271            .drop_nulls();
2272        Self::from_expr(list_expr, None)
2273    }
2274
2275    /// Repeat each element n times (PySpark array_repeat). Implemented via map UDF.
2276    pub fn array_repeat(&self, n: i64) -> Column {
2277        let expr = self.expr().clone().map(
2278            move |c| crate::udfs::apply_array_repeat(c, n),
2279            GetOutput::same_type(),
2280        );
2281        Self::from_expr(expr, None)
2282    }
2283
2284    /// Flatten list of lists to one list (PySpark flatten). Implemented via map UDF.
2285    pub fn array_flatten(&self) -> Column {
2286        let expr = self
2287            .expr()
2288            .clone()
2289            .map(crate::udfs::apply_array_flatten, GetOutput::same_type());
2290        Self::from_expr(expr, None)
2291    }
2292
2293    /// Append element to end of list (PySpark array_append).
2294    pub fn array_append(&self, elem: &Column) -> Column {
2295        let args = [elem.expr().clone()];
2296        let expr = self.expr().clone().map_many(
2297            crate::udfs::apply_array_append,
2298            &args,
2299            GetOutput::same_type(),
2300        );
2301        Self::from_expr(expr, None)
2302    }
2303
2304    /// Prepend element to start of list (PySpark array_prepend).
2305    pub fn array_prepend(&self, elem: &Column) -> Column {
2306        let args = [elem.expr().clone()];
2307        let expr = self.expr().clone().map_many(
2308            crate::udfs::apply_array_prepend,
2309            &args,
2310            GetOutput::same_type(),
2311        );
2312        Self::from_expr(expr, None)
2313    }
2314
2315    /// Insert element at 1-based position (PySpark array_insert).
2316    pub fn array_insert(&self, pos: &Column, elem: &Column) -> Column {
2317        let args = [pos.expr().clone(), elem.expr().clone()];
2318        let expr = self.expr().clone().map_many(
2319            crate::udfs::apply_array_insert,
2320            &args,
2321            GetOutput::same_type(),
2322        );
2323        Self::from_expr(expr, None)
2324    }
2325
2326    /// Elements in first array not in second (PySpark array_except).
2327    pub fn array_except(&self, other: &Column) -> Column {
2328        let args = [other.expr().clone()];
2329        let expr = self.expr().clone().map_many(
2330            crate::udfs::apply_array_except,
2331            &args,
2332            GetOutput::same_type(),
2333        );
2334        Self::from_expr(expr, None)
2335    }
2336
2337    /// Elements in both arrays (PySpark array_intersect).
2338    pub fn array_intersect(&self, other: &Column) -> Column {
2339        let args = [other.expr().clone()];
2340        let expr = self.expr().clone().map_many(
2341            crate::udfs::apply_array_intersect,
2342            &args,
2343            GetOutput::same_type(),
2344        );
2345        Self::from_expr(expr, None)
2346    }
2347
2348    /// Distinct elements from both arrays (PySpark array_union).
2349    pub fn array_union(&self, other: &Column) -> Column {
2350        let args = [other.expr().clone()];
2351        let expr = self.expr().clone().map_many(
2352            crate::udfs::apply_array_union,
2353            &args,
2354            GetOutput::same_type(),
2355        );
2356        Self::from_expr(expr, None)
2357    }
2358
2359    /// Zip two arrays element-wise with merge function (PySpark zip_with). Shorter array padded with null.
2360    /// Merge Expr uses col("").struct_().field_by_name("left") and field_by_name("right").
2361    pub fn zip_with(&self, other: &Column, merge: Expr) -> Column {
2362        let args = [other.expr().clone()];
2363        let zip_expr = self.expr().clone().map_many(
2364            crate::udfs::apply_zip_arrays_to_struct,
2365            &args,
2366            GetOutput::same_type(),
2367        );
2368        let list_expr = zip_expr.list().eval(merge, false);
2369        Self::from_expr(list_expr, None)
2370    }
2371
2372    /// True if any list element satisfies the predicate (PySpark exists). Uses list.eval(pred).list().any().
2373    pub fn array_exists(&self, predicate: Expr) -> Column {
2374        let pred_expr = self
2375            .expr()
2376            .clone()
2377            .list()
2378            .eval(predicate, false)
2379            .list()
2380            .any();
2381        Self::from_expr(pred_expr, Some("exists".to_string()))
2382    }
2383
2384    /// True if all list elements satisfy the predicate (PySpark forall). Uses list.eval(pred).list().all().
2385    pub fn array_forall(&self, predicate: Expr) -> Column {
2386        let pred_expr = self
2387            .expr()
2388            .clone()
2389            .list()
2390            .eval(predicate, false)
2391            .list()
2392            .all();
2393        Self::from_expr(pred_expr, Some("forall".to_string()))
2394    }
2395
2396    /// Filter list elements by predicate (PySpark filter). Keeps elements where predicate is true.
2397    pub fn array_filter(&self, predicate: Expr) -> Column {
2398        use polars::prelude::NULL;
2399        let then_val = Self::from_expr(col(""), None);
2400        let else_val = Self::from_expr(lit(NULL), None);
2401        let elem_expr = crate::functions::when(&Self::from_expr(predicate, None))
2402            .then(&then_val)
2403            .otherwise(&else_val)
2404            .into_expr();
2405        let list_expr = self
2406            .expr()
2407            .clone()
2408            .list()
2409            .eval(elem_expr, false)
2410            .list()
2411            .drop_nulls();
2412        Self::from_expr(list_expr, None)
2413    }
2414
2415    /// Transform list elements by expression (PySpark transform). list.eval(expr).
2416    pub fn array_transform(&self, f: Expr) -> Column {
2417        let list_expr = self.expr().clone().list().eval(f, false);
2418        Self::from_expr(list_expr, None)
2419    }
2420
2421    /// Sum of list elements (PySpark aggregate with sum). Uses list.sum().
2422    pub fn array_sum(&self) -> Column {
2423        Self::from_expr(self.expr().clone().list().sum(), None)
2424    }
2425
2426    /// Array fold/aggregate (PySpark aggregate). Simplified: zero + sum(list). Full (zero, merge, finish) deferred.
2427    pub fn array_aggregate(&self, zero: &Column) -> Column {
2428        let sum_expr = self.expr().clone().list().sum();
2429        Self::from_expr(sum_expr + zero.expr().clone(), None)
2430    }
2431
2432    /// Mean of list elements (PySpark aggregate with avg). Uses list.mean().
2433    pub fn array_mean(&self) -> Column {
2434        Self::from_expr(self.expr().clone().list().mean(), None)
2435    }
2436
2437    /// Explode list with position (PySpark posexplode). Returns (pos_col, value_col).
2438    /// pos is 1-based; uses list.eval(cum_count()).explode() and explode().
2439    pub fn posexplode(&self) -> (Column, Column) {
2440        let pos_expr = self
2441            .expr()
2442            .clone()
2443            .list()
2444            .eval(col("").cum_count(false), false)
2445            .explode();
2446        let val_expr = self.expr().clone().explode();
2447        (
2448            Self::from_expr(pos_expr, Some("pos".to_string())),
2449            Self::from_expr(val_expr, Some("col".to_string())),
2450        )
2451    }
2452
2453    /// Extract keys from a map column (PySpark map_keys). Map column is List(Struct{key, value}).
2454    pub fn map_keys(&self) -> Column {
2455        let elem_key = col("").struct_().field_by_name("key");
2456        let list_expr = self.expr().clone().list().eval(elem_key, false);
2457        Self::from_expr(list_expr, None)
2458    }
2459
2460    /// Extract values from a map column (PySpark map_values). Map column is List(Struct{key, value}).
2461    pub fn map_values(&self) -> Column {
2462        let elem_val = col("").struct_().field_by_name("value");
2463        let list_expr = self.expr().clone().list().eval(elem_val, false);
2464        Self::from_expr(list_expr, None)
2465    }
2466
2467    /// Return map as list of structs {key, value} (PySpark map_entries). Identity for List(Struct) column.
2468    pub fn map_entries(&self) -> Column {
2469        Self::from_expr(self.expr().clone(), None)
2470    }
2471
2472    /// Build map from two array columns (keys, values) (PySpark map_from_arrays). Implemented via map_many UDF.
2473    pub fn map_from_arrays(&self, values: &Column) -> Column {
2474        let args = [values.expr().clone()];
2475        let expr = self.expr().clone().map_many(
2476            crate::udfs::apply_map_from_arrays,
2477            &args,
2478            GetOutput::same_type(),
2479        );
2480        Self::from_expr(expr, None)
2481    }
2482
2483    /// Merge two map columns (PySpark map_concat). Last value wins for duplicate keys.
2484    pub fn map_concat(&self, other: &Column) -> Column {
2485        let args = [other.expr().clone()];
2486        let expr = self.expr().clone().map_many(
2487            crate::udfs::apply_map_concat,
2488            &args,
2489            GetOutput::same_type(),
2490        );
2491        Self::from_expr(expr, None)
2492    }
2493
2494    /// Transform each map key by expr (PySpark transform_keys). key_expr should use col("").struct_().field_by_name("key").
2495    pub fn transform_keys(&self, key_expr: Expr) -> Column {
2496        use polars::prelude::as_struct;
2497        let value = col("").struct_().field_by_name("value");
2498        let new_struct = as_struct(vec![key_expr.alias("key"), value.alias("value")]);
2499        let list_expr = self.expr().clone().list().eval(new_struct, false);
2500        Self::from_expr(list_expr, None)
2501    }
2502
2503    /// Transform each map value by expr (PySpark transform_values). value_expr should use col("").struct_().field_by_name("value").
2504    pub fn transform_values(&self, value_expr: Expr) -> Column {
2505        use polars::prelude::as_struct;
2506        let key = col("").struct_().field_by_name("key");
2507        let new_struct = as_struct(vec![key.alias("key"), value_expr.alias("value")]);
2508        let list_expr = self.expr().clone().list().eval(new_struct, false);
2509        Self::from_expr(list_expr, None)
2510    }
2511
2512    /// Merge two maps by key with merge function (PySpark map_zip_with).
2513    /// Merge Expr uses col("").struct_().field_by_name("value1") and field_by_name("value2").
2514    pub fn map_zip_with(&self, other: &Column, merge: Expr) -> Column {
2515        use polars::prelude::as_struct;
2516        let args = [other.expr().clone()];
2517        let zip_expr = self.expr().clone().map_many(
2518            crate::udfs::apply_map_zip_to_struct,
2519            &args,
2520            GetOutput::same_type(),
2521        );
2522        let key_field = col("").struct_().field_by_name("key").alias("key");
2523        let value_field = merge.alias("value");
2524        let merge_expr = as_struct(vec![key_field, value_field]);
2525        let list_expr = zip_expr.list().eval(merge_expr, false);
2526        Self::from_expr(list_expr, None)
2527    }
2528
2529    /// Filter map entries by predicate (PySpark map_filter). Keeps key-value pairs where predicate is true.
2530    /// Predicate uses col("").struct_().field_by_name("key") and field_by_name("value") to reference key/value.
2531    pub fn map_filter(&self, predicate: Expr) -> Column {
2532        use polars::prelude::NULL;
2533        let then_val = Self::from_expr(col(""), None);
2534        let else_val = Self::from_expr(lit(NULL), None);
2535        let elem_expr = crate::functions::when(&Self::from_expr(predicate, None))
2536            .then(&then_val)
2537            .otherwise(&else_val)
2538            .into_expr();
2539        let list_expr = self
2540            .expr()
2541            .clone()
2542            .list()
2543            .eval(elem_expr, false)
2544            .list()
2545            .drop_nulls();
2546        Self::from_expr(list_expr, None)
2547    }
2548
2549    /// Array of structs {key, value} to map (PySpark map_from_entries). Identity for List(Struct) format.
2550    pub fn map_from_entries(&self) -> Column {
2551        Self::from_expr(self.expr().clone(), None)
2552    }
2553
2554    /// True if map contains key (PySpark map_contains_key).
2555    pub fn map_contains_key(&self, key: &Column) -> Column {
2556        let args = [key.expr().clone()];
2557        let expr = self.expr().clone().map_many(
2558            crate::udfs::apply_map_contains_key,
2559            &args,
2560            GetOutput::from_type(DataType::Boolean),
2561        );
2562        Self::from_expr(expr, None)
2563    }
2564
2565    /// Get value for key from map, or null (PySpark get).
2566    pub fn get(&self, key: &Column) -> Column {
2567        let args = [key.expr().clone()];
2568        let expr =
2569            self.expr()
2570                .clone()
2571                .map_many(crate::udfs::apply_get, &args, GetOutput::same_type());
2572        Self::from_expr(expr, None)
2573    }
2574
2575    /// Extract JSON path from string column (PySpark get_json_object). Uses Polars str().json_path_match.
2576    pub fn get_json_object(&self, path: &str) -> Column {
2577        let path_expr = polars::prelude::lit(path.to_string());
2578        let out = self.expr().clone().str().json_path_match(path_expr);
2579        Self::from_expr(out, None)
2580    }
2581
2582    /// Parse string column as JSON into struct (PySpark from_json). Uses Polars str().json_decode.
2583    pub fn from_json(&self, schema: Option<polars::datatypes::DataType>) -> Column {
2584        let out = self.expr().clone().str().json_decode(schema, None);
2585        Self::from_expr(out, None)
2586    }
2587
2588    /// Serialize struct column to JSON string (PySpark to_json). Uses Polars struct().json_encode.
2589    pub fn to_json(&self) -> Column {
2590        let out = self.expr().clone().struct_().json_encode();
2591        Self::from_expr(out, None)
2592    }
2593
2594    /// Length of JSON array at path (PySpark json_array_length). UDF.
2595    pub fn json_array_length(&self, path: &str) -> Column {
2596        let path = path.to_string();
2597        let expr = self.expr().clone().map(
2598            move |s| crate::udfs::apply_json_array_length(s, &path),
2599            GetOutput::from_type(DataType::Int64),
2600        );
2601        Self::from_expr(expr, None)
2602    }
2603
2604    /// Keys of JSON object (PySpark json_object_keys). Returns list of strings. UDF.
2605    pub fn json_object_keys(&self) -> Column {
2606        let expr = self.expr().clone().map(
2607            crate::udfs::apply_json_object_keys,
2608            GetOutput::from_type(DataType::List(Box::new(DataType::String))),
2609        );
2610        Self::from_expr(expr, None)
2611    }
2612
2613    /// Extract keys from JSON as struct (PySpark json_tuple). UDF. Returns struct with one string field per key.
2614    pub fn json_tuple(&self, keys: &[&str]) -> Column {
2615        let keys_vec: Vec<String> = keys.iter().map(|s| (*s).to_string()).collect();
2616        let struct_fields: Vec<polars::datatypes::Field> = keys_vec
2617            .iter()
2618            .map(|k| polars::datatypes::Field::new(k.as_str().into(), DataType::String))
2619            .collect();
2620        let expr = self.expr().clone().map(
2621            move |s| crate::udfs::apply_json_tuple(s, &keys_vec),
2622            GetOutput::from_type(DataType::Struct(struct_fields)),
2623        );
2624        Self::from_expr(expr, None)
2625    }
2626
2627    /// Parse CSV string to struct (PySpark from_csv). Minimal: split by comma, up to 32 columns. UDF.
2628    pub fn from_csv(&self) -> Column {
2629        let expr = self.expr().clone().map(
2630            crate::udfs::apply_from_csv,
2631            GetOutput::from_type(DataType::Struct(vec![])),
2632        );
2633        Self::from_expr(expr, None)
2634    }
2635
2636    /// Format struct as CSV string (PySpark to_csv). Minimal. UDF.
2637    pub fn to_csv(&self) -> Column {
2638        let expr = self.expr().clone().map(
2639            crate::udfs::apply_to_csv,
2640            GetOutput::from_type(DataType::String),
2641        );
2642        Self::from_expr(expr, None)
2643    }
2644
2645    /// Parse URL and extract part (PySpark parse_url). UDF.
2646    /// When part is QUERY/QUERYSTRING and key is Some(k), returns the value for that query parameter only.
2647    pub fn parse_url(&self, part: &str, key: Option<&str>) -> Column {
2648        let part = part.to_string();
2649        let key_owned = key.map(String::from);
2650        let expr = self.expr().clone().map(
2651            move |s| crate::udfs::apply_parse_url(s, &part, key_owned.as_deref()),
2652            GetOutput::from_type(DataType::String),
2653        );
2654        Self::from_expr(expr, None)
2655    }
2656
2657    /// Hash of column value (PySpark hash). Single-column version.
2658    pub fn hash(&self) -> Column {
2659        let expr = self.expr().clone().map(
2660            crate::udfs::apply_hash_one,
2661            GetOutput::from_type(DataType::Int64),
2662        );
2663        Self::from_expr(expr, None)
2664    }
2665
2666    /// Check if column values are in the other column's list/series (PySpark isin).
2667    pub fn isin(&self, other: &Column) -> Column {
2668        let out = self.expr().clone().is_in(other.expr().clone());
2669        Self::from_expr(out, None)
2670    }
2671
2672    /// Percent-decode URL-encoded string (PySpark url_decode). Uses UDF.
2673    pub fn url_decode(&self) -> Column {
2674        let expr = self.expr().clone().map(
2675            crate::udfs::apply_url_decode,
2676            GetOutput::from_type(DataType::String),
2677        );
2678        Self::from_expr(expr, None)
2679    }
2680
2681    /// Percent-encode string for URL (PySpark url_encode). Uses UDF.
2682    pub fn url_encode(&self) -> Column {
2683        let expr = self.expr().clone().map(
2684            crate::udfs::apply_url_encode,
2685            GetOutput::from_type(DataType::String),
2686        );
2687        Self::from_expr(expr, None)
2688    }
2689
2690    /// Bitwise left shift (PySpark shiftLeft). col << n = col * 2^n.
2691    pub fn shift_left(&self, n: i32) -> Column {
2692        use polars::prelude::*;
2693        let pow = lit(2i64).pow(lit(n as i64));
2694        Self::from_expr(
2695            (self.expr().clone().cast(DataType::Int64) * pow).cast(DataType::Int64),
2696            None,
2697        )
2698    }
2699
2700    /// Bitwise signed right shift (PySpark shiftRight). col >> n = col / 2^n.
2701    pub fn shift_right(&self, n: i32) -> Column {
2702        use polars::prelude::*;
2703        let pow = lit(2i64).pow(lit(n as i64));
2704        Self::from_expr(
2705            (self.expr().clone().cast(DataType::Int64) / pow).cast(DataType::Int64),
2706            None,
2707        )
2708    }
2709
2710    /// Bitwise unsigned right shift (PySpark shiftRightUnsigned). Logical shift.
2711    pub fn shift_right_unsigned(&self, n: i32) -> Column {
2712        let expr = self.expr().clone().map(
2713            move |s| crate::udfs::apply_shift_right_unsigned(s, n),
2714            GetOutput::from_type(DataType::Int64),
2715        );
2716        Self::from_expr(expr, None)
2717    }
2718}
2719
2720#[cfg(test)]
2721mod tests {
2722    use super::Column;
2723    use polars::prelude::{col, df, lit, IntoLazy};
2724
2725    /// Helper to create a simple DataFrame for testing
2726    fn test_df() -> polars::prelude::DataFrame {
2727        df!(
2728            "a" => &[1, 2, 3, 4, 5],
2729            "b" => &[10, 20, 30, 40, 50]
2730        )
2731        .unwrap()
2732    }
2733
2734    /// Helper to create a DataFrame with nulls for testing
2735    fn test_df_with_nulls() -> polars::prelude::DataFrame {
2736        df!(
2737            "a" => &[Some(1), Some(2), None, Some(4), None],
2738            "b" => &[Some(10), None, Some(30), None, None]
2739        )
2740        .unwrap()
2741    }
2742
2743    #[test]
2744    fn test_column_new() {
2745        let column = Column::new("age".to_string());
2746        assert_eq!(column.name(), "age");
2747    }
2748
2749    #[test]
2750    fn test_column_from_expr() {
2751        let expr = col("test");
2752        let column = Column::from_expr(expr, Some("test".to_string()));
2753        assert_eq!(column.name(), "test");
2754    }
2755
2756    #[test]
2757    fn test_column_from_expr_default_name() {
2758        let expr = col("test").gt(lit(5));
2759        let column = Column::from_expr(expr, None);
2760        assert_eq!(column.name(), "<expr>");
2761    }
2762
2763    #[test]
2764    fn test_column_alias() {
2765        let column = Column::new("original".to_string());
2766        let aliased = column.alias("new_name");
2767        assert_eq!(aliased.name(), "new_name");
2768    }
2769
2770    #[test]
2771    fn test_column_gt() {
2772        let df = test_df();
2773        let column = Column::new("a".to_string());
2774        let result = column.gt(lit(3));
2775
2776        // Apply the expression to filter the DataFrame
2777        let filtered = df.lazy().filter(result.into_expr()).collect().unwrap();
2778        assert_eq!(filtered.height(), 2); // rows with a > 3: 4, 5
2779    }
2780
2781    #[test]
2782    fn test_column_lt() {
2783        let df = test_df();
2784        let column = Column::new("a".to_string());
2785        let result = column.lt(lit(3));
2786
2787        let filtered = df.lazy().filter(result.into_expr()).collect().unwrap();
2788        assert_eq!(filtered.height(), 2); // rows with a < 3: 1, 2
2789    }
2790
2791    #[test]
2792    fn test_column_eq() {
2793        let df = test_df();
2794        let column = Column::new("a".to_string());
2795        let result = column.eq(lit(3));
2796
2797        let filtered = df.lazy().filter(result.into_expr()).collect().unwrap();
2798        assert_eq!(filtered.height(), 1); // only row with a == 3
2799    }
2800
2801    #[test]
2802    fn test_column_neq() {
2803        let df = test_df();
2804        let column = Column::new("a".to_string());
2805        let result = column.neq(lit(3));
2806
2807        let filtered = df.lazy().filter(result.into_expr()).collect().unwrap();
2808        assert_eq!(filtered.height(), 4); // rows with a != 3
2809    }
2810
2811    #[test]
2812    fn test_column_gt_eq() {
2813        let df = test_df();
2814        let column = Column::new("a".to_string());
2815        let result = column.gt_eq(lit(3));
2816
2817        let filtered = df.lazy().filter(result.into_expr()).collect().unwrap();
2818        assert_eq!(filtered.height(), 3); // rows with a >= 3: 3, 4, 5
2819    }
2820
2821    #[test]
2822    fn test_column_lt_eq() {
2823        let df = test_df();
2824        let column = Column::new("a".to_string());
2825        let result = column.lt_eq(lit(3));
2826
2827        let filtered = df.lazy().filter(result.into_expr()).collect().unwrap();
2828        assert_eq!(filtered.height(), 3); // rows with a <= 3: 1, 2, 3
2829    }
2830
2831    #[test]
2832    fn test_column_is_null() {
2833        let df = test_df_with_nulls();
2834        let column = Column::new("a".to_string());
2835        let result = column.is_null();
2836
2837        let filtered = df.lazy().filter(result.into_expr()).collect().unwrap();
2838        assert_eq!(filtered.height(), 2); // 2 null values in column 'a'
2839    }
2840
2841    #[test]
2842    fn test_column_is_not_null() {
2843        let df = test_df_with_nulls();
2844        let column = Column::new("a".to_string());
2845        let result = column.is_not_null();
2846
2847        let filtered = df.lazy().filter(result.into_expr()).collect().unwrap();
2848        assert_eq!(filtered.height(), 3); // 3 non-null values in column 'a'
2849    }
2850
2851    #[test]
2852    fn test_eq_null_safe_both_null() {
2853        // Create a DataFrame where both columns have NULL at the same row
2854        let df = df!(
2855            "a" => &[Some(1), None, Some(3)],
2856            "b" => &[Some(1), None, Some(4)]
2857        )
2858        .unwrap();
2859
2860        let col_a = Column::new("a".to_string());
2861        let col_b = Column::new("b".to_string());
2862        let result = col_a.eq_null_safe(&col_b);
2863
2864        // Apply the expression and collect
2865        let result_df = df
2866            .lazy()
2867            .with_column(result.into_expr().alias("eq_null_safe"))
2868            .collect()
2869            .unwrap();
2870
2871        // Get the result column
2872        let eq_col = result_df.column("eq_null_safe").unwrap();
2873        let values: Vec<Option<bool>> = eq_col.bool().unwrap().into_iter().collect();
2874
2875        // Row 0: 1 == 1 -> true
2876        // Row 1: NULL <=> NULL -> true
2877        // Row 2: 3 == 4 -> false
2878        assert_eq!(values[0], Some(true));
2879        assert_eq!(values[1], Some(true)); // NULL-safe: both NULL = true
2880        assert_eq!(values[2], Some(false));
2881    }
2882
2883    #[test]
2884    fn test_eq_null_safe_one_null() {
2885        // Create a DataFrame where only one column has NULL
2886        let df = df!(
2887            "a" => &[Some(1), None, Some(3)],
2888            "b" => &[Some(1), Some(2), None]
2889        )
2890        .unwrap();
2891
2892        let col_a = Column::new("a".to_string());
2893        let col_b = Column::new("b".to_string());
2894        let result = col_a.eq_null_safe(&col_b);
2895
2896        let result_df = df
2897            .lazy()
2898            .with_column(result.into_expr().alias("eq_null_safe"))
2899            .collect()
2900            .unwrap();
2901
2902        let eq_col = result_df.column("eq_null_safe").unwrap();
2903        let values: Vec<Option<bool>> = eq_col.bool().unwrap().into_iter().collect();
2904
2905        // Row 0: 1 == 1 -> true
2906        // Row 1: NULL <=> 2 -> false (one is null, not both)
2907        // Row 2: 3 <=> NULL -> false (one is null, not both)
2908        assert_eq!(values[0], Some(true));
2909        assert_eq!(values[1], Some(false));
2910        assert_eq!(values[2], Some(false));
2911    }
2912}