robin_sparkless/column.rs

1use polars::prelude::{
2    col, lit, DataType, Expr, GetOutput, ListNameSpaceExtension, RankMethod, RankOptions, TimeUnit,
3};
4
5/// Convert SQL LIKE pattern (% = any sequence, _ = one char) to regex. Escapes regex specials.
6/// When escape_char is Some(esc), esc + any char treats that char as literal (no %/_ expansion).
7fn like_pattern_to_regex(pattern: &str, escape_char: Option<char>) -> String {
8    let mut out = String::with_capacity(pattern.len() * 2);
9    let mut it = pattern.chars();
10    while let Some(c) = it.next() {
11        if escape_char == Some(c) {
12            if let Some(next) = it.next() {
13                // Literal: escape for regex
14                if "\\.*+?[](){}^$|".contains(next) {
15                    out.push('\\');
16                }
17                out.push(next);
18            } else {
19                out.push('\\');
20                out.push(c);
21            }
22        } else {
23            match c {
24                '%' => out.push_str(".*"),
25                '_' => out.push('.'),
26                '\\' | '.' | '+' | '*' | '?' | '[' | ']' | '(' | ')' | '{' | '}' | '^' | '$'
27                | '|' => {
28                    out.push('\\');
29                    out.push(c);
30                }
31                _ => out.push(c),
32            }
33        }
34    }
35    format!("^{out}$")
36}
37
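#[cfg(test)]
mod like_pattern_examples {
    // Illustrative checks for `like_pattern_to_regex` above: a sketch added for
    // clarity, with expected values derived from the conversion rules
    // documented on the function (not from an external spec).
    use super::like_pattern_to_regex;

    #[test]
    fn basic_patterns() {
        // % becomes .*, _ becomes ., regex specials are escaped.
        assert_eq!(like_pattern_to_regex("100%_off", None), "^100.*.off$");
        // With an escape char, the escaped % stays a literal percent sign.
        assert_eq!(like_pattern_to_regex(r"100\%_off", Some('\\')), "^100%.off$");
        // A literal '.' in the pattern is escaped in the resulting regex.
        assert_eq!(like_pattern_to_regex("a.b%", None), r"^a\.b.*$");
    }
}
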
38/// Deferred random column: when added via with_column, we generate a full-length series in one go (PySpark-like).
39#[derive(Debug, Clone, Copy)]
40pub enum DeferredRandom {
41    Rand(Option<u64>),
42    Randn(Option<u64>),
43}
44
/// Column represents a column in a DataFrame and is used for building expressions.
/// It is a thin wrapper around a Polars `Expr` and may carry a `DeferredRandom` for rand/randn so `with_column` can produce one value per row.
47#[derive(Debug, Clone)]
48pub struct Column {
49    name: String,
50    expr: Expr, // Polars expression for lazy evaluation
51    /// When Some, with_column generates a full-length random series instead of using expr (PySpark-like per-row rand/randn).
52    pub(crate) deferred: Option<DeferredRandom>,
53}
54
55impl Column {
56    /// Create a new Column from a column name
57    pub fn new(name: String) -> Self {
58        Column {
59            name: name.clone(),
60            expr: col(&name),
61            deferred: None,
62        }
63    }
64
65    /// Create a Column from a Polars Expr
66    pub fn from_expr(expr: Expr, name: Option<String>) -> Self {
67        let display_name = name.unwrap_or_else(|| "<expr>".to_string());
68        Column {
69            name: display_name,
70            expr,
71            deferred: None,
72        }
73    }
74
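    // Example (sketch) for the constructors above; `col` and `lit` are the
    // Polars helpers imported at the top of this file, and the names are
    // illustrative only:
    //
    //     let age = Column::new("age".to_string());
    //     let doubled = Column::from_expr(col("age") * lit(2i64), Some("age_x2".to_string()));
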
75    /// Create a Column for rand(seed). When used in with_column, generates one value per row (PySpark-like).
76    pub fn from_rand(seed: Option<u64>) -> Self {
77        let expr = lit(1i64).cum_sum(false).map(
78            move |c| crate::udfs::apply_rand_with_seed(c, seed),
79            GetOutput::from_type(DataType::Float64),
80        );
81        Column {
82            name: "rand".to_string(),
83            expr,
84            deferred: Some(DeferredRandom::Rand(seed)),
85        }
86    }
87
88    /// Create a Column for randn(seed). When used in with_column, generates one value per row (PySpark-like).
89    pub fn from_randn(seed: Option<u64>) -> Self {
90        let expr = lit(1i64).cum_sum(false).map(
91            move |c| crate::udfs::apply_randn_with_seed(c, seed),
92            GetOutput::from_type(DataType::Float64),
93        );
94        Column {
95            name: "randn".to_string(),
96            expr,
97            deferred: Some(DeferredRandom::Randn(seed)),
98        }
99    }
100
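    // Sketch of how the deferred markers above are meant to be used: the
    // Column still carries a cum_sum-based `expr`, but `deferred` lets
    // with_column materialize a full-length random series instead (see the
    // doc comments on from_rand/from_randn).
    //
    //     let u = Column::from_rand(Some(42)); // seeded, PySpark-like rand
    //     let z = Column::from_randn(None);    // unseeded, PySpark-like randn
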
101    /// Get the underlying Polars Expr
102    pub fn expr(&self) -> &Expr {
103        &self.expr
104    }
105
106    /// Convert to Polars Expr (consumes self)
107    pub fn into_expr(self) -> Expr {
108        self.expr
109    }
110
111    /// Get the column name
112    pub fn name(&self) -> &str {
113        &self.name
114    }
115
116    /// Alias the column
117    pub fn alias(&self, name: &str) -> Column {
118        Column {
119            name: name.to_string(),
120            expr: self.expr.clone().alias(name),
121            deferred: self.deferred,
122        }
123    }
124
125    /// Ascending sort, nulls first (Spark default for ASC). PySpark asc.
126    pub fn asc(&self) -> crate::functions::SortOrder {
127        crate::functions::asc(self)
128    }
129
130    /// Ascending sort, nulls first. PySpark asc_nulls_first.
131    pub fn asc_nulls_first(&self) -> crate::functions::SortOrder {
132        crate::functions::asc_nulls_first(self)
133    }
134
135    /// Ascending sort, nulls last. PySpark asc_nulls_last.
136    pub fn asc_nulls_last(&self) -> crate::functions::SortOrder {
137        crate::functions::asc_nulls_last(self)
138    }
139
140    /// Descending sort, nulls last (Spark default for DESC). PySpark desc.
141    pub fn desc(&self) -> crate::functions::SortOrder {
142        crate::functions::desc(self)
143    }
144
145    /// Descending sort, nulls first. PySpark desc_nulls_first.
146    pub fn desc_nulls_first(&self) -> crate::functions::SortOrder {
147        crate::functions::desc_nulls_first(self)
148    }
149
150    /// Descending sort, nulls last. PySpark desc_nulls_last.
151    pub fn desc_nulls_last(&self) -> crate::functions::SortOrder {
152        crate::functions::desc_nulls_last(self)
153    }
154
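    // Example (sketch) for the sort helpers above:
    //
    //     let by_age = Column::new("age".to_string()).desc();  // nulls last, Spark's DESC default
    //     let by_name = Column::new("name".to_string()).asc(); // nulls first, Spark's ASC default
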
155    /// Check if column is null
156    pub fn is_null(&self) -> Column {
157        Column {
158            name: format!("({} IS NULL)", self.name),
159            expr: self.expr.clone().is_null(),
160            deferred: None,
161        }
162    }
163
164    /// Check if column is not null
165    pub fn is_not_null(&self) -> Column {
166        Column {
167            name: format!("({} IS NOT NULL)", self.name),
168            expr: self.expr.clone().is_not_null(),
169            deferred: None,
170        }
171    }
172
173    /// Alias for is_null. PySpark isnull.
174    pub fn isnull(&self) -> Column {
175        self.is_null()
176    }
177
178    /// Alias for is_not_null. PySpark isnotnull.
179    pub fn isnotnull(&self) -> Column {
180        self.is_not_null()
181    }
182
183    /// Create a null boolean expression
184    fn null_boolean_expr() -> Expr {
185        use polars::prelude::*;
186        // Create an expression that is always a null boolean
187        lit(NULL).cast(DataType::Boolean)
188    }
189
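    // Example (sketch) for like/ilike below; `c` is any string Column:
    //
    //     c.like("Al%", None)          // matches "Alice" but not "alice"
    //     c.ilike("al%", None)         // case-insensitive: matches both
    //     c.like("100\\%", Some('\\')) // escaped '%' is literal: matches "100%"
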
190    /// SQL LIKE pattern matching (% = any chars, _ = one char). PySpark like.
191    /// When escape_char is Some(esc), esc + char treats that char as literal (e.g. \\% = literal %).
192    pub fn like(&self, pattern: &str, escape_char: Option<char>) -> Column {
193        let regex = like_pattern_to_regex(pattern, escape_char);
194        self.regexp_like(&regex)
195    }
196
197    /// Case-insensitive LIKE. PySpark ilike.
198    /// When escape_char is Some(esc), esc + char treats that char as literal.
199    pub fn ilike(&self, pattern: &str, escape_char: Option<char>) -> Column {
200        use polars::prelude::*;
201        let regex = format!("(?i){}", like_pattern_to_regex(pattern, escape_char));
202        Self::from_expr(self.expr().clone().str().contains(lit(regex), false), None)
203    }
204
205    /// PySpark-style equality comparison (NULL == NULL returns NULL, not True)
206    /// Any comparison involving NULL returns NULL
207    ///
208    /// Explicitly wraps comparisons with null checks to ensure PySpark semantics.
209    /// If either side is NULL, the result is NULL.
210    pub fn eq_pyspark(&self, other: &Column) -> Column {
211        // Check if either side is NULL
212        let left_null = self.expr().clone().is_null();
213        let right_null = other.expr().clone().is_null();
214        let either_null = left_null.clone().or(right_null.clone());
215
216        // Standard equality comparison
217        let eq_result = self.expr().clone().eq(other.expr().clone());
218
219        // Wrap: if either is null, return null boolean, else return comparison result
220        let null_boolean = Self::null_boolean_expr();
221        let null_aware_expr = crate::functions::when(&Self::from_expr(either_null, None))
222            .then(&Self::from_expr(null_boolean, None))
223            .otherwise(&Self::from_expr(eq_result, None));
224
225        Self::from_expr(null_aware_expr.into_expr(), None)
226    }
227
228    /// PySpark-style inequality comparison (NULL != NULL returns NULL, not False)
229    /// Any comparison involving NULL returns NULL
230    pub fn ne_pyspark(&self, other: &Column) -> Column {
231        // Check if either side is NULL
232        let left_null = self.expr().clone().is_null();
233        let right_null = other.expr().clone().is_null();
234        let either_null = left_null.clone().or(right_null.clone());
235
236        // Standard inequality comparison
237        let ne_result = self.expr().clone().neq(other.expr().clone());
238
239        // Wrap: if either is null, return null boolean, else return comparison result
240        let null_boolean = Self::null_boolean_expr();
241        let null_aware_expr = crate::functions::when(&Self::from_expr(either_null, None))
242            .then(&Self::from_expr(null_boolean, None))
243            .otherwise(&Self::from_expr(ne_result, None));
244
245        Self::from_expr(null_aware_expr.into_expr(), None)
246    }
247
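    // Null-semantics cheat sheet (sketch) for eq_pyspark above and eq_null_safe below:
    //
    //   a      b      eq_pyspark   eq_null_safe
    //   1      1      true         true
    //   1      NULL   NULL         false
    //   NULL   NULL   NULL         true
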
248    /// Null-safe equality (NULL <=> NULL returns True)
249    /// PySpark's eqNullSafe() method
250    pub fn eq_null_safe(&self, other: &Column) -> Column {
251        use crate::functions::{lit_bool, when};
252
253        let left_null = self.expr().clone().is_null();
254        let right_null = other.expr().clone().is_null();
255        let both_null = left_null.clone().and(right_null.clone());
256        let either_null = left_null.clone().or(right_null.clone());
257
258        // Standard equality
259        let eq_result = self.expr().clone().eq(other.expr().clone());
260
261        // If both are null, return True
262        // If either is null (but not both), return False
263        // Otherwise, return standard equality result
264        when(&Self::from_expr(both_null, None))
265            .then(&lit_bool(true))
266            .otherwise(
267                &when(&Self::from_expr(either_null, None))
268                    .then(&lit_bool(false))
269                    .otherwise(&Self::from_expr(eq_result, None)),
270            )
271    }
272
273    /// PySpark-style greater-than comparison (NULL > value returns NULL)
274    /// Any comparison involving NULL returns NULL
275    pub fn gt_pyspark(&self, other: &Column) -> Column {
276        // Check if either side is NULL
277        let left_null = self.expr().clone().is_null();
278        let right_null = other.expr().clone().is_null();
279        let either_null = left_null.clone().or(right_null.clone());
280
281        // Standard greater-than comparison
282        let gt_result = self.expr().clone().gt(other.expr().clone());
283
284        // Wrap: if either is null, return null boolean, else return comparison result
285        let null_boolean = Self::null_boolean_expr();
286        let null_aware_expr = crate::functions::when(&Self::from_expr(either_null, None))
287            .then(&Self::from_expr(null_boolean, None))
288            .otherwise(&Self::from_expr(gt_result, None));
289
290        Self::from_expr(null_aware_expr.into_expr(), None)
291    }
292
293    /// PySpark-style greater-than-or-equal comparison
294    /// Any comparison involving NULL returns NULL
295    pub fn ge_pyspark(&self, other: &Column) -> Column {
296        // Check if either side is NULL
297        let left_null = self.expr().clone().is_null();
298        let right_null = other.expr().clone().is_null();
299        let either_null = left_null.clone().or(right_null.clone());
300
301        // Standard greater-than-or-equal comparison
302        let ge_result = self.expr().clone().gt_eq(other.expr().clone());
303
304        // Wrap: if either is null, return null boolean, else return comparison result
305        let null_boolean = Self::null_boolean_expr();
306        let null_aware_expr = crate::functions::when(&Self::from_expr(either_null, None))
307            .then(&Self::from_expr(null_boolean, None))
308            .otherwise(&Self::from_expr(ge_result, None));
309
310        Self::from_expr(null_aware_expr.into_expr(), None)
311    }
312
313    /// PySpark-style less-than comparison
314    /// Any comparison involving NULL returns NULL
315    pub fn lt_pyspark(&self, other: &Column) -> Column {
316        // Check if either side is NULL
317        let left_null = self.expr().clone().is_null();
318        let right_null = other.expr().clone().is_null();
319        let either_null = left_null.clone().or(right_null.clone());
320
321        // Standard less-than comparison
322        let lt_result = self.expr().clone().lt(other.expr().clone());
323
324        // Wrap: if either is null, return null boolean, else return comparison result
325        let null_boolean = Self::null_boolean_expr();
326        let null_aware_expr = crate::functions::when(&Self::from_expr(either_null, None))
327            .then(&Self::from_expr(null_boolean, None))
328            .otherwise(&Self::from_expr(lt_result, None));
329
330        Self::from_expr(null_aware_expr.into_expr(), None)
331    }
332
333    /// PySpark-style less-than-or-equal comparison
334    /// Any comparison involving NULL returns NULL
335    pub fn le_pyspark(&self, other: &Column) -> Column {
336        // Check if either side is NULL
337        let left_null = self.expr().clone().is_null();
338        let right_null = other.expr().clone().is_null();
339        let either_null = left_null.clone().or(right_null.clone());
340
341        // Standard less-than-or-equal comparison
342        let le_result = self.expr().clone().lt_eq(other.expr().clone());
343
344        // Wrap: if either is null, return null boolean, else return comparison result
345        let null_boolean = Self::null_boolean_expr();
346        let null_aware_expr = crate::functions::when(&Self::from_expr(either_null, None))
347            .then(&Self::from_expr(null_boolean, None))
348            .otherwise(&Self::from_expr(le_result, None));
349
350        Self::from_expr(null_aware_expr.into_expr(), None)
351    }
352
353    // Standard comparison methods that work with Expr (for literals and columns)
354    // These delegate to Polars and may not match PySpark null semantics exactly
355    // Use _pyspark variants for explicit PySpark semantics
356
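    // Example (sketch) for the plain variants below; `lit` is the Polars helper
    // imported at the top of this file:
    //
    //     let adults = Column::new("age".to_string()).gt_eq(lit(18i64));
    //     // Nulls follow Polars behaviour here; use ge_pyspark above for
    //     // explicit PySpark null semantics.
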
357    /// Greater than comparison
358    pub fn gt(&self, other: Expr) -> Column {
359        Self::from_expr(self.expr().clone().gt(other), None)
360    }
361
362    /// Greater than or equal comparison
363    pub fn gt_eq(&self, other: Expr) -> Column {
364        Self::from_expr(self.expr().clone().gt_eq(other), None)
365    }
366
367    /// Less than comparison
368    pub fn lt(&self, other: Expr) -> Column {
369        Self::from_expr(self.expr().clone().lt(other), None)
370    }
371
372    /// Less than or equal comparison
373    pub fn lt_eq(&self, other: Expr) -> Column {
374        Self::from_expr(self.expr().clone().lt_eq(other), None)
375    }
376
377    /// Equality comparison
378    pub fn eq(&self, other: Expr) -> Column {
379        Self::from_expr(self.expr().clone().eq(other), None)
380    }
381
382    /// Inequality comparison
383    pub fn neq(&self, other: Expr) -> Column {
384        Self::from_expr(self.expr().clone().neq(other), None)
385    }
386
387    // --- String functions ---
388
389    /// Convert string column to uppercase (PySpark upper)
390    pub fn upper(&self) -> Column {
391        Self::from_expr(self.expr().clone().str().to_uppercase(), None)
392    }
393
394    /// Convert string column to lowercase (PySpark lower)
395    pub fn lower(&self) -> Column {
396        Self::from_expr(self.expr().clone().str().to_lowercase(), None)
397    }
398
399    /// Alias for lower. PySpark lcase.
400    pub fn lcase(&self) -> Column {
401        self.lower()
402    }
403
404    /// Alias for upper. PySpark ucase.
405    pub fn ucase(&self) -> Column {
406        self.upper()
407    }
408
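    // Example (sketch) for substr below, given a Column `c` holding "Spark":
    //
    //     c.substr(1, Some(3)) // "Spa"
    //     c.substr(3, None)    // "ark" (no length = rest of the string)
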
409    /// Substring with 1-based start (PySpark substring semantics)
410    pub fn substr(&self, start: i64, length: Option<i64>) -> Column {
411        use polars::prelude::*;
412        let offset = (start - 1).max(0);
413        let offset_expr = lit(offset);
414        let length_expr = length.map(lit).unwrap_or_else(|| lit(i64::MAX)); // No length = rest of string
415        Self::from_expr(
416            self.expr().clone().str().slice(offset_expr, length_expr),
417            None,
418        )
419    }
420
421    /// String length in characters (PySpark length)
422    pub fn length(&self) -> Column {
423        Self::from_expr(self.expr().clone().str().len_chars(), None)
424    }
425
426    /// Bit length of string in bytes * 8 (PySpark bit_length).
427    pub fn bit_length(&self) -> Column {
428        use polars::prelude::*;
429        let len_bytes = self.expr().clone().str().len_bytes().cast(DataType::Int32);
430        Self::from_expr(len_bytes * lit(8i32), None)
431    }
432
433    /// Length of string in bytes (PySpark octet_length).
434    pub fn octet_length(&self) -> Column {
435        use polars::prelude::*;
436        Self::from_expr(
437            self.expr().clone().str().len_bytes().cast(DataType::Int32),
438            None,
439        )
440    }
441
442    /// Length of string in characters (PySpark char_length). Alias of length().
443    pub fn char_length(&self) -> Column {
444        self.length()
445    }
446
447    /// Length of string in characters (PySpark character_length). Alias of length().
448    pub fn character_length(&self) -> Column {
449        self.length()
450    }
451
452    /// Encode string to binary (PySpark encode). Charset: UTF-8. Returns hex string.
453    pub fn encode(&self, charset: &str) -> Column {
454        let charset = charset.to_string();
455        let expr = self.expr().clone().map(
456            move |s| crate::udfs::apply_encode(s, &charset),
457            GetOutput::from_type(DataType::String),
458        );
459        Self::from_expr(expr, None)
460    }
461
462    /// Decode binary (hex string) to string (PySpark decode). Charset: UTF-8.
463    pub fn decode(&self, charset: &str) -> Column {
464        let charset = charset.to_string();
465        let expr = self.expr().clone().map(
466            move |s| crate::udfs::apply_decode(s, &charset),
467            GetOutput::from_type(DataType::String),
468        );
469        Self::from_expr(expr, None)
470    }
471
472    /// Convert to binary (PySpark to_binary). fmt: 'utf-8', 'hex'. Returns hex string.
473    pub fn to_binary(&self, fmt: &str) -> Column {
474        let fmt = fmt.to_string();
475        let expr = self.expr().clone().map(
476            move |s| crate::udfs::apply_to_binary(s, &fmt),
477            GetOutput::from_type(DataType::String),
478        );
479        Self::from_expr(expr, None)
480    }
481
482    /// Try convert to binary; null on failure (PySpark try_to_binary).
483    pub fn try_to_binary(&self, fmt: &str) -> Column {
484        let fmt = fmt.to_string();
485        let expr = self.expr().clone().map(
486            move |s| crate::udfs::apply_try_to_binary(s, &fmt),
487            GetOutput::from_type(DataType::String),
488        );
489        Self::from_expr(expr, None)
490    }
491
492    /// AES encrypt (PySpark aes_encrypt). Key as string; AES-128-GCM. Output hex(nonce||ciphertext).
493    pub fn aes_encrypt(&self, key: &str) -> Column {
494        let key = key.to_string();
495        let expr = self.expr().clone().map(
496            move |s| crate::udfs::apply_aes_encrypt(s, &key),
497            GetOutput::from_type(DataType::String),
498        );
499        Self::from_expr(expr, None)
500    }
501
502    /// AES decrypt (PySpark aes_decrypt). Input hex(nonce||ciphertext). Null on failure.
503    pub fn aes_decrypt(&self, key: &str) -> Column {
504        let key = key.to_string();
505        let expr = self.expr().clone().map(
506            move |s| crate::udfs::apply_aes_decrypt(s, &key),
507            GetOutput::from_type(DataType::String),
508        );
509        Self::from_expr(expr, None)
510    }
511
512    /// Try AES decrypt (PySpark try_aes_decrypt). Returns null on failure.
513    pub fn try_aes_decrypt(&self, key: &str) -> Column {
514        let key = key.to_string();
515        let expr = self.expr().clone().map(
516            move |s| crate::udfs::apply_try_aes_decrypt(s, &key),
517            GetOutput::from_type(DataType::String),
518        );
519        Self::from_expr(expr, None)
520    }
521
522    /// Data type as string (PySpark typeof). Uses dtype from schema.
523    pub fn typeof_(&self) -> Column {
524        Self::from_expr(
525            self.expr().clone().map(
526                crate::udfs::apply_typeof,
527                GetOutput::from_type(DataType::String),
528            ),
529            None,
530        )
531    }
532
533    /// Trim leading and trailing whitespace (PySpark trim)
534    pub fn trim(&self) -> Column {
535        use polars::prelude::*;
536        Self::from_expr(self.expr().clone().str().strip_chars(lit(" \t\n\r")), None)
537    }
538
539    /// Trim leading whitespace (PySpark ltrim)
540    pub fn ltrim(&self) -> Column {
541        use polars::prelude::*;
542        Self::from_expr(
543            self.expr().clone().str().strip_chars_start(lit(" \t\n\r")),
544            None,
545        )
546    }
547
548    /// Trim trailing whitespace (PySpark rtrim)
549    pub fn rtrim(&self) -> Column {
550        use polars::prelude::*;
551        Self::from_expr(
552            self.expr().clone().str().strip_chars_end(lit(" \t\n\r")),
553            None,
554        )
555    }
556
557    /// Trim leading and trailing characters (PySpark btrim). trim_str defaults to whitespace.
558    pub fn btrim(&self, trim_str: Option<&str>) -> Column {
559        use polars::prelude::*;
560        let chars = trim_str.unwrap_or(" \t\n\r");
561        Self::from_expr(self.expr().clone().str().strip_chars(lit(chars)), None)
562    }
563
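    // Example (sketch) for locate below, given a Column `c` holding "Spark":
    //
    //     c.locate("ar", 1) // 3  (1-based position of the first match)
    //     c.locate("ar", 4) // 0  (no match at or after position 4)
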
564    /// Find substring position 1-based, starting at pos (PySpark locate). 0 if not found.
565    pub fn locate(&self, substr: &str, pos: i64) -> Column {
566        use polars::prelude::*;
567        if substr.is_empty() {
568            return Self::from_expr(lit(1i64), None);
569        }
570        let start = (pos - 1).max(0);
571        let slice_expr = self.expr().clone().str().slice(lit(start), lit(i64::MAX));
572        let found = slice_expr.str().find_literal(lit(substr.to_string()));
573        Self::from_expr(
574            (found.cast(DataType::Int64) + lit(start + 1)).fill_null(lit(0i64)),
575            None,
576        )
577    }
578
    /// Base conversion (PySpark conv): interpret the string as a number in from_base and render it in to_base.
580    pub fn conv(&self, from_base: i32, to_base: i32) -> Column {
581        let expr = self.expr().clone().map(
582            move |s| crate::udfs::apply_conv(s, from_base, to_base),
583            GetOutput::from_type(DataType::String),
584        );
585        Self::from_expr(expr, None)
586    }
587
588    /// Convert to hex string (PySpark hex). Int or string input.
589    pub fn hex(&self) -> Column {
590        let expr = self.expr().clone().map(
591            crate::udfs::apply_hex,
592            GetOutput::from_type(DataType::String),
593        );
594        Self::from_expr(expr, None)
595    }
596
597    /// Convert hex string to binary/string (PySpark unhex).
598    pub fn unhex(&self) -> Column {
599        let expr = self.expr().clone().map(
600            crate::udfs::apply_unhex,
601            GetOutput::from_type(DataType::String),
602        );
603        Self::from_expr(expr, None)
604    }
605
606    /// Convert integer to binary string (PySpark bin).
607    pub fn bin(&self) -> Column {
608        let expr = self.expr().clone().map(
609            crate::udfs::apply_bin,
610            GetOutput::from_type(DataType::String),
611        );
612        Self::from_expr(expr, None)
613    }
614
615    /// Get bit at 0-based position (PySpark getbit).
616    pub fn getbit(&self, pos: i64) -> Column {
617        let expr = self.expr().clone().map(
618            move |s| crate::udfs::apply_getbit(s, pos),
619            GetOutput::from_type(DataType::Int64),
620        );
621        Self::from_expr(expr, None)
622    }
623
624    /// Bitwise AND of two integer/boolean columns (PySpark bit_and).
625    pub fn bit_and(&self, other: &Column) -> Column {
626        let args = [other.expr().clone()];
627        let expr = self.expr().clone().cast(DataType::Int64).map_many(
628            crate::udfs::apply_bit_and,
629            &args,
630            GetOutput::from_type(DataType::Int64),
631        );
632        Self::from_expr(expr, None)
633    }
634
635    /// Bitwise OR of two integer/boolean columns (PySpark bit_or).
636    pub fn bit_or(&self, other: &Column) -> Column {
637        let args = [other.expr().clone()];
638        let expr = self.expr().clone().cast(DataType::Int64).map_many(
639            crate::udfs::apply_bit_or,
640            &args,
641            GetOutput::from_type(DataType::Int64),
642        );
643        Self::from_expr(expr, None)
644    }
645
646    /// Bitwise XOR of two integer/boolean columns (PySpark bit_xor).
647    pub fn bit_xor(&self, other: &Column) -> Column {
648        let args = [other.expr().clone()];
649        let expr = self.expr().clone().cast(DataType::Int64).map_many(
650            crate::udfs::apply_bit_xor,
651            &args,
652            GetOutput::from_type(DataType::Int64),
653        );
654        Self::from_expr(expr, None)
655    }
656
657    /// Count of set bits in the integer representation (PySpark bit_count).
658    pub fn bit_count(&self) -> Column {
659        let expr = self.expr().clone().map(
660            crate::udfs::apply_bit_count,
661            GetOutput::from_type(DataType::Int64),
662        );
663        Self::from_expr(expr, None)
664    }
665
666    /// Assert that all boolean values are true; errors otherwise (PySpark assert_true).
667    /// When err_msg is Some, it is used in the error message when assertion fails.
668    pub fn assert_true(&self, err_msg: Option<&str>) -> Column {
669        let msg = err_msg.map(String::from);
670        let expr = self.expr().clone().map(
671            move |c| crate::udfs::apply_assert_true(c, msg.as_deref()),
672            GetOutput::same_type(),
673        );
674        Self::from_expr(expr, None)
675    }
676
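    // Sketch of the identity used by bitwise_not below: for two's-complement
    // integers, !n == -1 - n, e.g. !5 == -6 and !(-1) == 0.
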
677    /// Bitwise NOT of an integer/boolean column (PySpark bitwise_not / bitwiseNOT).
678    pub fn bitwise_not(&self) -> Column {
679        // Use arithmetic identity: !n == -1 - n for two's-complement integers.
680        let expr = (lit(-1i64) - self.expr().clone().cast(DataType::Int64)).cast(DataType::Int64);
681        Self::from_expr(expr, None)
682    }
683
684    /// Parse string to map (PySpark str_to_map). "k1:v1,k2:v2" -> map.
685    pub fn str_to_map(&self, pair_delim: &str, key_value_delim: &str) -> Column {
686        let pair_delim = pair_delim.to_string();
687        let key_value_delim = key_value_delim.to_string();
688        let expr = self.expr().clone().map(
689            move |s| crate::udfs::apply_str_to_map(s, &pair_delim, &key_value_delim),
690            GetOutput::same_type(),
691        );
692        Self::from_expr(expr, None)
693    }
694
695    /// Extract first match of regex pattern (PySpark regexp_extract). Group 0 = full match.
696    pub fn regexp_extract(&self, pattern: &str, group_index: usize) -> Column {
697        use polars::prelude::*;
698        let pat = pattern.to_string();
699        Self::from_expr(
700            self.expr().clone().str().extract(lit(pat), group_index),
701            None,
702        )
703    }
704
    /// Replace the first regex match (Polars `str().replace` with literal=false).
    /// Note: PySpark's regexp_replace replaces all matches, so this differs when the pattern occurs more than once.
706    pub fn regexp_replace(&self, pattern: &str, replacement: &str) -> Column {
707        use polars::prelude::*;
708        let pat = pattern.to_string();
709        let rep = replacement.to_string();
710        Self::from_expr(
711            self.expr().clone().str().replace(lit(pat), lit(rep), false),
712            None,
713        )
714    }
715
716    /// Leftmost n characters (PySpark left).
717    pub fn left(&self, n: i64) -> Column {
718        use polars::prelude::*;
719        let len = n.max(0) as u32;
720        Self::from_expr(
721            self.expr().clone().str().slice(lit(0i64), lit(len as i64)),
722            None,
723        )
724    }
725
726    /// Rightmost n characters (PySpark right).
727    pub fn right(&self, n: i64) -> Column {
728        use polars::prelude::*;
729        let n_val = n.max(0);
730        let n_expr = lit(n_val);
731        let len_chars = self.expr().clone().str().len_chars().cast(DataType::Int64);
732        let start = when((len_chars.clone() - n_expr.clone()).lt_eq(lit(0i64)))
733            .then(lit(0i64))
734            .otherwise(len_chars - n_expr.clone());
735        Self::from_expr(self.expr().clone().str().slice(start, n_expr), None)
736    }
737
738    /// Replace all occurrences of literal search string with replacement (PySpark replace for literal).
739    pub fn replace(&self, search: &str, replacement: &str) -> Column {
740        use polars::prelude::*;
741        Self::from_expr(
742            self.expr().clone().str().replace_all(
743                lit(search.to_string()),
744                lit(replacement.to_string()),
745                true,
746            ),
747            None,
748        )
749    }
750
751    /// True if string starts with prefix (PySpark startswith).
752    pub fn startswith(&self, prefix: &str) -> Column {
753        use polars::prelude::*;
754        Self::from_expr(
755            self.expr()
756                .clone()
757                .str()
758                .starts_with(lit(prefix.to_string())),
759            None,
760        )
761    }
762
763    /// True if string ends with suffix (PySpark endswith).
764    pub fn endswith(&self, suffix: &str) -> Column {
765        use polars::prelude::*;
766        Self::from_expr(
767            self.expr().clone().str().ends_with(lit(suffix.to_string())),
768            None,
769        )
770    }
771
772    /// True if string contains substring (literal, not regex). PySpark contains.
773    pub fn contains(&self, substring: &str) -> Column {
774        use polars::prelude::*;
775        Self::from_expr(
776            self.expr()
777                .clone()
778                .str()
779                .contains(lit(substring.to_string()), true),
780            None,
781        )
782    }
783
784    /// Split string by delimiter (PySpark split). Returns list of strings.
785    /// Uses literal split so "|" is not interpreted as regex alternation.
786    pub fn split(&self, delimiter: &str) -> Column {
787        use polars::prelude::*;
788        Self::from_expr(
789            self.expr().clone().str().split(lit(delimiter.to_string())),
790            None,
791        )
792    }
793
    /// Title case: first letter of each word uppercase (PySpark initcap).
    /// NOTE: currently approximated by lowercasing, because the Polars `to_titlecase` feature is not enabled.
796    pub fn initcap(&self) -> Column {
797        Self::from_expr(self.expr().clone().str().to_lowercase(), None)
798    }
799
800    /// Extract all matches of regex (PySpark regexp_extract_all). Returns list of strings.
801    pub fn regexp_extract_all(&self, pattern: &str) -> Column {
802        use polars::prelude::*;
803        Self::from_expr(
804            self.expr()
805                .clone()
806                .str()
807                .extract_all(lit(pattern.to_string())),
808            None,
809        )
810    }
811
812    /// Check if string matches regex (PySpark regexp_like / rlike).
813    pub fn regexp_like(&self, pattern: &str) -> Column {
814        use polars::prelude::*;
815        Self::from_expr(
816            self.expr()
817                .clone()
818                .str()
819                .contains(lit(pattern.to_string()), false),
820            None,
821        )
822    }
823
824    /// Count of non-overlapping regex matches (PySpark regexp_count).
825    pub fn regexp_count(&self, pattern: &str) -> Column {
826        use polars::prelude::*;
827        Self::from_expr(
828            self.expr()
829                .clone()
830                .str()
831                .count_matches(lit(pattern.to_string()), false)
832                .cast(DataType::Int64),
833            None,
834        )
835    }
836
837    /// First substring matching regex (PySpark regexp_substr). Null if no match.
838    pub fn regexp_substr(&self, pattern: &str) -> Column {
839        self.regexp_extract(pattern, 0)
840    }
841
842    /// 1-based position of first regex match (PySpark regexp_instr). group_idx 0 = full match; null if no match.
843    pub fn regexp_instr(&self, pattern: &str, group_idx: Option<usize>) -> Column {
844        let idx = group_idx.unwrap_or(0);
845        let pattern = pattern.to_string();
846        let expr = self.expr().clone().map(
847            move |s| crate::udfs::apply_regexp_instr(s, pattern.clone(), idx),
848            GetOutput::from_type(DataType::Int64),
849        );
850        Self::from_expr(expr, None)
851    }
852
853    /// 1-based index of self in comma-delimited set column (PySpark find_in_set). 0 if not found or self contains comma.
854    pub fn find_in_set(&self, set_column: &Column) -> Column {
855        let args = [set_column.expr().clone()];
856        let expr = self.expr().clone().map_many(
857            crate::udfs::apply_find_in_set,
858            &args,
859            GetOutput::from_type(DataType::Int64),
860        );
861        Self::from_expr(expr, None)
862    }
863
    /// Repeat each string value n times, returning a single concatenated string per row (PySpark repeat).
865    pub fn repeat(&self, n: i32) -> Column {
866        use polars::prelude::*;
867        // repeat_by yields List[str]; join to get a single string per row.
868        Self::from_expr(
869            self.expr()
870                .clone()
871                .repeat_by(lit(n as u32))
872                .list()
873                .join(lit(""), false),
874            None,
875        )
876    }
877
878    /// Reverse string (PySpark reverse).
879    pub fn reverse(&self) -> Column {
880        Self::from_expr(self.expr().clone().str().reverse(), None)
881    }
882
883    /// Find substring position (1-based; 0 if not found). PySpark instr(col, substr).
884    pub fn instr(&self, substr: &str) -> Column {
885        use polars::prelude::*;
886        let found = self
887            .expr()
888            .clone()
889            .str()
890            .find_literal(lit(substr.to_string()));
891        // Polars find_literal returns 0-based index (null if not found); PySpark is 1-based, 0 when not found.
892        Self::from_expr(
893            (found.cast(DataType::Int64) + lit(1i64)).fill_null(lit(0i64)),
894            None,
895        )
896    }
897
898    /// Left-pad string to length with pad character (PySpark lpad).
899    pub fn lpad(&self, length: i32, pad: &str) -> Column {
900        let pad_str = if pad.is_empty() { " " } else { pad };
901        let fill = pad_str.chars().next().unwrap_or(' ');
902        Self::from_expr(
903            self.expr().clone().str().pad_start(length as usize, fill),
904            None,
905        )
906    }
907
908    /// Right-pad string to length with pad character (PySpark rpad).
909    pub fn rpad(&self, length: i32, pad: &str) -> Column {
910        let pad_str = if pad.is_empty() { " " } else { pad };
911        let fill = pad_str.chars().next().unwrap_or(' ');
912        Self::from_expr(
913            self.expr().clone().str().pad_end(length as usize, fill),
914            None,
915        )
916    }
917
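    // Example (sketch) for translate below:
    //
    //     c.translate("abc", "xy") // 'a'->'x', 'b'->'y', 'c' dropped: "cabbage" => "xyyxge"
    //
    // Note: replacements are applied as sequential replace_all passes, so if a
    // "to" character also appears later in "from", results can chain (a minor
    // difference from PySpark's single-pass translate).
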
918    /// Character-by-character translation (PySpark translate). Replaces each char in from_str with corresponding in to_str; if to_str is shorter, extra from chars are removed.
919    pub fn translate(&self, from_str: &str, to_str: &str) -> Column {
920        use polars::prelude::*;
921        let mut e = self.expr().clone();
922        let from_chars: Vec<char> = from_str.chars().collect();
923        let to_chars: Vec<char> = to_str.chars().collect();
924        for (i, fc) in from_chars.iter().enumerate() {
925            let f = fc.to_string();
926            let t = to_chars
927                .get(i)
928                .map(|c| c.to_string())
929                .unwrap_or_else(String::new); // PySpark: no replacement = drop char
930            e = e.str().replace_all(lit(f), lit(t), true);
931        }
932        Self::from_expr(e, None)
933    }
934
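    // Example (sketch) for mask below, with the default replacement chars:
    //
    //     c.mask(None, None, None, None) // "AbCd-123" => "XxXx-nnn"
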
935    /// Mask string: replace uppercase with upper_char, lowercase with lower_char, digits with digit_char (PySpark mask).
936    /// Defaults: upper 'X', lower 'x', digit 'n'; other chars unchanged.
937    pub fn mask(
938        &self,
939        upper_char: Option<char>,
940        lower_char: Option<char>,
941        digit_char: Option<char>,
942        other_char: Option<char>,
943    ) -> Column {
944        use polars::prelude::*;
945        let upper = upper_char.unwrap_or('X').to_string();
946        let lower = lower_char.unwrap_or('x').to_string();
947        let digit = digit_char.unwrap_or('n').to_string();
948        let other = other_char.map(|c| c.to_string());
949        let mut e = self
950            .expr()
951            .clone()
952            .str()
953            .replace_all(lit("[A-Z]".to_string()), lit(upper), false)
954            .str()
955            .replace_all(lit("[a-z]".to_string()), lit(lower), false)
956            .str()
957            .replace_all(lit(r"\d".to_string()), lit(digit), false);
958        if let Some(o) = other {
959            e = e
960                .str()
961                .replace_all(lit("[^A-Za-z0-9]".to_string()), lit(o), false);
962        }
963        Self::from_expr(e, None)
964    }
965
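    // Example (sketch) for split_part below, given a Column `c` holding "a,b,c":
    //
    //     c.split_part(",", 2)  // "b"
    //     c.split_part(",", -1) // "c"  (negative counts from the right)
    //     c.split_part(",", 5)  // ""   (out of range)
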
966    /// Split by delimiter and return 1-based part (PySpark split_part).
967    /// part_num > 0: from left; part_num < 0: from right; part_num = 0: null; out-of-range: empty string.
968    pub fn split_part(&self, delimiter: &str, part_num: i64) -> Column {
969        use polars::prelude::*;
970        if part_num == 0 {
971            return Self::from_expr(Expr::Literal(LiteralValue::Null), None);
972        }
973        let use_regex = delimiter == "|";
974        if use_regex {
975            let pattern = delimiter.to_string();
976            let part = part_num;
977            let get_expr = self.expr().clone().map(
978                move |col| crate::udfs::apply_split_part_regex(col, &pattern, part),
979                GetOutput::from_type(DataType::String),
980            );
981            let expr = when(self.expr().clone().is_null())
982                .then(Expr::Literal(LiteralValue::Null))
983                .otherwise(get_expr.fill_null(lit("")));
984            return Self::from_expr(expr, None);
985        }
986        let delim = delimiter.to_string();
987        let split_expr = self.expr().clone().str().split(lit(delim));
988        let index = if part_num > 0 {
989            lit(part_num - 1)
990        } else {
991            lit(part_num)
992        };
993        let get_expr = split_expr.list().get(index, true).fill_null(lit(""));
994        let expr = when(self.expr().clone().is_null())
995            .then(Expr::Literal(LiteralValue::Null))
996            .otherwise(get_expr);
997        Self::from_expr(expr, None)
998    }
999
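    // Example (sketch) for substring_index below, given "a.b.c" and delimiter ".":
    //
    //     c.substring_index(".", 2)  // "a.b" (everything before the 2nd '.')
    //     c.substring_index(".", -1) // "c"   (everything after the last '.')
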
1000    /// Substring before/after nth delimiter (PySpark substring_index). count > 0: before nth from left; count < 0: after nth from right.
1001    pub fn substring_index(&self, delimiter: &str, count: i64) -> Column {
1002        use polars::prelude::*;
1003        let delim = delimiter.to_string();
1004        let split_expr = self.expr().clone().str().split(lit(delim.clone()));
1005        let n = count.unsigned_abs() as i64;
1006        let expr = if count > 0 {
1007            split_expr
1008                .clone()
1009                .list()
1010                .slice(lit(0i64), lit(n))
1011                .list()
1012                .join(lit(delim), false)
1013        } else {
1014            let len = split_expr.clone().list().len();
1015            let start = when(len.clone().gt(lit(n)))
1016                .then(len.clone() - lit(n))
1017                .otherwise(lit(0i64));
1018            let slice_len = when(len.clone().gt(lit(n))).then(lit(n)).otherwise(len);
1019            split_expr
1020                .list()
1021                .slice(start, slice_len)
1022                .list()
1023                .join(lit(delim), false)
1024        };
1025        Self::from_expr(expr, None)
1026    }
1027
1028    /// Soundex code (PySpark soundex). Implemented via map UDF (strsim/soundex crates).
1029    pub fn soundex(&self) -> Column {
1030        let expr = self
1031            .expr()
1032            .clone()
1033            .map(crate::udfs::apply_soundex, GetOutput::same_type());
1034        Self::from_expr(expr, None)
1035    }
1036
1037    /// Levenshtein distance to another string (PySpark levenshtein). Implemented via map_many UDF (strsim).
1038    pub fn levenshtein(&self, other: &Column) -> Column {
1039        let args = [other.expr().clone()];
1040        let expr = self.expr().clone().map_many(
1041            crate::udfs::apply_levenshtein,
1042            &args,
1043            GetOutput::from_type(DataType::Int64),
1044        );
1045        Self::from_expr(expr, None)
1046    }
1047
1048    /// CRC32 checksum of string bytes (PySpark crc32). Implemented via map UDF (crc32fast).
1049    pub fn crc32(&self) -> Column {
1050        let expr = self.expr().clone().map(
1051            crate::udfs::apply_crc32,
1052            GetOutput::from_type(DataType::Int64),
1053        );
1054        Self::from_expr(expr, None)
1055    }
1056
1057    /// XXH64 hash of string (PySpark xxhash64). Implemented via map UDF (twox-hash).
1058    pub fn xxhash64(&self) -> Column {
1059        let expr = self.expr().clone().map(
1060            crate::udfs::apply_xxhash64,
1061            GetOutput::from_type(DataType::Int64),
1062        );
1063        Self::from_expr(expr, None)
1064    }
1065
1066    /// ASCII value of first character (PySpark ascii). Returns Int32.
1067    pub fn ascii(&self) -> Column {
1068        let expr = self.expr().clone().map(
1069            crate::udfs::apply_ascii,
1070            GetOutput::from_type(DataType::Int32),
1071        );
1072        Self::from_expr(expr, None)
1073    }
1074
1075    /// Format numeric as string with fixed decimal places (PySpark format_number).
1076    pub fn format_number(&self, decimals: u32) -> Column {
1077        let expr = self.expr().clone().map(
1078            move |s| crate::udfs::apply_format_number(s, decimals),
1079            GetOutput::from_type(DataType::String),
1080        );
1081        Self::from_expr(expr, None)
1082    }
1083
1084    /// Int to single-character string (PySpark char / chr). Valid codepoint only.
1085    pub fn char(&self) -> Column {
1086        let expr = self.expr().clone().map(
1087            crate::udfs::apply_char,
1088            GetOutput::from_type(DataType::String),
1089        );
1090        Self::from_expr(expr, None)
1091    }
1092
1093    /// Alias for char (PySpark chr).
1094    pub fn chr(&self) -> Column {
1095        self.char()
1096    }
1097
1098    /// Base64 encode string bytes (PySpark base64).
1099    pub fn base64(&self) -> Column {
1100        let expr = self
1101            .expr()
1102            .clone()
1103            .map(crate::udfs::apply_base64, GetOutput::same_type());
1104        Self::from_expr(expr, None)
1105    }
1106
1107    /// Base64 decode to string (PySpark unbase64). Invalid decode → null.
1108    pub fn unbase64(&self) -> Column {
1109        let expr = self
1110            .expr()
1111            .clone()
1112            .map(crate::udfs::apply_unbase64, GetOutput::same_type());
1113        Self::from_expr(expr, None)
1114    }
1115
1116    /// SHA1 hash of string bytes, return hex string (PySpark sha1).
1117    pub fn sha1(&self) -> Column {
1118        let expr = self
1119            .expr()
1120            .clone()
1121            .map(crate::udfs::apply_sha1, GetOutput::same_type());
1122        Self::from_expr(expr, None)
1123    }
1124
    /// SHA2 hash; bit_length 256, 384, or 512 (PySpark sha2; PySpark's default is 256).
1126    pub fn sha2(&self, bit_length: i32) -> Column {
1127        let expr = self.expr().clone().map(
1128            move |s| crate::udfs::apply_sha2(s, bit_length),
1129            GetOutput::same_type(),
1130        );
1131        Self::from_expr(expr, None)
1132    }
1133
1134    /// MD5 hash of string bytes, return hex string (PySpark md5).
1135    pub fn md5(&self) -> Column {
1136        let expr = self
1137            .expr()
1138            .clone()
1139            .map(crate::udfs::apply_md5, GetOutput::same_type());
1140        Self::from_expr(expr, None)
1141    }
1142
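    // Example (sketch) for overlay below, given a Column `c` holding "SPARK SQL":
    //
    //     c.overlay("CORE", 7, 3) // "SPARK CORE"
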
1143    /// Replace substring at 1-based position (PySpark overlay). replace is literal string.
1144    pub fn overlay(&self, replace: &str, pos: i64, length: i64) -> Column {
1145        use polars::prelude::*;
1146        let pos = pos.max(1);
1147        let replace_len = length.max(0);
1148        let start_left = 0i64;
1149        let len_left = (pos - 1).max(0);
1150        let start_right = (pos - 1 + replace_len).max(0);
1151        let len_right = 1_000_000i64; // "rest of string"
1152        let left = self
1153            .expr()
1154            .clone()
1155            .str()
1156            .slice(lit(start_left), lit(len_left));
1157        let mid = lit(replace.to_string());
1158        let right = self
1159            .expr()
1160            .clone()
1161            .str()
1162            .slice(lit(start_right), lit(len_right));
1163        let exprs = [left, mid, right];
1164        let concat_expr = polars::prelude::concat_str(&exprs, "", false);
1165        Self::from_expr(concat_expr, None)
1166    }
1167
1168    // --- Math functions ---
1169
1170    /// Absolute value (PySpark abs)
1171    pub fn abs(&self) -> Column {
1172        Self::from_expr(self.expr().clone().abs(), None)
1173    }
1174
1175    /// Ceiling (PySpark ceil)
1176    pub fn ceil(&self) -> Column {
1177        Self::from_expr(self.expr().clone().ceil(), None)
1178    }
1179
1180    /// Alias for ceil. PySpark ceiling.
1181    pub fn ceiling(&self) -> Column {
1182        self.ceil()
1183    }
1184
1185    /// Floor (PySpark floor)
1186    pub fn floor(&self) -> Column {
1187        Self::from_expr(self.expr().clone().floor(), None)
1188    }
1189
1190    /// Round to given decimal places (PySpark round)
1191    pub fn round(&self, decimals: u32) -> Column {
1192        Self::from_expr(self.expr().clone().round(decimals), None)
1193    }
1194
1195    /// Banker's rounding - round half to even (PySpark bround).
1196    pub fn bround(&self, scale: i32) -> Column {
1197        let expr = self.expr().clone().map(
1198            move |s| crate::udfs::apply_bround(s, scale),
1199            GetOutput::from_type(DataType::Float64),
1200        );
1201        Self::from_expr(expr, None)
1202    }
1203
1204    /// Unary minus (PySpark negate, negative).
1205    pub fn negate(&self) -> Column {
1206        use polars::prelude::*;
1207        Self::from_expr(self.expr().clone() * lit(-1), None)
1208    }
1209
1210    /// Multiply by another column or literal (PySpark multiply). Broadcasts scalars.
1211    pub fn multiply(&self, other: &Column) -> Column {
1212        Self::from_expr(self.expr().clone() * other.expr().clone(), None)
1213    }
1214
1215    /// Add another column or literal (PySpark +). Broadcasts scalars.
1216    pub fn add(&self, other: &Column) -> Column {
1217        Self::from_expr(self.expr().clone() + other.expr().clone(), None)
1218    }
1219
1220    /// Subtract another column or literal (PySpark -). Broadcasts scalars.
1221    pub fn subtract(&self, other: &Column) -> Column {
1222        Self::from_expr(self.expr().clone() - other.expr().clone(), None)
1223    }
1224
1225    /// Divide by another column or literal (PySpark /). Broadcasts scalars.
1226    pub fn divide(&self, other: &Column) -> Column {
1227        Self::from_expr(self.expr().clone() / other.expr().clone(), None)
1228    }
1229
1230    /// Modulo (PySpark %). Broadcasts scalars.
1231    pub fn mod_(&self, other: &Column) -> Column {
1232        Self::from_expr(self.expr().clone() % other.expr().clone(), None)
1233    }
1234
1235    /// Square root (PySpark sqrt)
1236    pub fn sqrt(&self) -> Column {
1237        Self::from_expr(self.expr().clone().sqrt(), None)
1238    }
1239
    /// Power with an integer exponent (PySpark pow). This variant takes a literal i64 exponent.
1241    pub fn pow(&self, exp: i64) -> Column {
1242        use polars::prelude::*;
1243        Self::from_expr(self.expr().clone().pow(lit(exp)), None)
1244    }
1245
1246    /// Alias for pow. PySpark power.
1247    pub fn power(&self, exp: i64) -> Column {
1248        self.pow(exp)
1249    }
1250
1251    /// Exponential (PySpark exp)
1252    pub fn exp(&self) -> Column {
1253        Self::from_expr(self.expr().clone().exp(), None)
1254    }
1255
1256    /// Natural logarithm (PySpark log)
1257    pub fn log(&self) -> Column {
1258        Self::from_expr(self.expr().clone().log(std::f64::consts::E), None)
1259    }
1260
1261    /// Alias for log. PySpark ln.
1262    pub fn ln(&self) -> Column {
1263        self.log()
1264    }
1265
1266    /// Sine (radians). PySpark sin.
1267    pub fn sin(&self) -> Column {
1268        let expr = self.expr().clone().map(
1269            crate::udfs::apply_sin,
1270            GetOutput::from_type(DataType::Float64),
1271        );
1272        Self::from_expr(expr, None)
1273    }
1274
1275    /// Cosine (radians). PySpark cos.
1276    pub fn cos(&self) -> Column {
1277        let expr = self.expr().clone().map(
1278            crate::udfs::apply_cos,
1279            GetOutput::from_type(DataType::Float64),
1280        );
1281        Self::from_expr(expr, None)
1282    }
1283
1284    /// Tangent (radians). PySpark tan.
1285    pub fn tan(&self) -> Column {
1286        let expr = self.expr().clone().map(
1287            crate::udfs::apply_tan,
1288            GetOutput::from_type(DataType::Float64),
1289        );
1290        Self::from_expr(expr, None)
1291    }
1292
1293    /// Cotangent: 1/tan (PySpark cot).
1294    pub fn cot(&self) -> Column {
1295        let expr = self.expr().clone().map(
1296            crate::udfs::apply_cot,
1297            GetOutput::from_type(DataType::Float64),
1298        );
1299        Self::from_expr(expr, None)
1300    }
1301
1302    /// Cosecant: 1/sin (PySpark csc).
1303    pub fn csc(&self) -> Column {
1304        let expr = self.expr().clone().map(
1305            crate::udfs::apply_csc,
1306            GetOutput::from_type(DataType::Float64),
1307        );
1308        Self::from_expr(expr, None)
1309    }
1310
1311    /// Secant: 1/cos (PySpark sec).
1312    pub fn sec(&self) -> Column {
1313        let expr = self.expr().clone().map(
1314            crate::udfs::apply_sec,
1315            GetOutput::from_type(DataType::Float64),
1316        );
1317        Self::from_expr(expr, None)
1318    }
1319
1320    /// Arc sine. PySpark asin.
1321    pub fn asin(&self) -> Column {
1322        let expr = self.expr().clone().map(
1323            crate::udfs::apply_asin,
1324            GetOutput::from_type(DataType::Float64),
1325        );
1326        Self::from_expr(expr, None)
1327    }
1328
1329    /// Arc cosine. PySpark acos.
1330    pub fn acos(&self) -> Column {
1331        let expr = self.expr().clone().map(
1332            crate::udfs::apply_acos,
1333            GetOutput::from_type(DataType::Float64),
1334        );
1335        Self::from_expr(expr, None)
1336    }
1337
1338    /// Arc tangent. PySpark atan.
1339    pub fn atan(&self) -> Column {
1340        let expr = self.expr().clone().map(
1341            crate::udfs::apply_atan,
1342            GetOutput::from_type(DataType::Float64),
1343        );
1344        Self::from_expr(expr, None)
1345    }
1346
1347    /// Two-argument arc tangent (y, x) -> angle in radians. PySpark atan2.
1348    pub fn atan2(&self, x: &Column) -> Column {
1349        let args = [x.expr().clone()];
1350        let expr = self.expr().clone().map_many(
1351            crate::udfs::apply_atan2,
1352            &args,
1353            GetOutput::from_type(DataType::Float64),
1354        );
1355        Self::from_expr(expr, None)
1356    }
1357
1358    /// Convert radians to degrees. PySpark degrees.
1359    pub fn degrees(&self) -> Column {
1360        let expr = self.expr().clone().map(
1361            crate::udfs::apply_degrees,
1362            GetOutput::from_type(DataType::Float64),
1363        );
1364        Self::from_expr(expr, None)
1365    }
1366
1367    /// Alias for degrees. PySpark toDegrees.
1368    pub fn to_degrees(&self) -> Column {
1369        self.degrees()
1370    }
1371
1372    /// Convert degrees to radians. PySpark radians.
1373    pub fn radians(&self) -> Column {
1374        let expr = self.expr().clone().map(
1375            crate::udfs::apply_radians,
1376            GetOutput::from_type(DataType::Float64),
1377        );
1378        Self::from_expr(expr, None)
1379    }
1380
1381    /// Alias for radians. PySpark toRadians.
1382    pub fn to_radians(&self) -> Column {
1383        self.radians()
1384    }
1385
1386    /// Sign of the number (-1, 0, or 1). PySpark signum.
1387    pub fn signum(&self) -> Column {
1388        let expr = self.expr().clone().map(
1389            crate::udfs::apply_signum,
1390            GetOutput::from_type(DataType::Float64),
1391        );
1392        Self::from_expr(expr, None)
1393    }
1394
1395    /// Hyperbolic cosine. PySpark cosh.
1396    pub fn cosh(&self) -> Column {
1397        let expr = self.expr().clone().map(
1398            crate::udfs::apply_cosh,
1399            GetOutput::from_type(DataType::Float64),
1400        );
1401        Self::from_expr(expr, None)
1402    }
1403    /// Hyperbolic sine. PySpark sinh.
1404    pub fn sinh(&self) -> Column {
1405        let expr = self.expr().clone().map(
1406            crate::udfs::apply_sinh,
1407            GetOutput::from_type(DataType::Float64),
1408        );
1409        Self::from_expr(expr, None)
1410    }
1411    /// Hyperbolic tangent. PySpark tanh.
1412    pub fn tanh(&self) -> Column {
1413        let expr = self.expr().clone().map(
1414            crate::udfs::apply_tanh,
1415            GetOutput::from_type(DataType::Float64),
1416        );
1417        Self::from_expr(expr, None)
1418    }
1419    /// Inverse hyperbolic cosine. PySpark acosh.
1420    pub fn acosh(&self) -> Column {
1421        let expr = self.expr().clone().map(
1422            crate::udfs::apply_acosh,
1423            GetOutput::from_type(DataType::Float64),
1424        );
1425        Self::from_expr(expr, None)
1426    }
1427    /// Inverse hyperbolic sine. PySpark asinh.
1428    pub fn asinh(&self) -> Column {
1429        let expr = self.expr().clone().map(
1430            crate::udfs::apply_asinh,
1431            GetOutput::from_type(DataType::Float64),
1432        );
1433        Self::from_expr(expr, None)
1434    }
1435    /// Inverse hyperbolic tangent. PySpark atanh.
1436    pub fn atanh(&self) -> Column {
1437        let expr = self.expr().clone().map(
1438            crate::udfs::apply_atanh,
1439            GetOutput::from_type(DataType::Float64),
1440        );
1441        Self::from_expr(expr, None)
1442    }
1443    /// Cube root. PySpark cbrt.
1444    pub fn cbrt(&self) -> Column {
1445        let expr = self.expr().clone().map(
1446            crate::udfs::apply_cbrt,
1447            GetOutput::from_type(DataType::Float64),
1448        );
1449        Self::from_expr(expr, None)
1450    }
1451    /// exp(x) - 1. PySpark expm1.
1452    pub fn expm1(&self) -> Column {
1453        let expr = self.expr().clone().map(
1454            crate::udfs::apply_expm1,
1455            GetOutput::from_type(DataType::Float64),
1456        );
1457        Self::from_expr(expr, None)
1458    }
1459    /// log(1 + x). PySpark log1p.
1460    pub fn log1p(&self) -> Column {
1461        let expr = self.expr().clone().map(
1462            crate::udfs::apply_log1p,
1463            GetOutput::from_type(DataType::Float64),
1464        );
1465        Self::from_expr(expr, None)
1466    }
1467    /// Base-10 logarithm. PySpark log10.
1468    pub fn log10(&self) -> Column {
1469        let expr = self.expr().clone().map(
1470            crate::udfs::apply_log10,
1471            GetOutput::from_type(DataType::Float64),
1472        );
1473        Self::from_expr(expr, None)
1474    }
1475    /// Base-2 logarithm. PySpark log2.
1476    pub fn log2(&self) -> Column {
1477        let expr = self.expr().clone().map(
1478            crate::udfs::apply_log2,
1479            GetOutput::from_type(DataType::Float64),
1480        );
1481        Self::from_expr(expr, None)
1482    }
1483    /// Round to nearest integer. PySpark rint.
1484    pub fn rint(&self) -> Column {
1485        let expr = self.expr().clone().map(
1486            crate::udfs::apply_rint,
1487            GetOutput::from_type(DataType::Float64),
1488        );
1489        Self::from_expr(expr, None)
1490    }
1491
1492    /// sqrt(x^2 + y^2). PySpark hypot.
1493    pub fn hypot(&self, other: &Column) -> Column {
1494        let xx = self.expr().clone() * self.expr().clone();
1495        let yy = other.expr().clone() * other.expr().clone();
1496        Self::from_expr((xx + yy).sqrt(), None)
1497    }
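
    // Illustrative note (not in the original source): hypot expands to
    // (x*x + y*y).sqrt(), so for the classic 3-4-5 triangle:
    //     let h = Column::new("x".to_string()).hypot(&Column::new("y".to_string()));
    //     // x = 3.0, y = 4.0  ->  h = (9.0 + 16.0).sqrt() = 5.0
    // Column names "x"/"y" are made up for the sketch.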
1498
1499    /// Cast to the given type (PySpark cast). Fails on invalid conversion.
1500    pub fn cast_to(&self, type_name: &str) -> Result<Column, String> {
1501        crate::functions::cast(self, type_name)
1502    }
1503
1504    /// Cast to the given type, null on invalid conversion (PySpark try_cast).
1505    pub fn try_cast_to(&self, type_name: &str) -> Result<Column, String> {
1506        crate::functions::try_cast(self, type_name)
1507    }
1508
1509    /// True where the float value is NaN (PySpark isnan).
1510    pub fn is_nan(&self) -> Column {
1511        Self::from_expr(self.expr().clone().is_nan(), None)
1512    }
1513
1514    // --- Datetime functions ---
1515
1516    /// Extract year from datetime column (PySpark year)
1517    pub fn year(&self) -> Column {
1518        Self::from_expr(self.expr().clone().dt().year(), None)
1519    }
1520
1521    /// Extract month from datetime column (PySpark month)
1522    pub fn month(&self) -> Column {
1523        Self::from_expr(self.expr().clone().dt().month(), None)
1524    }
1525
1526    /// Extract day of month from datetime column (PySpark day)
1527    pub fn day(&self) -> Column {
1528        Self::from_expr(self.expr().clone().dt().day(), None)
1529    }
1530
1531    /// Alias for day. PySpark dayofmonth.
1532    pub fn dayofmonth(&self) -> Column {
1533        self.day()
1534    }
1535
1536    /// Extract quarter (1-4) from date/datetime column (PySpark quarter).
1537    pub fn quarter(&self) -> Column {
1538        Self::from_expr(self.expr().clone().dt().quarter(), None)
1539    }
1540
1541    /// Extract ISO week of year (1-53) (PySpark weekofyear / week).
1542    pub fn weekofyear(&self) -> Column {
1543        Self::from_expr(self.expr().clone().dt().week(), None)
1544    }
1545
1546    /// Alias for weekofyear (PySpark week).
1547    pub fn week(&self) -> Column {
1548        self.weekofyear()
1549    }
1550
1551    /// Day of week: 1 = Sunday, 2 = Monday, ..., 7 = Saturday (PySpark dayofweek).
1552    /// Polars weekday is Mon=1..Sun=7; we convert to Sun=1..Sat=7.
1553    pub fn dayofweek(&self) -> Column {
1554        let w = self.expr().clone().dt().weekday();
1555        let dayofweek = (w % lit(7i32)) + lit(1i32); // 7->1 (Sun), 1->2 (Mon), ..., 6->7 (Sat)
1556        Self::from_expr(dayofweek, None)
1557    }
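
    // Worked mapping for the (weekday % 7) + 1 conversion above (added note):
    //     Polars Mon=1 -> 2, Tue=2 -> 3, ..., Sat=6 -> 7, Sun=7 -> (7 % 7) + 1 = 1
    // which matches PySpark's Sun=1..Sat=7 numbering.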
1558
1559    /// Day of year (1-366) (PySpark dayofyear).
1560    pub fn dayofyear(&self) -> Column {
1561        Self::from_expr(
1562            self.expr().clone().dt().ordinal_day().cast(DataType::Int32),
1563            None,
1564        )
1565    }
1566
1567    /// Cast to date (PySpark to_date). Drops time component from datetime/timestamp.
1568    pub fn to_date(&self) -> Column {
1569        use polars::prelude::DataType;
1570        Self::from_expr(self.expr().clone().cast(DataType::Date), None)
1571    }
1572
1573    /// Format date/datetime as string (PySpark date_format). Uses chrono strftime format.
1574    pub fn date_format(&self, format: &str) -> Column {
1575        Self::from_expr(self.expr().clone().dt().strftime(format), None)
1576    }
1577
1578    /// Extract hour from datetime column (PySpark hour).
1579    pub fn hour(&self) -> Column {
1580        Self::from_expr(self.expr().clone().dt().hour(), None)
1581    }
1582
1583    /// Extract minute from datetime column (PySpark minute).
1584    pub fn minute(&self) -> Column {
1585        Self::from_expr(self.expr().clone().dt().minute(), None)
1586    }
1587
1588    /// Extract second from datetime column (PySpark second).
1589    pub fn second(&self) -> Column {
1590        Self::from_expr(self.expr().clone().dt().second(), None)
1591    }
1592
1593    /// Extract field from date/datetime (PySpark extract). field: "year","month","day","hour","minute","second","quarter","week"/"weekofyear","dayofweek"/"dow","dayofyear"/"doy"; unrecognised fields fall back to "year".
1594    pub fn extract(&self, field: &str) -> Column {
1595        use polars::prelude::*;
1596        let e = self.expr().clone();
1597        let expr = match field.trim().to_lowercase().as_str() {
1598            "year" => e.dt().year(),
1599            "month" => e.dt().month(),
1600            "day" => e.dt().day(),
1601            "hour" => e.dt().hour(),
1602            "minute" => e.dt().minute(),
1603            "second" => e.dt().second(),
1604            "quarter" => e.dt().quarter(),
1605            "week" | "weekofyear" => e.dt().week(),
1606            "dayofweek" | "dow" => {
1607                let w = e.dt().weekday();
1608                (w % lit(7i32)) + lit(1i32)
1609            }
1610            "dayofyear" | "doy" => e.dt().ordinal_day().cast(DataType::Int32),
1611            _ => e.dt().year(), // fallback
1612        };
1613        Self::from_expr(expr, None)
1614    }
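
    // Usage sketch (column name "ts" is hypothetical): extract dispatches on the
    // field name, so these two are equivalent:
    //     Column::new("ts".to_string()).extract("dow")
    //     Column::new("ts".to_string()).dayofweek()
    // Unrecognised fields currently fall back to year(), as noted above.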
1615
1616    /// Timestamp to microseconds since epoch (PySpark unix_micros). Assumes a microsecond-precision timestamp, so the Int64 cast already yields microseconds.
1617    pub fn unix_micros(&self) -> Column {
1618        use polars::prelude::*;
1619        Self::from_expr(self.expr().clone().cast(DataType::Int64), None)
1620    }
1621
1622    /// Timestamp to milliseconds since epoch (PySpark unix_millis).
1623    pub fn unix_millis(&self) -> Column {
1624        use polars::prelude::*;
1625        let micros = self.expr().clone().cast(DataType::Int64);
1626        Self::from_expr(micros / lit(1000i64), None)
1627    }
1628
1629    /// Timestamp to seconds since epoch (PySpark unix_seconds).
1630    pub fn unix_seconds(&self) -> Column {
1631        use polars::prelude::*;
1632        let micros = self.expr().clone().cast(DataType::Int64);
1633        Self::from_expr(micros / lit(1_000_000i64), None)
1634    }
1635
1636    /// Weekday name "Mon","Tue",... (PySpark dayname).
1637    pub fn dayname(&self) -> Column {
1638        let expr = self.expr().clone().map(
1639            crate::udfs::apply_dayname,
1640            GetOutput::from_type(DataType::String),
1641        );
1642        Self::from_expr(expr, None)
1643    }
1644
1645    /// Weekday 0=Mon, 6=Sun (PySpark weekday).
1646    pub fn weekday(&self) -> Column {
1647        let expr = self.expr().clone().map(
1648            crate::udfs::apply_weekday,
1649            GetOutput::from_type(DataType::Int32),
1650        );
1651        Self::from_expr(expr, None)
1652    }
1653
1654    /// Add n days to date/datetime column (PySpark date_add).
1655    pub fn date_add(&self, n: i32) -> Column {
1656        use polars::prelude::*;
1657        let date_expr = self.expr().clone().cast(DataType::Date);
1658        let dur = duration(DurationArgs::new().with_days(lit(n as i64)));
1659        Self::from_expr(date_expr + dur, None)
1660    }
1661
1662    /// Subtract n days from date/datetime column (PySpark date_sub).
1663    pub fn date_sub(&self, n: i32) -> Column {
1664        use polars::prelude::*;
1665        let date_expr = self.expr().clone().cast(DataType::Date);
1666        let dur = duration(DurationArgs::new().with_days(lit(n as i64)));
1667        Self::from_expr(date_expr - dur, None)
1668    }
1669
1670    /// Number of days between two date/datetime columns (PySpark datediff). Computed as other - self, i.e. self is the start date and other is the end date.
1671    pub fn datediff(&self, other: &Column) -> Column {
1672        use polars::prelude::*;
1673        let start = self.expr().clone().cast(DataType::Date);
1674        let end = other.expr().clone().cast(DataType::Date);
1675        Self::from_expr((end - start).dt().total_days(), None)
1676    }
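
    // Argument-order sketch (hypothetical column names): for
    //     Column::new("start".to_string()).datediff(&Column::new("end".to_string()))
    // the result is end - start in whole days, e.g. start 2024-01-03, end 2024-01-10 -> 7.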
1677
1678    /// Last day of the month for date/datetime column (PySpark last_day).
1679    pub fn last_day(&self) -> Column {
1680        Self::from_expr(self.expr().clone().dt().month_end(), None)
1681    }
1682
1683    /// Add amount of unit to timestamp (PySpark timestampadd). unit: DAY, HOUR, MINUTE, SECOND, WEEK (plural forms accepted; unknown units fall back to days).
1684    pub fn timestampadd(&self, unit: &str, amount: &Column) -> Column {
1685        use polars::prelude::*;
1686        let ts = self.expr().clone();
1687        let amt = amount.expr().clone().cast(DataType::Int64);
1688        let dur = match unit.trim().to_uppercase().as_str() {
1689            "DAY" | "DAYS" => duration(DurationArgs::new().with_days(amt)),
1690            "HOUR" | "HOURS" => duration(DurationArgs::new().with_hours(amt)),
1691            "MINUTE" | "MINUTES" => duration(DurationArgs::new().with_minutes(amt)),
1692            "SECOND" | "SECONDS" => duration(DurationArgs::new().with_seconds(amt)),
1693            "WEEK" | "WEEKS" => duration(DurationArgs::new().with_weeks(amt)),
1694            _ => duration(DurationArgs::new().with_days(amt)),
1695        };
1696        Self::from_expr(ts + dur, None)
1697    }
1698
1699    /// Difference between timestamps in given unit (PySpark timestampdiff), computed as other - self. unit: DAY, HOUR, MINUTE, SECOND (unknown units fall back to days).
1700    pub fn timestampdiff(&self, unit: &str, other: &Column) -> Column {
1701        let start = self.expr().clone();
1702        let end = other.expr().clone();
1703        let diff = end - start;
1704        let expr = match unit.trim().to_uppercase().as_str() {
1705            "HOUR" | "HOURS" => diff.dt().total_hours(),
1706            "MINUTE" | "MINUTES" => diff.dt().total_minutes(),
1707            "SECOND" | "SECONDS" => diff.dt().total_seconds(),
1708            "DAY" | "DAYS" => diff.dt().total_days(),
1709            _ => diff.dt().total_days(),
1710        };
1711        Self::from_expr(expr, None)
1712    }
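
    // Sketch of the add/diff pair (column values are illustrative): adding 36 hours
    // and then asking for the difference in DAYs truncates toward zero, e.g.
    //     ts.timestampadd("HOUR", &Column::from_expr(lit(36i64), None))
    //     start.timestampdiff("DAY", &end)   // timestamps 36 hours apart -> 1 day
    // Unknown units fall back to days in both helpers.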
1713
1714    /// Interpret timestamp as UTC, convert to target timezone (PySpark from_utc_timestamp).
1715    pub fn from_utc_timestamp(&self, tz: &str) -> Column {
1716        let tz = tz.to_string();
1717        let expr = self.expr().clone().map(
1718            move |s| crate::udfs::apply_from_utc_timestamp(s, &tz),
1719            GetOutput::same_type(),
1720        );
1721        Self::from_expr(expr, None)
1722    }
1723
1724    /// Interpret timestamp as in tz, convert to UTC (PySpark to_utc_timestamp).
1725    pub fn to_utc_timestamp(&self, tz: &str) -> Column {
1726        let tz = tz.to_string();
1727        let expr = self.expr().clone().map(
1728            move |s| crate::udfs::apply_to_utc_timestamp(s, &tz),
1729            GetOutput::same_type(),
1730        );
1731        Self::from_expr(expr, None)
1732    }
1733
1734    /// Truncate date/datetime to a unit (PySpark trunc). The format string is passed straight to Polars dt().truncate, so a Polars duration string such as "1mo", "1w", or "1d" is expected.
1735    pub fn trunc(&self, format: &str) -> Column {
1736        use polars::prelude::*;
1737        Self::from_expr(
1738            self.expr().clone().dt().truncate(lit(format.to_string())),
1739            None,
1740        )
1741    }
1742
1743    /// Add n months to date/datetime column (PySpark add_months). Month-aware.
1744    pub fn add_months(&self, n: i32) -> Column {
1745        let expr = self.expr().clone().map(
1746            move |col| crate::udfs::apply_add_months(col, n),
1747            GetOutput::from_type(DataType::Date),
1748        );
1749        Self::from_expr(expr, None)
1750    }
1751
1752    /// Fractional number of months between this (end) date and start (PySpark months_between).
1753    /// When round_off is true, rounds to 8 decimal places (PySpark default).
1754    pub fn months_between(&self, start: &Column, round_off: bool) -> Column {
1755        let args = [start.expr().clone()];
1756        let expr = self.expr().clone().map_many(
1757            move |cols| crate::udfs::apply_months_between(cols, round_off),
1758            &args,
1759            GetOutput::from_type(DataType::Float64),
1760        );
1761        Self::from_expr(expr, None)
1762    }
1763
1764    /// Next date that is the given day of week (e.g. "Mon", "Tue") (PySpark next_day).
1765    pub fn next_day(&self, day_of_week: &str) -> Column {
1766        let day = day_of_week.to_string();
1767        let expr = self.expr().clone().map(
1768            move |col| crate::udfs::apply_next_day(col, &day),
1769            GetOutput::from_type(DataType::Date),
1770        );
1771        Self::from_expr(expr, None)
1772    }
1773
1774    /// Parse string timestamp to seconds since epoch (PySpark unix_timestamp).
1775    pub fn unix_timestamp(&self, format: Option<&str>) -> Column {
1776        let fmt = format.map(String::from);
1777        let expr = self.expr().clone().map(
1778            move |col| crate::udfs::apply_unix_timestamp(col, fmt.as_deref()),
1779            GetOutput::from_type(DataType::Int64),
1780        );
1781        Self::from_expr(expr, None)
1782    }
1783
1784    /// Convert seconds since epoch to formatted string (PySpark from_unixtime).
1785    pub fn from_unixtime(&self, format: Option<&str>) -> Column {
1786        let fmt = format.map(String::from);
1787        let expr = self.expr().clone().map(
1788            move |col| crate::udfs::apply_from_unixtime(col, fmt.as_deref()),
1789            GetOutput::from_type(DataType::String),
1790        );
1791        Self::from_expr(expr, None)
1792    }
1793
1794    /// Convert seconds since epoch to timestamp (PySpark timestamp_seconds).
1795    pub fn timestamp_seconds(&self) -> Column {
1796        let expr = (self.expr().clone().cast(DataType::Int64) * lit(1_000_000i64))
1797            .cast(DataType::Datetime(TimeUnit::Microseconds, None));
1798        Self::from_expr(expr, None)
1799    }
1800
1801    /// Convert milliseconds since epoch to timestamp (PySpark timestamp_millis).
1802    pub fn timestamp_millis(&self) -> Column {
1803        let expr = (self.expr().clone().cast(DataType::Int64) * lit(1000i64))
1804            .cast(DataType::Datetime(TimeUnit::Microseconds, None));
1805        Self::from_expr(expr, None)
1806    }
1807
1808    /// Convert microseconds since epoch to timestamp (PySpark timestamp_micros).
1809    pub fn timestamp_micros(&self) -> Column {
1810        let expr = self
1811            .expr()
1812            .clone()
1813            .cast(DataType::Int64)
1814            .cast(DataType::Datetime(TimeUnit::Microseconds, None));
1815        Self::from_expr(expr, None)
1816    }
1817
1818    /// Date to days since 1970-01-01 (PySpark unix_date).
1819    pub fn unix_date(&self) -> Column {
1820        let expr = self.expr().clone().map(
1821            crate::udfs::apply_unix_date,
1822            GetOutput::from_type(DataType::Int32),
1823        );
1824        Self::from_expr(expr, None)
1825    }
1826
1827    /// Days since epoch to date (PySpark date_from_unix_date).
1828    pub fn date_from_unix_date(&self) -> Column {
1829        let expr = self.expr().clone().map(
1830            crate::udfs::apply_date_from_unix_date,
1831            GetOutput::from_type(DataType::Date),
1832        );
1833        Self::from_expr(expr, None)
1834    }
1835
1836    /// Positive (non-negative) modulus (PySpark pmod). Column method: self.pmod(divisor).
1837    pub fn pmod(&self, divisor: &Column) -> Column {
1838        let args = [divisor.expr().clone()];
1839        let expr = self.expr().clone().map_many(
1840            crate::udfs::apply_pmod,
1841            &args,
1842            GetOutput::from_type(DataType::Float64),
1843        );
1844        Self::from_expr(expr, None)
1845    }
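
    // Added example of the intended pmod semantics (delegated to the UDF above):
    // unlike Rust's `%`, the result is non-negative when the divisor is positive,
    // e.g. pmod(-7, 3) = 2 while -7 % 3 = -1 in plain Rust.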
1846
1847    /// Factorial n! for n in 0..=20 (PySpark factorial).
1848    pub fn factorial(&self) -> Column {
1849        let expr = self.expr().clone().map(
1850            crate::udfs::apply_factorial,
1851            GetOutput::from_type(DataType::Int64),
1852        );
1853        Self::from_expr(expr, None)
1854    }
1855
1856    // --- Window functions ---
1857
1858    /// Apply window partitioning. Returns a new Column with `.over(partition_by)`.
1859    /// Use after rank(), dense_rank(), row_number(), lag(), lead().
1860    pub fn over(&self, partition_by: &[&str]) -> Column {
1861        let partition_exprs: Vec<Expr> = partition_by.iter().map(|s| col(*s)).collect();
1862        Self::from_expr(self.expr().clone().over(partition_exprs), None)
1863    }
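
    // Chaining sketch (hypothetical columns "score" and "dept"): window functions here
    // are built first and then partitioned, e.g.
    //     Column::new("score".to_string()).rank(true).over(&["dept"])
    // which mirrors PySpark's rank().over(Window.partitionBy("dept")).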
1864
1865    /// Rank (with ties, gaps). Use with `.over(partition_by)`.
1866    pub fn rank(&self, descending: bool) -> Column {
1867        let opts = RankOptions {
1868            method: RankMethod::Min,
1869            descending,
1870        };
1871        Self::from_expr(self.expr().clone().rank(opts, None), None)
1872    }
1873
1874    /// Dense rank (no gaps). Use with `.over(partition_by)`.
1875    pub fn dense_rank(&self, descending: bool) -> Column {
1876        let opts = RankOptions {
1877            method: RankMethod::Dense,
1878            descending,
1879        };
1880        Self::from_expr(self.expr().clone().rank(opts, None), None)
1881    }
1882
1883    /// Row number (1, 2, 3 by this column's order). Use with `.over(partition_by)`.
1884    pub fn row_number(&self, descending: bool) -> Column {
1885        let opts = RankOptions {
1886            method: RankMethod::Ordinal,
1887            descending,
1888        };
1889        Self::from_expr(self.expr().clone().rank(opts, None), None)
1890    }
1891
1892    /// Lag: value from n rows before. Use with `.over(partition_by)`.
1893    pub fn lag(&self, n: i64) -> Column {
1894        Self::from_expr(self.expr().clone().shift(polars::prelude::lit(n)), None)
1895    }
1896
1897    /// Lead: value from n rows after. Use with `.over(partition_by)`.
1898    pub fn lead(&self, n: i64) -> Column {
1899        Self::from_expr(self.expr().clone().shift(polars::prelude::lit(-n)), None)
1900    }
1901
1902    /// First value in partition (PySpark first_value). Use with `.over(partition_by)`.
1903    pub fn first_value(&self) -> Column {
1904        Self::from_expr(self.expr().clone().first(), None)
1905    }
1906
1907    /// Last value in partition (PySpark last_value). Use with `.over(partition_by)`.
1908    pub fn last_value(&self) -> Column {
1909        Self::from_expr(self.expr().clone().last(), None)
1910    }
1911
1912    /// Percent rank in partition: (rank - 1) / (count - 1). Window is applied; do not call .over() again.
1913    pub fn percent_rank(&self, partition_by: &[&str], descending: bool) -> Column {
1914        use polars::prelude::*;
1915        let partition_exprs: Vec<Expr> = partition_by.iter().map(|s| col(*s)).collect();
1916        let opts = RankOptions {
1917            method: RankMethod::Min,
1918            descending,
1919        };
1920        let rank_expr = self
1921            .expr()
1922            .clone()
1923            .rank(opts, None)
1924            .over(partition_exprs.clone());
1925        let count_expr = self.expr().clone().count().over(partition_exprs.clone());
1926        let rank_f = (rank_expr - lit(1i64)).cast(DataType::Float64);
1927        let count_f = (count_expr - lit(1i64)).cast(DataType::Float64);
1928        let pct = rank_f / count_f;
1929        Self::from_expr(pct, None)
1930    }
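
    // Worked example of (rank - 1) / (count - 1) (added note): in a 5-row partition,
    // ranks 1..5 map to 0.0, 0.25, 0.5, 0.75, 1.0; ties share the Min rank and
    // therefore the same percentile.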
1931
1932    /// Cumulative distribution in partition: row_number / count. Window is applied; do not call .over() again.
1933    pub fn cume_dist(&self, partition_by: &[&str], descending: bool) -> Column {
1934        use polars::prelude::*;
1935        let partition_exprs: Vec<Expr> = partition_by.iter().map(|s| col(*s)).collect();
1936        let opts = RankOptions {
1937            method: RankMethod::Ordinal,
1938            descending,
1939        };
1940        let row_num = self
1941            .expr()
1942            .clone()
1943            .rank(opts, None)
1944            .over(partition_exprs.clone());
1945        let count_expr = self.expr().clone().count().over(partition_exprs.clone());
1946        let cume = row_num / count_expr;
1947        Self::from_expr(cume.cast(DataType::Float64), None)
1948    }
1949
1950    /// Ntile: bucket 1..n by rank within partition (ceil(rank * n / count)). Window is applied; do not call .over() again.
1951    pub fn ntile(&self, n: u32, partition_by: &[&str], descending: bool) -> Column {
1952        use polars::prelude::*;
1953        let partition_exprs: Vec<Expr> = partition_by.iter().map(|s| col(*s)).collect();
1954        let opts = RankOptions {
1955            method: RankMethod::Ordinal,
1956            descending,
1957        };
1958        let rank_expr = self
1959            .expr()
1960            .clone()
1961            .rank(opts, None)
1962            .over(partition_exprs.clone());
1963        let count_expr = self.expr().clone().count().over(partition_exprs.clone());
1964        let n_expr = lit(n as f64);
1965        let rank_f = rank_expr.cast(DataType::Float64);
1966        let count_f = count_expr.cast(DataType::Float64);
1967        let bucket = (rank_f * n_expr / count_f).ceil();
1968        let clamped = bucket.clip(lit(1.0), lit(n as f64));
1969        Self::from_expr(clamped.cast(DataType::Int32), None)
1970    }
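
    // Worked example of ceil(rank * n / count) (added note): with n = 2 over 5 rows,
    // ranks 1..5 give ceil(0.4), ceil(0.8), ceil(1.2), ceil(1.6), ceil(2.0)
    // -> buckets 1, 1, 2, 2, 2, and the clip keeps the result inside 1..=n.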
1971
1972    /// Nth value in partition by order (1-based n). Returns a Column with window already applied; do not call .over() again.
1973    pub fn nth_value(&self, n: i64, partition_by: &[&str], descending: bool) -> Column {
1974        use polars::prelude::*;
1975        let partition_exprs: Vec<Expr> = partition_by.iter().map(|s| col(*s)).collect();
1976        let opts = RankOptions {
1977            method: RankMethod::Ordinal,
1978            descending,
1979        };
1980        let rank_expr = self
1981            .expr()
1982            .clone()
1983            .rank(opts, None)
1984            .over(partition_exprs.clone());
1985        let cond_col = Self::from_expr(rank_expr.eq(lit(n)), None);
1986        let null_col = Self::from_expr(Expr::Literal(LiteralValue::Null), None);
1987        let value_col = Self::from_expr(self.expr().clone(), None);
1988        let when_expr = crate::functions::when(&cond_col)
1989            .then(&value_col)
1990            .otherwise(&null_col)
1991            .into_expr();
1992        let windowed = when_expr.max().over(partition_exprs);
1993        Self::from_expr(windowed, None)
1994    }
1995
1996    /// Number of elements in list (PySpark size / array_size). Returns Int32.
1997    pub fn array_size(&self) -> Column {
1998        use polars::prelude::*;
1999        Self::from_expr(
2000            self.expr().clone().list().len().cast(DataType::Int32),
2001            Some("size".to_string()),
2002        )
2003    }
2004
2005    /// Cardinality: number of elements in array/list (PySpark cardinality). Alias for array_size.
2006    pub fn cardinality(&self) -> Column {
2007        self.array_size()
2008    }
2009
2010    /// Check if list contains value (PySpark array_contains).
2011    pub fn array_contains(&self, value: Expr) -> Column {
2012        Self::from_expr(self.expr().clone().list().contains(value), None)
2013    }
2014
2015    /// Join list of strings with separator (PySpark array_join).
2016    pub fn array_join(&self, separator: &str) -> Column {
2017        use polars::prelude::*;
2018        Self::from_expr(
2019            self.expr()
2020                .clone()
2021                .list()
2022                .join(lit(separator.to_string()), false),
2023            None,
2024        )
2025    }
2026
2027    /// Maximum element in list (PySpark array_max).
2028    pub fn array_max(&self) -> Column {
2029        Self::from_expr(self.expr().clone().list().max(), None)
2030    }
2031
2032    /// Minimum element in list (PySpark array_min).
2033    pub fn array_min(&self) -> Column {
2034        Self::from_expr(self.expr().clone().list().min(), None)
2035    }
2036
2037    /// Get element at 1-based index (PySpark element_at). Returns null if out of bounds; negative indices count from the end of the list.
2038    pub fn element_at(&self, index: i64) -> Column {
2039        use polars::prelude::*;
2040        // PySpark uses 1-based indexing; Polars uses 0-based. index 1 -> get(0).
2041        let idx = if index >= 1 { index - 1 } else { index };
2042        Self::from_expr(self.expr().clone().list().get(lit(idx), true), None)
2043    }
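
    // Index mapping sketch (added): element_at(1) -> list().get(0), element_at(3) -> get(2),
    // and a negative index such as element_at(-1) is passed through unchanged, which
    // Polars resolves from the end of the list.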
2044
2045    /// Sort list elements (PySpark array_sort). Ascending, nulls last.
2046    pub fn array_sort(&self) -> Column {
2047        use polars::prelude::SortOptions;
2048        let opts = SortOptions {
2049            descending: false,
2050            nulls_last: true,
2051            ..Default::default()
2052        };
2053        Self::from_expr(self.expr().clone().list().sort(opts), None)
2054    }
2055
2056    /// Distinct elements in list (PySpark array_distinct). Preserves first-occurrence order.
2057    pub fn array_distinct(&self) -> Column {
2058        let expr = self.expr().clone().map(
2059            crate::udfs::apply_array_distinct_first_order,
2060            GetOutput::same_type(),
2061        );
2062        Self::from_expr(expr, None)
2063    }
2064
2065    /// Mode aggregation - most frequent value (PySpark mode).
2066    /// Uses value_counts sorted by count descending, then first.
2067    pub fn mode(&self) -> Column {
2068        // value_counts(sort=true, parallel=false, name="count", normalize=false)
2069        // sorts by count descending, so first() yields the most frequent entry.
2070        // The resulting struct holds the value field first and the "count" field second, so field_by_index(0) extracts the value.
2071        let vc = self
2072            .expr()
2073            .clone()
2074            .value_counts(true, false, "count", false);
2075        let first_struct = vc.first();
2076        let val_expr = first_struct.struct_().field_by_index(0);
2077        Self::from_expr(val_expr, Some("mode".to_string()))
2078    }
2079
2080    /// Slice list from start with optional length (PySpark slice). 1-based start.
2081    pub fn array_slice(&self, start: i64, length: Option<i64>) -> Column {
2082        use polars::prelude::*;
2083        let start_expr = lit((start - 1).max(0)); // 1-based to 0-based
2084        let length_expr = length.map(lit).unwrap_or_else(|| lit(i64::MAX));
2085        Self::from_expr(
2086            self.expr().clone().list().slice(start_expr, length_expr),
2087            None,
2088        )
2089    }
2090
2091    /// Explode list into one row per element (PySpark explode).
2092    pub fn explode(&self) -> Column {
2093        Self::from_expr(self.expr().clone().explode(), None)
2094    }
2095
2096    /// Explode list; null/empty produces one row with null (PySpark explode_outer). Implemented the same as explode, since Polars' explode already emits a null row for null/empty lists.
2097    pub fn explode_outer(&self) -> Column {
2098        Self::from_expr(self.expr().clone().explode(), None)
2099    }
2100
2101    /// Posexplode with null preservation (PySpark posexplode_outer).
2102    pub fn posexplode_outer(&self) -> (Column, Column) {
2103        self.posexplode()
2104    }
2105
2106    /// Zip two arrays element-wise into array of structs (PySpark arrays_zip).
2107    pub fn arrays_zip(&self, other: &Column) -> Column {
2108        let args = [other.expr().clone()];
2109        let expr = self.expr().clone().map_many(
2110            crate::udfs::apply_arrays_zip,
2111            &args,
2112            GetOutput::same_type(),
2113        );
2114        Self::from_expr(expr, None)
2115    }
2116
2117    /// True if two arrays have any element in common (PySpark arrays_overlap).
2118    pub fn arrays_overlap(&self, other: &Column) -> Column {
2119        let args = [other.expr().clone()];
2120        let expr = self.expr().clone().map_many(
2121            crate::udfs::apply_arrays_overlap,
2122            &args,
2123            GetOutput::from_type(DataType::Boolean),
2124        );
2125        Self::from_expr(expr, None)
2126    }
2127
2128    /// Collect to array (PySpark array_agg). Alias for implode in group context.
2129    pub fn array_agg(&self) -> Column {
2130        Self::from_expr(self.expr().clone().implode(), None)
2131    }
2132
2133    /// 1-based index of first occurrence of value in list, or 0 if not found (PySpark array_position).
2134    /// Uses Polars list.eval with col("") as element (requires polars list_eval feature).
2135    pub fn array_position(&self, value: Expr) -> Column {
2136        use polars::prelude::{DataType, NULL};
2137        // In list.eval context, col("") refers to the current list element.
2138        let cond = Self::from_expr(col("").eq(value), None);
2139        let then_val = Self::from_expr(col("").cum_count(false), None);
2140        let else_val = Self::from_expr(lit(NULL), None);
2141        let idx_expr = crate::functions::when(&cond)
2142            .then(&then_val)
2143            .otherwise(&else_val)
2144            .into_expr();
2145        let list_expr = self
2146            .expr()
2147            .clone()
2148            .list()
2149            .eval(idx_expr, false)
2150            .list()
2151            .min()
2152            .fill_null(lit(0i64))
2153            .cast(DataType::Int64);
2154        Self::from_expr(list_expr, Some("array_position".to_string()))
2155    }
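
    // Step-by-step sketch of the eval pipeline above (added note), for list [7, 9, 7]
    // and value 9: the when/then marks positions [null, 2, null], list().min() picks 2,
    // and fill_null(0) covers the not-found case.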
2156
2157    /// Remove null elements from list (PySpark array_compact). Preserves order.
2158    pub fn array_compact(&self) -> Column {
2159        let list_expr = self.expr().clone().list().drop_nulls();
2160        Self::from_expr(list_expr, None)
2161    }
2162
2163    /// New list with all elements equal to value removed (PySpark array_remove).
2164    /// Uses list.eval + drop_nulls (requires polars list_eval and list_drop_nulls).
2165    pub fn array_remove(&self, value: Expr) -> Column {
2166        use polars::prelude::NULL;
2167        // when(element != value) then element else null; then drop_nulls.
2168        let cond = Self::from_expr(col("").neq(value), None);
2169        let then_val = Self::from_expr(col(""), None);
2170        let else_val = Self::from_expr(lit(NULL), None);
2171        let elem_neq = crate::functions::when(&cond)
2172            .then(&then_val)
2173            .otherwise(&else_val)
2174            .into_expr();
2175        let list_expr = self
2176            .expr()
2177            .clone()
2178            .list()
2179            .eval(elem_neq, false)
2180            .list()
2181            .drop_nulls();
2182        Self::from_expr(list_expr, None)
2183    }
2184
2185    /// Repeat each element n times (PySpark array_repeat). Implemented via map UDF.
2186    pub fn array_repeat(&self, n: i64) -> Column {
2187        let expr = self.expr().clone().map(
2188            move |c| crate::udfs::apply_array_repeat(c, n),
2189            GetOutput::same_type(),
2190        );
2191        Self::from_expr(expr, None)
2192    }
2193
2194    /// Flatten list of lists to one list (PySpark flatten). Implemented via map UDF.
2195    pub fn array_flatten(&self) -> Column {
2196        let expr = self
2197            .expr()
2198            .clone()
2199            .map(crate::udfs::apply_array_flatten, GetOutput::same_type());
2200        Self::from_expr(expr, None)
2201    }
2202
2203    /// Append element to end of list (PySpark array_append).
2204    pub fn array_append(&self, elem: &Column) -> Column {
2205        let args = [elem.expr().clone()];
2206        let expr = self.expr().clone().map_many(
2207            crate::udfs::apply_array_append,
2208            &args,
2209            GetOutput::same_type(),
2210        );
2211        Self::from_expr(expr, None)
2212    }
2213
2214    /// Prepend element to start of list (PySpark array_prepend).
2215    pub fn array_prepend(&self, elem: &Column) -> Column {
2216        let args = [elem.expr().clone()];
2217        let expr = self.expr().clone().map_many(
2218            crate::udfs::apply_array_prepend,
2219            &args,
2220            GetOutput::same_type(),
2221        );
2222        Self::from_expr(expr, None)
2223    }
2224
2225    /// Insert element at 1-based position (PySpark array_insert).
2226    pub fn array_insert(&self, pos: &Column, elem: &Column) -> Column {
2227        let args = [pos.expr().clone(), elem.expr().clone()];
2228        let expr = self.expr().clone().map_many(
2229            crate::udfs::apply_array_insert,
2230            &args,
2231            GetOutput::same_type(),
2232        );
2233        Self::from_expr(expr, None)
2234    }
2235
2236    /// Elements in first array not in second (PySpark array_except).
2237    pub fn array_except(&self, other: &Column) -> Column {
2238        let args = [other.expr().clone()];
2239        let expr = self.expr().clone().map_many(
2240            crate::udfs::apply_array_except,
2241            &args,
2242            GetOutput::same_type(),
2243        );
2244        Self::from_expr(expr, None)
2245    }
2246
2247    /// Elements in both arrays (PySpark array_intersect).
2248    pub fn array_intersect(&self, other: &Column) -> Column {
2249        let args = [other.expr().clone()];
2250        let expr = self.expr().clone().map_many(
2251            crate::udfs::apply_array_intersect,
2252            &args,
2253            GetOutput::same_type(),
2254        );
2255        Self::from_expr(expr, None)
2256    }
2257
2258    /// Distinct elements from both arrays (PySpark array_union).
2259    pub fn array_union(&self, other: &Column) -> Column {
2260        let args = [other.expr().clone()];
2261        let expr = self.expr().clone().map_many(
2262            crate::udfs::apply_array_union,
2263            &args,
2264            GetOutput::same_type(),
2265        );
2266        Self::from_expr(expr, None)
2267    }
2268
2269    /// Zip two arrays element-wise with merge function (PySpark zip_with). Shorter array padded with null.
2270    /// Merge Expr uses col("").struct_().field_by_name("left") and field_by_name("right").
2271    pub fn zip_with(&self, other: &Column, merge: Expr) -> Column {
2272        let args = [other.expr().clone()];
2273        let zip_expr = self.expr().clone().map_many(
2274            crate::udfs::apply_zip_arrays_to_struct,
2275            &args,
2276            GetOutput::same_type(),
2277        );
2278        let list_expr = zip_expr.list().eval(merge, false);
2279        Self::from_expr(list_expr, None)
2280    }
2281
2282    /// True if any list element satisfies the predicate (PySpark exists). Uses list.eval(pred).list().any().
2283    pub fn array_exists(&self, predicate: Expr) -> Column {
2284        let pred_expr = self
2285            .expr()
2286            .clone()
2287            .list()
2288            .eval(predicate, false)
2289            .list()
2290            .any();
2291        Self::from_expr(pred_expr, Some("exists".to_string()))
2292    }
2293
2294    /// True if all list elements satisfy the predicate (PySpark forall). Uses list.eval(pred).list().all().
2295    pub fn array_forall(&self, predicate: Expr) -> Column {
2296        let pred_expr = self
2297            .expr()
2298            .clone()
2299            .list()
2300            .eval(predicate, false)
2301            .list()
2302            .all();
2303        Self::from_expr(pred_expr, Some("forall".to_string()))
2304    }
2305
2306    /// Filter list elements by predicate (PySpark filter). Keeps elements where the predicate is true; the predicate references the current element via col("").
2307    pub fn array_filter(&self, predicate: Expr) -> Column {
2308        use polars::prelude::NULL;
2309        let then_val = Self::from_expr(col(""), None);
2310        let else_val = Self::from_expr(lit(NULL), None);
2311        let elem_expr = crate::functions::when(&Self::from_expr(predicate, None))
2312            .then(&then_val)
2313            .otherwise(&else_val)
2314            .into_expr();
2315        let list_expr = self
2316            .expr()
2317            .clone()
2318            .list()
2319            .eval(elem_expr, false)
2320            .list()
2321            .drop_nulls();
2322        Self::from_expr(list_expr, None)
2323    }
2324
2325    /// Transform list elements by expression (PySpark transform). list.eval(expr).
2326    pub fn array_transform(&self, f: Expr) -> Column {
2327        let list_expr = self.expr().clone().list().eval(f, false);
2328        Self::from_expr(list_expr, None)
2329    }
2330
2331    /// Sum of list elements (PySpark aggregate with sum). Uses list.sum().
2332    pub fn array_sum(&self) -> Column {
2333        Self::from_expr(self.expr().clone().list().sum(), None)
2334    }
2335
2336    /// Array fold/aggregate (PySpark aggregate), simplified to zero + sum(list); the full (zero, merge, finish) form is not yet supported.
2337    pub fn array_aggregate(&self, zero: &Column) -> Column {
2338        let sum_expr = self.expr().clone().list().sum();
2339        Self::from_expr(sum_expr + zero.expr().clone(), None)
2340    }
2341
2342    /// Mean of list elements (PySpark aggregate with avg). Uses list.mean().
2343    pub fn array_mean(&self) -> Column {
2344        Self::from_expr(self.expr().clone().list().mean(), None)
2345    }
2346
2347    /// Explode list with position (PySpark posexplode). Returns (pos_col, value_col).
2348    /// pos is 1-based here (PySpark's posexplode starts at 0); uses list.eval(cum_count()).explode() and explode().
2349    pub fn posexplode(&self) -> (Column, Column) {
2350        let pos_expr = self
2351            .expr()
2352            .clone()
2353            .list()
2354            .eval(col("").cum_count(false), false)
2355            .explode();
2356        let val_expr = self.expr().clone().explode();
2357        (
2358            Self::from_expr(pos_expr, Some("pos".to_string())),
2359            Self::from_expr(val_expr, Some("col".to_string())),
2360        )
2361    }
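
    // Output sketch (added): for a row holding ["a", "b"], posexplode yields two rows
    // with pos = 1, 2 and col = "a", "b" (pos starts at 1 here, as noted above).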
2362
2363    /// Extract keys from a map column (PySpark map_keys). Map column is List(Struct{key, value}).
2364    pub fn map_keys(&self) -> Column {
2365        let elem_key = col("").struct_().field_by_name("key");
2366        let list_expr = self.expr().clone().list().eval(elem_key, false);
2367        Self::from_expr(list_expr, None)
2368    }
2369
2370    /// Extract values from a map column (PySpark map_values). Map column is List(Struct{key, value}).
2371    pub fn map_values(&self) -> Column {
2372        let elem_val = col("").struct_().field_by_name("value");
2373        let list_expr = self.expr().clone().list().eval(elem_val, false);
2374        Self::from_expr(list_expr, None)
2375    }
2376
2377    /// Return map as list of structs {key, value} (PySpark map_entries). Identity for List(Struct) column.
2378    pub fn map_entries(&self) -> Column {
2379        Self::from_expr(self.expr().clone(), None)
2380    }
2381
2382    /// Build map from two array columns (keys, values) (PySpark map_from_arrays). Implemented via map_many UDF.
2383    pub fn map_from_arrays(&self, values: &Column) -> Column {
2384        let args = [values.expr().clone()];
2385        let expr = self.expr().clone().map_many(
2386            crate::udfs::apply_map_from_arrays,
2387            &args,
2388            GetOutput::same_type(),
2389        );
2390        Self::from_expr(expr, None)
2391    }
2392
2393    /// Merge two map columns (PySpark map_concat). Last value wins for duplicate keys.
2394    pub fn map_concat(&self, other: &Column) -> Column {
2395        let args = [other.expr().clone()];
2396        let expr = self.expr().clone().map_many(
2397            crate::udfs::apply_map_concat,
2398            &args,
2399            GetOutput::same_type(),
2400        );
2401        Self::from_expr(expr, None)
2402    }
2403
2404    /// Transform each map key by expr (PySpark transform_keys). key_expr should use col("").struct_().field_by_name("key").
2405    pub fn transform_keys(&self, key_expr: Expr) -> Column {
2406        use polars::prelude::as_struct;
2407        let value = col("").struct_().field_by_name("value");
2408        let new_struct = as_struct(vec![key_expr.alias("key"), value.alias("value")]);
2409        let list_expr = self.expr().clone().list().eval(new_struct, false);
2410        Self::from_expr(list_expr, None)
2411    }
2412
2413    /// Transform each map value by expr (PySpark transform_values). value_expr should use col("").struct_().field_by_name("value").
2414    pub fn transform_values(&self, value_expr: Expr) -> Column {
2415        use polars::prelude::as_struct;
2416        let key = col("").struct_().field_by_name("key");
2417        let new_struct = as_struct(vec![key.alias("key"), value_expr.alias("value")]);
2418        let list_expr = self.expr().clone().list().eval(new_struct, false);
2419        Self::from_expr(list_expr, None)
2420    }
2421
2422    /// Merge two maps by key with merge function (PySpark map_zip_with).
2423    /// Merge Expr uses col("").struct_().field_by_name("value1") and field_by_name("value2").
2424    pub fn map_zip_with(&self, other: &Column, merge: Expr) -> Column {
2425        use polars::prelude::as_struct;
2426        let args = [other.expr().clone()];
2427        let zip_expr = self.expr().clone().map_many(
2428            crate::udfs::apply_map_zip_to_struct,
2429            &args,
2430            GetOutput::same_type(),
2431        );
2432        let key_field = col("").struct_().field_by_name("key").alias("key");
2433        let value_field = merge.alias("value");
2434        let merge_expr = as_struct(vec![key_field, value_field]);
2435        let list_expr = zip_expr.list().eval(merge_expr, false);
2436        Self::from_expr(list_expr, None)
2437    }
2438
2439    /// Filter map entries by predicate (PySpark map_filter). Keeps key-value pairs where predicate is true.
2440    /// Predicate uses col("").struct_().field_by_name("key") and field_by_name("value") to reference key/value.
2441    pub fn map_filter(&self, predicate: Expr) -> Column {
2442        use polars::prelude::NULL;
2443        let then_val = Self::from_expr(col(""), None);
2444        let else_val = Self::from_expr(lit(NULL), None);
2445        let elem_expr = crate::functions::when(&Self::from_expr(predicate, None))
2446            .then(&then_val)
2447            .otherwise(&else_val)
2448            .into_expr();
2449        let list_expr = self
2450            .expr()
2451            .clone()
2452            .list()
2453            .eval(elem_expr, false)
2454            .list()
2455            .drop_nulls();
2456        Self::from_expr(list_expr, None)
2457    }
2458
2459    /// Array of structs {key, value} to map (PySpark map_from_entries). Identity for List(Struct) format.
2460    pub fn map_from_entries(&self) -> Column {
2461        Self::from_expr(self.expr().clone(), None)
2462    }
2463
2464    /// True if map contains key (PySpark map_contains_key).
2465    pub fn map_contains_key(&self, key: &Column) -> Column {
2466        let args = [key.expr().clone()];
2467        let expr = self.expr().clone().map_many(
2468            crate::udfs::apply_map_contains_key,
2469            &args,
2470            GetOutput::from_type(DataType::Boolean),
2471        );
2472        Self::from_expr(expr, None)
2473    }
2474
2475    /// Get value for key from map, or null (PySpark get).
2476    pub fn get(&self, key: &Column) -> Column {
2477        let args = [key.expr().clone()];
2478        let expr =
2479            self.expr()
2480                .clone()
2481                .map_many(crate::udfs::apply_get, &args, GetOutput::same_type());
2482        Self::from_expr(expr, None)
2483    }
2484
2485    /// Extract JSON path from string column (PySpark get_json_object). Uses Polars str().json_path_match.
2486    pub fn get_json_object(&self, path: &str) -> Column {
2487        let path_expr = polars::prelude::lit(path.to_string());
2488        let out = self.expr().clone().str().json_path_match(path_expr);
2489        Self::from_expr(out, None)
2490    }
2491
2492    /// Parse string column as JSON into struct (PySpark from_json). Uses Polars str().json_decode.
2493    pub fn from_json(&self, schema: Option<polars::datatypes::DataType>) -> Column {
2494        let out = self.expr().clone().str().json_decode(schema, None);
2495        Self::from_expr(out, None)
2496    }
2497
2498    /// Serialize struct column to JSON string (PySpark to_json). Uses Polars struct().json_encode.
2499    pub fn to_json(&self) -> Column {
2500        let out = self.expr().clone().struct_().json_encode();
2501        Self::from_expr(out, None)
2502    }
2503
2504    /// Length of JSON array at path (PySpark json_array_length). UDF.
2505    pub fn json_array_length(&self, path: &str) -> Column {
2506        let path = path.to_string();
2507        let expr = self.expr().clone().map(
2508            move |s| crate::udfs::apply_json_array_length(s, &path),
2509            GetOutput::from_type(DataType::Int64),
2510        );
2511        Self::from_expr(expr, None)
2512    }
2513
2514    /// Keys of JSON object (PySpark json_object_keys). Returns list of strings. UDF.
2515    pub fn json_object_keys(&self) -> Column {
2516        let expr = self.expr().clone().map(
2517            crate::udfs::apply_json_object_keys,
2518            GetOutput::from_type(DataType::List(Box::new(DataType::String))),
2519        );
2520        Self::from_expr(expr, None)
2521    }
2522
2523    /// Extract keys from JSON as struct (PySpark json_tuple). UDF. Returns struct with one string field per key.
2524    pub fn json_tuple(&self, keys: &[&str]) -> Column {
2525        let keys_vec: Vec<String> = keys.iter().map(|s| (*s).to_string()).collect();
2526        let struct_fields: Vec<polars::datatypes::Field> = keys_vec
2527            .iter()
2528            .map(|k| polars::datatypes::Field::new(k.as_str().into(), DataType::String))
2529            .collect();
2530        let expr = self.expr().clone().map(
2531            move |s| crate::udfs::apply_json_tuple(s, &keys_vec),
2532            GetOutput::from_type(DataType::Struct(struct_fields)),
2533        );
2534        Self::from_expr(expr, None)
2535    }
2536
2537    /// Parse CSV string to struct (PySpark from_csv). Minimal: split by comma, up to 32 columns. UDF.
2538    pub fn from_csv(&self) -> Column {
2539        let expr = self.expr().clone().map(
2540            crate::udfs::apply_from_csv,
2541            GetOutput::from_type(DataType::Struct(vec![])),
2542        );
2543        Self::from_expr(expr, None)
2544    }
2545
2546    /// Format struct as CSV string (PySpark to_csv). Minimal. UDF.
2547    pub fn to_csv(&self) -> Column {
2548        let expr = self.expr().clone().map(
2549            crate::udfs::apply_to_csv,
2550            GetOutput::from_type(DataType::String),
2551        );
2552        Self::from_expr(expr, None)
2553    }
2554
2555    /// Parse URL and extract part (PySpark parse_url). UDF.
2556    /// When part is QUERY/QUERYSTRING and key is Some(k), returns the value for that query parameter only.
2557    pub fn parse_url(&self, part: &str, key: Option<&str>) -> Column {
2558        let part = part.to_string();
2559        let key_owned = key.map(String::from);
2560        let expr = self.expr().clone().map(
2561            move |s| crate::udfs::apply_parse_url(s, &part, key_owned.as_deref()),
2562            GetOutput::from_type(DataType::String),
2563        );
2564        Self::from_expr(expr, None)
2565    }
2566
2567    /// Hash of column value (PySpark hash). Single-column version.
2568    pub fn hash(&self) -> Column {
2569        let expr = self.expr().clone().map(
2570            crate::udfs::apply_hash_one,
2571            GetOutput::from_type(DataType::Int64),
2572        );
2573        Self::from_expr(expr, None)
2574    }
2575
2576    /// Check if column values are in the other column's list/series (PySpark isin).
2577    pub fn isin(&self, other: &Column) -> Column {
2578        let out = self.expr().clone().is_in(other.expr().clone());
2579        Self::from_expr(out, None)
2580    }
2581
2582    /// Percent-decode URL-encoded string (PySpark url_decode). Uses UDF.
2583    pub fn url_decode(&self) -> Column {
2584        let expr = self.expr().clone().map(
2585            crate::udfs::apply_url_decode,
2586            GetOutput::from_type(DataType::String),
2587        );
2588        Self::from_expr(expr, None)
2589    }
2590
2591    /// Percent-encode string for URL (PySpark url_encode). Uses UDF.
2592    pub fn url_encode(&self) -> Column {
2593        let expr = self.expr().clone().map(
2594            crate::udfs::apply_url_encode,
2595            GetOutput::from_type(DataType::String),
2596        );
2597        Self::from_expr(expr, None)
2598    }
2599
2600    /// Bitwise left shift (PySpark shiftLeft). col << n = col * 2^n.
2601    pub fn shift_left(&self, n: i32) -> Column {
2602        use polars::prelude::*;
2603        let pow = lit(2i64).pow(lit(n as i64));
2604        Self::from_expr(
2605            (self.expr().clone().cast(DataType::Int64) * pow).cast(DataType::Int64),
2606            None,
2607        )
2608    }
2609
2610    /// Bitwise signed right shift (PySpark shiftRight). col >> n = col / 2^n.
2611    pub fn shift_right(&self, n: i32) -> Column {
2612        use polars::prelude::*;
2613        let pow = lit(2i64).pow(lit(n as i64));
2614        Self::from_expr(
2615            (self.expr().clone().cast(DataType::Int64) / pow).cast(DataType::Int64),
2616            None,
2617        )
2618    }
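
    // Added arithmetic note: both shifts go through 2^n, so for n = 2,
    //     shift_left:  5  -> 5 * 4  = 20
    //     shift_right: 20 -> 20 / 4 = 5
    // For negative inputs the division truncates toward zero, which can differ from an
    // arithmetic >> by one.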
2619
2620    /// Bitwise unsigned right shift (PySpark shiftRightUnsigned). Logical shift.
2621    pub fn shift_right_unsigned(&self, n: i32) -> Column {
2622        let expr = self.expr().clone().map(
2623            move |s| crate::udfs::apply_shift_right_unsigned(s, n),
2624            GetOutput::from_type(DataType::Int64),
2625        );
2626        Self::from_expr(expr, None)
2627    }
2628}
2629
2630#[cfg(test)]
2631mod tests {
2632    use super::Column;
2633    use polars::prelude::{col, df, lit, IntoLazy};
2634
2635    /// Helper to create a simple DataFrame for testing
2636    fn test_df() -> polars::prelude::DataFrame {
2637        df!(
2638            "a" => &[1, 2, 3, 4, 5],
2639            "b" => &[10, 20, 30, 40, 50]
2640        )
2641        .unwrap()
2642    }
2643
2644    /// Helper to create a DataFrame with nulls for testing
2645    fn test_df_with_nulls() -> polars::prelude::DataFrame {
2646        df!(
2647            "a" => &[Some(1), Some(2), None, Some(4), None],
2648            "b" => &[Some(10), None, Some(30), None, None]
2649        )
2650        .unwrap()
2651    }
2652
2653    #[test]
2654    fn test_column_new() {
2655        let column = Column::new("age".to_string());
2656        assert_eq!(column.name(), "age");
2657    }
2658
2659    #[test]
2660    fn test_column_from_expr() {
2661        let expr = col("test");
2662        let column = Column::from_expr(expr, Some("test".to_string()));
2663        assert_eq!(column.name(), "test");
2664    }
2665
2666    #[test]
2667    fn test_column_from_expr_default_name() {
2668        let expr = col("test").gt(lit(5));
2669        let column = Column::from_expr(expr, None);
2670        assert_eq!(column.name(), "<expr>");
2671    }
2672
2673    #[test]
2674    fn test_column_alias() {
2675        let column = Column::new("original".to_string());
2676        let aliased = column.alias("new_name");
2677        assert_eq!(aliased.name(), "new_name");
2678    }
2679
2680    #[test]
2681    fn test_column_gt() {
2682        let df = test_df();
2683        let column = Column::new("a".to_string());
2684        let result = column.gt(lit(3));
2685
2686        // Apply the expression to filter the DataFrame
2687        let filtered = df.lazy().filter(result.into_expr()).collect().unwrap();
2688        assert_eq!(filtered.height(), 2); // rows with a > 3: 4, 5
2689    }
2690
2691    #[test]
2692    fn test_column_lt() {
2693        let df = test_df();
2694        let column = Column::new("a".to_string());
2695        let result = column.lt(lit(3));
2696
2697        let filtered = df.lazy().filter(result.into_expr()).collect().unwrap();
2698        assert_eq!(filtered.height(), 2); // rows with a < 3: 1, 2
2699    }
2700
2701    #[test]
2702    fn test_column_eq() {
2703        let df = test_df();
2704        let column = Column::new("a".to_string());
2705        let result = column.eq(lit(3));
2706
2707        let filtered = df.lazy().filter(result.into_expr()).collect().unwrap();
2708        assert_eq!(filtered.height(), 1); // only row with a == 3
2709    }
2710
2711    #[test]
2712    fn test_column_neq() {
2713        let df = test_df();
2714        let column = Column::new("a".to_string());
2715        let result = column.neq(lit(3));
2716
2717        let filtered = df.lazy().filter(result.into_expr()).collect().unwrap();
2718        assert_eq!(filtered.height(), 4); // rows with a != 3
2719    }
2720
2721    #[test]
2722    fn test_column_gt_eq() {
2723        let df = test_df();
2724        let column = Column::new("a".to_string());
2725        let result = column.gt_eq(lit(3));
2726
2727        let filtered = df.lazy().filter(result.into_expr()).collect().unwrap();
2728        assert_eq!(filtered.height(), 3); // rows with a >= 3: 3, 4, 5
2729    }
2730
2731    #[test]
2732    fn test_column_lt_eq() {
2733        let df = test_df();
2734        let column = Column::new("a".to_string());
2735        let result = column.lt_eq(lit(3));
2736
2737        let filtered = df.lazy().filter(result.into_expr()).collect().unwrap();
2738        assert_eq!(filtered.height(), 3); // rows with a <= 3: 1, 2, 3
2739    }
2740
2741    #[test]
2742    fn test_column_is_null() {
2743        let df = test_df_with_nulls();
2744        let column = Column::new("a".to_string());
2745        let result = column.is_null();
2746
2747        let filtered = df.lazy().filter(result.into_expr()).collect().unwrap();
2748        assert_eq!(filtered.height(), 2); // 2 null values in column 'a'
2749    }
2750
2751    #[test]
2752    fn test_column_is_not_null() {
2753        let df = test_df_with_nulls();
2754        let column = Column::new("a".to_string());
2755        let result = column.is_not_null();
2756
2757        let filtered = df.lazy().filter(result.into_expr()).collect().unwrap();
2758        assert_eq!(filtered.height(), 3); // 3 non-null values in column 'a'
2759    }
2760
2761    #[test]
2762    fn test_eq_null_safe_both_null() {
2763        // Create a DataFrame where both columns have NULL at the same row
2764        let df = df!(
2765            "a" => &[Some(1), None, Some(3)],
2766            "b" => &[Some(1), None, Some(4)]
2767        )
2768        .unwrap();
2769
2770        let col_a = Column::new("a".to_string());
2771        let col_b = Column::new("b".to_string());
2772        let result = col_a.eq_null_safe(&col_b);
2773
2774        // Apply the expression and collect
2775        let result_df = df
2776            .lazy()
2777            .with_column(result.into_expr().alias("eq_null_safe"))
2778            .collect()
2779            .unwrap();
2780
2781        // Get the result column
2782        let eq_col = result_df.column("eq_null_safe").unwrap();
2783        let values: Vec<Option<bool>> = eq_col.bool().unwrap().into_iter().collect();
2784
2785        // Row 0: 1 == 1 -> true
2786        // Row 1: NULL <=> NULL -> true
2787        // Row 2: 3 == 4 -> false
2788        assert_eq!(values[0], Some(true));
2789        assert_eq!(values[1], Some(true)); // NULL-safe: both NULL = true
2790        assert_eq!(values[2], Some(false));
2791    }
2792
2793    #[test]
2794    fn test_eq_null_safe_one_null() {
2795        // Create a DataFrame where only one column has NULL
2796        let df = df!(
2797            "a" => &[Some(1), None, Some(3)],
2798            "b" => &[Some(1), Some(2), None]
2799        )
2800        .unwrap();
2801
2802        let col_a = Column::new("a".to_string());
2803        let col_b = Column::new("b".to_string());
2804        let result = col_a.eq_null_safe(&col_b);
2805
2806        let result_df = df
2807            .lazy()
2808            .with_column(result.into_expr().alias("eq_null_safe"))
2809            .collect()
2810            .unwrap();
2811
2812        let eq_col = result_df.column("eq_null_safe").unwrap();
2813        let values: Vec<Option<bool>> = eq_col.bool().unwrap().into_iter().collect();
2814
2815        // Row 0: 1 == 1 -> true
2816        // Row 1: NULL <=> 2 -> false (one is null, not both)
2817        // Row 2: 3 <=> NULL -> false (one is null, not both)
2818        assert_eq!(values[0], Some(true));
2819        assert_eq!(values[1], Some(false));
2820        assert_eq!(values[2], Some(false));
2821    }
2822}