1use crate::column::Column;
2use crate::dataframe::DataFrame;
3use polars::prelude::*;
4
5#[derive(Debug, Clone)]
10pub struct SortOrder {
11 pub(crate) expr: Expr,
12 pub(crate) descending: bool,
13 pub(crate) nulls_last: bool,
14}
15
16impl SortOrder {
17 pub fn expr(&self) -> &Expr {
18 &self.expr
19 }
20}
21
22pub fn asc(column: &Column) -> SortOrder {
24 SortOrder {
25 expr: column.expr().clone(),
26 descending: false,
27 nulls_last: false,
28 }
29}
30
31pub fn asc_nulls_first(column: &Column) -> SortOrder {
33 SortOrder {
34 expr: column.expr().clone(),
35 descending: false,
36 nulls_last: false,
37 }
38}
39
40pub fn asc_nulls_last(column: &Column) -> SortOrder {
42 SortOrder {
43 expr: column.expr().clone(),
44 descending: false,
45 nulls_last: true,
46 }
47}
48
49pub fn desc(column: &Column) -> SortOrder {
51 SortOrder {
52 expr: column.expr().clone(),
53 descending: true,
54 nulls_last: true,
55 }
56}
57
58pub fn desc_nulls_first(column: &Column) -> SortOrder {
60 SortOrder {
61 expr: column.expr().clone(),
62 descending: true,
63 nulls_last: false,
64 }
65}
66
67pub fn desc_nulls_last(column: &Column) -> SortOrder {
69 SortOrder {
70 expr: column.expr().clone(),
71 descending: true,
72 nulls_last: true,
73 }
74}
75
76pub fn parse_type_name(name: &str) -> Result<DataType, String> {
81 let s = name.trim().to_lowercase();
82 if s.starts_with("decimal(") && s.contains(')') {
83 return Ok(DataType::Float64);
84 }
85 Ok(match s.as_str() {
86 "int" | "integer" => DataType::Int32,
87 "long" | "bigint" => DataType::Int64,
88 "float" => DataType::Float32,
89 "double" => DataType::Float64,
90 "string" | "str" => DataType::String,
91 "boolean" | "bool" => DataType::Boolean,
92 "date" => DataType::Date,
93 "timestamp" => DataType::Datetime(TimeUnit::Microseconds, None),
94 _ => return Err(format!("unknown type name: {name}")),
95 })
96}
97
98pub fn col(name: &str) -> Column {
100 Column::new(name.to_string())
101}
102
103pub fn grouping(column: &Column) -> Column {
105 let _ = column;
106 Column::from_expr(lit(0i32), Some("grouping".to_string()))
107}
108
109pub fn grouping_id(_columns: &[Column]) -> Column {
111 Column::from_expr(lit(0i64), Some("grouping_id".to_string()))
112}
113
114pub fn lit_i32(value: i32) -> Column {
116 let expr: Expr = lit(value);
117 Column::from_expr(expr, None)
118}
119
120pub fn lit_i64(value: i64) -> Column {
121 let expr: Expr = lit(value);
122 Column::from_expr(expr, None)
123}
124
125pub fn lit_f64(value: f64) -> Column {
126 let expr: Expr = lit(value);
127 Column::from_expr(expr, None)
128}
129
130pub fn lit_bool(value: bool) -> Column {
131 let expr: Expr = lit(value);
132 Column::from_expr(expr, None)
133}
134
135pub fn lit_str(value: &str) -> Column {
136 let expr: Expr = lit(value);
137 Column::from_expr(expr, None)
138}
139
140pub fn lit_null(dtype: &str) -> Result<Column, String> {
143 Column::lit_null(dtype)
144}
145
146pub fn count(col: &Column) -> Column {
148 Column::from_expr(col.expr().clone().count(), Some("count".to_string()))
149}
150
151pub fn sum(col: &Column) -> Column {
153 Column::from_expr(col.expr().clone().sum(), Some("sum".to_string()))
154}
155
156pub fn avg(col: &Column) -> Column {
158 Column::from_expr(col.expr().clone().mean(), Some("avg".to_string()))
159}
160
161pub fn mean(col: &Column) -> Column {
163 avg(col)
164}
165
166pub fn max(col: &Column) -> Column {
168 Column::from_expr(col.expr().clone().max(), Some("max".to_string()))
169}
170
171pub fn min(col: &Column) -> Column {
173 Column::from_expr(col.expr().clone().min(), Some("min".to_string()))
174}
175
176pub fn first(col: &Column, ignorenulls: bool) -> Column {
178 let _ = ignorenulls;
179 Column::from_expr(col.expr().clone().first(), None)
180}
181
182pub fn any_value(col: &Column, ignorenulls: bool) -> Column {
184 let _ = ignorenulls;
185 Column::from_expr(col.expr().clone().first(), None)
186}
187
188pub fn count_if(col: &Column) -> Column {
190 use polars::prelude::DataType;
191 Column::from_expr(
192 col.expr().clone().cast(DataType::Int64).sum(),
193 Some("count_if".to_string()),
194 )
195}
196
197pub fn try_sum(col: &Column) -> Column {
199 Column::from_expr(col.expr().clone().sum(), Some("try_sum".to_string()))
200}
201
202pub fn try_avg(col: &Column) -> Column {
204 Column::from_expr(col.expr().clone().mean(), Some("try_avg".to_string()))
205}
206
207pub fn max_by(value_col: &Column, ord_col: &Column) -> Column {
209 use polars::prelude::{SortOptions, as_struct};
210 let st = as_struct(vec![
211 ord_col.expr().clone().alias("_ord"),
212 value_col.expr().clone().alias("_val"),
213 ]);
214 let e = st
215 .sort(SortOptions::default().with_order_descending(true))
216 .first()
217 .struct_()
218 .field_by_name("_val");
219 Column::from_expr(e, None)
220}
221
222pub fn min_by(value_col: &Column, ord_col: &Column) -> Column {
224 use polars::prelude::{SortOptions, as_struct};
225 let st = as_struct(vec![
226 ord_col.expr().clone().alias("_ord"),
227 value_col.expr().clone().alias("_val"),
228 ]);
229 let e = st
230 .sort(SortOptions::default())
231 .first()
232 .struct_()
233 .field_by_name("_val");
234 Column::from_expr(e, None)
235}
236
237pub fn collect_list(col: &Column) -> Column {
239 Column::from_expr(
240 col.expr().clone().implode(),
241 Some("collect_list".to_string()),
242 )
243}
244
245pub fn collect_set(col: &Column) -> Column {
247 Column::from_expr(
248 col.expr().clone().unique().implode(),
249 Some("collect_set".to_string()),
250 )
251}
252
253pub fn bool_and(col: &Column) -> Column {
255 Column::from_expr(col.expr().clone().all(true), Some("bool_and".to_string()))
256}
257
258pub fn every(col: &Column) -> Column {
260 Column::from_expr(col.expr().clone().all(true), Some("every".to_string()))
261}
262
263pub fn stddev(col: &Column) -> Column {
265 Column::from_expr(col.expr().clone().std(1), Some("stddev".to_string()))
266}
267
268pub fn variance(col: &Column) -> Column {
270 Column::from_expr(col.expr().clone().var(1), Some("variance".to_string()))
271}
272
273pub fn stddev_pop(col: &Column) -> Column {
275 Column::from_expr(col.expr().clone().std(0), Some("stddev_pop".to_string()))
276}
277
278pub fn stddev_samp(col: &Column) -> Column {
280 stddev(col)
281}
282
283pub fn std(col: &Column) -> Column {
285 stddev(col)
286}
287
288pub fn var_pop(col: &Column) -> Column {
290 Column::from_expr(col.expr().clone().var(0), Some("var_pop".to_string()))
291}
292
293pub fn var_samp(col: &Column) -> Column {
295 variance(col)
296}
297
298pub fn median(col: &Column) -> Column {
300 use polars::prelude::QuantileMethod;
301 Column::from_expr(
302 col.expr()
303 .clone()
304 .quantile(lit(0.5), QuantileMethod::Linear),
305 Some("median".to_string()),
306 )
307}
308
309pub fn approx_percentile(col: &Column, percentage: f64, _accuracy: Option<i32>) -> Column {
311 use polars::prelude::QuantileMethod;
312 Column::from_expr(
313 col.expr()
314 .clone()
315 .quantile(lit(percentage), QuantileMethod::Linear),
316 Some(format!("approx_percentile({percentage})")),
317 )
318}
319
320pub fn percentile_approx(col: &Column, percentage: f64, accuracy: Option<i32>) -> Column {
322 approx_percentile(col, percentage, accuracy)
323}
324
325pub fn mode(col: &Column) -> Column {
327 col.clone().mode()
328}
329
330pub fn count_distinct(col: &Column) -> Column {
332 use polars::prelude::DataType;
333 Column::from_expr(
334 col.expr().clone().n_unique().cast(DataType::Int64),
335 Some("count_distinct".to_string()),
336 )
337}
338
339pub fn approx_count_distinct(col: &Column, _rsd: Option<f64>) -> Column {
341 use polars::prelude::DataType;
342 Column::from_expr(
343 col.expr().clone().n_unique().cast(DataType::Int64),
344 Some("approx_count_distinct".to_string()),
345 )
346}
347
348pub fn kurtosis(col: &Column) -> Column {
350 Column::from_expr(
351 col.expr()
352 .clone()
353 .cast(DataType::Float64)
354 .kurtosis(true, true),
355 Some("kurtosis".to_string()),
356 )
357}
358
359pub fn skewness(col: &Column) -> Column {
361 Column::from_expr(
362 col.expr().clone().cast(DataType::Float64).skew(true),
363 Some("skewness".to_string()),
364 )
365}
366
367pub fn covar_pop_expr(col1: &str, col2: &str) -> Expr {
369 use polars::prelude::{col as pl_col, len};
370 let c1 = pl_col(col1).cast(DataType::Float64);
371 let c2 = pl_col(col2).cast(DataType::Float64);
372 let n = len().cast(DataType::Float64);
373 let sum_ab = (c1.clone() * c2.clone()).sum();
374 let sum_a = pl_col(col1).sum().cast(DataType::Float64);
375 let sum_b = pl_col(col2).sum().cast(DataType::Float64);
376 (sum_ab - sum_a * sum_b / n.clone()) / n
377}
378
379pub fn covar_pop(col1: &Column, col2: &Column) -> Column {
381 use polars::prelude::len;
382 let c1 = col1.expr().clone().cast(DataType::Float64);
383 let c2 = col2.expr().clone().cast(DataType::Float64);
384 let n = len().cast(DataType::Float64);
385 let sum_ab = (c1.clone() * c2.clone()).sum();
386 let sum_a = col1.expr().clone().sum().cast(DataType::Float64);
387 let sum_b = col2.expr().clone().sum().cast(DataType::Float64);
388 let e = (sum_ab - sum_a * sum_b / n.clone()) / n;
389 Column::from_expr(e, Some("covar_pop".to_string()))
390}
391
392pub fn corr(col1: &Column, col2: &Column) -> Column {
394 use polars::prelude::{len, lit, when};
395 let c1 = col1.expr().clone().cast(DataType::Float64);
396 let c2 = col2.expr().clone().cast(DataType::Float64);
397 let n = len().cast(DataType::Float64);
398 let n1 = (len() - lit(1)).cast(DataType::Float64);
399 let sum_ab = (c1.clone() * c2.clone()).sum();
400 let sum_a = col1.expr().clone().sum().cast(DataType::Float64);
401 let sum_b = col2.expr().clone().sum().cast(DataType::Float64);
402 let sum_a2 = (c1.clone() * c1).sum();
403 let sum_b2 = (c2.clone() * c2).sum();
404 let cov_samp = (sum_ab - sum_a.clone() * sum_b.clone() / n.clone()) / n1.clone();
405 let var_a = (sum_a2 - sum_a.clone() * sum_a / n.clone()) / n1.clone();
406 let var_b = (sum_b2 - sum_b.clone() * sum_b / n.clone()) / n1.clone();
407 let std_a = var_a.sqrt();
408 let std_b = var_b.sqrt();
409 let e = when(len().gt(lit(1)))
410 .then(cov_samp / (std_a * std_b))
411 .otherwise(lit(f64::NAN));
412 Column::from_expr(e, Some("corr".to_string()))
413}
414
415pub fn covar_samp_expr(col1: &str, col2: &str) -> Expr {
417 use polars::prelude::{col as pl_col, len, lit, when};
418 let c1 = pl_col(col1).cast(DataType::Float64);
419 let c2 = pl_col(col2).cast(DataType::Float64);
420 let n = len().cast(DataType::Float64);
421 let sum_ab = (c1.clone() * c2.clone()).sum();
422 let sum_a = pl_col(col1).sum().cast(DataType::Float64);
423 let sum_b = pl_col(col2).sum().cast(DataType::Float64);
424 when(len().gt(lit(1)))
425 .then((sum_ab - sum_a * sum_b / n.clone()) / (len() - lit(1)).cast(DataType::Float64))
426 .otherwise(lit(f64::NAN))
427}
428
429pub fn corr_expr(col1: &str, col2: &str) -> Expr {
431 use polars::prelude::{col as pl_col, len, lit, when};
432 let c1 = pl_col(col1).cast(DataType::Float64);
433 let c2 = pl_col(col2).cast(DataType::Float64);
434 let n = len().cast(DataType::Float64);
435 let n1 = (len() - lit(1)).cast(DataType::Float64);
436 let sum_ab = (c1.clone() * c2.clone()).sum();
437 let sum_a = pl_col(col1).sum().cast(DataType::Float64);
438 let sum_b = pl_col(col2).sum().cast(DataType::Float64);
439 let sum_a2 = (c1.clone() * c1).sum();
440 let sum_b2 = (c2.clone() * c2).sum();
441 let cov_samp = (sum_ab - sum_a.clone() * sum_b.clone() / n.clone()) / n1.clone();
442 let var_a = (sum_a2 - sum_a.clone() * sum_a / n.clone()) / n1.clone();
443 let var_b = (sum_b2 - sum_b.clone() * sum_b / n.clone()) / n1.clone();
444 let std_a = var_a.sqrt();
445 let std_b = var_b.sqrt();
446 when(len().gt(lit(1)))
447 .then(cov_samp / (std_a * std_b))
448 .otherwise(lit(f64::NAN))
449}
450
451fn regr_cond_and_sums(y_col: &str, x_col: &str) -> (Expr, Expr, Expr, Expr, Expr, Expr) {
454 use polars::prelude::col as pl_col;
455 let y = pl_col(y_col).cast(DataType::Float64);
456 let x = pl_col(x_col).cast(DataType::Float64);
457 let cond = y.clone().is_not_null().and(x.clone().is_not_null());
458 let n = y
459 .clone()
460 .filter(cond.clone())
461 .count()
462 .cast(DataType::Float64);
463 let sum_x = x.clone().filter(cond.clone()).sum();
464 let sum_y = y.clone().filter(cond.clone()).sum();
465 let sum_xx = (x.clone() * x.clone()).filter(cond.clone()).sum();
466 let sum_yy = (y.clone() * y.clone()).filter(cond.clone()).sum();
467 let sum_xy = (x * y).filter(cond).sum();
468 (n, sum_x, sum_y, sum_xx, sum_yy, sum_xy)
469}
470
471pub fn regr_count_expr(y_col: &str, x_col: &str) -> Expr {
473 let (n, ..) = regr_cond_and_sums(y_col, x_col);
474 n
475}
476
477pub fn regr_avgx_expr(y_col: &str, x_col: &str) -> Expr {
479 use polars::prelude::{lit, when};
480 let (n, sum_x, ..) = regr_cond_and_sums(y_col, x_col);
481 when(n.clone().gt(lit(0.0)))
482 .then(sum_x / n)
483 .otherwise(lit(f64::NAN))
484}
485
486pub fn regr_avgy_expr(y_col: &str, x_col: &str) -> Expr {
488 use polars::prelude::{lit, when};
489 let (n, _, sum_y, ..) = regr_cond_and_sums(y_col, x_col);
490 when(n.clone().gt(lit(0.0)))
491 .then(sum_y / n)
492 .otherwise(lit(f64::NAN))
493}
494
495pub fn regr_sxx_expr(y_col: &str, x_col: &str) -> Expr {
497 use polars::prelude::{lit, when};
498 let (n, sum_x, _, sum_xx, ..) = regr_cond_and_sums(y_col, x_col);
499 when(n.clone().gt(lit(0.0)))
500 .then(sum_xx - sum_x.clone() * sum_x / n)
501 .otherwise(lit(f64::NAN))
502}
503
504pub fn regr_syy_expr(y_col: &str, x_col: &str) -> Expr {
506 use polars::prelude::{lit, when};
507 let (n, _, sum_y, _, sum_yy, _) = regr_cond_and_sums(y_col, x_col);
508 when(n.clone().gt(lit(0.0)))
509 .then(sum_yy - sum_y.clone() * sum_y / n)
510 .otherwise(lit(f64::NAN))
511}
512
513pub fn regr_sxy_expr(y_col: &str, x_col: &str) -> Expr {
515 use polars::prelude::{lit, when};
516 let (n, sum_x, sum_y, _, _, sum_xy) = regr_cond_and_sums(y_col, x_col);
517 when(n.clone().gt(lit(0.0)))
518 .then(sum_xy - sum_x * sum_y / n)
519 .otherwise(lit(f64::NAN))
520}
521
522pub fn regr_slope_expr(y_col: &str, x_col: &str) -> Expr {
524 use polars::prelude::{lit, when};
525 let (n, sum_x, sum_y, sum_xx, _sum_yy, sum_xy) = regr_cond_and_sums(y_col, x_col);
526 let regr_sxx = sum_xx.clone() - sum_x.clone() * sum_x.clone() / n.clone();
527 let regr_sxy = sum_xy - sum_x * sum_y / n.clone();
528 when(n.gt(lit(1.0)).and(regr_sxx.clone().gt(lit(0.0))))
529 .then(regr_sxy / regr_sxx)
530 .otherwise(lit(f64::NAN))
531}
532
533pub fn regr_intercept_expr(y_col: &str, x_col: &str) -> Expr {
535 use polars::prelude::{lit, when};
536 let (n, sum_x, sum_y, sum_xx, _, sum_xy) = regr_cond_and_sums(y_col, x_col);
537 let regr_sxx = sum_xx - sum_x.clone() * sum_x.clone() / n.clone();
538 let regr_sxy = sum_xy.clone() - sum_x.clone() * sum_y.clone() / n.clone();
539 let slope = regr_sxy.clone() / regr_sxx.clone();
540 let avg_y = sum_y / n.clone();
541 let avg_x = sum_x / n.clone();
542 when(n.gt(lit(1.0)).and(regr_sxx.clone().gt(lit(0.0))))
543 .then(avg_y - slope * avg_x)
544 .otherwise(lit(f64::NAN))
545}
546
547pub fn regr_r2_expr(y_col: &str, x_col: &str) -> Expr {
549 use polars::prelude::{lit, when};
550 let (n, sum_x, sum_y, sum_xx, sum_yy, sum_xy) = regr_cond_and_sums(y_col, x_col);
551 let regr_sxx = sum_xx - sum_x.clone() * sum_x.clone() / n.clone();
552 let regr_syy = sum_yy - sum_y.clone() * sum_y.clone() / n.clone();
553 let regr_sxy = sum_xy - sum_x * sum_y / n;
554 when(
555 regr_sxx
556 .clone()
557 .gt(lit(0.0))
558 .and(regr_syy.clone().gt(lit(0.0))),
559 )
560 .then(regr_sxy.clone() * regr_sxy / (regr_sxx * regr_syy))
561 .otherwise(lit(f64::NAN))
562}
563
564pub fn when(condition: &Column) -> WhenBuilder {
576 WhenBuilder::new(condition.expr().clone())
577}
578
579pub fn when_then_otherwise_null(condition: &Column, value: &Column) -> Column {
581 use polars::prelude::*;
582 let null_expr = lit(NULL);
583 let expr = polars::prelude::when(condition.expr().clone())
584 .then(value.expr().clone())
585 .otherwise(null_expr);
586 crate::column::Column::from_expr(expr, None)
587}
588
589pub struct WhenBuilder {
591 condition: Expr,
592}
593
594impl WhenBuilder {
595 fn new(condition: Expr) -> Self {
596 WhenBuilder { condition }
597 }
598
599 pub fn then(self, value: &Column) -> ThenBuilder {
601 use polars::prelude::*;
602 let when_then = when(self.condition).then(value.expr().clone());
603 ThenBuilder::new(when_then)
604 }
605
606 pub fn otherwise(self, _value: &Column) -> Column {
611 panic!(
614 "when().otherwise() requires .then() to be called first. Use when(cond).then(val1).otherwise(val2)"
615 );
616 }
617}
618
619pub struct ThenBuilder {
621 state: WhenThenState,
622}
623
624enum WhenThenState {
625 Single(Box<polars::prelude::Then>),
626 Chained(Box<polars::prelude::ChainedThen>),
627}
628
629pub struct ChainedWhenBuilder {
631 inner: polars::prelude::ChainedWhen,
632}
633
634impl ThenBuilder {
635 fn new(when_then: polars::prelude::Then) -> Self {
636 ThenBuilder {
637 state: WhenThenState::Single(Box::new(when_then)),
638 }
639 }
640
641 fn new_chained(chained: polars::prelude::ChainedThen) -> Self {
642 ThenBuilder {
643 state: WhenThenState::Chained(Box::new(chained)),
644 }
645 }
646
647 pub fn when(self, condition: &Column) -> ChainedWhenBuilder {
649 let chained_when = match self.state {
650 WhenThenState::Single(t) => t.when(condition.expr().clone()),
651 WhenThenState::Chained(ct) => ct.when(condition.expr().clone()),
652 };
653 ChainedWhenBuilder {
654 inner: chained_when,
655 }
656 }
657
658 pub fn otherwise(self, value: &Column) -> Column {
660 let expr = match self.state {
661 WhenThenState::Single(t) => t.otherwise(value.expr().clone()),
662 WhenThenState::Chained(ct) => ct.otherwise(value.expr().clone()),
663 };
664 crate::column::Column::from_expr(expr, None)
665 }
666}
667
668impl ChainedWhenBuilder {
669 pub fn then(self, value: &Column) -> ThenBuilder {
671 ThenBuilder::new_chained(self.inner.then(value.expr().clone()))
672 }
673}
674
675pub fn upper(column: &Column) -> Column {
677 column.clone().upper()
678}
679
680pub fn lower(column: &Column) -> Column {
682 column.clone().lower()
683}
684
685pub fn substring(column: &Column, start: i64, length: Option<i64>) -> Column {
687 column.clone().substr(start, length)
688}
689
690pub fn length(column: &Column) -> Column {
692 column.clone().length()
693}
694
695pub fn trim(column: &Column) -> Column {
697 column.clone().trim()
698}
699
700pub fn ltrim(column: &Column) -> Column {
702 column.clone().ltrim()
703}
704
705pub fn rtrim(column: &Column) -> Column {
707 column.clone().rtrim()
708}
709
710pub fn btrim(column: &Column, trim_str: Option<&str>) -> Column {
712 column.clone().btrim(trim_str)
713}
714
715pub fn locate(substr: &str, column: &Column, pos: i64) -> Column {
717 column.clone().locate(substr, pos)
718}
719
720pub fn conv(column: &Column, from_base: i32, to_base: i32) -> Column {
722 column.clone().conv(from_base, to_base)
723}
724
725pub fn hex(column: &Column) -> Column {
727 column.clone().hex()
728}
729
730pub fn unhex(column: &Column) -> Column {
732 column.clone().unhex()
733}
734
735pub fn encode(column: &Column, charset: &str) -> Column {
737 column.clone().encode(charset)
738}
739
740pub fn decode(column: &Column, charset: &str) -> Column {
742 column.clone().decode(charset)
743}
744
745pub fn to_binary(column: &Column, fmt: &str) -> Column {
747 column.clone().to_binary(fmt)
748}
749
750pub fn try_to_binary(column: &Column, fmt: &str) -> Column {
752 column.clone().try_to_binary(fmt)
753}
754
755pub fn aes_encrypt(column: &Column, key: &str) -> Column {
757 column.clone().aes_encrypt(key)
758}
759
760pub fn aes_decrypt(column: &Column, key: &str) -> Column {
762 column.clone().aes_decrypt(key)
763}
764
765pub fn try_aes_decrypt(column: &Column, key: &str) -> Column {
767 column.clone().try_aes_decrypt(key)
768}
769
770pub fn bin(column: &Column) -> Column {
772 column.clone().bin()
773}
774
775pub fn getbit(column: &Column, pos: i64) -> Column {
777 column.clone().getbit(pos)
778}
779
780pub fn bit_and(left: &Column, right: &Column) -> Column {
782 left.clone().bit_and(right)
783}
784
785pub fn bit_or(left: &Column, right: &Column) -> Column {
787 left.clone().bit_or(right)
788}
789
790pub fn bit_xor(left: &Column, right: &Column) -> Column {
792 left.clone().bit_xor(right)
793}
794
795pub fn bit_count(column: &Column) -> Column {
797 column.clone().bit_count()
798}
799
800pub fn bitwise_not(column: &Column) -> Column {
802 column.clone().bitwise_not()
803}
804
805pub fn bitmap_bit_position(column: &Column) -> Column {
809 use polars::prelude::DataType;
810 let expr = column.expr().clone().cast(DataType::Int32);
811 Column::from_expr(expr, None)
812}
813
814pub fn bitmap_bucket_number(column: &Column) -> Column {
816 use polars::prelude::DataType;
817 let expr = column.expr().clone().cast(DataType::Int64) / lit(32768i64);
818 Column::from_expr(expr, None)
819}
820
821pub fn bitmap_count(column: &Column) -> Column {
823 use polars::prelude::{DataType, Field};
824 let expr = column.expr().clone().map(
825 |s| crate::column::expect_col(crate::udfs::apply_bitmap_count(s)),
826 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
827 );
828 Column::from_expr(expr, None)
829}
830
831pub fn bitmap_construct_agg(column: &Column) -> polars::prelude::Expr {
834 use polars::prelude::{DataType, Field};
835 column.expr().clone().implode().map(
836 |s| crate::column::expect_col(crate::udfs::apply_bitmap_construct_agg(s)),
837 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Binary)),
838 )
839}
840
841pub fn bitmap_or_agg(column: &Column) -> polars::prelude::Expr {
843 use polars::prelude::{DataType, Field};
844 column.expr().clone().implode().map(
845 |s| crate::column::expect_col(crate::udfs::apply_bitmap_or_agg(s)),
846 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Binary)),
847 )
848}
849
850pub fn bit_get(column: &Column, pos: i64) -> Column {
852 getbit(column, pos)
853}
854
855pub fn assert_true(column: &Column, err_msg: Option<&str>) -> Column {
858 column.clone().assert_true(err_msg)
859}
860
861pub fn raise_error(message: &str) -> Column {
863 let msg = message.to_string();
864 let expr = lit(0i64).map(
865 move |_col| -> PolarsResult<polars::prelude::Column> {
866 Err(PolarsError::ComputeError(msg.clone().into()))
867 },
868 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
869 );
870 Column::from_expr(expr, Some("raise_error".to_string()))
871}
872
873pub fn broadcast(df: &DataFrame) -> DataFrame {
875 df.clone()
876}
877
878pub fn spark_partition_id() -> Column {
880 Column::from_expr(lit(0i32), Some("spark_partition_id".to_string()))
881}
882
883pub fn input_file_name() -> Column {
885 Column::from_expr(lit(""), Some("input_file_name".to_string()))
886}
887
888pub fn monotonically_increasing_id() -> Column {
891 Column::from_expr(lit(0i64), Some("monotonically_increasing_id".to_string()))
892}
893
894pub fn current_catalog() -> Column {
896 Column::from_expr(lit("spark_catalog"), Some("current_catalog".to_string()))
897}
898
899pub fn current_database() -> Column {
901 Column::from_expr(lit("default"), Some("current_database".to_string()))
902}
903
904pub fn current_schema() -> Column {
906 Column::from_expr(lit("default"), Some("current_schema".to_string()))
907}
908
909pub fn current_user() -> Column {
911 Column::from_expr(lit("unknown"), Some("current_user".to_string()))
912}
913
914pub fn user() -> Column {
916 Column::from_expr(lit("unknown"), Some("user".to_string()))
917}
918
919pub fn rand(seed: Option<u64>) -> Column {
922 Column::from_rand(seed)
923}
924
925pub fn randn(seed: Option<u64>) -> Column {
928 Column::from_randn(seed)
929}
930
931pub fn call_udf(name: &str, cols: &[Column]) -> Result<Column, PolarsError> {
934 use polars::prelude::Column as PlColumn;
935
936 let session = crate::session::get_thread_udf_session().ok_or_else(|| {
937 PolarsError::InvalidOperation(
938 "call_udf: no session. Use SparkSession.builder().get_or_create() first.".into(),
939 )
940 })?;
941 let case_sensitive = session.is_case_sensitive();
942
943 let udf = session
945 .udf_registry
946 .get_rust_udf(name, case_sensitive)
947 .ok_or_else(|| {
948 PolarsError::InvalidOperation(format!("call_udf: UDF '{name}' not found").into())
949 })?;
950
951 let exprs: Vec<Expr> = cols.iter().map(|c| c.expr().clone()).collect();
952 let output_type = DataType::String; let expr = if exprs.len() == 1 {
955 let udf = udf.clone();
956 exprs.into_iter().next().unwrap().map(
957 move |c| {
958 let s = c.take_materialized_series();
959 udf.apply(&[s]).map(|out| PlColumn::new("_".into(), out))
960 },
961 move |_schema, field| Ok(Field::new(field.name().clone(), output_type.clone())),
962 )
963 } else {
964 let udf = udf.clone();
965 let first = exprs[0].clone();
966 let rest: Vec<Expr> = exprs[1..].to_vec();
967 first.map_many(
968 move |columns| {
969 let series: Vec<Series> = columns
970 .iter_mut()
971 .map(|c| std::mem::take(c).take_materialized_series())
972 .collect();
973 udf.apply(&series).map(|out| PlColumn::new("_".into(), out))
974 },
975 &rest,
976 move |_schema, fields| Ok(Field::new(fields[0].name().clone(), output_type.clone())),
977 )
978 };
979
980 Ok(Column::from_expr(expr, Some(format!("{name}()"))))
981}
982
983pub fn arrays_overlap(left: &Column, right: &Column) -> Column {
985 left.clone().arrays_overlap(right)
986}
987
988pub fn arrays_zip(left: &Column, right: &Column) -> Column {
990 left.clone().arrays_zip(right)
991}
992
993pub fn explode_outer(column: &Column) -> Column {
995 column.clone().explode_outer()
996}
997
998pub fn posexplode_outer(column: &Column) -> (Column, Column) {
1000 column.clone().posexplode_outer()
1001}
1002
1003pub fn array_agg(column: &Column) -> Column {
1005 column.clone().array_agg()
1006}
1007
1008pub fn transform_keys(column: &Column, key_expr: Expr) -> Column {
1010 column.clone().transform_keys(key_expr)
1011}
1012
1013pub fn transform_values(column: &Column, value_expr: Expr) -> Column {
1015 column.clone().transform_values(value_expr)
1016}
1017
1018pub fn str_to_map(
1020 column: &Column,
1021 pair_delim: Option<&str>,
1022 key_value_delim: Option<&str>,
1023) -> Column {
1024 let pd = pair_delim.unwrap_or(",");
1025 let kvd = key_value_delim.unwrap_or(":");
1026 column.clone().str_to_map(pd, kvd)
1027}
1028
1029pub fn regexp_extract(column: &Column, pattern: &str, group_index: usize) -> Column {
1031 column.clone().regexp_extract(pattern, group_index)
1032}
1033
1034pub fn regexp_replace(column: &Column, pattern: &str, replacement: &str) -> Column {
1036 column.clone().regexp_replace(pattern, replacement)
1037}
1038
1039pub fn split(column: &Column, delimiter: &str, limit: Option<i32>) -> Column {
1041 column.clone().split(delimiter, limit)
1042}
1043
1044pub fn initcap(column: &Column) -> Column {
1046 column.clone().initcap()
1047}
1048
1049pub fn regexp_extract_all(column: &Column, pattern: &str) -> Column {
1051 column.clone().regexp_extract_all(pattern)
1052}
1053
1054pub fn regexp_like(column: &Column, pattern: &str) -> Column {
1056 column.clone().regexp_like(pattern)
1057}
1058
1059pub fn regexp_count(column: &Column, pattern: &str) -> Column {
1061 column.clone().regexp_count(pattern)
1062}
1063
1064pub fn regexp_substr(column: &Column, pattern: &str) -> Column {
1066 column.clone().regexp_substr(pattern)
1067}
1068
1069pub fn split_part(column: &Column, delimiter: &str, part_num: i64) -> Column {
1071 column.clone().split_part(delimiter, part_num)
1072}
1073
1074pub fn regexp_instr(column: &Column, pattern: &str, group_idx: Option<usize>) -> Column {
1076 column.clone().regexp_instr(pattern, group_idx)
1077}
1078
1079pub fn find_in_set(str_column: &Column, set_column: &Column) -> Column {
1081 str_column.clone().find_in_set(set_column)
1082}
1083
1084pub fn format_string(format: &str, columns: &[&Column]) -> Column {
1086 use polars::prelude::*;
1087 if columns.is_empty() {
1088 panic!("format_string needs at least one column");
1089 }
1090 let format_owned = format.to_string();
1091 let args: Vec<Expr> = columns.iter().skip(1).map(|c| c.expr().clone()).collect();
1092 let expr = columns[0].expr().clone().map_many(
1093 move |cols| {
1094 crate::column::expect_col(crate::udfs::apply_format_string(cols, &format_owned))
1095 },
1096 &args,
1097 |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::String)),
1098 );
1099 crate::column::Column::from_expr(expr, None)
1100}
1101
1102pub fn printf(format: &str, columns: &[&Column]) -> Column {
1104 format_string(format, columns)
1105}
1106
1107pub fn repeat(column: &Column, n: i32) -> Column {
1109 column.clone().repeat(n)
1110}
1111
1112pub fn reverse(column: &Column) -> Column {
1114 column.clone().reverse()
1115}
1116
1117pub fn instr(column: &Column, substr: &str) -> Column {
1119 column.clone().instr(substr)
1120}
1121
1122pub fn position(substr: &str, column: &Column) -> Column {
1124 column.clone().instr(substr)
1125}
1126
1127pub fn ascii(column: &Column) -> Column {
1129 column.clone().ascii()
1130}
1131
1132pub fn format_number(column: &Column, decimals: u32) -> Column {
1134 column.clone().format_number(decimals)
1135}
1136
1137pub fn overlay(column: &Column, replace: &str, pos: i64, length: i64) -> Column {
1139 column.clone().overlay(replace, pos, length)
1140}
1141
1142pub fn char(column: &Column) -> Column {
1144 column.clone().char()
1145}
1146
1147pub fn chr(column: &Column) -> Column {
1149 column.clone().chr()
1150}
1151
1152pub fn base64(column: &Column) -> Column {
1154 column.clone().base64()
1155}
1156
1157pub fn unbase64(column: &Column) -> Column {
1159 column.clone().unbase64()
1160}
1161
1162pub fn sha1(column: &Column) -> Column {
1164 column.clone().sha1()
1165}
1166
1167pub fn sha2(column: &Column, bit_length: i32) -> Column {
1169 column.clone().sha2(bit_length)
1170}
1171
1172pub fn md5(column: &Column) -> Column {
1174 column.clone().md5()
1175}
1176
1177pub fn lpad(column: &Column, length: i32, pad: &str) -> Column {
1179 column.clone().lpad(length, pad)
1180}
1181
1182pub fn rpad(column: &Column, length: i32, pad: &str) -> Column {
1184 column.clone().rpad(length, pad)
1185}
1186
1187pub fn translate(column: &Column, from_str: &str, to_str: &str) -> Column {
1189 column.clone().translate(from_str, to_str)
1190}
1191
1192pub fn mask(
1194 column: &Column,
1195 upper_char: Option<char>,
1196 lower_char: Option<char>,
1197 digit_char: Option<char>,
1198 other_char: Option<char>,
1199) -> Column {
1200 column
1201 .clone()
1202 .mask(upper_char, lower_char, digit_char, other_char)
1203}
1204
1205pub fn substring_index(column: &Column, delimiter: &str, count: i64) -> Column {
1207 column.clone().substring_index(delimiter, count)
1208}
1209
1210pub fn left(column: &Column, n: i64) -> Column {
1212 column.clone().left(n)
1213}
1214
1215pub fn right(column: &Column, n: i64) -> Column {
1217 column.clone().right(n)
1218}
1219
1220pub fn replace(column: &Column, search: &str, replacement: &str) -> Column {
1222 column.clone().replace(search, replacement)
1223}
1224
1225pub fn startswith(column: &Column, prefix: &str) -> Column {
1227 column.clone().startswith(prefix)
1228}
1229
1230pub fn endswith(column: &Column, suffix: &str) -> Column {
1232 column.clone().endswith(suffix)
1233}
1234
1235pub fn contains(column: &Column, substring: &str) -> Column {
1237 column.clone().contains(substring)
1238}
1239
1240pub fn like(column: &Column, pattern: &str, escape_char: Option<char>) -> Column {
1243 column.clone().like(pattern, escape_char)
1244}
1245
1246pub fn ilike(column: &Column, pattern: &str, escape_char: Option<char>) -> Column {
1249 column.clone().ilike(pattern, escape_char)
1250}
1251
1252pub fn rlike(column: &Column, pattern: &str) -> Column {
1254 column.clone().regexp_like(pattern)
1255}
1256
1257pub fn regexp(column: &Column, pattern: &str) -> Column {
1259 rlike(column, pattern)
1260}
1261
1262pub fn soundex(column: &Column) -> Column {
1264 column.clone().soundex()
1265}
1266
1267pub fn levenshtein(column: &Column, other: &Column) -> Column {
1269 column.clone().levenshtein(other)
1270}
1271
1272pub fn crc32(column: &Column) -> Column {
1274 column.clone().crc32()
1275}
1276
1277pub fn xxhash64(column: &Column) -> Column {
1279 column.clone().xxhash64()
1280}
1281
1282pub fn abs(column: &Column) -> Column {
1284 column.clone().abs()
1285}
1286
1287pub fn ceil(column: &Column) -> Column {
1289 column.clone().ceil()
1290}
1291
1292pub fn floor(column: &Column) -> Column {
1294 column.clone().floor()
1295}
1296
1297pub fn round(column: &Column, decimals: u32) -> Column {
1299 column.clone().round(decimals)
1300}
1301
1302pub fn bround(column: &Column, scale: i32) -> Column {
1304 column.clone().bround(scale)
1305}
1306
1307pub fn negate(column: &Column) -> Column {
1309 column.clone().negate()
1310}
1311
1312pub fn negative(column: &Column) -> Column {
1314 negate(column)
1315}
1316
1317pub fn positive(column: &Column) -> Column {
1319 column.clone()
1320}
1321
1322pub fn cot(column: &Column) -> Column {
1324 column.clone().cot()
1325}
1326
1327pub fn csc(column: &Column) -> Column {
1329 column.clone().csc()
1330}
1331
1332pub fn sec(column: &Column) -> Column {
1334 column.clone().sec()
1335}
1336
1337pub fn e() -> Column {
1339 Column::from_expr(lit(std::f64::consts::E), Some("e".to_string()))
1340}
1341
1342pub fn pi() -> Column {
1344 Column::from_expr(lit(std::f64::consts::PI), Some("pi".to_string()))
1345}
1346
1347pub fn sqrt(column: &Column) -> Column {
1349 column.clone().sqrt()
1350}
1351
1352pub fn pow(column: &Column, exp: i64) -> Column {
1354 column.clone().pow(exp)
1355}
1356
1357pub fn exp(column: &Column) -> Column {
1359 column.clone().exp()
1360}
1361
1362pub fn log(column: &Column) -> Column {
1364 column.clone().log()
1365}
1366
1367pub fn log_with_base(column: &Column, base: f64) -> Column {
1369 crate::column::Column::from_expr(column.expr().clone().log(lit(base)), None)
1370}
1371
1372pub fn sin(column: &Column) -> Column {
1374 column.clone().sin()
1375}
1376
1377pub fn cos(column: &Column) -> Column {
1379 column.clone().cos()
1380}
1381
1382pub fn tan(column: &Column) -> Column {
1384 column.clone().tan()
1385}
1386
1387pub fn asin(column: &Column) -> Column {
1389 column.clone().asin()
1390}
1391
1392pub fn acos(column: &Column) -> Column {
1394 column.clone().acos()
1395}
1396
1397pub fn atan(column: &Column) -> Column {
1399 column.clone().atan()
1400}
1401
1402pub fn atan2(y: &Column, x: &Column) -> Column {
1404 y.clone().atan2(x)
1405}
1406
1407pub fn degrees(column: &Column) -> Column {
1409 column.clone().degrees()
1410}
1411
1412pub fn radians(column: &Column) -> Column {
1414 column.clone().radians()
1415}
1416
1417pub fn signum(column: &Column) -> Column {
1419 column.clone().signum()
1420}
1421
1422pub fn sign(column: &Column) -> Column {
1424 signum(column)
1425}
1426
1427pub fn cast(column: &Column, type_name: &str) -> Result<Column, String> {
1431 let dtype = parse_type_name(type_name)?;
1432 if dtype == DataType::Boolean {
1433 let expr = column.expr().clone().map(
1434 |col| crate::column::expect_col(crate::udfs::apply_string_to_boolean(col, true)),
1435 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Boolean)),
1436 );
1437 return Ok(Column::from_expr(expr, None));
1438 }
1439 if dtype == DataType::Date {
1440 let expr = column.expr().clone().map(
1441 |col| crate::column::expect_col(crate::udfs::apply_string_to_date(col, true)),
1442 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Date)),
1443 );
1444 return Ok(Column::from_expr(expr, None));
1445 }
1446 if dtype == DataType::Int32 || dtype == DataType::Int64 {
1447 let target = dtype.clone();
1448 let expr = column.expr().clone().map(
1450 move |col| {
1451 crate::column::expect_col(crate::udfs::apply_string_to_int(
1452 col,
1453 true,
1454 target.clone(),
1455 ))
1456 },
1457 move |_schema, field| Ok(Field::new(field.name().clone(), dtype.clone())),
1458 );
1459 return Ok(Column::from_expr(expr, None));
1460 }
1461 if dtype == DataType::Float64 {
1462 let expr = column.expr().clone().map(
1464 |col| crate::column::expect_col(crate::udfs::apply_string_to_double(col, true)),
1465 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1466 );
1467 return Ok(Column::from_expr(expr, None));
1468 }
1469 Ok(Column::from_expr(
1470 column.expr().clone().strict_cast(dtype),
1471 None,
1472 ))
1473}
1474
1475pub fn try_cast(column: &Column, type_name: &str) -> Result<Column, String> {
1479 let dtype = parse_type_name(type_name)?;
1480 if dtype == DataType::Boolean {
1481 let expr = column.expr().clone().map(
1482 |col| crate::column::expect_col(crate::udfs::apply_string_to_boolean(col, false)),
1483 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Boolean)),
1484 );
1485 return Ok(Column::from_expr(expr, None));
1486 }
1487 if dtype == DataType::Date {
1488 let expr = column.expr().clone().map(
1489 |col| crate::column::expect_col(crate::udfs::apply_string_to_date(col, false)),
1490 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Date)),
1491 );
1492 return Ok(Column::from_expr(expr, None));
1493 }
1494 if dtype == DataType::Int32 || dtype == DataType::Int64 {
1495 let target = dtype.clone();
1496 let expr = column.expr().clone().map(
1497 move |col| {
1498 crate::column::expect_col(crate::udfs::apply_string_to_int(
1499 col,
1500 false,
1501 target.clone(),
1502 ))
1503 },
1504 move |_schema, field| Ok(Field::new(field.name().clone(), dtype.clone())),
1505 );
1506 return Ok(Column::from_expr(expr, None));
1507 }
1508 if dtype == DataType::Float64 {
1509 let expr = column.expr().clone().map(
1510 |col| crate::column::expect_col(crate::udfs::apply_string_to_double(col, false)),
1511 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1512 );
1513 return Ok(Column::from_expr(expr, None));
1514 }
1515 Ok(Column::from_expr(column.expr().clone().cast(dtype), None))
1516}
1517
1518pub fn to_char(column: &Column, format: Option<&str>) -> Result<Column, String> {
1522 match format {
1523 Some(fmt) => Ok(column
1524 .clone()
1525 .date_format(&crate::udfs::pyspark_format_to_chrono(fmt))),
1526 None => cast(column, "string"),
1527 }
1528}
1529
1530pub fn to_varchar(column: &Column, format: Option<&str>) -> Result<Column, String> {
1532 to_char(column, format)
1533}
1534
1535pub fn to_number(column: &Column, _format: Option<&str>) -> Result<Column, String> {
1538 cast(column, "double")
1539}
1540
1541pub fn try_to_number(column: &Column, _format: Option<&str>) -> Result<Column, String> {
1544 try_cast(column, "double")
1545}
1546
1547pub fn to_timestamp(column: &Column, format: Option<&str>) -> Result<Column, String> {
1550 use polars::prelude::{DataType, Field, TimeUnit};
1551 let fmt_owned = format.map(|s| s.to_string());
1552 let expr = column.expr().clone().map(
1553 move |s| {
1554 crate::column::expect_col(crate::udfs::apply_to_timestamp_format(
1555 s,
1556 fmt_owned.as_deref(),
1557 true,
1558 ))
1559 },
1560 |_schema, field| {
1561 Ok(Field::new(
1562 field.name().clone(),
1563 DataType::Datetime(TimeUnit::Microseconds, None),
1564 ))
1565 },
1566 );
1567 Ok(crate::column::Column::from_expr(expr, None))
1568}
1569
1570pub fn try_to_timestamp(column: &Column, format: Option<&str>) -> Result<Column, String> {
1573 use polars::prelude::*;
1574 let fmt_owned = format.map(|s| s.to_string());
1575 let expr = column.expr().clone().map(
1576 move |s| {
1577 crate::column::expect_col(crate::udfs::apply_to_timestamp_format(
1578 s,
1579 fmt_owned.as_deref(),
1580 false,
1581 ))
1582 },
1583 |_schema, field| {
1584 Ok(Field::new(
1585 field.name().clone(),
1586 DataType::Datetime(TimeUnit::Microseconds, None),
1587 ))
1588 },
1589 );
1590 Ok(crate::column::Column::from_expr(expr, None))
1591}
1592
1593pub fn to_timestamp_ltz(column: &Column, format: Option<&str>) -> Result<Column, String> {
1595 use polars::prelude::{DataType, Field, TimeUnit};
1596 match format {
1597 None => crate::cast(column, "timestamp"),
1598 Some(fmt) => {
1599 let fmt_owned = fmt.to_string();
1600 let expr = column.expr().clone().map(
1601 move |s| {
1602 crate::column::expect_col(crate::udfs::apply_to_timestamp_ltz_format(
1603 s,
1604 Some(&fmt_owned),
1605 true,
1606 ))
1607 },
1608 |_schema, field| {
1609 Ok(Field::new(
1610 field.name().clone(),
1611 DataType::Datetime(TimeUnit::Microseconds, None),
1612 ))
1613 },
1614 );
1615 Ok(crate::column::Column::from_expr(expr, None))
1616 }
1617 }
1618}
1619
1620pub fn to_timestamp_ntz(column: &Column, format: Option<&str>) -> Result<Column, String> {
1622 use polars::prelude::{DataType, Field, TimeUnit};
1623 match format {
1624 None => crate::cast(column, "timestamp"),
1625 Some(fmt) => {
1626 let fmt_owned = fmt.to_string();
1627 let expr = column.expr().clone().map(
1628 move |s| {
1629 crate::column::expect_col(crate::udfs::apply_to_timestamp_ntz_format(
1630 s,
1631 Some(&fmt_owned),
1632 true,
1633 ))
1634 },
1635 |_schema, field| {
1636 Ok(Field::new(
1637 field.name().clone(),
1638 DataType::Datetime(TimeUnit::Microseconds, None),
1639 ))
1640 },
1641 );
1642 Ok(crate::column::Column::from_expr(expr, None))
1643 }
1644 }
1645}
1646
1647pub fn try_divide(left: &Column, right: &Column) -> Column {
1649 use polars::prelude::*;
1650 let zero_cond = right.expr().clone().cast(DataType::Float64).eq(lit(0.0f64));
1651 let null_expr = lit(NULL);
1652 let div_expr =
1653 left.expr().clone().cast(DataType::Float64) / right.expr().clone().cast(DataType::Float64);
1654 let expr = polars::prelude::when(zero_cond)
1655 .then(null_expr)
1656 .otherwise(div_expr);
1657 crate::column::Column::from_expr(expr, None)
1658}
1659
1660pub fn try_add(left: &Column, right: &Column) -> Column {
1662 let args = [right.expr().clone()];
1663 let expr = left.expr().clone().map_many(
1664 |cols| crate::column::expect_col(crate::udfs::apply_try_add(cols)),
1665 &args,
1666 |_schema, fields| Ok(fields[0].clone()),
1667 );
1668 Column::from_expr(expr, None)
1669}
1670
1671pub fn try_subtract(left: &Column, right: &Column) -> Column {
1673 let args = [right.expr().clone()];
1674 let expr = left.expr().clone().map_many(
1675 |cols| crate::column::expect_col(crate::udfs::apply_try_subtract(cols)),
1676 &args,
1677 |_schema, fields| Ok(fields[0].clone()),
1678 );
1679 Column::from_expr(expr, None)
1680}
1681
1682pub fn try_multiply(left: &Column, right: &Column) -> Column {
1684 let args = [right.expr().clone()];
1685 let expr = left.expr().clone().map_many(
1686 |cols| crate::column::expect_col(crate::udfs::apply_try_multiply(cols)),
1687 &args,
1688 |_schema, fields| Ok(fields[0].clone()),
1689 );
1690 Column::from_expr(expr, None)
1691}
1692
1693pub fn try_element_at(column: &Column, index: i64) -> Column {
1695 column.clone().element_at(index)
1696}
1697
1698pub fn width_bucket(value: &Column, min_val: f64, max_val: f64, num_bucket: i64) -> Column {
1700 if num_bucket <= 0 {
1701 panic!(
1702 "width_bucket: num_bucket must be positive, got {}",
1703 num_bucket
1704 );
1705 }
1706 use polars::prelude::*;
1707 let v = value.expr().clone().cast(DataType::Float64);
1708 let min_expr = lit(min_val);
1709 let max_expr = lit(max_val);
1710 let nb = num_bucket as f64;
1711 let width = (max_val - min_val) / nb;
1712 let bucket_expr = (v.clone() - min_expr.clone()) / lit(width);
1713 let floor_bucket = bucket_expr.floor().cast(DataType::Int64) + lit(1i64);
1714 let bucket_clamped = floor_bucket.clip(lit(1i64), lit(num_bucket));
1715 let expr = polars::prelude::when(v.clone().lt(min_expr))
1716 .then(lit(0i64))
1717 .when(v.gt_eq(max_expr))
1718 .then(lit(num_bucket + 1))
1719 .otherwise(bucket_clamped);
1720 crate::column::Column::from_expr(expr, None)
1721}
1722
1723pub fn elt(index: &Column, columns: &[&Column]) -> Column {
1725 use polars::prelude::*;
1726 if columns.is_empty() {
1727 panic!("elt requires at least one column");
1728 }
1729 let idx_expr = index.expr().clone();
1730 let null_expr = lit(NULL);
1731 let mut expr = null_expr;
1732 for (i, c) in columns.iter().enumerate().rev() {
1733 let n = (i + 1) as i64;
1734 expr = polars::prelude::when(idx_expr.clone().eq(lit(n)))
1735 .then(c.expr().clone())
1736 .otherwise(expr);
1737 }
1738 crate::column::Column::from_expr(expr, None)
1739}
1740
1741pub fn bit_length(column: &Column) -> Column {
1743 column.clone().bit_length()
1744}
1745
1746pub fn octet_length(column: &Column) -> Column {
1748 column.clone().octet_length()
1749}
1750
1751pub fn char_length(column: &Column) -> Column {
1753 column.clone().char_length()
1754}
1755
1756pub fn character_length(column: &Column) -> Column {
1758 column.clone().character_length()
1759}
1760
1761pub fn typeof_(column: &Column) -> Column {
1763 column.clone().typeof_()
1764}
1765
1766pub fn isnan(column: &Column) -> Column {
1768 column.clone().is_nan()
1769}
1770
1771pub fn greatest(columns: &[&Column]) -> Result<Column, String> {
1773 if columns.is_empty() {
1774 return Err("greatest requires at least one column".to_string());
1775 }
1776 if columns.len() == 1 {
1777 return Ok((*columns[0]).clone());
1778 }
1779 let mut expr = columns[0].expr().clone();
1780 for c in columns.iter().skip(1) {
1781 let args = [c.expr().clone()];
1782 expr = expr.map_many(
1783 |cols| crate::column::expect_col(crate::udfs::apply_greatest2(cols)),
1784 &args,
1785 |_schema, fields| Ok(fields[0].clone()),
1786 );
1787 }
1788 Ok(Column::from_expr(expr, None))
1789}
1790
1791pub fn least(columns: &[&Column]) -> Result<Column, String> {
1793 if columns.is_empty() {
1794 return Err("least requires at least one column".to_string());
1795 }
1796 if columns.len() == 1 {
1797 return Ok((*columns[0]).clone());
1798 }
1799 let mut expr = columns[0].expr().clone();
1800 for c in columns.iter().skip(1) {
1801 let args = [c.expr().clone()];
1802 expr = expr.map_many(
1803 |cols| crate::column::expect_col(crate::udfs::apply_least2(cols)),
1804 &args,
1805 |_schema, fields| Ok(fields[0].clone()),
1806 );
1807 }
1808 Ok(Column::from_expr(expr, None))
1809}
1810
1811pub fn year(column: &Column) -> Column {
1813 column.clone().year()
1814}
1815
1816pub fn month(column: &Column) -> Column {
1818 column.clone().month()
1819}
1820
1821pub fn day(column: &Column) -> Column {
1823 column.clone().day()
1824}
1825
1826pub fn to_date(column: &Column, format: Option<&str>) -> Result<Column, String> {
1828 let fmt = format.map(|s| s.to_string());
1829 let expr = column.expr().clone().map(
1830 move |col| {
1831 crate::column::expect_col(crate::udfs::apply_string_to_date_format(
1832 col,
1833 fmt.as_deref(),
1834 false,
1835 ))
1836 },
1837 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Date)),
1838 );
1839 Ok(Column::from_expr(expr, None))
1840}
1841
1842pub fn date_format(column: &Column, format: &str) -> Column {
1844 column
1845 .clone()
1846 .date_format(&crate::udfs::pyspark_format_to_chrono(format))
1847}
1848
1849pub fn current_date() -> Column {
1851 use polars::prelude::*;
1852 let today = chrono::Utc::now().date_naive();
1853 let days = (today - crate::date_utils::epoch_naive_date()).num_days() as i32;
1854 crate::column::Column::from_expr(
1855 Expr::Literal(LiteralValue::Scalar(Scalar::new_date(days))),
1856 None,
1857 )
1858}
1859
1860pub fn current_timestamp() -> Column {
1862 use polars::prelude::*;
1863 let ts = chrono::Utc::now().timestamp_micros();
1864 crate::column::Column::from_expr(
1865 Expr::Literal(LiteralValue::Scalar(Scalar::new_datetime(
1866 ts,
1867 TimeUnit::Microseconds,
1868 None,
1869 ))),
1870 None,
1871 )
1872}
1873
1874pub fn curdate() -> Column {
1876 current_date()
1877}
1878
1879pub fn now() -> Column {
1881 current_timestamp()
1882}
1883
1884pub fn localtimestamp() -> Column {
1886 current_timestamp()
1887}
1888
1889pub fn date_diff(end: &Column, start: &Column) -> Column {
1891 datediff(end, start)
1892}
1893
1894pub fn dateadd(column: &Column, n: i32) -> Column {
1896 date_add(column, n)
1897}
1898
1899pub fn extract(column: &Column, field: &str) -> Column {
1901 column.clone().extract(field)
1902}
1903
1904pub fn date_part(column: &Column, field: &str) -> Column {
1906 extract(column, field)
1907}
1908
1909pub fn datepart(column: &Column, field: &str) -> Column {
1911 extract(column, field)
1912}
1913
1914pub fn unix_micros(column: &Column) -> Column {
1916 column.clone().unix_micros()
1917}
1918
1919pub fn unix_millis(column: &Column) -> Column {
1921 column.clone().unix_millis()
1922}
1923
1924pub fn unix_seconds(column: &Column) -> Column {
1926 column.clone().unix_seconds()
1927}
1928
1929pub fn dayname(column: &Column) -> Column {
1931 column.clone().dayname()
1932}
1933
1934pub fn weekday(column: &Column) -> Column {
1936 column.clone().weekday()
1937}
1938
1939pub fn hour(column: &Column) -> Column {
1941 column.clone().hour()
1942}
1943
1944pub fn minute(column: &Column) -> Column {
1946 column.clone().minute()
1947}
1948
1949pub fn second(column: &Column) -> Column {
1951 column.clone().second()
1952}
1953
1954pub fn date_add(column: &Column, n: i32) -> Column {
1956 column.clone().date_add(n)
1957}
1958
1959pub fn date_sub(column: &Column, n: i32) -> Column {
1961 column.clone().date_sub(n)
1962}
1963
1964pub fn datediff(end: &Column, start: &Column) -> Column {
1966 start.clone().datediff(end)
1967}
1968
1969pub fn last_day(column: &Column) -> Column {
1971 column.clone().last_day()
1972}
1973
1974pub fn trunc(column: &Column, format: &str) -> Column {
1976 column.clone().trunc(format)
1977}
1978
1979pub fn date_trunc(format: &str, column: &Column) -> Column {
1981 trunc(column, format)
1982}
1983
1984pub fn quarter(column: &Column) -> Column {
1986 column.clone().quarter()
1987}
1988
1989pub fn weekofyear(column: &Column) -> Column {
1991 column.clone().weekofyear()
1992}
1993
1994pub fn dayofweek(column: &Column) -> Column {
1996 column.clone().dayofweek()
1997}
1998
1999pub fn dayofyear(column: &Column) -> Column {
2001 column.clone().dayofyear()
2002}
2003
2004pub fn add_months(column: &Column, n: i32) -> Column {
2006 column.clone().add_months(n)
2007}
2008
2009pub fn months_between(end: &Column, start: &Column, round_off: bool) -> Column {
2012 end.clone().months_between(start, round_off)
2013}
2014
2015pub fn next_day(column: &Column, day_of_week: &str) -> Column {
2017 column.clone().next_day(day_of_week)
2018}
2019
2020pub fn unix_timestamp_now() -> Column {
2022 use polars::prelude::*;
2023 let secs = chrono::Utc::now().timestamp();
2024 crate::column::Column::from_expr(lit(secs), None)
2025}
2026
2027pub fn unix_timestamp(column: &Column, format: Option<&str>) -> Column {
2029 column.clone().unix_timestamp(format)
2030}
2031
2032pub fn to_unix_timestamp(column: &Column, format: Option<&str>) -> Column {
2034 unix_timestamp(column, format)
2035}
2036
2037pub fn from_unixtime(column: &Column, format: Option<&str>) -> Column {
2039 column.clone().from_unixtime(format)
2040}
2041
2042pub fn make_date(year: &Column, month: &Column, day: &Column) -> Column {
2044 use polars::prelude::*;
2045 let args = [month.expr().clone(), day.expr().clone()];
2046 let expr = year.expr().clone().map_many(
2047 |cols| crate::column::expect_col(crate::udfs::apply_make_date(cols)),
2048 &args,
2049 |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Date)),
2050 );
2051 crate::column::Column::from_expr(expr, None)
2052}
2053
2054pub fn make_timestamp(
2057 year: &Column,
2058 month: &Column,
2059 day: &Column,
2060 hour: &Column,
2061 minute: &Column,
2062 sec: &Column,
2063 timezone: Option<&str>,
2064) -> Column {
2065 use polars::prelude::*;
2066 let tz_owned = timezone.map(|s| s.to_string());
2067 let args = [
2068 month.expr().clone(),
2069 day.expr().clone(),
2070 hour.expr().clone(),
2071 minute.expr().clone(),
2072 sec.expr().clone(),
2073 ];
2074 let expr = year.expr().clone().map_many(
2075 move |cols| {
2076 crate::column::expect_col(crate::udfs::apply_make_timestamp(cols, tz_owned.as_deref()))
2077 },
2078 &args,
2079 |_schema, fields| {
2080 Ok(Field::new(
2081 fields[0].name().clone(),
2082 DataType::Datetime(TimeUnit::Microseconds, None),
2083 ))
2084 },
2085 );
2086 crate::column::Column::from_expr(expr, None)
2087}
2088
2089pub fn timestampadd(unit: &str, amount: &Column, ts: &Column) -> Column {
2091 ts.clone().timestampadd(unit, amount)
2092}
2093
2094pub fn timestampdiff(unit: &str, start: &Column, end: &Column) -> Column {
2096 start.clone().timestampdiff(unit, end)
2097}
2098
2099pub fn days(n: i64) -> Column {
2101 make_interval(0, 0, 0, n, 0, 0, 0)
2102}
2103
2104pub fn hours(n: i64) -> Column {
2106 make_interval(0, 0, 0, 0, n, 0, 0)
2107}
2108
2109pub fn minutes(n: i64) -> Column {
2111 make_interval(0, 0, 0, 0, 0, n, 0)
2112}
2113
2114pub fn months(n: i64) -> Column {
2116 make_interval(0, n, 0, 0, 0, 0, 0)
2117}
2118
2119pub fn years(n: i64) -> Column {
2121 make_interval(n, 0, 0, 0, 0, 0, 0)
2122}
2123
2124pub fn from_utc_timestamp(column: &Column, tz: &str) -> Column {
2126 column.clone().from_utc_timestamp(tz)
2127}
2128
2129pub fn to_utc_timestamp(column: &Column, tz: &str) -> Column {
2131 column.clone().to_utc_timestamp(tz)
2132}
2133
2134pub fn convert_timezone(source_tz: &str, target_tz: &str, column: &Column) -> Column {
2136 let source_tz = source_tz.to_string();
2137 let target_tz = target_tz.to_string();
2138 let expr = column.expr().clone().map(
2139 move |s| {
2140 crate::column::expect_col(crate::udfs::apply_convert_timezone(
2141 s, &source_tz, &target_tz,
2142 ))
2143 },
2144 |_schema, field| Ok(field.clone()),
2145 );
2146 crate::column::Column::from_expr(expr, None)
2147}
2148
2149pub fn current_timezone() -> Column {
2151 use polars::prelude::*;
2152 crate::column::Column::from_expr(lit("UTC"), None)
2153}
2154
2155pub fn make_interval(
2157 years: i64,
2158 months: i64,
2159 weeks: i64,
2160 days: i64,
2161 hours: i64,
2162 mins: i64,
2163 secs: i64,
2164) -> Column {
2165 use polars::prelude::*;
2166 let total_days = years * 365 + months * 30 + weeks * 7 + days;
2168 let args = DurationArgs::new()
2169 .with_days(lit(total_days))
2170 .with_hours(lit(hours))
2171 .with_minutes(lit(mins))
2172 .with_seconds(lit(secs));
2173 let dur = duration(args);
2174 crate::column::Column::from_expr(dur, None)
2175}
2176
2177pub fn make_dt_interval(days: i64, hours: i64, minutes: i64, seconds: i64) -> Column {
2179 use polars::prelude::*;
2180 let args = DurationArgs::new()
2181 .with_days(lit(days))
2182 .with_hours(lit(hours))
2183 .with_minutes(lit(minutes))
2184 .with_seconds(lit(seconds));
2185 let dur = duration(args);
2186 crate::column::Column::from_expr(dur, None)
2187}
2188
2189pub fn make_ym_interval(years: i32, months: i32) -> Column {
2191 use polars::prelude::*;
2192 let total_months = years * 12 + months;
2193 crate::column::Column::from_expr(lit(total_months), None)
2194}
2195
2196pub fn make_timestamp_ntz(
2198 year: &Column,
2199 month: &Column,
2200 day: &Column,
2201 hour: &Column,
2202 minute: &Column,
2203 sec: &Column,
2204) -> Column {
2205 make_timestamp(year, month, day, hour, minute, sec, None)
2206}
2207
2208pub fn timestamp_seconds(column: &Column) -> Column {
2210 column.clone().timestamp_seconds()
2211}
2212
2213pub fn timestamp_millis(column: &Column) -> Column {
2215 column.clone().timestamp_millis()
2216}
2217
2218pub fn timestamp_micros(column: &Column) -> Column {
2220 column.clone().timestamp_micros()
2221}
2222
2223pub fn unix_date(column: &Column) -> Column {
2225 column.clone().unix_date()
2226}
2227
2228pub fn date_from_unix_date(column: &Column) -> Column {
2230 column.clone().date_from_unix_date()
2231}
2232
2233pub fn pmod(dividend: &Column, divisor: &Column) -> Column {
2235 dividend.clone().pmod(divisor)
2236}
2237
2238pub fn factorial(column: &Column) -> Column {
2240 column.clone().factorial()
2241}
2242
2243pub fn concat(columns: &[&Column]) -> Column {
2245 use polars::prelude::*;
2246 if columns.is_empty() {
2247 panic!("concat requires at least one column");
2248 }
2249 let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2250 crate::column::Column::from_expr(concat_str(&exprs, "", false), None)
2251}
2252
2253pub fn concat_ws(separator: &str, columns: &[&Column]) -> Column {
2255 use polars::prelude::*;
2256 if columns.is_empty() {
2257 panic!("concat_ws requires at least one column");
2258 }
2259 let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2260 crate::column::Column::from_expr(concat_str(&exprs, separator, false), None)
2261}
2262
2263pub fn row_number(column: &Column) -> Column {
2273 column.clone().row_number(false)
2274}
2275
2276pub fn rank(column: &Column, descending: bool) -> Column {
2278 column.clone().rank(descending)
2279}
2280
2281pub fn dense_rank(column: &Column, descending: bool) -> Column {
2283 column.clone().dense_rank(descending)
2284}
2285
2286pub fn lag(column: &Column, n: i64) -> Column {
2288 column.clone().lag(n)
2289}
2290
2291pub fn lead(column: &Column, n: i64) -> Column {
2293 column.clone().lead(n)
2294}
2295
2296pub fn first_value(column: &Column) -> Column {
2298 column.clone().first_value()
2299}
2300
2301pub fn last_value(column: &Column) -> Column {
2303 column.clone().last_value()
2304}
2305
2306pub fn percent_rank(column: &Column, partition_by: &[&str], descending: bool) -> Column {
2308 column.clone().percent_rank(partition_by, descending)
2309}
2310
2311pub fn cume_dist(column: &Column, partition_by: &[&str], descending: bool) -> Column {
2313 column.clone().cume_dist(partition_by, descending)
2314}
2315
2316pub fn ntile(column: &Column, n: u32, partition_by: &[&str], descending: bool) -> Column {
2318 column.clone().ntile(n, partition_by, descending)
2319}
2320
2321pub fn nth_value(column: &Column, n: i64, partition_by: &[&str], descending: bool) -> Column {
2323 column.clone().nth_value(n, partition_by, descending)
2324}
2325
2326pub fn coalesce(columns: &[&Column]) -> Column {
2336 use polars::prelude::*;
2337 if columns.is_empty() {
2338 panic!("coalesce requires at least one column");
2339 }
2340 let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2341 let expr = coalesce(&exprs);
2342 crate::column::Column::from_expr(expr, None)
2343}
2344
2345pub fn nvl(column: &Column, value: &Column) -> Column {
2347 coalesce(&[column, value])
2348}
2349
2350pub fn ifnull(column: &Column, value: &Column) -> Column {
2352 nvl(column, value)
2353}
2354
2355pub fn nullif(column: &Column, value: &Column) -> Column {
2357 use polars::prelude::*;
2358 let cond = column.expr().clone().eq(value.expr().clone());
2359 let null_lit = lit(NULL);
2360 let expr = when(cond).then(null_lit).otherwise(column.expr().clone());
2361 crate::column::Column::from_expr(expr, None)
2362}
2363
2364pub fn nanvl(column: &Column, value: &Column) -> Column {
2366 use polars::prelude::*;
2367 let cond = column.expr().clone().is_nan();
2368 let expr = when(cond)
2369 .then(value.expr().clone())
2370 .otherwise(column.expr().clone());
2371 crate::column::Column::from_expr(expr, None)
2372}
2373
2374pub fn nvl2(col1: &Column, col2: &Column, col3: &Column) -> Column {
2376 use polars::prelude::*;
2377 let cond = col1.expr().clone().is_not_null();
2378 let expr = when(cond)
2379 .then(col2.expr().clone())
2380 .otherwise(col3.expr().clone());
2381 crate::column::Column::from_expr(expr, None)
2382}
2383
2384pub fn substr(column: &Column, start: i64, length: Option<i64>) -> Column {
2386 substring(column, start, length)
2387}
2388
2389pub fn power(column: &Column, exp: i64) -> Column {
2391 pow(column, exp)
2392}
2393
2394pub fn ln(column: &Column) -> Column {
2396 log(column)
2397}
2398
2399pub fn ceiling(column: &Column) -> Column {
2401 ceil(column)
2402}
2403
2404pub fn lcase(column: &Column) -> Column {
2406 lower(column)
2407}
2408
2409pub fn ucase(column: &Column) -> Column {
2411 upper(column)
2412}
2413
2414pub fn dayofmonth(column: &Column) -> Column {
2416 day(column)
2417}
2418
2419pub fn to_degrees(column: &Column) -> Column {
2421 degrees(column)
2422}
2423
2424pub fn to_radians(column: &Column) -> Column {
2426 radians(column)
2427}
2428
2429pub fn cosh(column: &Column) -> Column {
2431 column.clone().cosh()
2432}
2433pub fn sinh(column: &Column) -> Column {
2435 column.clone().sinh()
2436}
2437pub fn tanh(column: &Column) -> Column {
2439 column.clone().tanh()
2440}
2441pub fn acosh(column: &Column) -> Column {
2443 column.clone().acosh()
2444}
2445pub fn asinh(column: &Column) -> Column {
2447 column.clone().asinh()
2448}
2449pub fn atanh(column: &Column) -> Column {
2451 column.clone().atanh()
2452}
2453pub fn cbrt(column: &Column) -> Column {
2455 column.clone().cbrt()
2456}
2457pub fn expm1(column: &Column) -> Column {
2459 column.clone().expm1()
2460}
2461pub fn log1p(column: &Column) -> Column {
2463 column.clone().log1p()
2464}
2465pub fn log10(column: &Column) -> Column {
2467 column.clone().log10()
2468}
2469pub fn log2(column: &Column) -> Column {
2471 column.clone().log2()
2472}
2473pub fn rint(column: &Column) -> Column {
2475 column.clone().rint()
2476}
2477pub fn hypot(x: &Column, y: &Column) -> Column {
2479 let xx = x.expr().clone() * x.expr().clone();
2480 let yy = y.expr().clone() * y.expr().clone();
2481 crate::column::Column::from_expr((xx + yy).sqrt(), None)
2482}
2483
2484pub fn isnull(column: &Column) -> Column {
2486 column.clone().is_null()
2487}
2488
2489pub fn isnotnull(column: &Column) -> Column {
2491 column.clone().is_not_null()
2492}
2493
2494pub fn array(columns: &[&Column]) -> Result<crate::column::Column, PolarsError> {
2497 use polars::prelude::*;
2498 if columns.is_empty() {
2499 let empty_inner = Series::new("".into(), Vec::<i64>::new());
2502 let list_series = ListChunked::from_iter([Some(empty_inner)])
2503 .with_name("array".into())
2504 .into_series();
2505 let expr = lit(list_series).first();
2506 return Ok(crate::column::Column::from_expr(expr, None));
2507 }
2508 let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2509 let expr = concat_list(exprs)
2510 .map_err(|e| PolarsError::ComputeError(format!("array concat_list: {e}").into()))?;
2511 Ok(crate::column::Column::from_expr(expr, None))
2512}
2513
2514pub fn array_size(column: &Column) -> Column {
2516 column.clone().array_size()
2517}
2518
2519pub fn size(column: &Column) -> Column {
2521 column.clone().array_size()
2522}
2523
2524pub fn cardinality(column: &Column) -> Column {
2526 column.clone().cardinality()
2527}
2528
2529pub fn array_contains(column: &Column, value: &Column) -> Column {
2531 column.clone().array_contains(value.expr().clone())
2532}
2533
2534pub fn array_join(column: &Column, separator: &str) -> Column {
2536 column.clone().array_join(separator)
2537}
2538
2539pub fn array_max(column: &Column) -> Column {
2541 column.clone().array_max()
2542}
2543
2544pub fn array_min(column: &Column) -> Column {
2546 column.clone().array_min()
2547}
2548
2549pub fn element_at(column: &Column, index: i64) -> Column {
2551 column.clone().element_at(index)
2552}
2553
2554pub fn array_sort(column: &Column) -> Column {
2556 column.clone().array_sort()
2557}
2558
2559pub fn array_distinct(column: &Column) -> Column {
2561 column.clone().array_distinct()
2562}
2563
2564pub fn array_slice(column: &Column, start: i64, length: Option<i64>) -> Column {
2566 column.clone().array_slice(start, length)
2567}
2568
2569pub fn sequence(start: &Column, stop: &Column, step: Option<&Column>) -> Column {
2572 use polars::prelude::{DataType, Field, as_struct, lit};
2573 let step_expr = step
2574 .map(|c| c.expr().clone().alias("2"))
2575 .unwrap_or_else(|| lit(1i64).alias("2"));
2576 let struct_expr = as_struct(vec![
2577 start.expr().clone().alias("0"),
2578 stop.expr().clone().alias("1"),
2579 step_expr,
2580 ]);
2581 let out_dtype = DataType::List(Box::new(DataType::Int64));
2582 let expr = struct_expr.map(
2583 |s| crate::column::expect_col(crate::udfs::apply_sequence(s)),
2584 move |_schema, field| Ok(Field::new(field.name().clone(), out_dtype.clone())),
2585 );
2586 crate::column::Column::from_expr(expr, None)
2587}
2588
2589pub fn shuffle(column: &Column) -> Column {
2591 let expr = column.expr().clone().map(
2592 |s| crate::column::expect_col(crate::udfs::apply_shuffle(s)),
2593 |_schema, field| Ok(field.clone()),
2594 );
2595 crate::column::Column::from_expr(expr, None)
2596}
2597
2598pub fn inline(column: &Column) -> Column {
2601 column.clone().explode()
2602}
2603
2604pub fn inline_outer(column: &Column) -> Column {
2606 column.clone().explode_outer()
2607}
2608
2609pub fn explode(column: &Column) -> Column {
2611 column.clone().explode()
2612}
2613
2614pub fn array_position(column: &Column, value: &Column) -> Column {
2617 column.clone().array_position(value.expr().clone())
2618}
2619
2620pub fn array_compact(column: &Column) -> Column {
2622 column.clone().array_compact()
2623}
2624
2625pub fn array_remove(column: &Column, value: &Column) -> Column {
2628 column.clone().array_remove(value.expr().clone())
2629}
2630
2631pub fn array_repeat(column: &Column, n: i64) -> Column {
2633 column.clone().array_repeat(n)
2634}
2635
2636pub fn array_flatten(column: &Column) -> Column {
2638 column.clone().array_flatten()
2639}
2640
2641pub fn array_exists(column: &Column, predicate: Expr) -> Column {
2643 column.clone().array_exists(predicate)
2644}
2645
2646pub fn array_forall(column: &Column, predicate: Expr) -> Column {
2648 column.clone().array_forall(predicate)
2649}
2650
2651pub fn array_filter(column: &Column, predicate: Expr) -> Column {
2653 column.clone().array_filter(predicate)
2654}
2655
2656pub fn array_transform(column: &Column, f: Expr) -> Column {
2658 column.clone().array_transform(f)
2659}
2660
2661pub fn array_sum(column: &Column) -> Column {
2663 column.clone().array_sum()
2664}
2665
2666pub fn aggregate(column: &Column, zero: &Column) -> Column {
2668 column.clone().array_aggregate(zero)
2669}
2670
2671pub fn array_mean(column: &Column) -> Column {
2673 column.clone().array_mean()
2674}
2675
2676pub fn posexplode(column: &Column) -> (Column, Column) {
2679 column.clone().posexplode()
2680}
2681
2682pub fn create_map(key_values: &[&Column]) -> Result<Column, PolarsError> {
2686 use polars::chunked_array::StructChunked;
2687 use polars::prelude::{IntoSeries, ListChunked, as_struct, concat_list, lit};
2688 if key_values.is_empty() {
2689 let key_s = Series::new("key".into(), Vec::<String>::new());
2691 let value_s = Series::new("value".into(), Vec::<String>::new());
2692 let fields: [&Series; 2] = [&key_s, &value_s];
2693 let empty_struct = StructChunked::from_series(
2694 polars::prelude::PlSmallStr::EMPTY,
2695 0,
2696 fields.iter().copied(),
2697 )
2698 .map_err(|e| PolarsError::ComputeError(format!("create_map empty struct: {e}").into()))?
2699 .into_series();
2700 let list_series = ListChunked::from_iter([Some(empty_struct)])
2701 .with_name("create_map".into())
2702 .into_series();
2703 let expr = lit(list_series).first();
2704 return Ok(crate::column::Column::from_expr(expr, None));
2705 }
2706 let mut struct_exprs: Vec<Expr> = Vec::new();
2707 for i in (0..key_values.len()).step_by(2) {
2708 if i + 1 < key_values.len() {
2709 let k = key_values[i].expr().clone().alias("key");
2710 let v = key_values[i + 1].expr().clone().alias("value");
2711 struct_exprs.push(as_struct(vec![k, v]));
2712 }
2713 }
2714 let expr = concat_list(struct_exprs)
2715 .map_err(|e| PolarsError::ComputeError(format!("create_map concat_list: {e}").into()))?;
2716 Ok(crate::column::Column::from_expr(expr, None))
2717}
2718
2719pub fn map_keys(column: &Column) -> Column {
2721 column.clone().map_keys()
2722}
2723
2724pub fn map_values(column: &Column) -> Column {
2726 column.clone().map_values()
2727}
2728
2729pub fn map_entries(column: &Column) -> Column {
2731 column.clone().map_entries()
2732}
2733
2734pub fn map_from_arrays(keys: &Column, values: &Column) -> Column {
2736 keys.clone().map_from_arrays(values)
2737}
2738
2739pub fn map_concat(a: &Column, b: &Column) -> Column {
2741 a.clone().map_concat(b)
2742}
2743
2744pub fn map_from_entries(column: &Column) -> Column {
2746 column.clone().map_from_entries()
2747}
2748
2749pub fn map_contains_key(map_col: &Column, key: &Column) -> Column {
2751 map_col.clone().map_contains_key(key)
2752}
2753
2754pub fn get(map_col: &Column, key: &Column) -> Column {
2756 map_col.clone().get(key)
2757}
2758
2759pub fn map_filter(map_col: &Column, predicate: Expr) -> Column {
2761 map_col.clone().map_filter(predicate)
2762}
2763
2764pub fn map_zip_with(map1: &Column, map2: &Column, merge: Expr) -> Column {
2766 map1.clone().map_zip_with(map2, merge)
2767}
2768
2769pub fn zip_with_coalesce(left: &Column, right: &Column) -> Column {
2771 use polars::prelude::col;
2772 let left_field = col("").struct_().field_by_name("left");
2773 let right_field = col("").struct_().field_by_name("right");
2774 let merge = crate::column::Column::from_expr(
2775 coalesce(&[
2776 &crate::column::Column::from_expr(left_field, None),
2777 &crate::column::Column::from_expr(right_field, None),
2778 ])
2779 .into_expr(),
2780 None,
2781 );
2782 left.clone().zip_with(right, merge.into_expr())
2783}
2784
2785pub fn map_zip_with_coalesce(map1: &Column, map2: &Column) -> Column {
2787 use polars::prelude::col;
2788 let v1 = col("").struct_().field_by_name("value1");
2789 let v2 = col("").struct_().field_by_name("value2");
2790 let merge = coalesce(&[
2791 &crate::column::Column::from_expr(v1, None),
2792 &crate::column::Column::from_expr(v2, None),
2793 ])
2794 .into_expr();
2795 map1.clone().map_zip_with(map2, merge)
2796}
2797
2798pub fn map_filter_value_gt(map_col: &Column, threshold: f64) -> Column {
2800 use polars::prelude::{col, lit};
2801 let pred = col("").struct_().field_by_name("value").gt(lit(threshold));
2802 map_col.clone().map_filter(pred)
2803}
2804
2805pub fn struct_(columns: &[&Column]) -> Column {
2807 use polars::prelude::as_struct;
2808 if columns.is_empty() {
2809 panic!("struct requires at least one column");
2810 }
2811 let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2812 crate::column::Column::from_expr(as_struct(exprs), None)
2813}
2814
2815pub fn named_struct(pairs: &[(&str, &Column)]) -> Column {
2817 use polars::prelude::as_struct;
2818 if pairs.is_empty() {
2819 panic!("named_struct requires at least one (name, column) pair");
2820 }
2821 let exprs: Vec<Expr> = pairs
2822 .iter()
2823 .map(|(name, col)| col.expr().clone().alias(*name))
2824 .collect();
2825 crate::column::Column::from_expr(as_struct(exprs), None)
2826}
2827
2828pub fn array_append(array: &Column, elem: &Column) -> Column {
2830 array.clone().array_append(elem)
2831}
2832
2833pub fn array_prepend(array: &Column, elem: &Column) -> Column {
2835 array.clone().array_prepend(elem)
2836}
2837
2838pub fn array_insert(array: &Column, pos: &Column, elem: &Column) -> Column {
2840 array.clone().array_insert(pos, elem)
2841}
2842
2843pub fn array_except(a: &Column, b: &Column) -> Column {
2845 a.clone().array_except(b)
2846}
2847
2848pub fn array_intersect(a: &Column, b: &Column) -> Column {
2850 a.clone().array_intersect(b)
2851}
2852
2853pub fn array_union(a: &Column, b: &Column) -> Column {
2855 a.clone().array_union(b)
2856}
2857
2858pub fn zip_with(left: &Column, right: &Column, merge: Expr) -> Column {
2860 left.clone().zip_with(right, merge)
2861}
2862
2863pub fn get_json_object(column: &Column, path: &str) -> Column {
2865 column.clone().get_json_object(path)
2866}
2867
2868pub fn json_object_keys(column: &Column) -> Column {
2870 column.clone().json_object_keys()
2871}
2872
2873pub fn json_tuple(column: &Column, keys: &[&str]) -> Column {
2875 column.clone().json_tuple(keys)
2876}
2877
2878pub fn from_csv(column: &Column) -> Column {
2880 column.clone().from_csv()
2881}
2882
2883pub fn to_csv(column: &Column) -> Column {
2885 column.clone().to_csv()
2886}
2887
2888pub fn schema_of_csv(_column: &Column) -> Column {
2890 Column::from_expr(
2891 lit("STRUCT<_c0: STRING, _c1: STRING>".to_string()),
2892 Some("schema_of_csv".to_string()),
2893 )
2894}
2895
2896pub fn schema_of_json(_column: &Column) -> Column {
2898 Column::from_expr(
2899 lit("STRUCT<>".to_string()),
2900 Some("schema_of_json".to_string()),
2901 )
2902}
2903
2904pub fn from_json(column: &Column, schema: Option<polars::datatypes::DataType>) -> Column {
2906 column.clone().from_json(schema)
2907}
2908
2909pub fn to_json(column: &Column) -> Column {
2911 column.clone().to_json()
2912}
2913
2914pub fn isin(column: &Column, other: &Column) -> Column {
2916 column.clone().isin(other)
2917}
2918
2919pub fn isin_i64(column: &Column, values: &[i64]) -> Column {
2921 let s = Series::from_iter(values.iter().cloned());
2922 Column::from_expr(column.expr().clone().is_in(lit(s), false), None)
2923}
2924
2925pub fn isin_str(column: &Column, values: &[&str]) -> Column {
2927 let s: Series = Series::from_iter(values.iter().copied());
2928 Column::from_expr(column.expr().clone().is_in(lit(s), false), None)
2929}
2930
2931pub fn url_decode(column: &Column) -> Column {
2933 column.clone().url_decode()
2934}
2935
2936pub fn url_encode(column: &Column) -> Column {
2938 column.clone().url_encode()
2939}
2940
2941pub fn shift_left(column: &Column, n: i32) -> Column {
2943 column.clone().shift_left(n)
2944}
2945
2946pub fn shift_right(column: &Column, n: i32) -> Column {
2948 column.clone().shift_right(n)
2949}
2950
2951pub fn shift_right_unsigned(column: &Column, n: i32) -> Column {
2953 column.clone().shift_right_unsigned(n)
2954}
2955
2956pub fn version() -> Column {
2958 Column::from_expr(
2959 lit(concat!("robin-sparkless-", env!("CARGO_PKG_VERSION"))),
2960 None,
2961 )
2962}
2963
2964pub fn equal_null(left: &Column, right: &Column) -> Column {
2966 left.clone().eq_null_safe(right)
2967}
2968
2969pub fn json_array_length(column: &Column, path: &str) -> Column {
2971 column.clone().json_array_length(path)
2972}
2973
2974pub fn parse_url(column: &Column, part: &str, key: Option<&str>) -> Column {
2977 column.clone().parse_url(part, key)
2978}
2979
2980pub fn hash(columns: &[&Column]) -> Column {
2982 use polars::prelude::*;
2983 if columns.is_empty() {
2984 return crate::column::Column::from_expr(lit(0i64), None);
2985 }
2986 if columns.len() == 1 {
2987 return columns[0].clone().hash();
2988 }
2989 let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2990 let struct_expr = polars::prelude::as_struct(exprs);
2991 let name = columns[0].name().to_string();
2992 let expr = struct_expr.map(
2993 |s| crate::column::expect_col(crate::udfs::apply_hash_struct(s)),
2994 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
2995 );
2996 crate::column::Column::from_expr(expr, Some(name))
2997}
2998
2999pub fn stack(columns: &[&Column]) -> Column {
3001 struct_(columns)
3002}
3003
#[cfg(test)]
mod tests {
    use super::*;
    use polars::prelude::{IntoLazy, df};

    /// `col` keeps the name it was given.
    #[test]
    fn test_col_creates_column() {
        assert_eq!(col("test").name(), "test");
    }

    /// Literal columns carry the placeholder name `<expr>`.
    #[test]
    fn test_lit_i32() {
        assert_eq!(lit_i32(42).name(), "<expr>");
    }

    #[test]
    fn test_lit_i64() {
        assert_eq!(lit_i64(123456789012345i64).name(), "<expr>");
    }

    #[test]
    fn test_lit_f64() {
        assert_eq!(lit_f64(std::f64::consts::PI).name(), "<expr>");
    }

    #[test]
    fn test_lit_bool() {
        assert_eq!(lit_bool(true).name(), "<expr>");
    }

    #[test]
    fn test_lit_str() {
        assert_eq!(lit_str("hello").name(), "<expr>");
    }

    /// `create_map` with no arguments yields an empty list on every row.
    #[test]
    fn test_create_map_empty() {
        let empty_map = create_map(&[]).unwrap();
        let frame = df!("id" => &[1i64, 2i64]).unwrap();
        let collected = frame
            .lazy()
            .with_columns([empty_map.into_expr().alias("m")])
            .collect()
            .unwrap();
        assert_eq!(collected.height(), 2);

        let m = collected.column("m").unwrap();
        assert_eq!(m.len(), 2);

        let list = m.list().unwrap();
        for idx in 0..2 {
            let entry = list.get(idx).unwrap();
            assert_eq!(entry.len(), 0);
        }
    }

    /// Aggregation helpers name their result after the aggregation.
    #[test]
    fn test_count_aggregation() {
        let source = col("value");
        assert_eq!(count(&source).name(), "count");
    }

    #[test]
    fn test_sum_aggregation() {
        let source = col("value");
        assert_eq!(sum(&source).name(), "sum");
    }

    #[test]
    fn test_avg_aggregation() {
        let source = col("value");
        assert_eq!(avg(&source).name(), "avg");
    }

    #[test]
    fn test_max_aggregation() {
        let source = col("value");
        assert_eq!(max(&source).name(), "max");
    }

    #[test]
    fn test_min_aggregation() {
        let source = col("value");
        assert_eq!(min(&source).name(), "min");
    }

    /// `when(..).then(..).otherwise(..)` picks the branch per row.
    #[test]
    fn test_when_then_otherwise() {
        let frame = df!("age" => &[15, 25, 35]).unwrap();

        let age = col("age");
        let is_adult = age.gt(polars::prelude::lit(18));
        let status = when(&is_adult)
            .then(&lit_str("adult"))
            .otherwise(&lit_str("minor"));

        let collected = frame
            .lazy()
            .with_column(status.into_expr().alias("status"))
            .collect()
            .unwrap();

        let status_col = collected.column("status").unwrap();
        let labels: Vec<Option<&str>> = status_col.str().unwrap().into_iter().collect();

        assert_eq!(labels, vec![Some("minor"), Some("adult"), Some("adult")]);
    }

    /// `coalesce` returns the first non-null value across the inputs.
    #[test]
    fn test_coalesce_returns_first_non_null() {
        let frame = df!(
            "a" => &[Some(1), None, None],
            "b" => &[None, Some(2), None],
            "c" => &[None, None, Some(3)]
        )
        .unwrap();

        let (first, second, third) = (col("a"), col("b"), col("c"));
        let merged = coalesce(&[&first, &second, &third]);

        let collected = frame
            .lazy()
            .with_column(merged.into_expr().alias("coalesced"))
            .collect()
            .unwrap();

        let coalesced_col = collected.column("coalesced").unwrap();
        let picked: Vec<Option<i32>> = coalesced_col.i32().unwrap().into_iter().collect();

        assert_eq!(picked, vec![Some(1), Some(2), Some(3)]);
    }

    /// A trailing literal acts as the default when every column is null.
    #[test]
    fn test_coalesce_with_literal_fallback() {
        let frame = df!(
            "a" => &[Some(1), None],
            "b" => &[None::<i32>, None::<i32>]
        )
        .unwrap();

        let first = col("a");
        let second = col("b");
        let default_value = lit_i32(0);
        let merged = coalesce(&[&first, &second, &default_value]);

        let collected = frame
            .lazy()
            .with_column(merged.into_expr().alias("coalesced"))
            .collect()
            .unwrap();

        let coalesced_col = collected.column("coalesced").unwrap();
        let picked: Vec<Option<i32>> = coalesced_col.i32().unwrap().into_iter().collect();

        assert_eq!(picked, vec![Some(1), Some(0)]);
    }

    /// `coalesce` rejects an empty column list by panicking.
    #[test]
    #[should_panic(expected = "coalesce requires at least one column")]
    fn test_coalesce_empty_panics() {
        let no_columns: [&Column; 0] = [];
        let _ = coalesce(&no_columns);
    }

    /// Strict cast to double parses numeric strings, including padded ones.
    #[test]
    fn test_cast_double_string_column_strict_ok() {
        let frame = df!("s" => &["123", " 45.5 ", "0"]).unwrap();

        let text = col("s");
        let as_double = cast(&text, "double").unwrap();

        let collected = frame
            .lazy()
            .with_column(as_double.into_expr().alias("v"))
            .collect()
            .unwrap();

        let parsed: Vec<Option<f64>> = collected
            .column("v")
            .unwrap()
            .f64()
            .unwrap()
            .into_iter()
            .collect();
        assert_eq!(parsed, vec![Some(123.0), Some(45.5), Some(0.0)]);
    }

    /// `try_cast` turns unparsable strings into nulls instead of failing.
    #[test]
    fn test_try_cast_double_string_column_invalid_to_null() {
        let frame = df!("s" => &["123", " 45.5 ", "abc", ""]).unwrap();

        let text = col("s");
        let as_double = try_cast(&text, "double").unwrap();

        let collected = frame
            .lazy()
            .with_column(as_double.into_expr().alias("v"))
            .collect()
            .unwrap();

        let parsed: Vec<Option<f64>> = collected
            .column("v")
            .unwrap()
            .f64()
            .unwrap()
            .into_iter()
            .collect();
        assert_eq!(parsed, vec![Some(123.0), Some(45.5), None, None]);
    }

    /// `to_number` / `try_to_number` produce f64 for ints, floats and
    /// strings, with unparsable strings becoming null under
    /// `try_to_number`.
    #[test]
    fn test_to_number_and_try_to_number_numerics_and_strings() {
        let frame = df!(
            "i" => &[1i32, 2, 3],
            "f" => &[1.5f64, 2.5, 3.5],
            "s" => &["10", "20.5", "xyz"]
        )
        .unwrap();

        let ints = col("i");
        let floats = col("f");
        let strings = col("s");

        let ints_as_number = to_number(&ints, None).unwrap();
        let floats_as_number = to_number(&floats, None).unwrap();
        let strings_as_number = try_to_number(&strings, None).unwrap();

        let collected = frame
            .lazy()
            .with_columns([
                ints_as_number.into_expr().alias("i_num"),
                floats_as_number.into_expr().alias("f_num"),
                strings_as_number.into_expr().alias("s_num"),
            ])
            .collect()
            .unwrap();

        let i_num = collected.column("i_num").unwrap();
        let f_num = collected.column("f_num").unwrap();
        let s_num = collected.column("s_num").unwrap();

        let from_ints: Vec<Option<f64>> = i_num.f64().unwrap().into_iter().collect();
        let from_floats: Vec<Option<f64>> = f_num.f64().unwrap().into_iter().collect();
        let from_strings: Vec<Option<f64>> = s_num.f64().unwrap().into_iter().collect();

        assert_eq!(from_ints, vec![Some(1.0), Some(2.0), Some(3.0)]);
        assert_eq!(from_floats, vec![Some(1.5), Some(2.5), Some(3.5)]);
        assert_eq!(from_strings, vec![Some(10.0), Some(20.5), None]);
    }
}