1use super::types::parse_type_name;
6use crate::column::Column;
7use polars::prelude::*;
8
9pub fn count(col: &Column) -> Column {
11 Column::from_expr(col.expr().clone().count(), Some("count".to_string()))
12}
13
14pub fn sum(col: &Column) -> Column {
16 Column::from_expr(col.expr().clone().sum(), Some("sum".to_string()))
17}
18
19pub fn avg(col: &Column) -> Column {
21 Column::from_expr(col.expr().clone().mean(), Some("avg".to_string()))
22}
23
24pub fn mean(col: &Column) -> Column {
26 avg(col)
27}
28
29pub fn max(col: &Column) -> Column {
31 Column::from_expr(col.expr().clone().max(), Some("max".to_string()))
32}
33
34pub fn min(col: &Column) -> Column {
36 Column::from_expr(col.expr().clone().min(), Some("min".to_string()))
37}
38
39pub fn first(col: &Column, ignorenulls: bool) -> Column {
41 let _ = ignorenulls;
42 Column::from_expr(col.expr().clone().first(), None)
43}
44
45pub fn any_value(col: &Column, ignorenulls: bool) -> Column {
47 let _ = ignorenulls;
48 Column::from_expr(col.expr().clone().first(), None)
49}
50
51pub fn count_if(col: &Column) -> Column {
53 use polars::prelude::DataType;
54 Column::from_expr(
55 col.expr().clone().cast(DataType::Int64).sum(),
56 Some("count_if".to_string()),
57 )
58}
59
60pub fn try_sum(col: &Column) -> Column {
62 Column::from_expr(col.expr().clone().sum(), Some("try_sum".to_string()))
63}
64
65pub fn try_avg(col: &Column) -> Column {
67 Column::from_expr(col.expr().clone().mean(), Some("try_avg".to_string()))
68}
69
70pub fn max_by(value_col: &Column, ord_col: &Column) -> Column {
72 use polars::prelude::{SortOptions, as_struct};
73 let st = as_struct(vec![
74 ord_col.expr().clone().alias("_ord"),
75 value_col.expr().clone().alias("_val"),
76 ]);
77 let e = st
78 .sort(SortOptions::default().with_order_descending(true))
79 .first()
80 .struct_()
81 .field_by_name("_val");
82 Column::from_expr(e, None)
83}
84
85pub fn min_by(value_col: &Column, ord_col: &Column) -> Column {
87 use polars::prelude::{SortOptions, as_struct};
88 let st = as_struct(vec![
89 ord_col.expr().clone().alias("_ord"),
90 value_col.expr().clone().alias("_val"),
91 ]);
92 let e = st
93 .sort(SortOptions::default())
94 .first()
95 .struct_()
96 .field_by_name("_val");
97 Column::from_expr(e, None)
98}
99
100pub fn collect_list(col: &Column) -> Column {
102 Column::from_expr(
103 col.expr().clone().implode(),
104 Some("collect_list".to_string()),
105 )
106}
107
108pub fn collect_set(col: &Column) -> Column {
110 Column::from_expr(
111 col.expr().clone().unique().implode(),
112 Some("collect_set".to_string()),
113 )
114}
115
116pub fn bool_and(col: &Column) -> Column {
118 Column::from_expr(col.expr().clone().all(true), Some("bool_and".to_string()))
119}
120
121pub fn every(col: &Column) -> Column {
123 Column::from_expr(col.expr().clone().all(true), Some("every".to_string()))
124}
125
126pub fn stddev(col: &Column) -> Column {
128 Column::from_expr(col.expr().clone().std(1), Some("stddev".to_string()))
129}
130
131pub fn variance(col: &Column) -> Column {
133 Column::from_expr(col.expr().clone().var(1), Some("variance".to_string()))
134}
135
136pub fn stddev_pop(col: &Column) -> Column {
138 Column::from_expr(col.expr().clone().std(0), Some("stddev_pop".to_string()))
139}
140
141pub fn stddev_samp(col: &Column) -> Column {
143 stddev(col)
144}
145
146pub fn std(col: &Column) -> Column {
148 stddev(col)
149}
150
151pub fn var_pop(col: &Column) -> Column {
153 Column::from_expr(col.expr().clone().var(0), Some("var_pop".to_string()))
154}
155
156pub fn var_samp(col: &Column) -> Column {
158 variance(col)
159}
160
161pub fn median(col: &Column) -> Column {
163 use polars::prelude::QuantileMethod;
164 Column::from_expr(
165 col.expr()
166 .clone()
167 .quantile(lit(0.5), QuantileMethod::Linear),
168 Some("median".to_string()),
169 )
170}
171
172pub fn approx_percentile(col: &Column, percentage: f64, _accuracy: Option<i32>) -> Column {
174 use polars::prelude::QuantileMethod;
175 Column::from_expr(
176 col.expr()
177 .clone()
178 .quantile(lit(percentage), QuantileMethod::Linear),
179 Some(format!("approx_percentile({percentage})")),
180 )
181}
182
183pub fn percentile_approx(col: &Column, percentage: f64, accuracy: Option<i32>) -> Column {
185 approx_percentile(col, percentage, accuracy)
186}
187
188pub fn mode(col: &Column) -> Column {
190 col.clone().mode()
191}
192
193pub fn count_distinct(col: &Column) -> Column {
195 use polars::prelude::DataType;
196 Column::from_expr(
197 col.expr().clone().n_unique().cast(DataType::Int64),
198 Some("count_distinct".to_string()),
199 )
200}
201
202pub fn approx_count_distinct(col: &Column, _rsd: Option<f64>) -> Column {
204 use polars::prelude::DataType;
205 Column::from_expr(
206 col.expr().clone().n_unique().cast(DataType::Int64),
207 Some("approx_count_distinct".to_string()),
208 )
209}
210
211pub fn kurtosis(col: &Column) -> Column {
213 Column::from_expr(
214 col.expr()
215 .clone()
216 .cast(DataType::Float64)
217 .kurtosis(true, true),
218 Some("kurtosis".to_string()),
219 )
220}
221
222pub fn skewness(col: &Column) -> Column {
224 Column::from_expr(
225 col.expr().clone().cast(DataType::Float64).skew(true),
226 Some("skewness".to_string()),
227 )
228}
229
230fn covar_pop_expr_impl(e1: Expr, e2: Expr) -> Expr {
231 use polars::prelude::len;
232 let c1 = e1.clone().cast(DataType::Float64);
233 let c2 = e2.clone().cast(DataType::Float64);
234 let n = len().cast(DataType::Float64);
235 let sum_ab = (c1.clone() * c2.clone()).sum();
236 let sum_a = e1.sum().cast(DataType::Float64);
237 let sum_b = e2.sum().cast(DataType::Float64);
238 (sum_ab - sum_a * sum_b / n.clone()) / n
239}
240
241fn corr_expr_impl(e1: Expr, e2: Expr) -> Expr {
242 use polars::prelude::{len, lit, when};
243 let c1 = e1.clone().cast(DataType::Float64);
244 let c2 = e2.clone().cast(DataType::Float64);
245 let n = len().cast(DataType::Float64);
246 let n1 = (len() - lit(1)).cast(DataType::Float64);
247 let sum_ab = (c1.clone() * c2.clone()).sum();
248 let sum_a = e1.sum().cast(DataType::Float64);
249 let sum_b = e2.sum().cast(DataType::Float64);
250 let sum_a2 = (c1.clone() * c1).sum();
251 let sum_b2 = (c2.clone() * c2).sum();
252 let cov_samp = (sum_ab - sum_a.clone() * sum_b.clone() / n.clone()) / n1.clone();
253 let var_a = (sum_a2 - sum_a.clone() * sum_a / n.clone()) / n1.clone();
254 let var_b = (sum_b2 - sum_b.clone() * sum_b / n.clone()) / n1.clone();
255 let std_a = var_a.sqrt();
256 let std_b = var_b.sqrt();
257 when(len().gt(lit(1)))
258 .then(cov_samp / (std_a * std_b))
259 .otherwise(lit(f64::NAN))
260}
261
262fn covar_samp_expr_impl(e1: Expr, e2: Expr) -> Expr {
263 use polars::prelude::{len, lit, when};
264 let c1 = e1.clone().cast(DataType::Float64);
265 let c2 = e2.clone().cast(DataType::Float64);
266 let n = len().cast(DataType::Float64);
267 let sum_ab = (c1.clone() * c2.clone()).sum();
268 let sum_a = e1.sum().cast(DataType::Float64);
269 let sum_b = e2.sum().cast(DataType::Float64);
270 when(len().gt(lit(1)))
271 .then((sum_ab - sum_a * sum_b / n.clone()) / (len() - lit(1)).cast(DataType::Float64))
272 .otherwise(lit(f64::NAN))
273}
274
275pub fn covar_pop_expr(col1: &str, col2: &str) -> Expr {
277 use polars::prelude::col as pl_col;
278 covar_pop_expr_impl(pl_col(col1), pl_col(col2))
279}
280
281pub fn covar_pop(col1: &Column, col2: &Column) -> Column {
283 Column::from_expr(
284 covar_pop_expr_impl(col1.expr().clone(), col2.expr().clone()),
285 Some("covar_pop".to_string()),
286 )
287}
288
289pub fn corr(col1: &Column, col2: &Column) -> Column {
291 Column::from_expr(
292 corr_expr_impl(col1.expr().clone(), col2.expr().clone()),
293 Some("corr".to_string()),
294 )
295}
296
297pub fn covar_samp_expr(col1: &str, col2: &str) -> Expr {
299 use polars::prelude::col as pl_col;
300 covar_samp_expr_impl(pl_col(col1), pl_col(col2))
301}
302
303pub fn corr_expr(col1: &str, col2: &str) -> Expr {
305 use polars::prelude::col as pl_col;
306 corr_expr_impl(pl_col(col1), pl_col(col2))
307}
308
309fn regr_cond_and_sums(y_col: &str, x_col: &str) -> (Expr, Expr, Expr, Expr, Expr, Expr) {
312 use polars::prelude::col as pl_col;
313 let y = pl_col(y_col).cast(DataType::Float64);
314 let x = pl_col(x_col).cast(DataType::Float64);
315 let cond = y.clone().is_not_null().and(x.clone().is_not_null());
316 let n = y
317 .clone()
318 .filter(cond.clone())
319 .count()
320 .cast(DataType::Float64);
321 let sum_x = x.clone().filter(cond.clone()).sum();
322 let sum_y = y.clone().filter(cond.clone()).sum();
323 let sum_xx = (x.clone() * x.clone()).filter(cond.clone()).sum();
324 let sum_yy = (y.clone() * y.clone()).filter(cond.clone()).sum();
325 let sum_xy = (x * y).filter(cond).sum();
326 (n, sum_x, sum_y, sum_xx, sum_yy, sum_xy)
327}
328
329pub fn regr_count_expr(y_col: &str, x_col: &str) -> Expr {
331 let (n, ..) = regr_cond_and_sums(y_col, x_col);
332 n
333}
334
335pub fn regr_avgx_expr(y_col: &str, x_col: &str) -> Expr {
337 use polars::prelude::{lit, when};
338 let (n, sum_x, ..) = regr_cond_and_sums(y_col, x_col);
339 when(n.clone().gt(lit(0.0)))
340 .then(sum_x / n)
341 .otherwise(lit(f64::NAN))
342}
343
344pub fn regr_avgy_expr(y_col: &str, x_col: &str) -> Expr {
346 use polars::prelude::{lit, when};
347 let (n, _, sum_y, ..) = regr_cond_and_sums(y_col, x_col);
348 when(n.clone().gt(lit(0.0)))
349 .then(sum_y / n)
350 .otherwise(lit(f64::NAN))
351}
352
353pub fn regr_sxx_expr(y_col: &str, x_col: &str) -> Expr {
355 use polars::prelude::{lit, when};
356 let (n, sum_x, _, sum_xx, ..) = regr_cond_and_sums(y_col, x_col);
357 when(n.clone().gt(lit(0.0)))
358 .then(sum_xx - sum_x.clone() * sum_x / n)
359 .otherwise(lit(f64::NAN))
360}
361
362pub fn regr_syy_expr(y_col: &str, x_col: &str) -> Expr {
364 use polars::prelude::{lit, when};
365 let (n, _, sum_y, _, sum_yy, _) = regr_cond_and_sums(y_col, x_col);
366 when(n.clone().gt(lit(0.0)))
367 .then(sum_yy - sum_y.clone() * sum_y / n)
368 .otherwise(lit(f64::NAN))
369}
370
371pub fn regr_sxy_expr(y_col: &str, x_col: &str) -> Expr {
373 use polars::prelude::{lit, when};
374 let (n, sum_x, sum_y, _, _, sum_xy) = regr_cond_and_sums(y_col, x_col);
375 when(n.clone().gt(lit(0.0)))
376 .then(sum_xy - sum_x * sum_y / n)
377 .otherwise(lit(f64::NAN))
378}
379
380pub fn regr_slope_expr(y_col: &str, x_col: &str) -> Expr {
382 use polars::prelude::{lit, when};
383 let (n, sum_x, sum_y, sum_xx, _sum_yy, sum_xy) = regr_cond_and_sums(y_col, x_col);
384 let regr_sxx = sum_xx.clone() - sum_x.clone() * sum_x.clone() / n.clone();
385 let regr_sxy = sum_xy - sum_x * sum_y / n.clone();
386 when(n.gt(lit(1.0)).and(regr_sxx.clone().gt(lit(0.0))))
387 .then(regr_sxy / regr_sxx)
388 .otherwise(lit(f64::NAN))
389}
390
391pub fn regr_intercept_expr(y_col: &str, x_col: &str) -> Expr {
393 use polars::prelude::{lit, when};
394 let (n, sum_x, sum_y, sum_xx, _, sum_xy) = regr_cond_and_sums(y_col, x_col);
395 let regr_sxx = sum_xx - sum_x.clone() * sum_x.clone() / n.clone();
396 let regr_sxy = sum_xy.clone() - sum_x.clone() * sum_y.clone() / n.clone();
397 let slope = regr_sxy.clone() / regr_sxx.clone();
398 let avg_y = sum_y / n.clone();
399 let avg_x = sum_x / n.clone();
400 when(n.gt(lit(1.0)).and(regr_sxx.clone().gt(lit(0.0))))
401 .then(avg_y - slope * avg_x)
402 .otherwise(lit(f64::NAN))
403}
404
405pub fn regr_r2_expr(y_col: &str, x_col: &str) -> Expr {
407 use polars::prelude::{lit, when};
408 let (n, sum_x, sum_y, sum_xx, sum_yy, sum_xy) = regr_cond_and_sums(y_col, x_col);
409 let regr_sxx = sum_xx - sum_x.clone() * sum_x.clone() / n.clone();
410 let regr_syy = sum_yy - sum_y.clone() * sum_y.clone() / n.clone();
411 let regr_sxy = sum_xy - sum_x * sum_y / n;
412 when(
413 regr_sxx
414 .clone()
415 .gt(lit(0.0))
416 .and(regr_syy.clone().gt(lit(0.0))),
417 )
418 .then(regr_sxy.clone() * regr_sxy / (regr_sxx * regr_syy))
419 .otherwise(lit(f64::NAN))
420}
421
422pub fn when(condition: &Column) -> WhenBuilder {
434 WhenBuilder::new(condition.expr().clone())
435}
436
437pub fn when_then_otherwise_null(condition: &Column, value: &Column) -> Column {
439 use polars::prelude::*;
440 let null_expr = lit(NULL);
441 let expr = polars::prelude::when(condition.expr().clone())
442 .then(value.expr().clone())
443 .otherwise(null_expr);
444 crate::column::Column::from_expr(expr, None)
445}
446
447pub struct WhenBuilder {
449 condition: Expr,
450}
451
452impl WhenBuilder {
453 fn new(condition: Expr) -> Self {
454 WhenBuilder { condition }
455 }
456
457 pub fn then(self, value: &Column) -> ThenBuilder {
459 use polars::prelude::*;
460 let when_then = when(self.condition).then(value.expr().clone());
461 ThenBuilder::new(when_then)
462 }
463
464 pub fn otherwise(self, _value: &Column) -> Column {
469 panic!(
472 "when().otherwise() requires .then() to be called first. Use when(cond).then(val1).otherwise(val2)"
473 );
474 }
475}
476
/// Builder holding a complete `when(..).then(..)` chain; it can be extended with
/// `.when(..)` or finished with `.otherwise(..)`.
pub struct ThenBuilder {
    state: WhenThenState,
}

/// Internal state of a [`ThenBuilder`]. Polars uses one type after the first `then`
/// (`Then`) and another after later ones (`ChainedThen`), so both are represented.
enum WhenThenState {
    Single(Box<polars::prelude::Then>),
    Chained(Box<polars::prelude::ChainedThen>),
}

/// Builder returned by `ThenBuilder::when`: wraps a Polars `ChainedWhen` that is waiting
/// for its `.then(..)` value.
pub struct ChainedWhenBuilder {
    inner: polars::prelude::ChainedWhen,
}
491
492impl ThenBuilder {
493 fn new(when_then: polars::prelude::Then) -> Self {
494 ThenBuilder {
495 state: WhenThenState::Single(Box::new(when_then)),
496 }
497 }
498
499 fn new_chained(chained: polars::prelude::ChainedThen) -> Self {
500 ThenBuilder {
501 state: WhenThenState::Chained(Box::new(chained)),
502 }
503 }
504
505 pub fn when(self, condition: &Column) -> ChainedWhenBuilder {
507 let chained_when = match self.state {
508 WhenThenState::Single(t) => t.when(condition.expr().clone()),
509 WhenThenState::Chained(ct) => ct.when(condition.expr().clone()),
510 };
511 ChainedWhenBuilder {
512 inner: chained_when,
513 }
514 }
515
516 pub fn otherwise(self, value: &Column) -> Column {
518 let expr = match self.state {
519 WhenThenState::Single(t) => t.otherwise(value.expr().clone()),
520 WhenThenState::Chained(ct) => ct.otherwise(value.expr().clone()),
521 };
522 crate::column::Column::from_expr(expr, None)
523 }
524}
525
526impl ChainedWhenBuilder {
527 pub fn then(self, value: &Column) -> ThenBuilder {
529 ThenBuilder::new_chained(self.inner.then(value.expr().clone()))
530 }
531}
532
533pub fn upper(column: &Column) -> Column {
535 column.clone().upper()
536}
537
538pub fn lower(column: &Column) -> Column {
540 column.clone().lower()
541}
542
543pub fn substring(column: &Column, start: i64, length: Option<i64>) -> Column {
545 column.clone().substr(start, length)
546}
547
548pub fn length(column: &Column) -> Column {
550 column.clone().length()
551}
552
553pub fn trim(column: &Column) -> Column {
555 column.clone().trim()
556}
557
558pub fn ltrim(column: &Column) -> Column {
560 column.clone().ltrim()
561}
562
563pub fn rtrim(column: &Column) -> Column {
565 column.clone().rtrim()
566}
567
568pub fn btrim(column: &Column, trim_str: Option<&str>) -> Column {
570 column.clone().btrim(trim_str)
571}
572
573pub fn locate(substr: &str, column: &Column, pos: i64) -> Column {
575 column.clone().locate(substr, pos)
576}
577
578pub fn conv(column: &Column, from_base: i32, to_base: i32) -> Column {
580 column.clone().conv(from_base, to_base)
581}
582
583pub fn hex(column: &Column) -> Column {
585 column.clone().hex()
586}
587
588pub fn unhex(column: &Column) -> Column {
590 column.clone().unhex()
591}
592
593pub fn encode(column: &Column, charset: &str) -> Column {
595 column.clone().encode(charset)
596}
597
598pub fn decode(column: &Column, charset: &str) -> Column {
600 column.clone().decode(charset)
601}
602
603pub fn to_binary(column: &Column, fmt: &str) -> Column {
605 column.clone().to_binary(fmt)
606}
607
608pub fn try_to_binary(column: &Column, fmt: &str) -> Column {
610 column.clone().try_to_binary(fmt)
611}
612
613pub fn aes_encrypt(column: &Column, key: &str) -> Column {
615 column.clone().aes_encrypt(key)
616}
617
618pub fn aes_decrypt(column: &Column, key: &str) -> Column {
620 column.clone().aes_decrypt(key)
621}
622
623pub fn try_aes_decrypt(column: &Column, key: &str) -> Column {
625 column.clone().try_aes_decrypt(key)
626}
627
628pub fn bin(column: &Column) -> Column {
630 column.clone().bin()
631}
632
633pub fn getbit(column: &Column, pos: i64) -> Column {
635 column.clone().getbit(pos)
636}
637
638pub fn bit_and(left: &Column, right: &Column) -> Column {
640 left.clone().bit_and(right)
641}
642
643pub fn bit_or(left: &Column, right: &Column) -> Column {
645 left.clone().bit_or(right)
646}
647
648pub fn bit_xor(left: &Column, right: &Column) -> Column {
650 left.clone().bit_xor(right)
651}
652
653pub fn bit_count(column: &Column) -> Column {
655 column.clone().bit_count()
656}
657
658pub fn bitwise_not(column: &Column) -> Column {
660 column.clone().bitwise_not()
661}
662
663pub fn bitmap_bit_position(column: &Column) -> Column {
667 use polars::prelude::DataType;
668 let expr = column.expr().clone().cast(DataType::Int32);
669 Column::from_expr(expr, None)
670}
671
672pub fn bitmap_bucket_number(column: &Column) -> Column {
674 use polars::prelude::DataType;
675 let expr = column.expr().clone().cast(DataType::Int64) / lit(32768i64);
676 Column::from_expr(expr, None)
677}
678
679pub fn bitmap_count(column: &Column) -> Column {
681 use polars::prelude::{DataType, Field};
682 let expr = column.expr().clone().map(
683 |s| crate::column::expect_col(crate::udfs::apply_bitmap_count(s)),
684 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
685 );
686 Column::from_expr(expr, None)
687}
688
689pub fn bitmap_construct_agg(column: &Column) -> polars::prelude::Expr {
692 use polars::prelude::{DataType, Field};
693 column.expr().clone().implode().map(
694 |s| crate::column::expect_col(crate::udfs::apply_bitmap_construct_agg(s)),
695 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Binary)),
696 )
697}
698
699pub fn bitmap_or_agg(column: &Column) -> polars::prelude::Expr {
701 use polars::prelude::{DataType, Field};
702 column.expr().clone().implode().map(
703 |s| crate::column::expect_col(crate::udfs::apply_bitmap_or_agg(s)),
704 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Binary)),
705 )
706}
707
708pub fn bit_get(column: &Column, pos: i64) -> Column {
710 getbit(column, pos)
711}
712
713pub fn assert_true(column: &Column, err_msg: Option<&str>) -> Column {
716 column.clone().assert_true(err_msg)
717}
718
719pub fn raise_error(message: &str) -> Column {
721 let msg = message.to_string();
722 let expr = lit(0i64).map(
723 move |_col| -> PolarsResult<polars::prelude::Column> {
724 Err(PolarsError::ComputeError(msg.clone().into()))
725 },
726 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
727 );
728 Column::from_expr(expr, Some("raise_error".to_string()))
729}
730
731pub fn spark_partition_id() -> Column {
733 Column::from_expr(lit(0i32), Some("spark_partition_id".to_string()))
734}
735
736pub fn input_file_name() -> Column {
738 Column::from_expr(lit(""), Some("input_file_name".to_string()))
739}
740
741pub fn monotonically_increasing_id() -> Column {
744 Column::from_expr(lit(0i64), Some("monotonically_increasing_id".to_string()))
745}
746
747pub fn current_catalog() -> Column {
749 Column::from_expr(lit("spark_catalog"), Some("current_catalog".to_string()))
750}
751
752pub fn current_database() -> Column {
754 Column::from_expr(lit("default"), Some("current_database".to_string()))
755}
756
757pub fn current_schema() -> Column {
759 Column::from_expr(lit("default"), Some("current_schema".to_string()))
760}
761
762pub fn current_user() -> Column {
764 Column::from_expr(lit("unknown"), Some("current_user".to_string()))
765}
766
767pub fn user() -> Column {
769 Column::from_expr(lit("unknown"), Some("user".to_string()))
770}
771
772pub fn rand(seed: Option<u64>) -> Column {
775 Column::from_rand(seed)
776}
777
778pub fn randn(seed: Option<u64>) -> Column {
781 Column::from_randn(seed)
782}
783
784pub fn call_udf(name: &str, cols: &[Column]) -> Result<Column, PolarsError> {
787 use polars::prelude::Column as PlColumn;
788
789 let (registry, case_sensitive) =
790 crate::udf_context::get_thread_udf_context().ok_or_else(|| {
791 PolarsError::InvalidOperation(
792 "call_udf: no session. Use SparkSession.builder().get_or_create() first.".into(),
793 )
794 })?;
795
796 let udf = registry
798 .get_rust_udf(name, case_sensitive)?
799 .ok_or_else(|| {
800 PolarsError::InvalidOperation(format!("call_udf: UDF '{name}' not found").into())
801 })?;
802
803 let exprs: Vec<Expr> = cols.iter().map(|c| c.expr().clone()).collect();
804 let output_type = DataType::String; let expr = if exprs.len() == 1 {
807 let udf = udf.clone();
808 exprs.into_iter().next().unwrap().map(
809 move |c| {
810 let s = c.take_materialized_series();
811 udf.apply(&[s]).map(|out| PlColumn::new("_".into(), out))
812 },
813 move |_schema, field| Ok(Field::new(field.name().clone(), output_type.clone())),
814 )
815 } else {
816 let udf = udf.clone();
817 let first = exprs[0].clone();
818 let rest: Vec<Expr> = exprs[1..].to_vec();
819 first.map_many(
820 move |columns| {
821 let series: Vec<Series> = columns
822 .iter_mut()
823 .map(|c| std::mem::take(c).take_materialized_series())
824 .collect();
825 udf.apply(&series).map(|out| PlColumn::new("_".into(), out))
826 },
827 &rest,
828 move |_schema, fields| Ok(Field::new(fields[0].name().clone(), output_type.clone())),
829 )
830 };
831
832 Ok(Column::from_expr(expr, Some(format!("{name}()"))))
833}
834
835pub fn arrays_overlap(left: &Column, right: &Column) -> Column {
837 left.clone().arrays_overlap(right)
838}
839
840pub fn arrays_zip(left: &Column, right: &Column) -> Column {
842 left.clone().arrays_zip(right)
843}
844
845pub fn explode_outer(column: &Column) -> Column {
847 column.clone().explode_outer()
848}
849
850pub fn posexplode_outer(column: &Column) -> (Column, Column) {
852 column.clone().posexplode_outer()
853}
854
855pub fn array_agg(column: &Column) -> Column {
857 column.clone().array_agg()
858}
859
860pub fn transform_keys(column: &Column, key_expr: Expr) -> Column {
862 column.clone().transform_keys(key_expr)
863}
864
865pub fn transform_values(column: &Column, value_expr: Expr) -> Column {
867 column.clone().transform_values(value_expr)
868}
869
870pub fn str_to_map(
872 column: &Column,
873 pair_delim: Option<&str>,
874 key_value_delim: Option<&str>,
875) -> Column {
876 let pd = pair_delim.unwrap_or(",");
877 let kvd = key_value_delim.unwrap_or(":");
878 column.clone().str_to_map(pd, kvd)
879}
880
881pub fn regexp_extract(column: &Column, pattern: &str, group_index: usize) -> Column {
883 column.clone().regexp_extract(pattern, group_index)
884}
885
886pub fn regexp_replace(column: &Column, pattern: &str, replacement: &str) -> Column {
888 column.clone().regexp_replace(pattern, replacement)
889}
890
891pub fn split(column: &Column, delimiter: &str, limit: Option<i32>) -> Column {
893 column.clone().split(delimiter, limit)
894}
895
896pub fn initcap(column: &Column) -> Column {
898 column.clone().initcap()
899}
900
901pub fn regexp_extract_all(column: &Column, pattern: &str) -> Column {
903 column.clone().regexp_extract_all(pattern)
904}
905
906pub fn regexp_like(column: &Column, pattern: &str) -> Column {
908 column.clone().regexp_like(pattern)
909}
910
911pub fn regexp_count(column: &Column, pattern: &str) -> Column {
913 column.clone().regexp_count(pattern)
914}
915
916pub fn regexp_substr(column: &Column, pattern: &str) -> Column {
918 column.clone().regexp_substr(pattern)
919}
920
921pub fn split_part(column: &Column, delimiter: &str, part_num: i64) -> Column {
923 column.clone().split_part(delimiter, part_num)
924}
925
926pub fn regexp_instr(column: &Column, pattern: &str, group_idx: Option<usize>) -> Column {
928 column.clone().regexp_instr(pattern, group_idx)
929}
930
931pub fn find_in_set(str_column: &Column, set_column: &Column) -> Column {
933 str_column.clone().find_in_set(set_column)
934}
935
936pub fn format_string(format: &str, columns: &[&Column]) -> Column {
938 use polars::prelude::*;
939 if columns.is_empty() {
940 panic!("format_string needs at least one column");
941 }
942 let format_owned = format.to_string();
943 let args: Vec<Expr> = columns.iter().skip(1).map(|c| c.expr().clone()).collect();
944 let expr = columns[0].expr().clone().map_many(
945 move |cols| {
946 crate::column::expect_col(crate::udfs::apply_format_string(cols, &format_owned))
947 },
948 &args,
949 |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::String)),
950 );
951 crate::column::Column::from_expr(expr, None)
952}
953
954pub fn printf(format: &str, columns: &[&Column]) -> Column {
956 format_string(format, columns)
957}
958
959pub fn repeat(column: &Column, n: i32) -> Column {
961 column.clone().repeat(n)
962}
963
964pub fn reverse(column: &Column) -> Column {
966 column.clone().reverse()
967}
968
969pub fn instr(column: &Column, substr: &str) -> Column {
971 column.clone().instr(substr)
972}
973
974pub fn position(substr: &str, column: &Column) -> Column {
976 column.clone().instr(substr)
977}
978
979pub fn ascii(column: &Column) -> Column {
981 column.clone().ascii()
982}
983
984pub fn format_number(column: &Column, decimals: u32) -> Column {
986 column.clone().format_number(decimals)
987}
988
989pub fn overlay(column: &Column, replace: &str, pos: i64, length: i64) -> Column {
991 column.clone().overlay(replace, pos, length)
992}
993
994pub fn char(column: &Column) -> Column {
996 column.clone().char()
997}
998
999pub fn chr(column: &Column) -> Column {
1001 column.clone().chr()
1002}
1003
1004pub fn base64(column: &Column) -> Column {
1006 column.clone().base64()
1007}
1008
1009pub fn unbase64(column: &Column) -> Column {
1011 column.clone().unbase64()
1012}
1013
1014pub fn sha1(column: &Column) -> Column {
1016 column.clone().sha1()
1017}
1018
1019pub fn sha2(column: &Column, bit_length: i32) -> Column {
1021 column.clone().sha2(bit_length)
1022}
1023
1024pub fn md5(column: &Column) -> Column {
1026 column.clone().md5()
1027}
1028
1029pub fn lpad(column: &Column, length: i32, pad: &str) -> Column {
1031 column.clone().lpad(length, pad)
1032}
1033
1034pub fn rpad(column: &Column, length: i32, pad: &str) -> Column {
1036 column.clone().rpad(length, pad)
1037}
1038
1039pub fn translate(column: &Column, from_str: &str, to_str: &str) -> Column {
1041 column.clone().translate(from_str, to_str)
1042}
1043
1044pub fn mask(
1046 column: &Column,
1047 upper_char: Option<char>,
1048 lower_char: Option<char>,
1049 digit_char: Option<char>,
1050 other_char: Option<char>,
1051) -> Column {
1052 column
1053 .clone()
1054 .mask(upper_char, lower_char, digit_char, other_char)
1055}
1056
1057pub fn substring_index(column: &Column, delimiter: &str, count: i64) -> Column {
1059 column.clone().substring_index(delimiter, count)
1060}
1061
1062pub fn left(column: &Column, n: i64) -> Column {
1064 column.clone().left(n)
1065}
1066
1067pub fn right(column: &Column, n: i64) -> Column {
1069 column.clone().right(n)
1070}
1071
1072pub fn replace(column: &Column, search: &str, replacement: &str) -> Column {
1074 column.clone().replace(search, replacement)
1075}
1076
1077pub fn startswith(column: &Column, prefix: &str) -> Column {
1079 column.clone().startswith(prefix)
1080}
1081
1082pub fn endswith(column: &Column, suffix: &str) -> Column {
1084 column.clone().endswith(suffix)
1085}
1086
1087pub fn contains(column: &Column, substring: &str) -> Column {
1089 column.clone().contains(substring)
1090}
1091
1092pub fn like(column: &Column, pattern: &str, escape_char: Option<char>) -> Column {
1095 column.clone().like(pattern, escape_char)
1096}
1097
1098pub fn ilike(column: &Column, pattern: &str, escape_char: Option<char>) -> Column {
1101 column.clone().ilike(pattern, escape_char)
1102}
1103
1104pub fn rlike(column: &Column, pattern: &str) -> Column {
1106 column.clone().regexp_like(pattern)
1107}
1108
1109pub fn regexp(column: &Column, pattern: &str) -> Column {
1111 rlike(column, pattern)
1112}
1113
1114pub fn soundex(column: &Column) -> Column {
1116 column.clone().soundex()
1117}
1118
1119pub fn levenshtein(column: &Column, other: &Column) -> Column {
1121 column.clone().levenshtein(other)
1122}
1123
1124pub fn crc32(column: &Column) -> Column {
1126 column.clone().crc32()
1127}
1128
1129pub fn xxhash64(column: &Column) -> Column {
1131 column.clone().xxhash64()
1132}
1133
1134pub fn abs(column: &Column) -> Column {
1136 column.clone().abs()
1137}
1138
1139pub fn ceil(column: &Column) -> Column {
1141 column.clone().ceil()
1142}
1143
1144pub fn floor(column: &Column) -> Column {
1146 column.clone().floor()
1147}
1148
1149pub fn round(column: &Column, decimals: u32) -> Column {
1151 column.clone().round(decimals)
1152}
1153
1154pub fn bround(column: &Column, scale: i32) -> Column {
1156 column.clone().bround(scale)
1157}
1158
1159pub fn negate(column: &Column) -> Column {
1161 column.clone().negate()
1162}
1163
1164pub fn negative(column: &Column) -> Column {
1166 negate(column)
1167}
1168
1169pub fn positive(column: &Column) -> Column {
1171 column.clone()
1172}
1173
1174pub fn cot(column: &Column) -> Column {
1176 column.clone().cot()
1177}
1178
1179pub fn csc(column: &Column) -> Column {
1181 column.clone().csc()
1182}
1183
1184pub fn sec(column: &Column) -> Column {
1186 column.clone().sec()
1187}
1188
1189pub fn e() -> Column {
1191 Column::from_expr(lit(std::f64::consts::E), Some("e".to_string()))
1192}
1193
1194pub fn pi() -> Column {
1196 Column::from_expr(lit(std::f64::consts::PI), Some("pi".to_string()))
1197}
1198
1199pub fn sqrt(column: &Column) -> Column {
1201 column.clone().sqrt()
1202}
1203
1204pub fn pow(column: &Column, exp: i64) -> Column {
1206 column.clone().pow(exp)
1207}
1208
1209pub fn exp(column: &Column) -> Column {
1211 column.clone().exp()
1212}
1213
1214pub fn log(column: &Column) -> Column {
1216 column.clone().log()
1217}
1218
1219pub fn log_with_base(column: &Column, base: f64) -> Column {
1221 crate::column::Column::from_expr(column.expr().clone().log(lit(base)), None)
1222}
1223
1224pub fn sin(column: &Column) -> Column {
1226 column.clone().sin()
1227}
1228
1229pub fn cos(column: &Column) -> Column {
1231 column.clone().cos()
1232}
1233
1234pub fn tan(column: &Column) -> Column {
1236 column.clone().tan()
1237}
1238
1239pub fn asin(column: &Column) -> Column {
1241 column.clone().asin()
1242}
1243
1244pub fn acos(column: &Column) -> Column {
1246 column.clone().acos()
1247}
1248
1249pub fn atan(column: &Column) -> Column {
1251 column.clone().atan()
1252}
1253
1254pub fn atan2(y: &Column, x: &Column) -> Column {
1256 y.clone().atan2(x)
1257}
1258
1259pub fn degrees(column: &Column) -> Column {
1261 column.clone().degrees()
1262}
1263
1264pub fn radians(column: &Column) -> Column {
1266 column.clone().radians()
1267}
1268
1269pub fn signum(column: &Column) -> Column {
1271 column.clone().signum()
1272}
1273
1274pub fn sign(column: &Column) -> Column {
1276 signum(column)
1277}
1278
1279fn cast_impl(column: &Column, type_name: &str, strict: bool) -> Result<Column, String> {
1280 let dtype = parse_type_name(type_name)?;
1281 if dtype == DataType::Boolean {
1282 let expr = column.expr().clone().map(
1283 move |col| crate::column::expect_col(crate::udfs::apply_string_to_boolean(col, strict)),
1284 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Boolean)),
1285 );
1286 return Ok(Column::from_expr(expr, None));
1287 }
1288 if dtype == DataType::Date {
1289 let expr = column.expr().clone().map(
1290 move |col| crate::column::expect_col(crate::udfs::apply_string_to_date(col, strict)),
1291 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Date)),
1292 );
1293 return Ok(Column::from_expr(expr, None));
1294 }
1295 if dtype == DataType::Int32 || dtype == DataType::Int64 {
1296 let target = dtype.clone();
1297 let expr = column.expr().clone().map(
1298 move |col| {
1299 crate::column::expect_col(crate::udfs::apply_string_to_int(
1300 col,
1301 strict,
1302 target.clone(),
1303 ))
1304 },
1305 move |_schema, field| Ok(Field::new(field.name().clone(), dtype.clone())),
1306 );
1307 return Ok(Column::from_expr(expr, None));
1308 }
1309 if dtype == DataType::Float64 {
1310 let expr = column.expr().clone().map(
1311 move |col| crate::column::expect_col(crate::udfs::apply_string_to_double(col, strict)),
1312 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1313 );
1314 return Ok(Column::from_expr(expr, None));
1315 }
1316 let expr = if strict {
1317 column.expr().clone().strict_cast(dtype)
1318 } else {
1319 column.expr().clone().cast(dtype)
1320 };
1321 Ok(Column::from_expr(expr, None))
1322}
1323
1324pub fn cast(column: &Column, type_name: &str) -> Result<Column, String> {
1328 cast_impl(column, type_name, true)
1329}
1330
1331pub fn try_cast(column: &Column, type_name: &str) -> Result<Column, String> {
1335 cast_impl(column, type_name, false)
1336}
1337
1338pub fn to_char(column: &Column, format: Option<&str>) -> Result<Column, String> {
1342 match format {
1343 Some(fmt) => Ok(column
1344 .clone()
1345 .date_format(&crate::udfs::pyspark_format_to_chrono(fmt))),
1346 None => cast(column, "string"),
1347 }
1348}
1349
1350pub fn to_varchar(column: &Column, format: Option<&str>) -> Result<Column, String> {
1352 to_char(column, format)
1353}
1354
1355pub fn to_number(column: &Column, _format: Option<&str>) -> Result<Column, String> {
1358 cast(column, "double")
1359}
1360
1361pub fn try_to_number(column: &Column, _format: Option<&str>) -> Result<Column, String> {
1364 try_cast(column, "double")
1365}
1366
1367pub fn to_timestamp(column: &Column, format: Option<&str>) -> Result<Column, String> {
1370 use polars::prelude::{DataType, Field, TimeUnit};
1371 let fmt_owned = format.map(|s| s.to_string());
1372 let expr = column.expr().clone().map(
1373 move |s| {
1374 crate::column::expect_col(crate::udfs::apply_to_timestamp_format(
1375 s,
1376 fmt_owned.as_deref(),
1377 true,
1378 ))
1379 },
1380 |_schema, field| {
1381 Ok(Field::new(
1382 field.name().clone(),
1383 DataType::Datetime(TimeUnit::Microseconds, None),
1384 ))
1385 },
1386 );
1387 Ok(crate::column::Column::from_expr(expr, None))
1388}
1389
1390pub fn try_to_timestamp(column: &Column, format: Option<&str>) -> Result<Column, String> {
1393 use polars::prelude::*;
1394 let fmt_owned = format.map(|s| s.to_string());
1395 let expr = column.expr().clone().map(
1396 move |s| {
1397 crate::column::expect_col(crate::udfs::apply_to_timestamp_format(
1398 s,
1399 fmt_owned.as_deref(),
1400 false,
1401 ))
1402 },
1403 |_schema, field| {
1404 Ok(Field::new(
1405 field.name().clone(),
1406 DataType::Datetime(TimeUnit::Microseconds, None),
1407 ))
1408 },
1409 );
1410 Ok(crate::column::Column::from_expr(expr, None))
1411}
1412
1413pub fn to_timestamp_ltz(column: &Column, format: Option<&str>) -> Result<Column, String> {
1415 use polars::prelude::{DataType, Field, TimeUnit};
1416 match format {
1417 None => crate::cast(column, "timestamp"),
1418 Some(fmt) => {
1419 let fmt_owned = fmt.to_string();
1420 let expr = column.expr().clone().map(
1421 move |s| {
1422 crate::column::expect_col(crate::udfs::apply_to_timestamp_ltz_format(
1423 s,
1424 Some(&fmt_owned),
1425 true,
1426 ))
1427 },
1428 |_schema, field| {
1429 Ok(Field::new(
1430 field.name().clone(),
1431 DataType::Datetime(TimeUnit::Microseconds, None),
1432 ))
1433 },
1434 );
1435 Ok(crate::column::Column::from_expr(expr, None))
1436 }
1437 }
1438}
1439
1440pub fn to_timestamp_ntz(column: &Column, format: Option<&str>) -> Result<Column, String> {
1442 use polars::prelude::{DataType, Field, TimeUnit};
1443 match format {
1444 None => crate::cast(column, "timestamp"),
1445 Some(fmt) => {
1446 let fmt_owned = fmt.to_string();
1447 let expr = column.expr().clone().map(
1448 move |s| {
1449 crate::column::expect_col(crate::udfs::apply_to_timestamp_ntz_format(
1450 s,
1451 Some(&fmt_owned),
1452 true,
1453 ))
1454 },
1455 |_schema, field| {
1456 Ok(Field::new(
1457 field.name().clone(),
1458 DataType::Datetime(TimeUnit::Microseconds, None),
1459 ))
1460 },
1461 );
1462 Ok(crate::column::Column::from_expr(expr, None))
1463 }
1464 }
1465}
1466
1467pub fn try_divide(left: &Column, right: &Column) -> Column {
1469 use polars::prelude::*;
1470 let zero_cond = right.expr().clone().cast(DataType::Float64).eq(lit(0.0f64));
1471 let null_expr = lit(NULL);
1472 let div_expr =
1473 left.expr().clone().cast(DataType::Float64) / right.expr().clone().cast(DataType::Float64);
1474 let expr = polars::prelude::when(zero_cond)
1475 .then(null_expr)
1476 .otherwise(div_expr);
1477 crate::column::Column::from_expr(expr, None)
1478}
1479
1480pub fn try_add(left: &Column, right: &Column) -> Column {
1482 let args = [right.expr().clone()];
1483 let expr = left.expr().clone().map_many(
1484 |cols| crate::column::expect_col(crate::udfs::apply_try_add(cols)),
1485 &args,
1486 |_schema, fields| Ok(fields[0].clone()),
1487 );
1488 Column::from_expr(expr, None)
1489}
1490
1491pub fn try_subtract(left: &Column, right: &Column) -> Column {
1493 let args = [right.expr().clone()];
1494 let expr = left.expr().clone().map_many(
1495 |cols| crate::column::expect_col(crate::udfs::apply_try_subtract(cols)),
1496 &args,
1497 |_schema, fields| Ok(fields[0].clone()),
1498 );
1499 Column::from_expr(expr, None)
1500}
1501
1502pub fn try_multiply(left: &Column, right: &Column) -> Column {
1504 let args = [right.expr().clone()];
1505 let expr = left.expr().clone().map_many(
1506 |cols| crate::column::expect_col(crate::udfs::apply_try_multiply(cols)),
1507 &args,
1508 |_schema, fields| Ok(fields[0].clone()),
1509 );
1510 Column::from_expr(expr, None)
1511}
1512
1513pub fn try_element_at(column: &Column, index: i64) -> Column {
1515 column.clone().element_at(index)
1516}
1517
1518pub fn width_bucket(value: &Column, min_val: f64, max_val: f64, num_bucket: i64) -> Column {
1520 if num_bucket <= 0 {
1521 panic!(
1522 "width_bucket: num_bucket must be positive, got {}",
1523 num_bucket
1524 );
1525 }
1526 use polars::prelude::*;
1527 let v = value.expr().clone().cast(DataType::Float64);
1528 let min_expr = lit(min_val);
1529 let max_expr = lit(max_val);
1530 let nb = num_bucket as f64;
1531 let width = (max_val - min_val) / nb;
1532 let bucket_expr = (v.clone() - min_expr.clone()) / lit(width);
1533 let floor_bucket = bucket_expr.floor().cast(DataType::Int64) + lit(1i64);
1534 let bucket_clamped = floor_bucket.clip(lit(1i64), lit(num_bucket));
1535 let expr = polars::prelude::when(v.clone().lt(min_expr))
1536 .then(lit(0i64))
1537 .when(v.gt_eq(max_expr))
1538 .then(lit(num_bucket + 1))
1539 .otherwise(bucket_clamped);
1540 crate::column::Column::from_expr(expr, None)
1541}
1542
1543pub fn elt(index: &Column, columns: &[&Column]) -> Column {
1546 use polars::prelude::*;
1547 if columns.is_empty() {
1548 panic!("elt requires at least one column");
1549 }
1550 let idx_expr = index.expr().clone();
1551 let null_expr = lit(NULL);
1552 let mut expr = null_expr;
1553 for (i, c) in columns.iter().enumerate().rev() {
1554 let n = (i + 1) as i64;
1555 expr = polars::prelude::when(idx_expr.clone().eq(lit(n)))
1556 .then(c.expr().clone())
1557 .otherwise(expr);
1558 }
1559 crate::column::Column::from_expr(expr, None)
1560}
1561
1562pub fn bit_length(column: &Column) -> Column {
1564 column.clone().bit_length()
1565}
1566
1567pub fn octet_length(column: &Column) -> Column {
1569 column.clone().octet_length()
1570}
1571
1572pub fn char_length(column: &Column) -> Column {
1574 column.clone().char_length()
1575}
1576
1577pub fn character_length(column: &Column) -> Column {
1579 column.clone().character_length()
1580}
1581
1582pub fn typeof_(column: &Column) -> Column {
1584 column.clone().typeof_()
1585}
1586
1587pub fn isnan(column: &Column) -> Column {
1589 column.clone().is_nan()
1590}
1591
1592pub fn greatest(columns: &[&Column]) -> Result<Column, String> {
1594 if columns.is_empty() {
1595 return Err("greatest requires at least one column".to_string());
1596 }
1597 if columns.len() == 1 {
1598 return Ok((*columns[0]).clone());
1599 }
1600 let mut expr = columns[0].expr().clone();
1601 for c in columns.iter().skip(1) {
1602 let args = [c.expr().clone()];
1603 expr = expr.map_many(
1604 |cols| crate::column::expect_col(crate::udfs::apply_greatest2(cols)),
1605 &args,
1606 |_schema, fields| Ok(fields[0].clone()),
1607 );
1608 }
1609 Ok(Column::from_expr(expr, None))
1610}
1611
1612pub fn least(columns: &[&Column]) -> Result<Column, String> {
1614 if columns.is_empty() {
1615 return Err("least requires at least one column".to_string());
1616 }
1617 if columns.len() == 1 {
1618 return Ok((*columns[0]).clone());
1619 }
1620 let mut expr = columns[0].expr().clone();
1621 for c in columns.iter().skip(1) {
1622 let args = [c.expr().clone()];
1623 expr = expr.map_many(
1624 |cols| crate::column::expect_col(crate::udfs::apply_least2(cols)),
1625 &args,
1626 |_schema, fields| Ok(fields[0].clone()),
1627 );
1628 }
1629 Ok(Column::from_expr(expr, None))
1630}
1631
1632pub fn year(column: &Column) -> Column {
1634 column.clone().year()
1635}
1636
1637pub fn month(column: &Column) -> Column {
1639 column.clone().month()
1640}
1641
1642pub fn day(column: &Column) -> Column {
1644 column.clone().day()
1645}
1646
1647pub fn to_date(column: &Column, format: Option<&str>) -> Result<Column, String> {
1649 let fmt = format.map(|s| s.to_string());
1650 let expr = column.expr().clone().map(
1651 move |col| {
1652 crate::column::expect_col(crate::udfs::apply_string_to_date_format(
1653 col,
1654 fmt.as_deref(),
1655 false,
1656 ))
1657 },
1658 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Date)),
1659 );
1660 Ok(Column::from_expr(expr, None))
1661}
1662
1663pub fn date_format(column: &Column, format: &str) -> Column {
1665 column
1666 .clone()
1667 .date_format(&crate::udfs::pyspark_format_to_chrono(format))
1668}
1669
1670pub fn current_date() -> Column {
1672 use polars::prelude::*;
1673 let today = chrono::Utc::now().date_naive();
1674 let days = (today - robin_sparkless_core::date_utils::epoch_naive_date()).num_days() as i32;
1675 crate::column::Column::from_expr(
1676 Expr::Literal(LiteralValue::Scalar(Scalar::new_date(days))),
1677 None,
1678 )
1679}
1680
1681pub fn current_timestamp() -> Column {
1683 use polars::prelude::*;
1684 let ts = chrono::Utc::now().timestamp_micros();
1685 crate::column::Column::from_expr(
1686 Expr::Literal(LiteralValue::Scalar(Scalar::new_datetime(
1687 ts,
1688 TimeUnit::Microseconds,
1689 None,
1690 ))),
1691 None,
1692 )
1693}
1694
1695pub fn curdate() -> Column {
1697 current_date()
1698}
1699
1700pub fn now() -> Column {
1702 current_timestamp()
1703}
1704
1705pub fn localtimestamp() -> Column {
1707 current_timestamp()
1708}
1709
1710pub fn date_diff(end: &Column, start: &Column) -> Column {
1712 datediff(end, start)
1713}
1714
1715pub fn dateadd(column: &Column, n: i32) -> Column {
1717 date_add(column, n)
1718}
1719
1720pub fn extract(column: &Column, field: &str) -> Column {
1722 column.clone().extract(field)
1723}
1724
1725pub fn date_part(column: &Column, field: &str) -> Column {
1727 extract(column, field)
1728}
1729
1730pub fn datepart(column: &Column, field: &str) -> Column {
1732 extract(column, field)
1733}
1734
1735pub fn unix_micros(column: &Column) -> Column {
1737 column.clone().unix_micros()
1738}
1739
1740pub fn unix_millis(column: &Column) -> Column {
1742 column.clone().unix_millis()
1743}
1744
1745pub fn unix_seconds(column: &Column) -> Column {
1747 column.clone().unix_seconds()
1748}
1749
1750pub fn dayname(column: &Column) -> Column {
1752 column.clone().dayname()
1753}
1754
1755pub fn weekday(column: &Column) -> Column {
1757 column.clone().weekday()
1758}
1759
1760pub fn hour(column: &Column) -> Column {
1762 column.clone().hour()
1763}
1764
1765pub fn minute(column: &Column) -> Column {
1767 column.clone().minute()
1768}
1769
1770pub fn second(column: &Column) -> Column {
1772 column.clone().second()
1773}
1774
1775pub fn date_add(column: &Column, n: i32) -> Column {
1777 column.clone().date_add(n)
1778}
1779
1780pub fn date_sub(column: &Column, n: i32) -> Column {
1782 column.clone().date_sub(n)
1783}
1784
1785pub fn datediff(end: &Column, start: &Column) -> Column {
1787 start.clone().datediff(end)
1788}
1789
1790pub fn last_day(column: &Column) -> Column {
1792 column.clone().last_day()
1793}
1794
1795pub fn trunc(column: &Column, format: &str) -> Column {
1797 column.clone().trunc(format)
1798}
1799
1800pub fn date_trunc(format: &str, column: &Column) -> Column {
1802 trunc(column, format)
1803}
1804
1805pub fn quarter(column: &Column) -> Column {
1807 column.clone().quarter()
1808}
1809
1810pub fn weekofyear(column: &Column) -> Column {
1812 column.clone().weekofyear()
1813}
1814
1815pub fn dayofweek(column: &Column) -> Column {
1817 column.clone().dayofweek()
1818}
1819
1820pub fn dayofyear(column: &Column) -> Column {
1822 column.clone().dayofyear()
1823}
1824
1825pub fn add_months(column: &Column, n: i32) -> Column {
1827 column.clone().add_months(n)
1828}
1829
1830pub fn months_between(end: &Column, start: &Column, round_off: bool) -> Column {
1833 end.clone().months_between(start, round_off)
1834}
1835
1836pub fn next_day(column: &Column, day_of_week: &str) -> Column {
1838 column.clone().next_day(day_of_week)
1839}
1840
1841pub fn unix_timestamp_now() -> Column {
1843 use polars::prelude::*;
1844 let secs = chrono::Utc::now().timestamp();
1845 crate::column::Column::from_expr(lit(secs), None)
1846}
1847
1848pub fn unix_timestamp(column: &Column, format: Option<&str>) -> Column {
1850 column.clone().unix_timestamp(format)
1851}
1852
1853pub fn to_unix_timestamp(column: &Column, format: Option<&str>) -> Column {
1855 unix_timestamp(column, format)
1856}
1857
1858pub fn from_unixtime(column: &Column, format: Option<&str>) -> Column {
1860 column.clone().from_unixtime(format)
1861}
1862
1863pub fn make_date(year: &Column, month: &Column, day: &Column) -> Column {
1865 use polars::prelude::*;
1866 let args = [month.expr().clone(), day.expr().clone()];
1867 let expr = year.expr().clone().map_many(
1868 |cols| crate::column::expect_col(crate::udfs::apply_make_date(cols)),
1869 &args,
1870 |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Date)),
1871 );
1872 crate::column::Column::from_expr(expr, None)
1873}
1874
1875pub fn make_timestamp(
1878 year: &Column,
1879 month: &Column,
1880 day: &Column,
1881 hour: &Column,
1882 minute: &Column,
1883 sec: &Column,
1884 timezone: Option<&str>,
1885) -> Column {
1886 use polars::prelude::*;
1887 let tz_owned = timezone.map(|s| s.to_string());
1888 let args = [
1889 month.expr().clone(),
1890 day.expr().clone(),
1891 hour.expr().clone(),
1892 minute.expr().clone(),
1893 sec.expr().clone(),
1894 ];
1895 let expr = year.expr().clone().map_many(
1896 move |cols| {
1897 crate::column::expect_col(crate::udfs::apply_make_timestamp(cols, tz_owned.as_deref()))
1898 },
1899 &args,
1900 |_schema, fields| {
1901 Ok(Field::new(
1902 fields[0].name().clone(),
1903 DataType::Datetime(TimeUnit::Microseconds, None),
1904 ))
1905 },
1906 );
1907 crate::column::Column::from_expr(expr, None)
1908}
1909
1910pub fn timestampadd(unit: &str, amount: &Column, ts: &Column) -> Column {
1912 ts.clone().timestampadd(unit, amount)
1913}
1914
1915pub fn timestampdiff(unit: &str, start: &Column, end: &Column) -> Column {
1917 start.clone().timestampdiff(unit, end)
1918}
1919
1920pub fn days(n: i64) -> Column {
1922 make_interval(0, 0, 0, n, 0, 0, 0)
1923}
1924
1925pub fn hours(n: i64) -> Column {
1927 make_interval(0, 0, 0, 0, n, 0, 0)
1928}
1929
1930pub fn minutes(n: i64) -> Column {
1932 make_interval(0, 0, 0, 0, 0, n, 0)
1933}
1934
1935pub fn months(n: i64) -> Column {
1937 make_interval(0, n, 0, 0, 0, 0, 0)
1938}
1939
1940pub fn years(n: i64) -> Column {
1942 make_interval(n, 0, 0, 0, 0, 0, 0)
1943}
1944
1945pub fn from_utc_timestamp(column: &Column, tz: &str) -> Column {
1947 column.clone().from_utc_timestamp(tz)
1948}
1949
1950pub fn to_utc_timestamp(column: &Column, tz: &str) -> Column {
1952 column.clone().to_utc_timestamp(tz)
1953}
1954
1955pub fn convert_timezone(source_tz: &str, target_tz: &str, column: &Column) -> Column {
1957 let source_tz = source_tz.to_string();
1958 let target_tz = target_tz.to_string();
1959 let expr = column.expr().clone().map(
1960 move |s| {
1961 crate::column::expect_col(crate::udfs::apply_convert_timezone(
1962 s, &source_tz, &target_tz,
1963 ))
1964 },
1965 |_schema, field| Ok(field.clone()),
1966 );
1967 crate::column::Column::from_expr(expr, None)
1968}
1969
1970pub fn current_timezone() -> Column {
1972 use polars::prelude::*;
1973 crate::column::Column::from_expr(lit("UTC"), None)
1974}
1975
1976pub fn make_interval(
1978 years: i64,
1979 months: i64,
1980 weeks: i64,
1981 days: i64,
1982 hours: i64,
1983 mins: i64,
1984 secs: i64,
1985) -> Column {
1986 use polars::prelude::*;
1987 let total_days = years * 365 + months * 30 + weeks * 7 + days;
1989 let args = DurationArgs::new()
1990 .with_days(lit(total_days))
1991 .with_hours(lit(hours))
1992 .with_minutes(lit(mins))
1993 .with_seconds(lit(secs));
1994 let dur = duration(args);
1995 crate::column::Column::from_expr(dur, None)
1996}
1997
1998pub fn make_dt_interval(days: i64, hours: i64, minutes: i64, seconds: i64) -> Column {
2000 use polars::prelude::*;
2001 let args = DurationArgs::new()
2002 .with_days(lit(days))
2003 .with_hours(lit(hours))
2004 .with_minutes(lit(minutes))
2005 .with_seconds(lit(seconds));
2006 let dur = duration(args);
2007 crate::column::Column::from_expr(dur, None)
2008}
2009
2010pub fn make_ym_interval(years: i32, months: i32) -> Column {
2012 use polars::prelude::*;
2013 let total_months = years * 12 + months;
2014 crate::column::Column::from_expr(lit(total_months), None)
2015}
2016
2017pub fn make_timestamp_ntz(
2019 year: &Column,
2020 month: &Column,
2021 day: &Column,
2022 hour: &Column,
2023 minute: &Column,
2024 sec: &Column,
2025) -> Column {
2026 make_timestamp(year, month, day, hour, minute, sec, None)
2027}
2028
2029pub fn timestamp_seconds(column: &Column) -> Column {
2031 column.clone().timestamp_seconds()
2032}
2033
2034pub fn timestamp_millis(column: &Column) -> Column {
2036 column.clone().timestamp_millis()
2037}
2038
2039pub fn timestamp_micros(column: &Column) -> Column {
2041 column.clone().timestamp_micros()
2042}
2043
2044pub fn unix_date(column: &Column) -> Column {
2046 column.clone().unix_date()
2047}
2048
2049pub fn date_from_unix_date(column: &Column) -> Column {
2051 column.clone().date_from_unix_date()
2052}
2053
2054pub fn pmod(dividend: &Column, divisor: &Column) -> Column {
2056 dividend.clone().pmod(divisor)
2057}
2058
2059pub fn factorial(column: &Column) -> Column {
2061 column.clone().factorial()
2062}
2063
2064pub fn concat(columns: &[&Column]) -> Column {
2067 use polars::prelude::*;
2068 if columns.is_empty() {
2069 panic!("concat requires at least one column");
2070 }
2071 let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2072 crate::column::Column::from_expr(concat_str(&exprs, "", false), None)
2073}
2074
2075pub fn concat_ws(separator: &str, columns: &[&Column]) -> Column {
2078 use polars::prelude::*;
2079 if columns.is_empty() {
2080 panic!("concat_ws requires at least one column");
2081 }
2082 let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2083 crate::column::Column::from_expr(concat_str(&exprs, separator, false), None)
2084}
2085
2086pub fn row_number(column: &Column) -> Column {
2096 column.clone().row_number(false)
2097}
2098
2099pub fn rank(column: &Column, descending: bool) -> Column {
2101 column.clone().rank(descending)
2102}
2103
2104pub fn dense_rank(column: &Column, descending: bool) -> Column {
2106 column.clone().dense_rank(descending)
2107}
2108
2109pub fn lag(column: &Column, n: i64) -> Column {
2111 column.clone().lag(n)
2112}
2113
2114pub fn lead(column: &Column, n: i64) -> Column {
2116 column.clone().lead(n)
2117}
2118
2119pub fn first_value(column: &Column) -> Column {
2121 column.clone().first_value()
2122}
2123
2124pub fn last_value(column: &Column) -> Column {
2126 column.clone().last_value()
2127}
2128
2129pub fn percent_rank(column: &Column, partition_by: &[&str], descending: bool) -> Column {
2131 column.clone().percent_rank(partition_by, descending)
2132}
2133
2134pub fn cume_dist(column: &Column, partition_by: &[&str], descending: bool) -> Column {
2136 column.clone().cume_dist(partition_by, descending)
2137}
2138
2139pub fn ntile(column: &Column, n: u32, partition_by: &[&str], descending: bool) -> Column {
2141 column.clone().ntile(n, partition_by, descending)
2142}
2143
2144pub fn nth_value(column: &Column, n: i64, partition_by: &[&str], descending: bool) -> Column {
2146 column.clone().nth_value(n, partition_by, descending)
2147}
2148
2149pub fn coalesce(columns: &[&Column]) -> Column {
2160 use polars::prelude::*;
2161 if columns.is_empty() {
2162 panic!("coalesce requires at least one column");
2163 }
2164 let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2165 let expr = coalesce(&exprs);
2166 crate::column::Column::from_expr(expr, None)
2167}
2168
2169pub fn nvl(column: &Column, value: &Column) -> Column {
2171 coalesce(&[column, value])
2172}
2173
2174pub fn ifnull(column: &Column, value: &Column) -> Column {
2176 nvl(column, value)
2177}
2178
2179pub fn nullif(column: &Column, value: &Column) -> Column {
2181 use polars::prelude::*;
2182 let cond = column.expr().clone().eq(value.expr().clone());
2183 let null_lit = lit(NULL);
2184 let expr = when(cond).then(null_lit).otherwise(column.expr().clone());
2185 crate::column::Column::from_expr(expr, None)
2186}
2187
2188pub fn nanvl(column: &Column, value: &Column) -> Column {
2190 use polars::prelude::*;
2191 let cond = column.expr().clone().is_nan();
2192 let expr = when(cond)
2193 .then(value.expr().clone())
2194 .otherwise(column.expr().clone());
2195 crate::column::Column::from_expr(expr, None)
2196}
2197
2198pub fn nvl2(col1: &Column, col2: &Column, col3: &Column) -> Column {
2200 use polars::prelude::*;
2201 let cond = col1.expr().clone().is_not_null();
2202 let expr = when(cond)
2203 .then(col2.expr().clone())
2204 .otherwise(col3.expr().clone());
2205 crate::column::Column::from_expr(expr, None)
2206}
2207
2208pub fn substr(column: &Column, start: i64, length: Option<i64>) -> Column {
2210 substring(column, start, length)
2211}
2212
2213pub fn power(column: &Column, exp: i64) -> Column {
2215 pow(column, exp)
2216}
2217
2218pub fn ln(column: &Column) -> Column {
2220 log(column)
2221}
2222
2223pub fn ceiling(column: &Column) -> Column {
2225 ceil(column)
2226}
2227
2228pub fn lcase(column: &Column) -> Column {
2230 lower(column)
2231}
2232
2233pub fn ucase(column: &Column) -> Column {
2235 upper(column)
2236}
2237
2238pub fn dayofmonth(column: &Column) -> Column {
2240 day(column)
2241}
2242
2243pub fn to_degrees(column: &Column) -> Column {
2245 degrees(column)
2246}
2247
2248pub fn to_radians(column: &Column) -> Column {
2250 radians(column)
2251}
2252
2253pub fn cosh(column: &Column) -> Column {
2255 column.clone().cosh()
2256}
2257pub fn sinh(column: &Column) -> Column {
2259 column.clone().sinh()
2260}
2261pub fn tanh(column: &Column) -> Column {
2263 column.clone().tanh()
2264}
2265pub fn acosh(column: &Column) -> Column {
2267 column.clone().acosh()
2268}
2269pub fn asinh(column: &Column) -> Column {
2271 column.clone().asinh()
2272}
2273pub fn atanh(column: &Column) -> Column {
2275 column.clone().atanh()
2276}
2277pub fn cbrt(column: &Column) -> Column {
2279 column.clone().cbrt()
2280}
2281pub fn expm1(column: &Column) -> Column {
2283 column.clone().expm1()
2284}
2285pub fn log1p(column: &Column) -> Column {
2287 column.clone().log1p()
2288}
2289pub fn log10(column: &Column) -> Column {
2291 column.clone().log10()
2292}
2293pub fn log2(column: &Column) -> Column {
2295 column.clone().log2()
2296}
2297pub fn rint(column: &Column) -> Column {
2299 column.clone().rint()
2300}
2301pub fn hypot(x: &Column, y: &Column) -> Column {
2303 let xx = x.expr().clone() * x.expr().clone();
2304 let yy = y.expr().clone() * y.expr().clone();
2305 crate::column::Column::from_expr((xx + yy).sqrt(), None)
2306}
2307
2308pub fn isnull(column: &Column) -> Column {
2310 column.clone().is_null()
2311}
2312
2313pub fn isnotnull(column: &Column) -> Column {
2315 column.clone().is_not_null()
2316}
2317
2318pub fn array(columns: &[&Column]) -> Result<crate::column::Column, PolarsError> {
2321 use polars::prelude::*;
2322 if columns.is_empty() {
2323 let empty_inner = Series::new("".into(), Vec::<i64>::new());
2326 let list_series = ListChunked::from_iter([Some(empty_inner)])
2327 .with_name("array".into())
2328 .into_series();
2329 let expr = lit(list_series).first();
2330 return Ok(crate::column::Column::from_expr(expr, None));
2331 }
2332 let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2333 let expr = concat_list(exprs)
2334 .map_err(|e| PolarsError::ComputeError(format!("array concat_list: {e}").into()))?;
2335 Ok(crate::column::Column::from_expr(expr, None))
2336}
2337
2338pub fn array_size(column: &Column) -> Column {
2340 column.clone().array_size()
2341}
2342
2343pub fn size(column: &Column) -> Column {
2345 column.clone().array_size()
2346}
2347
2348pub fn cardinality(column: &Column) -> Column {
2350 column.clone().cardinality()
2351}
2352
2353pub fn array_contains(column: &Column, value: &Column) -> Column {
2355 column.clone().array_contains(value.expr().clone())
2356}
2357
2358pub fn array_join(column: &Column, separator: &str) -> Column {
2360 column.clone().array_join(separator)
2361}
2362
2363pub fn array_max(column: &Column) -> Column {
2365 column.clone().array_max()
2366}
2367
2368pub fn array_min(column: &Column) -> Column {
2370 column.clone().array_min()
2371}
2372
2373pub fn element_at(column: &Column, index: i64) -> Column {
2375 column.clone().element_at(index)
2376}
2377
2378pub fn array_sort(column: &Column) -> Column {
2380 column.clone().array_sort()
2381}
2382
2383pub fn array_distinct(column: &Column) -> Column {
2385 column.clone().array_distinct()
2386}
2387
2388pub fn array_slice(column: &Column, start: i64, length: Option<i64>) -> Column {
2390 column.clone().array_slice(start, length)
2391}
2392
2393pub fn sequence(start: &Column, stop: &Column, step: Option<&Column>) -> Column {
2396 use polars::prelude::{DataType, Field, as_struct, lit};
2397 let step_expr = step
2398 .map(|c| c.expr().clone().alias("2"))
2399 .unwrap_or_else(|| lit(1i64).alias("2"));
2400 let struct_expr = as_struct(vec![
2401 start.expr().clone().alias("0"),
2402 stop.expr().clone().alias("1"),
2403 step_expr,
2404 ]);
2405 let out_dtype = DataType::List(Box::new(DataType::Int64));
2406 let expr = struct_expr.map(
2407 |s| crate::column::expect_col(crate::udfs::apply_sequence(s)),
2408 move |_schema, field| Ok(Field::new(field.name().clone(), out_dtype.clone())),
2409 );
2410 crate::column::Column::from_expr(expr, None)
2411}
2412
2413pub fn shuffle(column: &Column) -> Column {
2415 let expr = column.expr().clone().map(
2416 |s| crate::column::expect_col(crate::udfs::apply_shuffle(s)),
2417 |_schema, field| Ok(field.clone()),
2418 );
2419 crate::column::Column::from_expr(expr, None)
2420}
2421
2422pub fn inline(column: &Column) -> Column {
2425 column.clone().explode()
2426}
2427
2428pub fn inline_outer(column: &Column) -> Column {
2430 column.clone().explode_outer()
2431}
2432
2433pub fn explode(column: &Column) -> Column {
2435 column.clone().explode()
2436}
2437
2438pub fn array_position(column: &Column, value: &Column) -> Column {
2441 column.clone().array_position(value.expr().clone())
2442}
2443
2444pub fn array_compact(column: &Column) -> Column {
2446 column.clone().array_compact()
2447}
2448
2449pub fn array_remove(column: &Column, value: &Column) -> Column {
2452 column.clone().array_remove(value.expr().clone())
2453}
2454
2455pub fn array_repeat(column: &Column, n: i64) -> Column {
2457 column.clone().array_repeat(n)
2458}
2459
2460pub fn array_flatten(column: &Column) -> Column {
2462 column.clone().array_flatten()
2463}
2464
2465pub fn array_exists(column: &Column, predicate: Expr) -> Column {
2467 column.clone().array_exists(predicate)
2468}
2469
2470pub fn array_forall(column: &Column, predicate: Expr) -> Column {
2472 column.clone().array_forall(predicate)
2473}
2474
2475pub fn array_filter(column: &Column, predicate: Expr) -> Column {
2477 column.clone().array_filter(predicate)
2478}
2479
2480pub fn array_transform(column: &Column, f: Expr) -> Column {
2482 column.clone().array_transform(f)
2483}
2484
2485pub fn array_sum(column: &Column) -> Column {
2487 column.clone().array_sum()
2488}
2489
2490pub fn aggregate(column: &Column, zero: &Column) -> Column {
2492 column.clone().array_aggregate(zero)
2493}
2494
2495pub fn array_mean(column: &Column) -> Column {
2497 column.clone().array_mean()
2498}
2499
2500pub fn posexplode(column: &Column) -> (Column, Column) {
2503 column.clone().posexplode()
2504}
2505
2506pub fn create_map(key_values: &[&Column]) -> Result<Column, PolarsError> {
2510 use polars::chunked_array::StructChunked;
2511 use polars::prelude::{IntoSeries, ListChunked, as_struct, concat_list, lit};
2512 if key_values.is_empty() {
2513 let key_s = Series::new("key".into(), Vec::<String>::new());
2515 let value_s = Series::new("value".into(), Vec::<String>::new());
2516 let fields: [&Series; 2] = [&key_s, &value_s];
2517 let empty_struct = StructChunked::from_series(
2518 polars::prelude::PlSmallStr::EMPTY,
2519 0,
2520 fields.iter().copied(),
2521 )
2522 .map_err(|e| PolarsError::ComputeError(format!("create_map empty struct: {e}").into()))?
2523 .into_series();
2524 let list_series = ListChunked::from_iter([Some(empty_struct)])
2525 .with_name("create_map".into())
2526 .into_series();
2527 let expr = lit(list_series).first();
2528 return Ok(crate::column::Column::from_expr(expr, None));
2529 }
2530 let mut struct_exprs: Vec<Expr> = Vec::new();
2531 for i in (0..key_values.len()).step_by(2) {
2532 if i + 1 < key_values.len() {
2533 let k = key_values[i].expr().clone().alias("key");
2534 let v = key_values[i + 1].expr().clone().alias("value");
2535 struct_exprs.push(as_struct(vec![k, v]));
2536 }
2537 }
2538 let expr = concat_list(struct_exprs)
2539 .map_err(|e| PolarsError::ComputeError(format!("create_map concat_list: {e}").into()))?;
2540 Ok(crate::column::Column::from_expr(expr, None))
2541}
2542
2543pub fn map_keys(column: &Column) -> Column {
2545 column.clone().map_keys()
2546}
2547
2548pub fn map_values(column: &Column) -> Column {
2550 column.clone().map_values()
2551}
2552
2553pub fn map_entries(column: &Column) -> Column {
2555 column.clone().map_entries()
2556}
2557
2558pub fn map_from_arrays(keys: &Column, values: &Column) -> Column {
2560 keys.clone().map_from_arrays(values)
2561}
2562
2563pub fn map_concat(a: &Column, b: &Column) -> Column {
2565 a.clone().map_concat(b)
2566}
2567
2568pub fn map_from_entries(column: &Column) -> Column {
2570 column.clone().map_from_entries()
2571}
2572
2573pub fn map_contains_key(map_col: &Column, key: &Column) -> Column {
2575 map_col.clone().map_contains_key(key)
2576}
2577
2578pub fn get(map_col: &Column, key: &Column) -> Column {
2580 map_col.clone().get(key)
2581}
2582
2583pub fn map_filter(map_col: &Column, predicate: Expr) -> Column {
2585 map_col.clone().map_filter(predicate)
2586}
2587
2588pub fn map_zip_with(map1: &Column, map2: &Column, merge: Expr) -> Column {
2590 map1.clone().map_zip_with(map2, merge)
2591}
2592
2593pub fn zip_with_coalesce(left: &Column, right: &Column) -> Column {
2595 use polars::prelude::col;
2596 let left_field = col("").struct_().field_by_name("left");
2597 let right_field = col("").struct_().field_by_name("right");
2598 let merge = crate::column::Column::from_expr(
2599 coalesce(&[
2600 &crate::column::Column::from_expr(left_field, None),
2601 &crate::column::Column::from_expr(right_field, None),
2602 ])
2603 .into_expr(),
2604 None,
2605 );
2606 left.clone().zip_with(right, merge.into_expr())
2607}
2608
2609pub fn map_zip_with_coalesce(map1: &Column, map2: &Column) -> Column {
2611 use polars::prelude::col;
2612 let v1 = col("").struct_().field_by_name("value1");
2613 let v2 = col("").struct_().field_by_name("value2");
2614 let merge = coalesce(&[
2615 &crate::column::Column::from_expr(v1, None),
2616 &crate::column::Column::from_expr(v2, None),
2617 ])
2618 .into_expr();
2619 map1.clone().map_zip_with(map2, merge)
2620}
2621
2622pub fn map_filter_value_gt(map_col: &Column, threshold: f64) -> Column {
2624 use polars::prelude::{col, lit};
2625 let pred = col("").struct_().field_by_name("value").gt(lit(threshold));
2626 map_col.clone().map_filter(pred)
2627}
2628
2629pub fn struct_(columns: &[&Column]) -> Column {
2632 use polars::prelude::as_struct;
2633 if columns.is_empty() {
2634 panic!("struct requires at least one column");
2635 }
2636 let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2637 crate::column::Column::from_expr(as_struct(exprs), None)
2638}
2639
2640pub fn named_struct(pairs: &[(&str, &Column)]) -> Column {
2643 use polars::prelude::as_struct;
2644 if pairs.is_empty() {
2645 panic!("named_struct requires at least one (name, column) pair");
2646 }
2647 let exprs: Vec<Expr> = pairs
2648 .iter()
2649 .map(|(name, col)| col.expr().clone().alias(*name))
2650 .collect();
2651 crate::column::Column::from_expr(as_struct(exprs), None)
2652}
2653
2654pub fn array_append(array: &Column, elem: &Column) -> Column {
2656 array.clone().array_append(elem)
2657}
2658
2659pub fn array_prepend(array: &Column, elem: &Column) -> Column {
2661 array.clone().array_prepend(elem)
2662}
2663
2664pub fn array_insert(array: &Column, pos: &Column, elem: &Column) -> Column {
2666 array.clone().array_insert(pos, elem)
2667}
2668
2669pub fn array_except(a: &Column, b: &Column) -> Column {
2671 a.clone().array_except(b)
2672}
2673
2674pub fn array_intersect(a: &Column, b: &Column) -> Column {
2676 a.clone().array_intersect(b)
2677}
2678
2679pub fn array_union(a: &Column, b: &Column) -> Column {
2681 a.clone().array_union(b)
2682}
2683
2684pub fn zip_with(left: &Column, right: &Column, merge: Expr) -> Column {
2686 left.clone().zip_with(right, merge)
2687}
2688
2689pub fn get_json_object(column: &Column, path: &str) -> Column {
2691 column.clone().get_json_object(path)
2692}
2693
2694pub fn json_object_keys(column: &Column) -> Column {
2696 column.clone().json_object_keys()
2697}
2698
2699pub fn json_tuple(column: &Column, keys: &[&str]) -> Column {
2701 column.clone().json_tuple(keys)
2702}
2703
2704pub fn from_csv(column: &Column) -> Column {
2706 column.clone().from_csv()
2707}
2708
2709pub fn to_csv(column: &Column) -> Column {
2711 column.clone().to_csv()
2712}
2713
2714pub fn schema_of_csv(_column: &Column) -> Column {
2716 Column::from_expr(
2717 lit("STRUCT<_c0: STRING, _c1: STRING>".to_string()),
2718 Some("schema_of_csv".to_string()),
2719 )
2720}
2721
2722pub fn schema_of_json(_column: &Column) -> Column {
2724 Column::from_expr(
2725 lit("STRUCT<>".to_string()),
2726 Some("schema_of_json".to_string()),
2727 )
2728}
2729
2730pub fn from_json(column: &Column, schema: Option<polars::datatypes::DataType>) -> Column {
2732 column.clone().from_json(schema)
2733}
2734
2735pub fn to_json(column: &Column) -> Column {
2737 column.clone().to_json()
2738}
2739
2740pub fn isin(column: &Column, other: &Column) -> Column {
2742 column.clone().isin(other)
2743}
2744
2745pub fn isin_i64(column: &Column, values: &[i64]) -> Column {
2747 let s = Series::from_iter(values.iter().cloned());
2748 Column::from_expr(column.expr().clone().is_in(lit(s), false), None)
2749}
2750
2751pub fn isin_str(column: &Column, values: &[&str]) -> Column {
2753 let s: Series = Series::from_iter(values.iter().copied());
2754 Column::from_expr(column.expr().clone().is_in(lit(s), false), None)
2755}
2756
2757pub fn url_decode(column: &Column) -> Column {
2759 column.clone().url_decode()
2760}
2761
2762pub fn url_encode(column: &Column) -> Column {
2764 column.clone().url_encode()
2765}
2766
2767pub fn shift_left(column: &Column, n: i32) -> Column {
2769 column.clone().shift_left(n)
2770}
2771
2772pub fn shift_right(column: &Column, n: i32) -> Column {
2774 column.clone().shift_right(n)
2775}
2776
2777pub fn shift_right_unsigned(column: &Column, n: i32) -> Column {
2779 column.clone().shift_right_unsigned(n)
2780}
2781
2782pub fn version() -> Column {
2784 Column::from_expr(
2785 lit(concat!("robin-sparkless-", env!("CARGO_PKG_VERSION"))),
2786 None,
2787 )
2788}
2789
2790pub fn equal_null(left: &Column, right: &Column) -> Column {
2792 left.clone().eq_null_safe(right)
2793}
2794
2795pub fn json_array_length(column: &Column, path: &str) -> Column {
2797 column.clone().json_array_length(path)
2798}
2799
2800pub fn parse_url(column: &Column, part: &str, key: Option<&str>) -> Column {
2803 column.clone().parse_url(part, key)
2804}
2805
2806pub fn hash(columns: &[&Column]) -> Column {
2808 use polars::prelude::*;
2809 if columns.is_empty() {
2810 return crate::column::Column::from_expr(lit(0i64), None);
2811 }
2812 if columns.len() == 1 {
2813 return columns[0].clone().hash();
2814 }
2815 let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2816 let struct_expr = polars::prelude::as_struct(exprs);
2817 let name = columns[0].name().to_string();
2818 let expr = struct_expr.map(
2819 |s| crate::column::expect_col(crate::udfs::apply_hash_struct(s)),
2820 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
2821 );
2822 crate::column::Column::from_expr(expr, Some(name))
2823}
2824
2825pub fn stack(columns: &[&Column]) -> Column {
2827 struct_(columns)
2828}
2829
#[cfg(test)]
mod tests {
    // Unit tests for the free-function API: naming of literal/aggregate columns,
    // and end-to-end evaluation of a few expressions against small DataFrames.
    use super::*;
    use crate::functions::{col, lit_bool, lit_f64, lit_i32, lit_i64, lit_str};
    use polars::prelude::{IntoLazy, df};

    #[test]
    fn test_col_creates_column() {
        let column = col("test");
        assert_eq!(column.name(), "test");
    }

    #[test]
    fn test_lit_i32() {
        let column = lit_i32(42);
        assert_eq!(column.name(), "<expr>");
    }

    #[test]
    fn test_lit_i64() {
        let column = lit_i64(123456789012345i64);
        assert_eq!(column.name(), "<expr>");
    }

    #[test]
    fn test_lit_f64() {
        let column = lit_f64(std::f64::consts::PI);
        assert_eq!(column.name(), "<expr>");
    }

    #[test]
    fn test_lit_bool() {
        let column = lit_bool(true);
        assert_eq!(column.name(), "<expr>");
    }

    #[test]
    fn test_lit_str() {
        let column = lit_str("hello");
        assert_eq!(column.name(), "<expr>");
    }

    #[test]
    fn test_create_map_empty() {
        let empty_col = create_map(&[]).unwrap();
        let df = df!("id" => &[1i64, 2i64]).unwrap();
        let out = df
            .lazy()
            .with_columns([empty_col.into_expr().alias("m")])
            .collect()
            .unwrap();
        assert_eq!(out.height(), 2);
        let m = out.column("m").unwrap();
        assert_eq!(m.len(), 2);
        let list = m.list().unwrap();
        // Every row must hold an empty map (an empty list of entries).
        for i in 0..2 {
            let row = list.get(i).unwrap();
            assert_eq!(row.len(), 0);
        }
    }

    #[test]
    fn test_count_aggregation() {
        let column = col("value");
        let result = count(&column);
        assert_eq!(result.name(), "count");
    }

    #[test]
    fn test_sum_aggregation() {
        let column = col("value");
        let result = sum(&column);
        assert_eq!(result.name(), "sum");
    }

    #[test]
    fn test_avg_aggregation() {
        let column = col("value");
        let result = avg(&column);
        assert_eq!(result.name(), "avg");
    }

    #[test]
    fn test_max_aggregation() {
        let column = col("value");
        let result = max(&column);
        assert_eq!(result.name(), "max");
    }

    #[test]
    fn test_min_aggregation() {
        let column = col("value");
        let result = min(&column);
        assert_eq!(result.name(), "min");
    }

    #[test]
    fn test_when_then_otherwise() {
        let df = df!(
            "age" => &[15, 25, 35]
        )
        .unwrap();

        let age_col = col("age");
        let condition = age_col.gt(polars::prelude::lit(18));
        let result = when(&condition)
            .then(&lit_str("adult"))
            .otherwise(&lit_str("minor"));

        let result_df = df
            .lazy()
            .with_column(result.into_expr().alias("status"))
            .collect()
            .unwrap();

        let status_col = result_df.column("status").unwrap();
        let values: Vec<Option<&str>> = status_col.str().unwrap().into_iter().collect();

        // 15 is not above 18; 25 and 35 are.
        assert_eq!(values, vec![Some("minor"), Some("adult"), Some("adult")]);
    }

    #[test]
    fn test_coalesce_returns_first_non_null() {
        let df = df!(
            "a" => &[Some(1), None, None],
            "b" => &[None, Some(2), None],
            "c" => &[None, None, Some(3)]
        )
        .unwrap();

        let col_a = col("a");
        let col_b = col("b");
        let col_c = col("c");
        let result = coalesce(&[&col_a, &col_b, &col_c]);

        let result_df = df
            .lazy()
            .with_column(result.into_expr().alias("coalesced"))
            .collect()
            .unwrap();

        let coalesced_col = result_df.column("coalesced").unwrap();
        let values: Vec<Option<i32>> = coalesced_col.i32().unwrap().into_iter().collect();

        // Each row has exactly one non-null source: a, then b, then c.
        assert_eq!(values, vec![Some(1), Some(2), Some(3)]);
    }

    #[test]
    fn test_coalesce_with_literal_fallback() {
        let df = df!(
            "a" => &[Some(1), None],
            "b" => &[None::<i32>, None::<i32>]
        )
        .unwrap();

        let col_a = col("a");
        let col_b = col("b");
        let fallback = lit_i32(0);
        let result = coalesce(&[&col_a, &col_b, &fallback]);

        let result_df = df
            .lazy()
            .with_column(result.into_expr().alias("coalesced"))
            .collect()
            .unwrap();

        let coalesced_col = result_df.column("coalesced").unwrap();
        let values: Vec<Option<i32>> = coalesced_col.i32().unwrap().into_iter().collect();

        // Row 0 takes `a`; row 1 has only nulls and falls back to the literal 0.
        assert_eq!(values, vec![Some(1), Some(0)]);
    }

    #[test]
    #[should_panic(expected = "coalesce requires at least one column")]
    fn test_coalesce_empty_panics() {
        let columns: [&Column; 0] = [];
        let _ = coalesce(&columns);
    }

    #[test]
    fn test_cast_double_string_column_strict_ok() {
        let df = df!(
            "s" => &["123", " 45.5 ", "0"]
        )
        .unwrap();

        let s_col = col("s");
        let cast_col = cast(&s_col, "double").unwrap();

        let out = df
            .lazy()
            .with_column(cast_col.into_expr().alias("v"))
            .collect()
            .unwrap();

        let v = out.column("v").unwrap();
        let vals: Vec<Option<f64>> = v.f64().unwrap().into_iter().collect();
        assert_eq!(vals, vec![Some(123.0), Some(45.5), Some(0.0)]);
    }

    #[test]
    fn test_try_cast_double_string_column_invalid_to_null() {
        let df = df!(
            "s" => &["123", " 45.5 ", "abc", ""]
        )
        .unwrap();

        let s_col = col("s");
        let try_cast_col = try_cast(&s_col, "double").unwrap();

        let out = df
            .lazy()
            .with_column(try_cast_col.into_expr().alias("v"))
            .collect()
            .unwrap();

        let v = out.column("v").unwrap();
        let vals: Vec<Option<f64>> = v.f64().unwrap().into_iter().collect();
        assert_eq!(vals, vec![Some(123.0), Some(45.5), None, None]);
    }

    #[test]
    fn test_to_number_and_try_to_number_numerics_and_strings() {
        let df = df!(
            "i" => &[1i32, 2, 3],
            "f" => &[1.5f64, 2.5, 3.5],
            "s" => &["10", "20.5", "xyz"]
        )
        .unwrap();

        let i_col = col("i");
        let f_col = col("f");
        let s_col = col("s");

        let to_number_i = to_number(&i_col, None).unwrap();
        let to_number_f = to_number(&f_col, None).unwrap();
        let try_to_number_s = try_to_number(&s_col, None).unwrap();

        let out = df
            .lazy()
            .with_columns([
                to_number_i.into_expr().alias("i_num"),
                to_number_f.into_expr().alias("f_num"),
                try_to_number_s.into_expr().alias("s_num"),
            ])
            .collect()
            .unwrap();

        let i_num = out.column("i_num").unwrap();
        let f_num = out.column("f_num").unwrap();
        let s_num = out.column("s_num").unwrap();

        let i_vals: Vec<Option<f64>> = i_num.f64().unwrap().into_iter().collect();
        let f_vals: Vec<Option<f64>> = f_num.f64().unwrap().into_iter().collect();
        let s_vals: Vec<Option<f64>> = s_num.f64().unwrap().into_iter().collect();

        assert_eq!(i_vals, vec![Some(1.0), Some(2.0), Some(3.0)]);
        assert_eq!(f_vals, vec![Some(1.5), Some(2.5), Some(3.5)]);
        assert_eq!(s_vals, vec![Some(10.0), Some(20.5), None]);
    }
}