1use crate::column::Column;
2use polars::prelude::*;
3
/// A sort key: an expression together with its direction and null placement.
///
/// Values are produced by [`asc`], [`desc`] and their `*_nulls_first` / `*_nulls_last` variants.
#[derive(Debug, Clone)]
pub struct SortOrder {
    /// Expression to order by (crate-internal; read it through [`SortOrder::expr`]).
    pub(crate) expr: Expr,
    /// `true` requests descending order, `false` ascending.
    pub descending: bool,
    /// `true` requests nulls after non-null values, `false` before them.
    pub nulls_last: bool,
}
14
15impl SortOrder {
16 pub fn expr(&self) -> &Expr {
17 &self.expr
18 }
19}
20
21pub fn asc(column: &Column) -> SortOrder {
23 SortOrder {
24 expr: column.expr().clone(),
25 descending: false,
26 nulls_last: false,
27 }
28}
29
30pub fn asc_nulls_first(column: &Column) -> SortOrder {
32 SortOrder {
33 expr: column.expr().clone(),
34 descending: false,
35 nulls_last: false,
36 }
37}
38
39pub fn asc_nulls_last(column: &Column) -> SortOrder {
41 SortOrder {
42 expr: column.expr().clone(),
43 descending: false,
44 nulls_last: true,
45 }
46}
47
48pub fn desc(column: &Column) -> SortOrder {
50 SortOrder {
51 expr: column.expr().clone(),
52 descending: true,
53 nulls_last: true,
54 }
55}
56
57pub fn desc_nulls_first(column: &Column) -> SortOrder {
59 SortOrder {
60 expr: column.expr().clone(),
61 descending: true,
62 nulls_last: false,
63 }
64}
65
66pub fn desc_nulls_last(column: &Column) -> SortOrder {
68 SortOrder {
69 expr: column.expr().clone(),
70 descending: true,
71 nulls_last: true,
72 }
73}
74
75pub fn parse_type_name(name: &str) -> Result<DataType, String> {
80 let s = name.trim().to_lowercase();
81 if s.starts_with("decimal(") && s.contains(')') {
82 return Ok(DataType::Float64);
83 }
84 Ok(match s.as_str() {
85 "int" | "integer" => DataType::Int32,
86 "long" | "bigint" => DataType::Int64,
87 "float" => DataType::Float32,
88 "double" => DataType::Float64,
89 "string" | "str" => DataType::String,
90 "boolean" | "bool" => DataType::Boolean,
91 "date" => DataType::Date,
92 "timestamp" => DataType::Datetime(TimeUnit::Microseconds, None),
93 _ => return Err(format!("unknown type name: {name}")),
94 })
95}
96
97pub fn col(name: &str) -> Column {
99 Column::new(name.to_string())
100}
101
102pub fn grouping(column: &Column) -> Column {
104 let _ = column;
105 Column::from_expr(lit(0i32), Some("grouping".to_string()))
106}
107
108pub fn grouping_id(_columns: &[Column]) -> Column {
110 Column::from_expr(lit(0i64), Some("grouping_id".to_string()))
111}
112
113pub fn lit_i32(value: i32) -> Column {
115 let expr: Expr = lit(value);
116 Column::from_expr(expr, None)
117}
118
119pub fn lit_i64(value: i64) -> Column {
120 let expr: Expr = lit(value);
121 Column::from_expr(expr, None)
122}
123
124pub fn lit_f64(value: f64) -> Column {
125 let expr: Expr = lit(value);
126 Column::from_expr(expr, None)
127}
128
129pub fn lit_bool(value: bool) -> Column {
130 let expr: Expr = lit(value);
131 Column::from_expr(expr, None)
132}
133
134pub fn lit_str(value: &str) -> Column {
135 let expr: Expr = lit(value);
136 Column::from_expr(expr, None)
137}
138
/// Null literal column of the type named by `dtype`; delegates to `Column::lit_null`.
///
/// # Errors
///
/// Propagates the `String` error returned by `Column::lit_null`
/// (presumably for an unrecognised type name — confirm against `Column::lit_null`).
pub fn lit_null(dtype: &str) -> Result<Column, String> {
    Column::lit_null(dtype)
}
144
145pub fn count(col: &Column) -> Column {
147 Column::from_expr(col.expr().clone().count(), Some("count".to_string()))
148}
149
150pub fn sum(col: &Column) -> Column {
152 Column::from_expr(col.expr().clone().sum(), Some("sum".to_string()))
153}
154
155pub fn avg(col: &Column) -> Column {
157 Column::from_expr(col.expr().clone().mean(), Some("avg".to_string()))
158}
159
160pub fn mean(col: &Column) -> Column {
162 avg(col)
163}
164
165pub fn max(col: &Column) -> Column {
167 Column::from_expr(col.expr().clone().max(), Some("max".to_string()))
168}
169
170pub fn min(col: &Column) -> Column {
172 Column::from_expr(col.expr().clone().min(), Some("min".to_string()))
173}
174
175pub fn first(col: &Column, ignorenulls: bool) -> Column {
177 let _ = ignorenulls;
178 Column::from_expr(col.expr().clone().first(), None)
179}
180
181pub fn any_value(col: &Column, ignorenulls: bool) -> Column {
183 let _ = ignorenulls;
184 Column::from_expr(col.expr().clone().first(), None)
185}
186
187pub fn count_if(col: &Column) -> Column {
189 use polars::prelude::DataType;
190 Column::from_expr(
191 col.expr().clone().cast(DataType::Int64).sum(),
192 Some("count_if".to_string()),
193 )
194}
195
196pub fn try_sum(col: &Column) -> Column {
198 Column::from_expr(col.expr().clone().sum(), Some("try_sum".to_string()))
199}
200
201pub fn try_avg(col: &Column) -> Column {
203 Column::from_expr(col.expr().clone().mean(), Some("try_avg".to_string()))
204}
205
206pub fn max_by(value_col: &Column, ord_col: &Column) -> Column {
208 use polars::prelude::{SortOptions, as_struct};
209 let st = as_struct(vec![
210 ord_col.expr().clone().alias("_ord"),
211 value_col.expr().clone().alias("_val"),
212 ]);
213 let e = st
214 .sort(SortOptions::default().with_order_descending(true))
215 .first()
216 .struct_()
217 .field_by_name("_val");
218 Column::from_expr(e, None)
219}
220
221pub fn min_by(value_col: &Column, ord_col: &Column) -> Column {
223 use polars::prelude::{SortOptions, as_struct};
224 let st = as_struct(vec![
225 ord_col.expr().clone().alias("_ord"),
226 value_col.expr().clone().alias("_val"),
227 ]);
228 let e = st
229 .sort(SortOptions::default())
230 .first()
231 .struct_()
232 .field_by_name("_val");
233 Column::from_expr(e, None)
234}
235
236pub fn collect_list(col: &Column) -> Column {
238 Column::from_expr(
239 col.expr().clone().implode(),
240 Some("collect_list".to_string()),
241 )
242}
243
244pub fn collect_set(col: &Column) -> Column {
246 Column::from_expr(
247 col.expr().clone().unique().implode(),
248 Some("collect_set".to_string()),
249 )
250}
251
252pub fn bool_and(col: &Column) -> Column {
254 Column::from_expr(col.expr().clone().all(true), Some("bool_and".to_string()))
255}
256
257pub fn every(col: &Column) -> Column {
259 Column::from_expr(col.expr().clone().all(true), Some("every".to_string()))
260}
261
262pub fn stddev(col: &Column) -> Column {
264 Column::from_expr(col.expr().clone().std(1), Some("stddev".to_string()))
265}
266
267pub fn variance(col: &Column) -> Column {
269 Column::from_expr(col.expr().clone().var(1), Some("variance".to_string()))
270}
271
272pub fn stddev_pop(col: &Column) -> Column {
274 Column::from_expr(col.expr().clone().std(0), Some("stddev_pop".to_string()))
275}
276
277pub fn stddev_samp(col: &Column) -> Column {
279 stddev(col)
280}
281
282pub fn std(col: &Column) -> Column {
284 stddev(col)
285}
286
287pub fn var_pop(col: &Column) -> Column {
289 Column::from_expr(col.expr().clone().var(0), Some("var_pop".to_string()))
290}
291
292pub fn var_samp(col: &Column) -> Column {
294 variance(col)
295}
296
297pub fn median(col: &Column) -> Column {
299 use polars::prelude::QuantileMethod;
300 Column::from_expr(
301 col.expr()
302 .clone()
303 .quantile(lit(0.5), QuantileMethod::Linear),
304 Some("median".to_string()),
305 )
306}
307
308pub fn approx_percentile(col: &Column, percentage: f64, _accuracy: Option<i32>) -> Column {
310 use polars::prelude::QuantileMethod;
311 Column::from_expr(
312 col.expr()
313 .clone()
314 .quantile(lit(percentage), QuantileMethod::Linear),
315 Some(format!("approx_percentile({percentage})")),
316 )
317}
318
319pub fn percentile_approx(col: &Column, percentage: f64, accuracy: Option<i32>) -> Column {
321 approx_percentile(col, percentage, accuracy)
322}
323
324pub fn mode(col: &Column) -> Column {
326 col.clone().mode()
327}
328
329pub fn count_distinct(col: &Column) -> Column {
331 use polars::prelude::DataType;
332 Column::from_expr(
333 col.expr().clone().n_unique().cast(DataType::Int64),
334 Some("count_distinct".to_string()),
335 )
336}
337
338pub fn approx_count_distinct(col: &Column, _rsd: Option<f64>) -> Column {
340 use polars::prelude::DataType;
341 Column::from_expr(
342 col.expr().clone().n_unique().cast(DataType::Int64),
343 Some("approx_count_distinct".to_string()),
344 )
345}
346
347pub fn kurtosis(col: &Column) -> Column {
349 Column::from_expr(
350 col.expr()
351 .clone()
352 .cast(DataType::Float64)
353 .kurtosis(true, true),
354 Some("kurtosis".to_string()),
355 )
356}
357
358pub fn skewness(col: &Column) -> Column {
360 Column::from_expr(
361 col.expr().clone().cast(DataType::Float64).skew(true),
362 Some("skewness".to_string()),
363 )
364}
365
366pub fn covar_pop_expr(col1: &str, col2: &str) -> Expr {
368 use polars::prelude::{col as pl_col, len};
369 let c1 = pl_col(col1).cast(DataType::Float64);
370 let c2 = pl_col(col2).cast(DataType::Float64);
371 let n = len().cast(DataType::Float64);
372 let sum_ab = (c1.clone() * c2.clone()).sum();
373 let sum_a = pl_col(col1).sum().cast(DataType::Float64);
374 let sum_b = pl_col(col2).sum().cast(DataType::Float64);
375 (sum_ab - sum_a * sum_b / n.clone()) / n
376}
377
378pub fn covar_pop(col1: &Column, col2: &Column) -> Column {
380 use polars::prelude::len;
381 let c1 = col1.expr().clone().cast(DataType::Float64);
382 let c2 = col2.expr().clone().cast(DataType::Float64);
383 let n = len().cast(DataType::Float64);
384 let sum_ab = (c1.clone() * c2.clone()).sum();
385 let sum_a = col1.expr().clone().sum().cast(DataType::Float64);
386 let sum_b = col2.expr().clone().sum().cast(DataType::Float64);
387 let e = (sum_ab - sum_a * sum_b / n.clone()) / n;
388 Column::from_expr(e, Some("covar_pop".to_string()))
389}
390
391pub fn corr(col1: &Column, col2: &Column) -> Column {
393 use polars::prelude::{len, lit, when};
394 let c1 = col1.expr().clone().cast(DataType::Float64);
395 let c2 = col2.expr().clone().cast(DataType::Float64);
396 let n = len().cast(DataType::Float64);
397 let n1 = (len() - lit(1)).cast(DataType::Float64);
398 let sum_ab = (c1.clone() * c2.clone()).sum();
399 let sum_a = col1.expr().clone().sum().cast(DataType::Float64);
400 let sum_b = col2.expr().clone().sum().cast(DataType::Float64);
401 let sum_a2 = (c1.clone() * c1).sum();
402 let sum_b2 = (c2.clone() * c2).sum();
403 let cov_samp = (sum_ab - sum_a.clone() * sum_b.clone() / n.clone()) / n1.clone();
404 let var_a = (sum_a2 - sum_a.clone() * sum_a / n.clone()) / n1.clone();
405 let var_b = (sum_b2 - sum_b.clone() * sum_b / n.clone()) / n1.clone();
406 let std_a = var_a.sqrt();
407 let std_b = var_b.sqrt();
408 let e = when(len().gt(lit(1)))
409 .then(cov_samp / (std_a * std_b))
410 .otherwise(lit(f64::NAN));
411 Column::from_expr(e, Some("corr".to_string()))
412}
413
414pub fn covar_samp_expr(col1: &str, col2: &str) -> Expr {
416 use polars::prelude::{col as pl_col, len, lit, when};
417 let c1 = pl_col(col1).cast(DataType::Float64);
418 let c2 = pl_col(col2).cast(DataType::Float64);
419 let n = len().cast(DataType::Float64);
420 let sum_ab = (c1.clone() * c2.clone()).sum();
421 let sum_a = pl_col(col1).sum().cast(DataType::Float64);
422 let sum_b = pl_col(col2).sum().cast(DataType::Float64);
423 when(len().gt(lit(1)))
424 .then((sum_ab - sum_a * sum_b / n.clone()) / (len() - lit(1)).cast(DataType::Float64))
425 .otherwise(lit(f64::NAN))
426}
427
428pub fn corr_expr(col1: &str, col2: &str) -> Expr {
430 use polars::prelude::{col as pl_col, len, lit, when};
431 let c1 = pl_col(col1).cast(DataType::Float64);
432 let c2 = pl_col(col2).cast(DataType::Float64);
433 let n = len().cast(DataType::Float64);
434 let n1 = (len() - lit(1)).cast(DataType::Float64);
435 let sum_ab = (c1.clone() * c2.clone()).sum();
436 let sum_a = pl_col(col1).sum().cast(DataType::Float64);
437 let sum_b = pl_col(col2).sum().cast(DataType::Float64);
438 let sum_a2 = (c1.clone() * c1).sum();
439 let sum_b2 = (c2.clone() * c2).sum();
440 let cov_samp = (sum_ab - sum_a.clone() * sum_b.clone() / n.clone()) / n1.clone();
441 let var_a = (sum_a2 - sum_a.clone() * sum_a / n.clone()) / n1.clone();
442 let var_b = (sum_b2 - sum_b.clone() * sum_b / n.clone()) / n1.clone();
443 let std_a = var_a.sqrt();
444 let std_b = var_b.sqrt();
445 when(len().gt(lit(1)))
446 .then(cov_samp / (std_a * std_b))
447 .otherwise(lit(f64::NAN))
448}
449
450fn regr_cond_and_sums(y_col: &str, x_col: &str) -> (Expr, Expr, Expr, Expr, Expr, Expr) {
453 use polars::prelude::col as pl_col;
454 let y = pl_col(y_col).cast(DataType::Float64);
455 let x = pl_col(x_col).cast(DataType::Float64);
456 let cond = y.clone().is_not_null().and(x.clone().is_not_null());
457 let n = y
458 .clone()
459 .filter(cond.clone())
460 .count()
461 .cast(DataType::Float64);
462 let sum_x = x.clone().filter(cond.clone()).sum();
463 let sum_y = y.clone().filter(cond.clone()).sum();
464 let sum_xx = (x.clone() * x.clone()).filter(cond.clone()).sum();
465 let sum_yy = (y.clone() * y.clone()).filter(cond.clone()).sum();
466 let sum_xy = (x * y).filter(cond).sum();
467 (n, sum_x, sum_y, sum_xx, sum_yy, sum_xy)
468}
469
470pub fn regr_count_expr(y_col: &str, x_col: &str) -> Expr {
472 let (n, ..) = regr_cond_and_sums(y_col, x_col);
473 n
474}
475
476pub fn regr_avgx_expr(y_col: &str, x_col: &str) -> Expr {
478 use polars::prelude::{lit, when};
479 let (n, sum_x, ..) = regr_cond_and_sums(y_col, x_col);
480 when(n.clone().gt(lit(0.0)))
481 .then(sum_x / n)
482 .otherwise(lit(f64::NAN))
483}
484
485pub fn regr_avgy_expr(y_col: &str, x_col: &str) -> Expr {
487 use polars::prelude::{lit, when};
488 let (n, _, sum_y, ..) = regr_cond_and_sums(y_col, x_col);
489 when(n.clone().gt(lit(0.0)))
490 .then(sum_y / n)
491 .otherwise(lit(f64::NAN))
492}
493
494pub fn regr_sxx_expr(y_col: &str, x_col: &str) -> Expr {
496 use polars::prelude::{lit, when};
497 let (n, sum_x, _, sum_xx, ..) = regr_cond_and_sums(y_col, x_col);
498 when(n.clone().gt(lit(0.0)))
499 .then(sum_xx - sum_x.clone() * sum_x / n)
500 .otherwise(lit(f64::NAN))
501}
502
503pub fn regr_syy_expr(y_col: &str, x_col: &str) -> Expr {
505 use polars::prelude::{lit, when};
506 let (n, _, sum_y, _, sum_yy, _) = regr_cond_and_sums(y_col, x_col);
507 when(n.clone().gt(lit(0.0)))
508 .then(sum_yy - sum_y.clone() * sum_y / n)
509 .otherwise(lit(f64::NAN))
510}
511
512pub fn regr_sxy_expr(y_col: &str, x_col: &str) -> Expr {
514 use polars::prelude::{lit, when};
515 let (n, sum_x, sum_y, _, _, sum_xy) = regr_cond_and_sums(y_col, x_col);
516 when(n.clone().gt(lit(0.0)))
517 .then(sum_xy - sum_x * sum_y / n)
518 .otherwise(lit(f64::NAN))
519}
520
521pub fn regr_slope_expr(y_col: &str, x_col: &str) -> Expr {
523 use polars::prelude::{lit, when};
524 let (n, sum_x, sum_y, sum_xx, _sum_yy, sum_xy) = regr_cond_and_sums(y_col, x_col);
525 let regr_sxx = sum_xx.clone() - sum_x.clone() * sum_x.clone() / n.clone();
526 let regr_sxy = sum_xy - sum_x * sum_y / n.clone();
527 when(n.gt(lit(1.0)).and(regr_sxx.clone().gt(lit(0.0))))
528 .then(regr_sxy / regr_sxx)
529 .otherwise(lit(f64::NAN))
530}
531
532pub fn regr_intercept_expr(y_col: &str, x_col: &str) -> Expr {
534 use polars::prelude::{lit, when};
535 let (n, sum_x, sum_y, sum_xx, _, sum_xy) = regr_cond_and_sums(y_col, x_col);
536 let regr_sxx = sum_xx - sum_x.clone() * sum_x.clone() / n.clone();
537 let regr_sxy = sum_xy.clone() - sum_x.clone() * sum_y.clone() / n.clone();
538 let slope = regr_sxy.clone() / regr_sxx.clone();
539 let avg_y = sum_y / n.clone();
540 let avg_x = sum_x / n.clone();
541 when(n.gt(lit(1.0)).and(regr_sxx.clone().gt(lit(0.0))))
542 .then(avg_y - slope * avg_x)
543 .otherwise(lit(f64::NAN))
544}
545
546pub fn regr_r2_expr(y_col: &str, x_col: &str) -> Expr {
548 use polars::prelude::{lit, when};
549 let (n, sum_x, sum_y, sum_xx, sum_yy, sum_xy) = regr_cond_and_sums(y_col, x_col);
550 let regr_sxx = sum_xx - sum_x.clone() * sum_x.clone() / n.clone();
551 let regr_syy = sum_yy - sum_y.clone() * sum_y.clone() / n.clone();
552 let regr_sxy = sum_xy - sum_x * sum_y / n;
553 when(
554 regr_sxx
555 .clone()
556 .gt(lit(0.0))
557 .and(regr_syy.clone().gt(lit(0.0))),
558 )
559 .then(regr_sxy.clone() * regr_sxy / (regr_sxx * regr_syy))
560 .otherwise(lit(f64::NAN))
561}
562
563pub fn when(condition: &Column) -> WhenBuilder {
575 WhenBuilder::new(condition.expr().clone())
576}
577
578pub fn when_then_otherwise_null(condition: &Column, value: &Column) -> Column {
580 use polars::prelude::*;
581 let null_expr = lit(NULL);
582 let expr = polars::prelude::when(condition.expr().clone())
583 .then(value.expr().clone())
584 .otherwise(null_expr);
585 crate::column::Column::from_expr(expr, None)
586}
587
/// Builder returned by [`when`]; holds the first condition until [`WhenBuilder::then`] supplies
/// its value.
pub struct WhenBuilder {
    /// Condition expression of the first branch.
    condition: Expr,
}
592
593impl WhenBuilder {
594 fn new(condition: Expr) -> Self {
595 WhenBuilder { condition }
596 }
597
598 pub fn then(self, value: &Column) -> ThenBuilder {
600 use polars::prelude::*;
601 let when_then = when(self.condition).then(value.expr().clone());
602 ThenBuilder::new(when_then)
603 }
604
605 pub fn otherwise(self, _value: &Column) -> Column {
610 panic!(
613 "when().otherwise() requires .then() to be called first. Use when(cond).then(val1).otherwise(val2)"
614 );
615 }
616}
617
/// Builder holding one or more completed `when(...).then(...)` branches.
///
/// Finish it with [`ThenBuilder::otherwise`], or add another branch with [`ThenBuilder::when`].
pub struct ThenBuilder {
    /// Either a single branch or a chain of branches.
    state: WhenThenState,
}
622
/// Polars uses different types for a single `when/then` (`Then`) and a chained one
/// (`ChainedThen`); this enum lets [`ThenBuilder`] hold either.
enum WhenThenState {
    /// Exactly one `when(...).then(...)` branch so far.
    Single(Box<polars::prelude::Then>),
    /// Two or more branches.
    Chained(Box<polars::prelude::ChainedThen>),
}
627
/// Builder for an additional `when` condition that is still waiting for its `then` value.
pub struct ChainedWhenBuilder {
    /// Polars chained-when state: the earlier branches plus the newly added condition.
    inner: polars::prelude::ChainedWhen,
}
632
633impl ThenBuilder {
634 fn new(when_then: polars::prelude::Then) -> Self {
635 ThenBuilder {
636 state: WhenThenState::Single(Box::new(when_then)),
637 }
638 }
639
640 fn new_chained(chained: polars::prelude::ChainedThen) -> Self {
641 ThenBuilder {
642 state: WhenThenState::Chained(Box::new(chained)),
643 }
644 }
645
646 pub fn when(self, condition: &Column) -> ChainedWhenBuilder {
648 let chained_when = match self.state {
649 WhenThenState::Single(t) => t.when(condition.expr().clone()),
650 WhenThenState::Chained(ct) => ct.when(condition.expr().clone()),
651 };
652 ChainedWhenBuilder {
653 inner: chained_when,
654 }
655 }
656
657 pub fn otherwise(self, value: &Column) -> Column {
659 let expr = match self.state {
660 WhenThenState::Single(t) => t.otherwise(value.expr().clone()),
661 WhenThenState::Chained(ct) => ct.otherwise(value.expr().clone()),
662 };
663 crate::column::Column::from_expr(expr, None)
664 }
665}
666
667impl ChainedWhenBuilder {
668 pub fn then(self, value: &Column) -> ThenBuilder {
670 ThenBuilder::new_chained(self.inner.then(value.expr().clone()))
671 }
672}
673
674pub fn upper(column: &Column) -> Column {
676 column.clone().upper()
677}
678
679pub fn lower(column: &Column) -> Column {
681 column.clone().lower()
682}
683
684pub fn substring(column: &Column, start: i64, length: Option<i64>) -> Column {
686 column.clone().substr(start, length)
687}
688
689pub fn length(column: &Column) -> Column {
691 column.clone().length()
692}
693
694pub fn trim(column: &Column) -> Column {
696 column.clone().trim()
697}
698
699pub fn ltrim(column: &Column) -> Column {
701 column.clone().ltrim()
702}
703
704pub fn rtrim(column: &Column) -> Column {
706 column.clone().rtrim()
707}
708
709pub fn btrim(column: &Column, trim_str: Option<&str>) -> Column {
711 column.clone().btrim(trim_str)
712}
713
714pub fn locate(substr: &str, column: &Column, pos: i64) -> Column {
716 column.clone().locate(substr, pos)
717}
718
719pub fn conv(column: &Column, from_base: i32, to_base: i32) -> Column {
721 column.clone().conv(from_base, to_base)
722}
723
724pub fn hex(column: &Column) -> Column {
726 column.clone().hex()
727}
728
729pub fn unhex(column: &Column) -> Column {
731 column.clone().unhex()
732}
733
734pub fn encode(column: &Column, charset: &str) -> Column {
736 column.clone().encode(charset)
737}
738
739pub fn decode(column: &Column, charset: &str) -> Column {
741 column.clone().decode(charset)
742}
743
744pub fn to_binary(column: &Column, fmt: &str) -> Column {
746 column.clone().to_binary(fmt)
747}
748
749pub fn try_to_binary(column: &Column, fmt: &str) -> Column {
751 column.clone().try_to_binary(fmt)
752}
753
754pub fn aes_encrypt(column: &Column, key: &str) -> Column {
756 column.clone().aes_encrypt(key)
757}
758
759pub fn aes_decrypt(column: &Column, key: &str) -> Column {
761 column.clone().aes_decrypt(key)
762}
763
764pub fn try_aes_decrypt(column: &Column, key: &str) -> Column {
766 column.clone().try_aes_decrypt(key)
767}
768
769pub fn bin(column: &Column) -> Column {
771 column.clone().bin()
772}
773
774pub fn getbit(column: &Column, pos: i64) -> Column {
776 column.clone().getbit(pos)
777}
778
779pub fn bit_and(left: &Column, right: &Column) -> Column {
781 left.clone().bit_and(right)
782}
783
784pub fn bit_or(left: &Column, right: &Column) -> Column {
786 left.clone().bit_or(right)
787}
788
789pub fn bit_xor(left: &Column, right: &Column) -> Column {
791 left.clone().bit_xor(right)
792}
793
794pub fn bit_count(column: &Column) -> Column {
796 column.clone().bit_count()
797}
798
799pub fn bitwise_not(column: &Column) -> Column {
801 column.clone().bitwise_not()
802}
803
804pub fn bitmap_bit_position(column: &Column) -> Column {
808 use polars::prelude::DataType;
809 let expr = column.expr().clone().cast(DataType::Int32);
810 Column::from_expr(expr, None)
811}
812
813pub fn bitmap_bucket_number(column: &Column) -> Column {
815 use polars::prelude::DataType;
816 let expr = column.expr().clone().cast(DataType::Int64) / lit(32768i64);
817 Column::from_expr(expr, None)
818}
819
820pub fn bitmap_count(column: &Column) -> Column {
822 use polars::prelude::{DataType, Field};
823 let expr = column.expr().clone().map(
824 |s| crate::column::expect_col(crate::udfs::apply_bitmap_count(s)),
825 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
826 );
827 Column::from_expr(expr, None)
828}
829
830pub fn bitmap_construct_agg(column: &Column) -> polars::prelude::Expr {
833 use polars::prelude::{DataType, Field};
834 column.expr().clone().implode().map(
835 |s| crate::column::expect_col(crate::udfs::apply_bitmap_construct_agg(s)),
836 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Binary)),
837 )
838}
839
840pub fn bitmap_or_agg(column: &Column) -> polars::prelude::Expr {
842 use polars::prelude::{DataType, Field};
843 column.expr().clone().implode().map(
844 |s| crate::column::expect_col(crate::udfs::apply_bitmap_or_agg(s)),
845 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Binary)),
846 )
847}
848
849pub fn bit_get(column: &Column, pos: i64) -> Column {
851 getbit(column, pos)
852}
853
854pub fn assert_true(column: &Column, err_msg: Option<&str>) -> Column {
857 column.clone().assert_true(err_msg)
858}
859
860pub fn raise_error(message: &str) -> Column {
862 let msg = message.to_string();
863 let expr = lit(0i64).map(
864 move |_col| -> PolarsResult<polars::prelude::Column> {
865 Err(PolarsError::ComputeError(msg.clone().into()))
866 },
867 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
868 );
869 Column::from_expr(expr, Some("raise_error".to_string()))
870}
871
872pub fn spark_partition_id() -> Column {
874 Column::from_expr(lit(0i32), Some("spark_partition_id".to_string()))
875}
876
877pub fn input_file_name() -> Column {
879 Column::from_expr(lit(""), Some("input_file_name".to_string()))
880}
881
882pub fn monotonically_increasing_id() -> Column {
885 Column::from_expr(lit(0i64), Some("monotonically_increasing_id".to_string()))
886}
887
888pub fn current_catalog() -> Column {
890 Column::from_expr(lit("spark_catalog"), Some("current_catalog".to_string()))
891}
892
893pub fn current_database() -> Column {
895 Column::from_expr(lit("default"), Some("current_database".to_string()))
896}
897
898pub fn current_schema() -> Column {
900 Column::from_expr(lit("default"), Some("current_schema".to_string()))
901}
902
903pub fn current_user() -> Column {
905 Column::from_expr(lit("unknown"), Some("current_user".to_string()))
906}
907
908pub fn user() -> Column {
910 Column::from_expr(lit("unknown"), Some("user".to_string()))
911}
912
/// Random-value column built by `Column::from_rand`, with an optional `seed`.
/// NOTE(review): the distribution and seeding behavior live in `Column::from_rand` — confirm there.
pub fn rand(seed: Option<u64>) -> Column {
    Column::from_rand(seed)
}
918
/// Random-value column built by `Column::from_randn`, with an optional `seed`.
/// NOTE(review): presumably standard-normal samples — confirm against `Column::from_randn`.
pub fn randn(seed: Option<u64>) -> Column {
    Column::from_randn(seed)
}
924
925pub fn call_udf(name: &str, cols: &[Column]) -> Result<Column, PolarsError> {
928 use polars::prelude::Column as PlColumn;
929
930 let (registry, case_sensitive) =
931 crate::udf_context::get_thread_udf_context().ok_or_else(|| {
932 PolarsError::InvalidOperation(
933 "call_udf: no session. Use SparkSession.builder().get_or_create() first.".into(),
934 )
935 })?;
936
937 let udf = registry.get_rust_udf(name, case_sensitive).ok_or_else(|| {
939 PolarsError::InvalidOperation(format!("call_udf: UDF '{name}' not found").into())
940 })?;
941
942 let exprs: Vec<Expr> = cols.iter().map(|c| c.expr().clone()).collect();
943 let output_type = DataType::String; let expr = if exprs.len() == 1 {
946 let udf = udf.clone();
947 exprs.into_iter().next().unwrap().map(
948 move |c| {
949 let s = c.take_materialized_series();
950 udf.apply(&[s]).map(|out| PlColumn::new("_".into(), out))
951 },
952 move |_schema, field| Ok(Field::new(field.name().clone(), output_type.clone())),
953 )
954 } else {
955 let udf = udf.clone();
956 let first = exprs[0].clone();
957 let rest: Vec<Expr> = exprs[1..].to_vec();
958 first.map_many(
959 move |columns| {
960 let series: Vec<Series> = columns
961 .iter_mut()
962 .map(|c| std::mem::take(c).take_materialized_series())
963 .collect();
964 udf.apply(&series).map(|out| PlColumn::new("_".into(), out))
965 },
966 &rest,
967 move |_schema, fields| Ok(Field::new(fields[0].name().clone(), output_type.clone())),
968 )
969 };
970
971 Ok(Column::from_expr(expr, Some(format!("{name}()"))))
972}
973
974pub fn arrays_overlap(left: &Column, right: &Column) -> Column {
976 left.clone().arrays_overlap(right)
977}
978
979pub fn arrays_zip(left: &Column, right: &Column) -> Column {
981 left.clone().arrays_zip(right)
982}
983
984pub fn explode_outer(column: &Column) -> Column {
986 column.clone().explode_outer()
987}
988
989pub fn posexplode_outer(column: &Column) -> (Column, Column) {
991 column.clone().posexplode_outer()
992}
993
994pub fn array_agg(column: &Column) -> Column {
996 column.clone().array_agg()
997}
998
999pub fn transform_keys(column: &Column, key_expr: Expr) -> Column {
1001 column.clone().transform_keys(key_expr)
1002}
1003
1004pub fn transform_values(column: &Column, value_expr: Expr) -> Column {
1006 column.clone().transform_values(value_expr)
1007}
1008
1009pub fn str_to_map(
1011 column: &Column,
1012 pair_delim: Option<&str>,
1013 key_value_delim: Option<&str>,
1014) -> Column {
1015 let pd = pair_delim.unwrap_or(",");
1016 let kvd = key_value_delim.unwrap_or(":");
1017 column.clone().str_to_map(pd, kvd)
1018}
1019
1020pub fn regexp_extract(column: &Column, pattern: &str, group_index: usize) -> Column {
1022 column.clone().regexp_extract(pattern, group_index)
1023}
1024
1025pub fn regexp_replace(column: &Column, pattern: &str, replacement: &str) -> Column {
1027 column.clone().regexp_replace(pattern, replacement)
1028}
1029
1030pub fn split(column: &Column, delimiter: &str, limit: Option<i32>) -> Column {
1032 column.clone().split(delimiter, limit)
1033}
1034
1035pub fn initcap(column: &Column) -> Column {
1037 column.clone().initcap()
1038}
1039
1040pub fn regexp_extract_all(column: &Column, pattern: &str) -> Column {
1042 column.clone().regexp_extract_all(pattern)
1043}
1044
1045pub fn regexp_like(column: &Column, pattern: &str) -> Column {
1047 column.clone().regexp_like(pattern)
1048}
1049
1050pub fn regexp_count(column: &Column, pattern: &str) -> Column {
1052 column.clone().regexp_count(pattern)
1053}
1054
1055pub fn regexp_substr(column: &Column, pattern: &str) -> Column {
1057 column.clone().regexp_substr(pattern)
1058}
1059
1060pub fn split_part(column: &Column, delimiter: &str, part_num: i64) -> Column {
1062 column.clone().split_part(delimiter, part_num)
1063}
1064
1065pub fn regexp_instr(column: &Column, pattern: &str, group_idx: Option<usize>) -> Column {
1067 column.clone().regexp_instr(pattern, group_idx)
1068}
1069
1070pub fn find_in_set(str_column: &Column, set_column: &Column) -> Column {
1072 str_column.clone().find_in_set(set_column)
1073}
1074
1075pub fn format_string(format: &str, columns: &[&Column]) -> Column {
1077 use polars::prelude::*;
1078 if columns.is_empty() {
1079 panic!("format_string needs at least one column");
1080 }
1081 let format_owned = format.to_string();
1082 let args: Vec<Expr> = columns.iter().skip(1).map(|c| c.expr().clone()).collect();
1083 let expr = columns[0].expr().clone().map_many(
1084 move |cols| {
1085 crate::column::expect_col(crate::udfs::apply_format_string(cols, &format_owned))
1086 },
1087 &args,
1088 |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::String)),
1089 );
1090 crate::column::Column::from_expr(expr, None)
1091}
1092
/// Alias of [`format_string`]: formats `columns` with `format`.
///
/// # Panics
///
/// Panics if `columns` is empty (inherited from `format_string`).
pub fn printf(format: &str, columns: &[&Column]) -> Column {
    format_string(format, columns)
}
1097
1098pub fn repeat(column: &Column, n: i32) -> Column {
1100 column.clone().repeat(n)
1101}
1102
1103pub fn reverse(column: &Column) -> Column {
1105 column.clone().reverse()
1106}
1107
1108pub fn instr(column: &Column, substr: &str) -> Column {
1110 column.clone().instr(substr)
1111}
1112
1113pub fn position(substr: &str, column: &Column) -> Column {
1115 column.clone().instr(substr)
1116}
1117
1118pub fn ascii(column: &Column) -> Column {
1120 column.clone().ascii()
1121}
1122
1123pub fn format_number(column: &Column, decimals: u32) -> Column {
1125 column.clone().format_number(decimals)
1126}
1127
1128pub fn overlay(column: &Column, replace: &str, pos: i64, length: i64) -> Column {
1130 column.clone().overlay(replace, pos, length)
1131}
1132
1133pub fn char(column: &Column) -> Column {
1135 column.clone().char()
1136}
1137
1138pub fn chr(column: &Column) -> Column {
1140 column.clone().chr()
1141}
1142
1143pub fn base64(column: &Column) -> Column {
1145 column.clone().base64()
1146}
1147
1148pub fn unbase64(column: &Column) -> Column {
1150 column.clone().unbase64()
1151}
1152
1153pub fn sha1(column: &Column) -> Column {
1155 column.clone().sha1()
1156}
1157
1158pub fn sha2(column: &Column, bit_length: i32) -> Column {
1160 column.clone().sha2(bit_length)
1161}
1162
1163pub fn md5(column: &Column) -> Column {
1165 column.clone().md5()
1166}
1167
1168pub fn lpad(column: &Column, length: i32, pad: &str) -> Column {
1170 column.clone().lpad(length, pad)
1171}
1172
1173pub fn rpad(column: &Column, length: i32, pad: &str) -> Column {
1175 column.clone().rpad(length, pad)
1176}
1177
1178pub fn translate(column: &Column, from_str: &str, to_str: &str) -> Column {
1180 column.clone().translate(from_str, to_str)
1181}
1182
1183pub fn mask(
1185 column: &Column,
1186 upper_char: Option<char>,
1187 lower_char: Option<char>,
1188 digit_char: Option<char>,
1189 other_char: Option<char>,
1190) -> Column {
1191 column
1192 .clone()
1193 .mask(upper_char, lower_char, digit_char, other_char)
1194}
1195
1196pub fn substring_index(column: &Column, delimiter: &str, count: i64) -> Column {
1198 column.clone().substring_index(delimiter, count)
1199}
1200
1201pub fn left(column: &Column, n: i64) -> Column {
1203 column.clone().left(n)
1204}
1205
1206pub fn right(column: &Column, n: i64) -> Column {
1208 column.clone().right(n)
1209}
1210
1211pub fn replace(column: &Column, search: &str, replacement: &str) -> Column {
1213 column.clone().replace(search, replacement)
1214}
1215
1216pub fn startswith(column: &Column, prefix: &str) -> Column {
1218 column.clone().startswith(prefix)
1219}
1220
1221pub fn endswith(column: &Column, suffix: &str) -> Column {
1223 column.clone().endswith(suffix)
1224}
1225
1226pub fn contains(column: &Column, substring: &str) -> Column {
1228 column.clone().contains(substring)
1229}
1230
1231pub fn like(column: &Column, pattern: &str, escape_char: Option<char>) -> Column {
1234 column.clone().like(pattern, escape_char)
1235}
1236
1237pub fn ilike(column: &Column, pattern: &str, escape_char: Option<char>) -> Column {
1240 column.clone().ilike(pattern, escape_char)
1241}
1242
1243pub fn rlike(column: &Column, pattern: &str) -> Column {
1245 column.clone().regexp_like(pattern)
1246}
1247
1248pub fn regexp(column: &Column, pattern: &str) -> Column {
1250 rlike(column, pattern)
1251}
1252
1253pub fn soundex(column: &Column) -> Column {
1255 column.clone().soundex()
1256}
1257
1258pub fn levenshtein(column: &Column, other: &Column) -> Column {
1260 column.clone().levenshtein(other)
1261}
1262
1263pub fn crc32(column: &Column) -> Column {
1265 column.clone().crc32()
1266}
1267
1268pub fn xxhash64(column: &Column) -> Column {
1270 column.clone().xxhash64()
1271}
1272
1273pub fn abs(column: &Column) -> Column {
1275 column.clone().abs()
1276}
1277
1278pub fn ceil(column: &Column) -> Column {
1280 column.clone().ceil()
1281}
1282
1283pub fn floor(column: &Column) -> Column {
1285 column.clone().floor()
1286}
1287
1288pub fn round(column: &Column, decimals: u32) -> Column {
1290 column.clone().round(decimals)
1291}
1292
1293pub fn bround(column: &Column, scale: i32) -> Column {
1295 column.clone().bround(scale)
1296}
1297
1298pub fn negate(column: &Column) -> Column {
1300 column.clone().negate()
1301}
1302
1303pub fn negative(column: &Column) -> Column {
1305 negate(column)
1306}
1307
1308pub fn positive(column: &Column) -> Column {
1310 column.clone()
1311}
1312
1313pub fn cot(column: &Column) -> Column {
1315 column.clone().cot()
1316}
1317
1318pub fn csc(column: &Column) -> Column {
1320 column.clone().csc()
1321}
1322
1323pub fn sec(column: &Column) -> Column {
1325 column.clone().sec()
1326}
1327
1328pub fn e() -> Column {
1330 Column::from_expr(lit(std::f64::consts::E), Some("e".to_string()))
1331}
1332
1333pub fn pi() -> Column {
1335 Column::from_expr(lit(std::f64::consts::PI), Some("pi".to_string()))
1336}
1337
1338pub fn sqrt(column: &Column) -> Column {
1340 column.clone().sqrt()
1341}
1342
1343pub fn pow(column: &Column, exp: i64) -> Column {
1345 column.clone().pow(exp)
1346}
1347
1348pub fn exp(column: &Column) -> Column {
1350 column.clone().exp()
1351}
1352
1353pub fn log(column: &Column) -> Column {
1355 column.clone().log()
1356}
1357
1358pub fn log_with_base(column: &Column, base: f64) -> Column {
1360 crate::column::Column::from_expr(column.expr().clone().log(lit(base)), None)
1361}
1362
1363pub fn sin(column: &Column) -> Column {
1365 column.clone().sin()
1366}
1367
1368pub fn cos(column: &Column) -> Column {
1370 column.clone().cos()
1371}
1372
1373pub fn tan(column: &Column) -> Column {
1375 column.clone().tan()
1376}
1377
1378pub fn asin(column: &Column) -> Column {
1380 column.clone().asin()
1381}
1382
1383pub fn acos(column: &Column) -> Column {
1385 column.clone().acos()
1386}
1387
1388pub fn atan(column: &Column) -> Column {
1390 column.clone().atan()
1391}
1392
1393pub fn atan2(y: &Column, x: &Column) -> Column {
1395 y.clone().atan2(x)
1396}
1397
1398pub fn degrees(column: &Column) -> Column {
1400 column.clone().degrees()
1401}
1402
1403pub fn radians(column: &Column) -> Column {
1405 column.clone().radians()
1406}
1407
1408pub fn signum(column: &Column) -> Column {
1410 column.clone().signum()
1411}
1412
1413pub fn sign(column: &Column) -> Column {
1415 signum(column)
1416}
1417
1418pub fn cast(column: &Column, type_name: &str) -> Result<Column, String> {
1422 let dtype = parse_type_name(type_name)?;
1423 if dtype == DataType::Boolean {
1424 let expr = column.expr().clone().map(
1425 |col| crate::column::expect_col(crate::udfs::apply_string_to_boolean(col, true)),
1426 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Boolean)),
1427 );
1428 return Ok(Column::from_expr(expr, None));
1429 }
1430 if dtype == DataType::Date {
1431 let expr = column.expr().clone().map(
1432 |col| crate::column::expect_col(crate::udfs::apply_string_to_date(col, true)),
1433 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Date)),
1434 );
1435 return Ok(Column::from_expr(expr, None));
1436 }
1437 if dtype == DataType::Int32 || dtype == DataType::Int64 {
1438 let target = dtype.clone();
1439 let expr = column.expr().clone().map(
1441 move |col| {
1442 crate::column::expect_col(crate::udfs::apply_string_to_int(
1443 col,
1444 true,
1445 target.clone(),
1446 ))
1447 },
1448 move |_schema, field| Ok(Field::new(field.name().clone(), dtype.clone())),
1449 );
1450 return Ok(Column::from_expr(expr, None));
1451 }
1452 if dtype == DataType::Float64 {
1453 let expr = column.expr().clone().map(
1455 |col| crate::column::expect_col(crate::udfs::apply_string_to_double(col, true)),
1456 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1457 );
1458 return Ok(Column::from_expr(expr, None));
1459 }
1460 Ok(Column::from_expr(
1461 column.expr().clone().strict_cast(dtype),
1462 None,
1463 ))
1464}
1465
1466pub fn try_cast(column: &Column, type_name: &str) -> Result<Column, String> {
1470 let dtype = parse_type_name(type_name)?;
1471 if dtype == DataType::Boolean {
1472 let expr = column.expr().clone().map(
1473 |col| crate::column::expect_col(crate::udfs::apply_string_to_boolean(col, false)),
1474 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Boolean)),
1475 );
1476 return Ok(Column::from_expr(expr, None));
1477 }
1478 if dtype == DataType::Date {
1479 let expr = column.expr().clone().map(
1480 |col| crate::column::expect_col(crate::udfs::apply_string_to_date(col, false)),
1481 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Date)),
1482 );
1483 return Ok(Column::from_expr(expr, None));
1484 }
1485 if dtype == DataType::Int32 || dtype == DataType::Int64 {
1486 let target = dtype.clone();
1487 let expr = column.expr().clone().map(
1488 move |col| {
1489 crate::column::expect_col(crate::udfs::apply_string_to_int(
1490 col,
1491 false,
1492 target.clone(),
1493 ))
1494 },
1495 move |_schema, field| Ok(Field::new(field.name().clone(), dtype.clone())),
1496 );
1497 return Ok(Column::from_expr(expr, None));
1498 }
1499 if dtype == DataType::Float64 {
1500 let expr = column.expr().clone().map(
1501 |col| crate::column::expect_col(crate::udfs::apply_string_to_double(col, false)),
1502 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1503 );
1504 return Ok(Column::from_expr(expr, None));
1505 }
1506 Ok(Column::from_expr(column.expr().clone().cast(dtype), None))
1507}
1508
1509pub fn to_char(column: &Column, format: Option<&str>) -> Result<Column, String> {
1513 match format {
1514 Some(fmt) => Ok(column
1515 .clone()
1516 .date_format(&crate::udfs::pyspark_format_to_chrono(fmt))),
1517 None => cast(column, "string"),
1518 }
1519}
1520
1521pub fn to_varchar(column: &Column, format: Option<&str>) -> Result<Column, String> {
1523 to_char(column, format)
1524}
1525
1526pub fn to_number(column: &Column, _format: Option<&str>) -> Result<Column, String> {
1529 cast(column, "double")
1530}
1531
1532pub fn try_to_number(column: &Column, _format: Option<&str>) -> Result<Column, String> {
1535 try_cast(column, "double")
1536}
1537
1538pub fn to_timestamp(column: &Column, format: Option<&str>) -> Result<Column, String> {
1541 use polars::prelude::{DataType, Field, TimeUnit};
1542 let fmt_owned = format.map(|s| s.to_string());
1543 let expr = column.expr().clone().map(
1544 move |s| {
1545 crate::column::expect_col(crate::udfs::apply_to_timestamp_format(
1546 s,
1547 fmt_owned.as_deref(),
1548 true,
1549 ))
1550 },
1551 |_schema, field| {
1552 Ok(Field::new(
1553 field.name().clone(),
1554 DataType::Datetime(TimeUnit::Microseconds, None),
1555 ))
1556 },
1557 );
1558 Ok(crate::column::Column::from_expr(expr, None))
1559}
1560
1561pub fn try_to_timestamp(column: &Column, format: Option<&str>) -> Result<Column, String> {
1564 use polars::prelude::*;
1565 let fmt_owned = format.map(|s| s.to_string());
1566 let expr = column.expr().clone().map(
1567 move |s| {
1568 crate::column::expect_col(crate::udfs::apply_to_timestamp_format(
1569 s,
1570 fmt_owned.as_deref(),
1571 false,
1572 ))
1573 },
1574 |_schema, field| {
1575 Ok(Field::new(
1576 field.name().clone(),
1577 DataType::Datetime(TimeUnit::Microseconds, None),
1578 ))
1579 },
1580 );
1581 Ok(crate::column::Column::from_expr(expr, None))
1582}
1583
1584pub fn to_timestamp_ltz(column: &Column, format: Option<&str>) -> Result<Column, String> {
1586 use polars::prelude::{DataType, Field, TimeUnit};
1587 match format {
1588 None => crate::cast(column, "timestamp"),
1589 Some(fmt) => {
1590 let fmt_owned = fmt.to_string();
1591 let expr = column.expr().clone().map(
1592 move |s| {
1593 crate::column::expect_col(crate::udfs::apply_to_timestamp_ltz_format(
1594 s,
1595 Some(&fmt_owned),
1596 true,
1597 ))
1598 },
1599 |_schema, field| {
1600 Ok(Field::new(
1601 field.name().clone(),
1602 DataType::Datetime(TimeUnit::Microseconds, None),
1603 ))
1604 },
1605 );
1606 Ok(crate::column::Column::from_expr(expr, None))
1607 }
1608 }
1609}
1610
1611pub fn to_timestamp_ntz(column: &Column, format: Option<&str>) -> Result<Column, String> {
1613 use polars::prelude::{DataType, Field, TimeUnit};
1614 match format {
1615 None => crate::cast(column, "timestamp"),
1616 Some(fmt) => {
1617 let fmt_owned = fmt.to_string();
1618 let expr = column.expr().clone().map(
1619 move |s| {
1620 crate::column::expect_col(crate::udfs::apply_to_timestamp_ntz_format(
1621 s,
1622 Some(&fmt_owned),
1623 true,
1624 ))
1625 },
1626 |_schema, field| {
1627 Ok(Field::new(
1628 field.name().clone(),
1629 DataType::Datetime(TimeUnit::Microseconds, None),
1630 ))
1631 },
1632 );
1633 Ok(crate::column::Column::from_expr(expr, None))
1634 }
1635 }
1636}
1637
1638pub fn try_divide(left: &Column, right: &Column) -> Column {
1640 use polars::prelude::*;
1641 let zero_cond = right.expr().clone().cast(DataType::Float64).eq(lit(0.0f64));
1642 let null_expr = lit(NULL);
1643 let div_expr =
1644 left.expr().clone().cast(DataType::Float64) / right.expr().clone().cast(DataType::Float64);
1645 let expr = polars::prelude::when(zero_cond)
1646 .then(null_expr)
1647 .otherwise(div_expr);
1648 crate::column::Column::from_expr(expr, None)
1649}
1650
1651pub fn try_add(left: &Column, right: &Column) -> Column {
1653 let args = [right.expr().clone()];
1654 let expr = left.expr().clone().map_many(
1655 |cols| crate::column::expect_col(crate::udfs::apply_try_add(cols)),
1656 &args,
1657 |_schema, fields| Ok(fields[0].clone()),
1658 );
1659 Column::from_expr(expr, None)
1660}
1661
1662pub fn try_subtract(left: &Column, right: &Column) -> Column {
1664 let args = [right.expr().clone()];
1665 let expr = left.expr().clone().map_many(
1666 |cols| crate::column::expect_col(crate::udfs::apply_try_subtract(cols)),
1667 &args,
1668 |_schema, fields| Ok(fields[0].clone()),
1669 );
1670 Column::from_expr(expr, None)
1671}
1672
1673pub fn try_multiply(left: &Column, right: &Column) -> Column {
1675 let args = [right.expr().clone()];
1676 let expr = left.expr().clone().map_many(
1677 |cols| crate::column::expect_col(crate::udfs::apply_try_multiply(cols)),
1678 &args,
1679 |_schema, fields| Ok(fields[0].clone()),
1680 );
1681 Column::from_expr(expr, None)
1682}
1683
1684pub fn try_element_at(column: &Column, index: i64) -> Column {
1686 column.clone().element_at(index)
1687}
1688
1689pub fn width_bucket(value: &Column, min_val: f64, max_val: f64, num_bucket: i64) -> Column {
1691 if num_bucket <= 0 {
1692 panic!(
1693 "width_bucket: num_bucket must be positive, got {}",
1694 num_bucket
1695 );
1696 }
1697 use polars::prelude::*;
1698 let v = value.expr().clone().cast(DataType::Float64);
1699 let min_expr = lit(min_val);
1700 let max_expr = lit(max_val);
1701 let nb = num_bucket as f64;
1702 let width = (max_val - min_val) / nb;
1703 let bucket_expr = (v.clone() - min_expr.clone()) / lit(width);
1704 let floor_bucket = bucket_expr.floor().cast(DataType::Int64) + lit(1i64);
1705 let bucket_clamped = floor_bucket.clip(lit(1i64), lit(num_bucket));
1706 let expr = polars::prelude::when(v.clone().lt(min_expr))
1707 .then(lit(0i64))
1708 .when(v.gt_eq(max_expr))
1709 .then(lit(num_bucket + 1))
1710 .otherwise(bucket_clamped);
1711 crate::column::Column::from_expr(expr, None)
1712}
1713
1714pub fn elt(index: &Column, columns: &[&Column]) -> Column {
1716 use polars::prelude::*;
1717 if columns.is_empty() {
1718 panic!("elt requires at least one column");
1719 }
1720 let idx_expr = index.expr().clone();
1721 let null_expr = lit(NULL);
1722 let mut expr = null_expr;
1723 for (i, c) in columns.iter().enumerate().rev() {
1724 let n = (i + 1) as i64;
1725 expr = polars::prelude::when(idx_expr.clone().eq(lit(n)))
1726 .then(c.expr().clone())
1727 .otherwise(expr);
1728 }
1729 crate::column::Column::from_expr(expr, None)
1730}
1731
1732pub fn bit_length(column: &Column) -> Column {
1734 column.clone().bit_length()
1735}
1736
1737pub fn octet_length(column: &Column) -> Column {
1739 column.clone().octet_length()
1740}
1741
1742pub fn char_length(column: &Column) -> Column {
1744 column.clone().char_length()
1745}
1746
1747pub fn character_length(column: &Column) -> Column {
1749 column.clone().character_length()
1750}
1751
1752pub fn typeof_(column: &Column) -> Column {
1754 column.clone().typeof_()
1755}
1756
1757pub fn isnan(column: &Column) -> Column {
1759 column.clone().is_nan()
1760}
1761
1762pub fn greatest(columns: &[&Column]) -> Result<Column, String> {
1764 if columns.is_empty() {
1765 return Err("greatest requires at least one column".to_string());
1766 }
1767 if columns.len() == 1 {
1768 return Ok((*columns[0]).clone());
1769 }
1770 let mut expr = columns[0].expr().clone();
1771 for c in columns.iter().skip(1) {
1772 let args = [c.expr().clone()];
1773 expr = expr.map_many(
1774 |cols| crate::column::expect_col(crate::udfs::apply_greatest2(cols)),
1775 &args,
1776 |_schema, fields| Ok(fields[0].clone()),
1777 );
1778 }
1779 Ok(Column::from_expr(expr, None))
1780}
1781
1782pub fn least(columns: &[&Column]) -> Result<Column, String> {
1784 if columns.is_empty() {
1785 return Err("least requires at least one column".to_string());
1786 }
1787 if columns.len() == 1 {
1788 return Ok((*columns[0]).clone());
1789 }
1790 let mut expr = columns[0].expr().clone();
1791 for c in columns.iter().skip(1) {
1792 let args = [c.expr().clone()];
1793 expr = expr.map_many(
1794 |cols| crate::column::expect_col(crate::udfs::apply_least2(cols)),
1795 &args,
1796 |_schema, fields| Ok(fields[0].clone()),
1797 );
1798 }
1799 Ok(Column::from_expr(expr, None))
1800}
1801
1802pub fn year(column: &Column) -> Column {
1804 column.clone().year()
1805}
1806
1807pub fn month(column: &Column) -> Column {
1809 column.clone().month()
1810}
1811
1812pub fn day(column: &Column) -> Column {
1814 column.clone().day()
1815}
1816
1817pub fn to_date(column: &Column, format: Option<&str>) -> Result<Column, String> {
1819 let fmt = format.map(|s| s.to_string());
1820 let expr = column.expr().clone().map(
1821 move |col| {
1822 crate::column::expect_col(crate::udfs::apply_string_to_date_format(
1823 col,
1824 fmt.as_deref(),
1825 false,
1826 ))
1827 },
1828 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Date)),
1829 );
1830 Ok(Column::from_expr(expr, None))
1831}
1832
1833pub fn date_format(column: &Column, format: &str) -> Column {
1835 column
1836 .clone()
1837 .date_format(&crate::udfs::pyspark_format_to_chrono(format))
1838}
1839
1840pub fn current_date() -> Column {
1842 use polars::prelude::*;
1843 let today = chrono::Utc::now().date_naive();
1844 let days = (today - robin_sparkless_core::date_utils::epoch_naive_date()).num_days() as i32;
1845 crate::column::Column::from_expr(
1846 Expr::Literal(LiteralValue::Scalar(Scalar::new_date(days))),
1847 None,
1848 )
1849}
1850
1851pub fn current_timestamp() -> Column {
1853 use polars::prelude::*;
1854 let ts = chrono::Utc::now().timestamp_micros();
1855 crate::column::Column::from_expr(
1856 Expr::Literal(LiteralValue::Scalar(Scalar::new_datetime(
1857 ts,
1858 TimeUnit::Microseconds,
1859 None,
1860 ))),
1861 None,
1862 )
1863}
1864
1865pub fn curdate() -> Column {
1867 current_date()
1868}
1869
1870pub fn now() -> Column {
1872 current_timestamp()
1873}
1874
1875pub fn localtimestamp() -> Column {
1877 current_timestamp()
1878}
1879
1880pub fn date_diff(end: &Column, start: &Column) -> Column {
1882 datediff(end, start)
1883}
1884
1885pub fn dateadd(column: &Column, n: i32) -> Column {
1887 date_add(column, n)
1888}
1889
1890pub fn extract(column: &Column, field: &str) -> Column {
1892 column.clone().extract(field)
1893}
1894
1895pub fn date_part(column: &Column, field: &str) -> Column {
1897 extract(column, field)
1898}
1899
1900pub fn datepart(column: &Column, field: &str) -> Column {
1902 extract(column, field)
1903}
1904
1905pub fn unix_micros(column: &Column) -> Column {
1907 column.clone().unix_micros()
1908}
1909
1910pub fn unix_millis(column: &Column) -> Column {
1912 column.clone().unix_millis()
1913}
1914
1915pub fn unix_seconds(column: &Column) -> Column {
1917 column.clone().unix_seconds()
1918}
1919
1920pub fn dayname(column: &Column) -> Column {
1922 column.clone().dayname()
1923}
1924
1925pub fn weekday(column: &Column) -> Column {
1927 column.clone().weekday()
1928}
1929
1930pub fn hour(column: &Column) -> Column {
1932 column.clone().hour()
1933}
1934
1935pub fn minute(column: &Column) -> Column {
1937 column.clone().minute()
1938}
1939
1940pub fn second(column: &Column) -> Column {
1942 column.clone().second()
1943}
1944
1945pub fn date_add(column: &Column, n: i32) -> Column {
1947 column.clone().date_add(n)
1948}
1949
1950pub fn date_sub(column: &Column, n: i32) -> Column {
1952 column.clone().date_sub(n)
1953}
1954
1955pub fn datediff(end: &Column, start: &Column) -> Column {
1957 start.clone().datediff(end)
1958}
1959
1960pub fn last_day(column: &Column) -> Column {
1962 column.clone().last_day()
1963}
1964
1965pub fn trunc(column: &Column, format: &str) -> Column {
1967 column.clone().trunc(format)
1968}
1969
1970pub fn date_trunc(format: &str, column: &Column) -> Column {
1972 trunc(column, format)
1973}
1974
1975pub fn quarter(column: &Column) -> Column {
1977 column.clone().quarter()
1978}
1979
1980pub fn weekofyear(column: &Column) -> Column {
1982 column.clone().weekofyear()
1983}
1984
1985pub fn dayofweek(column: &Column) -> Column {
1987 column.clone().dayofweek()
1988}
1989
1990pub fn dayofyear(column: &Column) -> Column {
1992 column.clone().dayofyear()
1993}
1994
1995pub fn add_months(column: &Column, n: i32) -> Column {
1997 column.clone().add_months(n)
1998}
1999
2000pub fn months_between(end: &Column, start: &Column, round_off: bool) -> Column {
2003 end.clone().months_between(start, round_off)
2004}
2005
2006pub fn next_day(column: &Column, day_of_week: &str) -> Column {
2008 column.clone().next_day(day_of_week)
2009}
2010
2011pub fn unix_timestamp_now() -> Column {
2013 use polars::prelude::*;
2014 let secs = chrono::Utc::now().timestamp();
2015 crate::column::Column::from_expr(lit(secs), None)
2016}
2017
2018pub fn unix_timestamp(column: &Column, format: Option<&str>) -> Column {
2020 column.clone().unix_timestamp(format)
2021}
2022
2023pub fn to_unix_timestamp(column: &Column, format: Option<&str>) -> Column {
2025 unix_timestamp(column, format)
2026}
2027
2028pub fn from_unixtime(column: &Column, format: Option<&str>) -> Column {
2030 column.clone().from_unixtime(format)
2031}
2032
2033pub fn make_date(year: &Column, month: &Column, day: &Column) -> Column {
2035 use polars::prelude::*;
2036 let args = [month.expr().clone(), day.expr().clone()];
2037 let expr = year.expr().clone().map_many(
2038 |cols| crate::column::expect_col(crate::udfs::apply_make_date(cols)),
2039 &args,
2040 |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Date)),
2041 );
2042 crate::column::Column::from_expr(expr, None)
2043}
2044
2045pub fn make_timestamp(
2048 year: &Column,
2049 month: &Column,
2050 day: &Column,
2051 hour: &Column,
2052 minute: &Column,
2053 sec: &Column,
2054 timezone: Option<&str>,
2055) -> Column {
2056 use polars::prelude::*;
2057 let tz_owned = timezone.map(|s| s.to_string());
2058 let args = [
2059 month.expr().clone(),
2060 day.expr().clone(),
2061 hour.expr().clone(),
2062 minute.expr().clone(),
2063 sec.expr().clone(),
2064 ];
2065 let expr = year.expr().clone().map_many(
2066 move |cols| {
2067 crate::column::expect_col(crate::udfs::apply_make_timestamp(cols, tz_owned.as_deref()))
2068 },
2069 &args,
2070 |_schema, fields| {
2071 Ok(Field::new(
2072 fields[0].name().clone(),
2073 DataType::Datetime(TimeUnit::Microseconds, None),
2074 ))
2075 },
2076 );
2077 crate::column::Column::from_expr(expr, None)
2078}
2079
2080pub fn timestampadd(unit: &str, amount: &Column, ts: &Column) -> Column {
2082 ts.clone().timestampadd(unit, amount)
2083}
2084
2085pub fn timestampdiff(unit: &str, start: &Column, end: &Column) -> Column {
2087 start.clone().timestampdiff(unit, end)
2088}
2089
2090pub fn days(n: i64) -> Column {
2092 make_interval(0, 0, 0, n, 0, 0, 0)
2093}
2094
2095pub fn hours(n: i64) -> Column {
2097 make_interval(0, 0, 0, 0, n, 0, 0)
2098}
2099
2100pub fn minutes(n: i64) -> Column {
2102 make_interval(0, 0, 0, 0, 0, n, 0)
2103}
2104
2105pub fn months(n: i64) -> Column {
2107 make_interval(0, n, 0, 0, 0, 0, 0)
2108}
2109
2110pub fn years(n: i64) -> Column {
2112 make_interval(n, 0, 0, 0, 0, 0, 0)
2113}
2114
2115pub fn from_utc_timestamp(column: &Column, tz: &str) -> Column {
2117 column.clone().from_utc_timestamp(tz)
2118}
2119
2120pub fn to_utc_timestamp(column: &Column, tz: &str) -> Column {
2122 column.clone().to_utc_timestamp(tz)
2123}
2124
2125pub fn convert_timezone(source_tz: &str, target_tz: &str, column: &Column) -> Column {
2127 let source_tz = source_tz.to_string();
2128 let target_tz = target_tz.to_string();
2129 let expr = column.expr().clone().map(
2130 move |s| {
2131 crate::column::expect_col(crate::udfs::apply_convert_timezone(
2132 s, &source_tz, &target_tz,
2133 ))
2134 },
2135 |_schema, field| Ok(field.clone()),
2136 );
2137 crate::column::Column::from_expr(expr, None)
2138}
2139
2140pub fn current_timezone() -> Column {
2142 use polars::prelude::*;
2143 crate::column::Column::from_expr(lit("UTC"), None)
2144}
2145
2146pub fn make_interval(
2148 years: i64,
2149 months: i64,
2150 weeks: i64,
2151 days: i64,
2152 hours: i64,
2153 mins: i64,
2154 secs: i64,
2155) -> Column {
2156 use polars::prelude::*;
2157 let total_days = years * 365 + months * 30 + weeks * 7 + days;
2159 let args = DurationArgs::new()
2160 .with_days(lit(total_days))
2161 .with_hours(lit(hours))
2162 .with_minutes(lit(mins))
2163 .with_seconds(lit(secs));
2164 let dur = duration(args);
2165 crate::column::Column::from_expr(dur, None)
2166}
2167
2168pub fn make_dt_interval(days: i64, hours: i64, minutes: i64, seconds: i64) -> Column {
2170 use polars::prelude::*;
2171 let args = DurationArgs::new()
2172 .with_days(lit(days))
2173 .with_hours(lit(hours))
2174 .with_minutes(lit(minutes))
2175 .with_seconds(lit(seconds));
2176 let dur = duration(args);
2177 crate::column::Column::from_expr(dur, None)
2178}
2179
2180pub fn make_ym_interval(years: i32, months: i32) -> Column {
2182 use polars::prelude::*;
2183 let total_months = years * 12 + months;
2184 crate::column::Column::from_expr(lit(total_months), None)
2185}
2186
2187pub fn make_timestamp_ntz(
2189 year: &Column,
2190 month: &Column,
2191 day: &Column,
2192 hour: &Column,
2193 minute: &Column,
2194 sec: &Column,
2195) -> Column {
2196 make_timestamp(year, month, day, hour, minute, sec, None)
2197}
2198
2199pub fn timestamp_seconds(column: &Column) -> Column {
2201 column.clone().timestamp_seconds()
2202}
2203
2204pub fn timestamp_millis(column: &Column) -> Column {
2206 column.clone().timestamp_millis()
2207}
2208
2209pub fn timestamp_micros(column: &Column) -> Column {
2211 column.clone().timestamp_micros()
2212}
2213
2214pub fn unix_date(column: &Column) -> Column {
2216 column.clone().unix_date()
2217}
2218
2219pub fn date_from_unix_date(column: &Column) -> Column {
2221 column.clone().date_from_unix_date()
2222}
2223
2224pub fn pmod(dividend: &Column, divisor: &Column) -> Column {
2226 dividend.clone().pmod(divisor)
2227}
2228
2229pub fn factorial(column: &Column) -> Column {
2231 column.clone().factorial()
2232}
2233
2234pub fn concat(columns: &[&Column]) -> Column {
2236 use polars::prelude::*;
2237 if columns.is_empty() {
2238 panic!("concat requires at least one column");
2239 }
2240 let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2241 crate::column::Column::from_expr(concat_str(&exprs, "", false), None)
2242}
2243
2244pub fn concat_ws(separator: &str, columns: &[&Column]) -> Column {
2246 use polars::prelude::*;
2247 if columns.is_empty() {
2248 panic!("concat_ws requires at least one column");
2249 }
2250 let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2251 crate::column::Column::from_expr(concat_str(&exprs, separator, false), None)
2252}
2253
2254pub fn row_number(column: &Column) -> Column {
2264 column.clone().row_number(false)
2265}
2266
2267pub fn rank(column: &Column, descending: bool) -> Column {
2269 column.clone().rank(descending)
2270}
2271
2272pub fn dense_rank(column: &Column, descending: bool) -> Column {
2274 column.clone().dense_rank(descending)
2275}
2276
2277pub fn lag(column: &Column, n: i64) -> Column {
2279 column.clone().lag(n)
2280}
2281
2282pub fn lead(column: &Column, n: i64) -> Column {
2284 column.clone().lead(n)
2285}
2286
2287pub fn first_value(column: &Column) -> Column {
2289 column.clone().first_value()
2290}
2291
2292pub fn last_value(column: &Column) -> Column {
2294 column.clone().last_value()
2295}
2296
2297pub fn percent_rank(column: &Column, partition_by: &[&str], descending: bool) -> Column {
2299 column.clone().percent_rank(partition_by, descending)
2300}
2301
2302pub fn cume_dist(column: &Column, partition_by: &[&str], descending: bool) -> Column {
2304 column.clone().cume_dist(partition_by, descending)
2305}
2306
2307pub fn ntile(column: &Column, n: u32, partition_by: &[&str], descending: bool) -> Column {
2309 column.clone().ntile(n, partition_by, descending)
2310}
2311
2312pub fn nth_value(column: &Column, n: i64, partition_by: &[&str], descending: bool) -> Column {
2314 column.clone().nth_value(n, partition_by, descending)
2315}
2316
2317pub fn coalesce(columns: &[&Column]) -> Column {
2327 use polars::prelude::*;
2328 if columns.is_empty() {
2329 panic!("coalesce requires at least one column");
2330 }
2331 let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2332 let expr = coalesce(&exprs);
2333 crate::column::Column::from_expr(expr, None)
2334}
2335
2336pub fn nvl(column: &Column, value: &Column) -> Column {
2338 coalesce(&[column, value])
2339}
2340
2341pub fn ifnull(column: &Column, value: &Column) -> Column {
2343 nvl(column, value)
2344}
2345
2346pub fn nullif(column: &Column, value: &Column) -> Column {
2348 use polars::prelude::*;
2349 let cond = column.expr().clone().eq(value.expr().clone());
2350 let null_lit = lit(NULL);
2351 let expr = when(cond).then(null_lit).otherwise(column.expr().clone());
2352 crate::column::Column::from_expr(expr, None)
2353}
2354
2355pub fn nanvl(column: &Column, value: &Column) -> Column {
2357 use polars::prelude::*;
2358 let cond = column.expr().clone().is_nan();
2359 let expr = when(cond)
2360 .then(value.expr().clone())
2361 .otherwise(column.expr().clone());
2362 crate::column::Column::from_expr(expr, None)
2363}
2364
2365pub fn nvl2(col1: &Column, col2: &Column, col3: &Column) -> Column {
2367 use polars::prelude::*;
2368 let cond = col1.expr().clone().is_not_null();
2369 let expr = when(cond)
2370 .then(col2.expr().clone())
2371 .otherwise(col3.expr().clone());
2372 crate::column::Column::from_expr(expr, None)
2373}
2374
2375pub fn substr(column: &Column, start: i64, length: Option<i64>) -> Column {
2377 substring(column, start, length)
2378}
2379
2380pub fn power(column: &Column, exp: i64) -> Column {
2382 pow(column, exp)
2383}
2384
2385pub fn ln(column: &Column) -> Column {
2387 log(column)
2388}
2389
2390pub fn ceiling(column: &Column) -> Column {
2392 ceil(column)
2393}
2394
2395pub fn lcase(column: &Column) -> Column {
2397 lower(column)
2398}
2399
2400pub fn ucase(column: &Column) -> Column {
2402 upper(column)
2403}
2404
2405pub fn dayofmonth(column: &Column) -> Column {
2407 day(column)
2408}
2409
2410pub fn to_degrees(column: &Column) -> Column {
2412 degrees(column)
2413}
2414
2415pub fn to_radians(column: &Column) -> Column {
2417 radians(column)
2418}
2419
2420pub fn cosh(column: &Column) -> Column {
2422 column.clone().cosh()
2423}
2424pub fn sinh(column: &Column) -> Column {
2426 column.clone().sinh()
2427}
2428pub fn tanh(column: &Column) -> Column {
2430 column.clone().tanh()
2431}
2432pub fn acosh(column: &Column) -> Column {
2434 column.clone().acosh()
2435}
2436pub fn asinh(column: &Column) -> Column {
2438 column.clone().asinh()
2439}
2440pub fn atanh(column: &Column) -> Column {
2442 column.clone().atanh()
2443}
2444pub fn cbrt(column: &Column) -> Column {
2446 column.clone().cbrt()
2447}
2448pub fn expm1(column: &Column) -> Column {
2450 column.clone().expm1()
2451}
2452pub fn log1p(column: &Column) -> Column {
2454 column.clone().log1p()
2455}
2456pub fn log10(column: &Column) -> Column {
2458 column.clone().log10()
2459}
2460pub fn log2(column: &Column) -> Column {
2462 column.clone().log2()
2463}
2464pub fn rint(column: &Column) -> Column {
2466 column.clone().rint()
2467}
2468pub fn hypot(x: &Column, y: &Column) -> Column {
2470 let xx = x.expr().clone() * x.expr().clone();
2471 let yy = y.expr().clone() * y.expr().clone();
2472 crate::column::Column::from_expr((xx + yy).sqrt(), None)
2473}
2474
2475pub fn isnull(column: &Column) -> Column {
2477 column.clone().is_null()
2478}
2479
2480pub fn isnotnull(column: &Column) -> Column {
2482 column.clone().is_not_null()
2483}
2484
2485pub fn array(columns: &[&Column]) -> Result<crate::column::Column, PolarsError> {
2488 use polars::prelude::*;
2489 if columns.is_empty() {
2490 let empty_inner = Series::new("".into(), Vec::<i64>::new());
2493 let list_series = ListChunked::from_iter([Some(empty_inner)])
2494 .with_name("array".into())
2495 .into_series();
2496 let expr = lit(list_series).first();
2497 return Ok(crate::column::Column::from_expr(expr, None));
2498 }
2499 let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2500 let expr = concat_list(exprs)
2501 .map_err(|e| PolarsError::ComputeError(format!("array concat_list: {e}").into()))?;
2502 Ok(crate::column::Column::from_expr(expr, None))
2503}
2504
2505pub fn array_size(column: &Column) -> Column {
2507 column.clone().array_size()
2508}
2509
2510pub fn size(column: &Column) -> Column {
2512 column.clone().array_size()
2513}
2514
2515pub fn cardinality(column: &Column) -> Column {
2517 column.clone().cardinality()
2518}
2519
2520pub fn array_contains(column: &Column, value: &Column) -> Column {
2522 column.clone().array_contains(value.expr().clone())
2523}
2524
2525pub fn array_join(column: &Column, separator: &str) -> Column {
2527 column.clone().array_join(separator)
2528}
2529
2530pub fn array_max(column: &Column) -> Column {
2532 column.clone().array_max()
2533}
2534
2535pub fn array_min(column: &Column) -> Column {
2537 column.clone().array_min()
2538}
2539
2540pub fn element_at(column: &Column, index: i64) -> Column {
2542 column.clone().element_at(index)
2543}
2544
2545pub fn array_sort(column: &Column) -> Column {
2547 column.clone().array_sort()
2548}
2549
2550pub fn array_distinct(column: &Column) -> Column {
2552 column.clone().array_distinct()
2553}
2554
2555pub fn array_slice(column: &Column, start: i64, length: Option<i64>) -> Column {
2557 column.clone().array_slice(start, length)
2558}
2559
2560pub fn sequence(start: &Column, stop: &Column, step: Option<&Column>) -> Column {
2563 use polars::prelude::{DataType, Field, as_struct, lit};
2564 let step_expr = step
2565 .map(|c| c.expr().clone().alias("2"))
2566 .unwrap_or_else(|| lit(1i64).alias("2"));
2567 let struct_expr = as_struct(vec![
2568 start.expr().clone().alias("0"),
2569 stop.expr().clone().alias("1"),
2570 step_expr,
2571 ]);
2572 let out_dtype = DataType::List(Box::new(DataType::Int64));
2573 let expr = struct_expr.map(
2574 |s| crate::column::expect_col(crate::udfs::apply_sequence(s)),
2575 move |_schema, field| Ok(Field::new(field.name().clone(), out_dtype.clone())),
2576 );
2577 crate::column::Column::from_expr(expr, None)
2578}
2579
2580pub fn shuffle(column: &Column) -> Column {
2582 let expr = column.expr().clone().map(
2583 |s| crate::column::expect_col(crate::udfs::apply_shuffle(s)),
2584 |_schema, field| Ok(field.clone()),
2585 );
2586 crate::column::Column::from_expr(expr, None)
2587}
2588
2589pub fn inline(column: &Column) -> Column {
2592 column.clone().explode()
2593}
2594
2595pub fn inline_outer(column: &Column) -> Column {
2597 column.clone().explode_outer()
2598}
2599
2600pub fn explode(column: &Column) -> Column {
2602 column.clone().explode()
2603}
2604
2605pub fn array_position(column: &Column, value: &Column) -> Column {
2608 column.clone().array_position(value.expr().clone())
2609}
2610
2611pub fn array_compact(column: &Column) -> Column {
2613 column.clone().array_compact()
2614}
2615
2616pub fn array_remove(column: &Column, value: &Column) -> Column {
2619 column.clone().array_remove(value.expr().clone())
2620}
2621
2622pub fn array_repeat(column: &Column, n: i64) -> Column {
2624 column.clone().array_repeat(n)
2625}
2626
2627pub fn array_flatten(column: &Column) -> Column {
2629 column.clone().array_flatten()
2630}
2631
2632pub fn array_exists(column: &Column, predicate: Expr) -> Column {
2634 column.clone().array_exists(predicate)
2635}
2636
2637pub fn array_forall(column: &Column, predicate: Expr) -> Column {
2639 column.clone().array_forall(predicate)
2640}
2641
2642pub fn array_filter(column: &Column, predicate: Expr) -> Column {
2644 column.clone().array_filter(predicate)
2645}
2646
2647pub fn array_transform(column: &Column, f: Expr) -> Column {
2649 column.clone().array_transform(f)
2650}
2651
2652pub fn array_sum(column: &Column) -> Column {
2654 column.clone().array_sum()
2655}
2656
2657pub fn aggregate(column: &Column, zero: &Column) -> Column {
2659 column.clone().array_aggregate(zero)
2660}
2661
2662pub fn array_mean(column: &Column) -> Column {
2664 column.clone().array_mean()
2665}
2666
2667pub fn posexplode(column: &Column) -> (Column, Column) {
2670 column.clone().posexplode()
2671}
2672
2673pub fn create_map(key_values: &[&Column]) -> Result<Column, PolarsError> {
2677 use polars::chunked_array::StructChunked;
2678 use polars::prelude::{IntoSeries, ListChunked, as_struct, concat_list, lit};
2679 if key_values.is_empty() {
2680 let key_s = Series::new("key".into(), Vec::<String>::new());
2682 let value_s = Series::new("value".into(), Vec::<String>::new());
2683 let fields: [&Series; 2] = [&key_s, &value_s];
2684 let empty_struct = StructChunked::from_series(
2685 polars::prelude::PlSmallStr::EMPTY,
2686 0,
2687 fields.iter().copied(),
2688 )
2689 .map_err(|e| PolarsError::ComputeError(format!("create_map empty struct: {e}").into()))?
2690 .into_series();
2691 let list_series = ListChunked::from_iter([Some(empty_struct)])
2692 .with_name("create_map".into())
2693 .into_series();
2694 let expr = lit(list_series).first();
2695 return Ok(crate::column::Column::from_expr(expr, None));
2696 }
2697 let mut struct_exprs: Vec<Expr> = Vec::new();
2698 for i in (0..key_values.len()).step_by(2) {
2699 if i + 1 < key_values.len() {
2700 let k = key_values[i].expr().clone().alias("key");
2701 let v = key_values[i + 1].expr().clone().alias("value");
2702 struct_exprs.push(as_struct(vec![k, v]));
2703 }
2704 }
2705 let expr = concat_list(struct_exprs)
2706 .map_err(|e| PolarsError::ComputeError(format!("create_map concat_list: {e}").into()))?;
2707 Ok(crate::column::Column::from_expr(expr, None))
2708}
2709
2710pub fn map_keys(column: &Column) -> Column {
2712 column.clone().map_keys()
2713}
2714
2715pub fn map_values(column: &Column) -> Column {
2717 column.clone().map_values()
2718}
2719
2720pub fn map_entries(column: &Column) -> Column {
2722 column.clone().map_entries()
2723}
2724
2725pub fn map_from_arrays(keys: &Column, values: &Column) -> Column {
2727 keys.clone().map_from_arrays(values)
2728}
2729
2730pub fn map_concat(a: &Column, b: &Column) -> Column {
2732 a.clone().map_concat(b)
2733}
2734
2735pub fn map_from_entries(column: &Column) -> Column {
2737 column.clone().map_from_entries()
2738}
2739
2740pub fn map_contains_key(map_col: &Column, key: &Column) -> Column {
2742 map_col.clone().map_contains_key(key)
2743}
2744
2745pub fn get(map_col: &Column, key: &Column) -> Column {
2747 map_col.clone().get(key)
2748}
2749
2750pub fn map_filter(map_col: &Column, predicate: Expr) -> Column {
2752 map_col.clone().map_filter(predicate)
2753}
2754
2755pub fn map_zip_with(map1: &Column, map2: &Column, merge: Expr) -> Column {
2757 map1.clone().map_zip_with(map2, merge)
2758}
2759
2760pub fn zip_with_coalesce(left: &Column, right: &Column) -> Column {
2762 use polars::prelude::col;
2763 let left_field = col("").struct_().field_by_name("left");
2764 let right_field = col("").struct_().field_by_name("right");
2765 let merge = crate::column::Column::from_expr(
2766 coalesce(&[
2767 &crate::column::Column::from_expr(left_field, None),
2768 &crate::column::Column::from_expr(right_field, None),
2769 ])
2770 .into_expr(),
2771 None,
2772 );
2773 left.clone().zip_with(right, merge.into_expr())
2774}
2775
2776pub fn map_zip_with_coalesce(map1: &Column, map2: &Column) -> Column {
2778 use polars::prelude::col;
2779 let v1 = col("").struct_().field_by_name("value1");
2780 let v2 = col("").struct_().field_by_name("value2");
2781 let merge = coalesce(&[
2782 &crate::column::Column::from_expr(v1, None),
2783 &crate::column::Column::from_expr(v2, None),
2784 ])
2785 .into_expr();
2786 map1.clone().map_zip_with(map2, merge)
2787}
2788
2789pub fn map_filter_value_gt(map_col: &Column, threshold: f64) -> Column {
2791 use polars::prelude::{col, lit};
2792 let pred = col("").struct_().field_by_name("value").gt(lit(threshold));
2793 map_col.clone().map_filter(pred)
2794}
2795
2796pub fn struct_(columns: &[&Column]) -> Column {
2798 use polars::prelude::as_struct;
2799 if columns.is_empty() {
2800 panic!("struct requires at least one column");
2801 }
2802 let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2803 crate::column::Column::from_expr(as_struct(exprs), None)
2804}
2805
2806pub fn named_struct(pairs: &[(&str, &Column)]) -> Column {
2808 use polars::prelude::as_struct;
2809 if pairs.is_empty() {
2810 panic!("named_struct requires at least one (name, column) pair");
2811 }
2812 let exprs: Vec<Expr> = pairs
2813 .iter()
2814 .map(|(name, col)| col.expr().clone().alias(*name))
2815 .collect();
2816 crate::column::Column::from_expr(as_struct(exprs), None)
2817}
2818
2819pub fn array_append(array: &Column, elem: &Column) -> Column {
2821 array.clone().array_append(elem)
2822}
2823
2824pub fn array_prepend(array: &Column, elem: &Column) -> Column {
2826 array.clone().array_prepend(elem)
2827}
2828
2829pub fn array_insert(array: &Column, pos: &Column, elem: &Column) -> Column {
2831 array.clone().array_insert(pos, elem)
2832}
2833
2834pub fn array_except(a: &Column, b: &Column) -> Column {
2836 a.clone().array_except(b)
2837}
2838
2839pub fn array_intersect(a: &Column, b: &Column) -> Column {
2841 a.clone().array_intersect(b)
2842}
2843
2844pub fn array_union(a: &Column, b: &Column) -> Column {
2846 a.clone().array_union(b)
2847}
2848
2849pub fn zip_with(left: &Column, right: &Column, merge: Expr) -> Column {
2851 left.clone().zip_with(right, merge)
2852}
2853
2854pub fn get_json_object(column: &Column, path: &str) -> Column {
2856 column.clone().get_json_object(path)
2857}
2858
2859pub fn json_object_keys(column: &Column) -> Column {
2861 column.clone().json_object_keys()
2862}
2863
2864pub fn json_tuple(column: &Column, keys: &[&str]) -> Column {
2866 column.clone().json_tuple(keys)
2867}
2868
2869pub fn from_csv(column: &Column) -> Column {
2871 column.clone().from_csv()
2872}
2873
2874pub fn to_csv(column: &Column) -> Column {
2876 column.clone().to_csv()
2877}
2878
2879pub fn schema_of_csv(_column: &Column) -> Column {
2881 Column::from_expr(
2882 lit("STRUCT<_c0: STRING, _c1: STRING>".to_string()),
2883 Some("schema_of_csv".to_string()),
2884 )
2885}
2886
2887pub fn schema_of_json(_column: &Column) -> Column {
2889 Column::from_expr(
2890 lit("STRUCT<>".to_string()),
2891 Some("schema_of_json".to_string()),
2892 )
2893}
2894
2895pub fn from_json(column: &Column, schema: Option<polars::datatypes::DataType>) -> Column {
2897 column.clone().from_json(schema)
2898}
2899
2900pub fn to_json(column: &Column) -> Column {
2902 column.clone().to_json()
2903}
2904
2905pub fn isin(column: &Column, other: &Column) -> Column {
2907 column.clone().isin(other)
2908}
2909
2910pub fn isin_i64(column: &Column, values: &[i64]) -> Column {
2912 let s = Series::from_iter(values.iter().cloned());
2913 Column::from_expr(column.expr().clone().is_in(lit(s), false), None)
2914}
2915
2916pub fn isin_str(column: &Column, values: &[&str]) -> Column {
2918 let s: Series = Series::from_iter(values.iter().copied());
2919 Column::from_expr(column.expr().clone().is_in(lit(s), false), None)
2920}
2921
2922pub fn url_decode(column: &Column) -> Column {
2924 column.clone().url_decode()
2925}
2926
2927pub fn url_encode(column: &Column) -> Column {
2929 column.clone().url_encode()
2930}
2931
2932pub fn shift_left(column: &Column, n: i32) -> Column {
2934 column.clone().shift_left(n)
2935}
2936
2937pub fn shift_right(column: &Column, n: i32) -> Column {
2939 column.clone().shift_right(n)
2940}
2941
2942pub fn shift_right_unsigned(column: &Column, n: i32) -> Column {
2944 column.clone().shift_right_unsigned(n)
2945}
2946
2947pub fn version() -> Column {
2949 Column::from_expr(
2950 lit(concat!("robin-sparkless-", env!("CARGO_PKG_VERSION"))),
2951 None,
2952 )
2953}
2954
2955pub fn equal_null(left: &Column, right: &Column) -> Column {
2957 left.clone().eq_null_safe(right)
2958}
2959
2960pub fn json_array_length(column: &Column, path: &str) -> Column {
2962 column.clone().json_array_length(path)
2963}
2964
2965pub fn parse_url(column: &Column, part: &str, key: Option<&str>) -> Column {
2968 column.clone().parse_url(part, key)
2969}
2970
2971pub fn hash(columns: &[&Column]) -> Column {
2973 use polars::prelude::*;
2974 if columns.is_empty() {
2975 return crate::column::Column::from_expr(lit(0i64), None);
2976 }
2977 if columns.len() == 1 {
2978 return columns[0].clone().hash();
2979 }
2980 let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2981 let struct_expr = polars::prelude::as_struct(exprs);
2982 let name = columns[0].name().to_string();
2983 let expr = struct_expr.map(
2984 |s| crate::column::expect_col(crate::udfs::apply_hash_struct(s)),
2985 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
2986 );
2987 crate::column::Column::from_expr(expr, Some(name))
2988}
2989
2990pub fn stack(columns: &[&Column]) -> Column {
2992 struct_(columns)
2993}
2994
#[cfg(test)]
mod tests {
    // Unit tests for the free-function API. Result columns are evaluated through a lazy
    // `with_column(s)` on a small `df!` frame and compared as whole vectors, so an
    // unexpected row count fails a test just like an unexpected value does.
    use super::*;
    use polars::prelude::{IntoLazy, df};

    #[test]
    fn test_col_creates_column() {
        let column = col("test");
        assert_eq!(column.name(), "test");
    }

    #[test]
    fn test_lit_i32() {
        let column = lit_i32(42);
        assert_eq!(column.name(), "<expr>");
    }

    #[test]
    fn test_lit_i64() {
        let column = lit_i64(123456789012345i64);
        assert_eq!(column.name(), "<expr>");
    }

    #[test]
    fn test_lit_f64() {
        let column = lit_f64(std::f64::consts::PI);
        assert_eq!(column.name(), "<expr>");
    }

    #[test]
    fn test_lit_bool() {
        let column = lit_bool(true);
        assert_eq!(column.name(), "<expr>");
    }

    #[test]
    fn test_lit_str() {
        let column = lit_str("hello");
        assert_eq!(column.name(), "<expr>");
    }

    #[test]
    fn test_create_map_empty() {
        let empty_col = create_map(&[]).unwrap();
        let df = df!("id" => &[1i64, 2i64]).unwrap();
        let out = df
            .lazy()
            .with_columns([empty_col.into_expr().alias("m")])
            .collect()
            .unwrap();
        assert_eq!(out.height(), 2);
        let m = out.column("m").unwrap();
        assert_eq!(m.len(), 2);
        // Every row must hold an empty map, i.e. an empty list of key/value structs.
        let list = m.list().unwrap();
        for i in 0..2 {
            let row = list.get(i).unwrap();
            assert_eq!(row.len(), 0);
        }
    }

    #[test]
    fn test_count_aggregation() {
        let column = col("value");
        let result = count(&column);
        assert_eq!(result.name(), "count");
    }

    #[test]
    fn test_sum_aggregation() {
        let column = col("value");
        let result = sum(&column);
        assert_eq!(result.name(), "sum");
    }

    #[test]
    fn test_avg_aggregation() {
        let column = col("value");
        let result = avg(&column);
        assert_eq!(result.name(), "avg");
    }

    #[test]
    fn test_max_aggregation() {
        let column = col("value");
        let result = max(&column);
        assert_eq!(result.name(), "max");
    }

    #[test]
    fn test_min_aggregation() {
        let column = col("value");
        let result = min(&column);
        assert_eq!(result.name(), "min");
    }

    #[test]
    fn test_when_then_otherwise() {
        let df = df!(
            "age" => &[15, 25, 35]
        )
        .unwrap();

        let age_col = col("age");
        let condition = age_col.gt(polars::prelude::lit(18));
        let result = when(&condition)
            .then(&lit_str("adult"))
            .otherwise(&lit_str("minor"));

        let result_df = df
            .lazy()
            .with_column(result.into_expr().alias("status"))
            .collect()
            .unwrap();

        let status_col = result_df.column("status").unwrap();
        let values: Vec<Option<&str>> = status_col.str().unwrap().into_iter().collect();

        // Only 15 fails `age > 18`.
        assert_eq!(values, vec![Some("minor"), Some("adult"), Some("adult")]);
    }

    #[test]
    fn test_coalesce_returns_first_non_null() {
        let df = df!(
            "a" => &[Some(1), None, None],
            "b" => &[None, Some(2), None],
            "c" => &[None, None, Some(3)]
        )
        .unwrap();

        let col_a = col("a");
        let col_b = col("b");
        let col_c = col("c");
        let result = coalesce(&[&col_a, &col_b, &col_c]);

        let result_df = df
            .lazy()
            .with_column(result.into_expr().alias("coalesced"))
            .collect()
            .unwrap();

        let coalesced_col = result_df.column("coalesced").unwrap();
        let values: Vec<Option<i32>> = coalesced_col.i32().unwrap().into_iter().collect();

        // Each row has exactly one non-null input, in a different column per row.
        assert_eq!(values, vec![Some(1), Some(2), Some(3)]);
    }

    #[test]
    fn test_coalesce_with_literal_fallback() {
        let df = df!(
            "a" => &[Some(1), None],
            "b" => &[None::<i32>, None::<i32>]
        )
        .unwrap();

        let col_a = col("a");
        let col_b = col("b");
        let fallback = lit_i32(0);
        let result = coalesce(&[&col_a, &col_b, &fallback]);

        let result_df = df
            .lazy()
            .with_column(result.into_expr().alias("coalesced"))
            .collect()
            .unwrap();

        let coalesced_col = result_df.column("coalesced").unwrap();
        let values: Vec<Option<i32>> = coalesced_col.i32().unwrap().into_iter().collect();

        // Row 2 has no non-null column value, so the literal fallback wins.
        assert_eq!(values, vec![Some(1), Some(0)]);
    }

    #[test]
    #[should_panic(expected = "coalesce requires at least one column")]
    fn test_coalesce_empty_panics() {
        let columns: [&Column; 0] = [];
        let _ = coalesce(&columns);
    }

    #[test]
    fn test_cast_double_string_column_strict_ok() {
        let df = df!(
            "s" => &["123", " 45.5 ", "0"]
        )
        .unwrap();

        let s_col = col("s");
        let cast_col = cast(&s_col, "double").unwrap();

        let out = df
            .lazy()
            .with_column(cast_col.into_expr().alias("v"))
            .collect()
            .unwrap();

        let v = out.column("v").unwrap();
        let vals: Vec<Option<f64>> = v.f64().unwrap().into_iter().collect();
        assert_eq!(vals, vec![Some(123.0), Some(45.5), Some(0.0)]);
    }

    #[test]
    fn test_try_cast_double_string_column_invalid_to_null() {
        let df = df!(
            "s" => &["123", " 45.5 ", "abc", ""]
        )
        .unwrap();

        let s_col = col("s");
        let try_cast_col = try_cast(&s_col, "double").unwrap();

        let out = df
            .lazy()
            .with_column(try_cast_col.into_expr().alias("v"))
            .collect()
            .unwrap();

        let v = out.column("v").unwrap();
        let vals: Vec<Option<f64>> = v.f64().unwrap().into_iter().collect();
        assert_eq!(vals, vec![Some(123.0), Some(45.5), None, None]);
    }

    #[test]
    fn test_to_number_and_try_to_number_numerics_and_strings() {
        let df = df!(
            "i" => &[1i32, 2, 3],
            "f" => &[1.5f64, 2.5, 3.5],
            "s" => &["10", "20.5", "xyz"]
        )
        .unwrap();

        let i_col = col("i");
        let f_col = col("f");
        let s_col = col("s");

        let to_number_i = to_number(&i_col, None).unwrap();
        let to_number_f = to_number(&f_col, None).unwrap();
        let try_to_number_s = try_to_number(&s_col, None).unwrap();

        let out = df
            .lazy()
            .with_columns([
                to_number_i.into_expr().alias("i_num"),
                to_number_f.into_expr().alias("f_num"),
                try_to_number_s.into_expr().alias("s_num"),
            ])
            .collect()
            .unwrap();

        let i_num = out.column("i_num").unwrap();
        let f_num = out.column("f_num").unwrap();
        let s_num = out.column("s_num").unwrap();

        let i_vals: Vec<Option<f64>> = i_num.f64().unwrap().into_iter().collect();
        let f_vals: Vec<Option<f64>> = f_num.f64().unwrap().into_iter().collect();
        let s_vals: Vec<Option<f64>> = s_num.f64().unwrap().into_iter().collect();

        assert_eq!(i_vals, vec![Some(1.0), Some(2.0), Some(3.0)]);
        assert_eq!(f_vals, vec![Some(1.5), Some(2.5), Some(3.5)]);
        assert_eq!(s_vals, vec![Some(10.0), Some(20.5), None]);
    }
}