1use crate::column::Column;
2use crate::dataframe::DataFrame;
3use polars::prelude::*;
4
/// Sort specification for a single expression.
///
/// Values are produced by the `asc*` / `desc*` constructor functions in this
/// module. NOTE(review): how the flags are consumed is not visible in this
/// file; the field meanings below are taken from their names and from the
/// constructors.
#[derive(Debug, Clone)]
pub struct SortOrder {
    /// Expression whose values determine the ordering.
    pub(crate) expr: Expr,
    /// `true` for descending order, `false` for ascending order.
    pub(crate) descending: bool,
    /// `true` when nulls should sort after non-null values.
    pub(crate) nulls_last: bool,
}
15
16impl SortOrder {
17 pub fn expr(&self) -> &Expr {
18 &self.expr
19 }
20}
21
22pub fn asc(column: &Column) -> SortOrder {
24 SortOrder {
25 expr: column.expr().clone(),
26 descending: false,
27 nulls_last: false,
28 }
29}
30
31pub fn asc_nulls_first(column: &Column) -> SortOrder {
33 SortOrder {
34 expr: column.expr().clone(),
35 descending: false,
36 nulls_last: false,
37 }
38}
39
40pub fn asc_nulls_last(column: &Column) -> SortOrder {
42 SortOrder {
43 expr: column.expr().clone(),
44 descending: false,
45 nulls_last: true,
46 }
47}
48
49pub fn desc(column: &Column) -> SortOrder {
51 SortOrder {
52 expr: column.expr().clone(),
53 descending: true,
54 nulls_last: true,
55 }
56}
57
58pub fn desc_nulls_first(column: &Column) -> SortOrder {
60 SortOrder {
61 expr: column.expr().clone(),
62 descending: true,
63 nulls_last: false,
64 }
65}
66
67pub fn desc_nulls_last(column: &Column) -> SortOrder {
69 SortOrder {
70 expr: column.expr().clone(),
71 descending: true,
72 nulls_last: true,
73 }
74}
75
76pub fn parse_type_name(name: &str) -> Result<DataType, String> {
80 let s = name.trim().to_lowercase();
81 Ok(match s.as_str() {
82 "int" | "integer" => DataType::Int32,
83 "long" | "bigint" => DataType::Int64,
84 "float" => DataType::Float32,
85 "double" => DataType::Float64,
86 "string" | "str" => DataType::String,
87 "boolean" | "bool" => DataType::Boolean,
88 "date" => DataType::Date,
89 "timestamp" => DataType::Datetime(TimeUnit::Microseconds, None),
90 _ => return Err(format!("unknown type name: {name}")),
91 })
92}
93
94pub fn col(name: &str) -> Column {
96 Column::new(name.to_string())
97}
98
99pub fn grouping(column: &Column) -> Column {
101 let _ = column;
102 Column::from_expr(lit(0i32), Some("grouping".to_string()))
103}
104
105pub fn grouping_id(_columns: &[Column]) -> Column {
107 Column::from_expr(lit(0i64), Some("grouping_id".to_string()))
108}
109
110pub fn lit_i32(value: i32) -> Column {
112 let expr: Expr = lit(value);
113 Column::from_expr(expr, None)
114}
115
116pub fn lit_i64(value: i64) -> Column {
117 let expr: Expr = lit(value);
118 Column::from_expr(expr, None)
119}
120
121pub fn lit_f64(value: f64) -> Column {
122 let expr: Expr = lit(value);
123 Column::from_expr(expr, None)
124}
125
126pub fn lit_bool(value: bool) -> Column {
127 let expr: Expr = lit(value);
128 Column::from_expr(expr, None)
129}
130
131pub fn lit_str(value: &str) -> Column {
132 let expr: Expr = lit(value);
133 Column::from_expr(expr, None)
134}
135
136pub fn count(col: &Column) -> Column {
138 Column::from_expr(col.expr().clone().count(), Some("count".to_string()))
139}
140
141pub fn sum(col: &Column) -> Column {
143 Column::from_expr(col.expr().clone().sum(), Some("sum".to_string()))
144}
145
146pub fn avg(col: &Column) -> Column {
148 Column::from_expr(col.expr().clone().mean(), Some("avg".to_string()))
149}
150
151pub fn mean(col: &Column) -> Column {
153 avg(col)
154}
155
156pub fn max(col: &Column) -> Column {
158 Column::from_expr(col.expr().clone().max(), Some("max".to_string()))
159}
160
161pub fn min(col: &Column) -> Column {
163 Column::from_expr(col.expr().clone().min(), Some("min".to_string()))
164}
165
166pub fn stddev(col: &Column) -> Column {
168 Column::from_expr(col.expr().clone().std(1), Some("stddev".to_string()))
169}
170
171pub fn variance(col: &Column) -> Column {
173 Column::from_expr(col.expr().clone().var(1), Some("variance".to_string()))
174}
175
176pub fn stddev_pop(col: &Column) -> Column {
178 Column::from_expr(col.expr().clone().std(0), Some("stddev_pop".to_string()))
179}
180
181pub fn stddev_samp(col: &Column) -> Column {
183 stddev(col)
184}
185
186pub fn std(col: &Column) -> Column {
188 stddev(col)
189}
190
191pub fn var_pop(col: &Column) -> Column {
193 Column::from_expr(col.expr().clone().var(0), Some("var_pop".to_string()))
194}
195
196pub fn var_samp(col: &Column) -> Column {
198 variance(col)
199}
200
201pub fn median(col: &Column) -> Column {
203 use polars::prelude::QuantileMethod;
204 Column::from_expr(
205 col.expr()
206 .clone()
207 .quantile(lit(0.5), QuantileMethod::Linear),
208 Some("median".to_string()),
209 )
210}
211
212pub fn approx_percentile(col: &Column, percentage: f64) -> Column {
214 use polars::prelude::QuantileMethod;
215 Column::from_expr(
216 col.expr()
217 .clone()
218 .quantile(lit(percentage), QuantileMethod::Linear),
219 Some(format!("approx_percentile({percentage})")),
220 )
221}
222
223pub fn percentile_approx(col: &Column, percentage: f64) -> Column {
225 approx_percentile(col, percentage)
226}
227
228pub fn mode(col: &Column) -> Column {
230 col.clone().mode()
231}
232
233pub fn count_distinct(col: &Column) -> Column {
235 use polars::prelude::DataType;
236 Column::from_expr(
237 col.expr().clone().n_unique().cast(DataType::Int64),
238 Some("count_distinct".to_string()),
239 )
240}
241
242pub fn kurtosis(col: &Column) -> Column {
244 Column::from_expr(
245 col.expr()
246 .clone()
247 .cast(DataType::Float64)
248 .kurtosis(true, true),
249 Some("kurtosis".to_string()),
250 )
251}
252
253pub fn skewness(col: &Column) -> Column {
255 Column::from_expr(
256 col.expr().clone().cast(DataType::Float64).skew(true),
257 Some("skewness".to_string()),
258 )
259}
260
261pub fn covar_pop_expr(col1: &str, col2: &str) -> Expr {
263 use polars::prelude::{col as pl_col, len};
264 let c1 = pl_col(col1).cast(DataType::Float64);
265 let c2 = pl_col(col2).cast(DataType::Float64);
266 let n = len().cast(DataType::Float64);
267 let sum_ab = (c1.clone() * c2.clone()).sum();
268 let sum_a = pl_col(col1).sum().cast(DataType::Float64);
269 let sum_b = pl_col(col2).sum().cast(DataType::Float64);
270 (sum_ab - sum_a * sum_b / n.clone()) / n
271}
272
273pub fn covar_samp_expr(col1: &str, col2: &str) -> Expr {
275 use polars::prelude::{col as pl_col, len, lit, when};
276 let c1 = pl_col(col1).cast(DataType::Float64);
277 let c2 = pl_col(col2).cast(DataType::Float64);
278 let n = len().cast(DataType::Float64);
279 let sum_ab = (c1.clone() * c2.clone()).sum();
280 let sum_a = pl_col(col1).sum().cast(DataType::Float64);
281 let sum_b = pl_col(col2).sum().cast(DataType::Float64);
282 when(len().gt(lit(1)))
283 .then((sum_ab - sum_a * sum_b / n.clone()) / (len() - lit(1)).cast(DataType::Float64))
284 .otherwise(lit(f64::NAN))
285}
286
287pub fn corr_expr(col1: &str, col2: &str) -> Expr {
289 use polars::prelude::{col as pl_col, len, lit, when};
290 let c1 = pl_col(col1).cast(DataType::Float64);
291 let c2 = pl_col(col2).cast(DataType::Float64);
292 let n = len().cast(DataType::Float64);
293 let n1 = (len() - lit(1)).cast(DataType::Float64);
294 let sum_ab = (c1.clone() * c2.clone()).sum();
295 let sum_a = pl_col(col1).sum().cast(DataType::Float64);
296 let sum_b = pl_col(col2).sum().cast(DataType::Float64);
297 let sum_a2 = (c1.clone() * c1).sum();
298 let sum_b2 = (c2.clone() * c2).sum();
299 let cov_samp = (sum_ab - sum_a.clone() * sum_b.clone() / n.clone()) / n1.clone();
300 let var_a = (sum_a2 - sum_a.clone() * sum_a / n.clone()) / n1.clone();
301 let var_b = (sum_b2 - sum_b.clone() * sum_b / n.clone()) / n1.clone();
302 let std_a = var_a.sqrt();
303 let std_b = var_b.sqrt();
304 when(len().gt(lit(1)))
305 .then(cov_samp / (std_a * std_b))
306 .otherwise(lit(f64::NAN))
307}
308
309fn regr_cond_and_sums(y_col: &str, x_col: &str) -> (Expr, Expr, Expr, Expr, Expr, Expr) {
312 use polars::prelude::col as pl_col;
313 let y = pl_col(y_col).cast(DataType::Float64);
314 let x = pl_col(x_col).cast(DataType::Float64);
315 let cond = y.clone().is_not_null().and(x.clone().is_not_null());
316 let n = y
317 .clone()
318 .filter(cond.clone())
319 .count()
320 .cast(DataType::Float64);
321 let sum_x = x.clone().filter(cond.clone()).sum();
322 let sum_y = y.clone().filter(cond.clone()).sum();
323 let sum_xx = (x.clone() * x.clone()).filter(cond.clone()).sum();
324 let sum_yy = (y.clone() * y.clone()).filter(cond.clone()).sum();
325 let sum_xy = (x * y).filter(cond).sum();
326 (n, sum_x, sum_y, sum_xx, sum_yy, sum_xy)
327}
328
329pub fn regr_count_expr(y_col: &str, x_col: &str) -> Expr {
331 let (n, ..) = regr_cond_and_sums(y_col, x_col);
332 n
333}
334
335pub fn regr_avgx_expr(y_col: &str, x_col: &str) -> Expr {
337 use polars::prelude::{lit, when};
338 let (n, sum_x, ..) = regr_cond_and_sums(y_col, x_col);
339 when(n.clone().gt(lit(0.0)))
340 .then(sum_x / n)
341 .otherwise(lit(f64::NAN))
342}
343
344pub fn regr_avgy_expr(y_col: &str, x_col: &str) -> Expr {
346 use polars::prelude::{lit, when};
347 let (n, _, sum_y, ..) = regr_cond_and_sums(y_col, x_col);
348 when(n.clone().gt(lit(0.0)))
349 .then(sum_y / n)
350 .otherwise(lit(f64::NAN))
351}
352
353pub fn regr_sxx_expr(y_col: &str, x_col: &str) -> Expr {
355 use polars::prelude::{lit, when};
356 let (n, sum_x, _, sum_xx, ..) = regr_cond_and_sums(y_col, x_col);
357 when(n.clone().gt(lit(0.0)))
358 .then(sum_xx - sum_x.clone() * sum_x / n)
359 .otherwise(lit(f64::NAN))
360}
361
362pub fn regr_syy_expr(y_col: &str, x_col: &str) -> Expr {
364 use polars::prelude::{lit, when};
365 let (n, _, sum_y, _, sum_yy, _) = regr_cond_and_sums(y_col, x_col);
366 when(n.clone().gt(lit(0.0)))
367 .then(sum_yy - sum_y.clone() * sum_y / n)
368 .otherwise(lit(f64::NAN))
369}
370
371pub fn regr_sxy_expr(y_col: &str, x_col: &str) -> Expr {
373 use polars::prelude::{lit, when};
374 let (n, sum_x, sum_y, _, _, sum_xy) = regr_cond_and_sums(y_col, x_col);
375 when(n.clone().gt(lit(0.0)))
376 .then(sum_xy - sum_x * sum_y / n)
377 .otherwise(lit(f64::NAN))
378}
379
380pub fn regr_slope_expr(y_col: &str, x_col: &str) -> Expr {
382 use polars::prelude::{lit, when};
383 let (n, sum_x, sum_y, sum_xx, _sum_yy, sum_xy) = regr_cond_and_sums(y_col, x_col);
384 let regr_sxx = sum_xx.clone() - sum_x.clone() * sum_x.clone() / n.clone();
385 let regr_sxy = sum_xy - sum_x * sum_y / n.clone();
386 when(n.gt(lit(1.0)).and(regr_sxx.clone().gt(lit(0.0))))
387 .then(regr_sxy / regr_sxx)
388 .otherwise(lit(f64::NAN))
389}
390
391pub fn regr_intercept_expr(y_col: &str, x_col: &str) -> Expr {
393 use polars::prelude::{lit, when};
394 let (n, sum_x, sum_y, sum_xx, _, sum_xy) = regr_cond_and_sums(y_col, x_col);
395 let regr_sxx = sum_xx - sum_x.clone() * sum_x.clone() / n.clone();
396 let regr_sxy = sum_xy.clone() - sum_x.clone() * sum_y.clone() / n.clone();
397 let slope = regr_sxy.clone() / regr_sxx.clone();
398 let avg_y = sum_y / n.clone();
399 let avg_x = sum_x / n.clone();
400 when(n.gt(lit(1.0)).and(regr_sxx.clone().gt(lit(0.0))))
401 .then(avg_y - slope * avg_x)
402 .otherwise(lit(f64::NAN))
403}
404
405pub fn regr_r2_expr(y_col: &str, x_col: &str) -> Expr {
407 use polars::prelude::{lit, when};
408 let (n, sum_x, sum_y, sum_xx, sum_yy, sum_xy) = regr_cond_and_sums(y_col, x_col);
409 let regr_sxx = sum_xx - sum_x.clone() * sum_x.clone() / n.clone();
410 let regr_syy = sum_yy - sum_y.clone() * sum_y.clone() / n.clone();
411 let regr_sxy = sum_xy - sum_x * sum_y / n;
412 when(
413 regr_sxx
414 .clone()
415 .gt(lit(0.0))
416 .and(regr_syy.clone().gt(lit(0.0))),
417 )
418 .then(regr_sxy.clone() * regr_sxy / (regr_sxx * regr_syy))
419 .otherwise(lit(f64::NAN))
420}
421
422pub fn when(condition: &Column) -> WhenBuilder {
434 WhenBuilder::new(condition.expr().clone())
435}
436
437pub fn when_then_otherwise_null(condition: &Column, value: &Column) -> Column {
439 use polars::prelude::*;
440 let null_expr = Expr::Literal(LiteralValue::Null);
441 let expr = polars::prelude::when(condition.expr().clone())
442 .then(value.expr().clone())
443 .otherwise(null_expr);
444 crate::column::Column::from_expr(expr, None)
445}
446
/// First stage of a `when(..).then(..).otherwise(..)` chain; created by the
/// module-level `when` function.
pub struct WhenBuilder {
    /// Condition expression captured from the column passed to `when`.
    condition: Expr,
}
451
452impl WhenBuilder {
453 fn new(condition: Expr) -> Self {
454 WhenBuilder { condition }
455 }
456
457 pub fn then(self, value: &Column) -> ThenBuilder {
459 use polars::prelude::*;
460 let when_then = when(self.condition).then(value.expr().clone());
461 ThenBuilder::new(when_then)
462 }
463
464 pub fn otherwise(self, _value: &Column) -> Column {
469 panic!("when().otherwise() requires .then() to be called first. Use when(cond).then(val1).otherwise(val2)");
472 }
473}
474
/// Second stage of a `when(..).then(..).otherwise(..)` chain; created by
/// `WhenBuilder::then`.
pub struct ThenBuilder {
    /// The polars `when(..).then(..)` state awaiting its `otherwise` branch.
    when_then: polars::prelude::Then,
}
479
480impl ThenBuilder {
481 fn new(when_then: polars::prelude::Then) -> Self {
482 ThenBuilder { when_then }
483 }
484
485 pub fn when(self, _condition: &Column) -> ThenBuilder {
489 self
492 }
493
494 pub fn otherwise(self, value: &Column) -> Column {
496 let expr = self.when_then.otherwise(value.expr().clone());
497 crate::column::Column::from_expr(expr, None)
498 }
499}
500
501pub fn upper(column: &Column) -> Column {
503 column.clone().upper()
504}
505
506pub fn lower(column: &Column) -> Column {
508 column.clone().lower()
509}
510
511pub fn substring(column: &Column, start: i64, length: Option<i64>) -> Column {
513 column.clone().substr(start, length)
514}
515
516pub fn length(column: &Column) -> Column {
518 column.clone().length()
519}
520
521pub fn trim(column: &Column) -> Column {
523 column.clone().trim()
524}
525
526pub fn ltrim(column: &Column) -> Column {
528 column.clone().ltrim()
529}
530
531pub fn rtrim(column: &Column) -> Column {
533 column.clone().rtrim()
534}
535
536pub fn btrim(column: &Column, trim_str: Option<&str>) -> Column {
538 column.clone().btrim(trim_str)
539}
540
541pub fn locate(substr: &str, column: &Column, pos: i64) -> Column {
543 column.clone().locate(substr, pos)
544}
545
546pub fn conv(column: &Column, from_base: i32, to_base: i32) -> Column {
548 column.clone().conv(from_base, to_base)
549}
550
551pub fn hex(column: &Column) -> Column {
553 column.clone().hex()
554}
555
556pub fn unhex(column: &Column) -> Column {
558 column.clone().unhex()
559}
560
561pub fn encode(column: &Column, charset: &str) -> Column {
563 column.clone().encode(charset)
564}
565
566pub fn decode(column: &Column, charset: &str) -> Column {
568 column.clone().decode(charset)
569}
570
571pub fn to_binary(column: &Column, fmt: &str) -> Column {
573 column.clone().to_binary(fmt)
574}
575
576pub fn try_to_binary(column: &Column, fmt: &str) -> Column {
578 column.clone().try_to_binary(fmt)
579}
580
581pub fn aes_encrypt(column: &Column, key: &str) -> Column {
583 column.clone().aes_encrypt(key)
584}
585
586pub fn aes_decrypt(column: &Column, key: &str) -> Column {
588 column.clone().aes_decrypt(key)
589}
590
591pub fn try_aes_decrypt(column: &Column, key: &str) -> Column {
593 column.clone().try_aes_decrypt(key)
594}
595
596pub fn bin(column: &Column) -> Column {
598 column.clone().bin()
599}
600
601pub fn getbit(column: &Column, pos: i64) -> Column {
603 column.clone().getbit(pos)
604}
605
606pub fn bit_and(left: &Column, right: &Column) -> Column {
608 left.clone().bit_and(right)
609}
610
611pub fn bit_or(left: &Column, right: &Column) -> Column {
613 left.clone().bit_or(right)
614}
615
616pub fn bit_xor(left: &Column, right: &Column) -> Column {
618 left.clone().bit_xor(right)
619}
620
621pub fn bit_count(column: &Column) -> Column {
623 column.clone().bit_count()
624}
625
626pub fn bitwise_not(column: &Column) -> Column {
628 column.clone().bitwise_not()
629}
630
631pub fn bitmap_bit_position(column: &Column) -> Column {
635 use polars::prelude::DataType;
636 let expr = column.expr().clone().cast(DataType::Int32);
637 Column::from_expr(expr, None)
638}
639
640pub fn bitmap_bucket_number(column: &Column) -> Column {
642 use polars::prelude::DataType;
643 let expr = column.expr().clone().cast(DataType::Int64) / lit(32768i64);
644 Column::from_expr(expr, None)
645}
646
647pub fn bitmap_count(column: &Column) -> Column {
649 use polars::prelude::{DataType, GetOutput};
650 let expr = column.expr().clone().map(
651 crate::udfs::apply_bitmap_count,
652 GetOutput::from_type(DataType::Int64),
653 );
654 Column::from_expr(expr, None)
655}
656
657pub fn bitmap_construct_agg(column: &Column) -> polars::prelude::Expr {
660 use polars::prelude::{DataType, GetOutput};
661 column.expr().clone().implode().map(
662 crate::udfs::apply_bitmap_construct_agg,
663 GetOutput::from_type(DataType::Binary),
664 )
665}
666
667pub fn bitmap_or_agg(column: &Column) -> polars::prelude::Expr {
669 use polars::prelude::{DataType, GetOutput};
670 column.expr().clone().implode().map(
671 crate::udfs::apply_bitmap_or_agg,
672 GetOutput::from_type(DataType::Binary),
673 )
674}
675
676pub fn bit_get(column: &Column, pos: i64) -> Column {
678 getbit(column, pos)
679}
680
681pub fn assert_true(column: &Column, err_msg: Option<&str>) -> Column {
684 column.clone().assert_true(err_msg)
685}
686
687pub fn raise_error(message: &str) -> Column {
689 let msg = message.to_string();
690 let expr = lit(0i64).map(
691 move |_col| -> PolarsResult<Option<polars::prelude::Column>> {
692 Err(PolarsError::ComputeError(msg.clone().into()))
693 },
694 GetOutput::from_type(DataType::Int64),
695 );
696 Column::from_expr(expr, Some("raise_error".to_string()))
697}
698
699pub fn broadcast(df: &DataFrame) -> DataFrame {
701 df.clone()
702}
703
704pub fn spark_partition_id() -> Column {
706 Column::from_expr(lit(0i32), Some("spark_partition_id".to_string()))
707}
708
709pub fn input_file_name() -> Column {
711 Column::from_expr(lit(""), Some("input_file_name".to_string()))
712}
713
714pub fn monotonically_increasing_id() -> Column {
717 Column::from_expr(lit(0i64), Some("monotonically_increasing_id".to_string()))
718}
719
720pub fn current_catalog() -> Column {
722 Column::from_expr(lit("spark_catalog"), Some("current_catalog".to_string()))
723}
724
725pub fn current_database() -> Column {
727 Column::from_expr(lit("default"), Some("current_database".to_string()))
728}
729
730pub fn current_schema() -> Column {
732 Column::from_expr(lit("default"), Some("current_schema".to_string()))
733}
734
735pub fn current_user() -> Column {
737 Column::from_expr(lit("unknown"), Some("current_user".to_string()))
738}
739
740pub fn user() -> Column {
742 Column::from_expr(lit("unknown"), Some("user".to_string()))
743}
744
745pub fn rand(seed: Option<u64>) -> Column {
748 Column::from_rand(seed)
749}
750
751pub fn randn(seed: Option<u64>) -> Column {
754 Column::from_randn(seed)
755}
756
757pub fn call_udf(name: &str, cols: &[Column]) -> Result<Column, PolarsError> {
760 use polars::prelude::Column as PlColumn;
761
762 let session = crate::session::get_thread_udf_session().ok_or_else(|| {
763 PolarsError::InvalidOperation(
764 "call_udf: no session. Use SparkSession.builder().get_or_create() first.".into(),
765 )
766 })?;
767 let case_sensitive = session.is_case_sensitive();
768
769 #[cfg(feature = "pyo3")]
771 if session
772 .udf_registry
773 .get_python_udf(name, case_sensitive)
774 .is_some()
775 {
776 return Ok(Column::from_udf_call(name.to_string(), cols.to_vec()));
777 }
778
779 let udf = session
781 .udf_registry
782 .get_rust_udf(name, case_sensitive)
783 .ok_or_else(|| {
784 PolarsError::InvalidOperation(format!("call_udf: UDF '{name}' not found").into())
785 })?;
786
787 let exprs: Vec<Expr> = cols.iter().map(|c| c.expr().clone()).collect();
788 let output_type = DataType::String; let expr = if exprs.len() == 1 {
791 let udf = udf.clone();
792 exprs.into_iter().next().unwrap().map(
793 move |c| {
794 let s = c.take_materialized_series();
795 udf.apply(&[s])
796 .map(|out| Some(PlColumn::new("_".into(), out)))
797 },
798 GetOutput::from_type(output_type),
799 )
800 } else {
801 let udf = udf.clone();
802 let first = exprs[0].clone();
803 let rest: Vec<Expr> = exprs[1..].to_vec();
804 first.map_many(
805 move |columns| {
806 let series: Vec<Series> = columns
807 .iter_mut()
808 .map(|c| std::mem::take(c).take_materialized_series())
809 .collect();
810 udf.apply(&series)
811 .map(|out| Some(PlColumn::new("_".into(), out)))
812 },
813 &rest,
814 GetOutput::from_type(output_type),
815 )
816 };
817
818 Ok(Column::from_expr(expr, Some(format!("{name}()"))))
819}
820
821pub fn arrays_overlap(left: &Column, right: &Column) -> Column {
823 left.clone().arrays_overlap(right)
824}
825
826pub fn arrays_zip(left: &Column, right: &Column) -> Column {
828 left.clone().arrays_zip(right)
829}
830
831pub fn explode_outer(column: &Column) -> Column {
833 column.clone().explode_outer()
834}
835
836pub fn posexplode_outer(column: &Column) -> (Column, Column) {
838 column.clone().posexplode_outer()
839}
840
841pub fn array_agg(column: &Column) -> Column {
843 column.clone().array_agg()
844}
845
846pub fn transform_keys(column: &Column, key_expr: Expr) -> Column {
848 column.clone().transform_keys(key_expr)
849}
850
851pub fn transform_values(column: &Column, value_expr: Expr) -> Column {
853 column.clone().transform_values(value_expr)
854}
855
856pub fn str_to_map(
858 column: &Column,
859 pair_delim: Option<&str>,
860 key_value_delim: Option<&str>,
861) -> Column {
862 let pd = pair_delim.unwrap_or(",");
863 let kvd = key_value_delim.unwrap_or(":");
864 column.clone().str_to_map(pd, kvd)
865}
866
867pub fn regexp_extract(column: &Column, pattern: &str, group_index: usize) -> Column {
869 column.clone().regexp_extract(pattern, group_index)
870}
871
872pub fn regexp_replace(column: &Column, pattern: &str, replacement: &str) -> Column {
874 column.clone().regexp_replace(pattern, replacement)
875}
876
877pub fn split(column: &Column, delimiter: &str) -> Column {
879 column.clone().split(delimiter)
880}
881
882pub fn initcap(column: &Column) -> Column {
884 column.clone().initcap()
885}
886
887pub fn regexp_extract_all(column: &Column, pattern: &str) -> Column {
889 column.clone().regexp_extract_all(pattern)
890}
891
892pub fn regexp_like(column: &Column, pattern: &str) -> Column {
894 column.clone().regexp_like(pattern)
895}
896
897pub fn regexp_count(column: &Column, pattern: &str) -> Column {
899 column.clone().regexp_count(pattern)
900}
901
902pub fn regexp_substr(column: &Column, pattern: &str) -> Column {
904 column.clone().regexp_substr(pattern)
905}
906
907pub fn split_part(column: &Column, delimiter: &str, part_num: i64) -> Column {
909 column.clone().split_part(delimiter, part_num)
910}
911
912pub fn regexp_instr(column: &Column, pattern: &str, group_idx: Option<usize>) -> Column {
914 column.clone().regexp_instr(pattern, group_idx)
915}
916
917pub fn find_in_set(str_column: &Column, set_column: &Column) -> Column {
919 str_column.clone().find_in_set(set_column)
920}
921
922pub fn format_string(format: &str, columns: &[&Column]) -> Column {
924 use polars::prelude::*;
925 if columns.is_empty() {
926 panic!("format_string needs at least one column");
927 }
928 let format_owned = format.to_string();
929 let args: Vec<Expr> = columns.iter().skip(1).map(|c| c.expr().clone()).collect();
930 let expr = columns[0].expr().clone().map_many(
931 move |cols| crate::udfs::apply_format_string(cols, &format_owned),
932 &args,
933 GetOutput::from_type(DataType::String),
934 );
935 crate::column::Column::from_expr(expr, None)
936}
937
938pub fn printf(format: &str, columns: &[&Column]) -> Column {
940 format_string(format, columns)
941}
942
943pub fn repeat(column: &Column, n: i32) -> Column {
945 column.clone().repeat(n)
946}
947
948pub fn reverse(column: &Column) -> Column {
950 column.clone().reverse()
951}
952
953pub fn instr(column: &Column, substr: &str) -> Column {
955 column.clone().instr(substr)
956}
957
958pub fn position(substr: &str, column: &Column) -> Column {
960 column.clone().instr(substr)
961}
962
963pub fn ascii(column: &Column) -> Column {
965 column.clone().ascii()
966}
967
968pub fn format_number(column: &Column, decimals: u32) -> Column {
970 column.clone().format_number(decimals)
971}
972
973pub fn overlay(column: &Column, replace: &str, pos: i64, length: i64) -> Column {
975 column.clone().overlay(replace, pos, length)
976}
977
978pub fn char(column: &Column) -> Column {
980 column.clone().char()
981}
982
983pub fn chr(column: &Column) -> Column {
985 column.clone().chr()
986}
987
988pub fn base64(column: &Column) -> Column {
990 column.clone().base64()
991}
992
993pub fn unbase64(column: &Column) -> Column {
995 column.clone().unbase64()
996}
997
998pub fn sha1(column: &Column) -> Column {
1000 column.clone().sha1()
1001}
1002
1003pub fn sha2(column: &Column, bit_length: i32) -> Column {
1005 column.clone().sha2(bit_length)
1006}
1007
1008pub fn md5(column: &Column) -> Column {
1010 column.clone().md5()
1011}
1012
1013pub fn lpad(column: &Column, length: i32, pad: &str) -> Column {
1015 column.clone().lpad(length, pad)
1016}
1017
1018pub fn rpad(column: &Column, length: i32, pad: &str) -> Column {
1020 column.clone().rpad(length, pad)
1021}
1022
1023pub fn translate(column: &Column, from_str: &str, to_str: &str) -> Column {
1025 column.clone().translate(from_str, to_str)
1026}
1027
1028pub fn mask(
1030 column: &Column,
1031 upper_char: Option<char>,
1032 lower_char: Option<char>,
1033 digit_char: Option<char>,
1034 other_char: Option<char>,
1035) -> Column {
1036 column
1037 .clone()
1038 .mask(upper_char, lower_char, digit_char, other_char)
1039}
1040
1041pub fn substring_index(column: &Column, delimiter: &str, count: i64) -> Column {
1043 column.clone().substring_index(delimiter, count)
1044}
1045
1046pub fn left(column: &Column, n: i64) -> Column {
1048 column.clone().left(n)
1049}
1050
1051pub fn right(column: &Column, n: i64) -> Column {
1053 column.clone().right(n)
1054}
1055
1056pub fn replace(column: &Column, search: &str, replacement: &str) -> Column {
1058 column.clone().replace(search, replacement)
1059}
1060
1061pub fn startswith(column: &Column, prefix: &str) -> Column {
1063 column.clone().startswith(prefix)
1064}
1065
1066pub fn endswith(column: &Column, suffix: &str) -> Column {
1068 column.clone().endswith(suffix)
1069}
1070
1071pub fn contains(column: &Column, substring: &str) -> Column {
1073 column.clone().contains(substring)
1074}
1075
1076pub fn like(column: &Column, pattern: &str, escape_char: Option<char>) -> Column {
1079 column.clone().like(pattern, escape_char)
1080}
1081
1082pub fn ilike(column: &Column, pattern: &str, escape_char: Option<char>) -> Column {
1085 column.clone().ilike(pattern, escape_char)
1086}
1087
1088pub fn rlike(column: &Column, pattern: &str) -> Column {
1090 column.clone().regexp_like(pattern)
1091}
1092
1093pub fn regexp(column: &Column, pattern: &str) -> Column {
1095 rlike(column, pattern)
1096}
1097
1098pub fn soundex(column: &Column) -> Column {
1100 column.clone().soundex()
1101}
1102
1103pub fn levenshtein(column: &Column, other: &Column) -> Column {
1105 column.clone().levenshtein(other)
1106}
1107
1108pub fn crc32(column: &Column) -> Column {
1110 column.clone().crc32()
1111}
1112
1113pub fn xxhash64(column: &Column) -> Column {
1115 column.clone().xxhash64()
1116}
1117
1118pub fn abs(column: &Column) -> Column {
1120 column.clone().abs()
1121}
1122
1123pub fn ceil(column: &Column) -> Column {
1125 column.clone().ceil()
1126}
1127
1128pub fn floor(column: &Column) -> Column {
1130 column.clone().floor()
1131}
1132
1133pub fn round(column: &Column, decimals: u32) -> Column {
1135 column.clone().round(decimals)
1136}
1137
1138pub fn bround(column: &Column, scale: i32) -> Column {
1140 column.clone().bround(scale)
1141}
1142
1143pub fn negate(column: &Column) -> Column {
1145 column.clone().negate()
1146}
1147
1148pub fn negative(column: &Column) -> Column {
1150 negate(column)
1151}
1152
1153pub fn positive(column: &Column) -> Column {
1155 column.clone()
1156}
1157
1158pub fn cot(column: &Column) -> Column {
1160 column.clone().cot()
1161}
1162
1163pub fn csc(column: &Column) -> Column {
1165 column.clone().csc()
1166}
1167
1168pub fn sec(column: &Column) -> Column {
1170 column.clone().sec()
1171}
1172
1173pub fn e() -> Column {
1175 Column::from_expr(lit(std::f64::consts::E), Some("e".to_string()))
1176}
1177
1178pub fn pi() -> Column {
1180 Column::from_expr(lit(std::f64::consts::PI), Some("pi".to_string()))
1181}
1182
1183pub fn sqrt(column: &Column) -> Column {
1185 column.clone().sqrt()
1186}
1187
1188pub fn pow(column: &Column, exp: i64) -> Column {
1190 column.clone().pow(exp)
1191}
1192
1193pub fn exp(column: &Column) -> Column {
1195 column.clone().exp()
1196}
1197
1198pub fn log(column: &Column) -> Column {
1200 column.clone().log()
1201}
1202
1203pub fn log_with_base(column: &Column, base: f64) -> Column {
1205 crate::column::Column::from_expr(column.expr().clone().log(base), None)
1206}
1207
1208pub fn sin(column: &Column) -> Column {
1210 column.clone().sin()
1211}
1212
1213pub fn cos(column: &Column) -> Column {
1215 column.clone().cos()
1216}
1217
1218pub fn tan(column: &Column) -> Column {
1220 column.clone().tan()
1221}
1222
1223pub fn asin(column: &Column) -> Column {
1225 column.clone().asin()
1226}
1227
1228pub fn acos(column: &Column) -> Column {
1230 column.clone().acos()
1231}
1232
1233pub fn atan(column: &Column) -> Column {
1235 column.clone().atan()
1236}
1237
1238pub fn atan2(y: &Column, x: &Column) -> Column {
1240 y.clone().atan2(x)
1241}
1242
1243pub fn degrees(column: &Column) -> Column {
1245 column.clone().degrees()
1246}
1247
1248pub fn radians(column: &Column) -> Column {
1250 column.clone().radians()
1251}
1252
1253pub fn signum(column: &Column) -> Column {
1255 column.clone().signum()
1256}
1257
1258pub fn sign(column: &Column) -> Column {
1260 signum(column)
1261}
1262
1263pub fn cast(column: &Column, type_name: &str) -> Result<Column, String> {
1267 let dtype = parse_type_name(type_name)?;
1268 if dtype == DataType::Boolean {
1269 use polars::prelude::GetOutput;
1270 let expr = column.expr().clone().map(
1271 |col| crate::udfs::apply_string_to_boolean(col, true),
1272 GetOutput::from_type(DataType::Boolean),
1273 );
1274 return Ok(Column::from_expr(expr, None));
1275 }
1276 if dtype == DataType::Date {
1277 use polars::prelude::GetOutput;
1278 let expr = column.expr().clone().map(
1279 |col| crate::udfs::apply_string_to_date(col, true),
1280 GetOutput::from_type(DataType::Date),
1281 );
1282 return Ok(Column::from_expr(expr, None));
1283 }
1284 if dtype == DataType::Int32 || dtype == DataType::Int64 {
1285 use polars::prelude::GetOutput;
1286 let target = dtype.clone();
1287 let expr = column.expr().clone().map(
1288 move |col| crate::udfs::apply_string_to_int(col, false, target.clone()),
1289 GetOutput::from_type(dtype),
1290 );
1291 return Ok(Column::from_expr(expr, None));
1292 }
1293 if dtype == DataType::Float64 {
1294 use polars::prelude::GetOutput;
1295 let expr = column.expr().clone().map(
1297 |col| crate::udfs::apply_string_to_double(col, true),
1298 GetOutput::from_type(DataType::Float64),
1299 );
1300 return Ok(Column::from_expr(expr, None));
1301 }
1302 Ok(Column::from_expr(
1303 column.expr().clone().strict_cast(dtype),
1304 None,
1305 ))
1306}
1307
1308pub fn try_cast(column: &Column, type_name: &str) -> Result<Column, String> {
1312 let dtype = parse_type_name(type_name)?;
1313 if dtype == DataType::Boolean {
1314 use polars::prelude::GetOutput;
1315 let expr = column.expr().clone().map(
1316 |col| crate::udfs::apply_string_to_boolean(col, false),
1317 GetOutput::from_type(DataType::Boolean),
1318 );
1319 return Ok(Column::from_expr(expr, None));
1320 }
1321 if dtype == DataType::Date {
1322 use polars::prelude::GetOutput;
1323 let expr = column.expr().clone().map(
1324 |col| crate::udfs::apply_string_to_date(col, false),
1325 GetOutput::from_type(DataType::Date),
1326 );
1327 return Ok(Column::from_expr(expr, None));
1328 }
1329 if dtype == DataType::Int32 || dtype == DataType::Int64 {
1330 use polars::prelude::GetOutput;
1331 let target = dtype.clone();
1332 let expr = column.expr().clone().map(
1333 move |col| crate::udfs::apply_string_to_int(col, false, target.clone()),
1334 GetOutput::from_type(dtype),
1335 );
1336 return Ok(Column::from_expr(expr, None));
1337 }
1338 if dtype == DataType::Float64 {
1339 use polars::prelude::GetOutput;
1340 let expr = column.expr().clone().map(
1341 |col| crate::udfs::apply_string_to_double(col, false),
1342 GetOutput::from_type(DataType::Float64),
1343 );
1344 return Ok(Column::from_expr(expr, None));
1345 }
1346 Ok(Column::from_expr(column.expr().clone().cast(dtype), None))
1347}
1348
1349pub fn to_char(column: &Column, format: Option<&str>) -> Result<Column, String> {
1353 match format {
1354 Some(fmt) => Ok(column
1355 .clone()
1356 .date_format(&crate::udfs::pyspark_format_to_chrono(fmt))),
1357 None => cast(column, "string"),
1358 }
1359}
1360
1361pub fn to_varchar(column: &Column, format: Option<&str>) -> Result<Column, String> {
1363 to_char(column, format)
1364}
1365
1366pub fn to_number(column: &Column, _format: Option<&str>) -> Result<Column, String> {
1369 cast(column, "double")
1370}
1371
1372pub fn try_to_number(column: &Column, _format: Option<&str>) -> Result<Column, String> {
1375 try_cast(column, "double")
1376}
1377
1378pub fn to_timestamp(column: &Column, format: Option<&str>) -> Result<Column, String> {
1380 use polars::prelude::{DataType, GetOutput, TimeUnit};
1381 match format {
1382 None => crate::cast(column, "timestamp"),
1383 Some(fmt) => {
1384 let fmt_owned = fmt.to_string();
1385 let expr = column.expr().clone().map(
1386 move |s| crate::udfs::apply_to_timestamp_format(s, Some(&fmt_owned), true),
1387 GetOutput::from_type(DataType::Datetime(TimeUnit::Microseconds, None)),
1388 );
1389 Ok(crate::column::Column::from_expr(expr, None))
1390 }
1391 }
1392}
1393
1394pub fn try_to_timestamp(column: &Column, format: Option<&str>) -> Result<Column, String> {
1397 use polars::prelude::*;
1398 match format {
1399 None => try_cast(column, "timestamp"),
1400 Some(fmt) => {
1401 let fmt_owned = fmt.to_string();
1402 let expr = column.expr().clone().map(
1403 move |s| crate::udfs::apply_to_timestamp_format(s, Some(&fmt_owned), false),
1404 GetOutput::from_type(DataType::Datetime(TimeUnit::Microseconds, None)),
1405 );
1406 Ok(crate::column::Column::from_expr(expr, None))
1407 }
1408 }
1409}
1410
1411pub fn to_timestamp_ltz(column: &Column, format: Option<&str>) -> Result<Column, String> {
1413 use polars::prelude::{DataType, GetOutput, TimeUnit};
1414 match format {
1415 None => crate::cast(column, "timestamp"),
1416 Some(fmt) => {
1417 let fmt_owned = fmt.to_string();
1418 let expr = column.expr().clone().map(
1419 move |s| crate::udfs::apply_to_timestamp_ltz_format(s, Some(&fmt_owned), true),
1420 GetOutput::from_type(DataType::Datetime(TimeUnit::Microseconds, None)),
1421 );
1422 Ok(crate::column::Column::from_expr(expr, None))
1423 }
1424 }
1425}
1426
1427pub fn to_timestamp_ntz(column: &Column, format: Option<&str>) -> Result<Column, String> {
1429 use polars::prelude::{DataType, GetOutput, TimeUnit};
1430 match format {
1431 None => crate::cast(column, "timestamp"),
1432 Some(fmt) => {
1433 let fmt_owned = fmt.to_string();
1434 let expr = column.expr().clone().map(
1435 move |s| crate::udfs::apply_to_timestamp_ntz_format(s, Some(&fmt_owned), true),
1436 GetOutput::from_type(DataType::Datetime(TimeUnit::Microseconds, None)),
1437 );
1438 Ok(crate::column::Column::from_expr(expr, None))
1439 }
1440 }
1441}
1442
1443pub fn try_divide(left: &Column, right: &Column) -> Column {
1445 use polars::prelude::*;
1446 let zero_cond = right.expr().clone().cast(DataType::Float64).eq(lit(0.0f64));
1447 let null_expr = Expr::Literal(LiteralValue::Null);
1448 let div_expr =
1449 left.expr().clone().cast(DataType::Float64) / right.expr().clone().cast(DataType::Float64);
1450 let expr = polars::prelude::when(zero_cond)
1451 .then(null_expr)
1452 .otherwise(div_expr);
1453 crate::column::Column::from_expr(expr, None)
1454}
1455
1456pub fn try_add(left: &Column, right: &Column) -> Column {
1458 let args = [right.expr().clone()];
1459 let expr =
1460 left.expr()
1461 .clone()
1462 .map_many(crate::udfs::apply_try_add, &args, GetOutput::same_type());
1463 Column::from_expr(expr, None)
1464}
1465
1466pub fn try_subtract(left: &Column, right: &Column) -> Column {
1468 let args = [right.expr().clone()];
1469 let expr = left.expr().clone().map_many(
1470 crate::udfs::apply_try_subtract,
1471 &args,
1472 GetOutput::same_type(),
1473 );
1474 Column::from_expr(expr, None)
1475}
1476
1477pub fn try_multiply(left: &Column, right: &Column) -> Column {
1479 let args = [right.expr().clone()];
1480 let expr = left.expr().clone().map_many(
1481 crate::udfs::apply_try_multiply,
1482 &args,
1483 GetOutput::same_type(),
1484 );
1485 Column::from_expr(expr, None)
1486}
1487
1488pub fn try_element_at(column: &Column, index: i64) -> Column {
1490 column.clone().element_at(index)
1491}
1492
1493pub fn width_bucket(value: &Column, min_val: f64, max_val: f64, num_bucket: i64) -> Column {
1495 if num_bucket <= 0 {
1496 panic!(
1497 "width_bucket: num_bucket must be positive, got {}",
1498 num_bucket
1499 );
1500 }
1501 use polars::prelude::*;
1502 let v = value.expr().clone().cast(DataType::Float64);
1503 let min_expr = lit(min_val);
1504 let max_expr = lit(max_val);
1505 let nb = num_bucket as f64;
1506 let width = (max_val - min_val) / nb;
1507 let bucket_expr = (v.clone() - min_expr.clone()) / lit(width);
1508 let floor_bucket = bucket_expr.floor().cast(DataType::Int64) + lit(1i64);
1509 let bucket_clamped = floor_bucket.clip(lit(1i64), lit(num_bucket));
1510 let expr = polars::prelude::when(v.clone().lt(min_expr))
1511 .then(lit(0i64))
1512 .when(v.gt_eq(max_expr))
1513 .then(lit(num_bucket + 1))
1514 .otherwise(bucket_clamped);
1515 crate::column::Column::from_expr(expr, None)
1516}
1517
1518pub fn elt(index: &Column, columns: &[&Column]) -> Column {
1520 use polars::prelude::*;
1521 if columns.is_empty() {
1522 panic!("elt requires at least one column");
1523 }
1524 let idx_expr = index.expr().clone();
1525 let null_expr = Expr::Literal(LiteralValue::Null);
1526 let mut expr = null_expr;
1527 for (i, c) in columns.iter().enumerate().rev() {
1528 let n = (i + 1) as i64;
1529 expr = polars::prelude::when(idx_expr.clone().eq(lit(n)))
1530 .then(c.expr().clone())
1531 .otherwise(expr);
1532 }
1533 crate::column::Column::from_expr(expr, None)
1534}
1535
1536pub fn bit_length(column: &Column) -> Column {
1538 column.clone().bit_length()
1539}
1540
1541pub fn octet_length(column: &Column) -> Column {
1543 column.clone().octet_length()
1544}
1545
1546pub fn char_length(column: &Column) -> Column {
1548 column.clone().char_length()
1549}
1550
1551pub fn character_length(column: &Column) -> Column {
1553 column.clone().character_length()
1554}
1555
1556pub fn typeof_(column: &Column) -> Column {
1558 column.clone().typeof_()
1559}
1560
1561pub fn isnan(column: &Column) -> Column {
1563 column.clone().is_nan()
1564}
1565
1566pub fn greatest(columns: &[&Column]) -> Result<Column, String> {
1568 if columns.is_empty() {
1569 return Err("greatest requires at least one column".to_string());
1570 }
1571 if columns.len() == 1 {
1572 return Ok((*columns[0]).clone());
1573 }
1574 let mut expr = columns[0].expr().clone();
1575 for c in columns.iter().skip(1) {
1576 let args = [c.expr().clone()];
1577 expr = expr.map_many(crate::udfs::apply_greatest2, &args, GetOutput::same_type());
1578 }
1579 Ok(Column::from_expr(expr, None))
1580}
1581
1582pub fn least(columns: &[&Column]) -> Result<Column, String> {
1584 if columns.is_empty() {
1585 return Err("least requires at least one column".to_string());
1586 }
1587 if columns.len() == 1 {
1588 return Ok((*columns[0]).clone());
1589 }
1590 let mut expr = columns[0].expr().clone();
1591 for c in columns.iter().skip(1) {
1592 let args = [c.expr().clone()];
1593 expr = expr.map_many(crate::udfs::apply_least2, &args, GetOutput::same_type());
1594 }
1595 Ok(Column::from_expr(expr, None))
1596}
1597
1598pub fn year(column: &Column) -> Column {
1600 column.clone().year()
1601}
1602
1603pub fn month(column: &Column) -> Column {
1605 column.clone().month()
1606}
1607
1608pub fn day(column: &Column) -> Column {
1610 column.clone().day()
1611}
1612
1613pub fn to_date(column: &Column) -> Column {
1615 column.clone().to_date()
1616}
1617
1618pub fn date_format(column: &Column, format: &str) -> Column {
1620 column
1621 .clone()
1622 .date_format(&crate::udfs::pyspark_format_to_chrono(format))
1623}
1624
1625pub fn current_date() -> Column {
1627 use polars::prelude::*;
1628 let today = chrono::Utc::now().date_naive();
1629 let days = (today - crate::date_utils::epoch_naive_date()).num_days() as i32;
1630 crate::column::Column::from_expr(Expr::Literal(LiteralValue::Date(days)), None)
1631}
1632
1633pub fn current_timestamp() -> Column {
1635 use polars::prelude::*;
1636 let ts = chrono::Utc::now().timestamp_micros();
1637 crate::column::Column::from_expr(
1638 Expr::Literal(LiteralValue::DateTime(ts, TimeUnit::Microseconds, None)),
1639 None,
1640 )
1641}
1642
1643pub fn curdate() -> Column {
1645 current_date()
1646}
1647
1648pub fn now() -> Column {
1650 current_timestamp()
1651}
1652
1653pub fn localtimestamp() -> Column {
1655 current_timestamp()
1656}
1657
1658pub fn date_diff(end: &Column, start: &Column) -> Column {
1660 datediff(end, start)
1661}
1662
1663pub fn dateadd(column: &Column, n: i32) -> Column {
1665 date_add(column, n)
1666}
1667
1668pub fn extract(column: &Column, field: &str) -> Column {
1670 column.clone().extract(field)
1671}
1672
1673pub fn date_part(column: &Column, field: &str) -> Column {
1675 extract(column, field)
1676}
1677
1678pub fn datepart(column: &Column, field: &str) -> Column {
1680 extract(column, field)
1681}
1682
1683pub fn unix_micros(column: &Column) -> Column {
1685 column.clone().unix_micros()
1686}
1687
1688pub fn unix_millis(column: &Column) -> Column {
1690 column.clone().unix_millis()
1691}
1692
1693pub fn unix_seconds(column: &Column) -> Column {
1695 column.clone().unix_seconds()
1696}
1697
1698pub fn dayname(column: &Column) -> Column {
1700 column.clone().dayname()
1701}
1702
1703pub fn weekday(column: &Column) -> Column {
1705 column.clone().weekday()
1706}
1707
1708pub fn hour(column: &Column) -> Column {
1710 column.clone().hour()
1711}
1712
1713pub fn minute(column: &Column) -> Column {
1715 column.clone().minute()
1716}
1717
1718pub fn second(column: &Column) -> Column {
1720 column.clone().second()
1721}
1722
1723pub fn date_add(column: &Column, n: i32) -> Column {
1725 column.clone().date_add(n)
1726}
1727
1728pub fn date_sub(column: &Column, n: i32) -> Column {
1730 column.clone().date_sub(n)
1731}
1732
1733pub fn datediff(end: &Column, start: &Column) -> Column {
1735 start.clone().datediff(end)
1736}
1737
1738pub fn last_day(column: &Column) -> Column {
1740 column.clone().last_day()
1741}
1742
1743pub fn trunc(column: &Column, format: &str) -> Column {
1745 column.clone().trunc(format)
1746}
1747
1748pub fn date_trunc(format: &str, column: &Column) -> Column {
1750 trunc(column, format)
1751}
1752
1753pub fn quarter(column: &Column) -> Column {
1755 column.clone().quarter()
1756}
1757
1758pub fn weekofyear(column: &Column) -> Column {
1760 column.clone().weekofyear()
1761}
1762
1763pub fn dayofweek(column: &Column) -> Column {
1765 column.clone().dayofweek()
1766}
1767
1768pub fn dayofyear(column: &Column) -> Column {
1770 column.clone().dayofyear()
1771}
1772
1773pub fn add_months(column: &Column, n: i32) -> Column {
1775 column.clone().add_months(n)
1776}
1777
1778pub fn months_between(end: &Column, start: &Column, round_off: bool) -> Column {
1781 end.clone().months_between(start, round_off)
1782}
1783
1784pub fn next_day(column: &Column, day_of_week: &str) -> Column {
1786 column.clone().next_day(day_of_week)
1787}
1788
1789pub fn unix_timestamp_now() -> Column {
1791 use polars::prelude::*;
1792 let secs = chrono::Utc::now().timestamp();
1793 crate::column::Column::from_expr(lit(secs), None)
1794}
1795
1796pub fn unix_timestamp(column: &Column, format: Option<&str>) -> Column {
1798 column.clone().unix_timestamp(format)
1799}
1800
1801pub fn to_unix_timestamp(column: &Column, format: Option<&str>) -> Column {
1803 unix_timestamp(column, format)
1804}
1805
1806pub fn from_unixtime(column: &Column, format: Option<&str>) -> Column {
1808 column.clone().from_unixtime(format)
1809}
1810
1811pub fn make_date(year: &Column, month: &Column, day: &Column) -> Column {
1813 use polars::prelude::*;
1814 let args = [month.expr().clone(), day.expr().clone()];
1815 let expr = year.expr().clone().map_many(
1816 crate::udfs::apply_make_date,
1817 &args,
1818 GetOutput::from_type(DataType::Date),
1819 );
1820 crate::column::Column::from_expr(expr, None)
1821}
1822
1823pub fn make_timestamp(
1826 year: &Column,
1827 month: &Column,
1828 day: &Column,
1829 hour: &Column,
1830 minute: &Column,
1831 sec: &Column,
1832 timezone: Option<&str>,
1833) -> Column {
1834 use polars::prelude::*;
1835 let tz_owned = timezone.map(|s| s.to_string());
1836 let args = [
1837 month.expr().clone(),
1838 day.expr().clone(),
1839 hour.expr().clone(),
1840 minute.expr().clone(),
1841 sec.expr().clone(),
1842 ];
1843 let expr = year.expr().clone().map_many(
1844 move |cols| crate::udfs::apply_make_timestamp(cols, tz_owned.as_deref()),
1845 &args,
1846 GetOutput::from_type(DataType::Datetime(TimeUnit::Microseconds, None)),
1847 );
1848 crate::column::Column::from_expr(expr, None)
1849}
1850
1851pub fn timestampadd(unit: &str, amount: &Column, ts: &Column) -> Column {
1853 ts.clone().timestampadd(unit, amount)
1854}
1855
1856pub fn timestampdiff(unit: &str, start: &Column, end: &Column) -> Column {
1858 start.clone().timestampdiff(unit, end)
1859}
1860
1861pub fn days(n: i64) -> Column {
1863 make_interval(0, 0, 0, n, 0, 0, 0)
1864}
1865
1866pub fn hours(n: i64) -> Column {
1868 make_interval(0, 0, 0, 0, n, 0, 0)
1869}
1870
1871pub fn minutes(n: i64) -> Column {
1873 make_interval(0, 0, 0, 0, 0, n, 0)
1874}
1875
1876pub fn months(n: i64) -> Column {
1878 make_interval(0, n, 0, 0, 0, 0, 0)
1879}
1880
1881pub fn years(n: i64) -> Column {
1883 make_interval(n, 0, 0, 0, 0, 0, 0)
1884}
1885
1886pub fn from_utc_timestamp(column: &Column, tz: &str) -> Column {
1888 column.clone().from_utc_timestamp(tz)
1889}
1890
1891pub fn to_utc_timestamp(column: &Column, tz: &str) -> Column {
1893 column.clone().to_utc_timestamp(tz)
1894}
1895
1896pub fn convert_timezone(source_tz: &str, target_tz: &str, column: &Column) -> Column {
1898 let source_tz = source_tz.to_string();
1899 let target_tz = target_tz.to_string();
1900 let expr = column.expr().clone().map(
1901 move |s| crate::udfs::apply_convert_timezone(s, &source_tz, &target_tz),
1902 GetOutput::same_type(),
1903 );
1904 crate::column::Column::from_expr(expr, None)
1905}
1906
1907pub fn current_timezone() -> Column {
1909 use polars::prelude::*;
1910 crate::column::Column::from_expr(lit("UTC"), None)
1911}
1912
1913pub fn make_interval(
1915 years: i64,
1916 months: i64,
1917 weeks: i64,
1918 days: i64,
1919 hours: i64,
1920 mins: i64,
1921 secs: i64,
1922) -> Column {
1923 use polars::prelude::*;
1924 let total_days = years * 365 + months * 30 + weeks * 7 + days;
1926 let args = DurationArgs::new()
1927 .with_days(lit(total_days))
1928 .with_hours(lit(hours))
1929 .with_minutes(lit(mins))
1930 .with_seconds(lit(secs));
1931 let dur = duration(args);
1932 crate::column::Column::from_expr(dur, None)
1933}
1934
1935pub fn make_dt_interval(days: i64, hours: i64, minutes: i64, seconds: i64) -> Column {
1937 use polars::prelude::*;
1938 let args = DurationArgs::new()
1939 .with_days(lit(days))
1940 .with_hours(lit(hours))
1941 .with_minutes(lit(minutes))
1942 .with_seconds(lit(seconds));
1943 let dur = duration(args);
1944 crate::column::Column::from_expr(dur, None)
1945}
1946
1947pub fn make_ym_interval(years: i32, months: i32) -> Column {
1949 use polars::prelude::*;
1950 let total_months = years * 12 + months;
1951 crate::column::Column::from_expr(lit(total_months), None)
1952}
1953
1954pub fn make_timestamp_ntz(
1956 year: &Column,
1957 month: &Column,
1958 day: &Column,
1959 hour: &Column,
1960 minute: &Column,
1961 sec: &Column,
1962) -> Column {
1963 make_timestamp(year, month, day, hour, minute, sec, None)
1964}
1965
1966pub fn timestamp_seconds(column: &Column) -> Column {
1968 column.clone().timestamp_seconds()
1969}
1970
1971pub fn timestamp_millis(column: &Column) -> Column {
1973 column.clone().timestamp_millis()
1974}
1975
1976pub fn timestamp_micros(column: &Column) -> Column {
1978 column.clone().timestamp_micros()
1979}
1980
1981pub fn unix_date(column: &Column) -> Column {
1983 column.clone().unix_date()
1984}
1985
1986pub fn date_from_unix_date(column: &Column) -> Column {
1988 column.clone().date_from_unix_date()
1989}
1990
1991pub fn pmod(dividend: &Column, divisor: &Column) -> Column {
1993 dividend.clone().pmod(divisor)
1994}
1995
1996pub fn factorial(column: &Column) -> Column {
1998 column.clone().factorial()
1999}
2000
2001pub fn concat(columns: &[&Column]) -> Column {
2003 use polars::prelude::*;
2004 if columns.is_empty() {
2005 panic!("concat requires at least one column");
2006 }
2007 let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2008 crate::column::Column::from_expr(concat_str(&exprs, "", false), None)
2009}
2010
2011pub fn concat_ws(separator: &str, columns: &[&Column]) -> Column {
2013 use polars::prelude::*;
2014 if columns.is_empty() {
2015 panic!("concat_ws requires at least one column");
2016 }
2017 let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2018 crate::column::Column::from_expr(concat_str(&exprs, separator, false), None)
2019}
2020
2021pub fn row_number(column: &Column) -> Column {
2031 column.clone().row_number(false)
2032}
2033
2034pub fn rank(column: &Column, descending: bool) -> Column {
2036 column.clone().rank(descending)
2037}
2038
2039pub fn dense_rank(column: &Column, descending: bool) -> Column {
2041 column.clone().dense_rank(descending)
2042}
2043
2044pub fn lag(column: &Column, n: i64) -> Column {
2046 column.clone().lag(n)
2047}
2048
2049pub fn lead(column: &Column, n: i64) -> Column {
2051 column.clone().lead(n)
2052}
2053
2054pub fn first_value(column: &Column) -> Column {
2056 column.clone().first_value()
2057}
2058
2059pub fn last_value(column: &Column) -> Column {
2061 column.clone().last_value()
2062}
2063
2064pub fn percent_rank(column: &Column, partition_by: &[&str], descending: bool) -> Column {
2066 column.clone().percent_rank(partition_by, descending)
2067}
2068
2069pub fn cume_dist(column: &Column, partition_by: &[&str], descending: bool) -> Column {
2071 column.clone().cume_dist(partition_by, descending)
2072}
2073
2074pub fn ntile(column: &Column, n: u32, partition_by: &[&str], descending: bool) -> Column {
2076 column.clone().ntile(n, partition_by, descending)
2077}
2078
2079pub fn nth_value(column: &Column, n: i64, partition_by: &[&str], descending: bool) -> Column {
2081 column.clone().nth_value(n, partition_by, descending)
2082}
2083
2084pub fn coalesce(columns: &[&Column]) -> Column {
2094 use polars::prelude::*;
2095 if columns.is_empty() {
2096 panic!("coalesce requires at least one column");
2097 }
2098 let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2099 let expr = coalesce(&exprs);
2100 crate::column::Column::from_expr(expr, None)
2101}
2102
2103pub fn nvl(column: &Column, value: &Column) -> Column {
2105 coalesce(&[column, value])
2106}
2107
2108pub fn ifnull(column: &Column, value: &Column) -> Column {
2110 nvl(column, value)
2111}
2112
2113pub fn nullif(column: &Column, value: &Column) -> Column {
2115 use polars::prelude::*;
2116 let cond = column.expr().clone().eq(value.expr().clone());
2117 let null_lit = Expr::Literal(LiteralValue::Null);
2118 let expr = when(cond).then(null_lit).otherwise(column.expr().clone());
2119 crate::column::Column::from_expr(expr, None)
2120}
2121
2122pub fn nanvl(column: &Column, value: &Column) -> Column {
2124 use polars::prelude::*;
2125 let cond = column.expr().clone().is_nan();
2126 let expr = when(cond)
2127 .then(value.expr().clone())
2128 .otherwise(column.expr().clone());
2129 crate::column::Column::from_expr(expr, None)
2130}
2131
2132pub fn nvl2(col1: &Column, col2: &Column, col3: &Column) -> Column {
2134 use polars::prelude::*;
2135 let cond = col1.expr().clone().is_not_null();
2136 let expr = when(cond)
2137 .then(col2.expr().clone())
2138 .otherwise(col3.expr().clone());
2139 crate::column::Column::from_expr(expr, None)
2140}
2141
2142pub fn substr(column: &Column, start: i64, length: Option<i64>) -> Column {
2144 substring(column, start, length)
2145}
2146
2147pub fn power(column: &Column, exp: i64) -> Column {
2149 pow(column, exp)
2150}
2151
2152pub fn ln(column: &Column) -> Column {
2154 log(column)
2155}
2156
2157pub fn ceiling(column: &Column) -> Column {
2159 ceil(column)
2160}
2161
2162pub fn lcase(column: &Column) -> Column {
2164 lower(column)
2165}
2166
2167pub fn ucase(column: &Column) -> Column {
2169 upper(column)
2170}
2171
2172pub fn dayofmonth(column: &Column) -> Column {
2174 day(column)
2175}
2176
2177pub fn to_degrees(column: &Column) -> Column {
2179 degrees(column)
2180}
2181
2182pub fn to_radians(column: &Column) -> Column {
2184 radians(column)
2185}
2186
2187pub fn cosh(column: &Column) -> Column {
2189 column.clone().cosh()
2190}
2191pub fn sinh(column: &Column) -> Column {
2193 column.clone().sinh()
2194}
2195pub fn tanh(column: &Column) -> Column {
2197 column.clone().tanh()
2198}
2199pub fn acosh(column: &Column) -> Column {
2201 column.clone().acosh()
2202}
2203pub fn asinh(column: &Column) -> Column {
2205 column.clone().asinh()
2206}
2207pub fn atanh(column: &Column) -> Column {
2209 column.clone().atanh()
2210}
2211pub fn cbrt(column: &Column) -> Column {
2213 column.clone().cbrt()
2214}
2215pub fn expm1(column: &Column) -> Column {
2217 column.clone().expm1()
2218}
2219pub fn log1p(column: &Column) -> Column {
2221 column.clone().log1p()
2222}
2223pub fn log10(column: &Column) -> Column {
2225 column.clone().log10()
2226}
2227pub fn log2(column: &Column) -> Column {
2229 column.clone().log2()
2230}
2231pub fn rint(column: &Column) -> Column {
2233 column.clone().rint()
2234}
2235pub fn hypot(x: &Column, y: &Column) -> Column {
2237 let xx = x.expr().clone() * x.expr().clone();
2238 let yy = y.expr().clone() * y.expr().clone();
2239 crate::column::Column::from_expr((xx + yy).sqrt(), None)
2240}
2241
2242pub fn isnull(column: &Column) -> Column {
2244 column.clone().is_null()
2245}
2246
2247pub fn isnotnull(column: &Column) -> Column {
2249 column.clone().is_not_null()
2250}
2251
2252pub fn array(columns: &[&Column]) -> Result<crate::column::Column, PolarsError> {
2254 use polars::prelude::*;
2255 if columns.is_empty() {
2256 return Err(PolarsError::ComputeError(
2257 "array requires at least one column".into(),
2258 ));
2259 }
2260 let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2261 let expr = concat_list(exprs)
2262 .map_err(|e| PolarsError::ComputeError(format!("array concat_list: {e}").into()))?;
2263 Ok(crate::column::Column::from_expr(expr, None))
2264}
2265
2266pub fn array_size(column: &Column) -> Column {
2268 column.clone().array_size()
2269}
2270
2271pub fn size(column: &Column) -> Column {
2273 column.clone().array_size()
2274}
2275
2276pub fn cardinality(column: &Column) -> Column {
2278 column.clone().cardinality()
2279}
2280
2281pub fn array_contains(column: &Column, value: &Column) -> Column {
2283 column.clone().array_contains(value.expr().clone())
2284}
2285
2286pub fn array_join(column: &Column, separator: &str) -> Column {
2288 column.clone().array_join(separator)
2289}
2290
2291pub fn array_max(column: &Column) -> Column {
2293 column.clone().array_max()
2294}
2295
2296pub fn array_min(column: &Column) -> Column {
2298 column.clone().array_min()
2299}
2300
2301pub fn element_at(column: &Column, index: i64) -> Column {
2303 column.clone().element_at(index)
2304}
2305
2306pub fn array_sort(column: &Column) -> Column {
2308 column.clone().array_sort()
2309}
2310
2311pub fn array_distinct(column: &Column) -> Column {
2313 column.clone().array_distinct()
2314}
2315
2316pub fn array_slice(column: &Column, start: i64, length: Option<i64>) -> Column {
2318 column.clone().array_slice(start, length)
2319}
2320
2321pub fn sequence(start: &Column, stop: &Column, step: Option<&Column>) -> Column {
2324 use polars::prelude::{as_struct, lit, DataType, GetOutput};
2325 let step_expr = step
2326 .map(|c| c.expr().clone().alias("2"))
2327 .unwrap_or_else(|| lit(1i64).alias("2"));
2328 let struct_expr = as_struct(vec![
2329 start.expr().clone().alias("0"),
2330 stop.expr().clone().alias("1"),
2331 step_expr,
2332 ]);
2333 let out_dtype = DataType::List(Box::new(DataType::Int64));
2334 let expr = struct_expr.map(crate::udfs::apply_sequence, GetOutput::from_type(out_dtype));
2335 crate::column::Column::from_expr(expr, None)
2336}
2337
2338pub fn shuffle(column: &Column) -> Column {
2340 use polars::prelude::GetOutput;
2341 let expr = column
2342 .expr()
2343 .clone()
2344 .map(crate::udfs::apply_shuffle, GetOutput::same_type());
2345 crate::column::Column::from_expr(expr, None)
2346}
2347
2348pub fn inline(column: &Column) -> Column {
2351 column.clone().explode()
2352}
2353
2354pub fn inline_outer(column: &Column) -> Column {
2356 column.clone().explode_outer()
2357}
2358
2359pub fn explode(column: &Column) -> Column {
2361 column.clone().explode()
2362}
2363
2364pub fn array_position(column: &Column, value: &Column) -> Column {
2367 column.clone().array_position(value.expr().clone())
2368}
2369
2370pub fn array_compact(column: &Column) -> Column {
2372 column.clone().array_compact()
2373}
2374
2375pub fn array_remove(column: &Column, value: &Column) -> Column {
2378 column.clone().array_remove(value.expr().clone())
2379}
2380
2381pub fn array_repeat(column: &Column, n: i64) -> Column {
2383 column.clone().array_repeat(n)
2384}
2385
2386pub fn array_flatten(column: &Column) -> Column {
2388 column.clone().array_flatten()
2389}
2390
2391pub fn array_exists(column: &Column, predicate: Expr) -> Column {
2393 column.clone().array_exists(predicate)
2394}
2395
2396pub fn array_forall(column: &Column, predicate: Expr) -> Column {
2398 column.clone().array_forall(predicate)
2399}
2400
2401pub fn array_filter(column: &Column, predicate: Expr) -> Column {
2403 column.clone().array_filter(predicate)
2404}
2405
2406pub fn array_transform(column: &Column, f: Expr) -> Column {
2408 column.clone().array_transform(f)
2409}
2410
2411pub fn array_sum(column: &Column) -> Column {
2413 column.clone().array_sum()
2414}
2415
2416pub fn aggregate(column: &Column, zero: &Column) -> Column {
2418 column.clone().array_aggregate(zero)
2419}
2420
2421pub fn array_mean(column: &Column) -> Column {
2423 column.clone().array_mean()
2424}
2425
2426pub fn posexplode(column: &Column) -> (Column, Column) {
2429 column.clone().posexplode()
2430}
2431
2432pub fn create_map(key_values: &[&Column]) -> Result<Column, PolarsError> {
2435 use polars::prelude::{as_struct, concat_list};
2436 if key_values.is_empty() {
2437 return Err(PolarsError::ComputeError(
2438 "create_map requires at least one key-value pair".into(),
2439 ));
2440 }
2441 let mut struct_exprs: Vec<Expr> = Vec::new();
2442 for i in (0..key_values.len()).step_by(2) {
2443 if i + 1 < key_values.len() {
2444 let k = key_values[i].expr().clone().alias("key");
2445 let v = key_values[i + 1].expr().clone().alias("value");
2446 struct_exprs.push(as_struct(vec![k, v]));
2447 }
2448 }
2449 let expr = concat_list(struct_exprs)
2450 .map_err(|e| PolarsError::ComputeError(format!("create_map concat_list: {e}").into()))?;
2451 Ok(crate::column::Column::from_expr(expr, None))
2452}
2453
2454pub fn map_keys(column: &Column) -> Column {
2456 column.clone().map_keys()
2457}
2458
2459pub fn map_values(column: &Column) -> Column {
2461 column.clone().map_values()
2462}
2463
2464pub fn map_entries(column: &Column) -> Column {
2466 column.clone().map_entries()
2467}
2468
2469pub fn map_from_arrays(keys: &Column, values: &Column) -> Column {
2471 keys.clone().map_from_arrays(values)
2472}
2473
2474pub fn map_concat(a: &Column, b: &Column) -> Column {
2476 a.clone().map_concat(b)
2477}
2478
2479pub fn map_from_entries(column: &Column) -> Column {
2481 column.clone().map_from_entries()
2482}
2483
2484pub fn map_contains_key(map_col: &Column, key: &Column) -> Column {
2486 map_col.clone().map_contains_key(key)
2487}
2488
2489pub fn get(map_col: &Column, key: &Column) -> Column {
2491 map_col.clone().get(key)
2492}
2493
2494pub fn map_filter(map_col: &Column, predicate: Expr) -> Column {
2496 map_col.clone().map_filter(predicate)
2497}
2498
2499pub fn map_zip_with(map1: &Column, map2: &Column, merge: Expr) -> Column {
2501 map1.clone().map_zip_with(map2, merge)
2502}
2503
2504pub fn zip_with_coalesce(left: &Column, right: &Column) -> Column {
2506 use polars::prelude::col;
2507 let left_field = col("").struct_().field_by_name("left");
2508 let right_field = col("").struct_().field_by_name("right");
2509 let merge = crate::column::Column::from_expr(
2510 coalesce(&[
2511 &crate::column::Column::from_expr(left_field, None),
2512 &crate::column::Column::from_expr(right_field, None),
2513 ])
2514 .into_expr(),
2515 None,
2516 );
2517 left.clone().zip_with(right, merge.into_expr())
2518}
2519
2520pub fn map_zip_with_coalesce(map1: &Column, map2: &Column) -> Column {
2522 use polars::prelude::col;
2523 let v1 = col("").struct_().field_by_name("value1");
2524 let v2 = col("").struct_().field_by_name("value2");
2525 let merge = coalesce(&[
2526 &crate::column::Column::from_expr(v1, None),
2527 &crate::column::Column::from_expr(v2, None),
2528 ])
2529 .into_expr();
2530 map1.clone().map_zip_with(map2, merge)
2531}
2532
2533pub fn map_filter_value_gt(map_col: &Column, threshold: f64) -> Column {
2535 use polars::prelude::{col, lit};
2536 let pred = col("").struct_().field_by_name("value").gt(lit(threshold));
2537 map_col.clone().map_filter(pred)
2538}
2539
2540pub fn struct_(columns: &[&Column]) -> Column {
2542 use polars::prelude::as_struct;
2543 if columns.is_empty() {
2544 panic!("struct requires at least one column");
2545 }
2546 let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2547 crate::column::Column::from_expr(as_struct(exprs), None)
2548}
2549
2550pub fn named_struct(pairs: &[(&str, &Column)]) -> Column {
2552 use polars::prelude::as_struct;
2553 if pairs.is_empty() {
2554 panic!("named_struct requires at least one (name, column) pair");
2555 }
2556 let exprs: Vec<Expr> = pairs
2557 .iter()
2558 .map(|(name, col)| col.expr().clone().alias(*name))
2559 .collect();
2560 crate::column::Column::from_expr(as_struct(exprs), None)
2561}
2562
2563pub fn array_append(array: &Column, elem: &Column) -> Column {
2565 array.clone().array_append(elem)
2566}
2567
2568pub fn array_prepend(array: &Column, elem: &Column) -> Column {
2570 array.clone().array_prepend(elem)
2571}
2572
2573pub fn array_insert(array: &Column, pos: &Column, elem: &Column) -> Column {
2575 array.clone().array_insert(pos, elem)
2576}
2577
2578pub fn array_except(a: &Column, b: &Column) -> Column {
2580 a.clone().array_except(b)
2581}
2582
2583pub fn array_intersect(a: &Column, b: &Column) -> Column {
2585 a.clone().array_intersect(b)
2586}
2587
2588pub fn array_union(a: &Column, b: &Column) -> Column {
2590 a.clone().array_union(b)
2591}
2592
2593pub fn zip_with(left: &Column, right: &Column, merge: Expr) -> Column {
2595 left.clone().zip_with(right, merge)
2596}
2597
2598pub fn get_json_object(column: &Column, path: &str) -> Column {
2600 column.clone().get_json_object(path)
2601}
2602
2603pub fn json_object_keys(column: &Column) -> Column {
2605 column.clone().json_object_keys()
2606}
2607
2608pub fn json_tuple(column: &Column, keys: &[&str]) -> Column {
2610 column.clone().json_tuple(keys)
2611}
2612
2613pub fn from_csv(column: &Column) -> Column {
2615 column.clone().from_csv()
2616}
2617
2618pub fn to_csv(column: &Column) -> Column {
2620 column.clone().to_csv()
2621}
2622
2623pub fn schema_of_csv(_column: &Column) -> Column {
2625 Column::from_expr(
2626 lit("STRUCT<_c0: STRING, _c1: STRING>".to_string()),
2627 Some("schema_of_csv".to_string()),
2628 )
2629}
2630
2631pub fn schema_of_json(_column: &Column) -> Column {
2633 Column::from_expr(
2634 lit("STRUCT<>".to_string()),
2635 Some("schema_of_json".to_string()),
2636 )
2637}
2638
2639pub fn from_json(column: &Column, schema: Option<polars::datatypes::DataType>) -> Column {
2641 column.clone().from_json(schema)
2642}
2643
2644pub fn to_json(column: &Column) -> Column {
2646 column.clone().to_json()
2647}
2648
2649pub fn isin(column: &Column, other: &Column) -> Column {
2651 column.clone().isin(other)
2652}
2653
2654pub fn isin_i64(column: &Column, values: &[i64]) -> Column {
2656 let s = Series::from_iter(values.iter().cloned());
2657 Column::from_expr(column.expr().clone().is_in(lit(s)), None)
2658}
2659
2660pub fn isin_str(column: &Column, values: &[&str]) -> Column {
2662 let s: Series = Series::from_iter(values.iter().copied());
2663 Column::from_expr(column.expr().clone().is_in(lit(s)), None)
2664}
2665
2666pub fn url_decode(column: &Column) -> Column {
2668 column.clone().url_decode()
2669}
2670
2671pub fn url_encode(column: &Column) -> Column {
2673 column.clone().url_encode()
2674}
2675
2676pub fn shift_left(column: &Column, n: i32) -> Column {
2678 column.clone().shift_left(n)
2679}
2680
2681pub fn shift_right(column: &Column, n: i32) -> Column {
2683 column.clone().shift_right(n)
2684}
2685
2686pub fn shift_right_unsigned(column: &Column, n: i32) -> Column {
2688 column.clone().shift_right_unsigned(n)
2689}
2690
2691pub fn version() -> Column {
2693 Column::from_expr(
2694 lit(concat!("robin-sparkless-", env!("CARGO_PKG_VERSION"))),
2695 None,
2696 )
2697}
2698
2699pub fn equal_null(left: &Column, right: &Column) -> Column {
2701 left.clone().eq_null_safe(right)
2702}
2703
2704pub fn json_array_length(column: &Column, path: &str) -> Column {
2706 column.clone().json_array_length(path)
2707}
2708
2709pub fn parse_url(column: &Column, part: &str, key: Option<&str>) -> Column {
2712 column.clone().parse_url(part, key)
2713}
2714
2715pub fn hash(columns: &[&Column]) -> Column {
2717 use polars::prelude::*;
2718 if columns.is_empty() {
2719 return crate::column::Column::from_expr(lit(0i64), None);
2720 }
2721 if columns.len() == 1 {
2722 return columns[0].clone().hash();
2723 }
2724 let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2725 let struct_expr = polars::prelude::as_struct(exprs);
2726 let name = columns[0].name().to_string();
2727 let expr = struct_expr.map(
2728 crate::udfs::apply_hash_struct,
2729 GetOutput::from_type(DataType::Int64),
2730 );
2731 crate::column::Column::from_expr(expr, Some(name))
2732}
2733
/// Alias for `struct_`: forwards `columns` unchanged and returns its result.
///
/// NOTE(review): Spark's `stack` is a row-generating function, whereas this
/// wrapper only delegates to the struct builder — confirm that this is the
/// intended behaviour.
pub fn stack(columns: &[&Column]) -> Column {
    struct_(columns)
}
2738
#[cfg(test)]
mod tests {
    use super::*;
    use polars::prelude::{df, IntoLazy};

    /// `col` yields a column that reports the name it was given.
    #[test]
    fn test_col_creates_column() {
        let named = col("test");
        assert_eq!(named.name(), "test");
    }

    /// An `i32` literal reports the placeholder name `<expr>`.
    #[test]
    fn test_lit_i32() {
        let literal = lit_i32(42);
        assert_eq!(literal.name(), "<expr>");
    }

    /// An `i64` literal reports the placeholder name `<expr>`.
    #[test]
    fn test_lit_i64() {
        let literal = lit_i64(123456789012345i64);
        assert_eq!(literal.name(), "<expr>");
    }

    /// An `f64` literal reports the placeholder name `<expr>`.
    #[test]
    fn test_lit_f64() {
        let literal = lit_f64(std::f64::consts::PI);
        assert_eq!(literal.name(), "<expr>");
    }

    /// A boolean literal reports the placeholder name `<expr>`.
    #[test]
    fn test_lit_bool() {
        let literal = lit_bool(true);
        assert_eq!(literal.name(), "<expr>");
    }

    /// A string literal reports the placeholder name `<expr>`.
    #[test]
    fn test_lit_str() {
        let literal = lit_str("hello");
        assert_eq!(literal.name(), "<expr>");
    }

    /// `count` names its result `count`.
    #[test]
    fn test_count_aggregation() {
        let source = col("value");
        let aggregated = count(&source);
        assert_eq!(aggregated.name(), "count");
    }

    /// `sum` names its result `sum`.
    #[test]
    fn test_sum_aggregation() {
        let source = col("value");
        let aggregated = sum(&source);
        assert_eq!(aggregated.name(), "sum");
    }

    /// `avg` names its result `avg`.
    #[test]
    fn test_avg_aggregation() {
        let source = col("value");
        let aggregated = avg(&source);
        assert_eq!(aggregated.name(), "avg");
    }

    /// `max` names its result `max`.
    #[test]
    fn test_max_aggregation() {
        let source = col("value");
        let aggregated = max(&source);
        assert_eq!(aggregated.name(), "max");
    }

    /// `min` names its result `min`.
    #[test]
    fn test_min_aggregation() {
        let source = col("value");
        let aggregated = min(&source);
        assert_eq!(aggregated.name(), "min");
    }

    /// `when(..).then(..).otherwise(..)` picks the `then` value where the
    /// condition holds and the `otherwise` value elsewhere.
    #[test]
    fn test_when_then_otherwise() {
        let frame = df!("age" => &[15, 25, 35]).unwrap();

        // age > 18 selects "adult"; everything else falls through to "minor".
        let ages = col("age");
        let is_adult = ages.gt(polars::prelude::lit(18));
        let adult_label = lit_str("adult");
        let minor_label = lit_str("minor");
        let labelled = when(&is_adult)
            .then(&adult_label)
            .otherwise(&minor_label);

        let status_expr = labelled.into_expr().alias("status");
        let evaluated = frame.lazy().with_column(status_expr).collect().unwrap();

        let statuses: Vec<Option<&str>> = evaluated
            .column("status")
            .unwrap()
            .str()
            .unwrap()
            .into_iter()
            .collect();

        let expected: [Option<&str>; 3] = [Some("minor"), Some("adult"), Some("adult")];
        assert_eq!(statuses[..3], expected);
    }

    /// `coalesce` takes, per row, the first non-null value in argument order.
    #[test]
    fn test_coalesce_returns_first_non_null() {
        let frame = df!(
            "a" => &[Some(1), None, None],
            "b" => &[None, Some(2), None],
            "c" => &[None, None, Some(3)]
        )
        .unwrap();

        let first = col("a");
        let second = col("b");
        let third = col("c");
        let inputs = [&first, &second, &third];
        let merged = coalesce(&inputs);

        let merged_expr = merged.into_expr().alias("coalesced");
        let evaluated = frame.lazy().with_column(merged_expr).collect().unwrap();

        let picked: Vec<Option<i32>> = evaluated
            .column("coalesced")
            .unwrap()
            .i32()
            .unwrap()
            .into_iter()
            .collect();

        let expected: [Option<i32>; 3] = [Some(1), Some(2), Some(3)];
        assert_eq!(picked[..3], expected);
    }

    /// A trailing literal acts as the default when every column is null.
    #[test]
    fn test_coalesce_with_literal_fallback() {
        let frame = df!(
            "a" => &[Some(1), None],
            "b" => &[None::<i32>, None::<i32>]
        )
        .unwrap();

        let first = col("a");
        let second = col("b");
        let default_value = lit_i32(0);
        let inputs = [&first, &second, &default_value];
        let merged = coalesce(&inputs);

        let merged_expr = merged.into_expr().alias("coalesced");
        let evaluated = frame.lazy().with_column(merged_expr).collect().unwrap();

        let picked: Vec<Option<i32>> = evaluated
            .column("coalesced")
            .unwrap()
            .i32()
            .unwrap()
            .into_iter()
            .collect();

        let expected: [Option<i32>; 2] = [Some(1), Some(0)];
        assert_eq!(picked[..2], expected);
    }

    /// Calling `coalesce` with no columns panics with a descriptive message.
    #[test]
    #[should_panic(expected = "coalesce requires at least one column")]
    fn test_coalesce_empty_panics() {
        let no_columns: [&Column; 0] = [];
        let _ = coalesce(&no_columns);
    }

    /// Strict `cast` to double parses numeric strings, including one padded
    /// with surrounding spaces.
    #[test]
    fn test_cast_double_string_column_strict_ok() {
        let frame = df!("s" => &["123", " 45.5 ", "0"]).unwrap();

        let text = col("s");
        let as_double = cast(&text, "double").unwrap();

        let evaluated = frame
            .lazy()
            .with_column(as_double.into_expr().alias("v"))
            .collect()
            .unwrap();

        let parsed: Vec<Option<f64>> = evaluated
            .column("v")
            .unwrap()
            .f64()
            .unwrap()
            .into_iter()
            .collect();
        assert_eq!(parsed, vec![Some(123.0), Some(45.5), Some(0.0)]);
    }

    /// `try_cast` to double turns unparseable strings (including the empty
    /// string) into nulls instead of failing.
    #[test]
    fn test_try_cast_double_string_column_invalid_to_null() {
        let frame = df!("s" => &["123", " 45.5 ", "abc", ""]).unwrap();

        let text = col("s");
        let as_double = try_cast(&text, "double").unwrap();

        let evaluated = frame
            .lazy()
            .with_column(as_double.into_expr().alias("v"))
            .collect()
            .unwrap();

        let parsed: Vec<Option<f64>> = evaluated
            .column("v")
            .unwrap()
            .f64()
            .unwrap()
            .into_iter()
            .collect();
        assert_eq!(parsed, vec![Some(123.0), Some(45.5), None, None]);
    }

    /// `to_number` converts numeric columns to `f64`; `try_to_number` parses
    /// strings and yields null for text that is not a number.
    #[test]
    fn test_to_number_and_try_to_number_numerics_and_strings() {
        let frame = df!(
            "i" => &[1i32, 2, 3],
            "f" => &[1.5f64, 2.5, 3.5],
            "s" => &["10", "20.5", "xyz"]
        )
        .unwrap();

        let ints = col("i");
        let floats = col("f");
        let strings = col("s");

        let ints_as_number = to_number(&ints, None).unwrap();
        let floats_as_number = to_number(&floats, None).unwrap();
        let strings_as_number = try_to_number(&strings, None).unwrap();

        let evaluated = frame
            .lazy()
            .with_columns([
                ints_as_number.into_expr().alias("i_num"),
                floats_as_number.into_expr().alias("f_num"),
                strings_as_number.into_expr().alias("s_num"),
            ])
            .collect()
            .unwrap();

        // Pull one output column back out as a vector of optional f64 values.
        let read_f64 = |name: &str| -> Vec<Option<f64>> {
            evaluated
                .column(name)
                .unwrap()
                .f64()
                .unwrap()
                .into_iter()
                .collect()
        };

        let from_ints = read_f64("i_num");
        let from_floats = read_f64("f_num");
        let from_strings = read_f64("s_num");

        assert_eq!(from_ints, vec![Some(1.0), Some(2.0), Some(3.0)]);
        assert_eq!(from_floats, vec![Some(1.5), Some(2.5), Some(3.5)]);
        assert_eq!(from_strings, vec![Some(10.0), Some(20.5), None]);
    }
}