1use crate::column::Column;
2use crate::dataframe::DataFrame;
3use polars::prelude::*;
4
5#[derive(Debug, Clone)]
10pub struct SortOrder {
11 pub(crate) expr: Expr,
12 pub(crate) descending: bool,
13 pub(crate) nulls_last: bool,
14}
15
16impl SortOrder {
17 pub fn expr(&self) -> &Expr {
18 &self.expr
19 }
20}
21
22pub fn asc(column: &Column) -> SortOrder {
24 SortOrder {
25 expr: column.expr().clone(),
26 descending: false,
27 nulls_last: false,
28 }
29}
30
31pub fn asc_nulls_first(column: &Column) -> SortOrder {
33 SortOrder {
34 expr: column.expr().clone(),
35 descending: false,
36 nulls_last: false,
37 }
38}
39
40pub fn asc_nulls_last(column: &Column) -> SortOrder {
42 SortOrder {
43 expr: column.expr().clone(),
44 descending: false,
45 nulls_last: true,
46 }
47}
48
49pub fn desc(column: &Column) -> SortOrder {
51 SortOrder {
52 expr: column.expr().clone(),
53 descending: true,
54 nulls_last: true,
55 }
56}
57
58pub fn desc_nulls_first(column: &Column) -> SortOrder {
60 SortOrder {
61 expr: column.expr().clone(),
62 descending: true,
63 nulls_last: false,
64 }
65}
66
67pub fn desc_nulls_last(column: &Column) -> SortOrder {
69 SortOrder {
70 expr: column.expr().clone(),
71 descending: true,
72 nulls_last: true,
73 }
74}
75
76pub fn parse_type_name(name: &str) -> Result<DataType, String> {
81 let s = name.trim().to_lowercase();
82 if s.starts_with("decimal(") && s.contains(')') {
83 return Ok(DataType::Float64);
84 }
85 Ok(match s.as_str() {
86 "int" | "integer" => DataType::Int32,
87 "long" | "bigint" => DataType::Int64,
88 "float" => DataType::Float32,
89 "double" => DataType::Float64,
90 "string" | "str" => DataType::String,
91 "boolean" | "bool" => DataType::Boolean,
92 "date" => DataType::Date,
93 "timestamp" => DataType::Datetime(TimeUnit::Microseconds, None),
94 _ => return Err(format!("unknown type name: {name}")),
95 })
96}
97
98pub fn col(name: &str) -> Column {
100 Column::new(name.to_string())
101}
102
103pub fn grouping(column: &Column) -> Column {
105 let _ = column;
106 Column::from_expr(lit(0i32), Some("grouping".to_string()))
107}
108
109pub fn grouping_id(_columns: &[Column]) -> Column {
111 Column::from_expr(lit(0i64), Some("grouping_id".to_string()))
112}
113
114pub fn lit_i32(value: i32) -> Column {
116 let expr: Expr = lit(value);
117 Column::from_expr(expr, None)
118}
119
120pub fn lit_i64(value: i64) -> Column {
121 let expr: Expr = lit(value);
122 Column::from_expr(expr, None)
123}
124
125pub fn lit_f64(value: f64) -> Column {
126 let expr: Expr = lit(value);
127 Column::from_expr(expr, None)
128}
129
130pub fn lit_bool(value: bool) -> Column {
131 let expr: Expr = lit(value);
132 Column::from_expr(expr, None)
133}
134
135pub fn lit_str(value: &str) -> Column {
136 let expr: Expr = lit(value);
137 Column::from_expr(expr, None)
138}
139
140pub fn count(col: &Column) -> Column {
142 Column::from_expr(col.expr().clone().count(), Some("count".to_string()))
143}
144
145pub fn sum(col: &Column) -> Column {
147 Column::from_expr(col.expr().clone().sum(), Some("sum".to_string()))
148}
149
150pub fn avg(col: &Column) -> Column {
152 Column::from_expr(col.expr().clone().mean(), Some("avg".to_string()))
153}
154
155pub fn mean(col: &Column) -> Column {
157 avg(col)
158}
159
160pub fn max(col: &Column) -> Column {
162 Column::from_expr(col.expr().clone().max(), Some("max".to_string()))
163}
164
165pub fn min(col: &Column) -> Column {
167 Column::from_expr(col.expr().clone().min(), Some("min".to_string()))
168}
169
170pub fn first(col: &Column, ignorenulls: bool) -> Column {
172 let _ = ignorenulls;
173 Column::from_expr(col.expr().clone().first(), None)
174}
175
176pub fn any_value(col: &Column, ignorenulls: bool) -> Column {
178 let _ = ignorenulls;
179 Column::from_expr(col.expr().clone().first(), None)
180}
181
182pub fn count_if(col: &Column) -> Column {
184 use polars::prelude::DataType;
185 Column::from_expr(
186 col.expr().clone().cast(DataType::Int64).sum(),
187 Some("count_if".to_string()),
188 )
189}
190
191pub fn try_sum(col: &Column) -> Column {
193 Column::from_expr(col.expr().clone().sum(), Some("try_sum".to_string()))
194}
195
196pub fn try_avg(col: &Column) -> Column {
198 Column::from_expr(col.expr().clone().mean(), Some("try_avg".to_string()))
199}
200
201pub fn max_by(value_col: &Column, ord_col: &Column) -> Column {
203 use polars::prelude::{as_struct, SortOptions};
204 let st = as_struct(vec![
205 ord_col.expr().clone().alias("_ord"),
206 value_col.expr().clone().alias("_val"),
207 ]);
208 let e = st
209 .sort(SortOptions::default().with_order_descending(true))
210 .first()
211 .struct_()
212 .field_by_name("_val");
213 Column::from_expr(e, None)
214}
215
216pub fn min_by(value_col: &Column, ord_col: &Column) -> Column {
218 use polars::prelude::{as_struct, SortOptions};
219 let st = as_struct(vec![
220 ord_col.expr().clone().alias("_ord"),
221 value_col.expr().clone().alias("_val"),
222 ]);
223 let e = st
224 .sort(SortOptions::default())
225 .first()
226 .struct_()
227 .field_by_name("_val");
228 Column::from_expr(e, None)
229}
230
231pub fn collect_list(col: &Column) -> Column {
233 Column::from_expr(
234 col.expr().clone().implode(),
235 Some("collect_list".to_string()),
236 )
237}
238
239pub fn collect_set(col: &Column) -> Column {
241 Column::from_expr(
242 col.expr().clone().unique().implode(),
243 Some("collect_set".to_string()),
244 )
245}
246
247pub fn bool_and(col: &Column) -> Column {
249 Column::from_expr(col.expr().clone().all(true), Some("bool_and".to_string()))
250}
251
252pub fn every(col: &Column) -> Column {
254 Column::from_expr(col.expr().clone().all(true), Some("every".to_string()))
255}
256
257pub fn stddev(col: &Column) -> Column {
259 Column::from_expr(col.expr().clone().std(1), Some("stddev".to_string()))
260}
261
262pub fn variance(col: &Column) -> Column {
264 Column::from_expr(col.expr().clone().var(1), Some("variance".to_string()))
265}
266
267pub fn stddev_pop(col: &Column) -> Column {
269 Column::from_expr(col.expr().clone().std(0), Some("stddev_pop".to_string()))
270}
271
272pub fn stddev_samp(col: &Column) -> Column {
274 stddev(col)
275}
276
277pub fn std(col: &Column) -> Column {
279 stddev(col)
280}
281
282pub fn var_pop(col: &Column) -> Column {
284 Column::from_expr(col.expr().clone().var(0), Some("var_pop".to_string()))
285}
286
287pub fn var_samp(col: &Column) -> Column {
289 variance(col)
290}
291
292pub fn median(col: &Column) -> Column {
294 use polars::prelude::QuantileMethod;
295 Column::from_expr(
296 col.expr()
297 .clone()
298 .quantile(lit(0.5), QuantileMethod::Linear),
299 Some("median".to_string()),
300 )
301}
302
303pub fn approx_percentile(col: &Column, percentage: f64, _accuracy: Option<i32>) -> Column {
305 use polars::prelude::QuantileMethod;
306 Column::from_expr(
307 col.expr()
308 .clone()
309 .quantile(lit(percentage), QuantileMethod::Linear),
310 Some(format!("approx_percentile({percentage})")),
311 )
312}
313
314pub fn percentile_approx(col: &Column, percentage: f64, accuracy: Option<i32>) -> Column {
316 approx_percentile(col, percentage, accuracy)
317}
318
319pub fn mode(col: &Column) -> Column {
321 col.clone().mode()
322}
323
324pub fn count_distinct(col: &Column) -> Column {
326 use polars::prelude::DataType;
327 Column::from_expr(
328 col.expr().clone().n_unique().cast(DataType::Int64),
329 Some("count_distinct".to_string()),
330 )
331}
332
333pub fn approx_count_distinct(col: &Column, _rsd: Option<f64>) -> Column {
335 use polars::prelude::DataType;
336 Column::from_expr(
337 col.expr().clone().n_unique().cast(DataType::Int64),
338 Some("approx_count_distinct".to_string()),
339 )
340}
341
342pub fn kurtosis(col: &Column) -> Column {
344 Column::from_expr(
345 col.expr()
346 .clone()
347 .cast(DataType::Float64)
348 .kurtosis(true, true),
349 Some("kurtosis".to_string()),
350 )
351}
352
353pub fn skewness(col: &Column) -> Column {
355 Column::from_expr(
356 col.expr().clone().cast(DataType::Float64).skew(true),
357 Some("skewness".to_string()),
358 )
359}
360
361pub fn covar_pop_expr(col1: &str, col2: &str) -> Expr {
363 use polars::prelude::{col as pl_col, len};
364 let c1 = pl_col(col1).cast(DataType::Float64);
365 let c2 = pl_col(col2).cast(DataType::Float64);
366 let n = len().cast(DataType::Float64);
367 let sum_ab = (c1.clone() * c2.clone()).sum();
368 let sum_a = pl_col(col1).sum().cast(DataType::Float64);
369 let sum_b = pl_col(col2).sum().cast(DataType::Float64);
370 (sum_ab - sum_a * sum_b / n.clone()) / n
371}
372
373pub fn covar_pop(col1: &Column, col2: &Column) -> Column {
375 use polars::prelude::len;
376 let c1 = col1.expr().clone().cast(DataType::Float64);
377 let c2 = col2.expr().clone().cast(DataType::Float64);
378 let n = len().cast(DataType::Float64);
379 let sum_ab = (c1.clone() * c2.clone()).sum();
380 let sum_a = col1.expr().clone().sum().cast(DataType::Float64);
381 let sum_b = col2.expr().clone().sum().cast(DataType::Float64);
382 let e = (sum_ab - sum_a * sum_b / n.clone()) / n;
383 Column::from_expr(e, Some("covar_pop".to_string()))
384}
385
386pub fn corr(col1: &Column, col2: &Column) -> Column {
388 use polars::prelude::{len, lit, when};
389 let c1 = col1.expr().clone().cast(DataType::Float64);
390 let c2 = col2.expr().clone().cast(DataType::Float64);
391 let n = len().cast(DataType::Float64);
392 let n1 = (len() - lit(1)).cast(DataType::Float64);
393 let sum_ab = (c1.clone() * c2.clone()).sum();
394 let sum_a = col1.expr().clone().sum().cast(DataType::Float64);
395 let sum_b = col2.expr().clone().sum().cast(DataType::Float64);
396 let sum_a2 = (c1.clone() * c1).sum();
397 let sum_b2 = (c2.clone() * c2).sum();
398 let cov_samp = (sum_ab - sum_a.clone() * sum_b.clone() / n.clone()) / n1.clone();
399 let var_a = (sum_a2 - sum_a.clone() * sum_a / n.clone()) / n1.clone();
400 let var_b = (sum_b2 - sum_b.clone() * sum_b / n.clone()) / n1.clone();
401 let std_a = var_a.sqrt();
402 let std_b = var_b.sqrt();
403 let e = when(len().gt(lit(1)))
404 .then(cov_samp / (std_a * std_b))
405 .otherwise(lit(f64::NAN));
406 Column::from_expr(e, Some("corr".to_string()))
407}
408
409pub fn covar_samp_expr(col1: &str, col2: &str) -> Expr {
411 use polars::prelude::{col as pl_col, len, lit, when};
412 let c1 = pl_col(col1).cast(DataType::Float64);
413 let c2 = pl_col(col2).cast(DataType::Float64);
414 let n = len().cast(DataType::Float64);
415 let sum_ab = (c1.clone() * c2.clone()).sum();
416 let sum_a = pl_col(col1).sum().cast(DataType::Float64);
417 let sum_b = pl_col(col2).sum().cast(DataType::Float64);
418 when(len().gt(lit(1)))
419 .then((sum_ab - sum_a * sum_b / n.clone()) / (len() - lit(1)).cast(DataType::Float64))
420 .otherwise(lit(f64::NAN))
421}
422
423pub fn corr_expr(col1: &str, col2: &str) -> Expr {
425 use polars::prelude::{col as pl_col, len, lit, when};
426 let c1 = pl_col(col1).cast(DataType::Float64);
427 let c2 = pl_col(col2).cast(DataType::Float64);
428 let n = len().cast(DataType::Float64);
429 let n1 = (len() - lit(1)).cast(DataType::Float64);
430 let sum_ab = (c1.clone() * c2.clone()).sum();
431 let sum_a = pl_col(col1).sum().cast(DataType::Float64);
432 let sum_b = pl_col(col2).sum().cast(DataType::Float64);
433 let sum_a2 = (c1.clone() * c1).sum();
434 let sum_b2 = (c2.clone() * c2).sum();
435 let cov_samp = (sum_ab - sum_a.clone() * sum_b.clone() / n.clone()) / n1.clone();
436 let var_a = (sum_a2 - sum_a.clone() * sum_a / n.clone()) / n1.clone();
437 let var_b = (sum_b2 - sum_b.clone() * sum_b / n.clone()) / n1.clone();
438 let std_a = var_a.sqrt();
439 let std_b = var_b.sqrt();
440 when(len().gt(lit(1)))
441 .then(cov_samp / (std_a * std_b))
442 .otherwise(lit(f64::NAN))
443}
444
445fn regr_cond_and_sums(y_col: &str, x_col: &str) -> (Expr, Expr, Expr, Expr, Expr, Expr) {
448 use polars::prelude::col as pl_col;
449 let y = pl_col(y_col).cast(DataType::Float64);
450 let x = pl_col(x_col).cast(DataType::Float64);
451 let cond = y.clone().is_not_null().and(x.clone().is_not_null());
452 let n = y
453 .clone()
454 .filter(cond.clone())
455 .count()
456 .cast(DataType::Float64);
457 let sum_x = x.clone().filter(cond.clone()).sum();
458 let sum_y = y.clone().filter(cond.clone()).sum();
459 let sum_xx = (x.clone() * x.clone()).filter(cond.clone()).sum();
460 let sum_yy = (y.clone() * y.clone()).filter(cond.clone()).sum();
461 let sum_xy = (x * y).filter(cond).sum();
462 (n, sum_x, sum_y, sum_xx, sum_yy, sum_xy)
463}
464
465pub fn regr_count_expr(y_col: &str, x_col: &str) -> Expr {
467 let (n, ..) = regr_cond_and_sums(y_col, x_col);
468 n
469}
470
471pub fn regr_avgx_expr(y_col: &str, x_col: &str) -> Expr {
473 use polars::prelude::{lit, when};
474 let (n, sum_x, ..) = regr_cond_and_sums(y_col, x_col);
475 when(n.clone().gt(lit(0.0)))
476 .then(sum_x / n)
477 .otherwise(lit(f64::NAN))
478}
479
480pub fn regr_avgy_expr(y_col: &str, x_col: &str) -> Expr {
482 use polars::prelude::{lit, when};
483 let (n, _, sum_y, ..) = regr_cond_and_sums(y_col, x_col);
484 when(n.clone().gt(lit(0.0)))
485 .then(sum_y / n)
486 .otherwise(lit(f64::NAN))
487}
488
489pub fn regr_sxx_expr(y_col: &str, x_col: &str) -> Expr {
491 use polars::prelude::{lit, when};
492 let (n, sum_x, _, sum_xx, ..) = regr_cond_and_sums(y_col, x_col);
493 when(n.clone().gt(lit(0.0)))
494 .then(sum_xx - sum_x.clone() * sum_x / n)
495 .otherwise(lit(f64::NAN))
496}
497
498pub fn regr_syy_expr(y_col: &str, x_col: &str) -> Expr {
500 use polars::prelude::{lit, when};
501 let (n, _, sum_y, _, sum_yy, _) = regr_cond_and_sums(y_col, x_col);
502 when(n.clone().gt(lit(0.0)))
503 .then(sum_yy - sum_y.clone() * sum_y / n)
504 .otherwise(lit(f64::NAN))
505}
506
507pub fn regr_sxy_expr(y_col: &str, x_col: &str) -> Expr {
509 use polars::prelude::{lit, when};
510 let (n, sum_x, sum_y, _, _, sum_xy) = regr_cond_and_sums(y_col, x_col);
511 when(n.clone().gt(lit(0.0)))
512 .then(sum_xy - sum_x * sum_y / n)
513 .otherwise(lit(f64::NAN))
514}
515
516pub fn regr_slope_expr(y_col: &str, x_col: &str) -> Expr {
518 use polars::prelude::{lit, when};
519 let (n, sum_x, sum_y, sum_xx, _sum_yy, sum_xy) = regr_cond_and_sums(y_col, x_col);
520 let regr_sxx = sum_xx.clone() - sum_x.clone() * sum_x.clone() / n.clone();
521 let regr_sxy = sum_xy - sum_x * sum_y / n.clone();
522 when(n.gt(lit(1.0)).and(regr_sxx.clone().gt(lit(0.0))))
523 .then(regr_sxy / regr_sxx)
524 .otherwise(lit(f64::NAN))
525}
526
527pub fn regr_intercept_expr(y_col: &str, x_col: &str) -> Expr {
529 use polars::prelude::{lit, when};
530 let (n, sum_x, sum_y, sum_xx, _, sum_xy) = regr_cond_and_sums(y_col, x_col);
531 let regr_sxx = sum_xx - sum_x.clone() * sum_x.clone() / n.clone();
532 let regr_sxy = sum_xy.clone() - sum_x.clone() * sum_y.clone() / n.clone();
533 let slope = regr_sxy.clone() / regr_sxx.clone();
534 let avg_y = sum_y / n.clone();
535 let avg_x = sum_x / n.clone();
536 when(n.gt(lit(1.0)).and(regr_sxx.clone().gt(lit(0.0))))
537 .then(avg_y - slope * avg_x)
538 .otherwise(lit(f64::NAN))
539}
540
541pub fn regr_r2_expr(y_col: &str, x_col: &str) -> Expr {
543 use polars::prelude::{lit, when};
544 let (n, sum_x, sum_y, sum_xx, sum_yy, sum_xy) = regr_cond_and_sums(y_col, x_col);
545 let regr_sxx = sum_xx - sum_x.clone() * sum_x.clone() / n.clone();
546 let regr_syy = sum_yy - sum_y.clone() * sum_y.clone() / n.clone();
547 let regr_sxy = sum_xy - sum_x * sum_y / n;
548 when(
549 regr_sxx
550 .clone()
551 .gt(lit(0.0))
552 .and(regr_syy.clone().gt(lit(0.0))),
553 )
554 .then(regr_sxy.clone() * regr_sxy / (regr_sxx * regr_syy))
555 .otherwise(lit(f64::NAN))
556}
557
558pub fn when(condition: &Column) -> WhenBuilder {
570 WhenBuilder::new(condition.expr().clone())
571}
572
573pub fn when_then_otherwise_null(condition: &Column, value: &Column) -> Column {
575 use polars::prelude::*;
576 let null_expr = Expr::Literal(LiteralValue::Null);
577 let expr = polars::prelude::when(condition.expr().clone())
578 .then(value.expr().clone())
579 .otherwise(null_expr);
580 crate::column::Column::from_expr(expr, None)
581}
582
583pub struct WhenBuilder {
585 condition: Expr,
586}
587
588impl WhenBuilder {
589 fn new(condition: Expr) -> Self {
590 WhenBuilder { condition }
591 }
592
593 pub fn then(self, value: &Column) -> ThenBuilder {
595 use polars::prelude::*;
596 let when_then = when(self.condition).then(value.expr().clone());
597 ThenBuilder::new(when_then)
598 }
599
600 pub fn otherwise(self, _value: &Column) -> Column {
605 panic!("when().otherwise() requires .then() to be called first. Use when(cond).then(val1).otherwise(val2)");
608 }
609}
610
611pub struct ThenBuilder {
613 state: WhenThenState,
614}
615
616enum WhenThenState {
617 Single(Box<polars::prelude::Then>),
618 Chained(Box<polars::prelude::ChainedThen>),
619}
620
621pub struct ChainedWhenBuilder {
623 inner: polars::prelude::ChainedWhen,
624}
625
626impl ThenBuilder {
627 fn new(when_then: polars::prelude::Then) -> Self {
628 ThenBuilder {
629 state: WhenThenState::Single(Box::new(when_then)),
630 }
631 }
632
633 fn new_chained(chained: polars::prelude::ChainedThen) -> Self {
634 ThenBuilder {
635 state: WhenThenState::Chained(Box::new(chained)),
636 }
637 }
638
639 pub fn when(self, condition: &Column) -> ChainedWhenBuilder {
641 let chained_when = match self.state {
642 WhenThenState::Single(t) => t.when(condition.expr().clone()),
643 WhenThenState::Chained(ct) => ct.when(condition.expr().clone()),
644 };
645 ChainedWhenBuilder {
646 inner: chained_when,
647 }
648 }
649
650 pub fn otherwise(self, value: &Column) -> Column {
652 let expr = match self.state {
653 WhenThenState::Single(t) => t.otherwise(value.expr().clone()),
654 WhenThenState::Chained(ct) => ct.otherwise(value.expr().clone()),
655 };
656 crate::column::Column::from_expr(expr, None)
657 }
658}
659
660impl ChainedWhenBuilder {
661 pub fn then(self, value: &Column) -> ThenBuilder {
663 ThenBuilder::new_chained(self.inner.then(value.expr().clone()))
664 }
665}
666
667pub fn upper(column: &Column) -> Column {
669 column.clone().upper()
670}
671
672pub fn lower(column: &Column) -> Column {
674 column.clone().lower()
675}
676
677pub fn substring(column: &Column, start: i64, length: Option<i64>) -> Column {
679 column.clone().substr(start, length)
680}
681
682pub fn length(column: &Column) -> Column {
684 column.clone().length()
685}
686
687pub fn trim(column: &Column) -> Column {
689 column.clone().trim()
690}
691
692pub fn ltrim(column: &Column) -> Column {
694 column.clone().ltrim()
695}
696
697pub fn rtrim(column: &Column) -> Column {
699 column.clone().rtrim()
700}
701
702pub fn btrim(column: &Column, trim_str: Option<&str>) -> Column {
704 column.clone().btrim(trim_str)
705}
706
707pub fn locate(substr: &str, column: &Column, pos: i64) -> Column {
709 column.clone().locate(substr, pos)
710}
711
712pub fn conv(column: &Column, from_base: i32, to_base: i32) -> Column {
714 column.clone().conv(from_base, to_base)
715}
716
717pub fn hex(column: &Column) -> Column {
719 column.clone().hex()
720}
721
722pub fn unhex(column: &Column) -> Column {
724 column.clone().unhex()
725}
726
727pub fn encode(column: &Column, charset: &str) -> Column {
729 column.clone().encode(charset)
730}
731
732pub fn decode(column: &Column, charset: &str) -> Column {
734 column.clone().decode(charset)
735}
736
737pub fn to_binary(column: &Column, fmt: &str) -> Column {
739 column.clone().to_binary(fmt)
740}
741
742pub fn try_to_binary(column: &Column, fmt: &str) -> Column {
744 column.clone().try_to_binary(fmt)
745}
746
747pub fn aes_encrypt(column: &Column, key: &str) -> Column {
749 column.clone().aes_encrypt(key)
750}
751
752pub fn aes_decrypt(column: &Column, key: &str) -> Column {
754 column.clone().aes_decrypt(key)
755}
756
757pub fn try_aes_decrypt(column: &Column, key: &str) -> Column {
759 column.clone().try_aes_decrypt(key)
760}
761
762pub fn bin(column: &Column) -> Column {
764 column.clone().bin()
765}
766
767pub fn getbit(column: &Column, pos: i64) -> Column {
769 column.clone().getbit(pos)
770}
771
772pub fn bit_and(left: &Column, right: &Column) -> Column {
774 left.clone().bit_and(right)
775}
776
777pub fn bit_or(left: &Column, right: &Column) -> Column {
779 left.clone().bit_or(right)
780}
781
782pub fn bit_xor(left: &Column, right: &Column) -> Column {
784 left.clone().bit_xor(right)
785}
786
787pub fn bit_count(column: &Column) -> Column {
789 column.clone().bit_count()
790}
791
792pub fn bitwise_not(column: &Column) -> Column {
794 column.clone().bitwise_not()
795}
796
797pub fn bitmap_bit_position(column: &Column) -> Column {
801 use polars::prelude::DataType;
802 let expr = column.expr().clone().cast(DataType::Int32);
803 Column::from_expr(expr, None)
804}
805
806pub fn bitmap_bucket_number(column: &Column) -> Column {
808 use polars::prelude::DataType;
809 let expr = column.expr().clone().cast(DataType::Int64) / lit(32768i64);
810 Column::from_expr(expr, None)
811}
812
813pub fn bitmap_count(column: &Column) -> Column {
815 use polars::prelude::{DataType, GetOutput};
816 let expr = column.expr().clone().map(
817 crate::udfs::apply_bitmap_count,
818 GetOutput::from_type(DataType::Int64),
819 );
820 Column::from_expr(expr, None)
821}
822
823pub fn bitmap_construct_agg(column: &Column) -> polars::prelude::Expr {
826 use polars::prelude::{DataType, GetOutput};
827 column.expr().clone().implode().map(
828 crate::udfs::apply_bitmap_construct_agg,
829 GetOutput::from_type(DataType::Binary),
830 )
831}
832
833pub fn bitmap_or_agg(column: &Column) -> polars::prelude::Expr {
835 use polars::prelude::{DataType, GetOutput};
836 column.expr().clone().implode().map(
837 crate::udfs::apply_bitmap_or_agg,
838 GetOutput::from_type(DataType::Binary),
839 )
840}
841
842pub fn bit_get(column: &Column, pos: i64) -> Column {
844 getbit(column, pos)
845}
846
847pub fn assert_true(column: &Column, err_msg: Option<&str>) -> Column {
850 column.clone().assert_true(err_msg)
851}
852
853pub fn raise_error(message: &str) -> Column {
855 let msg = message.to_string();
856 let expr = lit(0i64).map(
857 move |_col| -> PolarsResult<Option<polars::prelude::Column>> {
858 Err(PolarsError::ComputeError(msg.clone().into()))
859 },
860 GetOutput::from_type(DataType::Int64),
861 );
862 Column::from_expr(expr, Some("raise_error".to_string()))
863}
864
865pub fn broadcast(df: &DataFrame) -> DataFrame {
867 df.clone()
868}
869
870pub fn spark_partition_id() -> Column {
872 Column::from_expr(lit(0i32), Some("spark_partition_id".to_string()))
873}
874
875pub fn input_file_name() -> Column {
877 Column::from_expr(lit(""), Some("input_file_name".to_string()))
878}
879
880pub fn monotonically_increasing_id() -> Column {
883 Column::from_expr(lit(0i64), Some("monotonically_increasing_id".to_string()))
884}
885
886pub fn current_catalog() -> Column {
888 Column::from_expr(lit("spark_catalog"), Some("current_catalog".to_string()))
889}
890
891pub fn current_database() -> Column {
893 Column::from_expr(lit("default"), Some("current_database".to_string()))
894}
895
896pub fn current_schema() -> Column {
898 Column::from_expr(lit("default"), Some("current_schema".to_string()))
899}
900
901pub fn current_user() -> Column {
903 Column::from_expr(lit("unknown"), Some("current_user".to_string()))
904}
905
906pub fn user() -> Column {
908 Column::from_expr(lit("unknown"), Some("user".to_string()))
909}
910
911pub fn rand(seed: Option<u64>) -> Column {
914 Column::from_rand(seed)
915}
916
917pub fn randn(seed: Option<u64>) -> Column {
920 Column::from_randn(seed)
921}
922
923pub fn call_udf(name: &str, cols: &[Column]) -> Result<Column, PolarsError> {
926 use polars::prelude::Column as PlColumn;
927
928 let session = crate::session::get_thread_udf_session().ok_or_else(|| {
929 PolarsError::InvalidOperation(
930 "call_udf: no session. Use SparkSession.builder().get_or_create() first.".into(),
931 )
932 })?;
933 let case_sensitive = session.is_case_sensitive();
934
935 let udf = session
937 .udf_registry
938 .get_rust_udf(name, case_sensitive)
939 .ok_or_else(|| {
940 PolarsError::InvalidOperation(format!("call_udf: UDF '{name}' not found").into())
941 })?;
942
943 let exprs: Vec<Expr> = cols.iter().map(|c| c.expr().clone()).collect();
944 let output_type = DataType::String; let expr = if exprs.len() == 1 {
947 let udf = udf.clone();
948 exprs.into_iter().next().unwrap().map(
949 move |c| {
950 let s = c.take_materialized_series();
951 udf.apply(&[s])
952 .map(|out| Some(PlColumn::new("_".into(), out)))
953 },
954 GetOutput::from_type(output_type),
955 )
956 } else {
957 let udf = udf.clone();
958 let first = exprs[0].clone();
959 let rest: Vec<Expr> = exprs[1..].to_vec();
960 first.map_many(
961 move |columns| {
962 let series: Vec<Series> = columns
963 .iter_mut()
964 .map(|c| std::mem::take(c).take_materialized_series())
965 .collect();
966 udf.apply(&series)
967 .map(|out| Some(PlColumn::new("_".into(), out)))
968 },
969 &rest,
970 GetOutput::from_type(output_type),
971 )
972 };
973
974 Ok(Column::from_expr(expr, Some(format!("{name}()"))))
975}
976
977pub fn arrays_overlap(left: &Column, right: &Column) -> Column {
979 left.clone().arrays_overlap(right)
980}
981
982pub fn arrays_zip(left: &Column, right: &Column) -> Column {
984 left.clone().arrays_zip(right)
985}
986
987pub fn explode_outer(column: &Column) -> Column {
989 column.clone().explode_outer()
990}
991
992pub fn posexplode_outer(column: &Column) -> (Column, Column) {
994 column.clone().posexplode_outer()
995}
996
997pub fn array_agg(column: &Column) -> Column {
999 column.clone().array_agg()
1000}
1001
1002pub fn transform_keys(column: &Column, key_expr: Expr) -> Column {
1004 column.clone().transform_keys(key_expr)
1005}
1006
1007pub fn transform_values(column: &Column, value_expr: Expr) -> Column {
1009 column.clone().transform_values(value_expr)
1010}
1011
1012pub fn str_to_map(
1014 column: &Column,
1015 pair_delim: Option<&str>,
1016 key_value_delim: Option<&str>,
1017) -> Column {
1018 let pd = pair_delim.unwrap_or(",");
1019 let kvd = key_value_delim.unwrap_or(":");
1020 column.clone().str_to_map(pd, kvd)
1021}
1022
1023pub fn regexp_extract(column: &Column, pattern: &str, group_index: usize) -> Column {
1025 column.clone().regexp_extract(pattern, group_index)
1026}
1027
1028pub fn regexp_replace(column: &Column, pattern: &str, replacement: &str) -> Column {
1030 column.clone().regexp_replace(pattern, replacement)
1031}
1032
1033pub fn split(column: &Column, delimiter: &str, limit: Option<i32>) -> Column {
1035 column.clone().split(delimiter, limit)
1036}
1037
1038pub fn initcap(column: &Column) -> Column {
1040 column.clone().initcap()
1041}
1042
1043pub fn regexp_extract_all(column: &Column, pattern: &str) -> Column {
1045 column.clone().regexp_extract_all(pattern)
1046}
1047
1048pub fn regexp_like(column: &Column, pattern: &str) -> Column {
1050 column.clone().regexp_like(pattern)
1051}
1052
1053pub fn regexp_count(column: &Column, pattern: &str) -> Column {
1055 column.clone().regexp_count(pattern)
1056}
1057
1058pub fn regexp_substr(column: &Column, pattern: &str) -> Column {
1060 column.clone().regexp_substr(pattern)
1061}
1062
1063pub fn split_part(column: &Column, delimiter: &str, part_num: i64) -> Column {
1065 column.clone().split_part(delimiter, part_num)
1066}
1067
1068pub fn regexp_instr(column: &Column, pattern: &str, group_idx: Option<usize>) -> Column {
1070 column.clone().regexp_instr(pattern, group_idx)
1071}
1072
1073pub fn find_in_set(str_column: &Column, set_column: &Column) -> Column {
1075 str_column.clone().find_in_set(set_column)
1076}
1077
1078pub fn format_string(format: &str, columns: &[&Column]) -> Column {
1080 use polars::prelude::*;
1081 if columns.is_empty() {
1082 panic!("format_string needs at least one column");
1083 }
1084 let format_owned = format.to_string();
1085 let args: Vec<Expr> = columns.iter().skip(1).map(|c| c.expr().clone()).collect();
1086 let expr = columns[0].expr().clone().map_many(
1087 move |cols| crate::udfs::apply_format_string(cols, &format_owned),
1088 &args,
1089 GetOutput::from_type(DataType::String),
1090 );
1091 crate::column::Column::from_expr(expr, None)
1092}
1093
1094pub fn printf(format: &str, columns: &[&Column]) -> Column {
1096 format_string(format, columns)
1097}
1098
1099pub fn repeat(column: &Column, n: i32) -> Column {
1101 column.clone().repeat(n)
1102}
1103
1104pub fn reverse(column: &Column) -> Column {
1106 column.clone().reverse()
1107}
1108
1109pub fn instr(column: &Column, substr: &str) -> Column {
1111 column.clone().instr(substr)
1112}
1113
1114pub fn position(substr: &str, column: &Column) -> Column {
1116 column.clone().instr(substr)
1117}
1118
1119pub fn ascii(column: &Column) -> Column {
1121 column.clone().ascii()
1122}
1123
1124pub fn format_number(column: &Column, decimals: u32) -> Column {
1126 column.clone().format_number(decimals)
1127}
1128
1129pub fn overlay(column: &Column, replace: &str, pos: i64, length: i64) -> Column {
1131 column.clone().overlay(replace, pos, length)
1132}
1133
1134pub fn char(column: &Column) -> Column {
1136 column.clone().char()
1137}
1138
1139pub fn chr(column: &Column) -> Column {
1141 column.clone().chr()
1142}
1143
1144pub fn base64(column: &Column) -> Column {
1146 column.clone().base64()
1147}
1148
1149pub fn unbase64(column: &Column) -> Column {
1151 column.clone().unbase64()
1152}
1153
1154pub fn sha1(column: &Column) -> Column {
1156 column.clone().sha1()
1157}
1158
1159pub fn sha2(column: &Column, bit_length: i32) -> Column {
1161 column.clone().sha2(bit_length)
1162}
1163
1164pub fn md5(column: &Column) -> Column {
1166 column.clone().md5()
1167}
1168
1169pub fn lpad(column: &Column, length: i32, pad: &str) -> Column {
1171 column.clone().lpad(length, pad)
1172}
1173
1174pub fn rpad(column: &Column, length: i32, pad: &str) -> Column {
1176 column.clone().rpad(length, pad)
1177}
1178
1179pub fn translate(column: &Column, from_str: &str, to_str: &str) -> Column {
1181 column.clone().translate(from_str, to_str)
1182}
1183
1184pub fn mask(
1186 column: &Column,
1187 upper_char: Option<char>,
1188 lower_char: Option<char>,
1189 digit_char: Option<char>,
1190 other_char: Option<char>,
1191) -> Column {
1192 column
1193 .clone()
1194 .mask(upper_char, lower_char, digit_char, other_char)
1195}
1196
1197pub fn substring_index(column: &Column, delimiter: &str, count: i64) -> Column {
1199 column.clone().substring_index(delimiter, count)
1200}
1201
1202pub fn left(column: &Column, n: i64) -> Column {
1204 column.clone().left(n)
1205}
1206
1207pub fn right(column: &Column, n: i64) -> Column {
1209 column.clone().right(n)
1210}
1211
1212pub fn replace(column: &Column, search: &str, replacement: &str) -> Column {
1214 column.clone().replace(search, replacement)
1215}
1216
1217pub fn startswith(column: &Column, prefix: &str) -> Column {
1219 column.clone().startswith(prefix)
1220}
1221
1222pub fn endswith(column: &Column, suffix: &str) -> Column {
1224 column.clone().endswith(suffix)
1225}
1226
1227pub fn contains(column: &Column, substring: &str) -> Column {
1229 column.clone().contains(substring)
1230}
1231
1232pub fn like(column: &Column, pattern: &str, escape_char: Option<char>) -> Column {
1235 column.clone().like(pattern, escape_char)
1236}
1237
1238pub fn ilike(column: &Column, pattern: &str, escape_char: Option<char>) -> Column {
1241 column.clone().ilike(pattern, escape_char)
1242}
1243
1244pub fn rlike(column: &Column, pattern: &str) -> Column {
1246 column.clone().regexp_like(pattern)
1247}
1248
1249pub fn regexp(column: &Column, pattern: &str) -> Column {
1251 rlike(column, pattern)
1252}
1253
1254pub fn soundex(column: &Column) -> Column {
1256 column.clone().soundex()
1257}
1258
1259pub fn levenshtein(column: &Column, other: &Column) -> Column {
1261 column.clone().levenshtein(other)
1262}
1263
1264pub fn crc32(column: &Column) -> Column {
1266 column.clone().crc32()
1267}
1268
1269pub fn xxhash64(column: &Column) -> Column {
1271 column.clone().xxhash64()
1272}
1273
1274pub fn abs(column: &Column) -> Column {
1276 column.clone().abs()
1277}
1278
1279pub fn ceil(column: &Column) -> Column {
1281 column.clone().ceil()
1282}
1283
1284pub fn floor(column: &Column) -> Column {
1286 column.clone().floor()
1287}
1288
1289pub fn round(column: &Column, decimals: u32) -> Column {
1291 column.clone().round(decimals)
1292}
1293
1294pub fn bround(column: &Column, scale: i32) -> Column {
1296 column.clone().bround(scale)
1297}
1298
1299pub fn negate(column: &Column) -> Column {
1301 column.clone().negate()
1302}
1303
1304pub fn negative(column: &Column) -> Column {
1306 negate(column)
1307}
1308
1309pub fn positive(column: &Column) -> Column {
1311 column.clone()
1312}
1313
1314pub fn cot(column: &Column) -> Column {
1316 column.clone().cot()
1317}
1318
1319pub fn csc(column: &Column) -> Column {
1321 column.clone().csc()
1322}
1323
1324pub fn sec(column: &Column) -> Column {
1326 column.clone().sec()
1327}
1328
1329pub fn e() -> Column {
1331 Column::from_expr(lit(std::f64::consts::E), Some("e".to_string()))
1332}
1333
1334pub fn pi() -> Column {
1336 Column::from_expr(lit(std::f64::consts::PI), Some("pi".to_string()))
1337}
1338
1339pub fn sqrt(column: &Column) -> Column {
1341 column.clone().sqrt()
1342}
1343
1344pub fn pow(column: &Column, exp: i64) -> Column {
1346 column.clone().pow(exp)
1347}
1348
1349pub fn exp(column: &Column) -> Column {
1351 column.clone().exp()
1352}
1353
1354pub fn log(column: &Column) -> Column {
1356 column.clone().log()
1357}
1358
1359pub fn log_with_base(column: &Column, base: f64) -> Column {
1361 crate::column::Column::from_expr(column.expr().clone().log(base), None)
1362}
1363
1364pub fn sin(column: &Column) -> Column {
1366 column.clone().sin()
1367}
1368
1369pub fn cos(column: &Column) -> Column {
1371 column.clone().cos()
1372}
1373
1374pub fn tan(column: &Column) -> Column {
1376 column.clone().tan()
1377}
1378
1379pub fn asin(column: &Column) -> Column {
1381 column.clone().asin()
1382}
1383
1384pub fn acos(column: &Column) -> Column {
1386 column.clone().acos()
1387}
1388
1389pub fn atan(column: &Column) -> Column {
1391 column.clone().atan()
1392}
1393
1394pub fn atan2(y: &Column, x: &Column) -> Column {
1396 y.clone().atan2(x)
1397}
1398
1399pub fn degrees(column: &Column) -> Column {
1401 column.clone().degrees()
1402}
1403
1404pub fn radians(column: &Column) -> Column {
1406 column.clone().radians()
1407}
1408
1409pub fn signum(column: &Column) -> Column {
1411 column.clone().signum()
1412}
1413
1414pub fn sign(column: &Column) -> Column {
1416 signum(column)
1417}
1418
1419pub fn cast(column: &Column, type_name: &str) -> Result<Column, String> {
1423 let dtype = parse_type_name(type_name)?;
1424 if dtype == DataType::Boolean {
1425 use polars::prelude::GetOutput;
1426 let expr = column.expr().clone().map(
1427 |col| crate::udfs::apply_string_to_boolean(col, true),
1428 GetOutput::from_type(DataType::Boolean),
1429 );
1430 return Ok(Column::from_expr(expr, None));
1431 }
1432 if dtype == DataType::Date {
1433 use polars::prelude::GetOutput;
1434 let expr = column.expr().clone().map(
1435 |col| crate::udfs::apply_string_to_date(col, true),
1436 GetOutput::from_type(DataType::Date),
1437 );
1438 return Ok(Column::from_expr(expr, None));
1439 }
1440 if dtype == DataType::Int32 || dtype == DataType::Int64 {
1441 use polars::prelude::GetOutput;
1442 let target = dtype.clone();
1443 let expr = column.expr().clone().map(
1445 move |col| crate::udfs::apply_string_to_int(col, true, target.clone()),
1446 GetOutput::from_type(dtype),
1447 );
1448 return Ok(Column::from_expr(expr, None));
1449 }
1450 if dtype == DataType::Float64 {
1451 use polars::prelude::GetOutput;
1452 let expr = column.expr().clone().map(
1454 |col| crate::udfs::apply_string_to_double(col, true),
1455 GetOutput::from_type(DataType::Float64),
1456 );
1457 return Ok(Column::from_expr(expr, None));
1458 }
1459 Ok(Column::from_expr(
1460 column.expr().clone().strict_cast(dtype),
1461 None,
1462 ))
1463}
1464
1465pub fn try_cast(column: &Column, type_name: &str) -> Result<Column, String> {
1469 let dtype = parse_type_name(type_name)?;
1470 if dtype == DataType::Boolean {
1471 use polars::prelude::GetOutput;
1472 let expr = column.expr().clone().map(
1473 |col| crate::udfs::apply_string_to_boolean(col, false),
1474 GetOutput::from_type(DataType::Boolean),
1475 );
1476 return Ok(Column::from_expr(expr, None));
1477 }
1478 if dtype == DataType::Date {
1479 use polars::prelude::GetOutput;
1480 let expr = column.expr().clone().map(
1481 |col| crate::udfs::apply_string_to_date(col, false),
1482 GetOutput::from_type(DataType::Date),
1483 );
1484 return Ok(Column::from_expr(expr, None));
1485 }
1486 if dtype == DataType::Int32 || dtype == DataType::Int64 {
1487 use polars::prelude::GetOutput;
1488 let target = dtype.clone();
1489 let expr = column.expr().clone().map(
1490 move |col| crate::udfs::apply_string_to_int(col, false, target.clone()),
1491 GetOutput::from_type(dtype),
1492 );
1493 return Ok(Column::from_expr(expr, None));
1494 }
1495 if dtype == DataType::Float64 {
1496 use polars::prelude::GetOutput;
1497 let expr = column.expr().clone().map(
1498 |col| crate::udfs::apply_string_to_double(col, false),
1499 GetOutput::from_type(DataType::Float64),
1500 );
1501 return Ok(Column::from_expr(expr, None));
1502 }
1503 Ok(Column::from_expr(column.expr().clone().cast(dtype), None))
1504}
1505
1506pub fn to_char(column: &Column, format: Option<&str>) -> Result<Column, String> {
1510 match format {
1511 Some(fmt) => Ok(column
1512 .clone()
1513 .date_format(&crate::udfs::pyspark_format_to_chrono(fmt))),
1514 None => cast(column, "string"),
1515 }
1516}
1517
1518pub fn to_varchar(column: &Column, format: Option<&str>) -> Result<Column, String> {
1520 to_char(column, format)
1521}
1522
1523pub fn to_number(column: &Column, _format: Option<&str>) -> Result<Column, String> {
1526 cast(column, "double")
1527}
1528
1529pub fn try_to_number(column: &Column, _format: Option<&str>) -> Result<Column, String> {
1532 try_cast(column, "double")
1533}
1534
1535pub fn to_timestamp(column: &Column, format: Option<&str>) -> Result<Column, String> {
1538 use polars::prelude::{DataType, GetOutput, TimeUnit};
1539 let fmt_owned = format.map(|s| s.to_string());
1540 let expr = column.expr().clone().map(
1541 move |s| crate::udfs::apply_to_timestamp_format(s, fmt_owned.as_deref(), true),
1542 GetOutput::from_type(DataType::Datetime(TimeUnit::Microseconds, None)),
1543 );
1544 Ok(crate::column::Column::from_expr(expr, None))
1545}
1546
1547pub fn try_to_timestamp(column: &Column, format: Option<&str>) -> Result<Column, String> {
1550 use polars::prelude::*;
1551 let fmt_owned = format.map(|s| s.to_string());
1552 let expr = column.expr().clone().map(
1553 move |s| crate::udfs::apply_to_timestamp_format(s, fmt_owned.as_deref(), false),
1554 GetOutput::from_type(DataType::Datetime(TimeUnit::Microseconds, None)),
1555 );
1556 Ok(crate::column::Column::from_expr(expr, None))
1557}
1558
1559pub fn to_timestamp_ltz(column: &Column, format: Option<&str>) -> Result<Column, String> {
1561 use polars::prelude::{DataType, GetOutput, TimeUnit};
1562 match format {
1563 None => crate::cast(column, "timestamp"),
1564 Some(fmt) => {
1565 let fmt_owned = fmt.to_string();
1566 let expr = column.expr().clone().map(
1567 move |s| crate::udfs::apply_to_timestamp_ltz_format(s, Some(&fmt_owned), true),
1568 GetOutput::from_type(DataType::Datetime(TimeUnit::Microseconds, None)),
1569 );
1570 Ok(crate::column::Column::from_expr(expr, None))
1571 }
1572 }
1573}
1574
1575pub fn to_timestamp_ntz(column: &Column, format: Option<&str>) -> Result<Column, String> {
1577 use polars::prelude::{DataType, GetOutput, TimeUnit};
1578 match format {
1579 None => crate::cast(column, "timestamp"),
1580 Some(fmt) => {
1581 let fmt_owned = fmt.to_string();
1582 let expr = column.expr().clone().map(
1583 move |s| crate::udfs::apply_to_timestamp_ntz_format(s, Some(&fmt_owned), true),
1584 GetOutput::from_type(DataType::Datetime(TimeUnit::Microseconds, None)),
1585 );
1586 Ok(crate::column::Column::from_expr(expr, None))
1587 }
1588 }
1589}
1590
1591pub fn try_divide(left: &Column, right: &Column) -> Column {
1593 use polars::prelude::*;
1594 let zero_cond = right.expr().clone().cast(DataType::Float64).eq(lit(0.0f64));
1595 let null_expr = Expr::Literal(LiteralValue::Null);
1596 let div_expr =
1597 left.expr().clone().cast(DataType::Float64) / right.expr().clone().cast(DataType::Float64);
1598 let expr = polars::prelude::when(zero_cond)
1599 .then(null_expr)
1600 .otherwise(div_expr);
1601 crate::column::Column::from_expr(expr, None)
1602}
1603
1604pub fn try_add(left: &Column, right: &Column) -> Column {
1606 let args = [right.expr().clone()];
1607 let expr =
1608 left.expr()
1609 .clone()
1610 .map_many(crate::udfs::apply_try_add, &args, GetOutput::same_type());
1611 Column::from_expr(expr, None)
1612}
1613
1614pub fn try_subtract(left: &Column, right: &Column) -> Column {
1616 let args = [right.expr().clone()];
1617 let expr = left.expr().clone().map_many(
1618 crate::udfs::apply_try_subtract,
1619 &args,
1620 GetOutput::same_type(),
1621 );
1622 Column::from_expr(expr, None)
1623}
1624
1625pub fn try_multiply(left: &Column, right: &Column) -> Column {
1627 let args = [right.expr().clone()];
1628 let expr = left.expr().clone().map_many(
1629 crate::udfs::apply_try_multiply,
1630 &args,
1631 GetOutput::same_type(),
1632 );
1633 Column::from_expr(expr, None)
1634}
1635
1636pub fn try_element_at(column: &Column, index: i64) -> Column {
1638 column.clone().element_at(index)
1639}
1640
1641pub fn width_bucket(value: &Column, min_val: f64, max_val: f64, num_bucket: i64) -> Column {
1643 if num_bucket <= 0 {
1644 panic!(
1645 "width_bucket: num_bucket must be positive, got {}",
1646 num_bucket
1647 );
1648 }
1649 use polars::prelude::*;
1650 let v = value.expr().clone().cast(DataType::Float64);
1651 let min_expr = lit(min_val);
1652 let max_expr = lit(max_val);
1653 let nb = num_bucket as f64;
1654 let width = (max_val - min_val) / nb;
1655 let bucket_expr = (v.clone() - min_expr.clone()) / lit(width);
1656 let floor_bucket = bucket_expr.floor().cast(DataType::Int64) + lit(1i64);
1657 let bucket_clamped = floor_bucket.clip(lit(1i64), lit(num_bucket));
1658 let expr = polars::prelude::when(v.clone().lt(min_expr))
1659 .then(lit(0i64))
1660 .when(v.gt_eq(max_expr))
1661 .then(lit(num_bucket + 1))
1662 .otherwise(bucket_clamped);
1663 crate::column::Column::from_expr(expr, None)
1664}
1665
1666pub fn elt(index: &Column, columns: &[&Column]) -> Column {
1668 use polars::prelude::*;
1669 if columns.is_empty() {
1670 panic!("elt requires at least one column");
1671 }
1672 let idx_expr = index.expr().clone();
1673 let null_expr = Expr::Literal(LiteralValue::Null);
1674 let mut expr = null_expr;
1675 for (i, c) in columns.iter().enumerate().rev() {
1676 let n = (i + 1) as i64;
1677 expr = polars::prelude::when(idx_expr.clone().eq(lit(n)))
1678 .then(c.expr().clone())
1679 .otherwise(expr);
1680 }
1681 crate::column::Column::from_expr(expr, None)
1682}
1683
1684pub fn bit_length(column: &Column) -> Column {
1686 column.clone().bit_length()
1687}
1688
1689pub fn octet_length(column: &Column) -> Column {
1691 column.clone().octet_length()
1692}
1693
1694pub fn char_length(column: &Column) -> Column {
1696 column.clone().char_length()
1697}
1698
1699pub fn character_length(column: &Column) -> Column {
1701 column.clone().character_length()
1702}
1703
1704pub fn typeof_(column: &Column) -> Column {
1706 column.clone().typeof_()
1707}
1708
1709pub fn isnan(column: &Column) -> Column {
1711 column.clone().is_nan()
1712}
1713
1714pub fn greatest(columns: &[&Column]) -> Result<Column, String> {
1716 if columns.is_empty() {
1717 return Err("greatest requires at least one column".to_string());
1718 }
1719 if columns.len() == 1 {
1720 return Ok((*columns[0]).clone());
1721 }
1722 let mut expr = columns[0].expr().clone();
1723 for c in columns.iter().skip(1) {
1724 let args = [c.expr().clone()];
1725 expr = expr.map_many(crate::udfs::apply_greatest2, &args, GetOutput::same_type());
1726 }
1727 Ok(Column::from_expr(expr, None))
1728}
1729
1730pub fn least(columns: &[&Column]) -> Result<Column, String> {
1732 if columns.is_empty() {
1733 return Err("least requires at least one column".to_string());
1734 }
1735 if columns.len() == 1 {
1736 return Ok((*columns[0]).clone());
1737 }
1738 let mut expr = columns[0].expr().clone();
1739 for c in columns.iter().skip(1) {
1740 let args = [c.expr().clone()];
1741 expr = expr.map_many(crate::udfs::apply_least2, &args, GetOutput::same_type());
1742 }
1743 Ok(Column::from_expr(expr, None))
1744}
1745
1746pub fn year(column: &Column) -> Column {
1748 column.clone().year()
1749}
1750
1751pub fn month(column: &Column) -> Column {
1753 column.clone().month()
1754}
1755
1756pub fn day(column: &Column) -> Column {
1758 column.clone().day()
1759}
1760
1761pub fn to_date(column: &Column, format: Option<&str>) -> Result<Column, String> {
1763 use polars::prelude::GetOutput;
1764 let fmt = format.map(|s| s.to_string());
1765 let expr = column.expr().clone().map(
1766 move |col| crate::udfs::apply_string_to_date_format(col, fmt.as_deref(), false),
1767 GetOutput::from_type(DataType::Date),
1768 );
1769 Ok(Column::from_expr(expr, None))
1770}
1771
1772pub fn date_format(column: &Column, format: &str) -> Column {
1774 column
1775 .clone()
1776 .date_format(&crate::udfs::pyspark_format_to_chrono(format))
1777}
1778
1779pub fn current_date() -> Column {
1781 use polars::prelude::*;
1782 let today = chrono::Utc::now().date_naive();
1783 let days = (today - crate::date_utils::epoch_naive_date()).num_days() as i32;
1784 crate::column::Column::from_expr(Expr::Literal(LiteralValue::Date(days)), None)
1785}
1786
1787pub fn current_timestamp() -> Column {
1789 use polars::prelude::*;
1790 let ts = chrono::Utc::now().timestamp_micros();
1791 crate::column::Column::from_expr(
1792 Expr::Literal(LiteralValue::DateTime(ts, TimeUnit::Microseconds, None)),
1793 None,
1794 )
1795}
1796
1797pub fn curdate() -> Column {
1799 current_date()
1800}
1801
1802pub fn now() -> Column {
1804 current_timestamp()
1805}
1806
1807pub fn localtimestamp() -> Column {
1809 current_timestamp()
1810}
1811
1812pub fn date_diff(end: &Column, start: &Column) -> Column {
1814 datediff(end, start)
1815}
1816
1817pub fn dateadd(column: &Column, n: i32) -> Column {
1819 date_add(column, n)
1820}
1821
1822pub fn extract(column: &Column, field: &str) -> Column {
1824 column.clone().extract(field)
1825}
1826
1827pub fn date_part(column: &Column, field: &str) -> Column {
1829 extract(column, field)
1830}
1831
1832pub fn datepart(column: &Column, field: &str) -> Column {
1834 extract(column, field)
1835}
1836
1837pub fn unix_micros(column: &Column) -> Column {
1839 column.clone().unix_micros()
1840}
1841
1842pub fn unix_millis(column: &Column) -> Column {
1844 column.clone().unix_millis()
1845}
1846
1847pub fn unix_seconds(column: &Column) -> Column {
1849 column.clone().unix_seconds()
1850}
1851
1852pub fn dayname(column: &Column) -> Column {
1854 column.clone().dayname()
1855}
1856
1857pub fn weekday(column: &Column) -> Column {
1859 column.clone().weekday()
1860}
1861
1862pub fn hour(column: &Column) -> Column {
1864 column.clone().hour()
1865}
1866
1867pub fn minute(column: &Column) -> Column {
1869 column.clone().minute()
1870}
1871
1872pub fn second(column: &Column) -> Column {
1874 column.clone().second()
1875}
1876
1877pub fn date_add(column: &Column, n: i32) -> Column {
1879 column.clone().date_add(n)
1880}
1881
1882pub fn date_sub(column: &Column, n: i32) -> Column {
1884 column.clone().date_sub(n)
1885}
1886
1887pub fn datediff(end: &Column, start: &Column) -> Column {
1889 start.clone().datediff(end)
1890}
1891
1892pub fn last_day(column: &Column) -> Column {
1894 column.clone().last_day()
1895}
1896
1897pub fn trunc(column: &Column, format: &str) -> Column {
1899 column.clone().trunc(format)
1900}
1901
1902pub fn date_trunc(format: &str, column: &Column) -> Column {
1904 trunc(column, format)
1905}
1906
1907pub fn quarter(column: &Column) -> Column {
1909 column.clone().quarter()
1910}
1911
1912pub fn weekofyear(column: &Column) -> Column {
1914 column.clone().weekofyear()
1915}
1916
1917pub fn dayofweek(column: &Column) -> Column {
1919 column.clone().dayofweek()
1920}
1921
1922pub fn dayofyear(column: &Column) -> Column {
1924 column.clone().dayofyear()
1925}
1926
1927pub fn add_months(column: &Column, n: i32) -> Column {
1929 column.clone().add_months(n)
1930}
1931
1932pub fn months_between(end: &Column, start: &Column, round_off: bool) -> Column {
1935 end.clone().months_between(start, round_off)
1936}
1937
1938pub fn next_day(column: &Column, day_of_week: &str) -> Column {
1940 column.clone().next_day(day_of_week)
1941}
1942
1943pub fn unix_timestamp_now() -> Column {
1945 use polars::prelude::*;
1946 let secs = chrono::Utc::now().timestamp();
1947 crate::column::Column::from_expr(lit(secs), None)
1948}
1949
1950pub fn unix_timestamp(column: &Column, format: Option<&str>) -> Column {
1952 column.clone().unix_timestamp(format)
1953}
1954
1955pub fn to_unix_timestamp(column: &Column, format: Option<&str>) -> Column {
1957 unix_timestamp(column, format)
1958}
1959
1960pub fn from_unixtime(column: &Column, format: Option<&str>) -> Column {
1962 column.clone().from_unixtime(format)
1963}
1964
1965pub fn make_date(year: &Column, month: &Column, day: &Column) -> Column {
1967 use polars::prelude::*;
1968 let args = [month.expr().clone(), day.expr().clone()];
1969 let expr = year.expr().clone().map_many(
1970 crate::udfs::apply_make_date,
1971 &args,
1972 GetOutput::from_type(DataType::Date),
1973 );
1974 crate::column::Column::from_expr(expr, None)
1975}
1976
1977pub fn make_timestamp(
1980 year: &Column,
1981 month: &Column,
1982 day: &Column,
1983 hour: &Column,
1984 minute: &Column,
1985 sec: &Column,
1986 timezone: Option<&str>,
1987) -> Column {
1988 use polars::prelude::*;
1989 let tz_owned = timezone.map(|s| s.to_string());
1990 let args = [
1991 month.expr().clone(),
1992 day.expr().clone(),
1993 hour.expr().clone(),
1994 minute.expr().clone(),
1995 sec.expr().clone(),
1996 ];
1997 let expr = year.expr().clone().map_many(
1998 move |cols| crate::udfs::apply_make_timestamp(cols, tz_owned.as_deref()),
1999 &args,
2000 GetOutput::from_type(DataType::Datetime(TimeUnit::Microseconds, None)),
2001 );
2002 crate::column::Column::from_expr(expr, None)
2003}
2004
2005pub fn timestampadd(unit: &str, amount: &Column, ts: &Column) -> Column {
2007 ts.clone().timestampadd(unit, amount)
2008}
2009
2010pub fn timestampdiff(unit: &str, start: &Column, end: &Column) -> Column {
2012 start.clone().timestampdiff(unit, end)
2013}
2014
2015pub fn days(n: i64) -> Column {
2017 make_interval(0, 0, 0, n, 0, 0, 0)
2018}
2019
2020pub fn hours(n: i64) -> Column {
2022 make_interval(0, 0, 0, 0, n, 0, 0)
2023}
2024
2025pub fn minutes(n: i64) -> Column {
2027 make_interval(0, 0, 0, 0, 0, n, 0)
2028}
2029
2030pub fn months(n: i64) -> Column {
2032 make_interval(0, n, 0, 0, 0, 0, 0)
2033}
2034
2035pub fn years(n: i64) -> Column {
2037 make_interval(n, 0, 0, 0, 0, 0, 0)
2038}
2039
2040pub fn from_utc_timestamp(column: &Column, tz: &str) -> Column {
2042 column.clone().from_utc_timestamp(tz)
2043}
2044
2045pub fn to_utc_timestamp(column: &Column, tz: &str) -> Column {
2047 column.clone().to_utc_timestamp(tz)
2048}
2049
2050pub fn convert_timezone(source_tz: &str, target_tz: &str, column: &Column) -> Column {
2052 let source_tz = source_tz.to_string();
2053 let target_tz = target_tz.to_string();
2054 let expr = column.expr().clone().map(
2055 move |s| crate::udfs::apply_convert_timezone(s, &source_tz, &target_tz),
2056 GetOutput::same_type(),
2057 );
2058 crate::column::Column::from_expr(expr, None)
2059}
2060
2061pub fn current_timezone() -> Column {
2063 use polars::prelude::*;
2064 crate::column::Column::from_expr(lit("UTC"), None)
2065}
2066
2067pub fn make_interval(
2069 years: i64,
2070 months: i64,
2071 weeks: i64,
2072 days: i64,
2073 hours: i64,
2074 mins: i64,
2075 secs: i64,
2076) -> Column {
2077 use polars::prelude::*;
2078 let total_days = years * 365 + months * 30 + weeks * 7 + days;
2080 let args = DurationArgs::new()
2081 .with_days(lit(total_days))
2082 .with_hours(lit(hours))
2083 .with_minutes(lit(mins))
2084 .with_seconds(lit(secs));
2085 let dur = duration(args);
2086 crate::column::Column::from_expr(dur, None)
2087}
2088
2089pub fn make_dt_interval(days: i64, hours: i64, minutes: i64, seconds: i64) -> Column {
2091 use polars::prelude::*;
2092 let args = DurationArgs::new()
2093 .with_days(lit(days))
2094 .with_hours(lit(hours))
2095 .with_minutes(lit(minutes))
2096 .with_seconds(lit(seconds));
2097 let dur = duration(args);
2098 crate::column::Column::from_expr(dur, None)
2099}
2100
2101pub fn make_ym_interval(years: i32, months: i32) -> Column {
2103 use polars::prelude::*;
2104 let total_months = years * 12 + months;
2105 crate::column::Column::from_expr(lit(total_months), None)
2106}
2107
2108pub fn make_timestamp_ntz(
2110 year: &Column,
2111 month: &Column,
2112 day: &Column,
2113 hour: &Column,
2114 minute: &Column,
2115 sec: &Column,
2116) -> Column {
2117 make_timestamp(year, month, day, hour, minute, sec, None)
2118}
2119
2120pub fn timestamp_seconds(column: &Column) -> Column {
2122 column.clone().timestamp_seconds()
2123}
2124
2125pub fn timestamp_millis(column: &Column) -> Column {
2127 column.clone().timestamp_millis()
2128}
2129
2130pub fn timestamp_micros(column: &Column) -> Column {
2132 column.clone().timestamp_micros()
2133}
2134
2135pub fn unix_date(column: &Column) -> Column {
2137 column.clone().unix_date()
2138}
2139
2140pub fn date_from_unix_date(column: &Column) -> Column {
2142 column.clone().date_from_unix_date()
2143}
2144
2145pub fn pmod(dividend: &Column, divisor: &Column) -> Column {
2147 dividend.clone().pmod(divisor)
2148}
2149
2150pub fn factorial(column: &Column) -> Column {
2152 column.clone().factorial()
2153}
2154
2155pub fn concat(columns: &[&Column]) -> Column {
2157 use polars::prelude::*;
2158 if columns.is_empty() {
2159 panic!("concat requires at least one column");
2160 }
2161 let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2162 crate::column::Column::from_expr(concat_str(&exprs, "", false), None)
2163}
2164
2165pub fn concat_ws(separator: &str, columns: &[&Column]) -> Column {
2167 use polars::prelude::*;
2168 if columns.is_empty() {
2169 panic!("concat_ws requires at least one column");
2170 }
2171 let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2172 crate::column::Column::from_expr(concat_str(&exprs, separator, false), None)
2173}
2174
2175pub fn row_number(column: &Column) -> Column {
2185 column.clone().row_number(false)
2186}
2187
2188pub fn rank(column: &Column, descending: bool) -> Column {
2190 column.clone().rank(descending)
2191}
2192
2193pub fn dense_rank(column: &Column, descending: bool) -> Column {
2195 column.clone().dense_rank(descending)
2196}
2197
2198pub fn lag(column: &Column, n: i64) -> Column {
2200 column.clone().lag(n)
2201}
2202
2203pub fn lead(column: &Column, n: i64) -> Column {
2205 column.clone().lead(n)
2206}
2207
2208pub fn first_value(column: &Column) -> Column {
2210 column.clone().first_value()
2211}
2212
2213pub fn last_value(column: &Column) -> Column {
2215 column.clone().last_value()
2216}
2217
2218pub fn percent_rank(column: &Column, partition_by: &[&str], descending: bool) -> Column {
2220 column.clone().percent_rank(partition_by, descending)
2221}
2222
2223pub fn cume_dist(column: &Column, partition_by: &[&str], descending: bool) -> Column {
2225 column.clone().cume_dist(partition_by, descending)
2226}
2227
2228pub fn ntile(column: &Column, n: u32, partition_by: &[&str], descending: bool) -> Column {
2230 column.clone().ntile(n, partition_by, descending)
2231}
2232
2233pub fn nth_value(column: &Column, n: i64, partition_by: &[&str], descending: bool) -> Column {
2235 column.clone().nth_value(n, partition_by, descending)
2236}
2237
2238pub fn coalesce(columns: &[&Column]) -> Column {
2248 use polars::prelude::*;
2249 if columns.is_empty() {
2250 panic!("coalesce requires at least one column");
2251 }
2252 let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2253 let expr = coalesce(&exprs);
2254 crate::column::Column::from_expr(expr, None)
2255}
2256
2257pub fn nvl(column: &Column, value: &Column) -> Column {
2259 coalesce(&[column, value])
2260}
2261
2262pub fn ifnull(column: &Column, value: &Column) -> Column {
2264 nvl(column, value)
2265}
2266
2267pub fn nullif(column: &Column, value: &Column) -> Column {
2269 use polars::prelude::*;
2270 let cond = column.expr().clone().eq(value.expr().clone());
2271 let null_lit = Expr::Literal(LiteralValue::Null);
2272 let expr = when(cond).then(null_lit).otherwise(column.expr().clone());
2273 crate::column::Column::from_expr(expr, None)
2274}
2275
2276pub fn nanvl(column: &Column, value: &Column) -> Column {
2278 use polars::prelude::*;
2279 let cond = column.expr().clone().is_nan();
2280 let expr = when(cond)
2281 .then(value.expr().clone())
2282 .otherwise(column.expr().clone());
2283 crate::column::Column::from_expr(expr, None)
2284}
2285
2286pub fn nvl2(col1: &Column, col2: &Column, col3: &Column) -> Column {
2288 use polars::prelude::*;
2289 let cond = col1.expr().clone().is_not_null();
2290 let expr = when(cond)
2291 .then(col2.expr().clone())
2292 .otherwise(col3.expr().clone());
2293 crate::column::Column::from_expr(expr, None)
2294}
2295
2296pub fn substr(column: &Column, start: i64, length: Option<i64>) -> Column {
2298 substring(column, start, length)
2299}
2300
2301pub fn power(column: &Column, exp: i64) -> Column {
2303 pow(column, exp)
2304}
2305
2306pub fn ln(column: &Column) -> Column {
2308 log(column)
2309}
2310
2311pub fn ceiling(column: &Column) -> Column {
2313 ceil(column)
2314}
2315
2316pub fn lcase(column: &Column) -> Column {
2318 lower(column)
2319}
2320
2321pub fn ucase(column: &Column) -> Column {
2323 upper(column)
2324}
2325
2326pub fn dayofmonth(column: &Column) -> Column {
2328 day(column)
2329}
2330
2331pub fn to_degrees(column: &Column) -> Column {
2333 degrees(column)
2334}
2335
2336pub fn to_radians(column: &Column) -> Column {
2338 radians(column)
2339}
2340
2341pub fn cosh(column: &Column) -> Column {
2343 column.clone().cosh()
2344}
2345pub fn sinh(column: &Column) -> Column {
2347 column.clone().sinh()
2348}
2349pub fn tanh(column: &Column) -> Column {
2351 column.clone().tanh()
2352}
2353pub fn acosh(column: &Column) -> Column {
2355 column.clone().acosh()
2356}
2357pub fn asinh(column: &Column) -> Column {
2359 column.clone().asinh()
2360}
2361pub fn atanh(column: &Column) -> Column {
2363 column.clone().atanh()
2364}
2365pub fn cbrt(column: &Column) -> Column {
2367 column.clone().cbrt()
2368}
2369pub fn expm1(column: &Column) -> Column {
2371 column.clone().expm1()
2372}
2373pub fn log1p(column: &Column) -> Column {
2375 column.clone().log1p()
2376}
2377pub fn log10(column: &Column) -> Column {
2379 column.clone().log10()
2380}
2381pub fn log2(column: &Column) -> Column {
2383 column.clone().log2()
2384}
2385pub fn rint(column: &Column) -> Column {
2387 column.clone().rint()
2388}
2389pub fn hypot(x: &Column, y: &Column) -> Column {
2391 let xx = x.expr().clone() * x.expr().clone();
2392 let yy = y.expr().clone() * y.expr().clone();
2393 crate::column::Column::from_expr((xx + yy).sqrt(), None)
2394}
2395
2396pub fn isnull(column: &Column) -> Column {
2398 column.clone().is_null()
2399}
2400
2401pub fn isnotnull(column: &Column) -> Column {
2403 column.clone().is_not_null()
2404}
2405
2406pub fn array(columns: &[&Column]) -> Result<crate::column::Column, PolarsError> {
2409 use polars::prelude::*;
2410 if columns.is_empty() {
2411 let empty_inner = Series::new("".into(), Vec::<i64>::new());
2414 let list_series = ListChunked::from_iter([Some(empty_inner)])
2415 .with_name("array".into())
2416 .into_series();
2417 let expr = lit(list_series).first();
2418 return Ok(crate::column::Column::from_expr(expr, None));
2419 }
2420 let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2421 let expr = concat_list(exprs)
2422 .map_err(|e| PolarsError::ComputeError(format!("array concat_list: {e}").into()))?;
2423 Ok(crate::column::Column::from_expr(expr, None))
2424}
2425
2426pub fn array_size(column: &Column) -> Column {
2428 column.clone().array_size()
2429}
2430
2431pub fn size(column: &Column) -> Column {
2433 column.clone().array_size()
2434}
2435
2436pub fn cardinality(column: &Column) -> Column {
2438 column.clone().cardinality()
2439}
2440
2441pub fn array_contains(column: &Column, value: &Column) -> Column {
2443 column.clone().array_contains(value.expr().clone())
2444}
2445
2446pub fn array_join(column: &Column, separator: &str) -> Column {
2448 column.clone().array_join(separator)
2449}
2450
2451pub fn array_max(column: &Column) -> Column {
2453 column.clone().array_max()
2454}
2455
2456pub fn array_min(column: &Column) -> Column {
2458 column.clone().array_min()
2459}
2460
2461pub fn element_at(column: &Column, index: i64) -> Column {
2463 column.clone().element_at(index)
2464}
2465
2466pub fn array_sort(column: &Column) -> Column {
2468 column.clone().array_sort()
2469}
2470
2471pub fn array_distinct(column: &Column) -> Column {
2473 column.clone().array_distinct()
2474}
2475
2476pub fn array_slice(column: &Column, start: i64, length: Option<i64>) -> Column {
2478 column.clone().array_slice(start, length)
2479}
2480
2481pub fn sequence(start: &Column, stop: &Column, step: Option<&Column>) -> Column {
2484 use polars::prelude::{as_struct, lit, DataType, GetOutput};
2485 let step_expr = step
2486 .map(|c| c.expr().clone().alias("2"))
2487 .unwrap_or_else(|| lit(1i64).alias("2"));
2488 let struct_expr = as_struct(vec![
2489 start.expr().clone().alias("0"),
2490 stop.expr().clone().alias("1"),
2491 step_expr,
2492 ]);
2493 let out_dtype = DataType::List(Box::new(DataType::Int64));
2494 let expr = struct_expr.map(crate::udfs::apply_sequence, GetOutput::from_type(out_dtype));
2495 crate::column::Column::from_expr(expr, None)
2496}
2497
2498pub fn shuffle(column: &Column) -> Column {
2500 use polars::prelude::GetOutput;
2501 let expr = column
2502 .expr()
2503 .clone()
2504 .map(crate::udfs::apply_shuffle, GetOutput::same_type());
2505 crate::column::Column::from_expr(expr, None)
2506}
2507
2508pub fn inline(column: &Column) -> Column {
2511 column.clone().explode()
2512}
2513
2514pub fn inline_outer(column: &Column) -> Column {
2516 column.clone().explode_outer()
2517}
2518
2519pub fn explode(column: &Column) -> Column {
2521 column.clone().explode()
2522}
2523
2524pub fn array_position(column: &Column, value: &Column) -> Column {
2527 column.clone().array_position(value.expr().clone())
2528}
2529
2530pub fn array_compact(column: &Column) -> Column {
2532 column.clone().array_compact()
2533}
2534
2535pub fn array_remove(column: &Column, value: &Column) -> Column {
2538 column.clone().array_remove(value.expr().clone())
2539}
2540
2541pub fn array_repeat(column: &Column, n: i64) -> Column {
2543 column.clone().array_repeat(n)
2544}
2545
2546pub fn array_flatten(column: &Column) -> Column {
2548 column.clone().array_flatten()
2549}
2550
2551pub fn array_exists(column: &Column, predicate: Expr) -> Column {
2553 column.clone().array_exists(predicate)
2554}
2555
2556pub fn array_forall(column: &Column, predicate: Expr) -> Column {
2558 column.clone().array_forall(predicate)
2559}
2560
2561pub fn array_filter(column: &Column, predicate: Expr) -> Column {
2563 column.clone().array_filter(predicate)
2564}
2565
2566pub fn array_transform(column: &Column, f: Expr) -> Column {
2568 column.clone().array_transform(f)
2569}
2570
2571pub fn array_sum(column: &Column) -> Column {
2573 column.clone().array_sum()
2574}
2575
2576pub fn aggregate(column: &Column, zero: &Column) -> Column {
2578 column.clone().array_aggregate(zero)
2579}
2580
2581pub fn array_mean(column: &Column) -> Column {
2583 column.clone().array_mean()
2584}
2585
2586pub fn posexplode(column: &Column) -> (Column, Column) {
2589 column.clone().posexplode()
2590}
2591
2592pub fn create_map(key_values: &[&Column]) -> Result<Column, PolarsError> {
2596 use polars::chunked_array::StructChunked;
2597 use polars::prelude::{as_struct, concat_list, lit, IntoSeries, ListChunked};
2598 if key_values.is_empty() {
2599 let key_s = Series::new("key".into(), Vec::<String>::new());
2601 let value_s = Series::new("value".into(), Vec::<String>::new());
2602 let fields: [&Series; 2] = [&key_s, &value_s];
2603 let empty_struct = StructChunked::from_series(
2604 polars::prelude::PlSmallStr::EMPTY,
2605 0,
2606 fields.iter().copied(),
2607 )
2608 .map_err(|e| PolarsError::ComputeError(format!("create_map empty struct: {e}").into()))?
2609 .into_series();
2610 let list_series = ListChunked::from_iter([Some(empty_struct)])
2611 .with_name("create_map".into())
2612 .into_series();
2613 let expr = lit(list_series).first();
2614 return Ok(crate::column::Column::from_expr(expr, None));
2615 }
2616 let mut struct_exprs: Vec<Expr> = Vec::new();
2617 for i in (0..key_values.len()).step_by(2) {
2618 if i + 1 < key_values.len() {
2619 let k = key_values[i].expr().clone().alias("key");
2620 let v = key_values[i + 1].expr().clone().alias("value");
2621 struct_exprs.push(as_struct(vec![k, v]));
2622 }
2623 }
2624 let expr = concat_list(struct_exprs)
2625 .map_err(|e| PolarsError::ComputeError(format!("create_map concat_list: {e}").into()))?;
2626 Ok(crate::column::Column::from_expr(expr, None))
2627}
2628
2629pub fn map_keys(column: &Column) -> Column {
2631 column.clone().map_keys()
2632}
2633
2634pub fn map_values(column: &Column) -> Column {
2636 column.clone().map_values()
2637}
2638
2639pub fn map_entries(column: &Column) -> Column {
2641 column.clone().map_entries()
2642}
2643
2644pub fn map_from_arrays(keys: &Column, values: &Column) -> Column {
2646 keys.clone().map_from_arrays(values)
2647}
2648
2649pub fn map_concat(a: &Column, b: &Column) -> Column {
2651 a.clone().map_concat(b)
2652}
2653
2654pub fn map_from_entries(column: &Column) -> Column {
2656 column.clone().map_from_entries()
2657}
2658
2659pub fn map_contains_key(map_col: &Column, key: &Column) -> Column {
2661 map_col.clone().map_contains_key(key)
2662}
2663
2664pub fn get(map_col: &Column, key: &Column) -> Column {
2666 map_col.clone().get(key)
2667}
2668
2669pub fn map_filter(map_col: &Column, predicate: Expr) -> Column {
2671 map_col.clone().map_filter(predicate)
2672}
2673
2674pub fn map_zip_with(map1: &Column, map2: &Column, merge: Expr) -> Column {
2676 map1.clone().map_zip_with(map2, merge)
2677}
2678
2679pub fn zip_with_coalesce(left: &Column, right: &Column) -> Column {
2681 use polars::prelude::col;
2682 let left_field = col("").struct_().field_by_name("left");
2683 let right_field = col("").struct_().field_by_name("right");
2684 let merge = crate::column::Column::from_expr(
2685 coalesce(&[
2686 &crate::column::Column::from_expr(left_field, None),
2687 &crate::column::Column::from_expr(right_field, None),
2688 ])
2689 .into_expr(),
2690 None,
2691 );
2692 left.clone().zip_with(right, merge.into_expr())
2693}
2694
2695pub fn map_zip_with_coalesce(map1: &Column, map2: &Column) -> Column {
2697 use polars::prelude::col;
2698 let v1 = col("").struct_().field_by_name("value1");
2699 let v2 = col("").struct_().field_by_name("value2");
2700 let merge = coalesce(&[
2701 &crate::column::Column::from_expr(v1, None),
2702 &crate::column::Column::from_expr(v2, None),
2703 ])
2704 .into_expr();
2705 map1.clone().map_zip_with(map2, merge)
2706}
2707
2708pub fn map_filter_value_gt(map_col: &Column, threshold: f64) -> Column {
2710 use polars::prelude::{col, lit};
2711 let pred = col("").struct_().field_by_name("value").gt(lit(threshold));
2712 map_col.clone().map_filter(pred)
2713}
2714
2715pub fn struct_(columns: &[&Column]) -> Column {
2717 use polars::prelude::as_struct;
2718 if columns.is_empty() {
2719 panic!("struct requires at least one column");
2720 }
2721 let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2722 crate::column::Column::from_expr(as_struct(exprs), None)
2723}
2724
2725pub fn named_struct(pairs: &[(&str, &Column)]) -> Column {
2727 use polars::prelude::as_struct;
2728 if pairs.is_empty() {
2729 panic!("named_struct requires at least one (name, column) pair");
2730 }
2731 let exprs: Vec<Expr> = pairs
2732 .iter()
2733 .map(|(name, col)| col.expr().clone().alias(*name))
2734 .collect();
2735 crate::column::Column::from_expr(as_struct(exprs), None)
2736}
2737
2738pub fn array_append(array: &Column, elem: &Column) -> Column {
2740 array.clone().array_append(elem)
2741}
2742
2743pub fn array_prepend(array: &Column, elem: &Column) -> Column {
2745 array.clone().array_prepend(elem)
2746}
2747
2748pub fn array_insert(array: &Column, pos: &Column, elem: &Column) -> Column {
2750 array.clone().array_insert(pos, elem)
2751}
2752
2753pub fn array_except(a: &Column, b: &Column) -> Column {
2755 a.clone().array_except(b)
2756}
2757
2758pub fn array_intersect(a: &Column, b: &Column) -> Column {
2760 a.clone().array_intersect(b)
2761}
2762
2763pub fn array_union(a: &Column, b: &Column) -> Column {
2765 a.clone().array_union(b)
2766}
2767
2768pub fn zip_with(left: &Column, right: &Column, merge: Expr) -> Column {
2770 left.clone().zip_with(right, merge)
2771}
2772
2773pub fn get_json_object(column: &Column, path: &str) -> Column {
2775 column.clone().get_json_object(path)
2776}
2777
2778pub fn json_object_keys(column: &Column) -> Column {
2780 column.clone().json_object_keys()
2781}
2782
2783pub fn json_tuple(column: &Column, keys: &[&str]) -> Column {
2785 column.clone().json_tuple(keys)
2786}
2787
2788pub fn from_csv(column: &Column) -> Column {
2790 column.clone().from_csv()
2791}
2792
2793pub fn to_csv(column: &Column) -> Column {
2795 column.clone().to_csv()
2796}
2797
2798pub fn schema_of_csv(_column: &Column) -> Column {
2800 Column::from_expr(
2801 lit("STRUCT<_c0: STRING, _c1: STRING>".to_string()),
2802 Some("schema_of_csv".to_string()),
2803 )
2804}
2805
2806pub fn schema_of_json(_column: &Column) -> Column {
2808 Column::from_expr(
2809 lit("STRUCT<>".to_string()),
2810 Some("schema_of_json".to_string()),
2811 )
2812}
2813
2814pub fn from_json(column: &Column, schema: Option<polars::datatypes::DataType>) -> Column {
2816 column.clone().from_json(schema)
2817}
2818
2819pub fn to_json(column: &Column) -> Column {
2821 column.clone().to_json()
2822}
2823
2824pub fn isin(column: &Column, other: &Column) -> Column {
2826 column.clone().isin(other)
2827}
2828
2829pub fn isin_i64(column: &Column, values: &[i64]) -> Column {
2831 let s = Series::from_iter(values.iter().cloned());
2832 Column::from_expr(column.expr().clone().is_in(lit(s)), None)
2833}
2834
2835pub fn isin_str(column: &Column, values: &[&str]) -> Column {
2837 let s: Series = Series::from_iter(values.iter().copied());
2838 Column::from_expr(column.expr().clone().is_in(lit(s)), None)
2839}
2840
2841pub fn url_decode(column: &Column) -> Column {
2843 column.clone().url_decode()
2844}
2845
2846pub fn url_encode(column: &Column) -> Column {
2848 column.clone().url_encode()
2849}
2850
2851pub fn shift_left(column: &Column, n: i32) -> Column {
2853 column.clone().shift_left(n)
2854}
2855
2856pub fn shift_right(column: &Column, n: i32) -> Column {
2858 column.clone().shift_right(n)
2859}
2860
2861pub fn shift_right_unsigned(column: &Column, n: i32) -> Column {
2863 column.clone().shift_right_unsigned(n)
2864}
2865
2866pub fn version() -> Column {
2868 Column::from_expr(
2869 lit(concat!("robin-sparkless-", env!("CARGO_PKG_VERSION"))),
2870 None,
2871 )
2872}
2873
2874pub fn equal_null(left: &Column, right: &Column) -> Column {
2876 left.clone().eq_null_safe(right)
2877}
2878
2879pub fn json_array_length(column: &Column, path: &str) -> Column {
2881 column.clone().json_array_length(path)
2882}
2883
2884pub fn parse_url(column: &Column, part: &str, key: Option<&str>) -> Column {
2887 column.clone().parse_url(part, key)
2888}
2889
2890pub fn hash(columns: &[&Column]) -> Column {
2892 use polars::prelude::*;
2893 if columns.is_empty() {
2894 return crate::column::Column::from_expr(lit(0i64), None);
2895 }
2896 if columns.len() == 1 {
2897 return columns[0].clone().hash();
2898 }
2899 let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2900 let struct_expr = polars::prelude::as_struct(exprs);
2901 let name = columns[0].name().to_string();
2902 let expr = struct_expr.map(
2903 crate::udfs::apply_hash_struct,
2904 GetOutput::from_type(DataType::Int64),
2905 );
2906 crate::column::Column::from_expr(expr, Some(name))
2907}
2908
2909pub fn stack(columns: &[&Column]) -> Column {
2911 struct_(columns)
2912}
2913
#[cfg(test)]
mod tests {
    use super::*;
    use polars::prelude::{df, IntoLazy};

    #[test]
    fn test_col_creates_column() {
        let named = col("test");
        assert_eq!(named.name(), "test");
    }

    #[test]
    fn test_lit_i32() {
        let literal = lit_i32(42);
        assert_eq!(literal.name(), "<expr>");
    }

    #[test]
    fn test_lit_i64() {
        let literal = lit_i64(123456789012345i64);
        assert_eq!(literal.name(), "<expr>");
    }

    #[test]
    fn test_lit_f64() {
        let literal = lit_f64(std::f64::consts::PI);
        assert_eq!(literal.name(), "<expr>");
    }

    #[test]
    fn test_lit_bool() {
        let literal = lit_bool(true);
        assert_eq!(literal.name(), "<expr>");
    }

    #[test]
    fn test_lit_str() {
        let literal = lit_str("hello");
        assert_eq!(literal.name(), "<expr>");
    }

    #[test]
    fn test_create_map_empty() {
        // An empty create_map must broadcast an empty map to every row.
        let empty_map = create_map(&[]).unwrap();
        let frame = df!("id" => &[1i64, 2i64]).unwrap();
        let result = frame
            .lazy()
            .with_columns([empty_map.into_expr().alias("m")])
            .collect()
            .unwrap();
        assert_eq!(result.height(), 2);

        let maps = result.column("m").unwrap();
        assert_eq!(maps.len(), 2);
        let lists = maps.list().unwrap();
        for idx in 0..2 {
            let entry = lists.get(idx).unwrap();
            assert_eq!(entry.len(), 0);
        }
    }

    #[test]
    fn test_count_aggregation() {
        let agg = count(&col("value"));
        assert_eq!(agg.name(), "count");
    }

    #[test]
    fn test_sum_aggregation() {
        let agg = sum(&col("value"));
        assert_eq!(agg.name(), "sum");
    }

    #[test]
    fn test_avg_aggregation() {
        let agg = avg(&col("value"));
        assert_eq!(agg.name(), "avg");
    }

    #[test]
    fn test_max_aggregation() {
        let agg = max(&col("value"));
        assert_eq!(agg.name(), "max");
    }

    #[test]
    fn test_min_aggregation() {
        let agg = min(&col("value"));
        assert_eq!(agg.name(), "min");
    }

    #[test]
    fn test_when_then_otherwise() {
        let frame = df!("age" => &[15, 25, 35]).unwrap();

        let is_adult = col("age").gt(polars::prelude::lit(18));
        let status = when(&is_adult)
            .then(&lit_str("adult"))
            .otherwise(&lit_str("minor"));

        let result = frame
            .lazy()
            .with_column(status.into_expr().alias("status"))
            .collect()
            .unwrap();

        let labels: Vec<Option<&str>> = result
            .column("status")
            .unwrap()
            .str()
            .unwrap()
            .into_iter()
            .collect();

        assert_eq!(labels[0], Some("minor"));
        assert_eq!(labels[1], Some("adult"));
        assert_eq!(labels[2], Some("adult"));
    }

    #[test]
    fn test_coalesce_returns_first_non_null() {
        let frame = df!(
            "a" => &[Some(1), None, None],
            "b" => &[None, Some(2), None],
            "c" => &[None, None, Some(3)]
        )
        .unwrap();

        let (first, second, third) = (col("a"), col("b"), col("c"));
        let merged = coalesce(&[&first, &second, &third]);

        let result = frame
            .lazy()
            .with_column(merged.into_expr().alias("coalesced"))
            .collect()
            .unwrap();

        let picked: Vec<Option<i32>> = result
            .column("coalesced")
            .unwrap()
            .i32()
            .unwrap()
            .into_iter()
            .collect();

        assert_eq!(picked[0], Some(1));
        assert_eq!(picked[1], Some(2));
        assert_eq!(picked[2], Some(3));
    }

    #[test]
    fn test_coalesce_with_literal_fallback() {
        let frame = df!(
            "a" => &[Some(1), None],
            "b" => &[None::<i32>, None::<i32>]
        )
        .unwrap();

        let (first, second) = (col("a"), col("b"));
        let default_value = lit_i32(0);
        let merged = coalesce(&[&first, &second, &default_value]);

        let result = frame
            .lazy()
            .with_column(merged.into_expr().alias("coalesced"))
            .collect()
            .unwrap();

        let picked: Vec<Option<i32>> = result
            .column("coalesced")
            .unwrap()
            .i32()
            .unwrap()
            .into_iter()
            .collect();

        assert_eq!(picked[0], Some(1));
        assert_eq!(picked[1], Some(0));
    }

    #[test]
    #[should_panic(expected = "coalesce requires at least one column")]
    fn test_coalesce_empty_panics() {
        let none: [&Column; 0] = [];
        let _ = coalesce(&none);
    }

    #[test]
    fn test_cast_double_string_column_strict_ok() {
        let frame = df!("s" => &["123", " 45.5 ", "0"]).unwrap();

        let source = col("s");
        let as_double = cast(&source, "double").unwrap();

        let result = frame
            .lazy()
            .with_column(as_double.into_expr().alias("v"))
            .collect()
            .unwrap();

        let parsed: Vec<Option<f64>> = result
            .column("v")
            .unwrap()
            .f64()
            .unwrap()
            .into_iter()
            .collect();
        assert_eq!(parsed, vec![Some(123.0), Some(45.5), Some(0.0)]);
    }

    #[test]
    fn test_try_cast_double_string_column_invalid_to_null() {
        let frame = df!("s" => &["123", " 45.5 ", "abc", ""]).unwrap();

        let source = col("s");
        let lenient = try_cast(&source, "double").unwrap();

        let result = frame
            .lazy()
            .with_column(lenient.into_expr().alias("v"))
            .collect()
            .unwrap();

        let parsed: Vec<Option<f64>> = result
            .column("v")
            .unwrap()
            .f64()
            .unwrap()
            .into_iter()
            .collect();
        assert_eq!(parsed, vec![Some(123.0), Some(45.5), None, None]);
    }

    #[test]
    fn test_to_number_and_try_to_number_numerics_and_strings() {
        let frame = df!(
            "i" => &[1i32, 2, 3],
            "f" => &[1.5f64, 2.5, 3.5],
            "s" => &["10", "20.5", "xyz"]
        )
        .unwrap();

        let (ints, floats, strings) = (col("i"), col("f"), col("s"));

        let ints_num = to_number(&ints, None).unwrap();
        let floats_num = to_number(&floats, None).unwrap();
        let strings_num = try_to_number(&strings, None).unwrap();

        let result = frame
            .lazy()
            .with_columns([
                ints_num.into_expr().alias("i_num"),
                floats_num.into_expr().alias("f_num"),
                strings_num.into_expr().alias("s_num"),
            ])
            .collect()
            .unwrap();

        let as_f64 = |name: &str| -> Vec<Option<f64>> {
            result
                .column(name)
                .unwrap()
                .f64()
                .unwrap()
                .into_iter()
                .collect()
        };

        assert_eq!(as_f64("i_num"), vec![Some(1.0), Some(2.0), Some(3.0)]);
        assert_eq!(as_f64("f_num"), vec![Some(1.5), Some(2.5), Some(3.5)]);
        assert_eq!(as_f64("s_num"), vec![Some(10.0), Some(20.5), None]);
    }
}