
robin_sparkless_expr/udfs.rs

//! Helpers for element-wise UDFs used by map() expressions (soundex, levenshtein, crc32,
//! xxhash64, array/map helpers, string formatting, hashing, and friends).
//! These run at plan execution time when Polars invokes the closure.

use chrono::{Datelike, TimeZone};
use chrono_tz::Tz;
use polars::prelude::*;
use regex::Regex;
use std::borrow::Cow;

/// Split string by delimiter with at most `limit` parts; remainder in last part (PySpark split with limit).
/// Returns List(String). When limit <= 0, splits without limit.
pub fn apply_split_with_limit(
    column: Column,
    delimiter: &str,
    limit: i32,
) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let ca = series
        .str()
        .map_err(|e| PolarsError::ComputeError(format!("split_with_limit: {e}").into()))?;
    let n = if limit <= 0 {
        usize::MAX
    } else {
        limit as usize
    };
    let values_capacity = ca.len().saturating_mul(64);
    let mut builder =
        ListStringChunkedBuilder::new(name.as_str().into(), ca.len(), values_capacity);
    for opt_s in ca.into_iter() {
        match opt_s {
            Some(s) => {
                if delimiter.is_empty() {
                    // Empty delimiter: treat as no limit (split to chars would need different handling)
                    builder.append_values_iter(s.split(delimiter));
                } else {
                    builder.append_values_iter(s.splitn(n, delimiter));
                }
            }
            None => builder.append_null(),
        }
    }
    let out = builder.finish();
    Ok(Some(Column::new(name, out.into_series())))
}

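// A minimal sketch (not part of the original file) of the std behavior the
// helper above relies on: `splitn(n, delim)` caps the number of parts and
// leaves the remainder, delimiters included, in the final part.
#[cfg(test)]
mod splitn_semantics {
    #[test]
    fn splitn_keeps_remainder_in_last_part() {
        let parts: Vec<&str> = "a,b,c".splitn(2, ",").collect();
        assert_eq!(parts, vec!["a", "b,c"]);
    }
}
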
/// Split string by regex and return 1-based part (for split_part with regex delimiter).
pub fn apply_split_part_regex(
    column: Column,
    pattern: &str,
    part_num: i64,
) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let ca = series
        .str()
        .map_err(|e| PolarsError::ComputeError(format!("split_part_regex: {e}").into()))?;
    let re = Regex::new(pattern)
        .map_err(|e| PolarsError::ComputeError(format!("split_part_regex pattern: {e}").into()))?;
    let out = StringChunked::from_iter_options(
        name.as_str().into(),
        ca.into_iter().map(|opt_s| {
            opt_s.map(|s| {
                let parts: Vec<&str> = re.split(s).collect();
                let idx = if part_num > 0 {
                    (part_num - 1) as usize
                } else {
                    let n = parts.len() as i64;
                    (n + part_num) as usize
                };
                parts.get(idx).map(|&p| p.to_string()).unwrap_or_default()
            })
        }),
    );
    Ok(Some(Column::new(name, out.into_series())))
}

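// A hedged sketch (not in the original file) of the 1-based / negative
// indexing used by apply_split_part_regex: part_num = -1 resolves to the
// last part via `n + part_num`.
#[cfg(test)]
mod split_part_index_semantics {
    use regex::Regex;

    #[test]
    fn negative_part_counts_from_end() {
        let re = Regex::new(",").unwrap();
        let parts: Vec<&str> = re.split("a,b,c").collect();
        let part_num: i64 = -1;
        let idx = (parts.len() as i64 + part_num) as usize;
        assert_eq!(parts[idx], "c");
    }
}
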
/// American Soundex code (4 chars). Matches PySpark soundex semantics.
fn soundex_one(s: &str) -> Cow<'_, str> {
    use soundex::american_soundex;
    let code = american_soundex(s);
    Cow::Owned(code.chars().take(4).collect::<String>())
}

/// Apply soundex to a string column; returns a new Column (Series).
pub fn apply_soundex(column: Column) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let ca = series
        .str()
        .map_err(|e| PolarsError::ComputeError(format!("soundex: {e}").into()))?;
    let out: StringChunked = ca.apply_values(soundex_one);
    Ok(Some(Column::new(name, out.into_series())))
}

/// Apply CRC32 to string bytes (PySpark crc32).
pub fn apply_crc32(column: Column) -> PolarsResult<Option<Column>> {
    use crc32fast::Hasher;
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let ca = series
        .str()
        .map_err(|e| PolarsError::ComputeError(format!("crc32: {e}").into()))?;
    let out = Int64Chunked::from_iter_options(
        name.as_str().into(),
        ca.into_iter().map(|opt_s| {
            opt_s.map(|s| {
                let mut hasher = Hasher::new();
                hasher.update(s.as_bytes());
                hasher.finalize() as i64
            })
        }),
    );
    Ok(Some(Column::new(name, out.into_series())))
}

/// Apply XXH64 hash (PySpark xxhash64).
pub fn apply_xxhash64(column: Column) -> PolarsResult<Option<Column>> {
    use std::hash::Hasher;
    use twox_hash::XxHash64;
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let ca = series
        .str()
        .map_err(|e| PolarsError::ComputeError(format!("xxhash64: {e}").into()))?;
    let out = Int64Chunked::from_iter_options(
        name.as_str().into(),
        ca.into_iter().map(|opt_s| {
            opt_s.map(|s| {
                let mut hasher = XxHash64::default();
                hasher.write(s.as_bytes());
                hasher.finish() as i64
            })
        }),
    );
    Ok(Some(Column::new(name, out.into_series())))
}

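// A sketch (not in the original file) of the sign reinterpretation both hash
// helpers above rely on: crc32 widens u32 -> i64 and stays non-negative, while
// xxhash64 reinterprets the u64 digest as two's-complement i64 (matching
// Spark's signed LongType), so large digests come out negative.
#[cfg(test)]
mod hash_sign_semantics {
    #[test]
    fn u64_reinterpreted_as_i64_can_be_negative() {
        assert_eq!(u32::MAX as i64, 4_294_967_295); // crc32 path: always >= 0
        assert_eq!(u64::MAX as i64, -1); // xxhash64 path: wraps negative
    }
}
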
/// Levenshtein distance between two string columns (element-wise).
pub fn apply_levenshtein(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
    use strsim::levenshtein;
    if columns.len() < 2 {
        return Err(PolarsError::ComputeError(
            "levenshtein needs two columns".into(),
        ));
    }
    let name = columns[0].field().into_owned().name;
    let a_series = std::mem::take(&mut columns[0]).take_materialized_series();
    let b_series = std::mem::take(&mut columns[1]).take_materialized_series();
    let a_ca = a_series
        .str()
        .map_err(|e| PolarsError::ComputeError(format!("levenshtein: {e}").into()))?;
    let b_ca = b_series
        .str()
        .map_err(|e| PolarsError::ComputeError(format!("levenshtein: {e}").into()))?;
    let out = Int64Chunked::from_iter_options(
        name.as_str().into(),
        a_ca.into_iter().zip(b_ca).map(|(a, b)| match (a, b) {
            (Some(a), Some(b)) => Some(levenshtein(a, b) as i64),
            _ => None,
        }),
    );
    Ok(Some(Column::new(name, out.into_series())))
}

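// A sketch (not in the original file) pinning the strsim semantics the
// element-wise UDF maps over: the classic three-edit example.
#[cfg(test)]
mod levenshtein_semantics {
    use strsim::levenshtein;

    #[test]
    fn kitten_to_sitting_is_three_edits() {
        assert_eq!(levenshtein("kitten", "sitting"), 3);
    }
}
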
/// Flatten list-of-lists to a single list per row (PySpark flatten).
pub fn apply_array_flatten(column: Column) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let list_ca = series
        .list()
        .map_err(|e| PolarsError::ComputeError(format!("array_flatten: {e}").into()))?;
    let inner_dtype = match list_ca.inner_dtype() {
        DataType::List(inner) => *inner.clone(),
        other => other.clone(),
    };
    let out = list_ca.try_apply_amortized(|amort_s| {
        let s = amort_s.as_ref();
        let list_s = s.as_list();
        if list_s.is_empty() {
            return Ok(Series::new_empty(PlSmallStr::EMPTY, &inner_dtype));
        }
        let mut acc: Vec<Series> = Vec::new();
        for elem in list_s.amortized_iter().flatten() {
            acc.push(elem.deep_clone());
        }
        if acc.is_empty() {
            Ok(Series::new_empty(PlSmallStr::EMPTY, &inner_dtype))
        } else {
            let mut result = acc.remove(0);
            for s in acc {
                result.extend(&s)?;
            }
            Ok(result)
        }
    })?;
    Ok(Some(Column::new(name, out.into_series())))
}

/// Distinct elements in list preserving first-occurrence order (PySpark array_distinct parity).
pub fn apply_array_distinct_first_order(column: Column) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let list_ca = series
        .list()
        .map_err(|e| PolarsError::ComputeError(format!("array_distinct: {e}").into()))?;
    let inner_dtype = list_ca.inner_dtype().clone();
    let out = list_ca.try_apply_amortized(|amort_s| {
        let list_s = amort_s.as_ref().as_list();
        let mut result: Vec<Series> = Vec::new();
        for elem in list_s.amortized_iter().flatten() {
            let taken = elem.deep_clone();
            // Linear scan keeps first-occurrence order; O(n^2) per row, fine for small lists.
            let is_dup = result.iter().any(|s| s.get(0).ok() == taken.get(0).ok());
            if !is_dup {
                result.push(taken);
            }
        }
        if result.is_empty() {
            Ok(Series::new_empty(PlSmallStr::EMPTY, &inner_dtype))
        } else {
            let mut combined = result.remove(0);
            for s in result {
                combined.extend(&s)?;
            }
            Ok(combined)
        }
    })?;
    Ok(Some(Column::new(name, out.into_series())))
}

/// Repeat each element n times (PySpark array_repeat).
/// Supports both: (1) scalar column (string, int, etc.) - create array of n copies;
/// (2) List column - repeat each element within the list.
pub fn apply_array_repeat(column: Column, n: i64) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let n_usize = n.max(0) as usize;

    // Scalar column: create array of n copies of each element (PySpark array_repeat on string/int/etc.)
    if !matches!(series.dtype(), DataType::List(_)) {
        use polars::chunked_array::builder::get_list_builder;
        let inner_dtype = series.dtype().clone();
        let len = series.len();
        let mut builder = get_list_builder(&inner_dtype, 64, len, name.as_str().into());
        for i in 0..len {
            let opt_av = series.get(i);
            let elem_series = match opt_av {
                Ok(av) => any_value_to_single_series(av, &inner_dtype)?,
                Err(_) => Series::new_empty(PlSmallStr::EMPTY, &inner_dtype),
            };
            let mut repeated = elem_series.clone();
            for _ in 1..n_usize {
                repeated.extend(&elem_series)?;
            }
            builder.append_series(&repeated).map_err(|e| {
                PolarsError::ComputeError(format!("array_repeat scalar: {e}").into())
            })?;
        }
        let out = builder.finish().into_series();
        return Ok(Some(Column::new(name, out)));
    }

    // List column: repeat each element within the list
    let list_ca = series
        .list()
        .map_err(|e| PolarsError::ComputeError(format!("array_repeat: {e}").into()))?;
    let inner_dtype = list_ca.inner_dtype().clone();
    let out = list_ca.try_apply_amortized(move |amort_s| {
        let list_s = amort_s.as_ref().as_list();
        let mut repeated: Vec<Series> = Vec::new();
        for elem in list_s.amortized_iter().flatten() {
            let taken = elem.deep_clone();
            for _ in 0..n_usize {
                repeated.push(taken.clone());
            }
        }
        if repeated.is_empty() {
            Ok(Series::new_empty(PlSmallStr::EMPTY, &inner_dtype))
        } else {
            let mut result = repeated.remove(0);
            for s in repeated {
                result.extend(&s)?;
            }
            Ok(result)
        }
    })?;
    Ok(Some(Column::new(name, out.into_series())))
}

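// A sketch (not in the original file) of the scalar branch's repeat-by-extend
// loop above, mirrored on a plain Vec: one clone plus n-1 extends yields n copies.
#[cfg(test)]
mod array_repeat_semantics {
    #[test]
    fn one_clone_plus_extends_gives_n_copies() {
        let n_usize = 3;
        let elem = vec![7i64];
        let mut repeated = elem.clone();
        for _ in 1..n_usize {
            repeated.extend(&elem);
        }
        assert_eq!(repeated, vec![7, 7, 7]);
    }
}
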
/// Build a one-element Series holding `av`, cast to `dtype`.
fn any_value_to_single_series(av: AnyValue, dtype: &DataType) -> PolarsResult<Series> {
    Series::from_any_values_and_dtype(PlSmallStr::EMPTY, &[av], dtype, false)
}

/// Append element to end of each list (PySpark array_append).
pub fn apply_array_append(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
    use std::cell::RefCell;
    if columns.len() < 2 {
        return Err(PolarsError::ComputeError(
            "array_append needs two columns (array, element)".into(),
        ));
    }
    let name = columns[0].field().into_owned().name;
    let list_series = std::mem::take(&mut columns[0]).take_materialized_series();
    let elem_series = std::mem::take(&mut columns[1]).take_materialized_series();
    let list_ca = list_series
        .list()
        .map_err(|e| PolarsError::ComputeError(format!("array_append: {e}").into()))?;
    let inner_dtype = list_ca.inner_dtype().clone();
    let elem_casted = elem_series.cast(&inner_dtype)?;
    let elem_len = elem_casted.len();
    let elem_vec: Vec<Option<AnyValue>> = (0..elem_len).map(|i| elem_casted.get(i).ok()).collect();
    // Row index tracked through a RefCell so the closure can advance it via interior mutability.
    let idx = RefCell::new(0usize);
    let out = list_ca.try_apply_amortized(|amort_s| {
        let i = *idx.borrow();
        *idx.borrow_mut() += 1;
        // A single-row element column broadcasts across all list rows.
        let ei = if elem_len == 1 { 0 } else { i };
        let list_s = amort_s.as_ref().as_list();
        let mut acc: Vec<Series> = Vec::new();
        for e in list_s.amortized_iter().flatten() {
            acc.push(e.deep_clone());
        }
        if let Some(Some(av)) = elem_vec.get(ei) {
            let single = any_value_to_single_series(av.clone(), &inner_dtype)?;
            acc.push(single);
        }
        if acc.is_empty() {
            Ok(Series::new_empty(PlSmallStr::EMPTY, &inner_dtype))
        } else {
            let mut result = acc.remove(0);
            for s in acc {
                result.extend(&s)?;
            }
            Ok(result)
        }
    })?;
    Ok(Some(Column::new(name, out.into_series())))
}

/// Prepend element to start of each list (PySpark array_prepend).
pub fn apply_array_prepend(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
    use std::cell::RefCell;
    if columns.len() < 2 {
        return Err(PolarsError::ComputeError(
            "array_prepend needs two columns (array, element)".into(),
        ));
    }
    let name = columns[0].field().into_owned().name;
    let list_series = std::mem::take(&mut columns[0]).take_materialized_series();
    let elem_series = std::mem::take(&mut columns[1]).take_materialized_series();
    let list_ca = list_series
        .list()
        .map_err(|e| PolarsError::ComputeError(format!("array_prepend: {e}").into()))?;
    let inner_dtype = list_ca.inner_dtype().clone();
    let elem_casted = elem_series.cast(&inner_dtype)?;
    let elem_len = elem_casted.len();
    let elem_vec: Vec<Option<AnyValue>> = (0..elem_len).map(|i| elem_casted.get(i).ok()).collect();
    let idx = RefCell::new(0usize);
    let out = list_ca.try_apply_amortized(|amort_s| {
        let i = *idx.borrow();
        *idx.borrow_mut() += 1;
        let ei = if elem_len == 1 { 0 } else { i };
        let list_s = amort_s.as_ref().as_list();
        let mut acc: Vec<Series> = Vec::new();
        if let Some(Some(av)) = elem_vec.get(ei) {
            let single = any_value_to_single_series(av.clone(), &inner_dtype)?;
            acc.push(single);
        }
        for e in list_s.amortized_iter().flatten() {
            acc.push(e.deep_clone());
        }
        if acc.is_empty() {
            Ok(Series::new_empty(PlSmallStr::EMPTY, &inner_dtype))
        } else {
            let mut result = acc.remove(0);
            for s in acc {
                result.extend(&s)?;
            }
            Ok(result)
        }
    })?;
    Ok(Some(Column::new(name, out.into_series())))
}

/// Insert element at 1-based position (PySpark array_insert). Negative pos = from end.
pub fn apply_array_insert(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
    use std::cell::RefCell;
    if columns.len() < 3 {
        return Err(PolarsError::ComputeError(
            "array_insert needs three columns (array, position, element)".into(),
        ));
    }
    let name = columns[0].field().into_owned().name;
    let list_series = std::mem::take(&mut columns[0]).take_materialized_series();
    let pos_series = std::mem::take(&mut columns[1]).take_materialized_series();
    let elem_series = std::mem::take(&mut columns[2]).take_materialized_series();
    let list_ca = list_series
        .list()
        .map_err(|e| PolarsError::ComputeError(format!("array_insert: {e}").into()))?;
    let inner_dtype = list_ca.inner_dtype().clone();
    let pos_ca = pos_series
        .cast(&DataType::Int64)?
        .i64()
        .map_err(|e| {
            PolarsError::ComputeError(format!("array_insert: position column: {e}").into())
        })?
        .clone();
    let elem_casted = elem_series.cast(&inner_dtype)?;
    let pos_len = pos_ca.len();
    let pos_vec: Vec<i64> = (0..pos_len).map(|i| pos_ca.get(i).unwrap_or(1)).collect();
    let elem_len = elem_casted.len();
    let elem_vec: Vec<Option<AnyValue>> = (0..elem_len).map(|i| elem_casted.get(i).ok()).collect();
    let idx = RefCell::new(0usize);
    let out = list_ca.try_apply_amortized(|amort_s| {
        let i = *idx.borrow();
        *idx.borrow_mut() += 1;
        let pi = if pos_len == 1 { 0 } else { i };
        let ei = if elem_len == 1 { 0 } else { i };
        let list_s = amort_s.as_ref().as_list();
        let pos_val = pos_vec.get(pi).copied().unwrap_or(1);
        let mut acc: Vec<Series> = Vec::new();
        for e in list_s.amortized_iter().flatten() {
            acc.push(e.deep_clone());
        }
        let len = acc.len() as i64;
        let pos = if pos_val < 0 {
            (len + pos_val + 1).max(0).min(len) as usize
        } else {
            ((pos_val - 1).max(0)).min(len) as usize
        };
        let single = elem_vec
            .get(ei)
            .and_then(|o| o.as_ref())
            .map(|av: &AnyValue| any_value_to_single_series(av.clone(), &inner_dtype));
        if let Some(Ok(s)) = single {
            acc.insert(pos, s);
        }
        if acc.is_empty() {
            Ok(Series::new_empty(PlSmallStr::EMPTY, &inner_dtype))
        } else {
            let mut result = acc.remove(0);
            for s in acc {
                result.extend(&s)?;
            }
            Ok(result)
        }
    })?;
    Ok(Some(Column::new(name, out.into_series())))
}

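// A sketch (not in the original file) of the 1-based position clamp used by
// apply_array_insert: negative positions resolve via `len + pos + 1`, so
// pos_val = -1 lands at the end of a 3-element list under this clamp.
#[cfg(test)]
mod array_insert_position_semantics {
    #[test]
    fn negative_position_maps_from_end() {
        let len: i64 = 3;
        let pos_val: i64 = -1;
        let pos = (len + pos_val + 1).max(0).min(len) as usize;
        assert_eq!(pos, 3);
    }
}
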
/// Best-effort set key for a (usually one-element) Series: the Debug form of
/// the single AnyValue when available, otherwise the Series' display string.
fn series_to_set_key(s: &Series) -> String {
    if s.len() == 1 {
        if let Ok(av) = s.get(0) {
            return format!("{:?}", av);
        }
    }
    std::string::ToString::to_string(s)
}

/// Elements in first array not in second (PySpark array_except).
pub fn apply_array_except(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
    if columns.len() < 2 {
        return Err(PolarsError::ComputeError(
            "array_except needs two columns (array1, array2)".into(),
        ));
    }
    let name = columns[0].field().into_owned().name;
    let a_series = std::mem::take(&mut columns[0]).take_materialized_series();
    let b_series = std::mem::take(&mut columns[1]).take_materialized_series();
    let a_ca = a_series
        .list()
        .map_err(|e| PolarsError::ComputeError(format!("array_except: {e}").into()))?;
    let b_ca = b_series
        .list()
        .map_err(|e| PolarsError::ComputeError(format!("array_except: {e}").into()))?;
    let inner_dtype = a_ca.inner_dtype().clone();
    let mut builder = polars::chunked_array::builder::get_list_builder(
        &inner_dtype,
        64,
        a_ca.len(),
        name.as_str().into(),
    );
    for (opt_a, opt_b) in a_ca.amortized_iter().zip(b_ca.amortized_iter()) {
        match (opt_a, opt_b) {
            (Some(a_amort), Some(b_amort)) => {
                let a_list = a_amort.as_ref().as_list();
                let b_list = b_amort.as_ref().as_list();
                let b_keys: std::collections::HashSet<String> = b_list
                    .amortized_iter()
                    .flatten()
                    .map(|e| series_to_set_key(&e.deep_clone()))
                    .collect();
                let mut acc: Vec<Series> = Vec::new();
                let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
                for e in a_list.amortized_iter().flatten() {
                    let s = e.deep_clone();
                    let key = series_to_set_key(&s);
                    if !b_keys.contains(&key) && seen.insert(key) {
                        acc.push(s);
                    }
                }
                let result = if acc.is_empty() {
                    Series::new_empty(PlSmallStr::EMPTY, &inner_dtype)
                } else {
                    let mut r = acc.remove(0);
                    for s in acc {
                        r.extend(&s)?;
                    }
                    r
                };
                builder.append_series(&result)?;
            }
            _ => builder.append_null(),
        }
    }
    Ok(Some(Column::new(name, builder.finish().into_series())))
}

/// True if two arrays have any element in common (PySpark arrays_overlap).
pub fn apply_arrays_overlap(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
    if columns.len() < 2 {
        return Err(PolarsError::ComputeError(
            "arrays_overlap needs two columns (array1, array2)".into(),
        ));
    }
    let name = columns[0].field().into_owned().name;
    let a_series = std::mem::take(&mut columns[0]).take_materialized_series();
    let b_series = std::mem::take(&mut columns[1]).take_materialized_series();
    let a_ca = a_series
        .list()
        .map_err(|e| PolarsError::ComputeError(format!("arrays_overlap: {e}").into()))?;
    let b_ca = b_series
        .list()
        .map_err(|e| PolarsError::ComputeError(format!("arrays_overlap: {e}").into()))?;
    let mut results: Vec<bool> = Vec::with_capacity(a_ca.len());
    for (opt_a, opt_b) in a_ca.amortized_iter().zip(b_ca.amortized_iter()) {
        let overlap = match (opt_a, opt_b) {
            (Some(a_amort), Some(b_amort)) => {
                let a_list = a_amort.as_ref().as_list();
                let b_list = b_amort.as_ref().as_list();
                let a_keys: std::collections::HashSet<String> = a_list
                    .amortized_iter()
                    .flatten()
                    .map(|e| series_to_set_key(&e.deep_clone()))
                    .collect();
                let b_keys: std::collections::HashSet<String> = b_list
                    .amortized_iter()
                    .flatten()
                    .map(|e| series_to_set_key(&e.deep_clone()))
                    .collect();
                !a_keys.is_disjoint(&b_keys)
            }
            _ => false,
        };
        results.push(overlap);
    }
    let out =
        BooleanChunked::from_iter_options(name.as_str().into(), results.into_iter().map(Some));
    Ok(Some(Column::new(name, out.into_series())))
}

/// Zip two arrays into array of structs (PySpark arrays_zip).
pub fn apply_arrays_zip(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
    if columns.len() < 2 {
        return Err(PolarsError::ComputeError(
            "arrays_zip needs at least two columns".into(),
        ));
    }
    let name = columns[0].field().into_owned().name;
    let n = columns.len();
    let mut series_vec: Vec<Series> = Vec::with_capacity(n);
    for col in columns.iter_mut() {
        series_vec.push(std::mem::take(col).take_materialized_series());
    }
    let list_cas: Vec<_> = series_vec
        .iter()
        .map(|s| {
            s.list()
                .map_err(|e| PolarsError::ComputeError(format!("arrays_zip: {e}").into()))
        })
        .collect::<PolarsResult<Vec<_>>>()?;
    let len = list_cas[0].len();
    let inner_dtype = list_cas[0].inner_dtype().clone();
    use polars::chunked_array::StructChunked;
    use polars::chunked_array::builder::get_list_builder;
    use polars::datatypes::Field;
    let struct_fields: Vec<Field> = (0..n)
        .map(|i| Field::new(format!("field_{i}").into(), inner_dtype.clone()))
        .collect();
    let out_struct = DataType::Struct(struct_fields);
    let mut builder = get_list_builder(&out_struct, 64, len, name.as_str().into());
    for row_idx in 0..len {
        let mut max_len = 0usize;
        let mut row_lists: Vec<Vec<Series>> = Vec::with_capacity(n);
        for ca in &list_cas {
            // nth() walks a fresh iterator up to row_idx each time (quadratic overall);
            // acceptable while inputs stay small.
            let opt_amort = ca.amortized_iter().nth(row_idx).flatten();
            if let Some(amort) = opt_amort {
                let list_s = amort.as_ref().as_list();
                let elems: Vec<Series> = list_s
                    .amortized_iter()
                    .flatten()
                    .map(|e| e.deep_clone())
                    .collect();
                max_len = max_len.max(elems.len());
                row_lists.push(elems);
            } else {
                row_lists.push(vec![]);
            }
        }
        if max_len == 0 {
            builder.append_null();
        } else {
            let mut struct_parts: Vec<Vec<Series>> =
                (0..n).map(|_| Vec::with_capacity(max_len)).collect();
            for i in 0..max_len {
                for (j, lst) in row_lists.iter().enumerate() {
                    let elem = lst
                        .get(i)
                        .cloned()
                        .unwrap_or_else(|| Series::full_null(PlSmallStr::EMPTY, 1, &inner_dtype));
                    struct_parts[j].push(elem);
                }
            }
            let field_series: Vec<Series> = struct_parts
                .into_iter()
                .enumerate()
                .map(|(j, mut parts)| {
                    let mut r = parts.remove(0);
                    for s in parts {
                        let _ = r.extend(&s);
                    }
                    r.with_name(format!("field_{j}").as_str().into())
                })
                .collect();
            let field_refs: Vec<&Series> = field_series.iter().collect();
            let st =
                StructChunked::from_series(PlSmallStr::EMPTY, max_len, field_refs.iter().copied())
                    .map_err(|e| {
                        PolarsError::ComputeError(format!("arrays_zip struct: {e}").into())
                    })?
                    .into_series();
            builder.append_series(&st)?;
        }
    }
    Ok(Some(Column::new(name, builder.finish().into_series())))
}

/// Elements in both arrays (PySpark array_intersect). Distinct.
pub fn apply_array_intersect(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
    if columns.len() < 2 {
        return Err(PolarsError::ComputeError(
            "array_intersect needs two columns (array1, array2)".into(),
        ));
    }
    let name = columns[0].field().into_owned().name;
    let a_series = std::mem::take(&mut columns[0]).take_materialized_series();
    let b_series = std::mem::take(&mut columns[1]).take_materialized_series();
    let a_ca = a_series
        .list()
        .map_err(|e| PolarsError::ComputeError(format!("array_intersect: {e}").into()))?;
    let b_ca = b_series
        .list()
        .map_err(|e| PolarsError::ComputeError(format!("array_intersect: {e}").into()))?;
    let inner_dtype = a_ca.inner_dtype().clone();
    let mut builder = polars::chunked_array::builder::get_list_builder(
        &inner_dtype,
        64,
        a_ca.len(),
        name.as_str().into(),
    );
    for (opt_a, opt_b) in a_ca.amortized_iter().zip(b_ca.amortized_iter()) {
        match (opt_a, opt_b) {
            (Some(a_amort), Some(b_amort)) => {
                let a_list = a_amort.as_ref().as_list();
                let b_list = b_amort.as_ref().as_list();
                let b_keys: std::collections::HashSet<String> = b_list
                    .amortized_iter()
                    .flatten()
                    .map(|e| series_to_set_key(&e.deep_clone()))
                    .collect();
                let mut acc: Vec<Series> = Vec::new();
                let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
                for e in a_list.amortized_iter().flatten() {
                    let s = e.deep_clone();
                    let key = series_to_set_key(&s);
                    if b_keys.contains(&key) && seen.insert(key) {
                        acc.push(s);
                    }
                }
                let result = if acc.is_empty() {
                    Series::new_empty(PlSmallStr::EMPTY, &inner_dtype)
                } else {
                    let mut r = acc.remove(0);
                    for s in acc {
                        r.extend(&s)?;
                    }
                    r
                };
                builder.append_series(&result)?;
            }
            _ => builder.append_null(),
        }
    }
    Ok(Some(Column::new(name, builder.finish().into_series())))
}

/// Distinct elements from both arrays (PySpark array_union).
pub fn apply_array_union(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
    if columns.len() < 2 {
        return Err(PolarsError::ComputeError(
            "array_union needs two columns (array1, array2)".into(),
        ));
    }
    let name = columns[0].field().into_owned().name;
    let a_series = std::mem::take(&mut columns[0]).take_materialized_series();
    let b_series = std::mem::take(&mut columns[1]).take_materialized_series();
    let a_ca = a_series
        .list()
        .map_err(|e| PolarsError::ComputeError(format!("array_union: {e}").into()))?;
    let b_ca = b_series
        .list()
        .map_err(|e| PolarsError::ComputeError(format!("array_union: {e}").into()))?;
    let inner_dtype = a_ca.inner_dtype().clone();
    let mut builder = polars::chunked_array::builder::get_list_builder(
        &inner_dtype,
        64,
        a_ca.len(),
        name.as_str().into(),
    );
    for (opt_a, opt_b) in a_ca.amortized_iter().zip(b_ca.amortized_iter()) {
        match (opt_a, opt_b) {
            (Some(a_amort), Some(b_amort)) => {
                let a_list = a_amort.as_ref().as_list();
                let b_list = b_amort.as_ref().as_list();
                let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
                let mut acc: Vec<Series> = Vec::new();
                for e in a_list.amortized_iter().flatten() {
                    let s = e.deep_clone();
                    let key = series_to_set_key(&s);
                    if seen.insert(key) {
                        acc.push(s);
                    }
                }
                for e in b_list.amortized_iter().flatten() {
                    let s = e.deep_clone();
                    let key = series_to_set_key(&s);
                    if seen.insert(key) {
                        acc.push(s);
                    }
                }
                let result = if acc.is_empty() {
                    Series::new_empty(PlSmallStr::EMPTY, &inner_dtype)
                } else {
                    let mut r = acc.remove(0);
                    for s in acc {
                        r.extend(&s)?;
                    }
                    r
                };
                builder.append_series(&result)?;
            }
            _ => builder.append_null(),
        }
    }
    Ok(Some(Column::new(name, builder.finish().into_series())))
}

/// Parse string to map: "k1:v1,k2:v2" -> List(Struct{key, value}) (PySpark str_to_map).
pub fn apply_str_to_map(
    column: Column,
    pair_delim: &str,
    key_value_delim: &str,
) -> PolarsResult<Option<Column>> {
    use polars::chunked_array::StructChunked;
    use polars::chunked_array::builder::get_list_builder;
    use polars::datatypes::Field;
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let ca = series
        .str()
        .map_err(|e| PolarsError::ComputeError(format!("str_to_map: {e}").into()))?;
    let out_struct = DataType::Struct(vec![
        Field::new("key".into(), DataType::String),
        Field::new("value".into(), DataType::String),
    ]);
    let mut builder = get_list_builder(&out_struct, 64, ca.len(), name.as_str().into());
    for opt_s in ca.into_iter() {
        if let Some(s) = opt_s {
            let pairs: Vec<(String, String)> = s
                .split(pair_delim)
                .filter_map(|part| {
                    let kv: Vec<&str> = part.splitn(2, key_value_delim).collect();
                    if kv.len() >= 2 {
                        Some((kv[0].to_string(), kv[1].to_string()))
                    } else if kv.len() == 1 && !kv[0].is_empty() {
                        Some((kv[0].to_string(), String::new()))
                    } else {
                        None
                    }
                })
                .collect();
            if pairs.is_empty() {
                builder.append_null();
            } else {
                let keys: Vec<String> = pairs.iter().map(|(k, _)| k.clone()).collect();
                let vals: Vec<String> = pairs.iter().map(|(_, v)| v.clone()).collect();
                let k_series = Series::new("key".into(), keys);
                let v_series = Series::new("value".into(), vals);
                let fields: [&Series; 2] = [&k_series, &v_series];
                let st = StructChunked::from_series(
                    PlSmallStr::EMPTY,
                    pairs.len(),
                    fields.iter().copied(),
                )
                .map_err(|e| PolarsError::ComputeError(format!("str_to_map struct: {e}").into()))?
                .into_series();
                builder.append_series(&st)?;
            }
        } else {
            builder.append_null();
        }
    }
    Ok(Some(Column::new(name, builder.finish().into_series())))
}

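// A sketch (not in the original file) of the pair parsing above:
// `splitn(2, kv_delim)` keeps any extra delimiters inside the value, so
// "k:v:w" parses as ("k", "v:w") rather than losing the tail.
#[cfg(test)]
mod str_to_map_parsing_semantics {
    #[test]
    fn extra_delimiters_stay_in_the_value() {
        let kv: Vec<&str> = "k:v:w".splitn(2, ':').collect();
        assert_eq!(kv, vec!["k", "v:w"]);
    }
}
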
/// Merge two map columns (PySpark map_concat). Last value wins for duplicate keys;
/// output entries are ordered by key.
pub fn apply_map_concat(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
    use polars::chunked_array::StructChunked;
    use polars::chunked_array::builder::get_list_builder;
    use polars::datatypes::Field;
    if columns.len() < 2 {
        return Err(PolarsError::ComputeError(
            "map_concat needs at least two columns".into(),
        ));
    }
    let name = columns[0].field().into_owned().name;
    let a_series = std::mem::take(&mut columns[0]).take_materialized_series();
    let b_series = std::mem::take(&mut columns[1]).take_materialized_series();
    let a_ca = a_series
        .list()
        .map_err(|e| PolarsError::ComputeError(format!("map_concat: {e}").into()))?;
    let b_ca = b_series
        .list()
        .map_err(|e| PolarsError::ComputeError(format!("map_concat: {e}").into()))?;
    let struct_dtype = a_ca.inner_dtype().clone();
    let (key_dtype, value_dtype) = match &struct_dtype {
        DataType::Struct(fields) => {
            let k = fields
                .iter()
                .find(|f| f.name == "key")
                .map(|f| f.dtype.clone())
                .unwrap_or(DataType::String);
            let v = fields
                .iter()
                .find(|f| f.name == "value")
                .map(|f| f.dtype.clone())
                .unwrap_or(DataType::String);
            (k, v)
        }
        _ => (DataType::String, DataType::String),
    };
    let out_struct = DataType::Struct(vec![
        Field::new("key".into(), key_dtype),
        Field::new("value".into(), value_dtype),
    ]);
    let n = a_ca.len();
    let mut builder = get_list_builder(&out_struct, 64, n, name.as_str().into());
    for (opt_a, opt_b) in a_ca.amortized_iter().zip(b_ca.amortized_iter()) {
        let mut merged: std::collections::BTreeMap<String, (Series, Series)> =
            std::collections::BTreeMap::new();
        for amort in [opt_a, opt_b].into_iter().flatten() {
            let list_s = amort.as_ref().as_list();
            for elem in list_s.amortized_iter().flatten() {
                let s = elem.deep_clone();
                let st = s.struct_().map_err(|e| {
                    PolarsError::ComputeError(format!("map_concat struct: {e}").into())
                })?;
                let k_s = st.field_by_name("key").map_err(|e| {
                    PolarsError::ComputeError(format!("map_concat key: {e}").into())
                })?;
                let v_s = st.field_by_name("value").map_err(|e| {
                    PolarsError::ComputeError(format!("map_concat value: {e}").into())
                })?;
                let key = std::string::ToString::to_string(&k_s);
                merged.insert(key, (k_s, v_s));
            }
        }
        if merged.is_empty() {
            builder.append_null();
        } else {
            let mut row_structs: Vec<Series> = Vec::new();
            for (_, (k_s, v_s)) in merged {
                let len = k_s.len();
                let fields: [&Series; 2] = [&k_s, &v_s];
                let st = StructChunked::from_series(PlSmallStr::EMPTY, len, fields.iter().copied())
                    .map_err(|e| {
                        PolarsError::ComputeError(format!("map_concat build: {e}").into())
                    })?
                    .into_series();
                row_structs.push(st);
            }
            let mut combined = row_structs.remove(0);
            for s in row_structs {
                combined.extend(&s)?;
            }
            builder.append_series(&combined)?;
        }
    }
    Ok(Some(Column::new(name, builder.finish().into_series())))
}

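// A sketch (not in the original file) of the merge rule above: BTreeMap
// insertion overwrites, so the later map's value wins for duplicate keys,
// and iteration order is sorted by key.
#[cfg(test)]
mod map_concat_merge_rule {
    use std::collections::BTreeMap;

    #[test]
    fn last_value_wins_and_keys_are_ordered() {
        let mut merged = BTreeMap::new();
        for (k, v) in [("b", 1), ("a", 2), ("b", 3)] {
            merged.insert(k, v);
        }
        let entries: Vec<_> = merged.into_iter().collect();
        assert_eq!(entries, vec![("a", 2), ("b", 3)]);
    }
}
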
/// True if map contains key (PySpark map_contains_key).
pub fn apply_map_contains_key(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
    if columns.len() < 2 {
        return Err(PolarsError::ComputeError(
            "map_contains_key needs two columns (map, key)".into(),
        ));
    }
    let name = columns[0].field().into_owned().name;
    let map_series = std::mem::take(&mut columns[0]).take_materialized_series();
    let key_series = std::mem::take(&mut columns[1]).take_materialized_series();
    let map_ca = map_series
        .list()
        .map_err(|e| PolarsError::ComputeError(format!("map_contains_key: {e}").into()))?;
    let key_str = key_series.cast(&DataType::String)?;
    let key_vec: Vec<String> = (0..key_str.len())
        .map(|i| key_str.get(i).map(|av| av.to_string()).unwrap_or_default())
        .collect();
    let key_len = key_vec.len();
    let mut results: Vec<bool> = Vec::with_capacity(map_ca.len());
    for (i, opt_amort) in map_ca.amortized_iter().enumerate() {
        let target = key_vec
            .get(if key_len == 1 { 0 } else { i })
            .map(|s| s.as_str())
            .unwrap_or("");
        let mut found = false;
        if let Some(amort) = opt_amort {
            let list_s = amort.as_ref().as_list();
            for elem in list_s.amortized_iter().flatten() {
                let s = elem.deep_clone();
                if let Ok(st) = s.struct_() {
                    if let Ok(k) = st.field_by_name("key") {
                        let k_str: String =
                            k.get(0).ok().map(|av| av.to_string()).unwrap_or_default();
                        if k_str == target {
                            found = true;
                            break;
                        }
                    }
                }
            }
        }
        results.push(found);
    }
    let out =
        BooleanChunked::from_iter_options(name.as_str().into(), results.into_iter().map(Some));
    Ok(Some(Column::new(name, out.into_series())))
}

/// Get value for key from map, or null (PySpark get).
pub fn apply_get(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
    if columns.len() < 2 {
        return Err(PolarsError::ComputeError(
            "get needs two columns (map, key)".into(),
        ));
    }
    let name = columns[0].field().into_owned().name;
    let map_series = std::mem::take(&mut columns[0]).take_materialized_series();
    let key_series = std::mem::take(&mut columns[1]).take_materialized_series();
    let map_ca = map_series
        .list()
        .map_err(|e| PolarsError::ComputeError(format!("get: {e}").into()))?;
    let key_str = key_series.cast(&DataType::String)?;
    let key_vec: Vec<String> = (0..key_str.len())
        .map(|i| key_str.get(i).map(|av| av.to_string()).unwrap_or_default())
        .collect();
    let key_len = key_vec.len();
    let value_dtype = match map_ca.inner_dtype() {
        DataType::Struct(fields) => fields
            .iter()
            .find(|f| f.name == "value")
            .map(|f| f.dtype.clone())
            .unwrap_or(DataType::String),
        _ => DataType::String,
    };
    let mut result_series: Vec<Series> = Vec::with_capacity(map_ca.len());
    for (i, opt_amort) in map_ca.amortized_iter().enumerate() {
        let target = key_vec
            .get(if key_len == 1 { 0 } else { i })
            .map(|s| s.as_str())
            .unwrap_or("");
        let mut found: Option<Series> = None;
        if let Some(amort) = opt_amort {
            let list_s = amort.as_ref().as_list();
            for elem in list_s.amortized_iter().flatten() {
                let s = elem.deep_clone();
                if let Ok(st) = s.struct_() {
                    if let Ok(k) = st.field_by_name("key") {
                        let k_str: String =
                            k.get(0).ok().map(|av| av.to_string()).unwrap_or_default();
                        if k_str == target {
                            if let Ok(v) = st.field_by_name("value") {
                                found = Some(v);
                            }
                            break;
                        }
                    }
                }
            }
        }
        result_series
            .push(found.unwrap_or_else(|| Series::full_null(PlSmallStr::EMPTY, 1, &value_dtype)));
    }
    if result_series.is_empty() {
        // Zero-row input: return an empty column of the value dtype rather than panicking on remove(0).
        return Ok(Some(Column::new(
            name,
            Series::new_empty(PlSmallStr::EMPTY, &value_dtype),
        )));
    }
    let mut out = result_series.remove(0);
    for s in result_series {
        out.extend(&s)?;
    }
    Ok(Some(Column::new(name, out)))
}

/// ASCII value of first character (PySpark ascii). Returns Int32.
pub fn apply_ascii(column: Column) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let ca = series
        .str()
        .map_err(|e| PolarsError::ComputeError(format!("ascii: {e}").into()))?;
    let out = Int32Chunked::from_iter_options(
        name.as_str().into(),
        ca.into_iter()
            .map(|opt_s| opt_s.and_then(|s| s.chars().next().map(|c| c as i32))),
    );
    Ok(Some(Column::new(name, out.into_series())))
}

/// Format numeric column as string with fixed decimal places (PySpark format_number).
pub fn apply_format_number(column: Column, decimals: u32) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let prec = decimals as usize;
    let out: StringChunked = match series.dtype() {
        DataType::Float64 => {
            let ca = series
                .f64()
                .map_err(|e| PolarsError::ComputeError(format!("format_number: {e}").into()))?;
            StringChunked::from_iter_options(
                name.as_str().into(),
                ca.into_iter()
                    .map(|opt_v| opt_v.map(|v| format!("{v:.prec$}"))),
            )
        }
        DataType::Float32 => {
            let ca = series
                .f32()
                .map_err(|e| PolarsError::ComputeError(format!("format_number: {e}").into()))?;
            StringChunked::from_iter_options(
                name.as_str().into(),
                ca.into_iter()
                    .map(|opt_v| opt_v.map(|v| format!("{v:.prec$}"))),
            )
        }
        _ => {
            let f64_series = series.cast(&DataType::Float64).map_err(|e| {
                PolarsError::ComputeError(format!("format_number cast: {e}").into())
            })?;
            let ca = f64_series
                .f64()
                .map_err(|e| PolarsError::ComputeError(format!("format_number: {e}").into()))?;
            StringChunked::from_iter_options(
                name.as_str().into(),
                ca.into_iter()
                    .map(|opt_v| opt_v.map(|v| format!("{v:.prec$}"))),
            )
        }
    };
    Ok(Some(Column::new(name, out.into_series())))
}

/// Return the value at index `i` in `s` as a string, or None if null/unsupported type.
fn series_value_at_as_string(s: &Series, i: usize) -> Option<String> {
    match s.dtype() {
        DataType::String => s.str().ok()?.get(i).map(|v| v.to_string()),
        DataType::Int32 => s.i32().ok()?.get(i).map(|v| v.to_string()),
        DataType::Int64 => s.i64().ok()?.get(i).map(|v| v.to_string()),
        DataType::Float32 => s.f32().ok()?.get(i).map(|v| v.to_string()),
        DataType::Float64 => s.f64().ok()?.get(i).map(|v| v.to_string()),
        DataType::Boolean => s.bool().ok()?.get(i).map(|v| v.to_string()),
        _ => s.get(i).ok().map(|av| av.to_string()),
    }
}

/// Format columns with printf-style format string (PySpark format_string / printf).
/// Supports %s, %d, %i, %f, %g, %e, %%. Null in any column yields a null result.
pub fn apply_format_string(columns: &mut [Column], format: &str) -> PolarsResult<Option<Column>> {
    let n = columns.len();
    if n == 0 {
        return Err(PolarsError::ComputeError(
            "format_string needs at least one column".into(),
        ));
    }
    let name = columns[0].field().into_owned().name;
    let mut series_vec: Vec<Series> = Vec::with_capacity(n);
    for col in columns.iter_mut() {
        series_vec.push(std::mem::take(col).take_materialized_series());
    }
    let len = series_vec[0].len();
    let mut result = Vec::with_capacity(len);
    for i in 0..len {
        let values: Vec<Option<String>> = series_vec
            .iter()
            .map(|s| series_value_at_as_string(s, i))
            .collect();
        let out = format_string_row(format, &values);
        result.push(out);
    }
    let out = StringChunked::from_iter_options(name.as_str().into(), result.into_iter());
    Ok(Some(Column::new(name, out.into_series())))
}

fn format_string_row(format: &str, values: &[Option<String>]) -> Option<String> {
    let mut out = String::new();
    let mut idx = 0;
    let mut chars = format.chars();
    while let Some(c) = chars.next() {
        if c == '%' {
            match chars.next() {
                Some('%') => out.push('%'),
                // Every supported specifier renders the pre-stringified value as-is;
                // a missing or null argument nulls the whole row.
                Some('s') | Some('d') | Some('i') | Some('f') | Some('g') | Some('e') => {
                    if idx >= values.len() {
                        return None;
                    }
                    match &values[idx] {
                        Some(v) => out.push_str(v),
                        None => return None,
                    }
                    idx += 1;
                }
                _ => return None,
            }
        } else {
            out.push(c);
        }
    }
    Some(out)
}

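// A sketch (not in the original file) exercising format_string_row directly:
// %% emits a literal percent, and a null argument nulls the whole row.
#[cfg(test)]
mod format_string_row_semantics {
    use super::format_string_row;

    #[test]
    fn formats_values_and_propagates_null() {
        assert_eq!(
            format_string_row("%s=%d%%", &[Some("x".into()), Some("7".into())]),
            Some("x=7%".to_string())
        );
        assert_eq!(format_string_row("%s", &[None]), None);
    }
}
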
/// Find 1-based index of str in comma-delimited set (PySpark find_in_set).
/// Returns 0 if not found or if str contains comma.
/// map_many: `columns[0]`=str, `columns[1]`=set
pub fn apply_find_in_set(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
    if columns.len() < 2 {
        return Err(PolarsError::ComputeError(
            "find_in_set needs two columns".into(),
        ));
    }
    let name = columns[0].field().into_owned().name;
    let str_series = std::mem::take(&mut columns[0]).take_materialized_series();
    let set_series = std::mem::take(&mut columns[1]).take_materialized_series();
    let str_ca = str_series
        .str()
        .map_err(|e| PolarsError::ComputeError(format!("find_in_set: {e}").into()))?;
    let set_ca = set_series
        .str()
        .map_err(|e| PolarsError::ComputeError(format!("find_in_set: {e}").into()))?;
    let out = Int64Chunked::from_iter_options(
        name.as_str().into(),
        str_ca
            .into_iter()
            .zip(set_ca)
            .map(|(opt_str, opt_set)| match (opt_str, opt_set) {
                (Some(s), Some(set_str)) => {
                    if s.contains(',') {
                        Some(0i64)
                    } else {
                        let parts: Vec<&str> = set_str.split(',').collect();
                        let idx = parts.iter().position(|p| *p == s);
                        Some(idx.map(|i| (i + 1) as i64).unwrap_or(0))
                    }
                }
                _ => None,
            }),
    );
    Ok(Some(Column::new(name, out.into_series())))
}

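// A sketch (not in the original file) of the 1-based lookup apply_find_in_set
// performs per row: position of "b" in "a,b,c" is 2, and missing needles map to 0.
#[cfg(test)]
mod find_in_set_semantics {
    #[test]
    fn one_based_position_or_zero() {
        let parts: Vec<&str> = "a,b,c".split(',').collect();
        let pos = |needle: &str| {
            parts
                .iter()
                .position(|p| *p == needle)
                .map(|i| (i + 1) as i64)
                .unwrap_or(0)
        };
        assert_eq!(pos("b"), 2);
        assert_eq!(pos("z"), 0);
    }
}
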
/// regexp_extract using fancy-regex when pattern has lookahead/lookbehind (PySpark parity).
/// Polars str().extract() uses the regex crate, which does not support lookaround.
pub fn apply_regexp_extract_lookaround(
    column: Column,
    pattern: &str,
    group_index: usize,
) -> PolarsResult<Option<Column>> {
    use fancy_regex::Regex;
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let ca = series
        .str()
        .map_err(|e| PolarsError::ComputeError(format!("regexp_extract: {e}").into()))?;
    let re = Regex::new(pattern).map_err(|e| {
        PolarsError::ComputeError(
            format!("regexp_extract invalid regex (lookaround) '{pattern}': {e}").into(),
        )
    })?;
    let out = StringChunked::from_iter_options(
        name.as_str().into(),
        ca.into_iter().map(|opt_s| {
            opt_s.and_then(|s| {
                let caps = re.captures(s).ok().flatten()?;
                let m = caps.get(group_index)?;
                Some(m.as_str().to_string())
            })
        }),
    );
    Ok(Some(Column::new(name, out.into_series())))
}

/// Regexp instr: 1-based position of first regex match (PySpark regexp_instr).
/// group_idx: 0 = full match, 1+ = capture group. Returns null if no match.
pub fn apply_regexp_instr(
    column: Column,
    pattern: String,
    group_idx: usize,
) -> PolarsResult<Option<Column>> {
    use regex::Regex;
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let ca = series
        .str()
        .map_err(|e| PolarsError::ComputeError(format!("regexp_instr: {e}").into()))?;
    let re = Regex::new(&pattern).map_err(|e| {
        PolarsError::ComputeError(format!("regexp_instr invalid regex '{pattern}': {e}").into())
    })?;
    let out = Int64Chunked::from_iter_options(
        name.as_str().into(),
        ca.into_iter().map(|opt_s| {
            opt_s.and_then(|s| {
                let m = if group_idx == 0 {
                    re.find(s)
                } else {
                    re.captures(s).and_then(|cap| cap.get(group_idx))
                };
                m.map(|m| (m.start() + 1) as i64)
            })
        }),
    );
    Ok(Some(Column::new(name, out.into_series())))
}

/// Base64 encode string bytes (PySpark base64). Input string UTF-8, output base64 string.
pub fn apply_base64(column: Column) -> PolarsResult<Option<Column>> {
    use base64::Engine;
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let ca = series
        .str()
        .map_err(|e| PolarsError::ComputeError(format!("base64: {e}").into()))?;
    let out = StringChunked::from_iter_options(
        name.as_str().into(),
        ca.into_iter().map(|opt_s| {
            opt_s.map(|s| base64::engine::general_purpose::STANDARD.encode(s.as_bytes()))
        }),
    );
    Ok(Some(Column::new(name, out.into_series())))
}

/// Base64 decode to string (PySpark unbase64). Output UTF-8 string; invalid decode → null.
pub fn apply_unbase64(column: Column) -> PolarsResult<Option<Column>> {
    use base64::Engine;
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let ca = series
        .str()
        .map_err(|e| PolarsError::ComputeError(format!("unbase64: {e}").into()))?;
    let out = StringChunked::from_iter_options(
        name.as_str().into(),
        ca.into_iter().map(|opt_s| {
            opt_s.and_then(|s| {
                let decoded = base64::engine::general_purpose::STANDARD
                    .decode(s.as_bytes())
                    .ok()?;
                String::from_utf8(decoded).ok()
            })
        }),
    );
    Ok(Some(Column::new(name, out.into_series())))
}

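// A sketch (not in the original file) of the STANDARD-engine round trip the two
// helpers above rely on; an invalid payload decodes to Err, which apply_unbase64
// surfaces as null.
#[cfg(test)]
mod base64_roundtrip_semantics {
    use base64::Engine;

    #[test]
    fn roundtrip_and_invalid_input() {
        let eng = &base64::engine::general_purpose::STANDARD;
        assert_eq!(eng.encode(b"spark"), "c3Bhcms=");
        assert!(eng.decode("not base64!").is_err());
    }
}
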
/// SHA1 hash of string bytes, return hex string (PySpark sha1).
pub fn apply_sha1(column: Column) -> PolarsResult<Option<Column>> {
    use sha1::Digest;
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let ca = series
        .str()
        .map_err(|e| PolarsError::ComputeError(format!("sha1: {e}").into()))?;
    let out = StringChunked::from_iter_options(
        name.as_str().into(),
        ca.into_iter().map(|opt_s| {
            opt_s.map(|s| {
                let mut hasher = sha1::Sha1::new();
                hasher.update(s.as_bytes());
                format!("{:x}", hasher.finalize())
            })
        }),
    );
    Ok(Some(Column::new(name, out.into_series())))
}

1328/// SHA2 hex digest (PySpark sha2). bit_length 0 (alias for 256), 224, 256, 384, 512; anything else yields null.
1329pub fn apply_sha2(column: Column, bit_length: i32) -> PolarsResult<Option<Column>> {
1330    let name = column.field().into_owned().name;
1331    let series = column.take_materialized_series();
1332    let ca = series
1333        .str()
1334        .map_err(|e| PolarsError::ComputeError(format!("sha2: {e}").into()))?;
1335    let out = StringChunked::from_iter_options(
1336        name.as_str().into(),
1337        ca.into_iter().map(|opt_s| {
1338            opt_s.and_then(|s| {
1339                use sha2::Digest;
1340                match bit_length {
1341                    0 | 256 => Some(format!("{:x}", sha2::Sha256::digest(s.as_bytes()))),
1342                    224 => Some(format!("{:x}", sha2::Sha224::digest(s.as_bytes()))),
1343                    384 => Some(format!("{:x}", sha2::Sha384::digest(s.as_bytes()))),
1344                    512 => Some(format!("{:x}", sha2::Sha512::digest(s.as_bytes()))),
1345                    _ => None, // PySpark returns null for unsupported bit lengths
1346                }
1347            })
1348        }),
1349    );
1350    Ok(Some(Column::new(name, out.into_series())))
1351}
1352
1353/// MD5 hash of string bytes, return hex string (PySpark md5).
1354pub fn apply_md5(column: Column) -> PolarsResult<Option<Column>> {
1355    let name = column.field().into_owned().name;
1356    let series = column.take_materialized_series();
1357    let ca = series
1358        .str()
1359        .map_err(|e| PolarsError::ComputeError(format!("md5: {e}").into()))?;
1360    let out = StringChunked::from_iter_options(
1361        name.as_str().into(),
1362        ca.into_iter()
1363            .map(|opt_s| opt_s.map(|s| format!("{:x}", md5::compute(s.as_bytes())))),
1364    );
1365    Ok(Some(Column::new(name, out.into_series())))
1366}
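
// Editor's sketch (not in the original file): the hex digests are lowercase,
// matching PySpark; the expected value is the RFC 1321 md5("abc") test vector.
#[cfg(test)]
mod md5_example {
    use super::*;

    #[test]
    fn md5_known_vector() {
        let col = Column::new("s".into(), Series::new("s".into(), &["abc"]));
        let out = apply_md5(col).unwrap().unwrap();
        let s = out.take_materialized_series();
        assert_eq!(
            s.str().unwrap().get(0),
            Some("900150983cd24fb0d6963f7d28e17f72")
        );
    }
}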
1367
1368/// Encode string to binary (PySpark encode). Only UTF-8 is handled (other charsets fall back to it); returns the hex representation of the bytes.
1369pub fn apply_encode(column: Column, charset: &str) -> PolarsResult<Option<Column>> {
1370    let name = column.field().into_owned().name;
1371    let series = column.take_materialized_series();
1372    let ca = series
1373        .str()
1374        .map_err(|e| PolarsError::ComputeError(format!("encode: {e}").into()))?;
1375    let cs = charset.to_lowercase();
1376    let out = StringChunked::from_iter_options(
1377        name.as_str().into(),
1378        ca.into_iter().map(|opt_s| {
1379            opt_s.map(|s| {
1380                let bytes: Vec<u8> = match cs.as_str() {
1381                    "utf-8" | "utf8" => s.as_bytes().to_vec(),
1382                    _ => s.as_bytes().to_vec(), // default UTF-8
1383                };
1384                hex::encode(bytes)
1385            })
1386        }),
1387    );
1388    Ok(Some(Column::new(name, out.into_series())))
1389}
1390
1391/// Decode binary (hex string) to string (PySpark decode). The charset argument is currently ignored; bytes are read as UTF-8.
1392pub fn apply_decode(column: Column, charset: &str) -> PolarsResult<Option<Column>> {
1393    let name = column.field().into_owned().name;
1394    let series = column.take_materialized_series();
1395    let ca = series
1396        .str()
1397        .map_err(|e| PolarsError::ComputeError(format!("decode: {e}").into()))?;
1398    let _ = charset;
1399    let out = StringChunked::from_iter_options(
1400        name.as_str().into(),
1401        ca.into_iter().map(|opt_s| {
1402            opt_s.and_then(|s| {
1403                let bytes = hex::decode(s.as_bytes()).ok()?;
1404                String::from_utf8(bytes).ok()
1405            })
1406        }),
1407    );
1408    Ok(Some(Column::new(name, out.into_series())))
1409}
1410
1411/// to_binary(expr, fmt): PySpark to_binary. fmt 'utf-8' => hex(utf8 bytes), 'hex' => validate and return hex. Returns hex string.
1412pub fn apply_to_binary(column: Column, fmt: &str) -> PolarsResult<Option<Column>> {
1413    let name = column.field().into_owned().name;
1414    let series = column.take_materialized_series();
1415    let ca = series
1416        .str()
1417        .map_err(|e| PolarsError::ComputeError(format!("to_binary: {e}").into()))?;
1418    let fmt_lower = fmt.to_lowercase();
1419    let out = StringChunked::from_iter_options(
1420        name.as_str().into(),
1421        ca.into_iter().map(|opt_s| {
1422            opt_s.and_then(|s| {
1423                // PySpark's default fmt is 'hex'; unrecognized fmts fall back to UTF-8 here.
1424                match fmt_lower.as_str() {
1425                    "hex" => {
1426                        // Input is hex string; validate and return as-is (binary representation)
1427                        hex::decode(s.as_bytes()).ok()?;
1428                        Some(s.to_string())
1429                    }
1430                    "utf-8" | "utf8" => Some(hex::encode(s.as_bytes())),
1431                    _ => Some(hex::encode(s.as_bytes())),
1432                }
1433            })
1434        }),
1435    );
1436    Ok(Some(Column::new(name, out.into_series())))
1437}
1438
1439/// try_to_binary: PySpark try_to_binary. apply_to_binary above already yields null per row on failure, so this is a plain alias.
1440pub fn apply_try_to_binary(column: Column, fmt: &str) -> PolarsResult<Option<Column>> {
1441    apply_to_binary(column, fmt)
1442}
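
// Editor's sketch (not in the original file): shows both fmt branches of
// apply_to_binary; note the hex branch validates but returns the input as-is.
#[cfg(test)]
mod to_binary_example {
    use super::*;

    #[test]
    fn utf8_and_hex_formats() {
        let col = Column::new("s".into(), Series::new("s".into(), &["AB"]));
        let s = apply_to_binary(col, "utf-8").unwrap().unwrap().take_materialized_series();
        assert_eq!(s.str().unwrap().get(0), Some("4142")); // hex of UTF-8 bytes 0x41 0x42
        let col = Column::new("s".into(), Series::new("s".into(), &["zz"]));
        let s = apply_to_binary(col, "hex").unwrap().unwrap().take_materialized_series();
        assert_eq!(s.str().unwrap().get(0), None); // invalid hex digits -> null
    }
}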
1443
1444/// AES-GCM encrypt (PySpark aes_encrypt). Key bytes are zero-padded/truncated to 16 (AES-128-GCM only). Output: hex(nonce||ciphertext).
1445fn aes_gcm_encrypt_one(plaintext: &[u8], key: &[u8]) -> Option<String> {
1446    use aes_gcm::Aes128Gcm;
1447    use aes_gcm::aead::generic_array::GenericArray;
1448    use aes_gcm::aead::{Aead, KeyInit};
1449    use rand::RngCore;
1450    let key_arr: [u8; 16] = key
1451        .iter()
1452        .copied()
1453        .chain(std::iter::repeat(0))
1454        .take(16)
1455        .collect::<Vec<_>>()
1456        .try_into()
1457        .ok()?;
1458    let cipher = Aes128Gcm::new(GenericArray::from_slice(&key_arr));
1459    let mut nonce = [0u8; 12];
1460    rand::thread_rng().fill_bytes(&mut nonce);
1461    let nonce = GenericArray::from_slice(&nonce);
1462    let ciphertext = cipher.encrypt(nonce, plaintext).ok()?;
1463    let mut out = nonce.as_slice().to_vec();
1464    out.extend(ciphertext);
1465    Some(hex::encode(out))
1466}
1467
1468/// AES-GCM decrypt (PySpark aes_decrypt). Input: hex(nonce||ciphertext).
1469fn aes_gcm_decrypt_one(hex_input: &str, key: &[u8]) -> Option<String> {
1470    use aes_gcm::Aes128Gcm;
1471    use aes_gcm::aead::generic_array::GenericArray;
1472    use aes_gcm::aead::{Aead, KeyInit};
1473    let bytes = hex::decode(hex_input.as_bytes()).ok()?;
1474    if bytes.len() < 12 + 16 {
1475        return None; // nonce + at least tag
1476    }
1477    let (nonce_bytes, ct) = bytes.split_at(12);
1478    let key_arr: [u8; 16] = key
1479        .iter()
1480        .copied()
1481        .chain(std::iter::repeat(0))
1482        .take(16)
1483        .collect::<Vec<_>>()
1484        .try_into()
1485        .ok()?;
1486    let cipher = Aes128Gcm::new(GenericArray::from_slice(&key_arr));
1487    let nonce = GenericArray::from_slice(nonce_bytes);
1488    let plaintext = cipher.decrypt(nonce, ct).ok()?;
1489    String::from_utf8(plaintext).ok()
1490}
1491
1492/// AES encrypt (PySpark aes_encrypt). Key as string; uses AES-128-GCM. Output hex.
1493pub fn apply_aes_encrypt(column: Column, key: &str) -> PolarsResult<Option<Column>> {
1494    let name = column.field().into_owned().name;
1495    let series = column.take_materialized_series();
1496    let ca = series
1497        .str()
1498        .map_err(|e| PolarsError::ComputeError(format!("aes_encrypt: {e}").into()))?;
1499    let key_bytes = key.as_bytes();
1500    let out = StringChunked::from_iter_options(
1501        name.as_str().into(),
1502        ca.into_iter()
1503            .map(|opt_s| opt_s.and_then(|s| aes_gcm_encrypt_one(s.as_bytes(), key_bytes))),
1504    );
1505    Ok(Some(Column::new(name, out.into_series())))
1506}
1507
1508/// AES decrypt (PySpark aes_decrypt). Input hex(nonce||ciphertext). Returns null on failure.
1509pub fn apply_aes_decrypt(column: Column, key: &str) -> PolarsResult<Option<Column>> {
1510    let name = column.field().into_owned().name;
1511    let series = column.take_materialized_series();
1512    let ca = series
1513        .str()
1514        .map_err(|e| PolarsError::ComputeError(format!("aes_decrypt: {e}").into()))?;
1515    let key_bytes = key.as_bytes();
1516    let out = StringChunked::from_iter_options(
1517        name.as_str().into(),
1518        ca.into_iter()
1519            .map(|opt_s| opt_s.and_then(|s| aes_gcm_decrypt_one(s, key_bytes))),
1520    );
1521    Ok(Some(Column::new(name, out.into_series())))
1522}
1523
1524/// try_aes_decrypt: same as aes_decrypt, returns null on failure (PySpark try_aes_decrypt).
1525pub fn apply_try_aes_decrypt(column: Column, key: &str) -> PolarsResult<Option<Column>> {
1526    apply_aes_decrypt(column, key)
1527}
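
// Editor's sketch (not in the original file): the nonce is freshly random per
// call, so only the round trip is deterministic; the key here is exactly 16
// bytes (shorter keys are zero-padded, per aes_gcm_encrypt_one above).
#[cfg(test)]
mod aes_roundtrip_example {
    use super::*;

    #[test]
    fn encrypt_then_decrypt() {
        let key = "0123456789abcdef"; // 16 bytes -> AES-128-GCM
        let col = Column::new("s".into(), Series::new("s".into(), &["secret"]));
        let encrypted = apply_aes_encrypt(col, key).unwrap().unwrap();
        let decrypted = apply_aes_decrypt(encrypted, key).unwrap().unwrap();
        let s = decrypted.take_materialized_series();
        assert_eq!(s.str().unwrap().get(0), Some("secret"));
    }
}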
1528
1529/// Int column to single-character string (PySpark char / chr). Valid codepoint only.
1530pub fn apply_char(column: Column) -> PolarsResult<Option<Column>> {
1531    let name = column.field().into_owned().name;
1532    let series = column.take_materialized_series();
1533    let to_char = |v: i64| -> String {
1534        // `v as u32` would silently wrap (e.g. 2^32 + 65 -> 'A'); keep only valid codepoints.
1535        u32::try_from(v)
1536            .ok()
1537            .and_then(char::from_u32)
1538            .map(|c| c.to_string())
1539            .unwrap_or_default()
1540    };
1541    let out: StringChunked = match series.dtype() {
1542        DataType::Int32 => {
1543            let ca = series
1544                .i32()
1545                .map_err(|e| PolarsError::ComputeError(format!("char: {e}").into()))?;
1546            StringChunked::from_iter_options(
1547                name.as_str().into(),
1548                ca.into_iter().map(|opt_v| opt_v.map(|v| to_char(v as i64))),
1549            )
1550        }
1551        DataType::Int64 => {
1552            let ca = series
1553                .i64()
1554                .map_err(|e| PolarsError::ComputeError(format!("char: {e}").into()))?;
1555            StringChunked::from_iter_options(
1556                name.as_str().into(),
1557                ca.into_iter().map(|opt_v| opt_v.map(to_char)),
1558            )
1559        }
1560        _ => {
1561            let i64_series = series
1562                .cast(&DataType::Int64)
1563                .map_err(|e| PolarsError::ComputeError(format!("char cast: {e}").into()))?;
1564            let ca = i64_series
1565                .i64()
1566                .map_err(|e| PolarsError::ComputeError(format!("char: {e}").into()))?;
1567            StringChunked::from_iter_options(
1568                name.as_str().into(),
1569                ca.into_iter().map(|opt_v| opt_v.map(to_char)),
1570            )
1571        }
1572    };
1573    Ok(Some(Column::new(name, out.into_series())))
1574}
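
// Editor's sketch (not in the original file): valid codepoints map to one-char
// strings; out-of-range inputs come back as empty strings, per to_char above.
#[cfg(test)]
mod char_example {
    use super::*;

    #[test]
    fn codepoints_to_strings() {
        let col = Column::new("n".into(), Series::new("n".into(), &[65i64, 9731, -1]));
        let out = apply_char(col).unwrap().unwrap();
        let s = out.take_materialized_series();
        let ca = s.str().unwrap();
        assert_eq!(ca.get(0), Some("A")); // U+0041
        assert_eq!(ca.get(1), Some("☃")); // U+2603 SNOWMAN
        assert_eq!(ca.get(2), Some("")); // negative -> no valid codepoint
    }
}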
1575
1576// --- Datetime: add_months, months_between, next_day ---
1577
1578fn date_series_to_days(series: &Series) -> PolarsResult<Int32Chunked> {
1579    let casted = series.cast(&DataType::Date)?;
1580    let days_series = casted.cast(&DataType::Int32)?;
1581    days_series
1582        .i32()
1583        .map_err(|e| PolarsError::ComputeError(format!("date_series_to_days: {e}").into()))
1584        .cloned()
1585}
1586
1587fn days_to_naive_date(days: i32) -> Option<chrono::NaiveDate> {
1588    let base = robin_sparkless_core::date_utils::epoch_naive_date();
1589    base.checked_add_signed(chrono::TimeDelta::days(days as i64))
1590}
1591
1592fn naivedate_to_days(d: chrono::NaiveDate) -> i32 {
1593    let base = robin_sparkless_core::date_utils::epoch_naive_date();
1594    (d.signed_duration_since(base).num_days()) as i32
1595}
1596
1597/// add_months(date_column, n) - add n months to each date.
1598pub fn apply_add_months(column: Column, n: i32) -> PolarsResult<Option<Column>> {
1599    use chrono::Months;
1600    let name = column.field().into_owned().name;
1601    let series = column.take_materialized_series();
1602    let ca = date_series_to_days(&series)?;
1603    let u = n.unsigned_abs();
1604    let out = ca.into_iter().map(|opt_d| {
1605        opt_d.and_then(|days| {
1606            let d = days_to_naive_date(days)?;
1607            let next = if n >= 0 {
1608                d.checked_add_months(Months::new(u))?
1609            } else {
1610                d.checked_sub_months(Months::new(u))?
1611            };
1612            Some(naivedate_to_days(next))
1613        })
1614    });
1615    let out = Int32Chunked::from_iter_options(name.as_str().into(), out);
1616    let out_series = out.into_series().cast(&DataType::Date)?;
1617    Ok(Some(Column::new(name, out_series)))
1618}
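
// Editor's sketch (not in the original file): end-of-month behavior follows
// chrono's checked_add_months, which clamps like PySpark's add_months.
#[cfg(test)]
mod add_months_example {
    use super::*;

    #[test]
    fn clamps_to_end_of_month() {
        use chrono::NaiveDate;
        let jan31 = NaiveDate::from_ymd_opt(2024, 1, 31).unwrap();
        let date_col = Series::new("d".into(), &[naivedate_to_days(jan31)])
            .cast(&DataType::Date)
            .unwrap();
        let out = apply_add_months(Column::new("d".into(), date_col), 1).unwrap().unwrap();
        let days = out
            .take_materialized_series()
            .cast(&DataType::Int32)
            .unwrap()
            .i32()
            .unwrap()
            .get(0)
            .unwrap();
        // 2024-01-31 + 1 month -> 2024-02-29 (leap year, clamped)
        assert_eq!(days_to_naive_date(days), NaiveDate::from_ymd_opt(2024, 2, 29));
    }
}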
1619
1620/// Weekday-name parser for next_day: "Mon"/"Monday", ... -> PySpark day-of-week number (Sun=1..Sat=7).
1621fn parse_day_of_week(s: &str) -> Option<u32> {
1622    let s = s.trim().to_lowercase();
1623    match s.as_str() {
1624        "sun" | "sunday" => Some(1),
1625        "mon" | "monday" => Some(2),
1626        "tue" | "tuesday" => Some(3),
1627        "wed" | "wednesday" => Some(4),
1628        "thu" | "thursday" => Some(5),
1629        "fri" | "friday" => Some(6),
1630        "sat" | "saturday" => Some(7),
1631        _ => None,
1632    }
1633}
1634
1635/// PySpark next_day: 1=Sunday, 2=Monday, ... 7=Saturday. chrono Weekday: Mon=0, ..., Sun=6.
1636fn chrono_weekday_to_dayofweek(w: chrono::Weekday) -> u32 {
1637    match w {
1638        chrono::Weekday::Mon => 2,
1639        chrono::Weekday::Tue => 3,
1640        chrono::Weekday::Wed => 4,
1641        chrono::Weekday::Thu => 5,
1642        chrono::Weekday::Fri => 6,
1643        chrono::Weekday::Sat => 7,
1644        chrono::Weekday::Sun => 1,
1645    }
1646}
1647
1648pub fn apply_next_day(column: Column, day_of_week: &str) -> PolarsResult<Option<Column>> {
1649    let target = parse_day_of_week(day_of_week).ok_or_else(|| {
1650        PolarsError::ComputeError(format!("next_day: invalid day '{day_of_week}'").into())
1651    })?;
1652    let name = column.field().into_owned().name;
1653    let series = column.take_materialized_series();
1654    let ca = date_series_to_days(&series)?;
1655    let out = ca.into_iter().map(|opt_d| {
1656        opt_d.and_then(|days| {
1657            let d = days_to_naive_date(days)?;
1658            let current = chrono_weekday_to_dayofweek(d.weekday());
1659            let diff = if target >= current {
1660                (target - current) as i64
1661            } else {
1662                (7 - (current - target)) as i64
1663            };
1664            let days_to_add = if diff == 0 { 7 } else { diff }; // same day -> next week
1665            let next = d.checked_add_signed(chrono::TimeDelta::days(days_to_add))?;
1666            Some(naivedate_to_days(next))
1667        })
1668    });
1669    let out = Int32Chunked::from_iter_options(name.as_str().into(), out);
1670    let out_series = out.into_series().cast(&DataType::Date)?;
1671    Ok(Some(Column::new(name, out_series)))
1672}
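
// Editor's sketch (not in the original file): when the input already falls on
// the requested weekday, the result is one week later, never the input itself.
#[cfg(test)]
mod next_day_example {
    use super::*;

    #[test]
    fn same_weekday_rolls_to_next_week() {
        use chrono::NaiveDate;
        let monday = NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(); // a Monday
        let date_col = Series::new("d".into(), &[naivedate_to_days(monday)])
            .cast(&DataType::Date)
            .unwrap();
        let out = apply_next_day(Column::new("d".into(), date_col), "Mon").unwrap().unwrap();
        let days = out
            .take_materialized_series()
            .cast(&DataType::Int32)
            .unwrap()
            .i32()
            .unwrap()
            .get(0)
            .unwrap();
        assert_eq!(days_to_naive_date(days), NaiveDate::from_ymd_opt(2024, 1, 8));
    }
}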
1673
1674/// dayname(date_col) - weekday name "Mon","Tue",... (PySpark dayname).
1675pub fn apply_dayname(column: Column) -> PolarsResult<Option<Column>> {
1676    let name = column.field().into_owned().name;
1677    let series = column.take_materialized_series();
1678    let ca = date_series_to_days(&series)?;
1679    let out = StringChunked::from_iter_options(
1680        name.as_str().into(),
1681        ca.into_iter().map(|opt_d| {
1682            opt_d.and_then(|days| {
1683                let d = days_to_naive_date(days)?;
1684                Some(d.weekday().to_string())
1685            })
1686        }),
1687    );
1688    Ok(Some(Column::new(name, out.into_series())))
1689}
1690
1691/// weekday(date_col) - 0=Mon, 6=Sun (PySpark weekday).
1692pub fn apply_weekday(column: Column) -> PolarsResult<Option<Column>> {
1693    let name = column.field().into_owned().name;
1694    let series = column.take_materialized_series();
1695    let ca = date_series_to_days(&series)?;
1696    let out = Int32Chunked::from_iter_options(
1697        name.as_str().into(),
1698        ca.into_iter().map(|opt_d| {
1699            opt_d.and_then(|days| {
1700                let d = days_to_naive_date(days)?;
1701                Some(d.weekday().num_days_from_monday() as i32)
1702            })
1703        }),
1704    );
1705    Ok(Some(Column::new(name, out.into_series())))
1706}
1707
1708/// months_between(end, start, round_off) - returns fractional number of months.
1709/// When round_off is true, rounds to 8 decimal places (PySpark default).
1710pub fn apply_months_between(
1711    columns: &mut [Column],
1712    round_off: bool,
1713) -> PolarsResult<Option<Column>> {
1714    if columns.len() < 2 {
1715        return Err(PolarsError::ComputeError(
1716            "months_between needs two columns (end, start)".into(),
1717        ));
1718    }
1719    let name = columns[0].field().into_owned().name;
1720    let end_series = std::mem::take(&mut columns[0]).take_materialized_series();
1721    let start_series = std::mem::take(&mut columns[1]).take_materialized_series();
1722    let end_ca = date_series_to_days(&end_series)?;
1723    let start_ca = date_series_to_days(&start_series)?;
1724    // Day-difference / 31 approximation (PySpark is calendar-aware on month boundaries); roundOff rounds to 8 decimal places.
1725    let out = end_ca
1726        .into_iter()
1727        .zip(&start_ca)
1728        .map(|(oe, os)| match (oe, os) {
1729            (Some(e), Some(s)) => {
1730                let days = (e - s) as f64;
1731                let months = days / 31.0;
1732                Some(if round_off {
1733                    (months * 1e8).round() / 1e8
1734                } else {
1735                    months
1736                })
1737            }
1738            _ => None,
1739        });
1740    let out = Float64Chunked::from_iter_options(name.as_str().into(), out);
1741    Ok(Some(Column::new(name, out.into_series())))
1742}
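
// Editor's worked example (not in the original file) of the day/31 formula:
// 2024-01-31 -> 2024-03-01 is 30 days, and 30 / 31 = 0.96774194 after the
// 8-decimal roundOff.
#[cfg(test)]
mod months_between_example {
    use super::*;

    #[test]
    fn day_diff_over_31() {
        use chrono::NaiveDate;
        let date_col = |name: &str, y: i32, m: u32, d: u32| {
            let nd = NaiveDate::from_ymd_opt(y, m, d).unwrap();
            Column::new(
                name.into(),
                Series::new(name.into(), &[naivedate_to_days(nd)])
                    .cast(&DataType::Date)
                    .unwrap(),
            )
        };
        let mut cols = [date_col("end", 2024, 3, 1), date_col("start", 2024, 1, 31)];
        let out = apply_months_between(&mut cols, true).unwrap().unwrap();
        let v = out.take_materialized_series().f64().unwrap().get(0).unwrap();
        assert!((v - 0.96774194).abs() < 1e-8);
    }
}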
1743
1744// --- Math (trig, degrees, radians, signum) ---
1745
1746fn float_series_to_f64(series: &Series) -> PolarsResult<Float64Chunked> {
1747    if series.dtype() == &DataType::String {
1748        // PySpark parity (#272): strip whitespace then parse (cast does not strip).
1749        let ca = series
1750            .str()
1751            .map_err(|e| PolarsError::ComputeError(format!("float_series_to_f64: {e}").into()))?;
1752        let name = series.name().as_str().into();
1753        let results: Vec<Option<f64>> = ca
1754            .into_iter()
1755            .map(|opt_s| opt_s.and_then(parse_str_to_double))
1756            .collect();
1757        return Ok(Float64Chunked::from_iter_options(name, results.into_iter()));
1758    }
1759    let casted = series.cast(&DataType::Float64)?;
1760    casted
1761        .f64()
1762        .map_err(|e| PolarsError::ComputeError(format!("float_series_to_f64: {e}").into()))
1763        .cloned()
1764}
1765
1766/// Apply sin (radians) to a float column.
1767pub fn apply_sin(column: Column) -> PolarsResult<Option<Column>> {
1768    let name = column.field().into_owned().name;
1769    let series = column.take_materialized_series();
1770    let ca = float_series_to_f64(&series)?;
1771    let out = ca.apply_values(f64::sin).into_series();
1772    Ok(Some(Column::new(name, out)))
1773}
1774
1775/// Apply cos (radians) to a float column.
1776pub fn apply_cos(column: Column) -> PolarsResult<Option<Column>> {
1777    let name = column.field().into_owned().name;
1778    let series = column.take_materialized_series();
1779    let ca = float_series_to_f64(&series)?;
1780    let out = ca.apply_values(f64::cos).into_series();
1781    Ok(Some(Column::new(name, out)))
1782}
1783
1784/// Banker's rounding: round half to even (PySpark bround).
1785fn bround_one(x: f64, scale: i32) -> f64 {
1786    if x.is_nan() || x.is_infinite() {
1787        return x;
1788    }
1789    let factor = 10_f64.powi(scale);
1790    let scaled = x * factor;
1791    let rounded =
1792        if scaled.fract().abs() > 0.5_f64 - 1e-10 && scaled.fract().abs() < 0.5_f64 + 1e-10 {
1793            // Half case: round to nearest even
1794            let floor_val = scaled.trunc();
1795            if floor_val as i64 % 2 == 0 {
1796                floor_val
1797            } else {
1798                floor_val + scaled.signum()
1799            }
1800        } else {
1801            scaled.round()
1802        };
1803    rounded / factor
1804}
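
// Editor's sketch (not in the original file): exact halves go to the even
// neighbor, everything else rounds normally.
#[cfg(test)]
mod bround_example {
    use super::*;

    #[test]
    fn half_values_round_to_even() {
        assert_eq!(bround_one(2.5, 0), 2.0); // down to even
        assert_eq!(bround_one(3.5, 0), 4.0); // up to even
        assert_eq!(bround_one(-2.5, 0), -2.0); // sign-symmetric
        assert_eq!(bround_one(2.4, 0), 2.0); // non-half: ordinary rounding
        assert_eq!(bround_one(0.125, 2), 0.12); // scale applies before the tie check
    }
}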
1805
1806/// Convert number string from one base to another (PySpark conv).
1807fn conv_one(s: &str, from_base: i32, to_base: i32) -> Option<String> {
1808    if !(2..=36).contains(&from_base) || !(2..=36).contains(&to_base) {
1809        return None;
1810    }
1811    let s_trim = s.trim();
1812    if s_trim.is_empty() {
1813        return None;
1814    }
1815    let n = i64::from_str_radix(s_trim, from_base as u32).ok()?;
1816    let to_b = to_base as u32;
1817    const DIGITS: &[u8] = b"0123456789abcdefghijklmnopqrstuvwxyz";
1818    if n == 0 {
1819        return Some("0".to_string());
1820    }
1821    let mut result = String::new();
1822    let mut val = if n < 0 {
1823        result.push('-');
1824        n.unsigned_abs()
1825    } else {
1826        n as u64
1827    };
1828    let mut buf = String::new();
1829    while val > 0 {
1830        buf.push(DIGITS[(val % to_b as u64) as usize] as char);
1831        val /= to_b as u64;
1832    }
1833    result.push_str(&buf.chars().rev().collect::<String>());
1834    Some(result)
1835}
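
// Editor's sketch (not in the original file): conv_one parses in from_base and
// re-formats in to_base, with lowercase digits beyond 9.
#[cfg(test)]
mod conv_example {
    use super::*;

    #[test]
    fn base_conversions() {
        assert_eq!(conv_one("ff", 16, 10).as_deref(), Some("255"));
        assert_eq!(conv_one("7", 10, 2).as_deref(), Some("111"));
        assert_eq!(conv_one("255", 10, 16).as_deref(), Some("ff"));
        assert_eq!(conv_one("1", 1, 10), None); // bases outside 2..=36 are rejected
    }
}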
1836
1837/// Apply conv (base conversion). String: parse from from_base, format in to_base. Int: format value in to_base.
1838pub fn apply_conv(column: Column, from_base: i32, to_base: i32) -> PolarsResult<Option<Column>> {
1839    use std::borrow::Cow;
1840    let name = column.field().into_owned().name;
1841    let series = column.take_materialized_series();
1842    let out = if series.dtype() == &DataType::String {
1843        let ca = series
1844            .str()
1845            .map_err(|e| PolarsError::ComputeError(format!("conv: {e}").into()))?;
1846        ca.apply(|opt_s| opt_s.and_then(|s| conv_one(s, from_base, to_base).map(Cow::Owned)))
1847            .into_series()
1848    } else if series.dtype() == &DataType::Int64 {
1849        let ca = series
1850            .i64()
1851            .map_err(|e| PolarsError::ComputeError(format!("conv: {e}").into()))?;
1852        let to_b = to_base as u32;
1853        const DIGITS: &[u8] = b"0123456789abcdefghijklmnopqrstuvwxyz";
1854        let format_int = |n: i64| -> Option<String> {
1855            if !(2..=36).contains(&to_b) {
1856                return None;
1857            }
1858            if n == 0 {
1859                return Some("0".to_string());
1860            }
1861            let mut result = String::new();
1862            let mut val = if n < 0 {
1863                result.push('-');
1864                n.unsigned_abs()
1865            } else {
1866                n as u64
1867            };
1868            let mut buf = String::new();
1869            while val > 0 {
1870                buf.push(DIGITS[(val % to_b as u64) as usize] as char);
1871                val /= to_b as u64;
1872            }
1873            result.push_str(&buf.chars().rev().collect::<String>());
1874            Some(result)
1875        };
1876        let out_ca = StringChunked::from_iter_options(
1877            name.as_str().into(),
1878            ca.into_iter().map(|opt| opt.and_then(format_int)),
1879        );
1880        out_ca.into_series()
1881    } else {
1882        let s_str = series.cast(&DataType::String)?;
1883        let ca = s_str
1884            .str()
1885            .map_err(|e| PolarsError::ComputeError(e.to_string().into()))?;
1886        ca.apply(|opt_s| opt_s.and_then(|s| conv_one(s, from_base, to_base).map(Cow::Owned)))
1887            .into_series()
1888    };
1889    Ok(Some(Column::new(name, out)))
1890}
1891
1892/// Apply hex: integer or string to hex string (PySpark hex).
1893pub fn apply_hex(column: Column) -> PolarsResult<Option<Column>> {
1894    let name = column.field().into_owned().name;
1895    let series = column.take_materialized_series();
1896    let out = if series.dtype() == &DataType::Int64 || series.dtype() == &DataType::Int32 {
1897        let s = series.cast(&DataType::Int64)?;
1898        let ca = s
1899            .i64()
1900            .map_err(|e| PolarsError::ComputeError(e.to_string().into()))?;
1901        let out_ca = StringChunked::from_iter_options(
1902            name.as_str().into(),
1903            ca.into_iter().map(|opt| opt.map(|n| format!("{n:X}"))),
1904        );
1905        out_ca.into_series()
1906    } else if series.dtype() == &DataType::String {
1907        let ca = series
1908            .str()
1909            .map_err(|e| PolarsError::ComputeError(e.to_string().into()))?;
1910        let out_ca = StringChunked::from_iter_options(
1911            name.as_str().into(),
1912            ca.into_iter().map(|opt| {
1913                opt.map(|s| {
1914                    s.as_bytes()
1915                        .iter()
1916                        .map(|b| format!("{b:02X}"))
1917                        .collect::<String>()
1918                })
1919            }),
1920        );
1921        out_ca.into_series()
1922    } else {
1923        let s = series.cast(&DataType::Int64)?;
1924        let ca = s
1925            .i64()
1926            .map_err(|e| PolarsError::ComputeError(e.to_string().into()))?;
1927        let out_ca = StringChunked::from_iter_options(
1928            name.as_str().into(),
1929            ca.into_iter().map(|opt| opt.map(|n| format!("{n:X}"))),
1930        );
1931        out_ca.into_series()
1932    };
1933    Ok(Some(Column::new(name, out)))
1934}
1935
1936/// Apply unhex: hex string to binary/string (PySpark unhex).
1937pub fn apply_unhex(column: Column) -> PolarsResult<Option<Column>> {
1938    use std::borrow::Cow;
1939    let name = column.field().into_owned().name;
1940    let series = column.take_materialized_series();
1941    let ca = series
1942        .str()
1943        .map_err(|e| PolarsError::ComputeError(format!("unhex: {e}").into()))?;
1944    let unhex_one = |s: &str| -> Option<Vec<u8>> {
1945        let s = s.trim();
1946        let chars: Vec<char> = if s.len() % 2 == 1 {
1947            std::iter::once('0').chain(s.chars()).collect() // left-pad, not drop: unhex("123") -> 0x01 0x23
1948        } else {
1949            s.chars().collect()
1950        };
1951        let mut bytes = Vec::with_capacity(chars.len() / 2);
1952        for chunk in chars.chunks(2) {
1953            let hex_pair: String = chunk.iter().collect();
1954            let byte = u8::from_str_radix(&hex_pair, 16).ok()?;
1955            bytes.push(byte);
1956        }
1957        Some(bytes)
1958    };
1959    let out = ca
1960        .apply(|opt_s| {
1961            opt_s.and_then(|s| {
1962                unhex_one(s)
1963                    .and_then(|b| String::from_utf8(b).ok())
1964                    .map(Cow::Owned)
1965            })
1966        })
1967        .into_series();
1968    Ok(Some(Column::new(name, out)))
1969}
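
// Editor's sketch (not in the original file): odd-length input gains a leading
// zero nibble, so no digit is lost.
#[cfg(test)]
mod unhex_example {
    use super::*;

    #[test]
    fn odd_length_is_left_padded() {
        let col = Column::new("h".into(), Series::new("h".into(), &["414243", "149"]));
        let out = apply_unhex(col).unwrap().unwrap();
        let s = out.take_materialized_series();
        let ca = s.str().unwrap();
        assert_eq!(ca.get(0), Some("ABC")); // 0x41 0x42 0x43
        assert_eq!(ca.get(1), Some("\u{1}I")); // "0149" -> bytes 0x01 0x49
    }
}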
1970
1971/// Apply bin: integer to binary string (PySpark bin).
1972pub fn apply_bin(column: Column) -> PolarsResult<Option<Column>> {
1973    let name = column.field().into_owned().name;
1974    let series = column.take_materialized_series();
1975    let s = series.cast(&DataType::Int64)?;
1976    let ca = s
1977        .i64()
1978        .map_err(|e| PolarsError::ComputeError(e.to_string().into()))?;
1979    let out_ca = StringChunked::from_iter_options(
1980        name.as_str().into(),
1981        ca.into_iter().map(|opt| opt.map(|n| format!("{n:b}"))),
1982    );
1983    Ok(Some(Column::new(name, out_ca.into_series())))
1984}
1985
1986/// Apply getbit: get bit at 0-based position (PySpark getbit).
1987pub fn apply_getbit(column: Column, pos: i64) -> PolarsResult<Option<Column>> {
1988    let name = column.field().into_owned().name;
1989    let series = column.take_materialized_series();
1990    let s = series.cast(&DataType::Int64)?;
1991    let ca = s
1992        .i64()
1993        .map_err(|e| PolarsError::ComputeError(e.to_string().into()))?;
1994    let pos = pos.clamp(0, 63) as u32; // shifting i64 by >= 64 is a shift overflow (panics in debug builds)
1995    let out_ca = Int64Chunked::from_iter_options(
1996        name.as_str().into(),
1997        ca.into_iter().map(|opt| opt.map(|n| (n >> pos) & 1)),
1998    );
1999    Ok(Some(Column::new(name, out_ca.into_series())))
2000}
2001
2002/// Apply bit_count: count set bits in integer (PySpark bit_count).
2003pub fn apply_bit_count(column: Column) -> PolarsResult<Option<Column>> {
2004    let name = column.field().into_owned().name;
2005    let series = column.take_materialized_series();
2006    let s = series.cast(&DataType::Int64)?;
2007    let ca = s
2008        .i64()
2009        .map_err(|e| PolarsError::ComputeError(e.to_string().into()))?;
2010    let out_ca = Int64Chunked::from_iter_options(
2011        name.as_str().into(),
2012        ca.into_iter()
2013            .map(|opt| opt.map(|n| i64::from(n.count_ones()))),
2014    );
2015    Ok(Some(Column::new(name, out_ca.into_series())))
2016}
2017
2018/// Assert that all boolean values are true (PySpark assert_true).
2019/// PySpark: returns null when input is true; throws when input is false or null.
2020/// When err_msg is Some, it is used in the error message when assertion fails.
2021pub fn apply_assert_true(column: Column, err_msg: Option<&str>) -> PolarsResult<Option<Column>> {
2022    let name = column.field().into_owned().name;
2023    let series = column.take_materialized_series();
2024    let ca = series
2025        .bool()
2026        .map_err(|e| PolarsError::ComputeError(e.to_string().into()))?;
2027    let len = ca.len();
2028    // PySpark: fail on false or null
2029    let failed = ca.into_iter().any(|opt| match opt {
2030        Some(true) => false,
2031        Some(false) | None => true,
2032    });
2033    if failed {
2034        let msg = err_msg
2035            .map(String::from)
2036            .unwrap_or_else(|| format!("assert_true failed on column '{name}'"));
2037        return Err(PolarsError::ComputeError(msg.into()));
2038    }
2039    // PySpark: return null on success
2040    let null_col = BooleanChunked::from_iter_options(name.as_str().into(), (0..len).map(|_| None));
2041    Ok(Some(Column::new(name, null_col.into_series())))
2042}
2043
2044/// Apply rand: uniform [0, 1) per row, with optional seed (PySpark rand).
2045pub fn apply_rand_with_seed(column: Column, seed: Option<u64>) -> PolarsResult<Option<Column>> {
2046    use rand::Rng;
2047    use rand::SeedableRng;
2048    let name = column.field().into_owned().name;
2049    let series = column.take_materialized_series();
2050    let n = series.len();
2051    let values: Vec<f64> = if let Some(s) = seed {
2052        let mut rng = rand::rngs::StdRng::seed_from_u64(s);
2053        (0..n).map(|_| rng.r#gen::<f64>()).collect()
2054    } else {
2055        let mut rng = rand::thread_rng();
2056        (0..n).map(|_| rng.r#gen::<f64>()).collect()
2057    };
2058    let out = Float64Chunked::from_vec(name.as_str().into(), values);
2059    Ok(Some(Column::new(name, out.into_series())))
2060}
2061
2062/// Apply randn: standard normal per row, with optional seed (PySpark randn).
2063pub fn apply_randn_with_seed(column: Column, seed: Option<u64>) -> PolarsResult<Option<Column>> {
2064    use rand::SeedableRng;
2065    use rand_distr::Distribution;
2066    let name = column.field().into_owned().name;
2067    let series = column.take_materialized_series();
2068    let n = series.len();
2069    let dist = rand_distr::StandardNormal;
2070    let values: Vec<f64> = if let Some(s) = seed {
2071        let mut rng = rand::rngs::StdRng::seed_from_u64(s);
2072        (0..n).map(|_| dist.sample(&mut rng)).collect()
2073    } else {
2074        let mut rng = rand::thread_rng();
2075        (0..n).map(|_| dist.sample(&mut rng)).collect()
2076    };
2077    let out = Float64Chunked::from_vec(name.as_str().into(), values);
2078    Ok(Some(Column::new(name, out.into_series())))
2079}
2080
2081/// Build a Series of `n` uniform [0, 1) values with optional seed (for with_column PySpark-like rand).
2082pub fn series_rand_n(name: &str, n: usize, seed: Option<u64>) -> Series {
2083    use rand::Rng;
2084    use rand::SeedableRng;
2085    let values: Vec<f64> = if let Some(s) = seed {
2086        let mut rng = rand::rngs::StdRng::seed_from_u64(s);
2087        (0..n).map(|_| rng.r#gen::<f64>()).collect()
2088    } else {
2089        let mut rng = rand::thread_rng();
2090        (0..n).map(|_| rng.r#gen::<f64>()).collect()
2091    };
2092    Float64Chunked::from_vec(name.into(), values).into_series()
2093}
2094
2095/// Build a Series of `n` standard normal values with optional seed (for with_column PySpark-like randn).
2096pub fn series_randn_n(name: &str, n: usize, seed: Option<u64>) -> Series {
2097    use rand::SeedableRng;
2098    use rand_distr::Distribution;
2099    let dist = rand_distr::StandardNormal;
2100    let values: Vec<f64> = if let Some(s) = seed {
2101        let mut rng = rand::rngs::StdRng::seed_from_u64(s);
2102        (0..n).map(|_| dist.sample(&mut rng)).collect()
2103    } else {
2104        let mut rng = rand::thread_rng();
2105        (0..n).map(|_| dist.sample(&mut rng)).collect()
2106    };
2107    Float64Chunked::from_vec(name.into(), values).into_series()
2108}
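
// Editor's sketch (not in the original file): StdRng is deterministic for a
// fixed seed, so seeded series are reproducible (the same holds for
// series_randn_n); the unseeded path uses thread_rng and is not.
#[cfg(test)]
mod seeded_rand_example {
    use super::*;

    #[test]
    fn same_seed_same_sequence() {
        let a = series_rand_n("r", 4, Some(42));
        let b = series_rand_n("r", 4, Some(42));
        let av: Vec<_> = a.f64().unwrap().into_iter().collect();
        let bv: Vec<_> = b.f64().unwrap().into_iter().collect();
        assert_eq!(av, bv);
        assert!(av.iter().all(|v| (0.0..1.0).contains(&v.unwrap())));
    }
}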
2109
2110/// Apply bitwise AND for two integer columns (PySpark bit_and).
2111pub fn apply_bit_and(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
2112    if columns.len() < 2 {
2113        return Err(PolarsError::ComputeError(
2114            "bit_and needs two columns".into(),
2115        ));
2116    }
2117    let name = columns[0].field().into_owned().name;
2118    let a_series = std::mem::take(&mut columns[0]).take_materialized_series();
2119    let b_series = std::mem::take(&mut columns[1]).take_materialized_series();
2120    let a_cast = a_series.cast(&DataType::Int64)?;
2121    let b_cast = b_series.cast(&DataType::Int64)?;
2122    let a = a_cast
2123        .i64()
2124        .map_err(|e| PolarsError::ComputeError(e.to_string().into()))?;
2125    let b = b_cast
2126        .i64()
2127        .map_err(|e| PolarsError::ComputeError(e.to_string().into()))?;
2128    let out = Int64Chunked::from_iter_options(
2129        name.as_str().into(),
2130        a.into_iter().zip(b).map(|(av, bv)| match (av, bv) {
2131            (Some(x), Some(y)) => Some(x & y),
2132            _ => None,
2133        }),
2134    );
2135    Ok(Some(Column::new(name, out.into_series())))
2136}
2137
2138/// Apply bitwise OR for two integer columns (PySpark bit_or).
2139pub fn apply_bit_or(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
2140    if columns.len() < 2 {
2141        return Err(PolarsError::ComputeError("bit_or needs two columns".into()));
2142    }
2143    let name = columns[0].field().into_owned().name;
2144    let a_series = std::mem::take(&mut columns[0]).take_materialized_series();
2145    let b_series = std::mem::take(&mut columns[1]).take_materialized_series();
2146    let a_cast = a_series.cast(&DataType::Int64)?;
2147    let b_cast = b_series.cast(&DataType::Int64)?;
2148    let a = a_cast
2149        .i64()
2150        .map_err(|e| PolarsError::ComputeError(e.to_string().into()))?;
2151    let b = b_cast
2152        .i64()
2153        .map_err(|e| PolarsError::ComputeError(e.to_string().into()))?;
2154    let out = Int64Chunked::from_iter_options(
2155        name.as_str().into(),
2156        a.into_iter().zip(b).map(|(av, bv)| match (av, bv) {
2157            (Some(x), Some(y)) => Some(x | y),
2158            _ => None,
2159        }),
2160    );
2161    Ok(Some(Column::new(name, out.into_series())))
2162}
2163
2164/// Apply bitwise XOR for two integer columns (PySpark bit_xor).
2165pub fn apply_bit_xor(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
2166    if columns.len() < 2 {
2167        return Err(PolarsError::ComputeError(
2168            "bit_xor needs two columns".into(),
2169        ));
2170    }
2171    let name = columns[0].field().into_owned().name;
2172    let a_series = std::mem::take(&mut columns[0]).take_materialized_series();
2173    let b_series = std::mem::take(&mut columns[1]).take_materialized_series();
2174    let a_cast = a_series.cast(&DataType::Int64)?;
2175    let b_cast = b_series.cast(&DataType::Int64)?;
2176    let a = a_cast
2177        .i64()
2178        .map_err(|e| PolarsError::ComputeError(e.to_string().into()))?;
2179    let b = b_cast
2180        .i64()
2181        .map_err(|e| PolarsError::ComputeError(e.to_string().into()))?;
2182    let out = Int64Chunked::from_iter_options(
2183        name.as_str().into(),
2184        a.into_iter().zip(b).map(|(av, bv)| match (av, bv) {
2185            (Some(x), Some(y)) => Some(x ^ y),
2186            _ => None,
2187        }),
2188    );
2189    Ok(Some(Column::new(name, out.into_series())))
2190}
2191
2192/// Apply round to given decimal places. Supports numeric and string columns (PySpark parity:
2193/// string columns containing numeric values are implicitly cast to double then rounded).
2194pub fn apply_round(column: Column, decimals: u32) -> PolarsResult<Option<Column>> {
2195    let name = column.field().into_owned().name;
2196    let series = column.take_materialized_series();
2197    let ca = float_series_to_f64(&series)?;
2198    let scale = decimals as i32;
2199    let factor = 10_f64.powi(scale);
2200    let out = ca
2201        .apply_values(|x| (x * factor).round() / factor)
2202        .into_series();
2203    Ok(Some(Column::new(name, out)))
2204}
2205
2206/// Apply bround (banker's rounding) to a float column.
2207pub fn apply_bround(column: Column, scale: i32) -> PolarsResult<Option<Column>> {
2208    let name = column.field().into_owned().name;
2209    let series = column.take_materialized_series();
2210    let ca = float_series_to_f64(&series)?;
2211    let out = ca.apply_values(|x| bround_one(x, scale)).into_series();
2212    Ok(Some(Column::new(name, out)))
2213}
2214
2215/// Apply cot (1/tan) to a float column.
2216pub fn apply_cot(column: Column) -> PolarsResult<Option<Column>> {
2217    let name = column.field().into_owned().name;
2218    let series = column.take_materialized_series();
2219    let ca = float_series_to_f64(&series)?;
2220    let out = ca.apply_values(|x| 1.0 / x.tan()).into_series();
2221    Ok(Some(Column::new(name, out)))
2222}
2223
2224/// Apply csc (1/sin) to a float column.
2225pub fn apply_csc(column: Column) -> PolarsResult<Option<Column>> {
2226    let name = column.field().into_owned().name;
2227    let series = column.take_materialized_series();
2228    let ca = float_series_to_f64(&series)?;
2229    let out = ca.apply_values(|x| 1.0 / x.sin()).into_series();
2230    Ok(Some(Column::new(name, out)))
2231}
2232
2233/// Apply sec (1/cos) to a float column.
2234pub fn apply_sec(column: Column) -> PolarsResult<Option<Column>> {
2235    let name = column.field().into_owned().name;
2236    let series = column.take_materialized_series();
2237    let ca = float_series_to_f64(&series)?;
2238    let out = ca.apply_values(|x| 1.0 / x.cos()).into_series();
2239    Ok(Some(Column::new(name, out)))
2240}
2241
2242/// Apply tan (radians) to a float column.
2243pub fn apply_tan(column: Column) -> PolarsResult<Option<Column>> {
2244    let name = column.field().into_owned().name;
2245    let series = column.take_materialized_series();
2246    let ca = float_series_to_f64(&series)?;
2247    let out = ca.apply_values(f64::tan).into_series();
2248    Ok(Some(Column::new(name, out)))
2249}
2250
2251/// Apply asin to a float column.
2252pub fn apply_asin(column: Column) -> PolarsResult<Option<Column>> {
2253    let name = column.field().into_owned().name;
2254    let series = column.take_materialized_series();
2255    let ca = float_series_to_f64(&series)?;
2256    let out = ca.apply_values(f64::asin).into_series();
2257    Ok(Some(Column::new(name, out)))
2258}
2259
2260/// Apply acos to a float column.
2261pub fn apply_acos(column: Column) -> PolarsResult<Option<Column>> {
2262    let name = column.field().into_owned().name;
2263    let series = column.take_materialized_series();
2264    let ca = float_series_to_f64(&series)?;
2265    let out = ca.apply_values(f64::acos).into_series();
2266    Ok(Some(Column::new(name, out)))
2267}
2268
2269/// Apply atan to a float column.
2270pub fn apply_atan(column: Column) -> PolarsResult<Option<Column>> {
2271    let name = column.field().into_owned().name;
2272    let series = column.take_materialized_series();
2273    let ca = float_series_to_f64(&series)?;
2274    let out = ca.apply_values(f64::atan).into_series();
2275    Ok(Some(Column::new(name, out)))
2276}
2277
2278/// Apply atan2(y, x) to two float columns.
2279pub fn apply_atan2(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
2280    if columns.len() < 2 {
2281        return Err(PolarsError::ComputeError(
2282            "atan2 needs two columns (y, x)".into(),
2283        ));
2284    }
2285    let name = columns[0].field().into_owned().name;
2286    let y_series = std::mem::take(&mut columns[0]).take_materialized_series();
2287    let x_series = std::mem::take(&mut columns[1]).take_materialized_series();
2288    let y_ca = float_series_to_f64(&y_series)?;
2289    let x_ca = float_series_to_f64(&x_series)?;
2290    let out = y_ca.into_iter().zip(&x_ca).map(|(oy, ox)| match (oy, ox) {
2291        (Some(y), Some(x)) => Some(f64::atan2(y, x)),
2292        _ => None,
2293    });
2294    let out = Float64Chunked::from_iter_options(name.as_str().into(), out);
2295    Ok(Some(Column::new(name, out.into_series())))
2296}
2297
2298/// Apply degrees (radians -> degrees) to a float column.
2299pub fn apply_degrees(column: Column) -> PolarsResult<Option<Column>> {
2300    let name = column.field().into_owned().name;
2301    let series = column.take_materialized_series();
2302    let ca = float_series_to_f64(&series)?;
2303    let out = ca.apply_values(|r| r.to_degrees()).into_series();
2304    Ok(Some(Column::new(name, out)))
2305}
2306
2307/// Apply radians (degrees -> radians) to a float column.
2308pub fn apply_radians(column: Column) -> PolarsResult<Option<Column>> {
2309    let name = column.field().into_owned().name;
2310    let series = column.take_materialized_series();
2311    let ca = float_series_to_f64(&series)?;
2312    let out = ca.apply_values(|d| d.to_radians()).into_series();
2313    Ok(Some(Column::new(name, out)))
2314}
2315
2316/// Apply signum (-1, 0, or 1) to a numeric column.
2317pub fn apply_signum(column: Column) -> PolarsResult<Option<Column>> {
2318    let name = column.field().into_owned().name;
2319    let series = column.take_materialized_series();
2320    let ca = float_series_to_f64(&series)?;
2321    let out = ca
2322        .apply_values(|v| {
2323            if v.is_nan() || v == 0.0 {
2324                v // Java Math.signum parity: NaN propagates, zero keeps its sign
2325            } else if v > 0.0 {
2326                1.0
2327            } else {
2328                -1.0
2329            }
2330        })
2331        .into_series();
2332    Ok(Some(Column::new(name, out)))
2333}
2334
2335/// Hyperbolic and inverse hyperbolic / extra math.
2336pub fn apply_cosh(column: Column) -> PolarsResult<Option<Column>> {
2337    let name = column.field().into_owned().name;
2338    let series = column.take_materialized_series();
2339    let ca = float_series_to_f64(&series)?;
2340    let out = ca.apply_values(f64::cosh).into_series();
2341    Ok(Some(Column::new(name, out)))
2342}
2343pub fn apply_sinh(column: Column) -> PolarsResult<Option<Column>> {
2344    let name = column.field().into_owned().name;
2345    let series = column.take_materialized_series();
2346    let ca = float_series_to_f64(&series)?;
2347    let out = ca.apply_values(f64::sinh).into_series();
2348    Ok(Some(Column::new(name, out)))
2349}
2350pub fn apply_tanh(column: Column) -> PolarsResult<Option<Column>> {
2351    let name = column.field().into_owned().name;
2352    let series = column.take_materialized_series();
2353    let ca = float_series_to_f64(&series)?;
2354    let out = ca.apply_values(f64::tanh).into_series();
2355    Ok(Some(Column::new(name, out)))
2356}
2357pub fn apply_acosh(column: Column) -> PolarsResult<Option<Column>> {
2358    let name = column.field().into_owned().name;
2359    let series = column.take_materialized_series();
2360    let ca = float_series_to_f64(&series)?;
2361    let out = ca.apply_values(f64::acosh).into_series();
2362    Ok(Some(Column::new(name, out)))
2363}
2364pub fn apply_asinh(column: Column) -> PolarsResult<Option<Column>> {
2365    let name = column.field().into_owned().name;
2366    let series = column.take_materialized_series();
2367    let ca = float_series_to_f64(&series)?;
2368    let out = ca.apply_values(f64::asinh).into_series();
2369    Ok(Some(Column::new(name, out)))
2370}
2371pub fn apply_atanh(column: Column) -> PolarsResult<Option<Column>> {
2372    let name = column.field().into_owned().name;
2373    let series = column.take_materialized_series();
2374    let ca = float_series_to_f64(&series)?;
2375    let out = ca.apply_values(f64::atanh).into_series();
2376    Ok(Some(Column::new(name, out)))
2377}
2378pub fn apply_cbrt(column: Column) -> PolarsResult<Option<Column>> {
2379    let name = column.field().into_owned().name;
2380    let series = column.take_materialized_series();
2381    let ca = float_series_to_f64(&series)?;
2382    let out = ca.apply_values(f64::cbrt).into_series();
2383    Ok(Some(Column::new(name, out)))
2384}
2385pub fn apply_expm1(column: Column) -> PolarsResult<Option<Column>> {
2386    let name = column.field().into_owned().name;
2387    let series = column.take_materialized_series();
2388    let ca = float_series_to_f64(&series)?;
2389    let out = ca.apply_values(f64::exp_m1).into_series();
2390    Ok(Some(Column::new(name, out)))
2391}
2392pub fn apply_log1p(column: Column) -> PolarsResult<Option<Column>> {
2393    let name = column.field().into_owned().name;
2394    let series = column.take_materialized_series();
2395    let ca = float_series_to_f64(&series)?;
2396    let out = ca.apply_values(f64::ln_1p).into_series();
2397    Ok(Some(Column::new(name, out)))
2398}
2399pub fn apply_log10(column: Column) -> PolarsResult<Option<Column>> {
2400    let name = column.field().into_owned().name;
2401    let series = column.take_materialized_series();
2402    let ca = float_series_to_f64(&series)?;
2403    let out = ca.apply_values(f64::log10).into_series();
2404    Ok(Some(Column::new(name, out)))
2405}
2406pub fn apply_log2(column: Column) -> PolarsResult<Option<Column>> {
2407    let name = column.field().into_owned().name;
2408    let series = column.take_materialized_series();
2409    let ca = float_series_to_f64(&series)?;
2410    let out = ca.apply_values(f64::log2).into_series();
2411    Ok(Some(Column::new(name, out)))
2412}
2413pub fn apply_rint(column: Column) -> PolarsResult<Option<Column>> {
2414    let name = column.field().into_owned().name;
2415    let series = column.take_materialized_series();
2416    let ca = float_series_to_f64(&series)?;
2417    let out = ca.apply_values(f64::round_ties_even).into_series(); // Math.rint: ties to even
2418    Ok(Some(Column::new(name, out)))
2419}
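
// Editor's sketch (not in the original file): ties go to the even neighbor,
// which is where rint differs from round above.
#[cfg(test)]
mod rint_example {
    use super::*;

    #[test]
    fn ties_round_to_even() {
        let col = Column::new("x".into(), Series::new("x".into(), &[2.5f64, 3.5, 2.4]));
        let out = apply_rint(col).unwrap().unwrap();
        let s = out.take_materialized_series();
        let ca = s.f64().unwrap();
        assert_eq!(ca.get(0), Some(2.0));
        assert_eq!(ca.get(1), Some(4.0));
        assert_eq!(ca.get(2), Some(2.0));
    }
}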
2420
2421/// Element-wise max of two columns (for greatest). Supports Float64, Int64, String.
2422pub fn apply_greatest2(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
2423    if columns.len() < 2 {
2424        return Err(PolarsError::ComputeError(
2425            "greatest2 needs two columns".into(),
2426        ));
2427    }
2428    let name = columns[0].field().into_owned().name;
2429    let a_series = std::mem::take(&mut columns[0]).take_materialized_series();
2430    let b_series = std::mem::take(&mut columns[1]).take_materialized_series();
2431    let out = match (a_series.dtype(), b_series.dtype()) {
2432        (DataType::Float64, _) | (_, DataType::Float64) => {
2433            let a = float_series_to_f64(&a_series)?;
2434            let b = float_series_to_f64(&b_series)?;
2435            let out = Float64Chunked::from_iter_options(
2436                name.as_str().into(),
2437                a.into_iter().zip(&b).map(|(oa, ob)| match (oa, ob) {
2438                    (Some(x), Some(y)) => Some(x.max(y)),
2439                    (Some(x), None) => Some(x),
2440                    (None, Some(y)) => Some(y),
2441                    (None, None) => None,
2442                }),
2443            );
2444            out.into_series()
2445        }
2446        (DataType::Int64, _)
2447        | (DataType::Int32, _)
2448        | (_, DataType::Int64)
2449        | (_, DataType::Int32) => {
2450            let a = a_series.cast(&DataType::Int64)?;
2451            let b = b_series.cast(&DataType::Int64)?;
2452            let ca_a = a
2453                .i64()
2454                .map_err(|e| PolarsError::ComputeError(format!("greatest: {e}").into()))?;
2455            let ca_b = b
2456                .i64()
2457                .map_err(|e| PolarsError::ComputeError(format!("greatest: {e}").into()))?;
2458            let out = Int64Chunked::from_iter_options(
2459                name.as_str().into(),
2460                ca_a.into_iter().zip(ca_b).map(|(oa, ob)| match (oa, ob) {
2461                    (Some(x), Some(y)) => Some(x.max(y)),
2462                    (Some(x), None) => Some(x),
2463                    (None, Some(y)) => Some(y),
2464                    (None, None) => None,
2465                }),
2466            );
2467            out.into_series()
2468        }
2469        (DataType::String, _) | (_, DataType::String) => {
2470            let a = a_series.cast(&DataType::String)?;
2471            let b = b_series.cast(&DataType::String)?;
2472            let ca_a = a
2473                .str()
2474                .map_err(|e| PolarsError::ComputeError(format!("greatest: {e}").into()))?;
2475            let ca_b = b
2476                .str()
2477                .map_err(|e| PolarsError::ComputeError(format!("greatest: {e}").into()))?;
2478            let out = StringChunked::from_iter_options(
2479                name.as_str().into(),
2480                ca_a.into_iter().zip(ca_b).map(|(oa, ob)| match (oa, ob) {
2481                    (Some(x), Some(y)) => Some(if x >= y { x } else { y }),
2482                    (Some(x), None) => Some(x),
2483                    (None, Some(y)) => Some(y),
2484                    (None, None) => None,
2485                }),
2486            );
2487            out.into_series()
2488        }
2489        _ => {
2490            let a = float_series_to_f64(&a_series)?;
2491            let b = float_series_to_f64(&b_series)?;
2492            let out = Float64Chunked::from_iter_options(
2493                name.as_str().into(),
2494                a.into_iter().zip(&b).map(|(oa, ob)| match (oa, ob) {
2495                    (Some(x), Some(y)) => Some(x.max(y)),
2496                    (Some(x), None) => Some(x),
2497                    (None, Some(y)) => Some(y),
2498                    (None, None) => None,
2499                }),
2500            );
2501            out.into_series()
2502        }
2503    };
2504    Ok(Some(Column::new(name, out)))
2505}
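
// Editor's sketch (not in the original file): like PySpark's greatest, nulls
// are skipped rather than propagated, and Int32 inputs are widened to Int64.
#[cfg(test)]
mod greatest_example {
    use super::*;

    #[test]
    fn nulls_are_skipped() {
        let a = Column::new("a".into(), Series::new("a".into(), &[Some(1i64), None, None]));
        let b = Column::new("b".into(), Series::new("b".into(), &[None, Some(5i64), None]));
        let mut cols = [a, b];
        let out = apply_greatest2(&mut cols).unwrap().unwrap();
        let s = out.take_materialized_series();
        let ca = s.i64().unwrap();
        assert_eq!(ca.get(0), Some(1));
        assert_eq!(ca.get(1), Some(5));
        assert_eq!(ca.get(2), None);
    }
}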
2506
2507/// Element-wise min of two columns (for least).
2508pub fn apply_least2(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
2509    if columns.len() < 2 {
2510        return Err(PolarsError::ComputeError("least2 needs two columns".into()));
2511    }
2512    let name = columns[0].field().into_owned().name;
2513    let a_series = std::mem::take(&mut columns[0]).take_materialized_series();
2514    let b_series = std::mem::take(&mut columns[1]).take_materialized_series();
2515    let out = match (a_series.dtype(), b_series.dtype()) {
2516        (DataType::Float64, _) | (_, DataType::Float64) => {
2517            let a = float_series_to_f64(&a_series)?;
2518            let b = float_series_to_f64(&b_series)?;
2519            let out = Float64Chunked::from_iter_options(
2520                name.as_str().into(),
2521                a.into_iter().zip(&b).map(|(oa, ob)| match (oa, ob) {
2522                    (Some(x), Some(y)) => Some(x.min(y)),
2523                    (Some(x), None) => Some(x),
2524                    (None, Some(y)) => Some(y),
2525                    (None, None) => None,
2526                }),
2527            );
2528            out.into_series()
2529        }
2530        (DataType::Int64, _)
2531        | (DataType::Int32, _)
2532        | (_, DataType::Int64)
2533        | (_, DataType::Int32) => {
2534            let a = a_series.cast(&DataType::Int64)?;
2535            let b = b_series.cast(&DataType::Int64)?;
2536            let ca_a = a
2537                .i64()
2538                .map_err(|e| PolarsError::ComputeError(format!("least: {e}").into()))?;
2539            let ca_b = b
2540                .i64()
2541                .map_err(|e| PolarsError::ComputeError(format!("least: {e}").into()))?;
2542            let out = Int64Chunked::from_iter_options(
2543                name.as_str().into(),
2544                ca_a.into_iter().zip(ca_b).map(|(oa, ob)| match (oa, ob) {
2545                    (Some(x), Some(y)) => Some(x.min(y)),
2546                    (Some(x), None) => Some(x),
2547                    (None, Some(y)) => Some(y),
2548                    (None, None) => None,
2549                }),
2550            );
2551            out.into_series()
2552        }
2553        (DataType::String, _) | (_, DataType::String) => {
2554            let a = a_series.cast(&DataType::String)?;
2555            let b = b_series.cast(&DataType::String)?;
2556            let ca_a = a
2557                .str()
2558                .map_err(|e| PolarsError::ComputeError(format!("least: {e}").into()))?;
2559            let ca_b = b
2560                .str()
2561                .map_err(|e| PolarsError::ComputeError(format!("least: {e}").into()))?;
2562            let out = StringChunked::from_iter_options(
2563                name.as_str().into(),
2564                ca_a.into_iter().zip(ca_b).map(|(oa, ob)| match (oa, ob) {
2565                    (Some(x), Some(y)) => Some(if x <= y { x } else { y }),
2566                    (Some(x), None) => Some(x),
2567                    (None, Some(y)) => Some(y),
2568                    (None, None) => None,
2569                }),
2570            );
2571            out.into_series()
2572        }
2573        _ => {
2574            let a = float_series_to_f64(&a_series)?;
2575            let b = float_series_to_f64(&b_series)?;
2576            let out = Float64Chunked::from_iter_options(
2577                name.as_str().into(),
2578                a.into_iter().zip(&b).map(|(oa, ob)| match (oa, ob) {
2579                    (Some(x), Some(y)) => Some(x.min(y)),
2580                    (Some(x), None) => Some(x),
2581                    (None, Some(y)) => Some(y),
2582                    (None, None) => None,
2583                }),
2584            );
2585            out.into_series()
2586        }
2587    };
2588    Ok(Some(Column::new(name, out)))
2589}
2590
2591/// Build map (list of structs {key, value}) from two list columns. PySpark map_from_arrays.
2592pub fn apply_map_from_arrays(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
2593    use polars::chunked_array::StructChunked;
2594    use polars::chunked_array::builder::get_list_builder;
2595    use polars::datatypes::Field;
2596    if columns.len() < 2 {
2597        return Err(PolarsError::ComputeError(
2598            "map_from_arrays needs two columns (keys, values)".into(),
2599        ));
2600    }
2601    let name = columns[0].field().into_owned().name;
2602    let keys_series = std::mem::take(&mut columns[0]).take_materialized_series();
2603    let values_series = std::mem::take(&mut columns[1]).take_materialized_series();
2604    let keys_ca = keys_series
2605        .list()
2606        .map_err(|e| PolarsError::ComputeError(format!("map_from_arrays keys: {e}").into()))?;
2607    let values_ca = values_series
2608        .list()
2609        .map_err(|e| PolarsError::ComputeError(format!("map_from_arrays values: {e}").into()))?;
2610    let key_dtype = keys_ca.inner_dtype().clone();
2611    let value_dtype = values_ca.inner_dtype().clone();
2612    let struct_dtype = DataType::Struct(vec![
2613        Field::new("key".into(), key_dtype),
2614        Field::new("value".into(), value_dtype),
2615    ]);
2616    let mut builder = get_list_builder(&struct_dtype, 64, keys_ca.len(), name.as_str().into());
2617    for (opt_k, opt_v) in keys_ca.amortized_iter().zip(values_ca.amortized_iter()) {
2618        match (opt_k, opt_v) {
2619            (Some(k_amort), Some(v_amort)) => {
2620                let k_list = k_amort.as_ref().as_list();
2621                let v_list = v_amort.as_ref().as_list();
2622                let mut row_structs: Vec<Series> = Vec::new();
2623                for (opt_ke, opt_ve) in k_list.amortized_iter().zip(v_list.amortized_iter()) {
2624                    if let (Some(ke), Some(ve)) = (opt_ke, opt_ve) {
2625                        let ke_s = ke.deep_clone();
2626                        let ve_s = ve.deep_clone();
2627                        let len = ke_s.len();
2628                        let fields: [&Series; 2] = [&ke_s, &ve_s];
2629                        let st = StructChunked::from_series(
2630                            PlSmallStr::EMPTY,
2631                            len,
2632                            fields.iter().copied(),
2633                        )
2634                        .map_err(|e| PolarsError::ComputeError(format!("struct: {e}").into()))?
2635                        .into_series();
2636                        row_structs.push(st);
2637                    }
2638                }
2639                if row_structs.is_empty() {
2640                    builder
2641                        .append_series(&Series::new_empty(PlSmallStr::EMPTY, &struct_dtype))
2642                        .map_err(|e| PolarsError::ComputeError(format!("builder: {e}").into()))?;
2643                } else {
2644                    let mut combined = row_structs.remove(0);
2645                    for s in row_structs {
2646                        combined.extend(&s)?;
2647                    }
2648                    builder
2649                        .append_series(&combined)
2650                        .map_err(|e| PolarsError::ComputeError(format!("builder: {e}").into()))?;
2651                }
2652            }
2653            _ => {
2654                builder.append_null();
2655            }
2656        }
2657    }
2658    let out = builder.finish().into_series();
2659    Ok(Some(Column::new(name, out)))
2660}
2661
2662/// Zip two array columns into List(Struct{left, right}) for zip_with. Shorter padded with null.
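/// # Example
///
/// A minimal sketch (not a doctest): zipping `[1, 2, 3]` with `["x"]` pads the
/// shorter side with nulls, yielding `[{1, "x"}, {2, null}, {3, null}]`:
///
/// ```ignore
/// let left = Column::new(
///     "l".into(),
///     Series::new("l".into(), &[Series::new("".into(), &[1i64, 2, 3])]),
/// );
/// let right = Column::new(
///     "r".into(),
///     Series::new("r".into(), &[Series::new("".into(), &["x"])]),
/// );
/// let out = apply_zip_arrays_to_struct(&mut [left, right])?.unwrap();
/// ```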
2663pub fn apply_zip_arrays_to_struct(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
2664    use polars::chunked_array::StructChunked;
2665    use polars::chunked_array::builder::get_list_builder;
2666    use polars::datatypes::Field;
2667    if columns.len() < 2 {
2668        return Err(PolarsError::ComputeError(
2669            "zip_arrays_to_struct needs two columns (left, right)".into(),
2670        ));
2671    }
2672    let name = columns[0].field().into_owned().name;
2673    let a_series = std::mem::take(&mut columns[0]).take_materialized_series();
2674    let b_series = std::mem::take(&mut columns[1]).take_materialized_series();
2675    let a_ca = a_series
2676        .list()
2677        .map_err(|e| PolarsError::ComputeError(format!("zip_with left: {e}").into()))?;
2678    let b_ca = b_series
2679        .list()
2680        .map_err(|e| PolarsError::ComputeError(format!("zip_with right: {e}").into()))?;
2681    let left_dtype = a_ca.inner_dtype().clone();
2682    let right_dtype = b_ca.inner_dtype().clone();
2683    let struct_dtype = DataType::Struct(vec![
2684        Field::new("left".into(), left_dtype.clone()),
2685        Field::new("right".into(), right_dtype.clone()),
2686    ]);
2687    let mut builder = get_list_builder(&struct_dtype, 64, a_ca.len(), name.as_str().into());
2688    for (opt_a, opt_b) in a_ca.amortized_iter().zip(b_ca.amortized_iter()) {
2689        match (opt_a, opt_b) {
2690            (Some(a_amort), Some(b_amort)) => {
2691                let a_list = a_amort.as_ref().as_list();
2692                let b_list = b_amort.as_ref().as_list();
2693                let a_elems: Vec<Series> = a_list
2694                    .amortized_iter()
2695                    .flatten()
2696                    .map(|amort| amort.deep_clone())
2697                    .collect();
2698                let b_elems: Vec<Series> = b_list
2699                    .amortized_iter()
2700                    .flatten()
2701                    .map(|amort| amort.deep_clone())
2702                    .collect();
2703                let max_len = a_elems.len().max(b_elems.len());
2704                let mut row_structs: Vec<Series> = Vec::new();
2705                for i in 0..max_len {
2706                    let left_s = a_elems.get(i).cloned();
2707                    let right_s = b_elems.get(i).cloned();
2708                    let (mut left_series, mut right_series) = match (left_s, right_s) {
2709                        (Some(l), Some(r)) => (l, r),
2710                        (Some(l), None) => {
2711                            let r = Series::from_any_values_and_dtype(
2712                                PlSmallStr::EMPTY,
2713                                &[AnyValue::Null],
2714                                &right_dtype,
2715                                false,
2716                            )
2717                            .map_err(|e| {
2718                                PolarsError::ComputeError(format!("zip null: {e}").into())
2719                            })?;
2720                            (l, r)
2721                        }
2722                        (None, Some(r)) => {
2723                            let l = Series::from_any_values_and_dtype(
2724                                PlSmallStr::EMPTY,
2725                                &[AnyValue::Null],
2726                                &left_dtype,
2727                                false,
2728                            )
2729                            .map_err(|e| {
2730                                PolarsError::ComputeError(format!("zip null: {e}").into())
2731                            })?;
2732                            (l, r)
2733                        }
2734                        (None, None) => {
2735                            let mut l = Series::from_any_values_and_dtype(
2736                                PlSmallStr::EMPTY,
2737                                &[AnyValue::Null],
2738                                &left_dtype,
2739                                false,
2740                            )
2741                            .map_err(|e| {
2742                                PolarsError::ComputeError(format!("zip null: {e}").into())
2743                            })?;
2744                            l.rename("left".into());
2745                            let mut r = Series::from_any_values_and_dtype(
2746                                PlSmallStr::EMPTY,
2747                                &[AnyValue::Null],
2748                                &right_dtype,
2749                                false,
2750                            )
2751                            .map_err(|e| {
2752                                PolarsError::ComputeError(format!("zip null: {e}").into())
2753                            })?;
2754                            r.rename("right".into());
2755                            (l, r)
2756                        }
2757                    };
2758                    left_series.rename("left".into());
2759                    right_series.rename("right".into());
2760                    let len = left_series.len();
2761                    let fields: [&Series; 2] = [&left_series, &right_series];
2762                    let st =
2763                        StructChunked::from_series(PlSmallStr::EMPTY, len, fields.iter().copied())
2764                            .map_err(|e| {
2765                                PolarsError::ComputeError(format!("zip struct: {e}").into())
2766                            })?
2767                            .into_series();
2768                    row_structs.push(st);
2769                }
2770                if row_structs.is_empty() {
2771                    builder
2772                        .append_series(&Series::new_empty(PlSmallStr::EMPTY, &struct_dtype))
2773                        .map_err(|e| {
2774                            PolarsError::ComputeError(format!("zip builder: {e}").into())
2775                        })?;
2776                } else {
2777                    let mut combined = row_structs.remove(0);
2778                    for s in row_structs {
2779                        combined.extend(&s)?;
2780                    }
2781                    builder.append_series(&combined).map_err(|e| {
2782                        PolarsError::ComputeError(format!("zip builder: {e}").into())
2783                    })?;
2784                }
2785            }
2786            _ => builder.append_null(),
2787        }
2788    }
2789    let out = builder.finish().into_series();
2790    Ok(Some(Column::new(name, out)))
2791}
2792
2793/// Merge two maps into List(Struct{key, value1, value2}) for map_zip_with. Union of keys.
2794pub fn apply_map_zip_to_struct(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
2795    use polars::chunked_array::StructChunked;
2796    use polars::chunked_array::builder::get_list_builder;
2797    use polars::datatypes::Field;
2798    use std::collections::BTreeMap;
2799    if columns.len() < 2 {
2800        return Err(PolarsError::ComputeError(
2801            "map_zip_to_struct needs two columns (map1, map2)".into(),
2802        ));
2803    }
2804    let name = columns[0].field().into_owned().name;
2805    let a_series = std::mem::take(&mut columns[0]).take_materialized_series();
2806    let b_series = std::mem::take(&mut columns[1]).take_materialized_series();
2807    let a_ca = a_series
2808        .list()
2809        .map_err(|e| PolarsError::ComputeError(format!("map_zip_with map1: {e}").into()))?;
2810    let b_ca = b_series
2811        .list()
2812        .map_err(|e| PolarsError::ComputeError(format!("map_zip_with map2: {e}").into()))?;
2813    let struct_dtype_in = a_ca.inner_dtype().clone();
2814    let (key_dtype, value_dtype) = match &struct_dtype_in {
2815        DataType::Struct(fields) => {
2816            let k = fields
2817                .iter()
2818                .find(|f| f.name == "key")
2819                .map(|f| f.dtype.clone())
2820                .unwrap_or(DataType::String);
2821            let v = fields
2822                .iter()
2823                .find(|f| f.name == "value")
2824                .map(|f| f.dtype.clone())
2825                .unwrap_or(DataType::String);
2826            (k, v)
2827        }
2828        _ => (DataType::String, DataType::String),
2829    };
2830    let out_struct_dtype = DataType::Struct(vec![
2831        Field::new("key".into(), key_dtype.clone()),
2832        Field::new("value1".into(), value_dtype.clone()),
2833        Field::new("value2".into(), value_dtype.clone()),
2834    ]);
2835    let mut builder = get_list_builder(&out_struct_dtype, 64, a_ca.len(), name.as_str().into());
2836    for (opt_a, opt_b) in a_ca.amortized_iter().zip(b_ca.amortized_iter()) {
2837        match (opt_a, opt_b) {
2838            (Some(a_amort), Some(b_amort)) => {
2839                let a_list = a_amort.as_ref().as_list();
2840                let b_list = b_amort.as_ref().as_list();
2841                let mut key_to_vals: BTreeMap<String, (Series, Option<Series>, Option<Series>)> =
2842                    BTreeMap::new();
2843                for elem in a_list.amortized_iter().flatten() {
2844                    let s = elem.deep_clone();
2845                    if let Ok(st) = s.struct_() {
2846                        if let (Ok(k), Ok(v)) = (st.field_by_name("key"), st.field_by_name("value"))
2847                        {
2848                            let key_str: String = std::string::ToString::to_string(&k);
2849                            key_to_vals
2850                                .entry(key_str.clone())
2851                                .or_insert_with(|| (k.clone(), None, None))
2852                                .1 = Some(v);
2853                        }
2854                    }
2855                }
2856                for elem in b_list.amortized_iter().flatten() {
2857                    let s = elem.deep_clone();
2858                    if let Ok(st) = s.struct_() {
2859                        if let (Ok(k), Ok(v)) = (st.field_by_name("key"), st.field_by_name("value"))
2860                        {
2861                            let key_str: String = std::string::ToString::to_string(&k);
2862                            key_to_vals
2863                                .entry(key_str.clone())
2864                                .or_insert_with(|| (k.clone(), None, None))
2865                                .2 = Some(v);
2866                        }
2867                    }
2868                }
2869                let mut row_structs: Vec<Series> = Vec::new();
2870                for (_, (k_series, v1_opt, v2_opt)) in key_to_vals {
2871                    let mut k_renamed = k_series;
2872                    k_renamed.rename("key".into());
2873                    let v1_fallback = || {
2874                        Series::from_any_values_and_dtype(
2875                            PlSmallStr::EMPTY,
2876                            &[AnyValue::Null],
2877                            &value_dtype,
2878                            false,
2879                        )
2880                        .map_err(|e| {
2881                            PolarsError::ComputeError(format!("map_zip null fallback: {e}").into())
2882                        })
2883                    };
2884                    let mut v1_series = match v1_opt {
2885                        Some(s) => s,
2886                        None => v1_fallback()?,
2887                    };
2888                    v1_series.rename("value1".into());
2889                    let v2_fallback = || {
2890                        Series::from_any_values_and_dtype(
2891                            PlSmallStr::EMPTY,
2892                            &[AnyValue::Null],
2893                            &value_dtype,
2894                            false,
2895                        )
2896                        .map_err(|e| {
2897                            PolarsError::ComputeError(format!("map_zip null fallback: {e}").into())
2898                        })
2899                    };
2900                    let mut v2_series = match v2_opt {
2901                        Some(s) => s,
2902                        None => v2_fallback()?,
2903                    };
2904                    v2_series.rename("value2".into());
2905                    let len = k_renamed.len();
2906                    let fields: [&Series; 3] = [&k_renamed, &v1_series, &v2_series];
2907                    let st =
2908                        StructChunked::from_series(PlSmallStr::EMPTY, len, fields.iter().copied())
2909                            .map_err(|e| {
2910                                PolarsError::ComputeError(format!("map_zip struct: {e}").into())
2911                            })?
2912                            .into_series();
2913                    row_structs.push(st);
2914                }
2915                if row_structs.is_empty() {
2916                    builder
2917                        .append_series(&Series::new_empty(PlSmallStr::EMPTY, &out_struct_dtype))
2918                        .map_err(|e| {
2919                            PolarsError::ComputeError(format!("map_zip builder: {e}").into())
2920                        })?;
2921                } else {
2922                    let mut combined = row_structs.remove(0);
2923                    for s in row_structs {
2924                        combined.extend(&s)?;
2925                    }
2926                    builder.append_series(&combined).map_err(|e| {
2927                        PolarsError::ComputeError(format!("map_zip builder: {e}").into())
2928                    })?;
2929                }
2930            }
2931            _ => builder.append_null(),
2932        }
2933    }
2934    let out = builder.finish().into_series();
2935    Ok(Some(Column::new(name, out)))
2936}
2937
/// typeof: return the column's dtype as a string (PySpark typeof).
/// Note: this emits the Polars dtype's Debug name (e.g. "Int64"), not Spark SQL type names.
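/// # Example
///
/// A minimal sketch (not a doctest):
///
/// ```ignore
/// let col = Column::new("n".into(), Series::new("n".into(), &[1i64, 2]));
/// let out = apply_typeof(col)?.unwrap(); // every row is "Int64"
/// ```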
2939pub fn apply_typeof(column: Column) -> PolarsResult<Option<Column>> {
2940    let name = column.field().into_owned().name;
2941    let series = column.take_materialized_series();
2942    let dtype_str = format!("{:?}", series.dtype());
2943    let len = series.len();
2944    let out = StringChunked::from_iter_options(
2945        name.as_str().into(),
2946        (0..len).map(|_| Some(dtype_str.clone())),
2947    );
2948    Ok(Some(Column::new(name, out.into_series())))
2949}
2950
2951/// Helper: get two series as Int64Chunked with shared error context.
2952fn binary_series_i64(
2953    a: &Series,
2954    b: &Series,
2955    ctx: &str,
2956) -> PolarsResult<(Int64Chunked, Int64Chunked)> {
2957    let ca_a = a
2958        .i64()
2959        .map_err(|e| PolarsError::ComputeError(format!("{ctx}: {e}").into()))?
2960        .clone();
2961    let ca_b = b
2962        .i64()
2963        .map_err(|e| PolarsError::ComputeError(format!("{ctx}: {e}").into()))?
2964        .clone();
2965    Ok((ca_a, ca_b))
2966}
2967
2968/// Helper: get two series as Int32Chunked with shared error context.
2969fn binary_series_i32(
2970    a: &Series,
2971    b: &Series,
2972    ctx: &str,
2973) -> PolarsResult<(Int32Chunked, Int32Chunked)> {
2974    let ca_a = a
2975        .i32()
2976        .map_err(|e| PolarsError::ComputeError(format!("{ctx}: {e}").into()))?
2977        .clone();
2978    let ca_b = b
2979        .i32()
2980        .map_err(|e| PolarsError::ComputeError(format!("{ctx}: {e}").into()))?
2981        .clone();
2982    Ok((ca_a, ca_b))
2983}
2984
2985/// Helper: get two series as Float64Chunked with shared error context.
2986fn binary_series_f64(
2987    a: &Series,
2988    b: &Series,
2989    ctx: &str,
2990) -> PolarsResult<(Float64Chunked, Float64Chunked)> {
2991    let a_f = a.cast(&DataType::Float64)?;
2992    let b_f = b.cast(&DataType::Float64)?;
2993    let ca_a = a_f
2994        .f64()
2995        .map_err(|e| PolarsError::ComputeError(format!("{ctx}: {e}").into()))?
2996        .clone();
2997    let ca_b = b_f
2998        .f64()
2999        .map_err(|e| PolarsError::ComputeError(format!("{ctx}: {e}").into()))?
3000        .clone();
3001    Ok((ca_a, ca_b))
3002}
3003
3004/// Helper: convert any numeric-or-string series to Float64Chunked with PySpark-like semantics.
3005///
3006/// - Numeric types are cast to Float64.
3007/// - Utf8/string values are trimmed and parsed as f64; invalid strings become null.
3008/// - Null values remain null.
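/// # Example
///
/// A minimal sketch (not a doctest) of the string branch; the "demo" context
/// string only labels error messages:
///
/// ```ignore
/// let s = Series::new("x".into(), &[Some(" 1.5 "), Some("abc"), None]);
/// let ca = series_to_f64_pyspark(&s, "demo")?;
/// // -> [Some(1.5), None, None]: trimmed parse; an invalid string becomes null
/// ```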
3009fn series_to_f64_pyspark(s: &Series, ctx: &str) -> PolarsResult<Float64Chunked> {
3010    match s.dtype() {
3011        DataType::String => {
3012            let name = s.name();
3013            let ca = s
3014                .str()
3015                .map_err(|e| PolarsError::ComputeError(format!("{ctx}: {e}").into()))?;
3016            let out = Float64Chunked::from_iter_options(
3017                name.as_str().into(),
3018                ca.into_iter().map(|opt_s| {
3019                    opt_s.and_then(|raw| {
3020                        let trimmed = raw.trim();
3021                        if trimmed.is_empty() {
3022                            return None;
3023                        }
3024                        trimmed.parse::<f64>().ok()
3025                    })
3026                }),
3027            );
3028            Ok(out)
3029        }
3030        DataType::Null => {
3031            // All-null column – return a null Float64Chunked of same length.
3032            Ok(Float64Chunked::full_null(PlSmallStr::EMPTY, s.len()))
3033        }
3034        // For all other dtypes, rely on Polars cast to Float64.
3035        _ => {
3036            let casted = s
3037                .cast(&DataType::Float64)
3038                .map_err(|e| PolarsError::ComputeError(format!("{ctx}: {e}").into()))?;
3039            casted
3040                .f64()
3041                .cloned()
3042                .map_err(|e| PolarsError::ComputeError(format!("{ctx}: {e}").into()))
3043        }
3044    }
3045}
3046
3047/// Binary arithmetic helper used for Python Column operators with PySpark-style string coercion.
3048///
3049/// Always returns a Float64 series; string inputs are coerced to numbers when possible,
3050/// invalid strings become null. Numeric-only inputs are upcast to Float64.
3051/// Broadcasts scalar (length-1) operands to match the other series length (PySpark parity).
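/// # Example
///
/// A minimal sketch (not a doctest) via `apply_pyspark_add`: the length-1
/// operand broadcasts, and the string "2" coerces to a number:
///
/// ```ignore
/// let a = Column::new("a".into(), Series::new("a".into(), &["2", "3", "x"]));
/// let b = Column::new("b".into(), Series::new("b".into(), &[10i64]));
/// let out = apply_pyspark_add(&mut [a, b])?.unwrap();
/// // Float64: [12.0, 13.0, null] ("x" fails coercion -> null)
/// ```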
3052#[allow(clippy::useless_conversion)]
3053fn pyspark_binary_arith(
3054    columns: &mut [Column],
3055    ctx: &str,
3056    op: fn(f64, f64) -> f64,
3057) -> PolarsResult<Option<Column>> {
3058    if columns.len() < 2 {
3059        return Err(PolarsError::ComputeError(
3060            format!("{ctx} needs two columns").into(),
3061        ));
3062    }
3063
3064    let name = columns[0].field().into_owned().name;
3065    let a_s = std::mem::take(&mut columns[0]).take_materialized_series();
3066    let b_s = std::mem::take(&mut columns[1]).take_materialized_series();
3067
3068    let mut ca_a = series_to_f64_pyspark(&a_s, ctx)?;
3069    let mut ca_b = series_to_f64_pyspark(&b_s, ctx)?;
3070
3071    // Broadcast scalar (length-1) to match the other series (#179)
3072    let len_a = ca_a.len();
3073    let len_b = ca_b.len();
3074    if len_a == 1 && len_b > 1 {
3075        let val = ca_a.get(0);
3076        ca_a = Float64Chunked::from_iter_options(
3077            name.as_str().into(),
3078            std::iter::repeat_n(val, len_b),
3079        );
3080    } else if len_b == 1 && len_a > 1 {
3081        let val = ca_b.get(0);
3082        ca_b = Float64Chunked::from_iter_options(
3083            name.as_str().into(),
3084            std::iter::repeat_n(val, len_a),
3085        );
3086    }
3087
3088    let out = Float64Chunked::from_iter_options(
3089        name.as_str().into(),
3090        ca_a.into_iter()
3091            .zip(ca_b.into_iter())
3092            .map(|(oa, ob)| match (oa, ob) {
3093                (Some(a), Some(b)) => Some(op(a, b)),
3094                _ => None,
3095            }),
3096    )
3097    .into_series();
3098
3099    Ok(Some(Column::new(name, out)))
3100}
3101
3102/// PySpark-style addition with string/number coercion for Python Column operators.
3103pub fn apply_pyspark_add(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
3104    pyspark_binary_arith(columns, "pyspark_add", |a, b| a + b)
3105}
3106
3107/// PySpark-style subtraction with string/number coercion for Python Column operators.
3108pub fn apply_pyspark_subtract(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
3109    pyspark_binary_arith(columns, "pyspark_subtract", |a, b| a - b)
3110}
3111
3112/// PySpark-style multiplication with string/number coercion for Python Column operators.
3113pub fn apply_pyspark_multiply(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
3114    pyspark_binary_arith(columns, "pyspark_multiply", |a, b| a * b)
3115}
3116
3117/// PySpark-style true division with string/number coercion for Python Column operators.
3118/// Division by zero yields null (Spark/PySpark parity; issue #218).
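/// # Example
///
/// A minimal sketch (not a doctest):
///
/// ```ignore
/// let a = Column::new("a".into(), Series::new("a".into(), &[10.0f64, 1.0]));
/// let b = Column::new("b".into(), Series::new("b".into(), &[2.0f64, 0.0]));
/// let out = apply_pyspark_divide(&mut [a, b])?.unwrap();
/// // Float64: [5.0, null] (x / 0 -> null, not an error)
/// ```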
3119#[allow(clippy::useless_conversion)] // ChunkedArray needs .into_iter() for owned iteration
3120pub fn apply_pyspark_divide(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
3121    if columns.len() < 2 {
3122        return Err(PolarsError::ComputeError(
3123            "pyspark_divide needs two columns".into(),
3124        ));
3125    }
3126
3127    let name = columns[0].field().into_owned().name;
3128    let a_s = std::mem::take(&mut columns[0]).take_materialized_series();
3129    let b_s = std::mem::take(&mut columns[1]).take_materialized_series();
3130
3131    let mut ca_a = series_to_f64_pyspark(&a_s, "pyspark_divide")?;
3132    let mut ca_b = series_to_f64_pyspark(&b_s, "pyspark_divide")?;
3133
3134    let len_a = ca_a.len();
3135    let len_b = ca_b.len();
3136    if len_a == 1 && len_b > 1 {
3137        let val = ca_a.get(0);
3138        ca_a = Float64Chunked::from_iter_options(
3139            name.as_str().into(),
3140            std::iter::repeat_n(val, len_b),
3141        );
3142    } else if len_b == 1 && len_a > 1 {
3143        let val = ca_b.get(0);
3144        ca_b = Float64Chunked::from_iter_options(
3145            name.as_str().into(),
3146            std::iter::repeat_n(val, len_a),
3147        );
3148    }
3149
3150    let out = Float64Chunked::from_iter_options(
3151        name.as_str().into(),
3152        ca_a.into_iter()
3153            .zip(ca_b.into_iter())
3154            .map(|(oa, ob)| match (oa, ob) {
3155                (Some(a), Some(b)) => {
3156                    if b == 0.0 {
3157                        None // division by zero -> null (Spark parity, #218)
3158                    } else {
3159                        Some(a / b)
3160                    }
3161                }
3162                _ => None,
3163            }),
3164    )
3165    .into_series();
3166
3167    Ok(Some(Column::new(name, out)))
3168}
3169
3170/// PySpark-style modulo with string/number coercion for Python Column operators.
3171pub fn apply_pyspark_mod(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
3172    pyspark_binary_arith(columns, "pyspark_mod", |a, b| a % b)
3173}
3174
3175/// try_add: returns null on overflow.
3176pub fn apply_try_add(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
3177    if columns.len() < 2 {
3178        return Err(PolarsError::ComputeError(
3179            "try_add needs two columns".into(),
3180        ));
3181    }
3182    let name = columns[0].field().into_owned().name;
3183    let a_s = std::mem::take(&mut columns[0]).take_materialized_series();
3184    let b_s = std::mem::take(&mut columns[1]).take_materialized_series();
3185    let out = match (a_s.dtype(), b_s.dtype()) {
3186        (DataType::Int64, DataType::Int64) => {
3187            let (ca_a, ca_b) = binary_series_i64(&a_s, &b_s, "try_add")?;
3188            Int64Chunked::from_iter_options(
3189                name.as_str().into(),
3190                ca_a.into_iter()
3191                    .zip(&ca_b)
3192                    .map(|(oa, ob)| oa.and_then(|a| ob.and_then(|b| a.checked_add(b)))),
3193            )
3194            .into_series()
3195        }
3196        (DataType::Int32, DataType::Int32) => {
3197            let (ca_a, ca_b) = binary_series_i32(&a_s, &b_s, "try_add")?;
3198            Int32Chunked::from_iter_options(
3199                name.as_str().into(),
3200                ca_a.into_iter()
3201                    .zip(&ca_b)
3202                    .map(|(oa, ob)| oa.and_then(|a| ob.and_then(|b| a.checked_add(b)))),
3203            )
3204            .into_series()
3205        }
3206        _ => {
3207            let (ca_a, ca_b) = binary_series_f64(&a_s, &b_s, "try_add")?;
3208            Float64Chunked::from_iter_options(
3209                name.as_str().into(),
3210                ca_a.into_iter()
3211                    .zip(&ca_b)
3212                    .map(|(oa, ob)| oa.and_then(|a| ob.map(|b| a + b))),
3213            )
3214            .into_series()
3215        }
3216    };
3217    Ok(Some(Column::new(name, out)))
3218}
3219
3220/// try_subtract: returns null on overflow.
3221pub fn apply_try_subtract(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
3222    if columns.len() < 2 {
3223        return Err(PolarsError::ComputeError(
3224            "try_subtract needs two columns".into(),
3225        ));
3226    }
3227    let name = columns[0].field().into_owned().name;
3228    let a_s = std::mem::take(&mut columns[0]).take_materialized_series();
3229    let b_s = std::mem::take(&mut columns[1]).take_materialized_series();
3230    let out = match (a_s.dtype(), b_s.dtype()) {
3231        (DataType::Int64, DataType::Int64) => {
3232            let (ca_a, ca_b) = binary_series_i64(&a_s, &b_s, "try_subtract")?;
3233            Int64Chunked::from_iter_options(
3234                name.as_str().into(),
3235                ca_a.into_iter()
3236                    .zip(&ca_b)
3237                    .map(|(oa, ob)| oa.and_then(|a| ob.and_then(|b| a.checked_sub(b)))),
3238            )
3239            .into_series()
3240        }
3241        (DataType::Int32, DataType::Int32) => {
3242            let (ca_a, ca_b) = binary_series_i32(&a_s, &b_s, "try_subtract")?;
3243            Int32Chunked::from_iter_options(
3244                name.as_str().into(),
3245                ca_a.into_iter()
3246                    .zip(&ca_b)
3247                    .map(|(oa, ob)| oa.and_then(|a| ob.and_then(|b| a.checked_sub(b)))),
3248            )
3249            .into_series()
3250        }
3251        _ => {
3252            let (ca_a, ca_b) = binary_series_f64(&a_s, &b_s, "try_subtract")?;
3253            Float64Chunked::from_iter_options(
3254                name.as_str().into(),
3255                ca_a.into_iter()
3256                    .zip(&ca_b)
3257                    .map(|(oa, ob)| oa.and_then(|a| ob.map(|b| a - b))),
3258            )
3259            .into_series()
3260        }
3261    };
3262    Ok(Some(Column::new(name, out)))
3263}
3264
3265/// try_multiply: returns null on overflow.
3266pub fn apply_try_multiply(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
3267    if columns.len() < 2 {
3268        return Err(PolarsError::ComputeError(
3269            "try_multiply needs two columns".into(),
3270        ));
3271    }
3272    let name = columns[0].field().into_owned().name;
3273    let a_s = std::mem::take(&mut columns[0]).take_materialized_series();
3274    let b_s = std::mem::take(&mut columns[1]).take_materialized_series();
3275    let out = match (a_s.dtype(), b_s.dtype()) {
3276        (DataType::Int64, DataType::Int64) => {
3277            let (ca_a, ca_b) = binary_series_i64(&a_s, &b_s, "try_multiply")?;
3278            Int64Chunked::from_iter_options(
3279                name.as_str().into(),
3280                ca_a.into_iter()
3281                    .zip(&ca_b)
3282                    .map(|(oa, ob)| oa.and_then(|a| ob.and_then(|b| a.checked_mul(b)))),
3283            )
3284            .into_series()
3285        }
3286        (DataType::Int32, DataType::Int32) => {
3287            let (ca_a, ca_b) = binary_series_i32(&a_s, &b_s, "try_multiply")?;
3288            Int32Chunked::from_iter_options(
3289                name.as_str().into(),
3290                ca_a.into_iter()
3291                    .zip(&ca_b)
3292                    .map(|(oa, ob)| oa.and_then(|a| ob.and_then(|b| a.checked_mul(b)))),
3293            )
3294            .into_series()
3295        }
3296        _ => {
3297            let (ca_a, ca_b) = binary_series_f64(&a_s, &b_s, "try_multiply")?;
3298            Float64Chunked::from_iter_options(
3299                name.as_str().into(),
3300                ca_a.into_iter()
3301                    .zip(&ca_b)
3302                    .map(|(oa, ob)| oa.and_then(|a| ob.map(|b| a * b))),
3303            )
3304            .into_series()
3305        }
3306    };
3307    Ok(Some(Column::new(name, out)))
3308}
3309
/// Strip PySpark/Java SimpleDateFormat quoted literals ('T' -> T, '' -> ') so that chrono receives an unquoted pattern.
3311fn unquote_simple_date_format(s: &str) -> String {
3312    let mut out = String::with_capacity(s.len());
3313    let mut chars = s.chars().peekable();
3314    while let Some(c) = chars.next() {
3315        if c == '\'' {
3316            if chars.peek() == Some(&'\'') {
3317                chars.next();
3318                out.push('\'');
3319            } else {
3320                for q in chars.by_ref() {
3321                    if q == '\'' {
3322                        break;
3323                    }
3324                    out.push(q);
3325                }
3326            }
3327        } else {
3328            out.push(c);
3329        }
3330    }
3331    out
3332}
3333
3334/// Map PySpark/Java SimpleDateFormat style to chrono strftime. Public for to_char/date_format.
3335/// Handles quoted literals (e.g. 'T' in "yyyy-MM-dd'T'HH:mm:ss") so they become literal in chrono (#273).
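/// # Example
///
/// A minimal sketch (not a doctest) of the supported specifier subset:
///
/// ```ignore
/// assert_eq!(
///     pyspark_format_to_chrono("yyyy-MM-dd'T'HH:mm:ss"),
///     "%Y-%m-%dT%H:%M:%S",
/// );
/// ```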
3336pub(crate) fn pyspark_format_to_chrono(s: &str) -> String {
3337    let s = unquote_simple_date_format(s);
3338    s.replace("yyyy", "%Y")
3339        .replace("MM", "%m")
3340        .replace("dd", "%d")
3341        .replace("HH", "%H")
3342        .replace("mm", "%M")
3343        .replace("ss", "%S")
3344}
3345
3346/// unix_timestamp(column, format?) - parse string to seconds since epoch.
3347pub fn apply_unix_timestamp(column: Column, format: Option<&str>) -> PolarsResult<Option<Column>> {
3348    use chrono::{DateTime, NaiveDateTime, Utc};
3349    let chrono_fmt = format
3350        .map(pyspark_format_to_chrono)
3351        .unwrap_or_else(|| "%Y-%m-%d %H:%M:%S".to_string());
3352    let name = column.field().into_owned().name;
3353    let series = column.take_materialized_series();
3354    let ca = series
3355        .str()
3356        .map_err(|e| PolarsError::ComputeError(format!("unix_timestamp: {e}").into()))?;
3357    let out = Int64Chunked::from_iter_options(
3358        name.as_str().into(),
3359        ca.into_iter().map(|opt_s| {
3360            opt_s.and_then(|s| {
3361                NaiveDateTime::parse_from_str(s, &chrono_fmt)
3362                    .ok()
3363                    .map(|ndt| DateTime::<Utc>::from_naive_utc_and_offset(ndt, Utc).timestamp())
3364            })
3365        }),
3366    );
3367    Ok(Some(Column::new(name, out.into_series())))
3368}
3369
3370/// from_unixtime(column, format?) - seconds since epoch to formatted string.
3371pub fn apply_from_unixtime(column: Column, format: Option<&str>) -> PolarsResult<Option<Column>> {
3372    use chrono::{DateTime, Utc};
3373    let chrono_fmt = format
3374        .map(pyspark_format_to_chrono)
3375        .unwrap_or_else(|| "%Y-%m-%d %H:%M:%S".to_string());
3376    let name = column.field().into_owned().name;
3377    let series = column.take_materialized_series();
3378    let casted = series
3379        .cast(&DataType::Int64)
3380        .map_err(|e| PolarsError::ComputeError(format!("from_unixtime cast: {e}").into()))?;
3381    let ca = casted
3382        .i64()
3383        .map_err(|e| PolarsError::ComputeError(format!("from_unixtime: {e}").into()))?;
3384    let out = StringChunked::from_iter_options(
3385        name.as_str().into(),
3386        ca.into_iter().map(|opt_secs| {
3387            opt_secs.and_then(|secs| {
3388                DateTime::<Utc>::from_timestamp(secs, 0)
3389                    .map(|dt| dt.format(&chrono_fmt).to_string())
3390            })
3391        }),
3392    );
3393    Ok(Some(Column::new(name, out.into_series())))
3394}
3395
3396/// make_timestamp(year, month, day, hour, min, sec, timezone?) - six columns to timestamp (micros).
3397/// When timezone is Some(tz_str), components are interpreted as local time in that zone, then converted to UTC.
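/// # Example
///
/// A minimal sketch (not a doctest): 2024-03-01 12:00:00 in America/New_York
/// (UTC-5 on that date) lands at 17:00 UTC:
///
/// ```ignore
/// let mut cols: Vec<Column> = [2024i32, 3, 1, 12, 0, 0]
///     .iter()
///     .map(|&v| Column::new("c".into(), Series::new("c".into(), &[v])))
///     .collect();
/// let out = apply_make_timestamp(&mut cols, Some("America/New_York"))?.unwrap();
/// ```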
3398pub fn apply_make_timestamp(
3399    columns: &mut [Column],
3400    timezone: Option<&str>,
3401) -> PolarsResult<Option<Column>> {
3402    use chrono::{NaiveDate, Utc};
3403    use polars::datatypes::TimeUnit;
3404    if columns.len() < 6 {
3405        return Err(PolarsError::ComputeError(
3406            "make_timestamp needs six columns (year, month, day, hour, min, sec)".into(),
3407        ));
3408    }
3409    let tz: Option<Tz> = timezone
3410        .map(|s| {
3411            s.parse()
3412                .map_err(|_| PolarsError::ComputeError(format!("invalid timezone: {s}").into()))
3413        })
3414        .transpose()?;
3415    let name = columns[0].field().into_owned().name;
3416    let series: Vec<Series> = (0..6)
3417        .map(|i| std::mem::take(&mut columns[i]).take_materialized_series())
3418        .collect();
3419    let ca: Vec<Int32Chunked> = series
3420        .iter()
3421        .map(|s| {
3422            let c = s.cast(&DataType::Int32)?;
3423            Ok(c.i32()
3424                .map_err(|e| PolarsError::ComputeError(format!("make_timestamp: {e}").into()))?
3425                .clone())
3426        })
3427        .collect::<PolarsResult<Vec<_>>>()?;
3428    let len = ca[0].len();
3429    let out =
3430        Int64Chunked::from_iter_options(
3431            name.as_str().into(),
3432            (0..len).map(|i| {
3433                let y = ca[0].get(i)?;
3434                let m = ca[1].get(i)?;
3435                let d = ca[2].get(i)?;
3436                let h = ca[3].get(i).unwrap_or(0);
3437                let min = ca[4].get(i).unwrap_or(0);
3438                let s = ca[5].get(i).unwrap_or(0);
3439                let date = NaiveDate::from_ymd_opt(y, m as u32, d as u32)?;
3440                let naive = date.and_hms_opt(h as u32, min as u32, s as u32)?;
3441                match &tz {
3442                    Some(tz) => tz.from_local_datetime(&naive).single().map(
3443                        |dt_tz: chrono::DateTime<Tz>| dt_tz.with_timezone(&Utc).timestamp_micros(),
3444                    ),
3445                    None => Some(naive.and_utc().timestamp_micros()),
3446                }
3447            }),
3448        );
3449    let out_series = out
3450        .into_series()
3451        .cast(&DataType::Datetime(TimeUnit::Microseconds, None))?;
3452    Ok(Some(Column::new(name, out_series)))
3453}
3454
3455/// to_timestamp(column, format?) / try_to_timestamp(column, format?) - string to timestamp.
3456/// When format is Some, parse with that format (PySpark-style mapped to chrono); when None, use default.
3457/// Strips whitespace from string values before parsing (PySpark parity #273).
3458/// strict: true for to_timestamp (error on invalid), false for try_to_timestamp (null on invalid).
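/// # Example
///
/// A minimal sketch (not a doctest):
///
/// ```ignore
/// let col = Column::new(
///     "t".into(),
///     Series::new("t".into(), &[" 2024-03-01T10:30:00 "]),
/// );
/// // Whitespace is trimmed before parsing; with strict = false an invalid
/// // string stays null instead of erroring.
/// let out = apply_to_timestamp_format(col, Some("yyyy-MM-dd'T'HH:mm:ss"), false)?;
/// ```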
3459pub fn apply_to_timestamp_format(
3460    column: Column,
3461    format: Option<&str>,
3462    strict: bool,
3463) -> PolarsResult<Option<Column>> {
3464    use chrono::NaiveDateTime;
3465    use polars::datatypes::TimeUnit;
3466    let name = column.field().into_owned().name;
3467    let series = column.take_materialized_series();
3468    if series.dtype() != &DataType::String {
3469        let out_series = series.cast(&DataType::Datetime(TimeUnit::Microseconds, None))?;
3470        return Ok(Some(Column::new(name, out_series)));
3471    }
3472    let chrono_fmt = format
3473        .map(pyspark_format_to_chrono)
3474        .unwrap_or_else(|| "%Y-%m-%d %H:%M:%S".to_string());
3475    let ca = series
3476        .str()
3477        .map_err(|e| PolarsError::ComputeError(format!("to_timestamp: {e}").into()))?;
3478    let out = Int64Chunked::from_iter_options(
3479        name.as_str().into(),
3480        ca.into_iter().map(|opt_s| {
3481            opt_s.and_then(|s| {
3482                NaiveDateTime::parse_from_str(s.trim(), &chrono_fmt)
3483                    .ok()
3484                    .map(|ndt| ndt.and_utc().timestamp_micros())
3485            })
3486        }),
3487    );
3488    let out_series = out
3489        .into_series()
3490        .cast(&DataType::Datetime(TimeUnit::Microseconds, None))?;
    if strict {
        // to_timestamp: a value that fails to parse is an error; try_to_timestamp
        // (strict = false) instead leaves it as null.
        if out_series.null_count() > series.null_count() {
            return Err(PolarsError::ComputeError(
                "to_timestamp: failed to parse string with the given format".into(),
            ));
        }
    }
    Ok(Some(Column::new(name, out_series)))
}
3498
3499/// Coerce series to Datetime(Microseconds) for date-part extraction (#403). String parsed as "%Y-%m-%d %H:%M:%S".
3500fn series_to_datetime_micros(series: &Series) -> PolarsResult<Series> {
3501    use chrono::NaiveDateTime;
3502    use polars::datatypes::TimeUnit;
3503    if series.dtype() == &DataType::String {
3504        let name = series.name().as_str().into();
3505        let ca = series
3506            .str()
3507            .map_err(|e| PolarsError::ComputeError(format!("date on string: {e}").into()))?;
3508        const FMT: &str = "%Y-%m-%d %H:%M:%S";
3509        let out = Int64Chunked::from_iter_options(
3510            name,
3511            ca.into_iter().map(|opt_s| {
3512                opt_s.and_then(|s| {
3513                    NaiveDateTime::parse_from_str(s.trim(), FMT)
3514                        .ok()
3515                        .map(|ndt| ndt.and_utc().timestamp_micros())
3516                })
3517            }),
3518        );
3519        out.into_series()
3520            .cast(&DataType::Datetime(TimeUnit::Microseconds, None))
3521    } else {
3522        series.cast(&DataType::Datetime(TimeUnit::Microseconds, None))
3523    }
3524}
3525
3526/// hour(column) - extract hour (0-23). Accepts string timestamp column (#403).
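/// # Example
///
/// A minimal sketch (not a doctest); minute/second below behave the same way:
///
/// ```ignore
/// let col = Column::new(
///     "t".into(),
///     Series::new("t".into(), &["2024-03-01 10:30:05"]),
/// );
/// let out = apply_hour(col)?.unwrap(); // -> 10
/// ```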
3527pub fn apply_hour(column: Column) -> PolarsResult<Option<Column>> {
3528    let name = column.field().into_owned().name;
3529    let series = column.take_materialized_series();
3530    let dt_series = series_to_datetime_micros(&series)?;
3531    let ca = dt_series
3532        .datetime()
3533        .map_err(|e| PolarsError::ComputeError(format!("hour: {e}").into()))?;
3534    let out = ca.hour().into_series();
3535    Ok(Some(Column::new(name, out)))
3536}
3537
3538/// minute(column) - extract minute. Accepts string timestamp column (#403).
3539pub fn apply_minute(column: Column) -> PolarsResult<Option<Column>> {
3540    let name = column.field().into_owned().name;
3541    let series = column.take_materialized_series();
3542    let dt_series = series_to_datetime_micros(&series)?;
3543    let ca = dt_series
3544        .datetime()
3545        .map_err(|e| PolarsError::ComputeError(format!("minute: {e}").into()))?;
3546    let out = ca.minute().into_series();
3547    Ok(Some(Column::new(name, out)))
3548}
3549
3550/// second(column) - extract second. Accepts string timestamp column (#403).
3551pub fn apply_second(column: Column) -> PolarsResult<Option<Column>> {
3552    let name = column.field().into_owned().name;
3553    let series = column.take_materialized_series();
3554    let dt_series = series_to_datetime_micros(&series)?;
3555    let ca = dt_series
3556        .datetime()
3557        .map_err(|e| PolarsError::ComputeError(format!("second: {e}").into()))?;
3558    let out = ca.second().into_series();
3559    Ok(Some(Column::new(name, out)))
3560}
3561
3562/// make_date(year, month, day) - three columns to date.
3563pub fn apply_make_date(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
3564    use chrono::NaiveDate;
3565    if columns.len() < 3 {
3566        return Err(PolarsError::ComputeError(
3567            "make_date needs three columns (year, month, day)".into(),
3568        ));
3569    }
3570    let name = columns[0].field().into_owned().name;
3571    let y_series = std::mem::take(&mut columns[0]).take_materialized_series();
3572    let m_series = std::mem::take(&mut columns[1]).take_materialized_series();
3573    let d_series = std::mem::take(&mut columns[2]).take_materialized_series();
3574    let y_ca = y_series
3575        .cast(&DataType::Int32)?
3576        .i32()
3577        .map_err(|e| PolarsError::ComputeError(format!("make_date: {e}").into()))?
3578        .clone();
3579    let m_ca = m_series
3580        .cast(&DataType::Int32)?
3581        .i32()
3582        .map_err(|e| PolarsError::ComputeError(format!("make_date: {e}").into()))?
3583        .clone();
3584    let d_ca = d_series
3585        .cast(&DataType::Int32)?
3586        .i32()
3587        .map_err(|e| PolarsError::ComputeError(format!("make_date: {e}").into()))?
3588        .clone();
3589    let out = Int32Chunked::from_iter_options(
3590        name.as_str().into(),
3591        y_ca.into_iter()
3592            .zip(&m_ca)
3593            .zip(&d_ca)
3594            .map(|((oy, om), od)| match (oy, om, od) {
3595                (Some(y), Some(m), Some(d)) => {
3596                    NaiveDate::from_ymd_opt(y, m as u32, d as u32).map(naivedate_to_days)
3597                }
3598                _ => None,
3599            }),
3600    );
3601    let out_series = out.into_series().cast(&DataType::Date)?;
3602    Ok(Some(Column::new(name, out_series)))
3603}
3604
3605/// unix_date(column) - date to days since 1970-01-01.
3606pub fn apply_unix_date(column: Column) -> PolarsResult<Option<Column>> {
3607    let name = column.field().into_owned().name;
3608    let series = column.take_materialized_series();
3609    let casted = series.cast(&DataType::Date)?;
3610    let days = casted.cast(&DataType::Int32)?;
3611    Ok(Some(Column::new(name, days)))
3612}
3613
3614/// date_from_unix_date(column) - days since epoch to date.
3615pub fn apply_date_from_unix_date(column: Column) -> PolarsResult<Option<Column>> {
3616    let name = column.field().into_owned().name;
3617    let series = column.take_materialized_series();
3618    let days = series.cast(&DataType::Int32)?;
3619    let out = days.cast(&DataType::Date)?;
3620    Ok(Some(Column::new(name, out)))
3621}
3622
3623/// pmod(dividend, divisor) - positive modulus.
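/// # Example
///
/// A minimal sketch (not a doctest): unlike `%`, the result is never negative:
///
/// ```ignore
/// let a = Column::new("a".into(), Series::new("a".into(), &[-7.0f64]));
/// let b = Column::new("b".into(), Series::new("b".into(), &[3.0f64]));
/// let out = apply_pmod(&mut [a, b])?.unwrap(); // -> 2.0 (-7 % 3 is -1; -1 + 3 = 2)
/// ```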
3624pub fn apply_pmod(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
3625    if columns.len() < 2 {
3626        return Err(PolarsError::ComputeError("pmod needs two columns".into()));
3627    }
3628    let name = columns[0].field().into_owned().name;
3629    let a_series = std::mem::take(&mut columns[0]).take_materialized_series();
3630    let b_series = std::mem::take(&mut columns[1]).take_materialized_series();
3631    let a = float_series_to_f64(&a_series)?;
3632    let b = float_series_to_f64(&b_series)?;
3633    let out = Float64Chunked::from_iter_options(
3634        name.as_str().into(),
3635        a.into_iter().zip(&b).map(|(oa, ob)| match (oa, ob) {
3636            (Some(x), Some(y)) if y != 0.0 => {
3637                let r = x % y;
3638                Some(if r >= 0.0 { r } else { r + y.abs() })
3639            }
3640            _ => None,
3641        }),
3642    );
3643    Ok(Some(Column::new(name, out.into_series())))
3644}
3645
/// Helper for factorial(): n! as i64 for n in 0..=20; None for negative n or n > 20 (21! overflows i64).
3647fn factorial_u64(n: i64) -> Option<i64> {
3648    if n < 0 {
3649        return None;
3650    }
3651    if n > 20 {
3652        return None; // 21! overflows i64
3653    }
3654    let mut acc: i64 = 1;
3655    for i in 1..=n {
3656        acc = acc.checked_mul(i)?;
3657    }
3658    Some(acc)
3659}
3660
/// from_utc_timestamp(ts_col, tz) - interpret ts as UTC, convert to tz. Timestamps are
/// stored as UTC micros, so the instant is unchanged; this validates tz and returns the column as-is.
3662pub fn apply_from_utc_timestamp(column: Column, tz_str: &str) -> PolarsResult<Option<Column>> {
3663    let _: Tz = tz_str
3664        .parse()
3665        .map_err(|_| PolarsError::ComputeError(format!("invalid timezone: {tz_str}").into()))?;
3666    Ok(Some(column))
3667}
3668
/// to_utc_timestamp(ts_col, tz) - interpret ts as in tz, convert to UTC. For UTC-stored
/// timestamps the instant is unchanged; this validates tz and returns the column as-is.
3670pub fn apply_to_utc_timestamp(column: Column, tz_str: &str) -> PolarsResult<Option<Column>> {
3671    let _: Tz = tz_str
3672        .parse()
3673        .map_err(|_| PolarsError::ComputeError(format!("invalid timezone: {tz_str}").into()))?;
3674    Ok(Some(column))
3675}
3676
3677/// convert_timezone(source_tz, target_tz, ts_col) - convert between timezones. Same instant.
3678pub fn apply_convert_timezone(
3679    column: Column,
3680    _source_tz: &str,
3681    _target_tz: &str,
3682) -> PolarsResult<Option<Column>> {
3683    Ok(Some(column))
3684}
3685
3686/// factorial(column) - element-wise factorial.
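/// # Example
///
/// A minimal sketch (not a doctest):
///
/// ```ignore
/// let col = Column::new("n".into(), Series::new("n".into(), &[5i64, 21, -1]));
/// let out = apply_factorial(col)?.unwrap(); // -> [120, null, null]
/// ```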
3687pub fn apply_factorial(column: Column) -> PolarsResult<Option<Column>> {
3688    let name = column.field().into_owned().name;
3689    let series = column.take_materialized_series();
3690    let casted = series
3691        .cast(&DataType::Int64)
3692        .map_err(|e| PolarsError::ComputeError(format!("factorial cast: {e}").into()))?;
3693    let ca = casted
3694        .i64()
3695        .map_err(|e| PolarsError::ComputeError(format!("factorial: {e}").into()))?;
3696    let out = Int64Chunked::from_iter_options(
3697        name.as_str().into(),
3698        ca.into_iter().map(|opt_n| opt_n.and_then(factorial_u64)),
3699    );
3700    Ok(Some(Column::new(name, out.into_series())))
3701}
3702
3703/// url_decode(column) - percent-decode URL-encoded string (PySpark url_decode).
3704pub fn apply_url_decode(column: Column) -> PolarsResult<Option<Column>> {
3705    use percent_encoding::percent_decode_str;
3706    let name = column.field().into_owned().name;
3707    let series = column.take_materialized_series();
3708    let ca = series
3709        .str()
3710        .map_err(|e| PolarsError::ComputeError(format!("url_decode: {e}").into()))?;
3711    let out = StringChunked::from_iter_options(
3712        name.as_str().into(),
3713        ca.into_iter().map(|opt_s| {
3714            opt_s.and_then(|s| {
3715                percent_decode_str(s)
3716                    .decode_utf8()
3717                    .ok()
3718                    .map(|c| c.into_owned())
3719            })
3720        }),
3721    );
3722    Ok(Some(Column::new(name, out.into_series())))
3723}
3724
3725/// url_encode(column) - percent-encode string for URL (PySpark url_encode).
3726pub fn apply_url_encode(column: Column) -> PolarsResult<Option<Column>> {
3727    use percent_encoding::{NON_ALPHANUMERIC, utf8_percent_encode};
3728    let name = column.field().into_owned().name;
3729    let series = column.take_materialized_series();
3730    let ca = series
3731        .str()
3732        .map_err(|e| PolarsError::ComputeError(format!("url_encode: {e}").into()))?;
3733    let out = StringChunked::from_iter_options(
3734        name.as_str().into(),
3735        ca.into_iter().map(|opt_s| {
3736            opt_s.map(|s| {
3737                utf8_percent_encode(s, NON_ALPHANUMERIC)
3738                    .to_string()
3739                    .replace("%20", "+")
3740            })
3741        }),
3742    );
3743    Ok(Some(Column::new(name, out.into_series())))
3744}
3745
3746/// shiftRightUnsigned - logical right shift for i64 (PySpark shiftRightUnsigned).
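/// # Example
///
/// A minimal sketch (not a doctest): -1 has all 64 bits set, so a logical shift
/// by 1 yields i64::MAX rather than -1:
///
/// ```ignore
/// let col = Column::new("x".into(), Series::new("x".into(), &[-1i64, 16]));
/// let out = apply_shift_right_unsigned(col, 1)?.unwrap(); // -> [i64::MAX, 8]
/// ```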
3747pub fn apply_shift_right_unsigned(column: Column, n: i32) -> PolarsResult<Option<Column>> {
3748    let name = column.field().into_owned().name;
3749    let series = column.take_materialized_series();
3750    let s = series.cast(&DataType::Int64)?;
3751    let ca = s
3752        .i64()
3753        .map_err(|e| PolarsError::ComputeError(format!("shift_right_unsigned: {e}").into()))?;
    // Mask the shift count like Java's `>>>` (long shifts use the low 6 bits) so
    // negative or oversized counts do not panic (Spark parity).
    let u = (n as u32) & 63;
3755    let out = Int64Chunked::from_iter_options(
3756        name.as_str().into(),
3757        ca.into_iter()
3758            .map(|opt_v| opt_v.map(|v| ((v as u64) >> u) as i64)),
3759    );
3760    Ok(Some(Column::new(name, out.into_series())))
3761}
3762
3763/// json_array_length(json_str, path) - length of JSON array at path (PySpark json_array_length).
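/// # Example
///
/// A minimal sketch (not a doctest); the supported paths are `$`-rooted dotted
/// object keys only (no array indexing):
///
/// ```ignore
/// let col = Column::new(
///     "j".into(),
///     Series::new("j".into(), &[r#"{"a": {"b": [1, 2, 3]}}"#]),
/// );
/// let out = apply_json_array_length(col, "$.a.b")?.unwrap(); // -> 3
/// ```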
3764pub fn apply_json_array_length(column: Column, path: &str) -> PolarsResult<Option<Column>> {
3765    let name = column.field().into_owned().name;
3766    let series = column.take_materialized_series();
3767    let ca = series
3768        .str()
3769        .map_err(|e| PolarsError::ComputeError(format!("json_array_length: {e}").into()))?;
3770    let path = path.trim_start_matches('$').trim_start_matches('.');
3771    let path_parts: Vec<&str> = if path.is_empty() {
3772        vec![]
3773    } else {
3774        path.split('.').collect()
3775    };
3776    let out = Int64Chunked::from_iter_options(
3777        name.as_str().into(),
3778        ca.into_iter().map(|opt_s| {
3779            opt_s.and_then(|s| {
3780                let v: serde_json::Value = serde_json::from_str(s).ok()?;
3781                let mut current = &v;
3782                for part in &path_parts {
3783                    current = current.get(part)?;
3784                }
3785                current.as_array().map(|a| a.len() as i64)
3786            })
3787        }),
3788    );
3789    Ok(Some(Column::new(name, out.into_series())))
3790}
3791
3792/// json_object_keys(json_str) - return list of keys of JSON object (PySpark json_object_keys).
3793pub fn apply_json_object_keys(column: Column) -> PolarsResult<Option<Column>> {
3794    let name = column.field().into_owned().name;
3795    let series = column.take_materialized_series();
3796    let ca = series
3797        .str()
3798        .map_err(|e| PolarsError::ComputeError(format!("json_object_keys: {e}").into()))?;
3799    let out: ListChunked = ca
3800        .into_iter()
3801        .map(|opt_s| {
3802            opt_s.and_then(|s| {
3803                let v: serde_json::Value = serde_json::from_str(s).ok()?;
3804                let obj = v.as_object()?;
3805                let keys: Vec<String> = obj.keys().map(String::from).collect();
3806                Some(Series::new("".into(), keys))
3807            })
3808        })
3809        .collect();
3810    Ok(Some(Column::new(name, out.into_series())))
3811}
3812
3813/// json_tuple(json_str, key1, key2, ...) - extract keys from JSON; returns struct with one field per key (PySpark json_tuple).
3814pub fn apply_json_tuple(column: Column, keys: &[String]) -> PolarsResult<Option<Column>> {
3815    let name = column.field().into_owned().name;
3816    let series = column.take_materialized_series();
3817    let ca = series
3818        .str()
3819        .map_err(|e| PolarsError::ComputeError(format!("json_tuple: {e}").into()))?;
3820    let keys = keys.to_vec();
3821    let mut columns_per_key: Vec<Vec<Option<String>>> =
3822        (0..keys.len()).map(|_| Vec::new()).collect();
3823    for opt_s in ca.into_iter() {
3824        for (i, key) in keys.iter().enumerate() {
3825            let val = opt_s.and_then(|s| {
3826                let v: serde_json::Value = serde_json::from_str(s).ok()?;
3827                let obj = v.as_object()?;
3828                obj.get(key).and_then(|x| x.as_str()).map(String::from)
3829            });
3830            columns_per_key[i].push(val);
3831        }
3832    }
3833    let field_series: Vec<Series> = keys
3834        .iter()
3835        .zip(columns_per_key.iter())
3836        .map(|(k, vals)| Series::new(k.as_str().into(), vals.clone()))
3837        .collect();
3838    let out_df = DataFrame::new_infer_height(field_series.into_iter().map(|s| s.into()).collect())?;
3839    let out_struct = out_df.into_struct(name.as_str().into());
3840    Ok(Some(Column::new(name, out_struct.into_series())))
3841}
3842
/// from_csv(str_col) - parse CSV string to struct (PySpark from_csv). Minimal: naive
/// comma split (no quote handling) into fixed string fields _c0.._c31; no schema support.
3844pub fn apply_from_csv(column: Column) -> PolarsResult<Option<Column>> {
3845    let name = column.field().into_owned().name;
3846    let series = column.take_materialized_series();
3847    let ca = series
3848        .str()
3849        .map_err(|e| PolarsError::ComputeError(format!("from_csv: {e}").into()))?;
3850    const MAX_COLS: usize = 32;
3851    let mut columns: Vec<Vec<Option<String>>> = (0..MAX_COLS).map(|_| Vec::new()).collect();
3852    for opt_s in ca.into_iter() {
3853        let parts: Vec<&str> = opt_s
3854            .map(|s| s.split(',').collect::<Vec<_>>())
3855            .unwrap_or_default();
3856        for (i, col) in columns.iter_mut().enumerate().take(MAX_COLS) {
3857            let v = parts.get(i).map(|p| (*p).to_string());
3858            col.push(v);
3859        }
3860    }
3861    let field_series: Vec<Series> = (0..MAX_COLS)
3862        .map(|i| Series::new(format!("_c{i}").into(), columns[i].clone()))
3863        .collect();
3864    let out_df = DataFrame::new_infer_height(field_series.into_iter().map(|s| s.into()).collect())?;
3865    let out_series = out_df.into_struct(name.as_str().into()).into_series();
3866    Ok(Some(Column::new(name, out_series)))
3867}
3868
3869/// to_csv(struct_col) - format struct as CSV string (PySpark to_csv). Minimal: uses struct cast to string.
3870pub fn apply_to_csv(column: Column) -> PolarsResult<Option<Column>> {
3871    let name = column.field().into_owned().name;
3872    let series = column.take_materialized_series();
3873    let out = series.cast(&DataType::String)?;
3874    Ok(Some(Column::new(name, out)))
3875}
3876
3877/// parse_url(url_str, part, key) - extract URL component (PySpark parse_url).
3878/// When part is QUERY/QUERYSTRING and key is Some(k), returns the value for that query parameter only.
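/// # Example
///
/// A minimal sketch (not a doctest):
///
/// ```ignore
/// let col = Column::new(
///     "u".into(),
///     Series::new("u".into(), &["https://example.com:8080/p?a=1&b=2#frag"]),
/// );
/// // part = "QUERY", key = Some("b") -> "2"; part = "REF" -> "frag"
/// let out = apply_parse_url(col, "QUERY", Some("b"))?.unwrap();
/// ```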
3879pub fn apply_parse_url(
3880    column: Column,
3881    part: &str,
3882    key: Option<&str>,
3883) -> PolarsResult<Option<Column>> {
3884    use url::Url;
3885    let name = column.field().into_owned().name;
3886    let series = column.take_materialized_series();
3887    let ca = series
3888        .str()
3889        .map_err(|e| PolarsError::ComputeError(format!("parse_url: {e}").into()))?;
3890    let part_upper = part.trim().to_uppercase();
3891    let key_owned = key.map(String::from);
3892    let out = StringChunked::from_iter_options(
3893        name.as_str().into(),
3894        ca.into_iter().map(|opt_s| {
3895            opt_s.and_then(|s| {
3896                let u = Url::parse(s).ok()?;
3897                let out: Option<String> = match part_upper.as_str() {
3898                    "PROTOCOL" | "PROT" => Some(u.scheme().to_string()),
3899                    "HOST" => u.host_str().map(String::from),
3900                    "PATH" | "FILE" | "PATHNAME" => Some(u.path().to_string()),
3901                    "QUERY" | "REF" | "QUERYSTRING" => {
3902                        if let Some(ref k) = key_owned {
3903                            u.query_pairs()
3904                                .find(|(name, _)| name.as_ref() == k.as_str())
3905                                .map(|(_, value)| value.into_owned())
3906                        } else {
3907                            u.query().map(String::from)
3908                        }
3909                    }
3910                    "USERINFO" => Some(format!("{}:{}", u.username(), u.password().unwrap_or(""))),
3911                    "AUTHORITY" => u.host_str().map(|h| h.to_string()),
3912                    _ => None,
3913                };
3914                out
3915            })
3916        }),
3917    );
3918    Ok(Some(Column::new(name, out.into_series())))
3919}
3920
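// A minimal sketch of the part mapping above, exercised directly against the
// `url` crate; the URL is a made-up example. Not part of the shipped test suite.
#[cfg(test)]
mod parse_url_sketch {
    use url::Url;

    #[test]
    fn parts_map_to_url_accessors() {
        let u = Url::parse("https://user:pw@example.com:8080/a/b?x=1#frag").unwrap();
        assert_eq!(u.scheme(), "https"); // PROTOCOL
        assert_eq!(u.host_str(), Some("example.com")); // HOST
        assert_eq!(u.path(), "/a/b"); // PATH
        assert_eq!(u.query(), Some("x=1")); // QUERY / QUERYSTRING
        assert_eq!(u.fragment(), Some("frag")); // REF
        assert_eq!(u.port(), Some(8080)); // the port part of AUTHORITY
    }
}
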
/// hash one column (PySpark hash) - Murmur3 32-bit over the value's bytes.
pub fn apply_hash_one(column: Column) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let out = Int64Chunked::from_iter_options(name.as_str().into(), series_to_hash_iter(series));
    Ok(Some(Column::new(name, out.into_series())))
}

/// hash struct (multiple columns combined) - same Murmur3 scheme as apply_hash_one.
pub fn apply_hash_struct(column: Column) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let out = Int64Chunked::from_iter_options(name.as_str().into(), series_to_hash_iter(series));
    Ok(Some(Column::new(name, out.into_series())))
}

fn series_to_hash_iter(series: Series) -> impl Iterator<Item = Option<i64>> {
    use std::io::Cursor;
    (0..series.len()).map(move |i| {
        let av = series.get(i).ok()?;
        let bytes = any_value_to_hash_bytes(&av);
        let h = murmur3::murmur3_32(&mut Cursor::new(bytes), 0).ok()?;
        // PySpark hash returns a 32-bit signed int; widen to i64 for the column type.
        Some(h as i32 as i64)
    })
}

fn any_value_to_hash_bytes(av: &polars::datatypes::AnyValue) -> Vec<u8> {
    use polars::datatypes::AnyValue;
    let mut buf = Vec::new();
    match av {
        AnyValue::Null => buf.push(0),
        AnyValue::Boolean(v) => buf.push(*v as u8),
        AnyValue::Int32(v) => buf.extend_from_slice(&v.to_le_bytes()),
        AnyValue::Int64(v) => buf.extend_from_slice(&v.to_le_bytes()),
        AnyValue::UInt32(v) => buf.extend_from_slice(&v.to_le_bytes()),
        AnyValue::UInt64(v) => buf.extend_from_slice(&v.to_le_bytes()),
        AnyValue::Float32(v) => buf.extend_from_slice(&v.to_bits().to_le_bytes()),
        AnyValue::Float64(v) => buf.extend_from_slice(&v.to_bits().to_le_bytes()),
        AnyValue::String(v) => buf.extend_from_slice(v.as_bytes()),
        AnyValue::Binary(v) => buf.extend_from_slice(v),
        // Fallback: hash the display string; an approximation for types Spark
        // hashes by their raw representation.
        _ => buf.extend_from_slice(av.to_string().as_bytes()),
    }
    buf
}

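// A minimal end-to-end sketch of the hashing scheme above: the column hash of a
// string should equal Murmur3-32 of its UTF-8 bytes, sign-extended to i64.
// Assumes `Column::new` accepts a Series as its values argument, as the helpers
// in this file do. Not part of the shipped test suite.
#[cfg(test)]
mod hash_sketch {
    use super::*;
    use std::io::Cursor;

    #[test]
    fn hash_one_matches_direct_murmur3() {
        let s = Series::new("k".into(), &["spark"]);
        let out = apply_hash_one(Column::new("k".into(), s)).unwrap().unwrap();
        let expected =
            murmur3::murmur3_32(&mut Cursor::new("spark".as_bytes()), 0).unwrap() as i32 as i64;
        let series = out.take_materialized_series();
        assert_eq!(series.i64().unwrap().get(0), Some(expected));
    }
}
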
/// Build array [start, start+step, ...] through stop inclusive (PySpark sequence).
/// Input column is a struct with fields "0"=start, "1"=stop, "2"=step (step optional, default 1).
pub fn apply_sequence(column: Column) -> PolarsResult<Option<Column>> {
    use polars::chunked_array::builder::get_list_builder;
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let st = series.struct_().map_err(|e| {
        PolarsError::ComputeError(format!("sequence: expected struct column: {e}").into())
    })?;
    let start_s = st
        .field_by_name("0")
        .map_err(|e| PolarsError::ComputeError(format!("sequence field 0: {e}").into()))?;
    let stop_s = st
        .field_by_name("1")
        .map_err(|e| PolarsError::ComputeError(format!("sequence field 1: {e}").into()))?;
    let step_s = st.field_by_name("2").ok(); // optional
    let start_series = start_s
        .cast(&DataType::Int64)
        .map_err(|e| PolarsError::ComputeError(format!("sequence start: {e}").into()))?;
    let stop_series = stop_s
        .cast(&DataType::Int64)
        .map_err(|e| PolarsError::ComputeError(format!("sequence stop: {e}").into()))?;
    let step_series_opt: Option<Series> = step_s
        .as_ref()
        .map(|s| s.cast(&DataType::Int64))
        .transpose()
        .map_err(|e| PolarsError::ComputeError(format!("sequence step: {e}").into()))?;
    let start_ca = start_series
        .i64()
        .map_err(|e| PolarsError::ComputeError(format!("sequence: {e}").into()))?;
    let stop_ca = stop_series
        .i64()
        .map_err(|e| PolarsError::ComputeError(format!("sequence: {e}").into()))?;
    let step_ca = step_series_opt.as_ref().and_then(|s| s.i64().ok());
    let n = start_ca.len();
    let mut builder = get_list_builder(&DataType::Int64, 64, n, name.as_str().into());
    for i in 0..n {
        let start_v = start_ca.get(i);
        let stop_v = stop_ca.get(i);
        // A missing or null step falls back to 1.
        let step_v: Option<i64> = step_ca.as_ref().and_then(|ca| ca.get(i)).or(Some(1));
        match (start_v, stop_v, step_v) {
            (Some(s), Some(st), Some(step)) if step != 0 => {
                let mut vals: Vec<i64> = Vec::new();
                if step > 0 {
                    let mut v = s;
                    while v <= st {
                        vals.push(v);
                        v += step;
                    }
                } else {
                    let mut v = s;
                    while v >= st {
                        vals.push(v);
                        v += step;
                    }
                }
                let series = Series::new("".into(), vals);
                builder.append_series(&series)?;
            }
            _ => builder.append_null(),
        }
    }
    Ok(Some(Column::new(name, builder.finish().into_series())))
}

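// A minimal sketch of the expansion loop in apply_sequence: the stop bound is
// inclusive in both directions, matching PySpark's sequence(). Not part of the
// shipped test suite.
#[cfg(test)]
mod sequence_sketch {
    #[test]
    fn stop_is_inclusive() {
        let expand = |start: i64, stop: i64, step: i64| {
            let mut vals = Vec::new();
            let mut v = start;
            while (step > 0 && v <= stop) || (step < 0 && v >= stop) {
                vals.push(v);
                v += step;
            }
            vals
        };
        assert_eq!(expand(1, 5, 2), vec![1, 3, 5]);
        assert_eq!(expand(3, 1, -1), vec![3, 2, 1]);
    }
}
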
/// Replace or add a struct field (PySpark withField). Used when Polars 0.53+ no longer accepts "*" in with_fields.
pub fn apply_struct_with_field(
    struct_col: Column,
    value_col: Column,
    field_name: &str,
) -> PolarsResult<Option<Column>> {
    use polars::chunked_array::StructChunked;
    let name = struct_col.field().into_owned().name;
    let struct_series = struct_col.take_materialized_series();
    let st = struct_series.struct_().map_err(|e| {
        PolarsError::ComputeError(format!("with_field: expected struct column: {e}").into())
    })?;
    let len = st.len();
    let fields_series = st.fields_as_series();
    let value_series = value_col.take_materialized_series();
    let mut new_fields: Vec<Series> = Vec::with_capacity(fields_series.len() + 1);
    let mut replaced = false;
    for s in &fields_series {
        let fname = s.name().as_str();
        let new_s = if fname == field_name {
            replaced = true;
            let mut v = value_series.clone();
            v.rename(PlSmallStr::from(field_name));
            v
        } else {
            s.clone()
        };
        new_fields.push(new_s);
    }
    if !replaced {
        // A brand-new field must carry the requested field name, not the value
        // column's original name.
        let mut v = value_series;
        v.rename(PlSmallStr::from(field_name));
        new_fields.push(v);
    }
    let out = StructChunked::from_series(name.as_str().into(), len, new_fields.iter())
        .map_err(|e| PolarsError::ComputeError(format!("with_field: build struct: {e}").into()))?;
    Ok(Some(Column::new(name, out.into_series())))
}

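// A minimal sketch of the rename behavior above: adding a new field should
// carry the requested field name, not the value column's original name.
// Mirrors the StructChunked::from_series and Column::new usage already in this
// file; not part of the shipped test suite.
#[cfg(test)]
mod with_field_sketch {
    use super::*;
    use polars::chunked_array::StructChunked;

    #[test]
    fn new_field_gets_requested_name() {
        let a = Series::new("a".into(), &[1i64, 2]);
        let st = StructChunked::from_series("s".into(), 2, [a].iter())
            .unwrap()
            .into_series();
        let value = Column::new("tmp".into(), Series::new("tmp".into(), &[10i64, 20]));
        let out = apply_struct_with_field(Column::new("s".into(), st), value, "b")
            .unwrap()
            .unwrap();
        let series = out.take_materialized_series();
        let names: Vec<String> = series
            .struct_()
            .unwrap()
            .fields_as_series()
            .iter()
            .map(|f| f.name().to_string())
            .collect();
        assert_eq!(names, vec!["a", "b"]);
    }
}
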
/// Random permutation of list elements (PySpark shuffle). Uses rand::seq::SliceRandom.
pub fn apply_shuffle(column: Column) -> PolarsResult<Option<Column>> {
    use polars::chunked_array::builder::get_list_builder;
    use rand::seq::SliceRandom;
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let list_ca = series
        .list()
        .map_err(|e| PolarsError::ComputeError(format!("shuffle: {e}").into()))?;
    let inner_dtype = list_ca.inner_dtype().clone();
    let mut builder = get_list_builder(&inner_dtype, 64, list_ca.len(), name.as_str().into());
    for opt_list in list_ca.amortized_iter() {
        match opt_list {
            None => builder.append_null(),
            Some(amort) => {
                let list_s = amort.as_ref();
                let n = list_s.len();
                let mut indices: Vec<u32> = (0..n as u32).collect();
                indices.shuffle(&mut rand::thread_rng());
                let idx_ca = UInt32Chunked::from_vec("".into(), indices);
                let taken = list_s
                    .take(&idx_ca)
                    .map_err(|e| PolarsError::ComputeError(format!("shuffle take: {e}").into()))?;
                builder.append_series(&taken)?;
            }
        }
    }
    Ok(Some(Column::new(name, builder.finish().into_series())))
}

// --- Bitmap (PySpark 3.5+) ---

/// Size of bitmap in bytes (4096 bytes = 32768 bits).
const BITMAP_BYTES: usize = 4096;

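// A minimal sketch of the bit layout used by the bitmap helpers further down:
// position p lives at byte p / 8, bit p % 8 (LSB-first within each byte).
// Not part of the shipped test suite.
#[cfg(test)]
mod bitmap_layout_sketch {
    use super::BITMAP_BYTES;

    #[test]
    fn position_maps_to_byte_and_bit() {
        let pos = 9usize; // second byte, second-lowest bit
        assert!(pos < BITMAP_BYTES * 8);
        let mut buf = vec![0u8; BITMAP_BYTES];
        buf[pos / 8] |= 1 << (pos % 8);
        assert_eq!(buf[1], 0b0000_0010);
        // Summing count_ones over the bytes is exactly what bitmap_count does.
        assert_eq!(buf.iter().map(|b| b.count_ones()).sum::<u32>(), 1);
    }
}
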
// --- Casting helpers (Spark parity) ---

/// Parse string to date (PySpark cast string->date). Accepts "YYYY-MM-DD" and
/// "YYYY-MM-DD HH:MM:SS" (truncated to the date part).
fn parse_str_to_date(s: &str) -> Option<chrono::NaiveDate> {
    use chrono::{NaiveDate, NaiveDateTime};
    let s = s.trim();
    NaiveDate::parse_from_str(s, "%Y-%m-%d")
        .ok()
        .or_else(|| {
            // `get` avoids panicking when byte index 10 is not a char boundary
            // (e.g. inputs containing multi-byte characters).
            s.get(0..10)
                .and_then(|prefix| NaiveDate::parse_from_str(prefix, "%Y-%m-%d").ok())
        })
        .or_else(|| {
            NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S")
                .ok()
                .map(|dt| dt.date())
        })
}

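// Quick checks of the parsing ladder above: full dates, datetime prefixes, and
// garbage inputs. Not part of the shipped test suite.
#[cfg(test)]
mod parse_date_sketch {
    use super::parse_str_to_date;
    use chrono::NaiveDate;

    #[test]
    fn date_and_datetime_strings_parse() {
        let d = NaiveDate::from_ymd_opt(2024, 1, 2).unwrap();
        assert_eq!(parse_str_to_date("2024-01-02"), Some(d));
        assert_eq!(parse_str_to_date(" 2024-01-02 03:04:05 "), Some(d));
        assert_eq!(parse_str_to_date("not a date"), None);
    }
}
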
/// Parse string to integer (Spark parity). Empty/whitespace/invalid -> None. Otherwise parse as i64.
fn parse_str_to_int(s: &str) -> Option<i64> {
    let s = s.trim();
    if s.is_empty() {
        return None;
    }
    s.parse::<i64>().ok()
}

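// The Spark-style leniency above, spelled out: surrounding whitespace is fine,
// anything non-integral is null. Not part of the shipped test suite.
#[cfg(test)]
mod parse_int_sketch {
    use super::parse_str_to_int;

    #[test]
    fn whitespace_ok_garbage_null() {
        assert_eq!(parse_str_to_int(" 42 "), Some(42));
        assert_eq!(parse_str_to_int(""), None);
        assert_eq!(parse_str_to_int("4.2"), None);
    }
}
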
/// Apply string-to-int cast. Handles string columns: empty/invalid -> null (Spark parity);
/// passes through int columns; others error (strict) or null.
pub fn apply_string_to_int(
    column: Column,
    strict: bool,
    target: DataType,
) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let out: Series = match series.dtype() {
        DataType::String => {
            let ca = series
                .str()
                .map_err(|e| PolarsError::ComputeError(format!("string to int: {e}").into()))?;
            let mut results: Vec<Option<i64>> = Vec::with_capacity(ca.len());
            for opt_s in ca.into_iter() {
                let mut v = opt_s.and_then(parse_str_to_int);
                // For an Int32 target, an out-of-range value is a failed
                // conversion, so the strict check below can report it.
                if target == DataType::Int32 {
                    v = v.filter(|n| i64::from(i32::MIN) <= *n && *n <= i64::from(i32::MAX));
                }
                if strict {
                    if let Some(s) = opt_s {
                        if v.is_none() {
                            return Err(PolarsError::ComputeError(
                                format!(
                                    "conversion from `str` to `{}` failed in column '{}' for value \"{s}\"",
                                    target,
                                    name.as_str()
                                )
                                .into(),
                            ));
                        }
                    }
                }
                results.push(v);
            }
            match &target {
                DataType::Int32 => {
                    // Range already checked above; the narrowing cast is lossless here.
                    let vals = results.into_iter().map(|o| o.map(|n| n as i32));
                    Int32Chunked::from_iter_options(name.as_str().into(), vals).into_series()
                }
                DataType::Int64 => {
                    Int64Chunked::from_iter_options(name.as_str().into(), results.into_iter())
                        .into_series()
                }
                _ => unreachable!("target is Int32 or Int64"),
            }
        }
        DataType::Int32 | DataType::Int64 => series.cast(&target)?,
        _ => {
            if strict {
                return Err(PolarsError::ComputeError(
                    format!(
                        "casting from {} to {} not supported",
                        series.dtype(),
                        target
                    )
                    .into(),
                ));
            }
            Series::new_null(name.clone(), series.len()).cast(&target)?
        }
    };
    Ok(Some(Column::new(name, out)))
}

/// Parse string to boolean (PySpark cast string->boolean). "true"/"false"/"1"/"0"/"yes"/"no",
/// case-insensitive. Anything else is None; the caller decides whether that becomes a
/// null (try_cast) or an error (cast).
fn parse_str_to_bool(s: &str) -> Option<bool> {
    match s.trim().to_lowercase().as_str() {
        "true" | "1" | "yes" => Some(true),
        "false" | "0" | "no" => Some(false),
        _ => None,
    }
}

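// The accepted spellings above, exercised directly. Not part of the shipped
// test suite.
#[cfg(test)]
mod parse_bool_sketch {
    use super::parse_str_to_bool;

    #[test]
    fn spellings_are_case_insensitive() {
        assert_eq!(parse_str_to_bool("TRUE"), Some(true));
        assert_eq!(parse_str_to_bool(" 0 "), Some(false));
        assert_eq!(parse_str_to_bool("maybe"), None);
    }
}
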
/// Apply string-to-boolean cast. Handles string columns; passes through boolean; numeric types
/// (0/0.0 -> false, non-zero -> true for PySpark parity #399); null for others (try_cast) or error (cast).
pub fn apply_string_to_boolean(column: Column, strict: bool) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    // All numeric arms share one shape: non-zero -> true, zero -> false, null -> null.
    macro_rules! numeric_to_bool {
        ($accessor:ident, $zero:expr) => {{
            let ca = series.$accessor().map_err(|e| {
                PolarsError::ComputeError(format!("{}: {e}", stringify!($accessor)).into())
            })?;
            BooleanChunked::from_iter_options(
                name.as_str().into(),
                ca.into_iter().map(|o| o.map(|v| v != $zero)),
            )
        }};
    }
    let out: BooleanChunked = match series.dtype() {
        DataType::String => {
            let ca = series
                .str()
                .map_err(|e| PolarsError::ComputeError(format!("string to boolean: {e}").into()))?;
            let mut results = Vec::with_capacity(ca.len());
            for opt_s in ca.into_iter() {
                let v = match opt_s {
                    None => None,
                    Some(s) => {
                        let parsed = parse_str_to_bool(s);
                        if strict && parsed.is_none() {
                            return Err(PolarsError::ComputeError(
                                format!("casting from string to boolean failed for value '{s}'")
                                    .into(),
                            ));
                        }
                        parsed
                    }
                };
                results.push(v);
            }
            BooleanChunked::from_iter_options(name.as_str().into(), results.into_iter())
        }
        DataType::Boolean => {
            let ca = series
                .bool()
                .map_err(|e| PolarsError::ComputeError(format!("boolean: {e}").into()))?;
            BooleanChunked::from_iter_options(name.as_str().into(), ca.into_iter())
        }
        DataType::Int8 => numeric_to_bool!(i8, 0),
        DataType::Int16 => numeric_to_bool!(i16, 0),
        DataType::Int32 => numeric_to_bool!(i32, 0),
        DataType::Int64 => numeric_to_bool!(i64, 0),
        DataType::UInt8 => numeric_to_bool!(u8, 0),
        DataType::UInt16 => numeric_to_bool!(u16, 0),
        DataType::UInt32 => numeric_to_bool!(u32, 0),
        DataType::UInt64 => numeric_to_bool!(u64, 0),
        DataType::Float32 => numeric_to_bool!(f32, 0.0),
        DataType::Float64 => numeric_to_bool!(f64, 0.0),
        _ => {
            if strict {
                return Err(PolarsError::ComputeError(
                    format!("casting from {} to boolean not supported", series.dtype()).into(),
                ));
            }
            BooleanChunked::from_iter_options(name.as_str().into(), (0..series.len()).map(|_| None))
        }
    };
    Ok(Some(Column::new(name, out.into_series())))
}

/// Parse string to f64 (Spark-style to_number/try_to_number for doubles).
fn parse_str_to_double(s: &str) -> Option<f64> {
    let s = s.trim();
    if s.is_empty() {
        return None;
    }
    s.parse::<f64>().ok()
}

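// Rust's f64 parser already covers the common forms: plain decimals and
// scientific notation; everything else is null. Not part of the shipped test suite.
#[cfg(test)]
mod parse_double_sketch {
    use super::parse_str_to_double;

    #[test]
    fn decimal_and_scientific() {
        assert_eq!(parse_str_to_double(" 1.5 "), Some(1.5));
        assert_eq!(parse_str_to_double("2e3"), Some(2000.0));
        assert_eq!(parse_str_to_double("abc"), None);
    }
}
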
/// Apply string-to-double cast. Handles string columns: empty/invalid -> null (Spark parity);
/// passes through numeric columns; others error (strict) or null.
pub fn apply_string_to_double(column: Column, strict: bool) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let out: Series = match series.dtype() {
        DataType::String => {
            let ca = series
                .str()
                .map_err(|e| PolarsError::ComputeError(format!("string to double: {e}").into()))?;
            let mut results: Vec<Option<f64>> = Vec::with_capacity(ca.len());
            for opt_s in ca.into_iter() {
                let v = opt_s.and_then(parse_str_to_double);
                if strict {
                    if let Some(s) = opt_s {
                        if v.is_none() {
                            return Err(PolarsError::ComputeError(
                                format!(
                                    "conversion from `str` to `double` failed in column '{}' for value \"{s}\"",
                                    name.as_str()
                                )
                                .into(),
                            ));
                        }
                    }
                }
                results.push(v);
            }
            Float64Chunked::from_iter_options(name.as_str().into(), results.into_iter())
                .into_series()
        }
        DataType::Float32 | DataType::Float64 => series.cast(&DataType::Float64)?,
        DataType::Int8
        | DataType::Int16
        | DataType::Int32
        | DataType::Int64
        | DataType::UInt8
        | DataType::UInt16
        | DataType::UInt32
        | DataType::UInt64 => series.cast(&DataType::Float64)?,
        DataType::Null => {
            // PySpark parity (#260): lit(None).cast("double") yields a typed null column.
            Float64Chunked::from_iter_options(
                name.as_str().into(),
                (0..series.len()).map(|_| None::<f64>),
            )
            .into_series()
        }
        _ => {
            if strict {
                return Err(PolarsError::ComputeError(
                    format!("casting from {} to double not supported", series.dtype()).into(),
                ));
            }
            Float64Chunked::from_iter_options(
                name.as_str().into(),
                (0..series.len()).map(|_| None::<f64>),
            )
            .into_series()
        }
    };
    Ok(Some(Column::new(name, out)))
}

/// Apply string-to-date cast. Handles string columns (accepts date and datetime strings, Spark parity); passes through date; casts datetime to date; others error (cast) or null (try_cast).
pub fn apply_string_to_date(column: Column, strict: bool) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let epoch = robin_sparkless_core::date_utils::epoch_naive_date();
    let out: Series = match series.dtype() {
        DataType::String => {
            let ca = series
                .str()
                .map_err(|e| PolarsError::ComputeError(format!("string to date: {e}").into()))?;
            let mut results = Vec::with_capacity(ca.len());
            for opt_s in ca.into_iter() {
                let v =
                    opt_s.and_then(|s| parse_str_to_date(s).map(|d| (d - epoch).num_days() as i32));
                if strict {
                    if let Some(s) = opt_s {
                        if v.is_none() {
                            return Err(PolarsError::ComputeError(
                                format!(
                                    "conversion from `str` to `date` failed in column '{}' for value \"{s}\"",
                                    name.as_str()
                                )
                                .into(),
                            ));
                        }
                    }
                }
                results.push(v);
            }
            let chunked =
                Int32Chunked::from_iter_options(name.as_str().into(), results.into_iter());
            chunked.into_series().cast(&DataType::Date)?
        }
        DataType::Date => series,
        DataType::Datetime(_, _) => series.cast(&DataType::Date)?,
        DataType::Null => {
            // PySpark parity (#260): lit(None).cast("date") yields a typed null column.
            let days = Int32Chunked::from_iter_options(
                name.as_str().into(),
                (0..series.len()).map(|_| None::<i32>),
            );
            days.into_series().cast(&DataType::Date)?
        }
        _ => {
            if strict {
                return Err(PolarsError::ComputeError(
                    format!("casting from {} to date not supported", series.dtype()).into(),
                ));
            }
            let days = Int32Chunked::from_iter_options(
                name.as_str().into(),
                (0..series.len()).map(|_| None::<i32>),
            );
            days.into_series().cast(&DataType::Date)?
        }
    };
    Ok(Some(Column::new(name, out)))
}

/// Apply string-to-date with optional format (PySpark to_date(col, format)). When format is None uses default parsing; when Some parses with the given format.
pub fn apply_string_to_date_format(
    column: Column,
    format: Option<&str>,
    strict: bool,
) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let epoch = robin_sparkless_core::date_utils::epoch_naive_date();
    let out: Series = match series.dtype() {
        DataType::String => {
            let ca = series
                .str()
                .map_err(|e| PolarsError::ComputeError(format!("string to date: {e}").into()))?;
            let fmt = format.map(pyspark_format_to_chrono);
            let mut results = Vec::with_capacity(ca.len());
            for opt_s in ca.into_iter() {
                let v = opt_s.and_then(|s| {
                    let parsed = if let Some(ref chrono_fmt) = fmt {
                        chrono::NaiveDate::parse_from_str(s.trim(), chrono_fmt).ok()
                    } else {
                        parse_str_to_date(s)
                    };
                    parsed.map(|d| (d.signed_duration_since(epoch).num_days()) as i32)
                });
                if strict {
                    if let Some(s) = opt_s {
                        if v.is_none() {
                            return Err(PolarsError::ComputeError(
                                format!(
                                    "to_date failed in column '{}' for value \"{s}\"",
                                    name.as_str()
                                )
                                .into(),
                            ));
                        }
                    }
                }
                results.push(v);
            }
            let chunked =
                Int32Chunked::from_iter_options(name.as_str().into(), results.into_iter());
            chunked.into_series().cast(&DataType::Date)?
        }
        DataType::Date => series,
        DataType::Datetime(_, _) => series.cast(&DataType::Date)?,
        DataType::Null => {
            let days = Int32Chunked::from_iter_options(
                name.as_str().into(),
                (0..series.len()).map(|_| None::<i32>),
            );
            days.into_series().cast(&DataType::Date)?
        }
        _ => {
            if strict {
                return Err(PolarsError::ComputeError(
                    format!("to_date from {} not supported", series.dtype()).into(),
                ));
            }
            let days = Int32Chunked::from_iter_options(
                name.as_str().into(),
                (0..series.len()).map(|_| None::<i32>),
            );
            days.into_series().cast(&DataType::Date)?
        }
    };
    Ok(Some(Column::new(name, out)))
}

// --- Bitmap (PySpark 3.5+), continued ---

/// Count set bits in a bitmap (binary column). PySpark bitmap_count.
pub fn apply_bitmap_count(column: Column) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let ca = series
        .binary()
        .map_err(|e| PolarsError::ComputeError(format!("bitmap_count: {e}").into()))?;
    let out = Int64Chunked::from_iter_options(
        name.as_str().into(),
        ca.into_iter().map(|opt_b| {
            opt_b.map(|b| b.iter().map(|&byte| byte.count_ones() as i64).sum::<i64>())
        }),
    );
    Ok(Some(Column::new(name, out.into_series())))
}

/// Build one bitmap from a list of bit positions (0..32767). Used after implode for bitmap_construct_agg.
pub fn apply_bitmap_construct_agg(column: Column) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let list_ca = series
        .list()
        .map_err(|e| PolarsError::ComputeError(format!("bitmap_construct_agg: {e}").into()))?;
    let out: BinaryChunked = list_ca
        .amortized_iter()
        .map(|opt_list| {
            opt_list.and_then(|list_series| {
                let mut buf = vec![0u8; BITMAP_BYTES];
                let ca = list_series.as_ref().i64().ok()?;
                for pos in ca.into_iter().flatten() {
                    let pos = pos as usize;
                    if pos < BITMAP_BYTES * 8 {
                        let byte_idx = pos / 8;
                        let bit_idx = pos % 8;
                        buf[byte_idx] |= 1 << bit_idx;
                    }
                }
                Some(bytes::Bytes::from(buf))
            })
        })
        .collect();
    Ok(Some(Column::new(name, out.into_series())))
}

/// Bitwise OR of a list of bitmaps (binary). Used after implode for bitmap_or_agg.
pub fn apply_bitmap_or_agg(column: Column) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let list_ca = series
        .list()
        .map_err(|e| PolarsError::ComputeError(format!("bitmap_or_agg: {e}").into()))?;
    let out: BinaryChunked = list_ca
        .amortized_iter()
        .map(|opt_list| {
            opt_list.and_then(|list_series| {
                // Each inner element is one bitmap; OR them byte-wise, reading the
                // inner binary series directly like bitmap_construct_agg does.
                let bin_ca = list_series.as_ref().binary().ok()?;
                let mut buf = vec![0u8; BITMAP_BYTES];
                for b in bin_ca.into_iter().flatten() {
                    for (i, &byte) in b.iter().take(BITMAP_BYTES).enumerate() {
                        buf[i] |= byte;
                    }
                }
                Some(bytes::Bytes::from(buf))
            })
        })
        .collect();
    Ok(Some(Column::new(name, out.into_series())))
}

// --- to_timestamp_ltz / to_timestamp_ntz ---

/// Parse string as timestamp in the process-local timezone, return UTC micros
/// (PySpark to_timestamp_ltz, with the machine's local zone standing in for the
/// Spark session timezone). `strict` is currently ignored: unparsable strings become null.
pub fn apply_to_timestamp_ltz_format(
    column: Column,
    format: Option<&str>,
    strict: bool,
) -> PolarsResult<Option<Column>> {
    use chrono::offset::TimeZone;
    use chrono::{Local, NaiveDateTime, Utc};
    use polars::datatypes::TimeUnit;
    let _ = strict; // see doc comment: strict errors are not implemented yet
    let chrono_fmt = format
        .map(pyspark_format_to_chrono)
        .unwrap_or_else(|| "%Y-%m-%d %H:%M:%S".to_string());
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let ca = series
        .str()
        .map_err(|e| PolarsError::ComputeError(format!("to_timestamp_ltz: {e}").into()))?;
    let out = Int64Chunked::from_iter_options(
        name.as_str().into(),
        ca.into_iter().map(|opt_s| {
            opt_s.and_then(|s| {
                NaiveDateTime::parse_from_str(s, &chrono_fmt)
                    .ok()
                    .and_then(|ndt| {
                        // Ambiguous or nonexistent local times (DST transitions) yield null.
                        Local
                            .from_local_datetime(&ndt)
                            .single()
                            .map(|dt| dt.with_timezone(&Utc).timestamp_micros())
                    })
            })
        }),
    );
    let out_series = out
        .into_series()
        .cast(&DataType::Datetime(TimeUnit::Microseconds, None))?;
    Ok(Some(Column::new(name, out_series)))
}

/// Parse string as timestamp without timezone (PySpark to_timestamp_ntz). Returns Datetime(_, None).
/// `strict` is currently ignored: unparsable strings become null.
pub fn apply_to_timestamp_ntz_format(
    column: Column,
    format: Option<&str>,
    strict: bool,
) -> PolarsResult<Option<Column>> {
    use chrono::NaiveDateTime;
    use polars::datatypes::TimeUnit;
    let _ = strict; // see doc comment: strict errors are not implemented yet
    let chrono_fmt = format
        .map(pyspark_format_to_chrono)
        .unwrap_or_else(|| "%Y-%m-%d %H:%M:%S".to_string());
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let ca = series
        .str()
        .map_err(|e| PolarsError::ComputeError(format!("to_timestamp_ntz: {e}").into()))?;
    let out = Int64Chunked::from_iter_options(
        name.as_str().into(),
        ca.into_iter().map(|opt_s| {
            opt_s.and_then(|s| {
                NaiveDateTime::parse_from_str(s, &chrono_fmt)
                    .ok()
                    .map(|ndt| ndt.and_utc().timestamp_micros())
            })
        }),
    );
    let out_series = out
        .into_series()
        .cast(&DataType::Datetime(TimeUnit::Microseconds, None))?;
    Ok(Some(Column::new(name, out_series)))
}
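
// A minimal check of the NTZ convention above: the naive datetime is
// interpreted as UTC when converted to epoch microseconds. Not part of the
// shipped test suite.
#[cfg(test)]
mod to_timestamp_ntz_sketch {
    use chrono::NaiveDateTime;

    #[test]
    fn naive_parse_to_micros() {
        let micros = NaiveDateTime::parse_from_str("2024-01-02 03:04:05", "%Y-%m-%d %H:%M:%S")
            .unwrap()
            .and_utc()
            .timestamp_micros();
        assert_eq!(micros, 1_704_164_645_000_000);
    }
}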