1use chrono::{Datelike, TimeZone};
5use chrono_tz::Tz;
6use polars::prelude::*;
7use regex::Regex;
8use std::borrow::Cow;
9
10pub fn apply_split_with_limit(
13 column: Column,
14 delimiter: &str,
15 limit: i32,
16) -> PolarsResult<Option<Column>> {
17 let name = column.field().into_owned().name;
18 let series = column.take_materialized_series();
19 let ca = series
20 .str()
21 .map_err(|e| PolarsError::ComputeError(format!("split_with_limit: {e}").into()))?;
22 let n = if limit <= 0 {
23 usize::MAX
24 } else {
25 limit as usize
26 };
27 let values_capacity = ca.len().saturating_mul(64);
28 let mut builder =
29 ListStringChunkedBuilder::new(name.as_str().into(), ca.len(), values_capacity);
30 for opt_s in ca.into_iter() {
31 match opt_s {
32 Some(s) => {
33 if delimiter.is_empty() {
34 builder.append_values_iter(s.split(delimiter));
36 } else {
37 builder.append_values_iter(s.splitn(n, delimiter));
38 }
39 }
40 None => builder.append_null(),
41 }
42 }
43 let out = builder.finish();
44 Ok(Some(Column::new(name, out.into_series())))
45}
46
47pub fn apply_split_part_regex(
49 column: Column,
50 pattern: &str,
51 part_num: i64,
52) -> PolarsResult<Option<Column>> {
53 let name = column.field().into_owned().name;
54 let series = column.take_materialized_series();
55 let ca = series
56 .str()
57 .map_err(|e| PolarsError::ComputeError(format!("split_part_regex: {e}").into()))?;
58 let re = Regex::new(pattern)
59 .map_err(|e| PolarsError::ComputeError(format!("split_part_regex pattern: {e}").into()))?;
60 let out = StringChunked::from_iter_options(
61 name.as_str().into(),
62 ca.into_iter().map(|opt_s| {
63 opt_s.map(|s| {
64 let parts: Vec<&str> = re.split(s).collect();
65 let idx = if part_num > 0 {
66 (part_num - 1) as usize
67 } else {
68 let n = parts.len() as i64;
69 (n + part_num) as usize
70 };
71 parts.get(idx).map(|&p| p.to_string()).unwrap_or_default()
72 })
73 }),
74 );
75 Ok(Some(Column::new(name, out.into_series())))
76}
77
78fn soundex_one(s: &str) -> Cow<'_, str> {
80 use soundex::american_soundex;
81 let code = american_soundex(s);
82 Cow::Owned(code.chars().take(4).collect::<String>())
83}
84
85pub fn apply_soundex(column: Column) -> PolarsResult<Option<Column>> {
87 let name = column.field().into_owned().name;
88 let series = column.take_materialized_series();
89 let ca = series
90 .str()
91 .map_err(|e| PolarsError::ComputeError(format!("soundex: {e}").into()))?;
92 let out: StringChunked = ca.apply_values(soundex_one);
93 Ok(Some(Column::new(name, out.into_series())))
94}
95
96pub fn apply_crc32(column: Column) -> PolarsResult<Option<Column>> {
98 use crc32fast::Hasher;
99 let name = column.field().into_owned().name;
100 let series = column.take_materialized_series();
101 let ca = series
102 .str()
103 .map_err(|e| PolarsError::ComputeError(format!("crc32: {e}").into()))?;
104 let out = Int64Chunked::from_iter_options(
105 name.as_str().into(),
106 ca.into_iter().map(|opt_s| {
107 opt_s.map(|s| {
108 let mut hasher = Hasher::new();
109 hasher.update(s.as_bytes());
110 hasher.finalize() as i64
111 })
112 }),
113 );
114 Ok(Some(Column::new(name, out.into_series())))
115}
116
117pub fn apply_xxhash64(column: Column) -> PolarsResult<Option<Column>> {
119 use std::hash::Hasher;
120 use twox_hash::XxHash64;
121 let name = column.field().into_owned().name;
122 let series = column.take_materialized_series();
123 let ca = series
124 .str()
125 .map_err(|e| PolarsError::ComputeError(format!("xxhash64: {e}").into()))?;
126 let out = Int64Chunked::from_iter_options(
127 name.as_str().into(),
128 ca.into_iter().map(|opt_s| {
129 opt_s.map(|s| {
130 let mut hasher = XxHash64::default();
131 hasher.write(s.as_bytes());
132 hasher.finish() as i64
133 })
134 }),
135 );
136 Ok(Some(Column::new(name, out.into_series())))
137}
138
139pub fn apply_levenshtein(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
141 use strsim::levenshtein;
142 if columns.len() < 2 {
143 return Err(PolarsError::ComputeError(
144 "levenshtein needs two columns".into(),
145 ));
146 }
147 let name = columns[0].field().into_owned().name;
148 let a_series = std::mem::take(&mut columns[0]).take_materialized_series();
149 let b_series = std::mem::take(&mut columns[1]).take_materialized_series();
150 let a_ca = a_series
151 .str()
152 .map_err(|e| PolarsError::ComputeError(format!("levenshtein: {e}").into()))?;
153 let b_ca = b_series
154 .str()
155 .map_err(|e| PolarsError::ComputeError(format!("levenshtein: {e}").into()))?;
156 let out = Int64Chunked::from_iter_options(
157 name.as_str().into(),
158 a_ca.into_iter().zip(b_ca).map(|(a, b)| match (a, b) {
159 (Some(a), Some(b)) => Some(levenshtein(a, b) as i64),
160 _ => None,
161 }),
162 );
163 Ok(Some(Column::new(name, out.into_series())))
164}
165
166pub fn apply_array_flatten(column: Column) -> PolarsResult<Option<Column>> {
168 let name = column.field().into_owned().name;
169 let series = column.take_materialized_series();
170 let list_ca = series
171 .list()
172 .map_err(|e| PolarsError::ComputeError(format!("array_flatten: {e}").into()))?;
173 let inner_dtype = match list_ca.inner_dtype() {
174 DataType::List(inner) => *inner.clone(),
175 other => other.clone(),
176 };
177 let out = list_ca.try_apply_amortized(|amort_s| {
178 let s = amort_s.as_ref();
179 let list_s = s.as_list();
180 if list_s.is_empty() {
181 return Ok(Series::new_empty(PlSmallStr::EMPTY, &inner_dtype));
182 }
183 let mut acc: Vec<Series> = Vec::new();
184 for elem in list_s.amortized_iter().flatten() {
185 acc.push(elem.deep_clone());
186 }
187 if acc.is_empty() {
188 Ok(Series::new_empty(PlSmallStr::EMPTY, &inner_dtype))
189 } else {
190 let mut result = acc.remove(0);
191 for s in acc {
192 result.extend(&s)?;
193 }
194 Ok(result)
195 }
196 })?;
197 Ok(Some(Column::new(name, out.into_series())))
198}
199
200pub fn apply_array_distinct_first_order(column: Column) -> PolarsResult<Option<Column>> {
202 let name = column.field().into_owned().name;
203 let series = column.take_materialized_series();
204 let list_ca = series
205 .list()
206 .map_err(|e| PolarsError::ComputeError(format!("array_distinct: {e}").into()))?;
207 let inner_dtype = list_ca.inner_dtype().clone();
208 let out = list_ca.try_apply_amortized(|amort_s| {
209 let list_s = amort_s.as_ref().as_list();
210 let mut result: Vec<Series> = Vec::new();
211 for elem in list_s.amortized_iter().flatten() {
212 let taken = elem.deep_clone();
213 let is_dup = result.iter().any(|s| s.get(0).ok() == taken.get(0).ok());
214 if !is_dup {
215 result.push(taken);
216 }
217 }
218 if result.is_empty() {
219 Ok(Series::new_empty(PlSmallStr::EMPTY, &inner_dtype))
220 } else {
221 let mut combined = result.remove(0);
222 for s in result {
223 combined.extend(&s)?;
224 }
225 Ok(combined)
226 }
227 })?;
228 Ok(Some(Column::new(name, out.into_series())))
229}
230
231pub fn apply_array_repeat(column: Column, n: i64) -> PolarsResult<Option<Column>> {
234 let name = column.field().into_owned().name;
235 let series = column.take_materialized_series();
236 let n_usize = n.max(0) as usize;
237
238 if !matches!(series.dtype(), DataType::List(_)) {
240 use polars::chunked_array::builder::get_list_builder;
241 let inner_dtype = series.dtype().clone();
242 let len = series.len();
243 let mut builder = get_list_builder(&inner_dtype, 64, len, name.as_str().into());
244 for i in 0..len {
245 let opt_av = series.get(i);
246 let elem_series = match opt_av {
247 Ok(av) => any_value_to_single_series(av, &inner_dtype)?,
248 Err(_) => Series::new_empty(PlSmallStr::EMPTY, &inner_dtype),
249 };
250 let mut repeated = elem_series.clone();
251 for _ in 1..n_usize {
252 repeated.extend(&elem_series)?;
253 }
254 builder.append_series(&repeated).map_err(|e| {
255 PolarsError::ComputeError(format!("array_repeat scalar: {e}").into())
256 })?;
257 }
258 let out = builder.finish().into_series();
259 return Ok(Some(Column::new(name, out)));
260 }
261
262 let list_ca = series
264 .list()
265 .map_err(|e| PolarsError::ComputeError(format!("array_repeat: {e}").into()))?;
266 let inner_dtype = list_ca.inner_dtype().clone();
267 let n = n.max(0) as usize;
268 let out = list_ca.try_apply_amortized(move |amort_s| {
269 let list_s = amort_s.as_ref().as_list();
270 let mut repeated: Vec<Series> = Vec::new();
271 for elem in list_s.amortized_iter().flatten() {
272 let taken = elem.deep_clone();
273 for _ in 0..n {
274 repeated.push(taken.clone());
275 }
276 }
277 if repeated.is_empty() {
278 Ok(Series::new_empty(PlSmallStr::EMPTY, &inner_dtype))
279 } else {
280 let mut result = repeated.remove(0);
281 for s in repeated {
282 result.extend(&s)?;
283 }
284 Ok(result)
285 }
286 })?;
287 Ok(Some(Column::new(name, out.into_series())))
288}
289
290fn any_value_to_single_series(av: AnyValue, dtype: &DataType) -> PolarsResult<Series> {
291 Series::from_any_values_and_dtype(PlSmallStr::EMPTY, &[av], dtype, false)
292}
293
294pub fn apply_array_append(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
296 use std::cell::RefCell;
297 if columns.len() < 2 {
298 return Err(PolarsError::ComputeError(
299 "array_append needs two columns (array, element)".into(),
300 ));
301 }
302 let name = columns[0].field().into_owned().name;
303 let list_series = std::mem::take(&mut columns[0]).take_materialized_series();
304 let elem_series = std::mem::take(&mut columns[1]).take_materialized_series();
305 let list_ca = list_series
306 .list()
307 .map_err(|e| PolarsError::ComputeError(format!("array_append: {e}").into()))?;
308 let inner_dtype = list_ca.inner_dtype().clone();
309 let elem_casted = elem_series.cast(&inner_dtype)?;
310 let elem_len = elem_casted.len();
311 let elem_vec: Vec<Option<AnyValue>> = (0..elem_len).map(|i| elem_casted.get(i).ok()).collect();
312 let idx = RefCell::new(0usize);
313 let out = list_ca.try_apply_amortized(|amort_s| {
314 let i = *idx.borrow();
315 *idx.borrow_mut() += 1;
316 let ei = if elem_len == 1 { 0 } else { i };
317 let list_s = amort_s.as_ref().as_list();
318 let mut acc: Vec<Series> = Vec::new();
319 for e in list_s.amortized_iter().flatten() {
320 acc.push(e.deep_clone());
321 }
322 if let Some(Some(av)) = elem_vec.get(ei) {
323 let single = any_value_to_single_series(av.clone(), &inner_dtype)?;
324 acc.push(single);
325 }
326 if acc.is_empty() {
327 Ok(Series::new_empty(PlSmallStr::EMPTY, &inner_dtype))
328 } else {
329 let mut result = acc.remove(0);
330 for s in acc {
331 result.extend(&s)?;
332 }
333 Ok(result)
334 }
335 })?;
336 Ok(Some(Column::new(name, out.into_series())))
337}
338
339pub fn apply_array_prepend(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
341 use std::cell::RefCell;
342 if columns.len() < 2 {
343 return Err(PolarsError::ComputeError(
344 "array_prepend needs two columns (array, element)".into(),
345 ));
346 }
347 let name = columns[0].field().into_owned().name;
348 let list_series = std::mem::take(&mut columns[0]).take_materialized_series();
349 let elem_series = std::mem::take(&mut columns[1]).take_materialized_series();
350 let list_ca = list_series
351 .list()
352 .map_err(|e| PolarsError::ComputeError(format!("array_prepend: {e}").into()))?;
353 let inner_dtype = list_ca.inner_dtype().clone();
354 let elem_casted = elem_series.cast(&inner_dtype)?;
355 let elem_len = elem_casted.len();
356 let elem_vec: Vec<Option<AnyValue>> = (0..elem_len).map(|i| elem_casted.get(i).ok()).collect();
357 let idx = RefCell::new(0usize);
358 let out = list_ca.try_apply_amortized(|amort_s| {
359 let i = *idx.borrow();
360 *idx.borrow_mut() += 1;
361 let ei = if elem_len == 1 { 0 } else { i };
362 let list_s = amort_s.as_ref().as_list();
363 let mut acc: Vec<Series> = Vec::new();
364 if let Some(Some(av)) = elem_vec.get(ei) {
365 let single = any_value_to_single_series(av.clone(), &inner_dtype)?;
366 acc.push(single);
367 }
368 for e in list_s.amortized_iter().flatten() {
369 acc.push(e.deep_clone());
370 }
371 if acc.is_empty() {
372 Ok(Series::new_empty(PlSmallStr::EMPTY, &inner_dtype))
373 } else {
374 let mut result = acc.remove(0);
375 for s in acc {
376 result.extend(&s)?;
377 }
378 Ok(result)
379 }
380 })?;
381 Ok(Some(Column::new(name, out.into_series())))
382}
383
384pub fn apply_array_insert(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
386 use std::cell::RefCell;
387 if columns.len() < 3 {
388 return Err(PolarsError::ComputeError(
389 "array_insert needs three columns (array, position, element)".into(),
390 ));
391 }
392 let name = columns[0].field().into_owned().name;
393 let list_series = std::mem::take(&mut columns[0]).take_materialized_series();
394 let pos_series = std::mem::take(&mut columns[1]).take_materialized_series();
395 let elem_series = std::mem::take(&mut columns[2]).take_materialized_series();
396 let list_ca = list_series
397 .list()
398 .map_err(|e| PolarsError::ComputeError(format!("array_insert: {e}").into()))?;
399 let inner_dtype = list_ca.inner_dtype().clone();
400 let pos_ca = pos_series
401 .cast(&DataType::Int64)?
402 .i64()
403 .map_err(|e| {
404 PolarsError::ComputeError(format!("array_insert: position column: {e}").into())
405 })?
406 .clone();
407 let elem_casted = elem_series.cast(&inner_dtype)?;
408 let pos_len = pos_ca.len();
409 let pos_vec: Vec<i64> = (0..pos_len).map(|i| pos_ca.get(i).unwrap_or(1)).collect();
410 let elem_len = elem_casted.len();
411 let elem_vec: Vec<Option<AnyValue>> = (0..elem_len).map(|i| elem_casted.get(i).ok()).collect();
412 let idx = RefCell::new(0usize);
413 let out = list_ca.try_apply_amortized(|amort_s| {
414 let i = *idx.borrow();
415 *idx.borrow_mut() += 1;
416 let pi = if pos_len == 1 { 0 } else { i };
417 let ei = if elem_len == 1 { 0 } else { i };
418 let list_s = amort_s.as_ref().as_list();
419 let pos_val = pos_vec.get(pi).copied().unwrap_or(1);
420 let mut acc: Vec<Series> = Vec::new();
421 for e in list_s.amortized_iter().flatten() {
422 acc.push(e.deep_clone());
423 }
424 let len = acc.len() as i64;
425 let pos = if pos_val < 0 {
426 (len + pos_val + 1).max(0).min(len) as usize
427 } else {
428 ((pos_val - 1).max(0)).min(len) as usize
429 };
430 let single = elem_vec
431 .get(ei)
432 .and_then(|o| o.as_ref())
433 .map(|av: &AnyValue| any_value_to_single_series(av.clone(), &inner_dtype));
434 if let Some(Ok(s)) = single {
435 acc.insert(pos, s);
436 }
437 if acc.is_empty() {
438 Ok(Series::new_empty(PlSmallStr::EMPTY, &inner_dtype))
439 } else {
440 let mut result = acc.remove(0);
441 for s in acc {
442 result.extend(&s)?;
443 }
444 Ok(result)
445 }
446 })?;
447 Ok(Some(Column::new(name, out.into_series())))
448}
449
450fn series_to_set_key(s: &Series) -> String {
451 if s.len() == 1 {
452 if let Ok(av) = s.get(0) {
453 return format!("{:?}", av);
454 }
455 }
456 std::string::ToString::to_string(s)
457}
458
459pub fn apply_array_except(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
461 if columns.len() < 2 {
462 return Err(PolarsError::ComputeError(
463 "array_except needs two columns (array1, array2)".into(),
464 ));
465 }
466 let name = columns[0].field().into_owned().name;
467 let a_series = std::mem::take(&mut columns[0]).take_materialized_series();
468 let b_series = std::mem::take(&mut columns[1]).take_materialized_series();
469 let a_ca = a_series
470 .list()
471 .map_err(|e| PolarsError::ComputeError(format!("array_except: {e}").into()))?;
472 let b_ca = b_series
473 .list()
474 .map_err(|e| PolarsError::ComputeError(format!("array_except: {e}").into()))?;
475 let inner_dtype = a_ca.inner_dtype().clone();
476 let mut builder = polars::chunked_array::builder::get_list_builder(
477 &inner_dtype,
478 64,
479 a_ca.len(),
480 name.as_str().into(),
481 );
482 for (opt_a, opt_b) in a_ca.amortized_iter().zip(b_ca.amortized_iter()) {
483 match (opt_a, opt_b) {
484 (Some(a_amort), Some(b_amort)) => {
485 let a_list = a_amort.as_ref().as_list();
486 let b_list = b_amort.as_ref().as_list();
487 let b_keys: std::collections::HashSet<String> = b_list
488 .amortized_iter()
489 .flatten()
490 .map(|e| series_to_set_key(&e.deep_clone()))
491 .collect();
492 let mut acc: Vec<Series> = Vec::new();
493 let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
494 for e in a_list.amortized_iter().flatten() {
495 let s = e.deep_clone();
496 let key = series_to_set_key(&s);
497 if !b_keys.contains(&key) && seen.insert(key) {
498 acc.push(s);
499 }
500 }
501 let result = if acc.is_empty() {
502 Series::new_empty(PlSmallStr::EMPTY, &inner_dtype)
503 } else {
504 let mut r = acc.remove(0);
505 for s in acc {
506 r.extend(&s)?;
507 }
508 r
509 };
510 builder.append_series(&result)?;
511 }
512 _ => builder.append_null(),
513 }
514 }
515 Ok(Some(Column::new(name, builder.finish().into_series())))
516}
517
518pub fn apply_arrays_overlap(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
520 if columns.len() < 2 {
521 return Err(PolarsError::ComputeError(
522 "arrays_overlap needs two columns (array1, array2)".into(),
523 ));
524 }
525 let name = columns[0].field().into_owned().name;
526 let a_series = std::mem::take(&mut columns[0]).take_materialized_series();
527 let b_series = std::mem::take(&mut columns[1]).take_materialized_series();
528 let a_ca = a_series
529 .list()
530 .map_err(|e| PolarsError::ComputeError(format!("arrays_overlap: {e}").into()))?;
531 let b_ca = b_series
532 .list()
533 .map_err(|e| PolarsError::ComputeError(format!("arrays_overlap: {e}").into()))?;
534 let mut results: Vec<bool> = Vec::with_capacity(a_ca.len());
535 for (opt_a, opt_b) in a_ca.amortized_iter().zip(b_ca.amortized_iter()) {
536 let overlap = match (opt_a, opt_b) {
537 (Some(a_amort), Some(b_amort)) => {
538 let a_list = a_amort.as_ref().as_list();
539 let b_list = b_amort.as_ref().as_list();
540 let a_keys: std::collections::HashSet<String> = a_list
541 .amortized_iter()
542 .flatten()
543 .map(|e| series_to_set_key(&e.deep_clone()))
544 .collect();
545 let b_keys: std::collections::HashSet<String> = b_list
546 .amortized_iter()
547 .flatten()
548 .map(|e| series_to_set_key(&e.deep_clone()))
549 .collect();
550 !a_keys.is_disjoint(&b_keys)
551 }
552 _ => false,
553 };
554 results.push(overlap);
555 }
556 let out =
557 BooleanChunked::from_iter_options(name.as_str().into(), results.into_iter().map(Some));
558 Ok(Some(Column::new(name, out.into_series())))
559}
560
561pub fn apply_arrays_zip(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
563 if columns.len() < 2 {
564 return Err(PolarsError::ComputeError(
565 "arrays_zip needs at least two columns".into(),
566 ));
567 }
568 let name = columns[0].field().into_owned().name;
569 let n = columns.len();
570 let mut series_vec: Vec<Series> = Vec::with_capacity(n);
571 for col in columns.iter_mut() {
572 series_vec.push(std::mem::take(col).take_materialized_series());
573 }
574 let list_cas: Vec<_> = series_vec
575 .iter()
576 .map(|s| {
577 s.list()
578 .map_err(|e| PolarsError::ComputeError(format!("arrays_zip: {e}").into()))
579 })
580 .collect::<PolarsResult<Vec<_>>>()?;
581 let len = list_cas[0].len();
582 let inner_dtype = list_cas[0].inner_dtype().clone();
583 use polars::chunked_array::StructChunked;
584 use polars::chunked_array::builder::get_list_builder;
585 use polars::datatypes::Field;
586 let struct_fields: Vec<Field> = (0..n)
587 .map(|i| Field::new(format!("field_{i}").into(), inner_dtype.clone()))
588 .collect();
589 let out_struct = DataType::Struct(struct_fields);
590 let mut builder = get_list_builder(&out_struct, 64, len, name.as_str().into());
591 for row_idx in 0..len {
592 let mut max_len = 0usize;
593 let mut row_lists: Vec<Vec<Series>> = Vec::with_capacity(n);
594 for ca in &list_cas {
595 let opt_amort = ca.amortized_iter().nth(row_idx).flatten();
596 if let Some(amort) = opt_amort {
597 let list_s = amort.as_ref().as_list();
598 let elems: Vec<Series> = list_s
599 .amortized_iter()
600 .flatten()
601 .map(|e| e.deep_clone())
602 .collect();
603 max_len = max_len.max(elems.len());
604 row_lists.push(elems);
605 } else {
606 row_lists.push(vec![]);
607 }
608 }
609 if max_len == 0 {
610 builder.append_null();
611 } else {
612 let mut struct_parts: Vec<Vec<Series>> =
613 (0..n).map(|_| Vec::with_capacity(max_len)).collect();
614 for i in 0..max_len {
615 for (j, lst) in row_lists.iter().enumerate() {
616 let elem = lst
617 .get(i)
618 .cloned()
619 .unwrap_or_else(|| Series::full_null(PlSmallStr::EMPTY, 1, &inner_dtype));
620 struct_parts[j].push(elem);
621 }
622 }
623 let field_series: Vec<Series> = struct_parts
624 .into_iter()
625 .enumerate()
626 .map(|(j, mut parts)| {
627 let mut r = parts.remove(0);
628 for s in parts {
629 let _ = r.extend(&s);
630 }
631 r.with_name(format!("field_{j}").as_str().into())
632 })
633 .collect();
634 let field_refs: Vec<&Series> = field_series.iter().collect();
635 let st =
636 StructChunked::from_series(PlSmallStr::EMPTY, max_len, field_refs.iter().copied())
637 .map_err(|e| {
638 PolarsError::ComputeError(format!("arrays_zip struct: {e}").into())
639 })?
640 .into_series();
641 builder.append_series(&st)?;
642 }
643 }
644 Ok(Some(Column::new(name, builder.finish().into_series())))
645}
646
647pub fn apply_array_intersect(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
649 if columns.len() < 2 {
650 return Err(PolarsError::ComputeError(
651 "array_intersect needs two columns (array1, array2)".into(),
652 ));
653 }
654 let name = columns[0].field().into_owned().name;
655 let a_series = std::mem::take(&mut columns[0]).take_materialized_series();
656 let b_series = std::mem::take(&mut columns[1]).take_materialized_series();
657 let a_ca = a_series
658 .list()
659 .map_err(|e| PolarsError::ComputeError(format!("array_intersect: {e}").into()))?;
660 let b_ca = b_series
661 .list()
662 .map_err(|e| PolarsError::ComputeError(format!("array_intersect: {e}").into()))?;
663 let inner_dtype = a_ca.inner_dtype().clone();
664 let mut builder = polars::chunked_array::builder::get_list_builder(
665 &inner_dtype,
666 64,
667 a_ca.len(),
668 name.as_str().into(),
669 );
670 for (opt_a, opt_b) in a_ca.amortized_iter().zip(b_ca.amortized_iter()) {
671 match (opt_a, opt_b) {
672 (Some(a_amort), Some(b_amort)) => {
673 let a_list = a_amort.as_ref().as_list();
674 let b_list = b_amort.as_ref().as_list();
675 let b_keys: std::collections::HashSet<String> = b_list
676 .amortized_iter()
677 .flatten()
678 .map(|e| series_to_set_key(&e.deep_clone()))
679 .collect();
680 let mut acc: Vec<Series> = Vec::new();
681 let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
682 for e in a_list.amortized_iter().flatten() {
683 let s = e.deep_clone();
684 let key = series_to_set_key(&s);
685 if b_keys.contains(&key) && seen.insert(key) {
686 acc.push(s);
687 }
688 }
689 let result = if acc.is_empty() {
690 Series::new_empty(PlSmallStr::EMPTY, &inner_dtype)
691 } else {
692 let mut r = acc.remove(0);
693 for s in acc {
694 r.extend(&s)?;
695 }
696 r
697 };
698 builder.append_series(&result)?;
699 }
700 _ => builder.append_null(),
701 }
702 }
703 Ok(Some(Column::new(name, builder.finish().into_series())))
704}
705
706pub fn apply_array_union(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
708 if columns.len() < 2 {
709 return Err(PolarsError::ComputeError(
710 "array_union needs two columns (array1, array2)".into(),
711 ));
712 }
713 let name = columns[0].field().into_owned().name;
714 let a_series = std::mem::take(&mut columns[0]).take_materialized_series();
715 let b_series = std::mem::take(&mut columns[1]).take_materialized_series();
716 let a_ca = a_series
717 .list()
718 .map_err(|e| PolarsError::ComputeError(format!("array_union: {e}").into()))?;
719 let b_ca = b_series
720 .list()
721 .map_err(|e| PolarsError::ComputeError(format!("array_union: {e}").into()))?;
722 let inner_dtype = a_ca.inner_dtype().clone();
723 let mut builder = polars::chunked_array::builder::get_list_builder(
724 &inner_dtype,
725 64,
726 a_ca.len(),
727 name.as_str().into(),
728 );
729 for (opt_a, opt_b) in a_ca.amortized_iter().zip(b_ca.amortized_iter()) {
730 match (opt_a, opt_b) {
731 (Some(a_amort), Some(b_amort)) => {
732 let a_list = a_amort.as_ref().as_list();
733 let b_list = b_amort.as_ref().as_list();
734 let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
735 let mut acc: Vec<Series> = Vec::new();
736 for e in a_list.amortized_iter().flatten() {
737 let s = e.deep_clone();
738 let key = series_to_set_key(&s);
739 if seen.insert(key) {
740 acc.push(s);
741 }
742 }
743 for e in b_list.amortized_iter().flatten() {
744 let s = e.deep_clone();
745 let key = series_to_set_key(&s);
746 if seen.insert(key) {
747 acc.push(s);
748 }
749 }
750 let result = if acc.is_empty() {
751 Series::new_empty(PlSmallStr::EMPTY, &inner_dtype)
752 } else {
753 let mut r = acc.remove(0);
754 for s in acc {
755 r.extend(&s)?;
756 }
757 r
758 };
759 builder.append_series(&result)?;
760 }
761 _ => builder.append_null(),
762 }
763 }
764 Ok(Some(Column::new(name, builder.finish().into_series())))
765}
766
767pub fn apply_str_to_map(
769 column: Column,
770 pair_delim: &str,
771 key_value_delim: &str,
772) -> PolarsResult<Option<Column>> {
773 use polars::chunked_array::StructChunked;
774 use polars::chunked_array::builder::get_list_builder;
775 use polars::datatypes::Field;
776 let name = column.field().into_owned().name;
777 let series = column.take_materialized_series();
778 let ca = series
779 .str()
780 .map_err(|e| PolarsError::ComputeError(format!("str_to_map: {e}").into()))?;
781 let out_struct = DataType::Struct(vec![
782 Field::new("key".into(), DataType::String),
783 Field::new("value".into(), DataType::String),
784 ]);
785 let mut builder = get_list_builder(&out_struct, 64, ca.len(), name.as_str().into());
786 for opt_s in ca.into_iter() {
787 if let Some(s) = opt_s {
788 let pairs: Vec<(String, String)> = s
789 .split(pair_delim)
790 .filter_map(|part| {
791 let kv: Vec<&str> = part.splitn(2, key_value_delim).collect();
792 if kv.len() >= 2 {
793 Some((kv[0].to_string(), kv[1].to_string()))
794 } else if kv.len() == 1 && !kv[0].is_empty() {
795 Some((kv[0].to_string(), String::new()))
796 } else {
797 None
798 }
799 })
800 .collect();
801 if pairs.is_empty() {
802 builder.append_null();
803 } else {
804 let keys: Vec<String> = pairs.iter().map(|(k, _)| k.clone()).collect();
805 let vals: Vec<String> = pairs.iter().map(|(_, v)| v.clone()).collect();
806 let k_series = Series::new("key".into(), keys);
807 let v_series = Series::new("value".into(), vals);
808 let fields: [&Series; 2] = [&k_series, &v_series];
809 let st = StructChunked::from_series(
810 PlSmallStr::EMPTY,
811 pairs.len(),
812 fields.iter().copied(),
813 )
814 .map_err(|e| PolarsError::ComputeError(format!("str_to_map struct: {e}").into()))?
815 .into_series();
816 builder.append_series(&st)?;
817 }
818 } else {
819 builder.append_null();
820 }
821 }
822 Ok(Some(Column::new(name, builder.finish().into_series())))
823}
824
825pub fn apply_map_concat(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
827 use polars::chunked_array::StructChunked;
828 use polars::chunked_array::builder::get_list_builder;
829 use polars::datatypes::Field;
830 if columns.len() < 2 {
831 return Err(PolarsError::ComputeError(
832 "map_concat needs at least two columns".into(),
833 ));
834 }
835 let name = columns[0].field().into_owned().name;
836 let a_series = std::mem::take(&mut columns[0]).take_materialized_series();
837 let b_series = std::mem::take(&mut columns[1]).take_materialized_series();
838 let a_ca = a_series
839 .list()
840 .map_err(|e| PolarsError::ComputeError(format!("map_concat: {e}").into()))?;
841 let b_ca = b_series
842 .list()
843 .map_err(|e| PolarsError::ComputeError(format!("map_concat: {e}").into()))?;
844 let struct_dtype = a_ca.inner_dtype().clone();
845 let (key_dtype, value_dtype) = match &struct_dtype {
846 DataType::Struct(fields) => {
847 let k = fields
848 .iter()
849 .find(|f| f.name == "key")
850 .map(|f| f.dtype.clone())
851 .unwrap_or(DataType::String);
852 let v = fields
853 .iter()
854 .find(|f| f.name == "value")
855 .map(|f| f.dtype.clone())
856 .unwrap_or(DataType::String);
857 (k, v)
858 }
859 _ => (DataType::String, DataType::String),
860 };
861 let out_struct = DataType::Struct(vec![
862 Field::new("key".into(), key_dtype),
863 Field::new("value".into(), value_dtype),
864 ]);
865 let n = a_ca.len();
866 let mut builder = get_list_builder(&out_struct, 64, n, name.as_str().into());
867 for (opt_a, opt_b) in a_ca.amortized_iter().zip(b_ca.amortized_iter()) {
868 let mut merged: std::collections::BTreeMap<String, (Series, Series)> =
869 std::collections::BTreeMap::new();
870 for amort in [opt_a, opt_b].into_iter().flatten() {
871 let list_s = amort.as_ref().as_list();
872 for elem in list_s.amortized_iter().flatten() {
873 let s = elem.deep_clone();
874 let st = s.struct_().map_err(|e| {
875 PolarsError::ComputeError(format!("map_concat struct: {e}").into())
876 })?;
877 let k_s = st.field_by_name("key").map_err(|e| {
878 PolarsError::ComputeError(format!("map_concat key: {e}").into())
879 })?;
880 let v_s = st.field_by_name("value").map_err(|e| {
881 PolarsError::ComputeError(format!("map_concat value: {e}").into())
882 })?;
883 let key = std::string::ToString::to_string(&k_s);
884 merged.insert(key, (k_s, v_s));
885 }
886 }
887 if merged.is_empty() {
888 builder.append_null();
889 } else {
890 let mut row_structs: Vec<Series> = Vec::new();
891 for (_, (k_s, v_s)) in merged {
892 let len = k_s.len();
893 let fields: [&Series; 2] = [&k_s, &v_s];
894 let st = StructChunked::from_series(PlSmallStr::EMPTY, len, fields.iter().copied())
895 .map_err(|e| {
896 PolarsError::ComputeError(format!("map_concat build: {e}").into())
897 })?
898 .into_series();
899 row_structs.push(st);
900 }
901 let mut combined = row_structs.remove(0);
902 for s in row_structs {
903 combined.extend(&s)?;
904 }
905 builder.append_series(&combined)?;
906 }
907 }
908 Ok(Some(Column::new(name, builder.finish().into_series())))
909}
910
911pub fn apply_map_contains_key(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
913 if columns.len() < 2 {
914 return Err(PolarsError::ComputeError(
915 "map_contains_key needs two columns (map, key)".into(),
916 ));
917 }
918 let name = columns[0].field().into_owned().name;
919 let map_series = std::mem::take(&mut columns[0]).take_materialized_series();
920 let key_series = std::mem::take(&mut columns[1]).take_materialized_series();
921 let map_ca = map_series
922 .list()
923 .map_err(|e| PolarsError::ComputeError(format!("map_contains_key: {e}").into()))?;
924 let key_str = key_series.cast(&DataType::String)?;
925 let key_vec: Vec<String> = (0..key_str.len())
926 .map(|i| key_str.get(i).map(|av| av.to_string()).unwrap_or_default())
927 .collect();
928 let key_len = key_vec.len();
929 let mut results: Vec<bool> = Vec::with_capacity(map_ca.len());
930 for (i, opt_amort) in map_ca.amortized_iter().enumerate() {
931 let target = key_vec
932 .get(if key_len == 1 { 0 } else { i })
933 .map(|s| s.as_str())
934 .unwrap_or("");
935 let mut found = false;
936 if let Some(amort) = opt_amort {
937 let list_s = amort.as_ref().as_list();
938 for elem in list_s.amortized_iter().flatten() {
939 let s = elem.deep_clone();
940 if let Ok(st) = s.struct_() {
941 if let Ok(k) = st.field_by_name("key") {
942 let k_str: String =
943 k.get(0).ok().map(|av| av.to_string()).unwrap_or_default();
944 if k_str == target {
945 found = true;
946 break;
947 }
948 }
949 }
950 }
951 }
952 results.push(found);
953 }
954 let out =
955 BooleanChunked::from_iter_options(name.as_str().into(), results.into_iter().map(Some));
956 Ok(Some(Column::new(name, out.into_series())))
957}
958
959pub fn apply_get(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
961 if columns.len() < 2 {
962 return Err(PolarsError::ComputeError(
963 "get needs two columns (map, key)".into(),
964 ));
965 }
966 let name = columns[0].field().into_owned().name;
967 let map_series = std::mem::take(&mut columns[0]).take_materialized_series();
968 let key_series = std::mem::take(&mut columns[1]).take_materialized_series();
969 let map_ca = map_series
970 .list()
971 .map_err(|e| PolarsError::ComputeError(format!("get: {e}").into()))?;
972 let key_str = key_series.cast(&DataType::String)?;
973 let key_vec: Vec<String> = (0..key_str.len())
974 .map(|i| key_str.get(i).map(|av| av.to_string()).unwrap_or_default())
975 .collect();
976 let key_len = key_vec.len();
977 let value_dtype = match map_ca.inner_dtype() {
978 DataType::Struct(fields) => fields
979 .iter()
980 .find(|f| f.name == "value")
981 .map(|f| f.dtype.clone())
982 .unwrap_or(DataType::String),
983 _ => DataType::String,
984 };
985 let mut result_series: Vec<Series> = Vec::with_capacity(map_ca.len());
986 for (i, opt_amort) in map_ca.amortized_iter().enumerate() {
987 let target = key_vec
988 .get(if key_len == 1 { 0 } else { i })
989 .map(|s| s.as_str())
990 .unwrap_or("");
991 let mut found: Option<Series> = None;
992 if let Some(amort) = opt_amort {
993 let list_s = amort.as_ref().as_list();
994 for elem in list_s.amortized_iter().flatten() {
995 let s = elem.deep_clone();
996 if let Ok(st) = s.struct_() {
997 if let Ok(k) = st.field_by_name("key") {
998 let k_str: String =
999 k.get(0).ok().map(|av| av.to_string()).unwrap_or_default();
1000 if k_str == target {
1001 if let Ok(v) = st.field_by_name("value") {
1002 found = Some(v);
1003 }
1004 break;
1005 }
1006 }
1007 }
1008 }
1009 }
1010 result_series
1011 .push(found.unwrap_or_else(|| Series::full_null(PlSmallStr::EMPTY, 1, &value_dtype)));
1012 }
1013 let mut out = result_series.remove(0);
1014 for s in result_series {
1015 out.extend(&s)?;
1016 }
1017 Ok(Some(Column::new(name, out)))
1018}
1019
1020pub fn apply_ascii(column: Column) -> PolarsResult<Option<Column>> {
1022 let name = column.field().into_owned().name;
1023 let series = column.take_materialized_series();
1024 let ca = series
1025 .str()
1026 .map_err(|e| PolarsError::ComputeError(format!("ascii: {e}").into()))?;
1027 let out = Int32Chunked::from_iter_options(
1028 name.as_str().into(),
1029 ca.into_iter()
1030 .map(|opt_s| opt_s.and_then(|s| s.chars().next().map(|c| c as i32))),
1031 );
1032 Ok(Some(Column::new(name, out.into_series())))
1033}
1034
1035pub fn apply_format_number(column: Column, decimals: u32) -> PolarsResult<Option<Column>> {
1037 let name = column.field().into_owned().name;
1038 let series = column.take_materialized_series();
1039 let prec = decimals as usize;
1040 let out: StringChunked = match series.dtype() {
1041 DataType::Float64 => {
1042 let ca = series
1043 .f64()
1044 .map_err(|e| PolarsError::ComputeError(format!("format_number: {e}").into()))?;
1045 StringChunked::from_iter_options(
1046 name.as_str().into(),
1047 ca.into_iter()
1048 .map(|opt_v| opt_v.map(|v| format!("{v:.prec$}"))),
1049 )
1050 }
1051 DataType::Float32 => {
1052 let ca = series
1053 .f32()
1054 .map_err(|e| PolarsError::ComputeError(format!("format_number: {e}").into()))?;
1055 StringChunked::from_iter_options(
1056 name.as_str().into(),
1057 ca.into_iter()
1058 .map(|opt_v| opt_v.map(|v| format!("{v:.prec$}"))),
1059 )
1060 }
1061 _ => {
1062 let f64_series = series.cast(&DataType::Float64).map_err(|e| {
1063 PolarsError::ComputeError(format!("format_number cast: {e}").into())
1064 })?;
1065 let ca = f64_series
1066 .f64()
1067 .map_err(|e| PolarsError::ComputeError(format!("format_number: {e}").into()))?;
1068 StringChunked::from_iter_options(
1069 name.as_str().into(),
1070 ca.into_iter()
1071 .map(|opt_v| opt_v.map(|v| format!("{v:.prec$}"))),
1072 )
1073 }
1074 };
1075 Ok(Some(Column::new(name, out.into_series())))
1076}
1077
1078fn series_value_at_as_string(s: &Series, i: usize) -> Option<String> {
1080 match s.dtype() {
1081 DataType::String => s.str().ok()?.get(i).map(|v| v.to_string()),
1082 DataType::Int32 => s.i32().ok()?.get(i).map(|v| v.to_string()),
1083 DataType::Int64 => s.i64().ok()?.get(i).map(|v| v.to_string()),
1084 DataType::Float32 => s.f32().ok()?.get(i).map(|v| v.to_string()),
1085 DataType::Float64 => s.f64().ok()?.get(i).map(|v| v.to_string()),
1086 DataType::Boolean => s.bool().ok()?.get(i).map(|v| v.to_string()),
1087 _ => s.get(i).ok().map(|av| av.to_string()),
1088 }
1089}
1090
1091pub fn apply_format_string(columns: &mut [Column], format: &str) -> PolarsResult<Option<Column>> {
1094 let n = columns.len();
1095 if n == 0 {
1096 return Err(PolarsError::ComputeError(
1097 "format_string needs at least one column".into(),
1098 ));
1099 }
1100 let name = columns[0].field().into_owned().name;
1101 let mut series_vec: Vec<Series> = Vec::with_capacity(n);
1102 for col in columns.iter_mut() {
1103 series_vec.push(std::mem::take(col).take_materialized_series());
1104 }
1105 let len = series_vec[0].len();
1106 let mut result = Vec::with_capacity(len);
1107 for i in 0..len {
1108 let values: Vec<Option<String>> = series_vec
1109 .iter()
1110 .map(|s| series_value_at_as_string(s, i))
1111 .collect();
1112 let out = format_string_row(format, &values);
1113 result.push(out);
1114 }
1115 let out = StringChunked::from_iter_options(name.as_str().into(), result.into_iter());
1116 Ok(Some(Column::new(name, out.into_series())))
1117}
1118
/// Expands a printf-style `format` against `values`, consuming them left to right.
///
/// Supported directives:
/// - `%%` emits a literal `%`.
/// - `%s`, `%d`, `%i`, `%f`, `%g` and `%e` each insert the next value verbatim; no numeric
///   formatting is applied.
///
/// Returns `None` in any of these cases:
/// - a directive has no value left;
/// - the consumed value is `None`;
/// - `%` is followed by any other character or ends the string.
fn format_string_row(format: &str, values: &[Option<String>]) -> Option<String> {
    let mut rendered = String::new();
    let mut remaining = values.iter();
    let mut chars = format.chars();
    while let Some(ch) = chars.next() {
        if ch != '%' {
            rendered.push(ch);
            continue;
        }
        match chars.next()? {
            '%' => rendered.push('%'),
            's' | 'd' | 'i' | 'f' | 'g' | 'e' => {
                let value = remaining.next()?.as_deref()?;
                rendered.push_str(value);
            }
            _ => return None,
        }
    }
    Some(rendered)
}
1165
1166pub fn apply_find_in_set(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
1170 if columns.len() < 2 {
1171 return Err(PolarsError::ComputeError(
1172 "find_in_set needs two columns".into(),
1173 ));
1174 }
1175 let name = columns[0].field().into_owned().name;
1176 let str_series = std::mem::take(&mut columns[0]).take_materialized_series();
1177 let set_series = std::mem::take(&mut columns[1]).take_materialized_series();
1178 let str_ca = str_series
1179 .str()
1180 .map_err(|e| PolarsError::ComputeError(format!("find_in_set: {e}").into()))?;
1181 let set_ca = set_series
1182 .str()
1183 .map_err(|e| PolarsError::ComputeError(format!("find_in_set: {e}").into()))?;
1184 let out = Int64Chunked::from_iter_options(
1185 name.as_str().into(),
1186 str_ca
1187 .into_iter()
1188 .zip(set_ca)
1189 .map(|(opt_str, opt_set)| match (opt_str, opt_set) {
1190 (Some(s), Some(set_str)) => {
1191 if s.contains(',') {
1192 Some(0i64)
1193 } else {
1194 let parts: Vec<&str> = set_str.split(',').collect();
1195 let idx = parts.iter().position(|p| *p == s);
1196 Some(idx.map(|i| (i + 1) as i64).unwrap_or(0))
1197 }
1198 }
1199 _ => None,
1200 }),
1201 );
1202 Ok(Some(Column::new(name, out.into_series())))
1203}
1204
1205pub fn apply_regexp_extract_lookaround(
1208 column: Column,
1209 pattern: &str,
1210 group_index: usize,
1211) -> PolarsResult<Option<Column>> {
1212 use fancy_regex::Regex;
1213 let name = column.field().into_owned().name;
1214 let series = column.take_materialized_series();
1215 let ca = series
1216 .str()
1217 .map_err(|e| PolarsError::ComputeError(format!("regexp_extract: {e}").into()))?;
1218 let re = Regex::new(pattern).map_err(|e| {
1219 PolarsError::ComputeError(
1220 format!("regexp_extract invalid regex (lookaround) '{pattern}': {e}").into(),
1221 )
1222 })?;
1223 let out = StringChunked::from_iter_options(
1224 name.as_str().into(),
1225 ca.into_iter().map(|opt_s| {
1226 opt_s.and_then(|s| {
1227 let caps = re.captures(s).ok().flatten()?;
1228 let m = caps.get(group_index)?;
1229 Some(m.as_str().to_string())
1230 })
1231 }),
1232 );
1233 Ok(Some(Column::new(name, out.into_series())))
1234}
1235
1236pub fn apply_regexp_instr(
1239 column: Column,
1240 pattern: String,
1241 group_idx: usize,
1242) -> PolarsResult<Option<Column>> {
1243 use regex::Regex;
1244 let name = column.field().into_owned().name;
1245 let series = column.take_materialized_series();
1246 let ca = series
1247 .str()
1248 .map_err(|e| PolarsError::ComputeError(format!("regexp_instr: {e}").into()))?;
1249 let re = Regex::new(&pattern).map_err(|e| {
1250 PolarsError::ComputeError(format!("regexp_instr invalid regex '{pattern}': {e}").into())
1251 })?;
1252 let out = Int64Chunked::from_iter_options(
1253 name.as_str().into(),
1254 ca.into_iter().map(|opt_s| {
1255 opt_s.and_then(|s| {
1256 let m = if group_idx == 0 {
1257 re.find(s)
1258 } else {
1259 re.captures(s).and_then(|cap| cap.get(group_idx))
1260 };
1261 m.map(|m| (m.start() + 1) as i64)
1262 })
1263 }),
1264 );
1265 Ok(Some(Column::new(name, out.into_series())))
1266}
1267
1268pub fn apply_base64(column: Column) -> PolarsResult<Option<Column>> {
1270 use base64::Engine;
1271 let name = column.field().into_owned().name;
1272 let series = column.take_materialized_series();
1273 let ca = series
1274 .str()
1275 .map_err(|e| PolarsError::ComputeError(format!("base64: {e}").into()))?;
1276 let out = StringChunked::from_iter_options(
1277 name.as_str().into(),
1278 ca.into_iter().map(|opt_s| {
1279 opt_s.map(|s| base64::engine::general_purpose::STANDARD.encode(s.as_bytes()))
1280 }),
1281 );
1282 Ok(Some(Column::new(name, out.into_series())))
1283}
1284
1285pub fn apply_unbase64(column: Column) -> PolarsResult<Option<Column>> {
1287 use base64::Engine;
1288 let name = column.field().into_owned().name;
1289 let series = column.take_materialized_series();
1290 let ca = series
1291 .str()
1292 .map_err(|e| PolarsError::ComputeError(format!("unbase64: {e}").into()))?;
1293 let out = StringChunked::from_iter_options(
1294 name.as_str().into(),
1295 ca.into_iter().map(|opt_s| {
1296 opt_s.and_then(|s| {
1297 let decoded = base64::engine::general_purpose::STANDARD
1298 .decode(s.as_bytes())
1299 .ok()?;
1300 String::from_utf8(decoded).ok()
1301 })
1302 }),
1303 );
1304 Ok(Some(Column::new(name, out.into_series())))
1305}
1306
1307pub fn apply_sha1(column: Column) -> PolarsResult<Option<Column>> {
1309 use sha1::Digest;
1310 let name = column.field().into_owned().name;
1311 let series = column.take_materialized_series();
1312 let ca = series
1313 .str()
1314 .map_err(|e| PolarsError::ComputeError(format!("sha1: {e}").into()))?;
1315 let out = StringChunked::from_iter_options(
1316 name.as_str().into(),
1317 ca.into_iter().map(|opt_s| {
1318 opt_s.map(|s| {
1319 let mut hasher = sha1::Sha1::new();
1320 hasher.update(s.as_bytes());
1321 format!("{:x}", hasher.finalize())
1322 })
1323 }),
1324 );
1325 Ok(Some(Column::new(name, out.into_series())))
1326}
1327
1328pub fn apply_sha2(column: Column, bit_length: i32) -> PolarsResult<Option<Column>> {
1330 let name = column.field().into_owned().name;
1331 let series = column.take_materialized_series();
1332 let ca = series
1333 .str()
1334 .map_err(|e| PolarsError::ComputeError(format!("sha2: {e}").into()))?;
1335 let out = StringChunked::from_iter_options(
1336 name.as_str().into(),
1337 ca.into_iter().map(|opt_s| {
1338 opt_s.map(|s| {
1339 let bytes = s.as_bytes();
1340 use sha2::Digest;
1341 match bit_length {
1342 256 => format!("{:x}", sha2::Sha256::digest(bytes)),
1343 384 => format!("{:x}", sha2::Sha384::digest(bytes)),
1344 512 => format!("{:x}", sha2::Sha512::digest(bytes)),
1345 _ => format!("{:x}", sha2::Sha256::digest(bytes)),
1346 }
1347 })
1348 }),
1349 );
1350 Ok(Some(Column::new(name, out.into_series())))
1351}
1352
1353pub fn apply_md5(column: Column) -> PolarsResult<Option<Column>> {
1355 let name = column.field().into_owned().name;
1356 let series = column.take_materialized_series();
1357 let ca = series
1358 .str()
1359 .map_err(|e| PolarsError::ComputeError(format!("md5: {e}").into()))?;
1360 let out = StringChunked::from_iter_options(
1361 name.as_str().into(),
1362 ca.into_iter()
1363 .map(|opt_s| opt_s.map(|s| format!("{:x}", md5::compute(s.as_bytes())))),
1364 );
1365 Ok(Some(Column::new(name, out.into_series())))
1366}
1367
1368pub fn apply_encode(column: Column, charset: &str) -> PolarsResult<Option<Column>> {
1370 let name = column.field().into_owned().name;
1371 let series = column.take_materialized_series();
1372 let ca = series
1373 .str()
1374 .map_err(|e| PolarsError::ComputeError(format!("encode: {e}").into()))?;
1375 let cs = charset.to_lowercase();
1376 let out = StringChunked::from_iter_options(
1377 name.as_str().into(),
1378 ca.into_iter().map(|opt_s| {
1379 opt_s.map(|s| {
1380 let bytes: Vec<u8> = match cs.as_str() {
1381 "utf-8" | "utf8" => s.as_bytes().to_vec(),
1382 _ => s.as_bytes().to_vec(), };
1384 hex::encode(bytes)
1385 })
1386 }),
1387 );
1388 Ok(Some(Column::new(name, out.into_series())))
1389}
1390
1391pub fn apply_decode(column: Column, charset: &str) -> PolarsResult<Option<Column>> {
1393 let name = column.field().into_owned().name;
1394 let series = column.take_materialized_series();
1395 let ca = series
1396 .str()
1397 .map_err(|e| PolarsError::ComputeError(format!("decode: {e}").into()))?;
1398 let _ = charset;
1399 let out = StringChunked::from_iter_options(
1400 name.as_str().into(),
1401 ca.into_iter().map(|opt_s| {
1402 opt_s.and_then(|s| {
1403 let bytes = hex::decode(s.as_bytes()).ok()?;
1404 String::from_utf8(bytes).ok()
1405 })
1406 }),
1407 );
1408 Ok(Some(Column::new(name, out.into_series())))
1409}
1410
1411pub fn apply_to_binary(column: Column, fmt: &str) -> PolarsResult<Option<Column>> {
1413 let name = column.field().into_owned().name;
1414 let series = column.take_materialized_series();
1415 let ca = series
1416 .str()
1417 .map_err(|e| PolarsError::ComputeError(format!("to_binary: {e}").into()))?;
1418 let fmt_lower = fmt.to_lowercase();
1419 let out = StringChunked::from_iter_options(
1420 name.as_str().into(),
1421 ca.into_iter().map(|opt_s| {
1422 opt_s.and_then(|s| {
1423 let hex_str = match fmt_lower.as_str() {
1424 "hex" => {
1425 hex::decode(s.as_bytes()).ok()?;
1427 Some(s.to_string())
1428 }
1429 "utf-8" | "utf8" => Some(hex::encode(s.as_bytes())),
1430 _ => Some(hex::encode(s.as_bytes())),
1431 };
1432 hex_str
1433 })
1434 }),
1435 );
1436 Ok(Some(Column::new(name, out.into_series())))
1437}
1438
1439pub fn apply_try_to_binary(column: Column, fmt: &str) -> PolarsResult<Option<Column>> {
1441 apply_to_binary(column, fmt)
1442}
1443
1444fn aes_gcm_encrypt_one(plaintext: &[u8], key: &[u8]) -> Option<String> {
1446 use aes_gcm::Aes128Gcm;
1447 use aes_gcm::aead::generic_array::GenericArray;
1448 use aes_gcm::aead::{Aead, KeyInit};
1449 use rand::RngCore;
1450 let key_arr: [u8; 16] = key
1451 .iter()
1452 .copied()
1453 .chain(std::iter::repeat(0))
1454 .take(16)
1455 .collect::<Vec<_>>()
1456 .try_into()
1457 .ok()?;
1458 let cipher = Aes128Gcm::new(GenericArray::from_slice(&key_arr));
1459 let mut nonce = [0u8; 12];
1460 rand::thread_rng().fill_bytes(&mut nonce);
1461 let nonce = GenericArray::from_slice(&nonce);
1462 let ciphertext = cipher.encrypt(nonce, plaintext).ok()?;
1463 let mut out = nonce.as_slice().to_vec();
1464 out.extend(ciphertext);
1465 Some(hex::encode(out))
1466}
1467
1468fn aes_gcm_decrypt_one(hex_input: &str, key: &[u8]) -> Option<String> {
1470 use aes_gcm::Aes128Gcm;
1471 use aes_gcm::aead::generic_array::GenericArray;
1472 use aes_gcm::aead::{Aead, KeyInit};
1473 let bytes = hex::decode(hex_input.as_bytes()).ok()?;
1474 if bytes.len() < 12 + 16 {
1475 return None; }
1477 let (nonce_bytes, ct) = bytes.split_at(12);
1478 let key_arr: [u8; 16] = key
1479 .iter()
1480 .copied()
1481 .chain(std::iter::repeat(0))
1482 .take(16)
1483 .collect::<Vec<_>>()
1484 .try_into()
1485 .ok()?;
1486 let cipher = Aes128Gcm::new(GenericArray::from_slice(&key_arr));
1487 let nonce = GenericArray::from_slice(nonce_bytes);
1488 let plaintext = cipher.decrypt(nonce, ct).ok()?;
1489 String::from_utf8(plaintext).ok()
1490}
1491
1492pub fn apply_aes_encrypt(column: Column, key: &str) -> PolarsResult<Option<Column>> {
1494 let name = column.field().into_owned().name;
1495 let series = column.take_materialized_series();
1496 let ca = series
1497 .str()
1498 .map_err(|e| PolarsError::ComputeError(format!("aes_encrypt: {e}").into()))?;
1499 let key_bytes = key.as_bytes();
1500 let out = StringChunked::from_iter_options(
1501 name.as_str().into(),
1502 ca.into_iter()
1503 .map(|opt_s| opt_s.and_then(|s| aes_gcm_encrypt_one(s.as_bytes(), key_bytes))),
1504 );
1505 Ok(Some(Column::new(name, out.into_series())))
1506}
1507
1508pub fn apply_aes_decrypt(column: Column, key: &str) -> PolarsResult<Option<Column>> {
1510 let name = column.field().into_owned().name;
1511 let series = column.take_materialized_series();
1512 let ca = series
1513 .str()
1514 .map_err(|e| PolarsError::ComputeError(format!("aes_decrypt: {e}").into()))?;
1515 let key_bytes = key.as_bytes();
1516 let out = StringChunked::from_iter_options(
1517 name.as_str().into(),
1518 ca.into_iter()
1519 .map(|opt_s| opt_s.and_then(|s| aes_gcm_decrypt_one(s, key_bytes))),
1520 );
1521 Ok(Some(Column::new(name, out.into_series())))
1522}
1523
1524pub fn apply_try_aes_decrypt(column: Column, key: &str) -> PolarsResult<Option<Column>> {
1526 apply_aes_decrypt(column, key)
1527}
1528
1529pub fn apply_char(column: Column) -> PolarsResult<Option<Column>> {
1531 let name = column.field().into_owned().name;
1532 let series = column.take_materialized_series();
1533 let to_char = |v: i64| -> String {
1534 let u = v as u32;
1535 if u <= 0x10FFFF {
1536 char::from_u32(u).map(|c| c.to_string()).unwrap_or_default()
1537 } else {
1538 String::new()
1539 }
1540 };
1541 let out: StringChunked = match series.dtype() {
1542 DataType::Int32 => {
1543 let ca = series
1544 .i32()
1545 .map_err(|e| PolarsError::ComputeError(format!("char: {e}").into()))?;
1546 StringChunked::from_iter_options(
1547 name.as_str().into(),
1548 ca.into_iter().map(|opt_v| opt_v.map(|v| to_char(v as i64))),
1549 )
1550 }
1551 DataType::Int64 => {
1552 let ca = series
1553 .i64()
1554 .map_err(|e| PolarsError::ComputeError(format!("char: {e}").into()))?;
1555 StringChunked::from_iter_options(
1556 name.as_str().into(),
1557 ca.into_iter().map(|opt_v| opt_v.map(to_char)),
1558 )
1559 }
1560 _ => {
1561 let i64_series = series
1562 .cast(&DataType::Int64)
1563 .map_err(|e| PolarsError::ComputeError(format!("char cast: {e}").into()))?;
1564 let ca = i64_series
1565 .i64()
1566 .map_err(|e| PolarsError::ComputeError(format!("char: {e}").into()))?;
1567 StringChunked::from_iter_options(
1568 name.as_str().into(),
1569 ca.into_iter().map(|opt_v| opt_v.map(to_char)),
1570 )
1571 }
1572 };
1573 Ok(Some(Column::new(name, out.into_series())))
1574}
1575
1576fn date_series_to_days(series: &Series) -> PolarsResult<Int32Chunked> {
1579 let casted = series.cast(&DataType::Date)?;
1580 let days_series = casted.cast(&DataType::Int32)?;
1581 days_series
1582 .i32()
1583 .map_err(|e| PolarsError::ComputeError(format!("date_series_to_days: {e}").into()))
1584 .cloned()
1585}
1586
1587fn days_to_naive_date(days: i32) -> Option<chrono::NaiveDate> {
1588 let base = robin_sparkless_core::date_utils::epoch_naive_date();
1589 base.checked_add_signed(chrono::TimeDelta::days(days as i64))
1590}
1591
1592fn naivedate_to_days(d: chrono::NaiveDate) -> i32 {
1593 let base = robin_sparkless_core::date_utils::epoch_naive_date();
1594 (d.signed_duration_since(base).num_days()) as i32
1595}
1596
1597pub fn apply_add_months(column: Column, n: i32) -> PolarsResult<Option<Column>> {
1599 use chrono::Months;
1600 let name = column.field().into_owned().name;
1601 let series = column.take_materialized_series();
1602 let ca = date_series_to_days(&series)?;
1603 let u = n.unsigned_abs();
1604 let out = ca.into_iter().map(|opt_d| {
1605 opt_d.and_then(|days| {
1606 let d = days_to_naive_date(days)?;
1607 let next = if n >= 0 {
1608 d.checked_add_months(Months::new(u))?
1609 } else {
1610 d.checked_sub_months(Months::new(u))?
1611 };
1612 Some(naivedate_to_days(next))
1613 })
1614 });
1615 let out = Int32Chunked::from_iter_options(name.as_str().into(), out);
1616 let out_series = out.into_series().cast(&DataType::Date)?;
1617 Ok(Some(Column::new(name, out_series)))
1618}
1619
/// Parses a day-of-week name into Spark's numbering (`1` = Sunday … `7` = Saturday).
///
/// Matching is case-insensitive and ignores surrounding whitespace. Like Spark's `next_day`, it
/// accepts the two-letter (`"SU"`), three-letter (`"SUN"`) and full (`"SUNDAY"`) forms. Anything
/// else returns `None`.
fn parse_day_of_week(s: &str) -> Option<u32> {
    let s = s.trim().to_lowercase();
    match s.as_str() {
        "su" | "sun" | "sunday" => Some(1),
        "mo" | "mon" | "monday" => Some(2),
        "tu" | "tue" | "tuesday" => Some(3),
        "we" | "wed" | "wednesday" => Some(4),
        "th" | "thu" | "thursday" => Some(5),
        "fr" | "fri" | "friday" => Some(6),
        "sa" | "sat" | "saturday" => Some(7),
        _ => None,
    }
}
1634
1635fn chrono_weekday_to_dayofweek(w: chrono::Weekday) -> u32 {
1637 match w {
1638 chrono::Weekday::Mon => 2,
1639 chrono::Weekday::Tue => 3,
1640 chrono::Weekday::Wed => 4,
1641 chrono::Weekday::Thu => 5,
1642 chrono::Weekday::Fri => 6,
1643 chrono::Weekday::Sat => 7,
1644 chrono::Weekday::Sun => 1,
1645 }
1646}
1647
1648pub fn apply_next_day(column: Column, day_of_week: &str) -> PolarsResult<Option<Column>> {
1649 let target = parse_day_of_week(day_of_week).ok_or_else(|| {
1650 PolarsError::ComputeError(format!("next_day: invalid day '{day_of_week}'").into())
1651 })?;
1652 let name = column.field().into_owned().name;
1653 let series = column.take_materialized_series();
1654 let ca = date_series_to_days(&series)?;
1655 let out = ca.into_iter().map(|opt_d| {
1656 opt_d.and_then(|days| {
1657 let d = days_to_naive_date(days)?;
1658 let current = chrono_weekday_to_dayofweek(d.weekday());
1659 let diff = if target >= current {
1660 (target - current) as i64
1661 } else {
1662 (7 - (current - target)) as i64
1663 };
1664 let days_to_add = if diff == 0 { 7 } else { diff }; let next = d.checked_add_signed(chrono::TimeDelta::days(days_to_add))?;
1666 Some(naivedate_to_days(next))
1667 })
1668 });
1669 let out = Int32Chunked::from_iter_options(name.as_str().into(), out);
1670 let out_series = out.into_series().cast(&DataType::Date)?;
1671 Ok(Some(Column::new(name, out_series)))
1672}
1673
1674pub fn apply_dayname(column: Column) -> PolarsResult<Option<Column>> {
1676 let name = column.field().into_owned().name;
1677 let series = column.take_materialized_series();
1678 let ca = date_series_to_days(&series)?;
1679 let out = StringChunked::from_iter_options(
1680 name.as_str().into(),
1681 ca.into_iter().map(|opt_d| {
1682 opt_d.and_then(|days| {
1683 let d = days_to_naive_date(days)?;
1684 Some(d.weekday().to_string())
1685 })
1686 }),
1687 );
1688 Ok(Some(Column::new(name, out.into_series())))
1689}
1690
1691pub fn apply_weekday(column: Column) -> PolarsResult<Option<Column>> {
1693 let name = column.field().into_owned().name;
1694 let series = column.take_materialized_series();
1695 let ca = date_series_to_days(&series)?;
1696 let out = Int32Chunked::from_iter_options(
1697 name.as_str().into(),
1698 ca.into_iter().map(|opt_d| {
1699 opt_d.and_then(|days| {
1700 let d = days_to_naive_date(days)?;
1701 Some(d.weekday().num_days_from_monday() as i32)
1702 })
1703 }),
1704 );
1705 Ok(Some(Column::new(name, out.into_series())))
1706}
1707
1708pub fn apply_months_between(
1711 columns: &mut [Column],
1712 round_off: bool,
1713) -> PolarsResult<Option<Column>> {
1714 if columns.len() < 2 {
1715 return Err(PolarsError::ComputeError(
1716 "months_between needs two columns (end, start)".into(),
1717 ));
1718 }
1719 let name = columns[0].field().into_owned().name;
1720 let end_series = std::mem::take(&mut columns[0]).take_materialized_series();
1721 let start_series = std::mem::take(&mut columns[1]).take_materialized_series();
1722 let end_ca = date_series_to_days(&end_series)?;
1723 let start_ca = date_series_to_days(&start_series)?;
1724 let out = end_ca
1726 .into_iter()
1727 .zip(&start_ca)
1728 .map(|(oe, os)| match (oe, os) {
1729 (Some(e), Some(s)) => {
1730 let days = (e - s) as f64;
1731 let months = days / 31.0;
1732 Some(if round_off {
1733 (months * 1e8).round() / 1e8
1734 } else {
1735 months
1736 })
1737 }
1738 _ => None,
1739 });
1740 let out = Float64Chunked::from_iter_options(name.as_str().into(), out);
1741 Ok(Some(Column::new(name, out.into_series())))
1742}
1743
1744fn float_series_to_f64(series: &Series) -> PolarsResult<Float64Chunked> {
1747 if series.dtype() == &DataType::String {
1748 let ca = series
1750 .str()
1751 .map_err(|e| PolarsError::ComputeError(format!("float_series_to_f64: {e}").into()))?;
1752 let name = series.name().as_str().into();
1753 let results: Vec<Option<f64>> = ca
1754 .into_iter()
1755 .map(|opt_s| opt_s.and_then(parse_str_to_double))
1756 .collect();
1757 return Ok(Float64Chunked::from_iter_options(name, results.into_iter()));
1758 }
1759 let casted = series.cast(&DataType::Float64)?;
1760 casted
1761 .f64()
1762 .map_err(|e| PolarsError::ComputeError(format!("float_series_to_f64: {e}").into()))
1763 .cloned()
1764}
1765
1766pub fn apply_sin(column: Column) -> PolarsResult<Option<Column>> {
1768 let name = column.field().into_owned().name;
1769 let series = column.take_materialized_series();
1770 let ca = float_series_to_f64(&series)?;
1771 let out = ca.apply_values(f64::sin).into_series();
1772 Ok(Some(Column::new(name, out)))
1773}
1774
1775pub fn apply_cos(column: Column) -> PolarsResult<Option<Column>> {
1777 let name = column.field().into_owned().name;
1778 let series = column.take_materialized_series();
1779 let ca = float_series_to_f64(&series)?;
1780 let out = ca.apply_values(f64::cos).into_series();
1781 Ok(Some(Column::new(name, out)))
1782}
1783
/// Rounds `x` to `scale` decimal places, breaking ties toward the even neighbour (HALF_EVEN).
///
/// A scaled value whose fractional part lies within `1e-10` of `0.5` is treated as an exact
/// tie, which absorbs binary representation error (e.g. `2.675 * 100`). Negative `scale` rounds
/// to tens, hundreds, and so on. NaN and infinities are returned unchanged.
fn bround_one(x: f64, scale: i32) -> f64 {
    if !x.is_finite() {
        return x;
    }
    let factor = 10_f64.powi(scale);
    let scaled = x * factor;
    let frac = scaled.fract().abs();
    let is_tie = frac > 0.5_f64 - 1e-10 && frac < 0.5_f64 + 1e-10;
    if !is_tie {
        return scaled.round() / factor;
    }
    let truncated = scaled.trunc();
    let rounded = if (truncated as i64) % 2 == 0 {
        truncated
    } else {
        // Step away from zero to reach the even neighbour.
        truncated + scaled.signum()
    };
    rounded / factor
}
1805
/// Converts the integer written in `s` (base `from_base`) into its representation in `to_base`.
///
/// Both bases must lie in `2..=36`. Surrounding whitespace is ignored and a leading sign is
/// accepted. Negative values keep a leading `-`, and digits above 9 are upper case, matching
/// Spark/Hive `conv` (`conv('255', 10, 16)` → `"FF"`). Returns `None` for an invalid base, empty
/// input, or text that is not a valid `i64` in `from_base`.
fn conv_one(s: &str, from_base: i32, to_base: i32) -> Option<String> {
    const DIGITS: &[u8; 36] = b"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ";
    if !(2..=36).contains(&from_base) || !(2..=36).contains(&to_base) {
        return None;
    }
    let s = s.trim();
    if s.is_empty() {
        return None;
    }
    let n = i64::from_str_radix(s, from_base as u32).ok()?;
    if n == 0 {
        return Some("0".to_string());
    }
    let radix = to_base as u64;
    // `unsigned_abs` keeps i64::MIN representable.
    let mut magnitude = n.unsigned_abs();
    // Digits are produced least-significant first, then reversed.
    let mut reversed = Vec::new();
    while magnitude > 0 {
        reversed.push(DIGITS[(magnitude % radix) as usize]);
        magnitude /= radix;
    }
    if n < 0 {
        reversed.push(b'-');
    }
    Some(reversed.iter().rev().map(|&b| char::from(b)).collect())
}
1836
1837pub fn apply_conv(column: Column, from_base: i32, to_base: i32) -> PolarsResult<Option<Column>> {
1839 use std::borrow::Cow;
1840 let name = column.field().into_owned().name;
1841 let series = column.take_materialized_series();
1842 let out = if series.dtype() == &DataType::String {
1843 let ca = series
1844 .str()
1845 .map_err(|e| PolarsError::ComputeError(format!("conv: {e}").into()))?;
1846 ca.apply(|opt_s| opt_s.and_then(|s| conv_one(s, from_base, to_base).map(Cow::Owned)))
1847 .into_series()
1848 } else if series.dtype() == &DataType::Int64 {
1849 let ca = series
1850 .i64()
1851 .map_err(|e| PolarsError::ComputeError(format!("conv: {e}").into()))?;
1852 let to_b = to_base as u32;
1853 const DIGITS: &[u8] = b"0123456789abcdefghijklmnopqrstuvwxyz";
1854 let format_int = |n: i64| -> Option<String> {
1855 if !(2..=36).contains(&to_b) {
1856 return None;
1857 }
1858 if n == 0 {
1859 return Some("0".to_string());
1860 }
1861 let mut result = String::new();
1862 let mut val = if n < 0 {
1863 result.push('-');
1864 n.unsigned_abs()
1865 } else {
1866 n as u64
1867 };
1868 let mut buf = String::new();
1869 while val > 0 {
1870 buf.push(DIGITS[(val % to_b as u64) as usize] as char);
1871 val /= to_b as u64;
1872 }
1873 result.push_str(&buf.chars().rev().collect::<String>());
1874 Some(result)
1875 };
1876 let out_ca = StringChunked::from_iter_options(
1877 name.as_str().into(),
1878 ca.into_iter().map(|opt| opt.and_then(format_int)),
1879 );
1880 out_ca.into_series()
1881 } else {
1882 let s_str = series.cast(&DataType::String)?;
1883 let ca = s_str
1884 .str()
1885 .map_err(|e| PolarsError::ComputeError(e.to_string().into()))?;
1886 ca.apply(|opt_s| opt_s.and_then(|s| conv_one(s, from_base, to_base).map(Cow::Owned)))
1887 .into_series()
1888 };
1889 Ok(Some(Column::new(name, out)))
1890}
1891
1892pub fn apply_hex(column: Column) -> PolarsResult<Option<Column>> {
1894 let name = column.field().into_owned().name;
1895 let series = column.take_materialized_series();
1896 let out = if series.dtype() == &DataType::Int64 || series.dtype() == &DataType::Int32 {
1897 let s = series.cast(&DataType::Int64)?;
1898 let ca = s
1899 .i64()
1900 .map_err(|e| PolarsError::ComputeError(e.to_string().into()))?;
1901 let out_ca = StringChunked::from_iter_options(
1902 name.as_str().into(),
1903 ca.into_iter().map(|opt| opt.map(|n| format!("{n:X}"))),
1904 );
1905 out_ca.into_series()
1906 } else if series.dtype() == &DataType::String {
1907 let ca = series
1908 .str()
1909 .map_err(|e| PolarsError::ComputeError(e.to_string().into()))?;
1910 let out_ca = StringChunked::from_iter_options(
1911 name.as_str().into(),
1912 ca.into_iter().map(|opt| {
1913 opt.map(|s| {
1914 s.as_bytes()
1915 .iter()
1916 .map(|b| format!("{b:02X}"))
1917 .collect::<String>()
1918 })
1919 }),
1920 );
1921 out_ca.into_series()
1922 } else {
1923 let s = series.cast(&DataType::Int64)?;
1924 let ca = s
1925 .i64()
1926 .map_err(|e| PolarsError::ComputeError(e.to_string().into()))?;
1927 let out_ca = StringChunked::from_iter_options(
1928 name.as_str().into(),
1929 ca.into_iter().map(|opt| opt.map(|n| format!("{n:X}"))),
1930 );
1931 out_ca.into_series()
1932 };
1933 Ok(Some(Column::new(name, out)))
1934}
1935
1936pub fn apply_unhex(column: Column) -> PolarsResult<Option<Column>> {
1938 use std::borrow::Cow;
1939 let name = column.field().into_owned().name;
1940 let series = column.take_materialized_series();
1941 let ca = series
1942 .str()
1943 .map_err(|e| PolarsError::ComputeError(format!("unhex: {e}").into()))?;
1944 let unhex_one = |s: &str| -> Option<Vec<u8>> {
1945 let s = s.trim();
1946 let chars: Vec<char> = if s.len() % 2 == 1 {
1947 s.chars().skip(1).collect()
1948 } else {
1949 s.chars().collect()
1950 };
1951 let mut bytes = Vec::with_capacity(chars.len() / 2);
1952 for chunk in chars.chunks(2) {
1953 let hex_pair: String = chunk.iter().collect();
1954 let byte = u8::from_str_radix(&hex_pair, 16).ok()?;
1955 bytes.push(byte);
1956 }
1957 Some(bytes)
1958 };
1959 let out = ca
1960 .apply(|opt_s| {
1961 opt_s.and_then(|s| {
1962 unhex_one(s)
1963 .and_then(|b| String::from_utf8(b).ok())
1964 .map(Cow::Owned)
1965 })
1966 })
1967 .into_series();
1968 Ok(Some(Column::new(name, out)))
1969}
1970
1971pub fn apply_bin(column: Column) -> PolarsResult<Option<Column>> {
1973 let name = column.field().into_owned().name;
1974 let series = column.take_materialized_series();
1975 let s = series.cast(&DataType::Int64)?;
1976 let ca = s
1977 .i64()
1978 .map_err(|e| PolarsError::ComputeError(e.to_string().into()))?;
1979 let out_ca = StringChunked::from_iter_options(
1980 name.as_str().into(),
1981 ca.into_iter().map(|opt| opt.map(|n| format!("{n:b}"))),
1982 );
1983 Ok(Some(Column::new(name, out_ca.into_series())))
1984}
1985
1986pub fn apply_getbit(column: Column, pos: i64) -> PolarsResult<Option<Column>> {
1988 let name = column.field().into_owned().name;
1989 let series = column.take_materialized_series();
1990 let s = series.cast(&DataType::Int64)?;
1991 let ca = s
1992 .i64()
1993 .map_err(|e| PolarsError::ComputeError(e.to_string().into()))?;
1994 let pos = pos.max(0);
1995 let out_ca = Int64Chunked::from_iter_options(
1996 name.as_str().into(),
1997 ca.into_iter().map(|opt| opt.map(|n| (n >> pos) & 1)),
1998 );
1999 Ok(Some(Column::new(name, out_ca.into_series())))
2000}
2001
2002pub fn apply_bit_count(column: Column) -> PolarsResult<Option<Column>> {
2004 let name = column.field().into_owned().name;
2005 let series = column.take_materialized_series();
2006 let s = series.cast(&DataType::Int64)?;
2007 let ca = s
2008 .i64()
2009 .map_err(|e| PolarsError::ComputeError(e.to_string().into()))?;
2010 let out_ca = Int64Chunked::from_iter_options(
2011 name.as_str().into(),
2012 ca.into_iter()
2013 .map(|opt| opt.map(|n| i64::from(n.count_ones()))),
2014 );
2015 Ok(Some(Column::new(name, out_ca.into_series())))
2016}
2017
2018pub fn apply_assert_true(column: Column, err_msg: Option<&str>) -> PolarsResult<Option<Column>> {
2022 let name = column.field().into_owned().name;
2023 let series = column.take_materialized_series();
2024 let ca = series
2025 .bool()
2026 .map_err(|e| PolarsError::ComputeError(e.to_string().into()))?;
2027 let len = ca.len();
2028 let failed = ca.into_iter().any(|opt| match opt {
2030 Some(true) => false,
2031 Some(false) | None => true,
2032 });
2033 if failed {
2034 let msg = err_msg
2035 .map(String::from)
2036 .unwrap_or_else(|| format!("assert_true failed on column '{name}'"));
2037 return Err(PolarsError::ComputeError(msg.into()));
2038 }
2039 let null_col = BooleanChunked::from_iter_options(name.as_str().into(), (0..len).map(|_| None));
2041 Ok(Some(Column::new(name, null_col.into_series())))
2042}
2043
2044pub fn apply_rand_with_seed(column: Column, seed: Option<u64>) -> PolarsResult<Option<Column>> {
2046 use rand::Rng;
2047 use rand::SeedableRng;
2048 let name = column.field().into_owned().name;
2049 let series = column.take_materialized_series();
2050 let n = series.len();
2051 let values: Vec<f64> = if let Some(s) = seed {
2052 let mut rng = rand::rngs::StdRng::seed_from_u64(s);
2053 (0..n).map(|_| rng.r#gen::<f64>()).collect()
2054 } else {
2055 let mut rng = rand::thread_rng();
2056 (0..n).map(|_| rng.r#gen::<f64>()).collect()
2057 };
2058 let out = Float64Chunked::from_vec(name.as_str().into(), values);
2059 Ok(Some(Column::new(name, out.into_series())))
2060}
2061
2062pub fn apply_randn_with_seed(column: Column, seed: Option<u64>) -> PolarsResult<Option<Column>> {
2064 use rand::SeedableRng;
2065 use rand_distr::Distribution;
2066 let name = column.field().into_owned().name;
2067 let series = column.take_materialized_series();
2068 let n = series.len();
2069 let dist = rand_distr::StandardNormal;
2070 let values: Vec<f64> = if let Some(s) = seed {
2071 let mut rng = rand::rngs::StdRng::seed_from_u64(s);
2072 (0..n).map(|_| dist.sample(&mut rng)).collect()
2073 } else {
2074 let mut rng = rand::thread_rng();
2075 (0..n).map(|_| dist.sample(&mut rng)).collect()
2076 };
2077 let out = Float64Chunked::from_vec(name.as_str().into(), values);
2078 Ok(Some(Column::new(name, out.into_series())))
2079}
2080
2081pub fn series_rand_n(name: &str, n: usize, seed: Option<u64>) -> Series {
2083 use rand::Rng;
2084 use rand::SeedableRng;
2085 let values: Vec<f64> = if let Some(s) = seed {
2086 let mut rng = rand::rngs::StdRng::seed_from_u64(s);
2087 (0..n).map(|_| rng.r#gen::<f64>()).collect()
2088 } else {
2089 let mut rng = rand::thread_rng();
2090 (0..n).map(|_| rng.r#gen::<f64>()).collect()
2091 };
2092 Float64Chunked::from_vec(name.into(), values).into_series()
2093}
2094
2095pub fn series_randn_n(name: &str, n: usize, seed: Option<u64>) -> Series {
2097 use rand::SeedableRng;
2098 use rand_distr::Distribution;
2099 let dist = rand_distr::StandardNormal;
2100 let values: Vec<f64> = if let Some(s) = seed {
2101 let mut rng = rand::rngs::StdRng::seed_from_u64(s);
2102 (0..n).map(|_| dist.sample(&mut rng)).collect()
2103 } else {
2104 let mut rng = rand::thread_rng();
2105 (0..n).map(|_| dist.sample(&mut rng)).collect()
2106 };
2107 Float64Chunked::from_vec(name.into(), values).into_series()
2108}
2109
2110pub fn apply_bit_and(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
2112 if columns.len() < 2 {
2113 return Err(PolarsError::ComputeError(
2114 "bit_and needs two columns".into(),
2115 ));
2116 }
2117 let name = columns[0].field().into_owned().name;
2118 let a_series = std::mem::take(&mut columns[0]).take_materialized_series();
2119 let b_series = std::mem::take(&mut columns[1]).take_materialized_series();
2120 let a_cast = a_series.cast(&DataType::Int64)?;
2121 let b_cast = b_series.cast(&DataType::Int64)?;
2122 let a = a_cast
2123 .i64()
2124 .map_err(|e| PolarsError::ComputeError(e.to_string().into()))?;
2125 let b = b_cast
2126 .i64()
2127 .map_err(|e| PolarsError::ComputeError(e.to_string().into()))?;
2128 let out = Int64Chunked::from_iter_options(
2129 name.as_str().into(),
2130 a.into_iter().zip(b).map(|(av, bv)| match (av, bv) {
2131 (Some(x), Some(y)) => Some(x & y),
2132 _ => None,
2133 }),
2134 );
2135 Ok(Some(Column::new(name, out.into_series())))
2136}
2137
2138pub fn apply_bit_or(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
2140 if columns.len() < 2 {
2141 return Err(PolarsError::ComputeError("bit_or needs two columns".into()));
2142 }
2143 let name = columns[0].field().into_owned().name;
2144 let a_series = std::mem::take(&mut columns[0]).take_materialized_series();
2145 let b_series = std::mem::take(&mut columns[1]).take_materialized_series();
2146 let a_cast = a_series.cast(&DataType::Int64)?;
2147 let b_cast = b_series.cast(&DataType::Int64)?;
2148 let a = a_cast
2149 .i64()
2150 .map_err(|e| PolarsError::ComputeError(e.to_string().into()))?;
2151 let b = b_cast
2152 .i64()
2153 .map_err(|e| PolarsError::ComputeError(e.to_string().into()))?;
2154 let out = Int64Chunked::from_iter_options(
2155 name.as_str().into(),
2156 a.into_iter().zip(b).map(|(av, bv)| match (av, bv) {
2157 (Some(x), Some(y)) => Some(x | y),
2158 _ => None,
2159 }),
2160 );
2161 Ok(Some(Column::new(name, out.into_series())))
2162}
2163
2164pub fn apply_bit_xor(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
2166 if columns.len() < 2 {
2167 return Err(PolarsError::ComputeError(
2168 "bit_xor needs two columns".into(),
2169 ));
2170 }
2171 let name = columns[0].field().into_owned().name;
2172 let a_series = std::mem::take(&mut columns[0]).take_materialized_series();
2173 let b_series = std::mem::take(&mut columns[1]).take_materialized_series();
2174 let a_cast = a_series.cast(&DataType::Int64)?;
2175 let b_cast = b_series.cast(&DataType::Int64)?;
2176 let a = a_cast
2177 .i64()
2178 .map_err(|e| PolarsError::ComputeError(e.to_string().into()))?;
2179 let b = b_cast
2180 .i64()
2181 .map_err(|e| PolarsError::ComputeError(e.to_string().into()))?;
2182 let out = Int64Chunked::from_iter_options(
2183 name.as_str().into(),
2184 a.into_iter().zip(b).map(|(av, bv)| match (av, bv) {
2185 (Some(x), Some(y)) => Some(x ^ y),
2186 _ => None,
2187 }),
2188 );
2189 Ok(Some(Column::new(name, out.into_series())))
2190}
2191
2192pub fn apply_round(column: Column, decimals: u32) -> PolarsResult<Option<Column>> {
2195 let name = column.field().into_owned().name;
2196 let series = column.take_materialized_series();
2197 let ca = float_series_to_f64(&series)?;
2198 let scale = decimals as i32;
2199 let factor = 10_f64.powi(scale);
2200 let out = ca
2201 .apply_values(|x| (x * factor).round() / factor)
2202 .into_series();
2203 Ok(Some(Column::new(name, out)))
2204}
2205
2206pub fn apply_bround(column: Column, scale: i32) -> PolarsResult<Option<Column>> {
2208 let name = column.field().into_owned().name;
2209 let series = column.take_materialized_series();
2210 let ca = float_series_to_f64(&series)?;
2211 let out = ca.apply_values(|x| bround_one(x, scale)).into_series();
2212 Ok(Some(Column::new(name, out)))
2213}
2214
2215pub fn apply_cot(column: Column) -> PolarsResult<Option<Column>> {
2217 let name = column.field().into_owned().name;
2218 let series = column.take_materialized_series();
2219 let ca = float_series_to_f64(&series)?;
2220 let out = ca.apply_values(|x| 1.0 / x.tan()).into_series();
2221 Ok(Some(Column::new(name, out)))
2222}
2223
2224pub fn apply_csc(column: Column) -> PolarsResult<Option<Column>> {
2226 let name = column.field().into_owned().name;
2227 let series = column.take_materialized_series();
2228 let ca = float_series_to_f64(&series)?;
2229 let out = ca.apply_values(|x| 1.0 / x.sin()).into_series();
2230 Ok(Some(Column::new(name, out)))
2231}
2232
2233pub fn apply_sec(column: Column) -> PolarsResult<Option<Column>> {
2235 let name = column.field().into_owned().name;
2236 let series = column.take_materialized_series();
2237 let ca = float_series_to_f64(&series)?;
2238 let out = ca.apply_values(|x| 1.0 / x.cos()).into_series();
2239 Ok(Some(Column::new(name, out)))
2240}
2241
2242pub fn apply_tan(column: Column) -> PolarsResult<Option<Column>> {
2244 let name = column.field().into_owned().name;
2245 let series = column.take_materialized_series();
2246 let ca = float_series_to_f64(&series)?;
2247 let out = ca.apply_values(f64::tan).into_series();
2248 Ok(Some(Column::new(name, out)))
2249}
2250
2251pub fn apply_asin(column: Column) -> PolarsResult<Option<Column>> {
2253 let name = column.field().into_owned().name;
2254 let series = column.take_materialized_series();
2255 let ca = float_series_to_f64(&series)?;
2256 let out = ca.apply_values(f64::asin).into_series();
2257 Ok(Some(Column::new(name, out)))
2258}
2259
2260pub fn apply_acos(column: Column) -> PolarsResult<Option<Column>> {
2262 let name = column.field().into_owned().name;
2263 let series = column.take_materialized_series();
2264 let ca = float_series_to_f64(&series)?;
2265 let out = ca.apply_values(f64::acos).into_series();
2266 Ok(Some(Column::new(name, out)))
2267}
2268
2269pub fn apply_atan(column: Column) -> PolarsResult<Option<Column>> {
2271 let name = column.field().into_owned().name;
2272 let series = column.take_materialized_series();
2273 let ca = float_series_to_f64(&series)?;
2274 let out = ca.apply_values(f64::atan).into_series();
2275 Ok(Some(Column::new(name, out)))
2276}
2277
2278pub fn apply_atan2(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
2280 if columns.len() < 2 {
2281 return Err(PolarsError::ComputeError(
2282 "atan2 needs two columns (y, x)".into(),
2283 ));
2284 }
2285 let name = columns[0].field().into_owned().name;
2286 let y_series = std::mem::take(&mut columns[0]).take_materialized_series();
2287 let x_series = std::mem::take(&mut columns[1]).take_materialized_series();
2288 let y_ca = float_series_to_f64(&y_series)?;
2289 let x_ca = float_series_to_f64(&x_series)?;
2290 let out = y_ca.into_iter().zip(&x_ca).map(|(oy, ox)| match (oy, ox) {
2291 (Some(y), Some(x)) => Some(f64::atan2(y, x)),
2292 _ => None,
2293 });
2294 let out = Float64Chunked::from_iter_options(name.as_str().into(), out);
2295 Ok(Some(Column::new(name, out.into_series())))
2296}
2297
2298pub fn apply_degrees(column: Column) -> PolarsResult<Option<Column>> {
2300 let name = column.field().into_owned().name;
2301 let series = column.take_materialized_series();
2302 let ca = float_series_to_f64(&series)?;
2303 let out = ca.apply_values(|r| r.to_degrees()).into_series();
2304 Ok(Some(Column::new(name, out)))
2305}
2306
2307pub fn apply_radians(column: Column) -> PolarsResult<Option<Column>> {
2309 let name = column.field().into_owned().name;
2310 let series = column.take_materialized_series();
2311 let ca = float_series_to_f64(&series)?;
2312 let out = ca.apply_values(|d| d.to_radians()).into_series();
2313 Ok(Some(Column::new(name, out)))
2314}
2315
2316pub fn apply_signum(column: Column) -> PolarsResult<Option<Column>> {
2318 let name = column.field().into_owned().name;
2319 let series = column.take_materialized_series();
2320 let ca = float_series_to_f64(&series)?;
2321 let out = ca
2322 .apply_values(|v| {
2323 if v > 0.0 {
2324 1.0
2325 } else if v < 0.0 {
2326 -1.0
2327 } else {
2328 0.0
2329 }
2330 })
2331 .into_series();
2332 Ok(Some(Column::new(name, out)))
2333}
2334
2335pub fn apply_cosh(column: Column) -> PolarsResult<Option<Column>> {
2337 let name = column.field().into_owned().name;
2338 let series = column.take_materialized_series();
2339 let ca = float_series_to_f64(&series)?;
2340 let out = ca.apply_values(f64::cosh).into_series();
2341 Ok(Some(Column::new(name, out)))
2342}
2343pub fn apply_sinh(column: Column) -> PolarsResult<Option<Column>> {
2344 let name = column.field().into_owned().name;
2345 let series = column.take_materialized_series();
2346 let ca = float_series_to_f64(&series)?;
2347 let out = ca.apply_values(f64::sinh).into_series();
2348 Ok(Some(Column::new(name, out)))
2349}
2350pub fn apply_tanh(column: Column) -> PolarsResult<Option<Column>> {
2351 let name = column.field().into_owned().name;
2352 let series = column.take_materialized_series();
2353 let ca = float_series_to_f64(&series)?;
2354 let out = ca.apply_values(f64::tanh).into_series();
2355 Ok(Some(Column::new(name, out)))
2356}
2357pub fn apply_acosh(column: Column) -> PolarsResult<Option<Column>> {
2358 let name = column.field().into_owned().name;
2359 let series = column.take_materialized_series();
2360 let ca = float_series_to_f64(&series)?;
2361 let out = ca.apply_values(f64::acosh).into_series();
2362 Ok(Some(Column::new(name, out)))
2363}
2364pub fn apply_asinh(column: Column) -> PolarsResult<Option<Column>> {
2365 let name = column.field().into_owned().name;
2366 let series = column.take_materialized_series();
2367 let ca = float_series_to_f64(&series)?;
2368 let out = ca.apply_values(f64::asinh).into_series();
2369 Ok(Some(Column::new(name, out)))
2370}
2371pub fn apply_atanh(column: Column) -> PolarsResult<Option<Column>> {
2372 let name = column.field().into_owned().name;
2373 let series = column.take_materialized_series();
2374 let ca = float_series_to_f64(&series)?;
2375 let out = ca.apply_values(f64::atanh).into_series();
2376 Ok(Some(Column::new(name, out)))
2377}
2378pub fn apply_cbrt(column: Column) -> PolarsResult<Option<Column>> {
2379 let name = column.field().into_owned().name;
2380 let series = column.take_materialized_series();
2381 let ca = float_series_to_f64(&series)?;
2382 let out = ca.apply_values(f64::cbrt).into_series();
2383 Ok(Some(Column::new(name, out)))
2384}
2385pub fn apply_expm1(column: Column) -> PolarsResult<Option<Column>> {
2386 let name = column.field().into_owned().name;
2387 let series = column.take_materialized_series();
2388 let ca = float_series_to_f64(&series)?;
2389 let out = ca.apply_values(f64::exp_m1).into_series();
2390 Ok(Some(Column::new(name, out)))
2391}
2392pub fn apply_log1p(column: Column) -> PolarsResult<Option<Column>> {
2393 let name = column.field().into_owned().name;
2394 let series = column.take_materialized_series();
2395 let ca = float_series_to_f64(&series)?;
2396 let out = ca.apply_values(f64::ln_1p).into_series();
2397 Ok(Some(Column::new(name, out)))
2398}
2399pub fn apply_log10(column: Column) -> PolarsResult<Option<Column>> {
2400 let name = column.field().into_owned().name;
2401 let series = column.take_materialized_series();
2402 let ca = float_series_to_f64(&series)?;
2403 let out = ca.apply_values(f64::log10).into_series();
2404 Ok(Some(Column::new(name, out)))
2405}
2406pub fn apply_log2(column: Column) -> PolarsResult<Option<Column>> {
2407 let name = column.field().into_owned().name;
2408 let series = column.take_materialized_series();
2409 let ca = float_series_to_f64(&series)?;
2410 let out = ca.apply_values(f64::log2).into_series();
2411 Ok(Some(Column::new(name, out)))
2412}
2413pub fn apply_rint(column: Column) -> PolarsResult<Option<Column>> {
2414 let name = column.field().into_owned().name;
2415 let series = column.take_materialized_series();
2416 let ca = float_series_to_f64(&series)?;
2417 let out = ca.apply_values(f64::round).into_series();
2418 Ok(Some(Column::new(name, out)))
2419}
2420
2421pub fn apply_greatest2(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
2423 if columns.len() < 2 {
2424 return Err(PolarsError::ComputeError(
2425 "greatest2 needs two columns".into(),
2426 ));
2427 }
2428 let name = columns[0].field().into_owned().name;
2429 let a_series = std::mem::take(&mut columns[0]).take_materialized_series();
2430 let b_series = std::mem::take(&mut columns[1]).take_materialized_series();
2431 let out = match (a_series.dtype(), b_series.dtype()) {
2432 (DataType::Float64, _) | (_, DataType::Float64) => {
2433 let a = float_series_to_f64(&a_series)?;
2434 let b = float_series_to_f64(&b_series)?;
2435 let out = Float64Chunked::from_iter_options(
2436 name.as_str().into(),
2437 a.into_iter().zip(&b).map(|(oa, ob)| match (oa, ob) {
2438 (Some(x), Some(y)) => Some(x.max(y)),
2439 (Some(x), None) => Some(x),
2440 (None, Some(y)) => Some(y),
2441 (None, None) => None,
2442 }),
2443 );
2444 out.into_series()
2445 }
2446 (DataType::Int64, _)
2447 | (DataType::Int32, _)
2448 | (_, DataType::Int64)
2449 | (_, DataType::Int32) => {
2450 let a = a_series.cast(&DataType::Int64)?;
2451 let b = b_series.cast(&DataType::Int64)?;
2452 let ca_a = a
2453 .i64()
2454 .map_err(|e| PolarsError::ComputeError(format!("greatest: {e}").into()))?;
2455 let ca_b = b
2456 .i64()
2457 .map_err(|e| PolarsError::ComputeError(format!("greatest: {e}").into()))?;
2458 let out = Int64Chunked::from_iter_options(
2459 name.as_str().into(),
2460 ca_a.into_iter().zip(ca_b).map(|(oa, ob)| match (oa, ob) {
2461 (Some(x), Some(y)) => Some(x.max(y)),
2462 (Some(x), None) => Some(x),
2463 (None, Some(y)) => Some(y),
2464 (None, None) => None,
2465 }),
2466 );
2467 out.into_series()
2468 }
2469 (DataType::String, _) | (_, DataType::String) => {
2470 let a = a_series.cast(&DataType::String)?;
2471 let b = b_series.cast(&DataType::String)?;
2472 let ca_a = a
2473 .str()
2474 .map_err(|e| PolarsError::ComputeError(format!("greatest: {e}").into()))?;
2475 let ca_b = b
2476 .str()
2477 .map_err(|e| PolarsError::ComputeError(format!("greatest: {e}").into()))?;
2478 let out = StringChunked::from_iter_options(
2479 name.as_str().into(),
2480 ca_a.into_iter().zip(ca_b).map(|(oa, ob)| match (oa, ob) {
2481 (Some(x), Some(y)) => Some(if x >= y { x } else { y }),
2482 (Some(x), None) => Some(x),
2483 (None, Some(y)) => Some(y),
2484 (None, None) => None,
2485 }),
2486 );
2487 out.into_series()
2488 }
2489 _ => {
2490 let a = float_series_to_f64(&a_series)?;
2491 let b = float_series_to_f64(&b_series)?;
2492 let out = Float64Chunked::from_iter_options(
2493 name.as_str().into(),
2494 a.into_iter().zip(&b).map(|(oa, ob)| match (oa, ob) {
2495 (Some(x), Some(y)) => Some(x.max(y)),
2496 (Some(x), None) => Some(x),
2497 (None, Some(y)) => Some(y),
2498 (None, None) => None,
2499 }),
2500 );
2501 out.into_series()
2502 }
2503 };
2504 Ok(Some(Column::new(name, out)))
2505}
2506
2507pub fn apply_least2(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
2509 if columns.len() < 2 {
2510 return Err(PolarsError::ComputeError("least2 needs two columns".into()));
2511 }
2512 let name = columns[0].field().into_owned().name;
2513 let a_series = std::mem::take(&mut columns[0]).take_materialized_series();
2514 let b_series = std::mem::take(&mut columns[1]).take_materialized_series();
2515 let out = match (a_series.dtype(), b_series.dtype()) {
2516 (DataType::Float64, _) | (_, DataType::Float64) => {
2517 let a = float_series_to_f64(&a_series)?;
2518 let b = float_series_to_f64(&b_series)?;
2519 let out = Float64Chunked::from_iter_options(
2520 name.as_str().into(),
2521 a.into_iter().zip(&b).map(|(oa, ob)| match (oa, ob) {
2522 (Some(x), Some(y)) => Some(x.min(y)),
2523 (Some(x), None) => Some(x),
2524 (None, Some(y)) => Some(y),
2525 (None, None) => None,
2526 }),
2527 );
2528 out.into_series()
2529 }
2530 (DataType::Int64, _)
2531 | (DataType::Int32, _)
2532 | (_, DataType::Int64)
2533 | (_, DataType::Int32) => {
2534 let a = a_series.cast(&DataType::Int64)?;
2535 let b = b_series.cast(&DataType::Int64)?;
2536 let ca_a = a
2537 .i64()
2538 .map_err(|e| PolarsError::ComputeError(format!("least: {e}").into()))?;
2539 let ca_b = b
2540 .i64()
2541 .map_err(|e| PolarsError::ComputeError(format!("least: {e}").into()))?;
2542 let out = Int64Chunked::from_iter_options(
2543 name.as_str().into(),
2544 ca_a.into_iter().zip(ca_b).map(|(oa, ob)| match (oa, ob) {
2545 (Some(x), Some(y)) => Some(x.min(y)),
2546 (Some(x), None) => Some(x),
2547 (None, Some(y)) => Some(y),
2548 (None, None) => None,
2549 }),
2550 );
2551 out.into_series()
2552 }
2553 (DataType::String, _) | (_, DataType::String) => {
2554 let a = a_series.cast(&DataType::String)?;
2555 let b = b_series.cast(&DataType::String)?;
2556 let ca_a = a
2557 .str()
2558 .map_err(|e| PolarsError::ComputeError(format!("least: {e}").into()))?;
2559 let ca_b = b
2560 .str()
2561 .map_err(|e| PolarsError::ComputeError(format!("least: {e}").into()))?;
2562 let out = StringChunked::from_iter_options(
2563 name.as_str().into(),
2564 ca_a.into_iter().zip(ca_b).map(|(oa, ob)| match (oa, ob) {
2565 (Some(x), Some(y)) => Some(if x <= y { x } else { y }),
2566 (Some(x), None) => Some(x),
2567 (None, Some(y)) => Some(y),
2568 (None, None) => None,
2569 }),
2570 );
2571 out.into_series()
2572 }
2573 _ => {
2574 let a = float_series_to_f64(&a_series)?;
2575 let b = float_series_to_f64(&b_series)?;
2576 let out = Float64Chunked::from_iter_options(
2577 name.as_str().into(),
2578 a.into_iter().zip(&b).map(|(oa, ob)| match (oa, ob) {
2579 (Some(x), Some(y)) => Some(x.min(y)),
2580 (Some(x), None) => Some(x),
2581 (None, Some(y)) => Some(y),
2582 (None, None) => None,
2583 }),
2584 );
2585 out.into_series()
2586 }
2587 };
2588 Ok(Some(Column::new(name, out)))
2589}
2590
2591pub fn apply_map_from_arrays(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
2593 use polars::chunked_array::StructChunked;
2594 use polars::chunked_array::builder::get_list_builder;
2595 use polars::datatypes::Field;
2596 if columns.len() < 2 {
2597 return Err(PolarsError::ComputeError(
2598 "map_from_arrays needs two columns (keys, values)".into(),
2599 ));
2600 }
2601 let name = columns[0].field().into_owned().name;
2602 let keys_series = std::mem::take(&mut columns[0]).take_materialized_series();
2603 let values_series = std::mem::take(&mut columns[1]).take_materialized_series();
2604 let keys_ca = keys_series
2605 .list()
2606 .map_err(|e| PolarsError::ComputeError(format!("map_from_arrays keys: {e}").into()))?;
2607 let values_ca = values_series
2608 .list()
2609 .map_err(|e| PolarsError::ComputeError(format!("map_from_arrays values: {e}").into()))?;
2610 let key_dtype = keys_ca.inner_dtype().clone();
2611 let value_dtype = values_ca.inner_dtype().clone();
2612 let struct_dtype = DataType::Struct(vec![
2613 Field::new("key".into(), key_dtype),
2614 Field::new("value".into(), value_dtype),
2615 ]);
2616 let mut builder = get_list_builder(&struct_dtype, 64, keys_ca.len(), name.as_str().into());
2617 for (opt_k, opt_v) in keys_ca.amortized_iter().zip(values_ca.amortized_iter()) {
2618 match (opt_k, opt_v) {
2619 (Some(k_amort), Some(v_amort)) => {
2620 let k_list = k_amort.as_ref().as_list();
2621 let v_list = v_amort.as_ref().as_list();
2622 let mut row_structs: Vec<Series> = Vec::new();
2623 for (opt_ke, opt_ve) in k_list.amortized_iter().zip(v_list.amortized_iter()) {
2624 if let (Some(ke), Some(ve)) = (opt_ke, opt_ve) {
2625 let ke_s = ke.deep_clone();
2626 let ve_s = ve.deep_clone();
2627 let len = ke_s.len();
2628 let fields: [&Series; 2] = [&ke_s, &ve_s];
2629 let st = StructChunked::from_series(
2630 PlSmallStr::EMPTY,
2631 len,
2632 fields.iter().copied(),
2633 )
2634 .map_err(|e| PolarsError::ComputeError(format!("struct: {e}").into()))?
2635 .into_series();
2636 row_structs.push(st);
2637 }
2638 }
2639 if row_structs.is_empty() {
2640 builder
2641 .append_series(&Series::new_empty(PlSmallStr::EMPTY, &struct_dtype))
2642 .map_err(|e| PolarsError::ComputeError(format!("builder: {e}").into()))?;
2643 } else {
2644 let mut combined = row_structs.remove(0);
2645 for s in row_structs {
2646 combined.extend(&s)?;
2647 }
2648 builder
2649 .append_series(&combined)
2650 .map_err(|e| PolarsError::ComputeError(format!("builder: {e}").into()))?;
2651 }
2652 }
2653 _ => {
2654 builder.append_null();
2655 }
2656 }
2657 }
2658 let out = builder.finish().into_series();
2659 Ok(Some(Column::new(name, out)))
2660}
2661
2662pub fn apply_zip_arrays_to_struct(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
2664 use polars::chunked_array::StructChunked;
2665 use polars::chunked_array::builder::get_list_builder;
2666 use polars::datatypes::Field;
2667 if columns.len() < 2 {
2668 return Err(PolarsError::ComputeError(
2669 "zip_arrays_to_struct needs two columns (left, right)".into(),
2670 ));
2671 }
2672 let name = columns[0].field().into_owned().name;
2673 let a_series = std::mem::take(&mut columns[0]).take_materialized_series();
2674 let b_series = std::mem::take(&mut columns[1]).take_materialized_series();
2675 let a_ca = a_series
2676 .list()
2677 .map_err(|e| PolarsError::ComputeError(format!("zip_with left: {e}").into()))?;
2678 let b_ca = b_series
2679 .list()
2680 .map_err(|e| PolarsError::ComputeError(format!("zip_with right: {e}").into()))?;
2681 let left_dtype = a_ca.inner_dtype().clone();
2682 let right_dtype = b_ca.inner_dtype().clone();
2683 let struct_dtype = DataType::Struct(vec![
2684 Field::new("left".into(), left_dtype.clone()),
2685 Field::new("right".into(), right_dtype.clone()),
2686 ]);
2687 let mut builder = get_list_builder(&struct_dtype, 64, a_ca.len(), name.as_str().into());
2688 for (opt_a, opt_b) in a_ca.amortized_iter().zip(b_ca.amortized_iter()) {
2689 match (opt_a, opt_b) {
2690 (Some(a_amort), Some(b_amort)) => {
2691 let a_list = a_amort.as_ref().as_list();
2692 let b_list = b_amort.as_ref().as_list();
2693 let a_elems: Vec<Series> = a_list
2694 .amortized_iter()
2695 .flatten()
2696 .map(|amort| amort.deep_clone())
2697 .collect();
2698 let b_elems: Vec<Series> = b_list
2699 .amortized_iter()
2700 .flatten()
2701 .map(|amort| amort.deep_clone())
2702 .collect();
2703 let max_len = a_elems.len().max(b_elems.len());
2704 let mut row_structs: Vec<Series> = Vec::new();
2705 for i in 0..max_len {
2706 let left_s = a_elems.get(i).cloned();
2707 let right_s = b_elems.get(i).cloned();
2708 let (mut left_series, mut right_series) = match (left_s, right_s) {
2709 (Some(l), Some(r)) => (l, r),
2710 (Some(l), None) => {
2711 let r = Series::from_any_values_and_dtype(
2712 PlSmallStr::EMPTY,
2713 &[AnyValue::Null],
2714 &right_dtype,
2715 false,
2716 )
2717 .map_err(|e| {
2718 PolarsError::ComputeError(format!("zip null: {e}").into())
2719 })?;
2720 (l, r)
2721 }
2722 (None, Some(r)) => {
2723 let l = Series::from_any_values_and_dtype(
2724 PlSmallStr::EMPTY,
2725 &[AnyValue::Null],
2726 &left_dtype,
2727 false,
2728 )
2729 .map_err(|e| {
2730 PolarsError::ComputeError(format!("zip null: {e}").into())
2731 })?;
2732 (l, r)
2733 }
2734 (None, None) => {
2735 let mut l = Series::from_any_values_and_dtype(
2736 PlSmallStr::EMPTY,
2737 &[AnyValue::Null],
2738 &left_dtype,
2739 false,
2740 )
2741 .map_err(|e| {
2742 PolarsError::ComputeError(format!("zip null: {e}").into())
2743 })?;
2744 l.rename("left".into());
2745 let mut r = Series::from_any_values_and_dtype(
2746 PlSmallStr::EMPTY,
2747 &[AnyValue::Null],
2748 &right_dtype,
2749 false,
2750 )
2751 .map_err(|e| {
2752 PolarsError::ComputeError(format!("zip null: {e}").into())
2753 })?;
2754 r.rename("right".into());
2755 (l, r)
2756 }
2757 };
2758 left_series.rename("left".into());
2759 right_series.rename("right".into());
2760 let len = left_series.len();
2761 let fields: [&Series; 2] = [&left_series, &right_series];
2762 let st =
2763 StructChunked::from_series(PlSmallStr::EMPTY, len, fields.iter().copied())
2764 .map_err(|e| {
2765 PolarsError::ComputeError(format!("zip struct: {e}").into())
2766 })?
2767 .into_series();
2768 row_structs.push(st);
2769 }
2770 if row_structs.is_empty() {
2771 builder
2772 .append_series(&Series::new_empty(PlSmallStr::EMPTY, &struct_dtype))
2773 .map_err(|e| {
2774 PolarsError::ComputeError(format!("zip builder: {e}").into())
2775 })?;
2776 } else {
2777 let mut combined = row_structs.remove(0);
2778 for s in row_structs {
2779 combined.extend(&s)?;
2780 }
2781 builder.append_series(&combined).map_err(|e| {
2782 PolarsError::ComputeError(format!("zip builder: {e}").into())
2783 })?;
2784 }
2785 }
2786 _ => builder.append_null(),
2787 }
2788 }
2789 let out = builder.finish().into_series();
2790 Ok(Some(Column::new(name, out)))
2791}
2792
/// Merge two map-like columns (`List<Struct{key, value}>`) into a
/// `List<Struct{key, value1, value2}>` column, one struct per distinct key in the row.
///
/// For each row where both inputs are non-null, every key found in either map appears once.
/// `value1` holds the value from `columns[0]`, `value2` the value from `columns[1]`, and a side
/// that lacks the key gets a null of the value dtype. A row is null when either input row is
/// null. The output column takes the name of `columns[0]`.
///
/// Output entries are ordered by the `BTreeMap` string key (see the grouping note in the body),
/// not by first appearance in either map.
///
/// # Errors
/// Returns a `ComputeError` if fewer than two columns are given, if either input is not a list
/// column, or if building the null fallbacks, the structs, or the list fails.
pub fn apply_map_zip_to_struct(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
    use polars::chunked_array::StructChunked;
    use polars::chunked_array::builder::get_list_builder;
    use polars::datatypes::Field;
    use std::collections::BTreeMap;
    if columns.len() < 2 {
        return Err(PolarsError::ComputeError(
            "map_zip_to_struct needs two columns (map1, map2)".into(),
        ));
    }
    let name = columns[0].field().into_owned().name;
    let a_series = std::mem::take(&mut columns[0]).take_materialized_series();
    let b_series = std::mem::take(&mut columns[1]).take_materialized_series();
    let a_ca = a_series
        .list()
        .map_err(|e| PolarsError::ComputeError(format!("map_zip_with map1: {e}").into()))?;
    let b_ca = b_series
        .list()
        .map_err(|e| PolarsError::ComputeError(format!("map_zip_with map2: {e}").into()))?;
    // Key/value dtypes come from the first map's struct fields only (String if absent).
    // NOTE(review): if map2's value dtype differs, its values will not match `value_dtype`
    // and struct assembly may fail. Confirm callers always pass matching maps.
    let struct_dtype_in = a_ca.inner_dtype().clone();
    let (key_dtype, value_dtype) = match &struct_dtype_in {
        DataType::Struct(fields) => {
            let k = fields
                .iter()
                .find(|f| f.name == "key")
                .map(|f| f.dtype.clone())
                .unwrap_or(DataType::String);
            let v = fields
                .iter()
                .find(|f| f.name == "value")
                .map(|f| f.dtype.clone())
                .unwrap_or(DataType::String);
            (k, v)
        }
        _ => (DataType::String, DataType::String),
    };
    let out_struct_dtype = DataType::Struct(vec![
        Field::new("key".into(), key_dtype.clone()),
        Field::new("value1".into(), value_dtype.clone()),
        Field::new("value2".into(), value_dtype.clone()),
    ]);
    let mut builder = get_list_builder(&out_struct_dtype, 64, a_ca.len(), name.as_str().into());
    for (opt_a, opt_b) in a_ca.amortized_iter().zip(b_ca.amortized_iter()) {
        match (opt_a, opt_b) {
            (Some(a_amort), Some(b_amort)) => {
                // `as_list` wraps each map entry in its own one-element list, so iterating it
                // yields one length-1 struct series per entry.
                let a_list = a_amort.as_ref().as_list();
                let b_list = b_amort.as_ref().as_list();
                // Group entries by key: (key series, value from map1, value from map2).
                // NOTE(review): the grouping key is the `Display` text of a length-1 Series,
                // which presumably includes the series name and dtype and may abbreviate long
                // strings or float digits. Distinct keys could then collide, and equal keys
                // with different dtypes will not merge. Consider keying on the AnyValue instead.
                let mut key_to_vals: BTreeMap<String, (Series, Option<Series>, Option<Series>)> =
                    BTreeMap::new();
                // Entries whose struct lacks a `key` or `value` field are silently skipped.
                for elem in a_list.amortized_iter().flatten() {
                    let s = elem.deep_clone();
                    if let Ok(st) = s.struct_() {
                        if let (Ok(k), Ok(v)) = (st.field_by_name("key"), st.field_by_name("value"))
                        {
                            let key_str: String = std::string::ToString::to_string(&k);
                            key_to_vals
                                .entry(key_str.clone())
                                .or_insert_with(|| (k.clone(), None, None))
                                .1 = Some(v);
                        }
                    }
                }
                for elem in b_list.amortized_iter().flatten() {
                    let s = elem.deep_clone();
                    if let Ok(st) = s.struct_() {
                        if let (Ok(k), Ok(v)) = (st.field_by_name("key"), st.field_by_name("value"))
                        {
                            let key_str: String = std::string::ToString::to_string(&k);
                            // A key already seen in map1 keeps map1's key series; only slot 2
                            // is filled here.
                            key_to_vals
                                .entry(key_str.clone())
                                .or_insert_with(|| (k.clone(), None, None))
                                .2 = Some(v);
                        }
                    }
                }
                // Build one length-1 struct {key, value1, value2} per key, in BTreeMap order.
                let mut row_structs: Vec<Series> = Vec::new();
                for (_, (k_series, v1_opt, v2_opt)) in key_to_vals {
                    let mut k_renamed = k_series;
                    k_renamed.rename("key".into());
                    // A side that lacks this key contributes a single null of `value_dtype`.
                    let v1_fallback = || {
                        Series::from_any_values_and_dtype(
                            PlSmallStr::EMPTY,
                            &[AnyValue::Null],
                            &value_dtype,
                            false,
                        )
                        .map_err(|e| {
                            PolarsError::ComputeError(format!("map_zip null fallback: {e}").into())
                        })
                    };
                    let mut v1_series = match v1_opt {
                        Some(s) => s,
                        None => v1_fallback()?,
                    };
                    v1_series.rename("value1".into());
                    let v2_fallback = || {
                        Series::from_any_values_and_dtype(
                            PlSmallStr::EMPTY,
                            &[AnyValue::Null],
                            &value_dtype,
                            false,
                        )
                        .map_err(|e| {
                            PolarsError::ComputeError(format!("map_zip null fallback: {e}").into())
                        })
                    };
                    let mut v2_series = match v2_opt {
                        Some(s) => s,
                        None => v2_fallback()?,
                    };
                    v2_series.rename("value2".into());
                    let len = k_renamed.len();
                    let fields: [&Series; 3] = [&k_renamed, &v1_series, &v2_series];
                    let st =
                        StructChunked::from_series(PlSmallStr::EMPTY, len, fields.iter().copied())
                            .map_err(|e| {
                                PolarsError::ComputeError(format!("map_zip struct: {e}").into())
                            })?
                            .into_series();
                    row_structs.push(st);
                }
                // Concatenate the per-key structs into this row's list value (or an empty list).
                if row_structs.is_empty() {
                    builder
                        .append_series(&Series::new_empty(PlSmallStr::EMPTY, &out_struct_dtype))
                        .map_err(|e| {
                            PolarsError::ComputeError(format!("map_zip builder: {e}").into())
                        })?;
                } else {
                    let mut combined = row_structs.remove(0);
                    for s in row_structs {
                        combined.extend(&s)?;
                    }
                    builder.append_series(&combined).map_err(|e| {
                        PolarsError::ComputeError(format!("map_zip builder: {e}").into())
                    })?;
                }
            }
            _ => builder.append_null(),
        }
    }
    let out = builder.finish().into_series();
    Ok(Some(Column::new(name, out)))
}
2937
2938pub fn apply_typeof(column: Column) -> PolarsResult<Option<Column>> {
2940 let name = column.field().into_owned().name;
2941 let series = column.take_materialized_series();
2942 let dtype_str = format!("{:?}", series.dtype());
2943 let len = series.len();
2944 let out = StringChunked::from_iter_options(
2945 name.as_str().into(),
2946 (0..len).map(|_| Some(dtype_str.clone())),
2947 );
2948 Ok(Some(Column::new(name, out.into_series())))
2949}
2950
2951fn binary_series_i64(
2953 a: &Series,
2954 b: &Series,
2955 ctx: &str,
2956) -> PolarsResult<(Int64Chunked, Int64Chunked)> {
2957 let ca_a = a
2958 .i64()
2959 .map_err(|e| PolarsError::ComputeError(format!("{ctx}: {e}").into()))?
2960 .clone();
2961 let ca_b = b
2962 .i64()
2963 .map_err(|e| PolarsError::ComputeError(format!("{ctx}: {e}").into()))?
2964 .clone();
2965 Ok((ca_a, ca_b))
2966}
2967
2968fn binary_series_i32(
2970 a: &Series,
2971 b: &Series,
2972 ctx: &str,
2973) -> PolarsResult<(Int32Chunked, Int32Chunked)> {
2974 let ca_a = a
2975 .i32()
2976 .map_err(|e| PolarsError::ComputeError(format!("{ctx}: {e}").into()))?
2977 .clone();
2978 let ca_b = b
2979 .i32()
2980 .map_err(|e| PolarsError::ComputeError(format!("{ctx}: {e}").into()))?
2981 .clone();
2982 Ok((ca_a, ca_b))
2983}
2984
2985fn binary_series_f64(
2987 a: &Series,
2988 b: &Series,
2989 ctx: &str,
2990) -> PolarsResult<(Float64Chunked, Float64Chunked)> {
2991 let a_f = a.cast(&DataType::Float64)?;
2992 let b_f = b.cast(&DataType::Float64)?;
2993 let ca_a = a_f
2994 .f64()
2995 .map_err(|e| PolarsError::ComputeError(format!("{ctx}: {e}").into()))?
2996 .clone();
2997 let ca_b = b_f
2998 .f64()
2999 .map_err(|e| PolarsError::ComputeError(format!("{ctx}: {e}").into()))?
3000 .clone();
3001 Ok((ca_a, ca_b))
3002}
3003
3004fn series_to_f64_pyspark(s: &Series, ctx: &str) -> PolarsResult<Float64Chunked> {
3010 match s.dtype() {
3011 DataType::String => {
3012 let name = s.name();
3013 let ca = s
3014 .str()
3015 .map_err(|e| PolarsError::ComputeError(format!("{ctx}: {e}").into()))?;
3016 let out = Float64Chunked::from_iter_options(
3017 name.as_str().into(),
3018 ca.into_iter().map(|opt_s| {
3019 opt_s.and_then(|raw| {
3020 let trimmed = raw.trim();
3021 if trimmed.is_empty() {
3022 return None;
3023 }
3024 trimmed.parse::<f64>().ok()
3025 })
3026 }),
3027 );
3028 Ok(out)
3029 }
3030 DataType::Null => {
3031 Ok(Float64Chunked::full_null(PlSmallStr::EMPTY, s.len()))
3033 }
3034 _ => {
3036 let casted = s
3037 .cast(&DataType::Float64)
3038 .map_err(|e| PolarsError::ComputeError(format!("{ctx}: {e}").into()))?;
3039 casted
3040 .f64()
3041 .cloned()
3042 .map_err(|e| PolarsError::ComputeError(format!("{ctx}: {e}").into()))
3043 }
3044 }
3045}
3046
3047#[allow(clippy::useless_conversion)]
3053fn pyspark_binary_arith(
3054 columns: &mut [Column],
3055 ctx: &str,
3056 op: fn(f64, f64) -> f64,
3057) -> PolarsResult<Option<Column>> {
3058 if columns.len() < 2 {
3059 return Err(PolarsError::ComputeError(
3060 format!("{ctx} needs two columns").into(),
3061 ));
3062 }
3063
3064 let name = columns[0].field().into_owned().name;
3065 let a_s = std::mem::take(&mut columns[0]).take_materialized_series();
3066 let b_s = std::mem::take(&mut columns[1]).take_materialized_series();
3067
3068 let mut ca_a = series_to_f64_pyspark(&a_s, ctx)?;
3069 let mut ca_b = series_to_f64_pyspark(&b_s, ctx)?;
3070
3071 let len_a = ca_a.len();
3073 let len_b = ca_b.len();
3074 if len_a == 1 && len_b > 1 {
3075 let val = ca_a.get(0);
3076 ca_a = Float64Chunked::from_iter_options(
3077 name.as_str().into(),
3078 std::iter::repeat_n(val, len_b),
3079 );
3080 } else if len_b == 1 && len_a > 1 {
3081 let val = ca_b.get(0);
3082 ca_b = Float64Chunked::from_iter_options(
3083 name.as_str().into(),
3084 std::iter::repeat_n(val, len_a),
3085 );
3086 }
3087
3088 let out = Float64Chunked::from_iter_options(
3089 name.as_str().into(),
3090 ca_a.into_iter()
3091 .zip(ca_b.into_iter())
3092 .map(|(oa, ob)| match (oa, ob) {
3093 (Some(a), Some(b)) => Some(op(a, b)),
3094 _ => None,
3095 }),
3096 )
3097 .into_series();
3098
3099 Ok(Some(Column::new(name, out)))
3100}
3101
3102pub fn apply_pyspark_add(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
3104 pyspark_binary_arith(columns, "pyspark_add", |a, b| a + b)
3105}
3106
3107pub fn apply_pyspark_subtract(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
3109 pyspark_binary_arith(columns, "pyspark_subtract", |a, b| a - b)
3110}
3111
3112pub fn apply_pyspark_multiply(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
3114 pyspark_binary_arith(columns, "pyspark_multiply", |a, b| a * b)
3115}
3116
3117#[allow(clippy::useless_conversion)] pub fn apply_pyspark_divide(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
3121 if columns.len() < 2 {
3122 return Err(PolarsError::ComputeError(
3123 "pyspark_divide needs two columns".into(),
3124 ));
3125 }
3126
3127 let name = columns[0].field().into_owned().name;
3128 let a_s = std::mem::take(&mut columns[0]).take_materialized_series();
3129 let b_s = std::mem::take(&mut columns[1]).take_materialized_series();
3130
3131 let mut ca_a = series_to_f64_pyspark(&a_s, "pyspark_divide")?;
3132 let mut ca_b = series_to_f64_pyspark(&b_s, "pyspark_divide")?;
3133
3134 let len_a = ca_a.len();
3135 let len_b = ca_b.len();
3136 if len_a == 1 && len_b > 1 {
3137 let val = ca_a.get(0);
3138 ca_a = Float64Chunked::from_iter_options(
3139 name.as_str().into(),
3140 std::iter::repeat_n(val, len_b),
3141 );
3142 } else if len_b == 1 && len_a > 1 {
3143 let val = ca_b.get(0);
3144 ca_b = Float64Chunked::from_iter_options(
3145 name.as_str().into(),
3146 std::iter::repeat_n(val, len_a),
3147 );
3148 }
3149
3150 let out = Float64Chunked::from_iter_options(
3151 name.as_str().into(),
3152 ca_a.into_iter()
3153 .zip(ca_b.into_iter())
3154 .map(|(oa, ob)| match (oa, ob) {
3155 (Some(a), Some(b)) => {
3156 if b == 0.0 {
3157 None } else {
3159 Some(a / b)
3160 }
3161 }
3162 _ => None,
3163 }),
3164 )
3165 .into_series();
3166
3167 Ok(Some(Column::new(name, out)))
3168}
3169
3170pub fn apply_pyspark_mod(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
3172 pyspark_binary_arith(columns, "pyspark_mod", |a, b| a % b)
3173}
3174
3175pub fn apply_try_add(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
3177 if columns.len() < 2 {
3178 return Err(PolarsError::ComputeError(
3179 "try_add needs two columns".into(),
3180 ));
3181 }
3182 let name = columns[0].field().into_owned().name;
3183 let a_s = std::mem::take(&mut columns[0]).take_materialized_series();
3184 let b_s = std::mem::take(&mut columns[1]).take_materialized_series();
3185 let out = match (a_s.dtype(), b_s.dtype()) {
3186 (DataType::Int64, DataType::Int64) => {
3187 let (ca_a, ca_b) = binary_series_i64(&a_s, &b_s, "try_add")?;
3188 Int64Chunked::from_iter_options(
3189 name.as_str().into(),
3190 ca_a.into_iter()
3191 .zip(&ca_b)
3192 .map(|(oa, ob)| oa.and_then(|a| ob.and_then(|b| a.checked_add(b)))),
3193 )
3194 .into_series()
3195 }
3196 (DataType::Int32, DataType::Int32) => {
3197 let (ca_a, ca_b) = binary_series_i32(&a_s, &b_s, "try_add")?;
3198 Int32Chunked::from_iter_options(
3199 name.as_str().into(),
3200 ca_a.into_iter()
3201 .zip(&ca_b)
3202 .map(|(oa, ob)| oa.and_then(|a| ob.and_then(|b| a.checked_add(b)))),
3203 )
3204 .into_series()
3205 }
3206 _ => {
3207 let (ca_a, ca_b) = binary_series_f64(&a_s, &b_s, "try_add")?;
3208 Float64Chunked::from_iter_options(
3209 name.as_str().into(),
3210 ca_a.into_iter()
3211 .zip(&ca_b)
3212 .map(|(oa, ob)| oa.and_then(|a| ob.map(|b| a + b))),
3213 )
3214 .into_series()
3215 }
3216 };
3217 Ok(Some(Column::new(name, out)))
3218}
3219
3220pub fn apply_try_subtract(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
3222 if columns.len() < 2 {
3223 return Err(PolarsError::ComputeError(
3224 "try_subtract needs two columns".into(),
3225 ));
3226 }
3227 let name = columns[0].field().into_owned().name;
3228 let a_s = std::mem::take(&mut columns[0]).take_materialized_series();
3229 let b_s = std::mem::take(&mut columns[1]).take_materialized_series();
3230 let out = match (a_s.dtype(), b_s.dtype()) {
3231 (DataType::Int64, DataType::Int64) => {
3232 let (ca_a, ca_b) = binary_series_i64(&a_s, &b_s, "try_subtract")?;
3233 Int64Chunked::from_iter_options(
3234 name.as_str().into(),
3235 ca_a.into_iter()
3236 .zip(&ca_b)
3237 .map(|(oa, ob)| oa.and_then(|a| ob.and_then(|b| a.checked_sub(b)))),
3238 )
3239 .into_series()
3240 }
3241 (DataType::Int32, DataType::Int32) => {
3242 let (ca_a, ca_b) = binary_series_i32(&a_s, &b_s, "try_subtract")?;
3243 Int32Chunked::from_iter_options(
3244 name.as_str().into(),
3245 ca_a.into_iter()
3246 .zip(&ca_b)
3247 .map(|(oa, ob)| oa.and_then(|a| ob.and_then(|b| a.checked_sub(b)))),
3248 )
3249 .into_series()
3250 }
3251 _ => {
3252 let (ca_a, ca_b) = binary_series_f64(&a_s, &b_s, "try_subtract")?;
3253 Float64Chunked::from_iter_options(
3254 name.as_str().into(),
3255 ca_a.into_iter()
3256 .zip(&ca_b)
3257 .map(|(oa, ob)| oa.and_then(|a| ob.map(|b| a - b))),
3258 )
3259 .into_series()
3260 }
3261 };
3262 Ok(Some(Column::new(name, out)))
3263}
3264
3265pub fn apply_try_multiply(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
3267 if columns.len() < 2 {
3268 return Err(PolarsError::ComputeError(
3269 "try_multiply needs two columns".into(),
3270 ));
3271 }
3272 let name = columns[0].field().into_owned().name;
3273 let a_s = std::mem::take(&mut columns[0]).take_materialized_series();
3274 let b_s = std::mem::take(&mut columns[1]).take_materialized_series();
3275 let out = match (a_s.dtype(), b_s.dtype()) {
3276 (DataType::Int64, DataType::Int64) => {
3277 let (ca_a, ca_b) = binary_series_i64(&a_s, &b_s, "try_multiply")?;
3278 Int64Chunked::from_iter_options(
3279 name.as_str().into(),
3280 ca_a.into_iter()
3281 .zip(&ca_b)
3282 .map(|(oa, ob)| oa.and_then(|a| ob.and_then(|b| a.checked_mul(b)))),
3283 )
3284 .into_series()
3285 }
3286 (DataType::Int32, DataType::Int32) => {
3287 let (ca_a, ca_b) = binary_series_i32(&a_s, &b_s, "try_multiply")?;
3288 Int32Chunked::from_iter_options(
3289 name.as_str().into(),
3290 ca_a.into_iter()
3291 .zip(&ca_b)
3292 .map(|(oa, ob)| oa.and_then(|a| ob.and_then(|b| a.checked_mul(b)))),
3293 )
3294 .into_series()
3295 }
3296 _ => {
3297 let (ca_a, ca_b) = binary_series_f64(&a_s, &b_s, "try_multiply")?;
3298 Float64Chunked::from_iter_options(
3299 name.as_str().into(),
3300 ca_a.into_iter()
3301 .zip(&ca_b)
3302 .map(|(oa, ob)| oa.and_then(|a| ob.map(|b| a * b))),
3303 )
3304 .into_series()
3305 }
3306 };
3307 Ok(Some(Column::new(name, out)))
3308}
3309
/// Removes Java `SimpleDateFormat` quoting from a pattern, leaving literal text bare.
///
/// * `'text'` becomes `text` (quoted literal).
/// * `''` becomes `'` (escaped quote), both outside and *inside* a quoted section, so
///   `'o''clock'` becomes `o'clock`.
/// * An unterminated quoted section simply runs to the end of the pattern.
///
/// `''` means a literal quote whether or not a quoted section is open, and a lone `'`
/// only toggles quoting. The output therefore does not depend on the quoting state, and
/// none needs to be tracked.
fn unquote_simple_date_format(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    let mut chars = s.chars().peekable();
    while let Some(c) = chars.next() {
        if c != '\'' {
            out.push(c);
        } else if chars.peek() == Some(&'\'') {
            // Escaped quote: emit one `'` and consume its partner.
            chars.next();
            out.push('\'');
        }
        // A lone `'` opens or closes a quoted section and is dropped.
    }
    out
}
3333
3334pub(crate) fn pyspark_format_to_chrono(s: &str) -> String {
3337 let s = unquote_simple_date_format(s);
3338 s.replace("yyyy", "%Y")
3339 .replace("MM", "%m")
3340 .replace("dd", "%d")
3341 .replace("HH", "%H")
3342 .replace("mm", "%M")
3343 .replace("ss", "%S")
3344}
3345
3346pub fn apply_unix_timestamp(column: Column, format: Option<&str>) -> PolarsResult<Option<Column>> {
3348 use chrono::{DateTime, NaiveDateTime, Utc};
3349 let chrono_fmt = format
3350 .map(pyspark_format_to_chrono)
3351 .unwrap_or_else(|| "%Y-%m-%d %H:%M:%S".to_string());
3352 let name = column.field().into_owned().name;
3353 let series = column.take_materialized_series();
3354 let ca = series
3355 .str()
3356 .map_err(|e| PolarsError::ComputeError(format!("unix_timestamp: {e}").into()))?;
3357 let out = Int64Chunked::from_iter_options(
3358 name.as_str().into(),
3359 ca.into_iter().map(|opt_s| {
3360 opt_s.and_then(|s| {
3361 NaiveDateTime::parse_from_str(s, &chrono_fmt)
3362 .ok()
3363 .map(|ndt| DateTime::<Utc>::from_naive_utc_and_offset(ndt, Utc).timestamp())
3364 })
3365 }),
3366 );
3367 Ok(Some(Column::new(name, out.into_series())))
3368}
3369
3370pub fn apply_from_unixtime(column: Column, format: Option<&str>) -> PolarsResult<Option<Column>> {
3372 use chrono::{DateTime, Utc};
3373 let chrono_fmt = format
3374 .map(pyspark_format_to_chrono)
3375 .unwrap_or_else(|| "%Y-%m-%d %H:%M:%S".to_string());
3376 let name = column.field().into_owned().name;
3377 let series = column.take_materialized_series();
3378 let casted = series
3379 .cast(&DataType::Int64)
3380 .map_err(|e| PolarsError::ComputeError(format!("from_unixtime cast: {e}").into()))?;
3381 let ca = casted
3382 .i64()
3383 .map_err(|e| PolarsError::ComputeError(format!("from_unixtime: {e}").into()))?;
3384 let out = StringChunked::from_iter_options(
3385 name.as_str().into(),
3386 ca.into_iter().map(|opt_secs| {
3387 opt_secs.and_then(|secs| {
3388 DateTime::<Utc>::from_timestamp(secs, 0)
3389 .map(|dt| dt.format(&chrono_fmt).to_string())
3390 })
3391 }),
3392 );
3393 Ok(Some(Column::new(name, out.into_series())))
3394}
3395
3396pub fn apply_make_timestamp(
3399 columns: &mut [Column],
3400 timezone: Option<&str>,
3401) -> PolarsResult<Option<Column>> {
3402 use chrono::{NaiveDate, Utc};
3403 use polars::datatypes::TimeUnit;
3404 if columns.len() < 6 {
3405 return Err(PolarsError::ComputeError(
3406 "make_timestamp needs six columns (year, month, day, hour, min, sec)".into(),
3407 ));
3408 }
3409 let tz: Option<Tz> = timezone
3410 .map(|s| {
3411 s.parse()
3412 .map_err(|_| PolarsError::ComputeError(format!("invalid timezone: {s}").into()))
3413 })
3414 .transpose()?;
3415 let name = columns[0].field().into_owned().name;
3416 let series: Vec<Series> = (0..6)
3417 .map(|i| std::mem::take(&mut columns[i]).take_materialized_series())
3418 .collect();
3419 let ca: Vec<Int32Chunked> = series
3420 .iter()
3421 .map(|s| {
3422 let c = s.cast(&DataType::Int32)?;
3423 Ok(c.i32()
3424 .map_err(|e| PolarsError::ComputeError(format!("make_timestamp: {e}").into()))?
3425 .clone())
3426 })
3427 .collect::<PolarsResult<Vec<_>>>()?;
3428 let len = ca[0].len();
3429 let out =
3430 Int64Chunked::from_iter_options(
3431 name.as_str().into(),
3432 (0..len).map(|i| {
3433 let y = ca[0].get(i)?;
3434 let m = ca[1].get(i)?;
3435 let d = ca[2].get(i)?;
3436 let h = ca[3].get(i).unwrap_or(0);
3437 let min = ca[4].get(i).unwrap_or(0);
3438 let s = ca[5].get(i).unwrap_or(0);
3439 let date = NaiveDate::from_ymd_opt(y, m as u32, d as u32)?;
3440 let naive = date.and_hms_opt(h as u32, min as u32, s as u32)?;
3441 match &tz {
3442 Some(tz) => tz.from_local_datetime(&naive).single().map(
3443 |dt_tz: chrono::DateTime<Tz>| dt_tz.with_timezone(&Utc).timestamp_micros(),
3444 ),
3445 None => Some(naive.and_utc().timestamp_micros()),
3446 }
3447 }),
3448 );
3449 let out_series = out
3450 .into_series()
3451 .cast(&DataType::Datetime(TimeUnit::Microseconds, None))?;
3452 Ok(Some(Column::new(name, out_series)))
3453}
3454
3455pub fn apply_to_timestamp_format(
3460 column: Column,
3461 format: Option<&str>,
3462 strict: bool,
3463) -> PolarsResult<Option<Column>> {
3464 use chrono::NaiveDateTime;
3465 use polars::datatypes::TimeUnit;
3466 let name = column.field().into_owned().name;
3467 let series = column.take_materialized_series();
3468 if series.dtype() != &DataType::String {
3469 let out_series = series.cast(&DataType::Datetime(TimeUnit::Microseconds, None))?;
3470 return Ok(Some(Column::new(name, out_series)));
3471 }
3472 let chrono_fmt = format
3473 .map(pyspark_format_to_chrono)
3474 .unwrap_or_else(|| "%Y-%m-%d %H:%M:%S".to_string());
3475 let ca = series
3476 .str()
3477 .map_err(|e| PolarsError::ComputeError(format!("to_timestamp: {e}").into()))?;
3478 let out = Int64Chunked::from_iter_options(
3479 name.as_str().into(),
3480 ca.into_iter().map(|opt_s| {
3481 opt_s.and_then(|s| {
3482 NaiveDateTime::parse_from_str(s.trim(), &chrono_fmt)
3483 .ok()
3484 .map(|ndt| ndt.and_utc().timestamp_micros())
3485 })
3486 }),
3487 );
3488 let out_series = out
3489 .into_series()
3490 .cast(&DataType::Datetime(TimeUnit::Microseconds, None))?;
3491 if strict {
3492 Ok(Some(Column::new(name, out_series)))
3494 } else {
3495 Ok(Some(Column::new(name, out_series)))
3496 }
3497}
3498
3499fn series_to_datetime_micros(series: &Series) -> PolarsResult<Series> {
3501 use chrono::NaiveDateTime;
3502 use polars::datatypes::TimeUnit;
3503 if series.dtype() == &DataType::String {
3504 let name = series.name().as_str().into();
3505 let ca = series
3506 .str()
3507 .map_err(|e| PolarsError::ComputeError(format!("date on string: {e}").into()))?;
3508 const FMT: &str = "%Y-%m-%d %H:%M:%S";
3509 let out = Int64Chunked::from_iter_options(
3510 name,
3511 ca.into_iter().map(|opt_s| {
3512 opt_s.and_then(|s| {
3513 NaiveDateTime::parse_from_str(s.trim(), FMT)
3514 .ok()
3515 .map(|ndt| ndt.and_utc().timestamp_micros())
3516 })
3517 }),
3518 );
3519 out.into_series()
3520 .cast(&DataType::Datetime(TimeUnit::Microseconds, None))
3521 } else {
3522 series.cast(&DataType::Datetime(TimeUnit::Microseconds, None))
3523 }
3524}
3525
3526pub fn apply_hour(column: Column) -> PolarsResult<Option<Column>> {
3528 let name = column.field().into_owned().name;
3529 let series = column.take_materialized_series();
3530 let dt_series = series_to_datetime_micros(&series)?;
3531 let ca = dt_series
3532 .datetime()
3533 .map_err(|e| PolarsError::ComputeError(format!("hour: {e}").into()))?;
3534 let out = ca.hour().into_series();
3535 Ok(Some(Column::new(name, out)))
3536}
3537
3538pub fn apply_minute(column: Column) -> PolarsResult<Option<Column>> {
3540 let name = column.field().into_owned().name;
3541 let series = column.take_materialized_series();
3542 let dt_series = series_to_datetime_micros(&series)?;
3543 let ca = dt_series
3544 .datetime()
3545 .map_err(|e| PolarsError::ComputeError(format!("minute: {e}").into()))?;
3546 let out = ca.minute().into_series();
3547 Ok(Some(Column::new(name, out)))
3548}
3549
3550pub fn apply_second(column: Column) -> PolarsResult<Option<Column>> {
3552 let name = column.field().into_owned().name;
3553 let series = column.take_materialized_series();
3554 let dt_series = series_to_datetime_micros(&series)?;
3555 let ca = dt_series
3556 .datetime()
3557 .map_err(|e| PolarsError::ComputeError(format!("second: {e}").into()))?;
3558 let out = ca.second().into_series();
3559 Ok(Some(Column::new(name, out)))
3560}
3561
3562pub fn apply_make_date(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
3564 use chrono::NaiveDate;
3565 if columns.len() < 3 {
3566 return Err(PolarsError::ComputeError(
3567 "make_date needs three columns (year, month, day)".into(),
3568 ));
3569 }
3570 let name = columns[0].field().into_owned().name;
3571 let y_series = std::mem::take(&mut columns[0]).take_materialized_series();
3572 let m_series = std::mem::take(&mut columns[1]).take_materialized_series();
3573 let d_series = std::mem::take(&mut columns[2]).take_materialized_series();
3574 let y_ca = y_series
3575 .cast(&DataType::Int32)?
3576 .i32()
3577 .map_err(|e| PolarsError::ComputeError(format!("make_date: {e}").into()))?
3578 .clone();
3579 let m_ca = m_series
3580 .cast(&DataType::Int32)?
3581 .i32()
3582 .map_err(|e| PolarsError::ComputeError(format!("make_date: {e}").into()))?
3583 .clone();
3584 let d_ca = d_series
3585 .cast(&DataType::Int32)?
3586 .i32()
3587 .map_err(|e| PolarsError::ComputeError(format!("make_date: {e}").into()))?
3588 .clone();
3589 let out = Int32Chunked::from_iter_options(
3590 name.as_str().into(),
3591 y_ca.into_iter()
3592 .zip(&m_ca)
3593 .zip(&d_ca)
3594 .map(|((oy, om), od)| match (oy, om, od) {
3595 (Some(y), Some(m), Some(d)) => {
3596 NaiveDate::from_ymd_opt(y, m as u32, d as u32).map(naivedate_to_days)
3597 }
3598 _ => None,
3599 }),
3600 );
3601 let out_series = out.into_series().cast(&DataType::Date)?;
3602 Ok(Some(Column::new(name, out_series)))
3603}
3604
3605pub fn apply_unix_date(column: Column) -> PolarsResult<Option<Column>> {
3607 let name = column.field().into_owned().name;
3608 let series = column.take_materialized_series();
3609 let casted = series.cast(&DataType::Date)?;
3610 let days = casted.cast(&DataType::Int32)?;
3611 Ok(Some(Column::new(name, days)))
3612}
3613
3614pub fn apply_date_from_unix_date(column: Column) -> PolarsResult<Option<Column>> {
3616 let name = column.field().into_owned().name;
3617 let series = column.take_materialized_series();
3618 let days = series.cast(&DataType::Int32)?;
3619 let out = days.cast(&DataType::Date)?;
3620 Ok(Some(Column::new(name, out)))
3621}
3622
3623pub fn apply_pmod(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
3625 if columns.len() < 2 {
3626 return Err(PolarsError::ComputeError("pmod needs two columns".into()));
3627 }
3628 let name = columns[0].field().into_owned().name;
3629 let a_series = std::mem::take(&mut columns[0]).take_materialized_series();
3630 let b_series = std::mem::take(&mut columns[1]).take_materialized_series();
3631 let a = float_series_to_f64(&a_series)?;
3632 let b = float_series_to_f64(&b_series)?;
3633 let out = Float64Chunked::from_iter_options(
3634 name.as_str().into(),
3635 a.into_iter().zip(&b).map(|(oa, ob)| match (oa, ob) {
3636 (Some(x), Some(y)) if y != 0.0 => {
3637 let r = x % y;
3638 Some(if r >= 0.0 { r } else { r + y.abs() })
3639 }
3640 _ => None,
3641 }),
3642 );
3643 Ok(Some(Column::new(name, out.into_series())))
3644}
3645
/// Returns `n!` for `0 <= n <= 20`, the range whose factorials fit in an `i64`.
///
/// Returns `None` for negative `n` and for `n > 20`. The multiplication is also checked,
/// so overflow can never wrap.
fn factorial_u64(n: i64) -> Option<i64> {
    if !(0..=20).contains(&n) {
        return None;
    }
    (1..=n).try_fold(1_i64, |product, factor| product.checked_mul(factor))
}
3660
3661pub fn apply_from_utc_timestamp(column: Column, tz_str: &str) -> PolarsResult<Option<Column>> {
3663 let _: Tz = tz_str
3664 .parse()
3665 .map_err(|_| PolarsError::ComputeError(format!("invalid timezone: {tz_str}").into()))?;
3666 Ok(Some(column))
3667}
3668
3669pub fn apply_to_utc_timestamp(column: Column, tz_str: &str) -> PolarsResult<Option<Column>> {
3671 let _: Tz = tz_str
3672 .parse()
3673 .map_err(|_| PolarsError::ComputeError(format!("invalid timezone: {tz_str}").into()))?;
3674 Ok(Some(column))
3675}
3676
3677pub fn apply_convert_timezone(
3679 column: Column,
3680 _source_tz: &str,
3681 _target_tz: &str,
3682) -> PolarsResult<Option<Column>> {
3683 Ok(Some(column))
3684}
3685
3686pub fn apply_factorial(column: Column) -> PolarsResult<Option<Column>> {
3688 let name = column.field().into_owned().name;
3689 let series = column.take_materialized_series();
3690 let casted = series
3691 .cast(&DataType::Int64)
3692 .map_err(|e| PolarsError::ComputeError(format!("factorial cast: {e}").into()))?;
3693 let ca = casted
3694 .i64()
3695 .map_err(|e| PolarsError::ComputeError(format!("factorial: {e}").into()))?;
3696 let out = Int64Chunked::from_iter_options(
3697 name.as_str().into(),
3698 ca.into_iter().map(|opt_n| opt_n.and_then(factorial_u64)),
3699 );
3700 Ok(Some(Column::new(name, out.into_series())))
3701}
3702
3703pub fn apply_url_decode(column: Column) -> PolarsResult<Option<Column>> {
3705 use percent_encoding::percent_decode_str;
3706 let name = column.field().into_owned().name;
3707 let series = column.take_materialized_series();
3708 let ca = series
3709 .str()
3710 .map_err(|e| PolarsError::ComputeError(format!("url_decode: {e}").into()))?;
3711 let out = StringChunked::from_iter_options(
3712 name.as_str().into(),
3713 ca.into_iter().map(|opt_s| {
3714 opt_s.and_then(|s| {
3715 percent_decode_str(s)
3716 .decode_utf8()
3717 .ok()
3718 .map(|c| c.into_owned())
3719 })
3720 }),
3721 );
3722 Ok(Some(Column::new(name, out.into_series())))
3723}
3724
3725pub fn apply_url_encode(column: Column) -> PolarsResult<Option<Column>> {
3727 use percent_encoding::{NON_ALPHANUMERIC, utf8_percent_encode};
3728 let name = column.field().into_owned().name;
3729 let series = column.take_materialized_series();
3730 let ca = series
3731 .str()
3732 .map_err(|e| PolarsError::ComputeError(format!("url_encode: {e}").into()))?;
3733 let out = StringChunked::from_iter_options(
3734 name.as_str().into(),
3735 ca.into_iter().map(|opt_s| {
3736 opt_s.map(|s| {
3737 utf8_percent_encode(s, NON_ALPHANUMERIC)
3738 .to_string()
3739 .replace("%20", "+")
3740 })
3741 }),
3742 );
3743 Ok(Some(Column::new(name, out.into_series())))
3744}
3745
3746pub fn apply_shift_right_unsigned(column: Column, n: i32) -> PolarsResult<Option<Column>> {
3748 let name = column.field().into_owned().name;
3749 let series = column.take_materialized_series();
3750 let s = series.cast(&DataType::Int64)?;
3751 let ca = s
3752 .i64()
3753 .map_err(|e| PolarsError::ComputeError(format!("shift_right_unsigned: {e}").into()))?;
3754 let u = n as u32;
3755 let out = Int64Chunked::from_iter_options(
3756 name.as_str().into(),
3757 ca.into_iter()
3758 .map(|opt_v| opt_v.map(|v| ((v as u64) >> u) as i64)),
3759 );
3760 Ok(Some(Column::new(name, out.into_series())))
3761}
3762
3763pub fn apply_json_array_length(column: Column, path: &str) -> PolarsResult<Option<Column>> {
3765 let name = column.field().into_owned().name;
3766 let series = column.take_materialized_series();
3767 let ca = series
3768 .str()
3769 .map_err(|e| PolarsError::ComputeError(format!("json_array_length: {e}").into()))?;
3770 let path = path.trim_start_matches('$').trim_start_matches('.');
3771 let path_parts: Vec<&str> = if path.is_empty() {
3772 vec![]
3773 } else {
3774 path.split('.').collect()
3775 };
3776 let out = Int64Chunked::from_iter_options(
3777 name.as_str().into(),
3778 ca.into_iter().map(|opt_s| {
3779 opt_s.and_then(|s| {
3780 let v: serde_json::Value = serde_json::from_str(s).ok()?;
3781 let mut current = &v;
3782 for part in &path_parts {
3783 current = current.get(part)?;
3784 }
3785 current.as_array().map(|a| a.len() as i64)
3786 })
3787 }),
3788 );
3789 Ok(Some(Column::new(name, out.into_series())))
3790}
3791
3792pub fn apply_json_object_keys(column: Column) -> PolarsResult<Option<Column>> {
3794 let name = column.field().into_owned().name;
3795 let series = column.take_materialized_series();
3796 let ca = series
3797 .str()
3798 .map_err(|e| PolarsError::ComputeError(format!("json_object_keys: {e}").into()))?;
3799 let out: ListChunked = ca
3800 .into_iter()
3801 .map(|opt_s| {
3802 opt_s.and_then(|s| {
3803 let v: serde_json::Value = serde_json::from_str(s).ok()?;
3804 let obj = v.as_object()?;
3805 let keys: Vec<String> = obj.keys().map(String::from).collect();
3806 Some(Series::new("".into(), keys))
3807 })
3808 })
3809 .collect();
3810 Ok(Some(Column::new(name, out.into_series())))
3811}
3812
3813pub fn apply_json_tuple(column: Column, keys: &[String]) -> PolarsResult<Option<Column>> {
3815 let name = column.field().into_owned().name;
3816 let series = column.take_materialized_series();
3817 let ca = series
3818 .str()
3819 .map_err(|e| PolarsError::ComputeError(format!("json_tuple: {e}").into()))?;
3820 let keys = keys.to_vec();
3821 let mut columns_per_key: Vec<Vec<Option<String>>> =
3822 (0..keys.len()).map(|_| Vec::new()).collect();
3823 for opt_s in ca.into_iter() {
3824 for (i, key) in keys.iter().enumerate() {
3825 let val = opt_s.and_then(|s| {
3826 let v: serde_json::Value = serde_json::from_str(s).ok()?;
3827 let obj = v.as_object()?;
3828 obj.get(key).and_then(|x| x.as_str()).map(String::from)
3829 });
3830 columns_per_key[i].push(val);
3831 }
3832 }
3833 let field_series: Vec<Series> = keys
3834 .iter()
3835 .zip(columns_per_key.iter())
3836 .map(|(k, vals)| Series::new(k.as_str().into(), vals.clone()))
3837 .collect();
3838 let out_df = DataFrame::new_infer_height(field_series.into_iter().map(|s| s.into()).collect())?;
3839 let out_struct = out_df.into_struct(name.as_str().into());
3840 Ok(Some(Column::new(name, out_struct.into_series())))
3841}
3842
3843pub fn apply_from_csv(column: Column) -> PolarsResult<Option<Column>> {
3845 let name = column.field().into_owned().name;
3846 let series = column.take_materialized_series();
3847 let ca = series
3848 .str()
3849 .map_err(|e| PolarsError::ComputeError(format!("from_csv: {e}").into()))?;
3850 const MAX_COLS: usize = 32;
3851 let mut columns: Vec<Vec<Option<String>>> = (0..MAX_COLS).map(|_| Vec::new()).collect();
3852 for opt_s in ca.into_iter() {
3853 let parts: Vec<&str> = opt_s
3854 .map(|s| s.split(',').collect::<Vec<_>>())
3855 .unwrap_or_default();
3856 for (i, col) in columns.iter_mut().enumerate().take(MAX_COLS) {
3857 let v = parts.get(i).map(|p| (*p).to_string());
3858 col.push(v);
3859 }
3860 }
3861 let field_series: Vec<Series> = (0..MAX_COLS)
3862 .map(|i| Series::new(format!("_c{i}").into(), columns[i].clone()))
3863 .collect();
3864 let out_df = DataFrame::new_infer_height(field_series.into_iter().map(|s| s.into()).collect())?;
3865 let out_series = out_df.into_struct(name.as_str().into()).into_series();
3866 Ok(Some(Column::new(name, out_series)))
3867}
3868
3869pub fn apply_to_csv(column: Column) -> PolarsResult<Option<Column>> {
3871 let name = column.field().into_owned().name;
3872 let series = column.take_materialized_series();
3873 let out = series.cast(&DataType::String)?;
3874 Ok(Some(Column::new(name, out)))
3875}
3876
3877pub fn apply_parse_url(
3880 column: Column,
3881 part: &str,
3882 key: Option<&str>,
3883) -> PolarsResult<Option<Column>> {
3884 use url::Url;
3885 let name = column.field().into_owned().name;
3886 let series = column.take_materialized_series();
3887 let ca = series
3888 .str()
3889 .map_err(|e| PolarsError::ComputeError(format!("parse_url: {e}").into()))?;
3890 let part_upper = part.trim().to_uppercase();
3891 let key_owned = key.map(String::from);
3892 let out = StringChunked::from_iter_options(
3893 name.as_str().into(),
3894 ca.into_iter().map(|opt_s| {
3895 opt_s.and_then(|s| {
3896 let u = Url::parse(s).ok()?;
3897 let out: Option<String> = match part_upper.as_str() {
3898 "PROTOCOL" | "PROT" => Some(u.scheme().to_string()),
3899 "HOST" => u.host_str().map(String::from),
3900 "PATH" | "FILE" | "PATHNAME" => Some(u.path().to_string()),
3901 "QUERY" | "REF" | "QUERYSTRING" => {
3902 if let Some(ref k) = key_owned {
3903 u.query_pairs()
3904 .find(|(name, _)| name.as_ref() == k.as_str())
3905 .map(|(_, value)| value.into_owned())
3906 } else {
3907 u.query().map(String::from)
3908 }
3909 }
3910 "USERINFO" => Some(format!("{}:{}", u.username(), u.password().unwrap_or(""))),
3911 "AUTHORITY" => u.host_str().map(|h| h.to_string()),
3912 _ => None,
3913 };
3914 out
3915 })
3916 }),
3917 );
3918 Ok(Some(Column::new(name, out.into_series())))
3919}
3920
3921pub fn apply_hash_one(column: Column) -> PolarsResult<Option<Column>> {
3923 let name = column.field().into_owned().name;
3924 let series = column.take_materialized_series();
3925 let out = Int64Chunked::from_iter_options(name.as_str().into(), series_to_hash_iter(series));
3926 Ok(Some(Column::new(name, out.into_series())))
3927}
3928
3929pub fn apply_hash_struct(column: Column) -> PolarsResult<Option<Column>> {
3931 let name = column.field().into_owned().name;
3932 let series = column.take_materialized_series();
3933 let out = Int64Chunked::from_iter_options(name.as_str().into(), series_to_hash_iter(series));
3934 Ok(Some(Column::new(name, out.into_series())))
3935}
3936
3937fn series_to_hash_iter(series: Series) -> impl Iterator<Item = Option<i64>> {
3938 use std::io::Cursor;
3939 (0..series.len()).map(move |i| {
3940 let av = series.get(i).ok()?;
3941 let bytes = any_value_to_hash_bytes(&av);
3942 let h = murmur3::murmur3_32(&mut Cursor::new(bytes), 0).ok()?;
3943 Some(h as i32 as i64)
3945 })
3946}
3947
3948fn any_value_to_hash_bytes(av: &polars::datatypes::AnyValue) -> Vec<u8> {
3949 use polars::datatypes::AnyValue;
3950 let mut buf = Vec::new();
3951 match av {
3952 AnyValue::Null => buf.push(0),
3953 AnyValue::Boolean(v) => buf.push(*v as u8),
3954 AnyValue::Int32(v) => buf.extend_from_slice(&v.to_le_bytes()),
3955 AnyValue::Int64(v) => buf.extend_from_slice(&v.to_le_bytes()),
3956 AnyValue::UInt32(v) => buf.extend_from_slice(&v.to_le_bytes()),
3957 AnyValue::UInt64(v) => buf.extend_from_slice(&v.to_le_bytes()),
3958 AnyValue::Float32(v) => buf.extend_from_slice(&v.to_bits().to_le_bytes()),
3959 AnyValue::Float64(v) => buf.extend_from_slice(&v.to_bits().to_le_bytes()),
3960 AnyValue::String(v) => buf.extend_from_slice(v.as_bytes()),
3961 AnyValue::Binary(v) => buf.extend_from_slice(v),
3962 _ => buf.extend_from_slice(av.to_string().as_bytes()),
3963 }
3964 buf
3965}
3966
3967pub fn apply_sequence(column: Column) -> PolarsResult<Option<Column>> {
3970 use polars::chunked_array::builder::get_list_builder;
3971 let name = column.field().into_owned().name;
3972 let series = column.take_materialized_series();
3973 let st = series.struct_().map_err(|e| {
3974 PolarsError::ComputeError(format!("sequence: expected struct column: {e}").into())
3975 })?;
3976 let start_s = st
3977 .field_by_name("0")
3978 .map_err(|e| PolarsError::ComputeError(format!("sequence field 0: {e}").into()))?;
3979 let stop_s = st
3980 .field_by_name("1")
3981 .map_err(|e| PolarsError::ComputeError(format!("sequence field 1: {e}").into()))?;
3982 let step_s = st.field_by_name("2").ok(); let start_series = start_s
3984 .cast(&DataType::Int64)
3985 .map_err(|e| PolarsError::ComputeError(format!("sequence start: {e}").into()))?;
3986 let stop_series = stop_s
3987 .cast(&DataType::Int64)
3988 .map_err(|e| PolarsError::ComputeError(format!("sequence stop: {e}").into()))?;
3989 let step_series_opt: Option<Series> = step_s
3990 .as_ref()
3991 .map(|s| s.cast(&DataType::Int64))
3992 .transpose()
3993 .map_err(|e| PolarsError::ComputeError(format!("sequence step: {e}").into()))?;
3994 let start_ca = start_series
3995 .i64()
3996 .map_err(|e| PolarsError::ComputeError(format!("sequence: {e}").into()))?;
3997 let stop_ca = stop_series
3998 .i64()
3999 .map_err(|e| PolarsError::ComputeError(format!("sequence: {e}").into()))?;
4000 let step_ca = step_series_opt.as_ref().and_then(|s| s.i64().ok());
4001 let n = start_ca.len();
4002 let mut builder = get_list_builder(&DataType::Int64, 64, n, name.as_str().into());
4003 for i in 0..n {
4004 let start_v = start_ca.get(i);
4005 let stop_v = stop_ca.get(i);
4006 let step_v: Option<i64> = step_ca.as_ref().and_then(|ca| ca.get(i)).or(Some(1));
4007 match (start_v, stop_v, step_v) {
4008 (Some(s), Some(st), Some(step)) if step != 0 => {
4009 let mut vals: Vec<i64> = Vec::new();
4010 if step > 0 {
4011 let mut v = s;
4012 while v <= st {
4013 vals.push(v);
4014 v += step;
4015 }
4016 } else {
4017 let mut v = s;
4018 while v >= st {
4019 vals.push(v);
4020 v += step;
4021 }
4022 }
4023 let series = Series::new("".into(), vals);
4024 builder.append_series(&series)?;
4025 }
4026 _ => builder.append_null(),
4027 }
4028 }
4029 Ok(Some(Column::new(name, builder.finish().into_series())))
4030}
4031
4032pub fn apply_struct_with_field(
4034 struct_col: Column,
4035 value_col: Column,
4036 field_name: &str,
4037) -> PolarsResult<Option<Column>> {
4038 use polars::chunked_array::StructChunked;
4039 let name = struct_col.field().into_owned().name;
4040 let struct_series = struct_col.take_materialized_series();
4041 let st = struct_series.struct_().map_err(|e| {
4042 PolarsError::ComputeError(format!("with_field: expected struct column: {e}").into())
4043 })?;
4044 let len = st.len();
4045 let fields_series = st.fields_as_series();
4046 let value_series = value_col.take_materialized_series();
4047 let mut new_fields: Vec<Series> = Vec::with_capacity(fields_series.len() + 1);
4048 let mut replaced = false;
4049 for s in &fields_series {
4050 let fname = s.name().as_str();
4051 let new_s = if fname == field_name {
4052 replaced = true;
4053 let mut v = value_series.clone();
4054 v.rename(PlSmallStr::from(field_name));
4055 v
4056 } else {
4057 s.clone()
4058 };
4059 new_fields.push(new_s);
4060 }
4061 if !replaced {
4062 new_fields.push(value_series);
4063 }
4064 let out = StructChunked::from_series(name.as_str().into(), len, new_fields.iter())
4065 .map_err(|e| PolarsError::ComputeError(format!("with_field: build struct: {e}").into()))?;
4066 Ok(Some(Column::new(name, out.into_series())))
4067}
4068
4069pub fn apply_shuffle(column: Column) -> PolarsResult<Option<Column>> {
4071 use polars::chunked_array::builder::get_list_builder;
4072 use rand::seq::SliceRandom;
4073 let name = column.field().into_owned().name;
4074 let series = column.take_materialized_series();
4075 let list_ca = series
4076 .list()
4077 .map_err(|e| PolarsError::ComputeError(format!("shuffle: {e}").into()))?;
4078 let inner_dtype = list_ca.inner_dtype().clone();
4079 let mut builder = get_list_builder(&inner_dtype, 64, list_ca.len(), name.as_str().into());
4080 for opt_list in list_ca.amortized_iter() {
4081 match opt_list {
4082 None => builder.append_null(),
4083 Some(amort) => {
4084 let list_s = amort.as_ref();
4085 let n = list_s.len();
4086 let mut indices: Vec<u32> = (0..n as u32).collect();
4087 indices.shuffle(&mut rand::thread_rng());
4088 let idx_ca = UInt32Chunked::from_vec("".into(), indices);
4089 let taken = list_s
4090 .take(&idx_ca)
4091 .map_err(|e| PolarsError::ComputeError(format!("shuffle take: {e}").into()))?;
4092 builder.append_series(&taken)?;
4093 }
4094 }
4095 }
4096 Ok(Some(Column::new(name, builder.finish().into_series())))
4097}
4098
/// Length in bytes of every bitmap built by the bitmap helpers; it addresses
/// `BITMAP_BYTES * 8` (32 768) bit positions.
const BITMAP_BYTES: usize = 4 * 1024;
4103
4104fn parse_str_to_date(s: &str) -> Option<chrono::NaiveDate> {
4106 use chrono::{NaiveDate, NaiveDateTime};
4107 let s = s.trim();
4108 NaiveDate::parse_from_str(s, "%Y-%m-%d")
4109 .ok()
4110 .or_else(|| {
4111 if s.len() >= 10 {
4112 NaiveDate::parse_from_str(&s[0..10], "%Y-%m-%d").ok()
4113 } else {
4114 None
4115 }
4116 })
4117 .or_else(|| {
4118 NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S")
4119 .ok()
4120 .map(|dt| dt.date())
4121 })
4122}
4123
/// Parse a base-10 `i64` from `s` after trimming surrounding whitespace.
///
/// Returns `None` for empty or whitespace-only input and for anything `i64::from_str` rejects
/// (including overflow).
fn parse_str_to_int(s: &str) -> Option<i64> {
    // An empty string already fails `parse`, so no separate emptiness check is needed.
    s.trim().parse::<i64>().ok()
}
4132
4133pub fn apply_string_to_int(
4135 column: Column,
4136 strict: bool,
4137 target: DataType,
4138) -> PolarsResult<Option<Column>> {
4139 let name = column.field().into_owned().name;
4140 let series = column.take_materialized_series();
4141 let out: Series = match series.dtype() {
4142 DataType::String => {
4143 let ca = series
4144 .str()
4145 .map_err(|e| PolarsError::ComputeError(format!("string to int: {e}").into()))?;
4146 let mut results: Vec<Option<i64>> = Vec::with_capacity(ca.len());
4147 for opt_s in ca.into_iter() {
4148 let v = opt_s.and_then(parse_str_to_int);
4149 if strict {
4150 if let Some(s) = opt_s {
4151 if v.is_none() {
4152 return Err(PolarsError::ComputeError(
4153 format!(
4154 "conversion from `str` to `{}` failed in column '{}' for value \"{s}\"",
4155 target,
4156 name.as_str()
4157 )
4158 .into(),
4159 ));
4160 }
4161 }
4162 }
4163 results.push(v);
4164 }
4165 match &target {
4166 DataType::Int32 => {
4167 let vals: Vec<Option<i32>> = results
4168 .into_iter()
4169 .map(|o| {
4170 o.and_then(|n| {
4171 (n >= i64::from(i32::MIN) && n <= i64::from(i32::MAX))
4172 .then_some(n as i32)
4173 })
4174 })
4175 .collect();
4176 let chunked =
4177 Int32Chunked::from_iter_options(name.as_str().into(), vals.into_iter());
4178 chunked.into_series()
4179 }
4180 DataType::Int64 => {
4181 let chunked =
4182 Int64Chunked::from_iter_options(name.as_str().into(), results.into_iter());
4183 chunked.into_series()
4184 }
4185 _ => unreachable!("target is Int32 or Int64"),
4186 }
4187 }
4188 DataType::Int32 | DataType::Int64 => series.cast(&target)?,
4189 _ => {
4190 if strict {
4191 return Err(PolarsError::ComputeError(
4192 format!(
4193 "casting from {} to {} not supported",
4194 series.dtype(),
4195 target
4196 )
4197 .into(),
4198 ));
4199 }
4200 Series::new_null(name.clone(), series.len()).cast(&target)?
4201 }
4202 };
4203 Ok(Some(Column::new(name, out)))
4204}
4205
/// Parse a boolean literal the way Spark's string-to-boolean cast does.
///
/// Matching is case-insensitive after trimming surrounding whitespace:
/// * `t`, `true`, `y`, `yes`, `1` are `true`;
/// * `f`, `false`, `n`, `no`, `0` are `false`;
/// * anything else is `None`.
///
/// `strict` does not affect the result — the caller decides whether `None` is an error — and is
/// kept only so existing call sites remain valid. The former `_ if strict => None` arm was
/// identical to the fallback and has been removed.
fn parse_str_to_bool(s: &str, strict: bool) -> Option<bool> {
    let _ = strict;
    match s.trim().to_lowercase().as_str() {
        "t" | "true" | "y" | "yes" | "1" => Some(true),
        "f" | "false" | "n" | "no" | "0" => Some(false),
        _ => None,
    }
}
4217
4218pub fn apply_string_to_boolean(column: Column, strict: bool) -> PolarsResult<Option<Column>> {
4221 let name = column.field().into_owned().name;
4222 let series = column.take_materialized_series();
4223 let out: BooleanChunked = match series.dtype() {
4224 DataType::String => {
4225 let ca = series
4226 .str()
4227 .map_err(|e| PolarsError::ComputeError(format!("string to boolean: {e}").into()))?;
4228 let mut results = Vec::with_capacity(ca.len());
4229 for opt_s in ca.into_iter() {
4230 let v = match opt_s {
4231 None => None,
4232 Some(s) => {
4233 let parsed = parse_str_to_bool(s, strict);
4234 if strict && parsed.is_none() {
4235 return Err(PolarsError::ComputeError(
4236 format!("casting from string to boolean failed for value '{s}'")
4237 .into(),
4238 ));
4239 }
4240 parsed
4241 }
4242 };
4243 results.push(v);
4244 }
4245 BooleanChunked::from_iter_options(name.as_str().into(), results.into_iter())
4246 }
4247 DataType::Boolean => {
4248 let ca = series
4249 .bool()
4250 .map_err(|e| PolarsError::ComputeError(format!("boolean: {e}").into()))?;
4251 BooleanChunked::from_iter_options(name.as_str().into(), ca.into_iter())
4252 }
4253 DataType::Int8 => {
4254 let ca = series
4255 .i8()
4256 .map_err(|e| PolarsError::ComputeError(format!("i8: {e}").into()))?;
4257 BooleanChunked::from_iter_options(
4258 name.as_str().into(),
4259 ca.into_iter().map(|o| o.map(|v| v != 0)),
4260 )
4261 }
4262 DataType::Int16 => {
4263 let ca = series
4264 .i16()
4265 .map_err(|e| PolarsError::ComputeError(format!("i16: {e}").into()))?;
4266 BooleanChunked::from_iter_options(
4267 name.as_str().into(),
4268 ca.into_iter().map(|o| o.map(|v| v != 0)),
4269 )
4270 }
4271 DataType::Int32 => {
4272 let ca = series
4273 .i32()
4274 .map_err(|e| PolarsError::ComputeError(format!("i32: {e}").into()))?;
4275 BooleanChunked::from_iter_options(
4276 name.as_str().into(),
4277 ca.into_iter().map(|o| o.map(|v| v != 0)),
4278 )
4279 }
4280 DataType::Int64 => {
4281 let ca = series
4282 .i64()
4283 .map_err(|e| PolarsError::ComputeError(format!("i64: {e}").into()))?;
4284 BooleanChunked::from_iter_options(
4285 name.as_str().into(),
4286 ca.into_iter().map(|o| o.map(|v| v != 0)),
4287 )
4288 }
4289 DataType::UInt8 => {
4290 let ca = series
4291 .u8()
4292 .map_err(|e| PolarsError::ComputeError(format!("u8: {e}").into()))?;
4293 BooleanChunked::from_iter_options(
4294 name.as_str().into(),
4295 ca.into_iter().map(|o| o.map(|v| v != 0)),
4296 )
4297 }
4298 DataType::UInt16 => {
4299 let ca = series
4300 .u16()
4301 .map_err(|e| PolarsError::ComputeError(format!("u16: {e}").into()))?;
4302 BooleanChunked::from_iter_options(
4303 name.as_str().into(),
4304 ca.into_iter().map(|o| o.map(|v| v != 0)),
4305 )
4306 }
4307 DataType::UInt32 => {
4308 let ca = series
4309 .u32()
4310 .map_err(|e| PolarsError::ComputeError(format!("u32: {e}").into()))?;
4311 BooleanChunked::from_iter_options(
4312 name.as_str().into(),
4313 ca.into_iter().map(|o| o.map(|v| v != 0)),
4314 )
4315 }
4316 DataType::UInt64 => {
4317 let ca = series
4318 .u64()
4319 .map_err(|e| PolarsError::ComputeError(format!("u64: {e}").into()))?;
4320 BooleanChunked::from_iter_options(
4321 name.as_str().into(),
4322 ca.into_iter().map(|o| o.map(|v| v != 0)),
4323 )
4324 }
4325 DataType::Float32 => {
4326 let ca = series
4327 .f32()
4328 .map_err(|e| PolarsError::ComputeError(format!("f32: {e}").into()))?;
4329 BooleanChunked::from_iter_options(
4330 name.as_str().into(),
4331 ca.into_iter().map(|o| o.map(|v| v != 0.0)),
4332 )
4333 }
4334 DataType::Float64 => {
4335 let ca = series
4336 .f64()
4337 .map_err(|e| PolarsError::ComputeError(format!("f64: {e}").into()))?;
4338 BooleanChunked::from_iter_options(
4339 name.as_str().into(),
4340 ca.into_iter().map(|o| o.map(|v| v != 0.0)),
4341 )
4342 }
4343 _ => {
4344 if strict {
4345 return Err(PolarsError::ComputeError(
4346 format!("casting from {} to boolean not supported", series.dtype()).into(),
4347 ));
4348 }
4349 BooleanChunked::from_iter_options(name.as_str().into(), (0..series.len()).map(|_| None))
4350 }
4351 };
4352 Ok(Some(Column::new(name, out.into_series())))
4353}
4354
/// Parse an `f64` from `s` after trimming surrounding whitespace.
///
/// Returns `None` for empty or whitespace-only input and for anything `f64::from_str` rejects.
/// Rust's float syntax is accepted as-is, including `inf` and `NaN`.
fn parse_str_to_double(s: &str) -> Option<f64> {
    // An empty string already fails `parse`, so no separate emptiness check is needed.
    s.trim().parse::<f64>().ok()
}
4363
4364pub fn apply_string_to_double(column: Column, strict: bool) -> PolarsResult<Option<Column>> {
4367 let name = column.field().into_owned().name;
4368 let series = column.take_materialized_series();
4369 let out: Series = match series.dtype() {
4370 DataType::String => {
4371 let ca = series
4372 .str()
4373 .map_err(|e| PolarsError::ComputeError(format!("string to double: {e}").into()))?;
4374 let mut results: Vec<Option<f64>> = Vec::with_capacity(ca.len());
4375 for opt_s in ca.into_iter() {
4376 let v = opt_s.and_then(parse_str_to_double);
4377 if strict {
4378 if let Some(s) = opt_s {
4379 if v.is_none() {
4380 return Err(PolarsError::ComputeError(
4381 format!(
4382 "conversion from `str` to `double` failed in column '{}' for value \"{s}\"",
4383 name.as_str()
4384 )
4385 .into(),
4386 ));
4387 }
4388 }
4389 }
4390 results.push(v);
4391 }
4392 Float64Chunked::from_iter_options(name.as_str().into(), results.into_iter())
4393 .into_series()
4394 }
4395 DataType::Float32 | DataType::Float64 => series.cast(&DataType::Float64)?,
4396 DataType::Int32 | DataType::Int64 | DataType::UInt32 | DataType::UInt64 => {
4397 series.cast(&DataType::Float64)?
4398 }
4399 DataType::Null => {
4400 Float64Chunked::from_iter_options(
4402 name.as_str().into(),
4403 (0..series.len()).map(|_| None::<f64>),
4404 )
4405 .into_series()
4406 }
4407 _ => {
4408 if strict {
4409 return Err(PolarsError::ComputeError(
4410 format!("casting from {} to double not supported", series.dtype()).into(),
4411 ));
4412 }
4413 Float64Chunked::from_iter_options(
4414 name.as_str().into(),
4415 (0..series.len()).map(|_| None::<f64>),
4416 )
4417 .into_series()
4418 }
4419 };
4420 Ok(Some(Column::new(name, out)))
4421}
4422
4423pub fn apply_string_to_date(column: Column, strict: bool) -> PolarsResult<Option<Column>> {
4425 let name = column.field().into_owned().name;
4426 let series = column.take_materialized_series();
4427 let epoch = robin_sparkless_core::date_utils::epoch_naive_date();
4428 let out: Series = match series.dtype() {
4429 DataType::String => {
4430 let ca = series
4431 .str()
4432 .map_err(|e| PolarsError::ComputeError(format!("string to date: {e}").into()))?;
4433 let mut results = Vec::with_capacity(ca.len());
4434 for opt_s in ca.into_iter() {
4435 let v =
4436 opt_s.and_then(|s| parse_str_to_date(s).map(|d| (d - epoch).num_days() as i32));
4437 if strict {
4438 if let Some(s) = opt_s {
4439 if v.is_none() {
4440 return Err(PolarsError::ComputeError(
4441 format!(
4442 "conversion from `str` to `date` failed in column '{}' for value \"{s}\"",
4443 name.as_str()
4444 )
4445 .into(),
4446 ));
4447 }
4448 }
4449 }
4450 results.push(v);
4451 }
4452 let chunked =
4453 Int32Chunked::from_iter_options(name.as_str().into(), results.into_iter());
4454 chunked.into_series().cast(&DataType::Date)?
4455 }
4456 DataType::Date => series,
4457 DataType::Datetime(_, _) => series.cast(&DataType::Date)?,
4458 DataType::Null => {
4459 let days = Int32Chunked::from_iter_options(
4461 name.as_str().into(),
4462 (0..series.len()).map(|_| None::<i32>),
4463 );
4464 days.into_series().cast(&DataType::Date)?
4465 }
4466 _ => {
4467 if strict {
4468 return Err(PolarsError::ComputeError(
4469 format!("casting from {} to date not supported", series.dtype()).into(),
4470 ));
4471 }
4472 let days = Int32Chunked::from_iter_options(
4473 name.as_str().into(),
4474 (0..series.len()).map(|_| None::<i32>),
4475 );
4476 days.into_series().cast(&DataType::Date)?
4477 }
4478 };
4479 Ok(Some(Column::new(name, out)))
4480}
4481
4482pub fn apply_string_to_date_format(
4484 column: Column,
4485 format: Option<&str>,
4486 strict: bool,
4487) -> PolarsResult<Option<Column>> {
4488 let name = column.field().into_owned().name;
4489 let series = column.take_materialized_series();
4490 let epoch = robin_sparkless_core::date_utils::epoch_naive_date();
4491 let out: Series = match series.dtype() {
4492 DataType::String => {
4493 let ca = series
4494 .str()
4495 .map_err(|e| PolarsError::ComputeError(format!("string to date: {e}").into()))?;
4496 let fmt = format.map(pyspark_format_to_chrono);
4497 let mut results = Vec::with_capacity(ca.len());
4498 for opt_s in ca.into_iter() {
4499 let v = opt_s.and_then(|s| {
4500 let parsed = if let Some(ref chrono_fmt) = fmt {
4501 chrono::NaiveDate::parse_from_str(s.trim(), chrono_fmt).ok()
4502 } else {
4503 parse_str_to_date(s)
4504 };
4505 parsed.map(|d| (d.signed_duration_since(epoch).num_days()) as i32)
4506 });
4507 if strict {
4508 if let Some(s) = opt_s {
4509 if v.is_none() {
4510 return Err(PolarsError::ComputeError(
4511 format!(
4512 "to_date failed in column '{}' for value \"{s}\"",
4513 name.as_str()
4514 )
4515 .into(),
4516 ));
4517 }
4518 }
4519 }
4520 results.push(v);
4521 }
4522 let chunked =
4523 Int32Chunked::from_iter_options(name.as_str().into(), results.into_iter());
4524 chunked.into_series().cast(&DataType::Date)?
4525 }
4526 DataType::Date => series,
4527 DataType::Datetime(_, _) => series.cast(&DataType::Date)?,
4528 DataType::Null => {
4529 let days = Int32Chunked::from_iter_options(
4530 name.as_str().into(),
4531 (0..series.len()).map(|_| None::<i32>),
4532 );
4533 days.into_series().cast(&DataType::Date)?
4534 }
4535 _ => {
4536 if strict {
4537 return Err(PolarsError::ComputeError(
4538 format!("to_date from {} not supported", series.dtype()).into(),
4539 ));
4540 }
4541 let days = Int32Chunked::from_iter_options(
4542 name.as_str().into(),
4543 (0..series.len()).map(|_| None::<i32>),
4544 );
4545 days.into_series().cast(&DataType::Date)?
4546 }
4547 };
4548 Ok(Some(Column::new(name, out)))
4549}
4550
4551pub fn apply_bitmap_count(column: Column) -> PolarsResult<Option<Column>> {
4553 let name = column.field().into_owned().name;
4554 let series = column.take_materialized_series();
4555 let ca = series
4556 .binary()
4557 .map_err(|e| PolarsError::ComputeError(format!("bitmap_count: {e}").into()))?;
4558 let out = Int64Chunked::from_iter_options(
4559 name.as_str().into(),
4560 ca.into_iter().map(|opt_b| {
4561 opt_b.map(|b| b.iter().map(|&byte| byte.count_ones() as i64).sum::<i64>())
4562 }),
4563 );
4564 Ok(Some(Column::new(name, out.into_series())))
4565}
4566
4567pub fn apply_bitmap_construct_agg(column: Column) -> PolarsResult<Option<Column>> {
4569 let name = column.field().into_owned().name;
4570 let series = column.take_materialized_series();
4571 let list_ca = series
4572 .list()
4573 .map_err(|e| PolarsError::ComputeError(format!("bitmap_construct_agg: {e}").into()))?;
4574 let out: BinaryChunked = list_ca
4575 .amortized_iter()
4576 .map(|opt_list| {
4577 opt_list.and_then(|list_series| {
4578 let mut buf = vec![0u8; BITMAP_BYTES];
4579 let ca = list_series.as_ref().i64().ok()?;
4580 for pos in ca.into_iter().flatten() {
4581 let pos = pos as usize;
4582 if pos < BITMAP_BYTES * 8 {
4583 let byte_idx = pos / 8;
4584 let bit_idx = pos % 8;
4585 buf[byte_idx] |= 1 << bit_idx;
4586 }
4587 }
4588 Some(bytes::Bytes::from(buf))
4589 })
4590 })
4591 .collect();
4592 Ok(Some(Column::new(name, out.into_series())))
4593}
4594
/// OR-combine the binary bitmaps collected in each list row into a single `BITMAP_BYTES`-long
/// bitmap (bitwise-OR aggregation over pre-collected lists).
///
/// - Null list rows stay null.
/// - Only the first `BITMAP_BYTES` bytes of each input bitmap are used; shorter inputs touch
///   only their leading bytes.
/// - Null bitmaps are skipped.
/// - If a visited element series is not `Binary`, the whole row becomes null.
///
/// # Errors
/// Returns `ComputeError` if `column` is not a list column.
pub fn apply_bitmap_or_agg(column: Column) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let list_ca = series
        .list()
        .map_err(|e| PolarsError::ComputeError(format!("bitmap_or_agg: {e}").into()))?;
    let out: BinaryChunked = list_ca
        .amortized_iter()
        .map(|opt_list| {
            opt_list.and_then(|list_series| {
                // NOTE(review): `as_list()` appears to re-wrap this row's elements as a list
                // column so they can be walked with `amortized_iter`. Confirm its exact
                // semantics against the polars version in use before simplifying.
                let list_c = list_series.as_ref().as_list();
                let mut buf = vec![0u8; BITMAP_BYTES];
                for opt_bin in list_c.amortized_iter().flatten() {
                    // `?` here makes the whole row null when the element is not Binary.
                    let bin_ca: &BinaryChunked = opt_bin.as_ref().binary().ok()?;
                    for b in bin_ca.into_iter().flatten() {
                        let b: &[u8] = b;
                        // OR byte-wise into the accumulator, never past BITMAP_BYTES.
                        for (i, &byte) in b.iter().take(BITMAP_BYTES).enumerate() {
                            buf[i] |= byte;
                        }
                    }
                }
                Some(bytes::Bytes::from(buf))
            })
        })
        .collect();
    Ok(Some(Column::new(name, out.into_series())))
}
4623
4624pub fn apply_to_timestamp_ltz_format(
4628 column: Column,
4629 format: Option<&str>,
4630 strict: bool,
4631) -> PolarsResult<Option<Column>> {
4632 use chrono::offset::TimeZone;
4633 use chrono::{Local, NaiveDateTime, Utc};
4634 use polars::datatypes::TimeUnit;
4635 let chrono_fmt = format
4636 .map(pyspark_format_to_chrono)
4637 .unwrap_or_else(|| "%Y-%m-%d %H:%M:%S".to_string());
4638 let name = column.field().into_owned().name;
4639 let series = column.take_materialized_series();
4640 let ca = series
4641 .str()
4642 .map_err(|e| PolarsError::ComputeError(format!("to_timestamp_ltz: {e}").into()))?;
4643 let out = Int64Chunked::from_iter_options(
4644 name.as_str().into(),
4645 ca.into_iter().map(|opt_s| {
4646 opt_s.and_then(|s| {
4647 NaiveDateTime::parse_from_str(s, &chrono_fmt)
4648 .ok()
4649 .and_then(|ndt| {
4650 Local
4651 .from_local_datetime(&ndt)
4652 .single()
4653 .map(|dt| dt.with_timezone(&Utc).timestamp_micros())
4654 })
4655 })
4656 }),
4657 );
4658 let out_series = out
4659 .into_series()
4660 .cast(&DataType::Datetime(TimeUnit::Microseconds, None))?;
4661 let _ = strict;
4662 Ok(Some(Column::new(name, out_series)))
4663}
4664
4665pub fn apply_to_timestamp_ntz_format(
4667 column: Column,
4668 format: Option<&str>,
4669 strict: bool,
4670) -> PolarsResult<Option<Column>> {
4671 use chrono::NaiveDateTime;
4672 use polars::datatypes::TimeUnit;
4673 let chrono_fmt = format
4674 .map(pyspark_format_to_chrono)
4675 .unwrap_or_else(|| "%Y-%m-%d %H:%M:%S".to_string());
4676 let name = column.field().into_owned().name;
4677 let series = column.take_materialized_series();
4678 let ca = series
4679 .str()
4680 .map_err(|e| PolarsError::ComputeError(format!("to_timestamp_ntz: {e}").into()))?;
4681 let out = Int64Chunked::from_iter_options(
4682 name.as_str().into(),
4683 ca.into_iter().map(|opt_s| {
4684 opt_s.and_then(|s| {
4685 NaiveDateTime::parse_from_str(s, &chrono_fmt)
4686 .ok()
4687 .map(|ndt| ndt.and_utc().timestamp_micros())
4688 })
4689 }),
4690 );
4691 let out_series = out
4692 .into_series()
4693 .cast(&DataType::Datetime(TimeUnit::Microseconds, None))?;
4694 let _ = strict;
4695 Ok(Some(Column::new(name, out_series)))
4696}