1use polars::prelude::{
2 DataType, Expr, Field, PolarsError, PolarsResult, RankMethod, RankOptions, SortOptions,
3 TimeUnit, WindowMapping, col, lit,
4};
5use polars_plan::dsl::AggExpr;
6use std::ops::Neg;
7
8#[inline]
10pub(crate) fn expect_col(
11 r: PolarsResult<Option<polars::prelude::Column>>,
12) -> PolarsResult<polars::prelude::Column> {
13 r.and_then(|o| o.ok_or_else(|| PolarsError::ComputeError("expected column".into())))
14}
15
/// Translates a SQL `LIKE` pattern into an anchored regular expression.
///
/// `%` becomes `.*`, `_` becomes `.`, and every regex metacharacter is
/// backslash-escaped so it matches literally. When `escape_char` is given, the
/// character that follows it is always taken literally, so an escaped `%` or
/// `_` is not a wildcard.
///
/// A trailing `escape_char` with nothing after it is emitted as a literal
/// character. It is backslash-escaped only when it is a regex metacharacter:
/// unconditionally prefixing it with `\` would turn e.g. a trailing `b` or `d`
/// into the regex escapes `\b` / `\d`.
///
/// The result is wrapped in `^…$` so the whole input must match.
fn like_pattern_to_regex(pattern: &str, escape_char: Option<char>) -> String {
    /// Characters that must be escaped to match literally in a regex.
    const REGEX_META: &str = "\\.*+?[](){}^$|";

    /// Appends `c` so that it matches itself literally.
    fn push_literal(out: &mut String, c: char) {
        if REGEX_META.contains(c) {
            out.push('\\');
        }
        out.push(c);
    }

    let mut out = String::with_capacity(pattern.len() * 2);
    let mut it = pattern.chars();
    // `while let` rather than `for`: the escape branch consumes a second char.
    while let Some(c) = it.next() {
        if escape_char == Some(c) {
            // Escaped char, or the escape char itself when it ends the pattern.
            push_literal(&mut out, it.next().unwrap_or(c));
        } else {
            match c {
                '%' => out.push_str(".*"),
                '_' => out.push('.'),
                _ => push_literal(&mut out, c),
            }
        }
    }
    format!("^{out}$")
}
48
/// Maps a PySpark truncation unit name to a Polars duration string.
///
/// Matching is case-insensitive. Recognised units map to a single-unit
/// duration (`"1y"`, `"1mo"`, `"1w"`, `"1d"`, `"1h"`, `"1m"`, `"1s"`, `"1q"`);
/// any other input is returned unchanged, with its original casing.
fn pyspark_trunc_format_to_polars_duration(format: &str) -> String {
    let normalized = format.to_lowercase();
    let duration = match normalized.as_str() {
        "year" | "years" => "1y",
        "month" | "months" => "1mo",
        "week" | "weeks" | "wk" => "1w",
        "day" | "days" => "1d",
        "hour" | "hours" => "1h",
        "minute" | "minutes" | "min" => "1m",
        "second" | "seconds" | "sec" => "1s",
        "quarter" | "quarters" | "q" => "1q",
        // Unknown unit: hand the caller's string back untouched.
        _ => format,
    };
    duration.to_owned()
}
64
/// Marker recorded on a [`Column`] built from a random generator.
///
/// Set by `Column::from_rand` / `Column::from_randn`; the payload is the
/// optional seed that was passed to the constructor.
#[derive(Debug, Clone, Copy)]
pub enum DeferredRandom {
    /// Recorded by `Column::from_rand`, carrying its optional seed.
    Rand(Option<u64>),
    /// Recorded by `Column::from_randn`, carrying its optional seed.
    Randn(Option<u64>),
}
71
/// Metadata attached to a [`Column`] produced by a first/last-style aggregate.
///
/// `Column::from_last_agg` fills this with the un-aggregated input expression
/// and `is_last = true`.
#[derive(Debug, Clone)]
pub struct FirstLastValue {
    /// The expression the aggregate was applied to (before `.last()`).
    pub value_expr: Expr,
    /// `true` when the aggregate is "last"; presumably `false` means "first"
    /// (no such constructor is visible in this part of the file).
    pub is_last: bool,
}
80
/// A named Polars expression plus optional metadata about how it was built.
///
/// `into_expr` yields `expr` aliased to `name`. The optional fields are all
/// empty for columns created via `new` / `from_expr`.
#[derive(Debug, Clone)]
pub struct Column {
    // Display name; used as the alias in `into_expr`.
    name: String,
    // `expr`: the underlying Polars expression.
    // `is_array_expr`: crate-internal flag; every constructor visible here sets
    // it to `false`. NOTE(review): its meaning is not visible in this chunk.
    expr: Expr, pub(crate) is_array_expr: bool,
    /// Set by `from_rand` / `from_randn` to record the generator kind and seed.
    pub deferred: Option<DeferredRandom>,
    /// Set by `from_udf_call`: the UDF name and its argument columns.
    pub udf_call: Option<(String, Vec<Column>)>,
    /// NOTE(review): never populated in this chunk; presumably the source
    /// column name for a running aggregate — confirm against its writers.
    pub source_for_running: Option<String>,
    /// NOTE(review): never populated in this chunk; presumably the source
    /// column name for a running mean — confirm against its writers.
    pub source_for_running_mean: Option<String>,
    /// Set by `from_last_agg` with the un-aggregated value expression.
    pub first_last_value: Option<FirstLastValue>,
    /// NOTE(review): never populated in this chunk; presumably the source
    /// column name for a running count — confirm against its writers.
    pub source_for_running_count: Option<String>,
}
104
105fn expr_is_or_contains_n_unique(expr: &Expr) -> bool {
107 match expr {
108 Expr::Agg(AggExpr::NUnique(_)) => true,
109 Expr::Cast { expr: inner, .. } => expr_is_or_contains_n_unique(inner.as_ref()),
110 Expr::Alias(inner, _) => expr_is_or_contains_n_unique(inner.as_ref()),
111 _ => false,
112 }
113}
114
115impl Column {
116 pub fn new(name: String) -> Self {
118 Column {
119 name: name.clone(),
120 expr: col(&name),
121 is_array_expr: false,
122 deferred: None,
123 udf_call: None,
124 source_for_running: None,
125 source_for_running_mean: None,
126 first_last_value: None,
127 source_for_running_count: None,
128 }
129 }
130
131 pub fn from_expr(expr: Expr, name: Option<String>) -> Self {
133 let display_name = name.unwrap_or_else(|| "<expr>".to_string());
134 Column {
135 name: display_name,
136 expr,
137 is_array_expr: false,
138 deferred: None,
139 udf_call: None,
140 source_for_running: None,
141 source_for_running_mean: None,
142 first_last_value: None,
143 source_for_running_count: None,
144 }
145 }
146
147 pub fn from_last_agg(col: &Column) -> Self {
150 let value_expr = col.expr().clone();
151 let expr = value_expr.clone().last();
152 Column {
153 name: "last".to_string(),
154 expr,
155 is_array_expr: false,
156 deferred: None,
157 udf_call: None,
158 source_for_running: None,
159 source_for_running_mean: None,
160 first_last_value: Some(FirstLastValue {
161 value_expr,
162 is_last: true,
163 }),
164 source_for_running_count: None,
165 }
166 }
167
168 pub fn from_udf_call(name: String, args: Vec<Column>) -> Self {
170 Column {
171 name: format!("{name}()"),
172 expr: lit(0i32), is_array_expr: false,
174 deferred: None,
175 udf_call: Some((name, args)),
176 source_for_running: None,
177 source_for_running_mean: None,
178 first_last_value: None,
179 source_for_running_count: None,
180 }
181 }
182
183 pub fn from_rand(seed: Option<u64>) -> Self {
185 let expr = lit(1i64).cum_sum(false).map(
186 move |c| expect_col(crate::udfs::apply_rand_with_seed(c, seed)),
187 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
188 );
189 Column {
190 name: "rand".to_string(),
191 expr,
192 is_array_expr: false,
193 deferred: Some(DeferredRandom::Rand(seed)),
194 udf_call: None,
195 source_for_running: None,
196 source_for_running_mean: None,
197 first_last_value: None,
198 source_for_running_count: None,
199 }
200 }
201
202 pub fn from_randn(seed: Option<u64>) -> Self {
204 let expr = lit(1i64).cum_sum(false).map(
205 move |c| expect_col(crate::udfs::apply_randn_with_seed(c, seed)),
206 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
207 );
208 Column {
209 name: "randn".to_string(),
210 expr,
211 is_array_expr: false,
212 deferred: Some(DeferredRandom::Randn(seed)),
213 udf_call: None,
214 source_for_running: None,
215 source_for_running_mean: None,
216 first_last_value: None,
217 source_for_running_count: None,
218 }
219 }
220
    /// Borrows the underlying Polars expression (without the name alias).
    pub fn expr(&self) -> &Expr {
        &self.expr
    }
225
    /// Consumes the column and returns its expression aliased to its name.
    pub fn into_expr(self) -> Expr {
        self.expr.alias(&self.name)
    }
231
    /// Returns the column's display name.
    pub fn name(&self) -> &str {
        &self.name
    }
236
237 pub fn udf_call_info(&self) -> Option<(String, Vec<String>)> {
239 self.udf_call.as_ref().map(|(name, args)| {
240 (
241 name.clone(),
242 args.iter().map(|c| c.name().to_string()).collect(),
243 )
244 })
245 }
246
247 pub fn udf_call_with_args(&self) -> Option<(&str, &[Column])> {
249 self.udf_call
250 .as_ref()
251 .map(|(name, args)| (name.as_str(), args.as_slice()))
252 }
253
254 pub fn literal_as_json_string(&self) -> Option<String> {
256 match &self.expr {
257 Expr::Literal(lv) => crate::dataframe::literal_value_to_serde_value(lv)
258 .and_then(|v| serde_json::to_string(&v).ok()),
259 _ => None,
260 }
261 }
262
263 pub fn udf_call_info_with_literals(
266 &self,
267 ) -> Option<(String, Vec<String>, Vec<Option<String>>)> {
268 self.udf_call.as_ref().map(|(name, args)| {
269 let arg_names: Vec<String> = args.iter().map(|c| c.name().to_string()).collect();
270 let literals: Vec<Option<String>> =
271 args.iter().map(|c| c.literal_as_json_string()).collect();
272 (name.clone(), arg_names, literals)
273 })
274 }
275
276 pub fn alias(&self, name: &str) -> Column {
278 Column {
279 name: name.to_string(),
280 expr: self.expr.clone().alias(name),
281 is_array_expr: self.is_array_expr,
282 deferred: self.deferred,
283 udf_call: self.udf_call.clone(),
284 source_for_running: self.source_for_running.clone(),
285 source_for_running_mean: self.source_for_running_mean.clone(),
286 first_last_value: self.first_last_value.clone(),
287 source_for_running_count: self.source_for_running_count.clone(),
288 }
289 }
290
    /// Builds a `SortOrder` for this column by delegating to `crate::functions::asc`.
    pub fn asc(&self) -> crate::functions::SortOrder {
        crate::functions::asc(self)
    }
295
    /// Builds a `SortOrder` by delegating to `crate::functions::asc_nulls_first`.
    pub fn asc_nulls_first(&self) -> crate::functions::SortOrder {
        crate::functions::asc_nulls_first(self)
    }
300
    /// Builds a `SortOrder` by delegating to `crate::functions::asc_nulls_last`.
    pub fn asc_nulls_last(&self) -> crate::functions::SortOrder {
        crate::functions::asc_nulls_last(self)
    }
305
    /// Builds a `SortOrder` for this column by delegating to `crate::functions::desc`.
    pub fn desc(&self) -> crate::functions::SortOrder {
        crate::functions::desc(self)
    }
310
    /// Builds a `SortOrder` by delegating to `crate::functions::desc_nulls_first`.
    pub fn desc_nulls_first(&self) -> crate::functions::SortOrder {
        crate::functions::desc_nulls_first(self)
    }
315
    /// Builds a `SortOrder` by delegating to `crate::functions::desc_nulls_last`.
    pub fn desc_nulls_last(&self) -> crate::functions::SortOrder {
        crate::functions::desc_nulls_last(self)
    }
320
321 pub fn is_null(&self) -> Column {
323 Column {
324 name: format!("({} IS NULL)", self.name),
325 expr: self.expr.clone().is_null(),
326 is_array_expr: false,
327 deferred: None,
328 udf_call: None,
329 source_for_running: None,
330 source_for_running_mean: None,
331 first_last_value: None,
332 source_for_running_count: None,
333 }
334 }
335
336 pub fn is_not_null(&self) -> Column {
338 Column {
339 name: format!("({} IS NOT NULL)", self.name),
340 expr: self.expr.clone().is_not_null(),
341 is_array_expr: false,
342 deferred: None,
343 udf_call: None,
344 source_for_running: None,
345 source_for_running_mean: None,
346 first_last_value: None,
347 source_for_running_count: None,
348 }
349 }
350
    /// Alias for `is_null`.
    pub fn isnull(&self) -> Column {
        self.is_null()
    }
355
    /// Alias for `is_not_null`.
    pub fn isnotnull(&self) -> Column {
        self.is_not_null()
    }
360
    /// Returns a null literal cast to `Boolean`, used as the result of
    /// comparisons where an operand is null.
    fn null_boolean_expr() -> Expr {
        use polars::prelude::*;
        lit(NULL).cast(DataType::Boolean)
    }
367
368 pub fn like(&self, pattern: &str, escape_char: Option<char>) -> Column {
371 let regex = like_pattern_to_regex(pattern, escape_char);
372 self.regexp_like(®ex)
373 }
374
375 pub fn ilike(&self, pattern: &str, escape_char: Option<char>) -> Column {
378 use polars::prelude::*;
379 let regex = format!("(?i){}", like_pattern_to_regex(pattern, escape_char));
380 Self::from_expr(self.expr().clone().str().contains(lit(regex), false), None)
381 }
382
383 pub fn eq_pyspark(&self, other: &Column) -> Column {
389 let left_null = self.expr().clone().is_null();
391 let right_null = other.expr().clone().is_null();
392 let either_null = left_null.clone().or(right_null.clone());
393
394 let eq_result = self.expr().clone().eq(other.expr().clone());
396
397 let null_boolean = Self::null_boolean_expr();
399 let null_aware_expr = crate::functions::when(&Self::from_expr(either_null, None))
400 .then(&Self::from_expr(null_boolean, None))
401 .otherwise(&Self::from_expr(eq_result, None));
402
403 Self::from_expr(null_aware_expr.into_expr(), None)
404 }
405
406 pub fn ne_pyspark(&self, other: &Column) -> Column {
409 let left_null = self.expr().clone().is_null();
411 let right_null = other.expr().clone().is_null();
412 let either_null = left_null.clone().or(right_null.clone());
413
414 let ne_result = self.expr().clone().neq(other.expr().clone());
416
417 let null_boolean = Self::null_boolean_expr();
419 let null_aware_expr = crate::functions::when(&Self::from_expr(either_null, None))
420 .then(&Self::from_expr(null_boolean, None))
421 .otherwise(&Self::from_expr(ne_result, None));
422
423 Self::from_expr(null_aware_expr.into_expr(), None)
424 }
425
426 pub fn eq_null_safe(&self, other: &Column) -> Column {
429 use crate::functions::{lit_bool, when};
430
431 let (left_c, right_c) = crate::type_coercion::coerce_for_pyspark_eq_null_safe(
432 self.expr().clone(),
433 other.expr().clone(),
434 )
435 .unwrap_or_else(|_| (self.expr().clone(), other.expr().clone()));
436
437 let left_null = left_c.clone().is_null();
438 let right_null = right_c.clone().is_null();
439 let both_null = left_null.clone().and(right_null.clone());
440 let either_null = left_null.clone().or(right_null.clone());
441
442 let eq_result = left_c.eq(right_c);
444
445 when(&Self::from_expr(both_null, None))
449 .then(&lit_bool(true))
450 .otherwise(
451 &when(&Self::from_expr(either_null, None))
452 .then(&lit_bool(false))
453 .otherwise(&Self::from_expr(eq_result, None)),
454 )
455 }
456
    /// Returns a column holding a null literal of type `Boolean`.
    pub fn null_boolean() -> Column {
        Column::from_expr(Self::null_boolean_expr(), None)
    }
463
464 pub fn lit_null(dtype: &str) -> Result<Column, String> {
469 use polars::prelude::{NULL, lit};
470 let dt = crate::functions::parse_type_name(dtype)?;
471 Ok(Column::from_expr(lit(NULL).cast(dt), None))
472 }
473
    /// Builds a boolean literal column via `crate::functions::lit_bool`.
    pub fn from_bool(b: bool) -> Column {
        crate::functions::lit_bool(b)
    }
478
    /// Builds an `i64` literal column via `crate::functions::lit_i64`.
    pub fn from_i64(n: i64) -> Column {
        crate::functions::lit_i64(n)
    }
483
    /// Builds a string literal column via `crate::functions::lit_str`.
    pub fn from_string(s: &str) -> Column {
        crate::functions::lit_str(s)
    }
488
489 pub fn gt_pyspark(&self, other: &Column) -> Column {
492 let left_null = self.expr().clone().is_null();
494 let right_null = other.expr().clone().is_null();
495 let either_null = left_null.clone().or(right_null.clone());
496
497 let gt_result = self.expr().clone().gt(other.expr().clone());
499
500 let null_boolean = Self::null_boolean_expr();
502 let null_aware_expr = crate::functions::when(&Self::from_expr(either_null, None))
503 .then(&Self::from_expr(null_boolean, None))
504 .otherwise(&Self::from_expr(gt_result, None));
505
506 Self::from_expr(null_aware_expr.into_expr(), None)
507 }
508
509 pub fn ge_pyspark(&self, other: &Column) -> Column {
512 let left_null = self.expr().clone().is_null();
514 let right_null = other.expr().clone().is_null();
515 let either_null = left_null.clone().or(right_null.clone());
516
517 let ge_result = self.expr().clone().gt_eq(other.expr().clone());
519
520 let null_boolean = Self::null_boolean_expr();
522 let null_aware_expr = crate::functions::when(&Self::from_expr(either_null, None))
523 .then(&Self::from_expr(null_boolean, None))
524 .otherwise(&Self::from_expr(ge_result, None));
525
526 Self::from_expr(null_aware_expr.into_expr(), None)
527 }
528
529 pub fn lt_pyspark(&self, other: &Column) -> Column {
532 let left_null = self.expr().clone().is_null();
534 let right_null = other.expr().clone().is_null();
535 let either_null = left_null.clone().or(right_null.clone());
536
537 let lt_result = self.expr().clone().lt(other.expr().clone());
539
540 let null_boolean = Self::null_boolean_expr();
542 let null_aware_expr = crate::functions::when(&Self::from_expr(either_null, None))
543 .then(&Self::from_expr(null_boolean, None))
544 .otherwise(&Self::from_expr(lt_result, None));
545
546 Self::from_expr(null_aware_expr.into_expr(), None)
547 }
548
549 pub fn le_pyspark(&self, other: &Column) -> Column {
552 let left_null = self.expr().clone().is_null();
554 let right_null = other.expr().clone().is_null();
555 let either_null = left_null.clone().or(right_null.clone());
556
557 let le_result = self.expr().clone().lt_eq(other.expr().clone());
559
560 let null_boolean = Self::null_boolean_expr();
562 let null_aware_expr = crate::functions::when(&Self::from_expr(either_null, None))
563 .then(&Self::from_expr(null_boolean, None))
564 .otherwise(&Self::from_expr(le_result, None));
565
566 Self::from_expr(null_aware_expr.into_expr(), None)
567 }
568
569 pub fn gt(&self, other: Expr) -> Column {
575 Self::from_expr(self.expr().clone().gt(other), None)
576 }
577
578 pub fn gt_eq(&self, other: Expr) -> Column {
580 Self::from_expr(self.expr().clone().gt_eq(other), None)
581 }
582
583 pub fn lt(&self, other: Expr) -> Column {
585 Self::from_expr(self.expr().clone().lt(other), None)
586 }
587
588 pub fn lt_eq(&self, other: Expr) -> Column {
590 Self::from_expr(self.expr().clone().lt_eq(other), None)
591 }
592
593 pub fn between(&self, lower: &Column, upper: &Column) -> Column {
596 use crate::type_coercion::{CompareOp, coerce_for_pyspark_comparison};
597 use polars::prelude::*;
598
599 let left = self.expr().clone();
600 let lower_expr = lower.expr().clone();
601 let upper_expr = upper.expr().clone();
602
603 let infer_lit_type = |e: &Expr| -> Option<DataType> {
604 if let Expr::Literal(lv) = e {
605 let dt = lv.get_datatype();
606 if matches!(dt, DataType::Unknown(_)) {
607 None
608 } else {
609 Some(dt)
610 }
611 } else {
612 None
613 }
614 };
615
616 let lower_ty = infer_lit_type(&lower_expr).unwrap_or(DataType::String);
617 let upper_ty = infer_lit_type(&upper_expr).unwrap_or(DataType::String);
618 let lt = DataType::String;
619
620 let (left_c, lower_c) = match coerce_for_pyspark_comparison(
621 left.clone(),
622 lower_expr.clone(),
623 <,
624 &lower_ty,
625 &CompareOp::GtEq,
626 ) {
627 Ok((a, b)) => (a, b),
628 Err(_) => (left.clone(), lower_expr),
629 };
630
631 let upper_clone = upper.expr().clone();
632 let (left_cc, upper_c) = match coerce_for_pyspark_comparison(
633 left_c.clone(),
634 upper_expr,
635 <,
636 &upper_ty,
637 &CompareOp::LtEq,
638 ) {
639 Ok((a, b)) => (a, b),
640 Err(_) => (left_c.clone(), upper_clone),
641 };
642
643 let ge = left_cc.clone().gt_eq(lower_c);
644 let le = left_cc.lt_eq(upper_c);
645 Self::from_expr(ge.and(le), None)
646 }
647
648 pub fn eq(&self, other: Expr) -> Column {
650 Self::from_expr(self.expr().clone().eq(other), None)
651 }
652
653 pub fn neq(&self, other: Expr) -> Column {
655 Self::from_expr(self.expr().clone().neq(other), None)
656 }
657
658 pub fn and_(&self, other: &Column) -> Column {
660 Self::from_expr(self.expr().clone().and(other.expr().clone()), None)
661 }
662
663 pub fn or_(&self, other: &Column) -> Column {
665 Self::from_expr(self.expr().clone().or(other.expr().clone()), None)
666 }
667
668 pub fn upper(&self) -> Column {
679 Self::from_expr(self.expr().clone().str().to_uppercase(), None)
680 }
681
682 pub fn lower(&self) -> Column {
684 Self::from_expr(self.expr().clone().str().to_lowercase(), None)
685 }
686
    /// Alias for `lower`.
    pub fn lcase(&self) -> Column {
        self.lower()
    }
691
    /// Alias for `upper`.
    pub fn ucase(&self) -> Column {
        self.upper()
    }
696
697 pub fn substr(&self, start: i64, length: Option<i64>) -> Column {
703 use polars::prelude::*;
704 if length.map(|l| l < 1).unwrap_or(false) {
706 let expr = when(self.expr().clone().is_null())
707 .then(lit(NULL))
708 .otherwise(lit(""));
709 return Self::from_expr(expr, None);
710 }
711 let len_chars = self.expr().clone().str().len_chars();
712 let offset_expr = if start == 0 {
714 lit(0i64)
715 } else if start >= 1 {
716 lit((start - 1).max(0))
717 } else {
718 let from_end = len_chars + lit(start);
719 when(from_end.clone().lt(lit(0i64)))
720 .then(lit(0i64))
721 .otherwise(from_end)
722 };
723 let length_expr = length.map(lit).unwrap_or_else(|| lit(i64::MAX));
724 Self::from_expr(
725 self.expr().clone().str().slice(offset_expr, length_expr),
726 None,
727 )
728 }
729
730 pub fn length(&self) -> Column {
732 Self::from_expr(self.expr().clone().str().len_chars(), None)
733 }
734
735 pub fn bit_length(&self) -> Column {
737 use polars::prelude::*;
738 let len_bytes = self.expr().clone().str().len_bytes().cast(DataType::Int32);
739 Self::from_expr((len_bytes * lit(8i32)).cast(DataType::Int32), None)
740 }
741
742 pub fn octet_length(&self) -> Column {
744 use polars::prelude::*;
745 Self::from_expr(
746 self.expr().clone().str().len_bytes().cast(DataType::Int32),
747 None,
748 )
749 }
750
    /// Alias for `length`.
    pub fn char_length(&self) -> Column {
        self.length()
    }
755
    /// Alias for `length`.
    pub fn character_length(&self) -> Column {
        self.length()
    }
760
761 pub fn encode(&self, charset: &str) -> Column {
763 let charset = charset.to_string();
764 let expr = self.expr().clone().map(
765 move |s| expect_col(crate::udfs::apply_encode(s, &charset)),
766 |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
767 );
768 Self::from_expr(expr, None)
769 }
770
771 pub fn decode(&self, charset: &str) -> Column {
773 let charset = charset.to_string();
774 let expr = self.expr().clone().map(
775 move |s| expect_col(crate::udfs::apply_decode(s, &charset)),
776 |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
777 );
778 Self::from_expr(expr, None)
779 }
780
781 pub fn to_binary(&self, fmt: &str) -> Column {
783 let fmt = fmt.to_string();
784 let expr = self.expr().clone().map(
785 move |s| expect_col(crate::udfs::apply_to_binary(s, &fmt)),
786 |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
787 );
788 Self::from_expr(expr, None)
789 }
790
791 pub fn try_to_binary(&self, fmt: &str) -> Column {
793 let fmt = fmt.to_string();
794 let expr = self.expr().clone().map(
795 move |s| expect_col(crate::udfs::apply_try_to_binary(s, &fmt)),
796 |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
797 );
798 Self::from_expr(expr, None)
799 }
800
801 pub fn aes_encrypt(&self, key: &str) -> Column {
803 let key = key.to_string();
804 let expr = self.expr().clone().map(
805 move |s| expect_col(crate::udfs::apply_aes_encrypt(s, &key)),
806 |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
807 );
808 Self::from_expr(expr, None)
809 }
810
811 pub fn aes_decrypt(&self, key: &str) -> Column {
813 let key = key.to_string();
814 let expr = self.expr().clone().map(
815 move |s| expect_col(crate::udfs::apply_aes_decrypt(s, &key)),
816 |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
817 );
818 Self::from_expr(expr, None)
819 }
820
821 pub fn try_aes_decrypt(&self, key: &str) -> Column {
823 let key = key.to_string();
824 let expr = self.expr().clone().map(
825 move |s| expect_col(crate::udfs::apply_try_aes_decrypt(s, &key)),
826 |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
827 );
828 Self::from_expr(expr, None)
829 }
830
831 pub fn typeof_(&self) -> Column {
833 Self::from_expr(
834 self.expr().clone().map(
835 |s| expect_col(crate::udfs::apply_typeof(s)),
836 |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
837 ),
838 None,
839 )
840 }
841
842 pub fn trim(&self) -> Column {
845 use polars::prelude::*;
846 Self::from_expr(self.expr().clone().str().strip_chars(lit(" ")), None)
847 }
848
849 pub fn ltrim(&self) -> Column {
851 use polars::prelude::*;
852 Self::from_expr(self.expr().clone().str().strip_chars_start(lit(" ")), None)
853 }
854
855 pub fn rtrim(&self) -> Column {
857 use polars::prelude::*;
858 Self::from_expr(self.expr().clone().str().strip_chars_end(lit(" ")), None)
859 }
860
861 pub fn btrim(&self, trim_str: Option<&str>) -> Column {
863 use polars::prelude::*;
864 let chars = trim_str.unwrap_or(" ");
865 Self::from_expr(self.expr().clone().str().strip_chars(lit(chars)), None)
866 }
867
868 pub fn locate(&self, substr: &str, pos: i64) -> Column {
870 use polars::prelude::*;
871 if substr.is_empty() {
872 return Self::from_expr(lit(1i64), None);
873 }
874 let start = (pos - 1).max(0);
875 let slice_expr = self.expr().clone().str().slice(lit(start), lit(i64::MAX));
876 let found = slice_expr.str().find_literal(lit(substr.to_string()));
877 let expr = (found.cast(DataType::Int64) + lit(start + 1))
878 .fill_null(lit(0i64))
879 .cast(DataType::Int64);
880 Self::from_expr(expr, None)
881 }
882
883 pub fn conv(&self, from_base: i32, to_base: i32) -> Column {
885 let expr = self.expr().clone().map(
886 move |s| expect_col(crate::udfs::apply_conv(s, from_base, to_base)),
887 |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
888 );
889 Self::from_expr(expr, None)
890 }
891
892 pub fn hex(&self) -> Column {
894 let expr = self.expr().clone().map(
895 |s| expect_col(crate::udfs::apply_hex(s)),
896 |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
897 );
898 Self::from_expr(expr, None)
899 }
900
901 pub fn unhex(&self) -> Column {
903 let expr = self.expr().clone().map(
904 |s| expect_col(crate::udfs::apply_unhex(s)),
905 |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
906 );
907 Self::from_expr(expr, None)
908 }
909
910 pub fn bin(&self) -> Column {
912 let expr = self.expr().clone().map(
913 |s| expect_col(crate::udfs::apply_bin(s)),
914 |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
915 );
916 Self::from_expr(expr, None)
917 }
918
919 pub fn getbit(&self, pos: i64) -> Column {
921 let expr = self.expr().clone().map(
922 move |s| expect_col(crate::udfs::apply_getbit(s, pos)),
923 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
924 );
925 Self::from_expr(expr, None)
926 }
927
928 pub fn bit_and(&self, other: &Column) -> Column {
930 let args = [other.expr().clone()];
931 let expr = self.expr().clone().cast(DataType::Int64).map_many(
932 |cols| expect_col(crate::udfs::apply_bit_and(cols)),
933 &args,
934 |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Int64)),
935 );
936 Self::from_expr(expr, None)
937 }
938
939 pub fn bit_or(&self, other: &Column) -> Column {
941 let args = [other.expr().clone()];
942 let expr = self.expr().clone().cast(DataType::Int64).map_many(
943 |cols| expect_col(crate::udfs::apply_bit_or(cols)),
944 &args,
945 |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Int64)),
946 );
947 Self::from_expr(expr, None)
948 }
949
950 pub fn bit_xor(&self, other: &Column) -> Column {
952 let args = [other.expr().clone()];
953 let expr = self.expr().clone().cast(DataType::Int64).map_many(
954 |cols| expect_col(crate::udfs::apply_bit_xor(cols)),
955 &args,
956 |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Int64)),
957 );
958 Self::from_expr(expr, None)
959 }
960
961 pub fn bit_count(&self) -> Column {
963 let expr = self.expr().clone().map(
964 |s| expect_col(crate::udfs::apply_bit_count(s)),
965 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
966 );
967 Self::from_expr(expr, None)
968 }
969
970 pub fn assert_true(&self, err_msg: Option<&str>) -> Column {
973 let msg = err_msg.map(String::from);
974 let expr = self.expr().clone().map(
975 move |c| expect_col(crate::udfs::apply_assert_true(c, msg.as_deref())),
976 |_schema, field| Ok(field.clone()),
977 );
978 Self::from_expr(expr, None)
979 }
980
981 pub fn bitwise_not(&self) -> Column {
984 use polars::prelude::Field;
985 let expr = self.expr().clone().map(
986 move |col| expect_col(crate::udfs::apply_coerce_to_int64_for_bitwise(col)),
987 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
988 );
989 let expr = (lit(-1i64) - expr).cast(DataType::Int64);
990 Self::from_expr(expr, None)
991 }
992
993 pub fn logical_not(&self) -> Column {
996 let expr = self.expr().clone().map(
997 move |col| expect_col(crate::udfs::apply_logical_not_boolean_only(col)),
998 |_schema, field| {
999 if field.dtype() == &DataType::Boolean {
1000 Ok(field.clone())
1001 } else {
1002 Err(PolarsError::ComputeError(
1003 "logical NOT (~) requires boolean type".into(),
1004 ))
1005 }
1006 },
1007 );
1008 Self::from_expr(expr, None)
1009 }
1010
1011 pub fn str_to_map(&self, pair_delim: &str, key_value_delim: &str) -> Column {
1013 let pair_delim = pair_delim.to_string();
1014 let key_value_delim = key_value_delim.to_string();
1015 let expr = self.expr().clone().map(
1016 move |s| {
1017 expect_col(crate::udfs::apply_str_to_map(
1018 s,
1019 &pair_delim,
1020 &key_value_delim,
1021 ))
1022 },
1023 |_schema, field| Ok(field.clone()),
1024 );
1025 Self::from_expr(expr, None)
1026 }
1027
1028 fn pattern_has_lookaround(pattern: &str) -> bool {
1030 let p = pattern.as_bytes();
1031 let n = p.len();
1032 let mut i = 0;
1033 while i + 2 < n {
1034 if p[i] == b'(' && p[i + 1] == b'?' {
1035 match p[i + 2] {
1036 b'=' | b'!' => return true, b'<' if i + 4 <= n && (p[i + 3] == b'=' || p[i + 3] == b'!') => return true, _ => {}
1039 }
1040 }
1041 i += 1;
1042 }
1043 false
1044 }
1045
1046 pub fn regexp_extract(&self, pattern: &str, group_index: usize) -> Column {
1049 use polars::prelude::*;
1050 if Self::pattern_has_lookaround(pattern) {
1051 let pat = pattern.to_string();
1052 let group = group_index;
1053 Self::from_expr(
1054 self.expr().clone().map(
1055 move |s| {
1056 expect_col(crate::udfs::apply_regexp_extract_lookaround(s, &pat, group))
1057 },
1058 |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
1059 ),
1060 None,
1061 )
1062 } else {
1063 let pat = pattern.to_string();
1064 Self::from_expr(
1065 self.expr().clone().map(
1068 move |s| expect_col(crate::udfs::apply_regexp_extract(s, &pat, group_index)),
1069 |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
1070 ),
1071 None,
1072 )
1073 }
1074 }
1075
1076 pub fn regexp_replace(&self, pattern: &str, replacement: &str) -> Column {
1078 use polars::prelude::*;
1079 let pat = pattern.to_string();
1080 let rep = replacement.to_string();
1081 Self::from_expr(
1082 self.expr()
1084 .clone()
1085 .cast(DataType::String)
1086 .str()
1087 .replace_all(lit(pat), lit(rep), false),
1088 None,
1089 )
1090 }
1091
1092 pub fn left(&self, n: i64) -> Column {
1094 use polars::prelude::*;
1095 let len = n.max(0) as u32;
1096 Self::from_expr(
1097 self.expr().clone().str().slice(lit(0i64), lit(len as i64)),
1098 None,
1099 )
1100 }
1101
1102 pub fn right(&self, n: i64) -> Column {
1104 use polars::prelude::*;
1105 let n_val = n.max(0);
1106 let n_expr = lit(n_val);
1107 let len_chars = self.expr().clone().str().len_chars().cast(DataType::Int64);
1108 let start = when((len_chars.clone() - n_expr.clone()).lt_eq(lit(0i64)))
1109 .then(lit(0i64))
1110 .otherwise(len_chars - n_expr.clone());
1111 Self::from_expr(self.expr().clone().str().slice(start, n_expr), None)
1112 }
1113
1114 pub fn replace(&self, search: &str, replacement: &str) -> Column {
1116 use polars::prelude::*;
1117 Self::from_expr(
1118 self.expr().clone().str().replace_all(
1119 lit(search.to_string()),
1120 lit(replacement.to_string()),
1121 true,
1122 ),
1123 None,
1124 )
1125 }
1126
1127 pub fn replace_many(&self, pairs: &[(String, String)]) -> Column {
1129 let mut out = self.clone();
1130 for (search, replacement) in pairs {
1131 out = out.replace(search, replacement);
1132 }
1133 out
1134 }
1135
1136 pub fn startswith(&self, prefix: &str) -> Column {
1138 use polars::prelude::*;
1139 Self::from_expr(
1140 self.expr()
1141 .clone()
1142 .str()
1143 .starts_with(lit(prefix.to_string())),
1144 None,
1145 )
1146 }
1147
1148 pub fn endswith(&self, suffix: &str) -> Column {
1150 use polars::prelude::*;
1151 Self::from_expr(
1152 self.expr().clone().str().ends_with(lit(suffix.to_string())),
1153 None,
1154 )
1155 }
1156
1157 pub fn contains(&self, substring: &str) -> Column {
1159 use polars::prelude::*;
1160 Self::from_expr(
1161 self.expr()
1162 .clone()
1163 .str()
1164 .contains(lit(substring.to_string()), true),
1165 None,
1166 )
1167 }
1168
1169 pub fn split(&self, delimiter: &str, limit: Option<i32>) -> Column {
1173 use polars::prelude::*;
1174 let use_limit = limit.is_some_and(|l| l > 0);
1175 if use_limit {
1176 let delim = delimiter.to_string();
1177 let lim = limit.unwrap_or(0);
1178 let expr = self.expr().clone().map(
1179 move |col| expect_col(crate::udfs::apply_split_with_limit(col, &delim, lim)),
1180 |_schema, field| {
1181 Ok(Field::new(
1182 field.name().clone(),
1183 DataType::List(Box::new(DataType::String)),
1184 ))
1185 },
1186 );
1187 Self::from_expr(expr, None)
1188 } else {
1189 Self::from_expr(
1190 self.expr().clone().str().split(lit(delimiter.to_string())),
1191 None,
1192 )
1193 }
1194 }
1195
1196 pub fn initcap(&self) -> Column {
1198 let expr = self.expr().clone().map(
1199 |s| expect_col(crate::udfs::apply_initcap(s)),
1200 |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
1201 );
1202 Self::from_expr(expr, None)
1203 }
1204
1205 pub fn regexp_extract_all(&self, pattern: &str) -> Column {
1207 use polars::prelude::*;
1208 Self::from_expr(
1209 self.expr()
1210 .clone()
1211 .str()
1212 .extract_all(lit(pattern.to_string())),
1213 None,
1214 )
1215 }
1216
1217 pub fn regexp_extract_all_group(&self, pattern: &str, group_index: usize) -> Column {
1220 if group_index == 0 {
1221 return self.regexp_extract_all(pattern);
1222 }
1223 use polars::prelude::*;
1224 let pat = pattern.to_string();
1225 let idx = group_index;
1226 let expr = self.expr().clone().map(
1227 move |s| expect_col(crate::udfs::apply_regexp_extract_all_group(s, &pat, idx)),
1228 |_schema, field| {
1229 Ok(Field::new(
1230 field.name().clone(),
1231 DataType::List(Box::new(DataType::String)),
1232 ))
1233 },
1234 );
1235 Self::from_expr(expr, None)
1236 }
1237
1238 pub fn regexp_like(&self, pattern: &str) -> Column {
1240 use polars::prelude::*;
1241 if pattern.contains("(?=")
1243 || pattern.contains("(?!")
1244 || pattern.contains("(?<=")
1245 || pattern.contains("(?<!")
1246 {
1247 let pat = pattern.to_string();
1248 let expr = self.expr().clone().map(
1249 move |s| expect_col(crate::udfs::apply_regexp_like_lookaround(s, &pat)),
1250 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Boolean)),
1251 );
1252 return Self::from_expr(expr, None);
1253 }
1254 Self::from_expr(
1255 self.expr()
1256 .clone()
1257 .str()
1258 .contains(lit(pattern.to_string()), false),
1259 None,
1260 )
1261 }
1262
1263 pub fn regexp_count(&self, pattern: &str) -> Column {
1265 use polars::prelude::*;
1266 Self::from_expr(
1267 self.expr()
1268 .clone()
1269 .str()
1270 .count_matches(lit(pattern.to_string()), false)
1271 .cast(DataType::Int64),
1272 None,
1273 )
1274 }
1275
1276 pub fn regexp_substr(&self, pattern: &str) -> Column {
1278 self.regexp_extract(pattern, 0)
1279 }
1280
1281 pub fn regexp_instr(&self, pattern: &str, group_idx: Option<usize>) -> Column {
1283 let idx = group_idx.unwrap_or(0);
1284 let pattern = pattern.to_string();
1285 let expr = self.expr().clone().map(
1286 move |s| expect_col(crate::udfs::apply_regexp_instr(s, pattern.clone(), idx)),
1287 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
1288 );
1289 Self::from_expr(expr, None)
1290 }
1291
1292 pub fn find_in_set(&self, set_column: &Column) -> Column {
1294 let args = [set_column.expr().clone()];
1295 let expr = self.expr().clone().map_many(
1296 |cols| expect_col(crate::udfs::apply_find_in_set(cols)),
1297 &args,
1298 |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Int64)),
1299 );
1300 Self::from_expr(expr, None)
1301 }
1302
1303 pub fn repeat(&self, n: i32) -> Column {
1305 use polars::prelude::*;
1306 Self::from_expr(
1308 self.expr()
1309 .clone()
1310 .repeat_by(lit(n as u32))
1311 .list()
1312 .join(lit(""), false),
1313 None,
1314 )
1315 }
1316
1317 pub fn reverse(&self) -> Column {
1319 Self::from_expr(self.expr().clone().str().reverse(), None)
1320 }
1321
1322 pub fn instr(&self, substr: &str) -> Column {
1324 use polars::prelude::*;
1325 let found = self
1326 .expr()
1327 .clone()
1328 .str()
1329 .find_literal(lit(substr.to_string()));
1330 Self::from_expr(
1332 (found.cast(DataType::Int64) + lit(1i64)).fill_null(lit(0i64)),
1333 None,
1334 )
1335 }
1336
1337 pub fn lpad(&self, length: i32, pad: &str) -> Column {
1339 let pad_str = if pad.is_empty() { " " } else { pad };
1340 let fill = pad_str.chars().next().unwrap_or(' ');
1341 Self::from_expr(
1342 self.expr()
1343 .clone()
1344 .str()
1345 .pad_start(lit(length as i64), fill),
1346 None,
1347 )
1348 }
1349
1350 pub fn rpad(&self, length: i32, pad: &str) -> Column {
1352 let pad_str = if pad.is_empty() { " " } else { pad };
1353 let fill = pad_str.chars().next().unwrap_or(' ');
1354 Self::from_expr(
1355 self.expr().clone().str().pad_end(lit(length as i64), fill),
1356 None,
1357 )
1358 }
1359
1360 pub fn translate(&self, from_str: &str, to_str: &str) -> Column {
1362 use polars::prelude::*;
1363 let mut e = self.expr().clone();
1364 let from_chars: Vec<char> = from_str.chars().collect();
1365 let to_chars: Vec<char> = to_str.chars().collect();
1366 for (i, fc) in from_chars.iter().enumerate() {
1367 let f = fc.to_string();
1368 let t = to_chars
1369 .get(i)
1370 .map(|c| c.to_string())
1371 .unwrap_or_else(String::new); e = e.str().replace_all(lit(f), lit(t), true);
1373 }
1374 Self::from_expr(e, None)
1375 }
1376
1377 pub fn mask(
1380 &self,
1381 upper_char: Option<char>,
1382 lower_char: Option<char>,
1383 digit_char: Option<char>,
1384 other_char: Option<char>,
1385 ) -> Column {
1386 use polars::prelude::*;
1387 let upper = upper_char.unwrap_or('X').to_string();
1388 let lower = lower_char.unwrap_or('x').to_string();
1389 let digit = digit_char.unwrap_or('n').to_string();
1390 let other = other_char.map(|c| c.to_string());
1391 let mut e = self
1392 .expr()
1393 .clone()
1394 .str()
1395 .replace_all(lit("[A-Z]".to_string()), lit(upper), false)
1396 .str()
1397 .replace_all(lit("[a-z]".to_string()), lit(lower), false)
1398 .str()
1399 .replace_all(lit(r"\d".to_string()), lit(digit), false);
1400 if let Some(o) = other {
1401 e = e
1402 .str()
1403 .replace_all(lit("[^A-Za-z0-9]".to_string()), lit(o), false);
1404 }
1405 Self::from_expr(e, None)
1406 }
1407
1408 pub fn split_part(&self, delimiter: &str, part_num: i64) -> Column {
1411 use polars::prelude::*;
1412 if part_num == 0 {
1413 return Self::from_expr(lit(NULL), None);
1414 }
1415 let use_regex = delimiter == "|";
1416 if use_regex {
1417 let pattern = delimiter.to_string();
1418 let part = part_num;
1419 let get_expr = self.expr().clone().map(
1420 move |col| expect_col(crate::udfs::apply_split_part_regex(col, &pattern, part)),
1421 |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
1422 );
1423 let expr = when(self.expr().clone().is_null())
1424 .then(lit(NULL))
1425 .otherwise(get_expr.fill_null(lit("")));
1426 return Self::from_expr(expr, None);
1427 }
1428 let delim = delimiter.to_string();
1429 let split_expr = self.expr().clone().str().split(lit(delim));
1430 let index = if part_num > 0 {
1431 lit(part_num - 1)
1432 } else {
1433 lit(part_num)
1434 };
1435 let get_expr = split_expr.list().get(index, true).fill_null(lit(""));
1436 let expr = when(self.expr().clone().is_null())
1437 .then(lit(NULL))
1438 .otherwise(get_expr);
1439 Self::from_expr(expr, None)
1440 }
1441
1442 pub fn substring_index(&self, delimiter: &str, count: i64) -> Column {
1444 use polars::prelude::*;
1445 if delimiter.is_empty() {
1448 let expr = when(self.expr().clone().is_null())
1449 .then(lit(NULL))
1450 .otherwise(lit("").cast(DataType::String));
1451 return Self::from_expr(expr, None);
1452 }
1453 let delim = delimiter.to_string();
1454 let split_expr = self.expr().clone().str().split(lit(delim.clone()));
1455 let n = count.unsigned_abs() as i64;
1456 let expr = if count > 0 {
1457 split_expr
1458 .clone()
1459 .list()
1460 .slice(lit(0i64), lit(n))
1461 .list()
1462 .join(lit(delim), false)
1463 } else {
1464 let len = split_expr.clone().list().len();
1465 let start = when(len.clone().gt(lit(n)))
1466 .then(len.clone() - lit(n))
1467 .otherwise(lit(0i64));
1468 let slice_len = when(len.clone().gt(lit(n))).then(lit(n)).otherwise(len);
1469 split_expr
1470 .list()
1471 .slice(start, slice_len)
1472 .list()
1473 .join(lit(delim), false)
1474 };
1475 Self::from_expr(expr, None)
1476 }
1477
1478 pub fn soundex(&self) -> Column {
1480 let expr = self.expr().clone().map(
1481 |s| expect_col(crate::udfs::apply_soundex(s)),
1482 |_schema, field| Ok(field.clone()),
1483 );
1484 Self::from_expr(expr, None)
1485 }
1486
1487 pub fn levenshtein(&self, other: &Column) -> Column {
1489 let args = [other.expr().clone()];
1490 let expr = self.expr().clone().map_many(
1491 |cols| expect_col(crate::udfs::apply_levenshtein(cols)),
1492 &args,
1493 |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Int64)),
1494 );
1495 Self::from_expr(expr, None)
1496 }
1497
1498 pub fn crc32(&self) -> Column {
1500 let expr = self.expr().clone().map(
1501 |s| expect_col(crate::udfs::apply_crc32(s)),
1502 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
1503 );
1504 Self::from_expr(expr, None)
1505 }
1506
1507 pub fn xxhash64(&self) -> Column {
1509 let expr = self.expr().clone().map(
1510 |s| expect_col(crate::udfs::apply_xxhash64(s)),
1511 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
1512 );
1513 Self::from_expr(expr, None)
1514 }
1515
1516 pub fn ascii(&self) -> Column {
1518 let expr = self.expr().clone().map(
1519 |s| expect_col(crate::udfs::apply_ascii(s)),
1520 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int32)),
1521 );
1522 Self::from_expr(expr, None)
1523 }
1524
1525 pub fn format_number(&self, decimals: u32) -> Column {
1527 let expr = self.expr().clone().map(
1528 move |s| expect_col(crate::udfs::apply_format_number(s, decimals)),
1529 |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
1530 );
1531 Self::from_expr(expr, None)
1532 }
1533
1534 pub fn char(&self) -> Column {
1536 let expr = self.expr().clone().map(
1537 |s| expect_col(crate::udfs::apply_char(s)),
1538 |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
1539 );
1540 Self::from_expr(expr, None)
1541 }
1542
1543 pub fn chr(&self) -> Column {
1545 self.char()
1546 }
1547
1548 pub fn base64(&self) -> Column {
1550 let expr = self.expr().clone().map(
1551 |s| expect_col(crate::udfs::apply_base64(s)),
1552 |_schema, field| Ok(field.clone()),
1553 );
1554 Self::from_expr(expr, None)
1555 }
1556
1557 pub fn unbase64(&self) -> Column {
1559 let expr = self.expr().clone().map(
1560 |s| expect_col(crate::udfs::apply_unbase64(s)),
1561 |_schema, field| Ok(field.clone()),
1562 );
1563 Self::from_expr(expr, None)
1564 }
1565
1566 pub fn sha1(&self) -> Column {
1568 let expr = self.expr().clone().map(
1569 |s| expect_col(crate::udfs::apply_sha1(s)),
1570 |_schema, field| Ok(field.clone()),
1571 );
1572 Self::from_expr(expr, None)
1573 }
1574
1575 pub fn sha2(&self, bit_length: i32) -> Column {
1577 let expr = self.expr().clone().map(
1578 move |s| expect_col(crate::udfs::apply_sha2(s, bit_length)),
1579 |_schema, field| Ok(field.clone()),
1580 );
1581 Self::from_expr(expr, None)
1582 }
1583
1584 pub fn md5(&self) -> Column {
1586 let expr = self.expr().clone().map(
1587 |s| expect_col(crate::udfs::apply_md5(s)),
1588 |_schema, field| Ok(field.clone()),
1589 );
1590 Self::from_expr(expr, None)
1591 }
1592
1593 pub fn overlay(&self, replace: &str, pos: i64, length: i64) -> Column {
1595 use polars::prelude::*;
1596 let pos = pos.max(1);
1597 let replace_len = length.max(0);
1598 let start_left = 0i64;
1599 let len_left = (pos - 1).max(0);
1600 let start_right = (pos - 1 + replace_len).max(0);
1601 let len_right = 1_000_000i64; let left = self
1603 .expr()
1604 .clone()
1605 .str()
1606 .slice(lit(start_left), lit(len_left));
1607 let mid = lit(replace.to_string());
1608 let right = self
1609 .expr()
1610 .clone()
1611 .str()
1612 .slice(lit(start_right), lit(len_right));
1613 let exprs = [left, mid, right];
1614 let concat_expr = polars::prelude::concat_str(&exprs, "", false);
1615 Self::from_expr(concat_expr, None)
1616 }
1617
1618 pub fn abs(&self) -> Column {
1622 Self::from_expr(self.expr().clone().abs(), None)
1623 }
1624
1625 pub fn ceil(&self) -> Column {
1627 use polars::prelude::*;
1628 let expr = self.expr().clone().ceil().cast(DataType::Int64);
1631 Self::from_expr(expr, None)
1632 }
1633
1634 pub fn ceiling(&self) -> Column {
1636 self.ceil()
1637 }
1638
1639 pub fn floor(&self) -> Column {
1641 use polars::prelude::*;
1642 let expr = self.expr().clone().floor().cast(DataType::Int64);
1645 Self::from_expr(expr, None)
1646 }
1647
1648 pub fn round(&self, scale: i32) -> Column {
1651 let expr = self.expr().clone().map(
1652 move |s| expect_col(crate::udfs::apply_round(s, scale)),
1653 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1654 );
1655 Self::from_expr(expr, None)
1656 }
1657
1658 pub fn bround(&self, scale: i32) -> Column {
1660 let expr = self.expr().clone().map(
1661 move |s| expect_col(crate::udfs::apply_bround(s, scale)),
1662 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1663 );
1664 Self::from_expr(expr, None)
1665 }
1666
1667 pub fn negate(&self) -> Column {
1669 use polars::prelude::*;
1670 Self::from_expr(self.expr().clone() * lit(-1), None)
1671 }
1672
1673 pub fn multiply_pyspark(&self, other: &Column) -> Column {
1678 let args = [other.expr().clone()];
1679 let expr = self.expr().clone().map_many(
1680 |cols| expect_col(crate::udfs::apply_pyspark_multiply(cols)),
1681 &args,
1682 |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Float64)),
1683 );
1684 Self::from_expr(expr, None)
1685 }
1686
1687 pub fn add_pyspark(&self, other: &Column) -> Column {
1690 let args = [other.expr().clone()];
1691 let expr = self.expr().clone().map_many(
1692 |cols| expect_col(crate::udfs::apply_pyspark_add(cols)),
1693 &args,
1694 |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Float64)),
1695 );
1696 Self::from_expr(expr, None)
1697 }
1698
1699 pub fn subtract_pyspark(&self, other: &Column) -> Column {
1701 let args = [other.expr().clone()];
1702 let expr = self.expr().clone().map_many(
1703 |cols| expect_col(crate::udfs::apply_pyspark_subtract(cols)),
1704 &args,
1705 |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Float64)),
1706 );
1707 Self::from_expr(expr, None)
1708 }
1709
1710 pub fn divide_pyspark(&self, other: &Column) -> Column {
1712 let args = [other.expr().clone()];
1713 let expr = self.expr().clone().map_many(
1714 |cols| expect_col(crate::udfs::apply_pyspark_divide(cols)),
1715 &args,
1716 |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Float64)),
1717 );
1718 Self::from_expr(expr, None)
1719 }
1720
1721 pub fn mod_pyspark(&self, other: &Column) -> Column {
1723 let args = [other.expr().clone()];
1724 let expr = self.expr().clone().map_many(
1725 |cols| expect_col(crate::udfs::apply_pyspark_mod(cols)),
1726 &args,
1727 |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Float64)),
1728 );
1729 Self::from_expr(expr, None)
1730 }
1731
1732 pub fn multiply(&self, other: &Column) -> Column {
1734 Self::from_expr(self.expr().clone() * other.expr().clone(), None)
1735 }
1736
1737 pub fn add(&self, other: &Column) -> Column {
1739 Self::from_expr(self.expr().clone() + other.expr().clone(), None)
1740 }
1741
1742 pub fn subtract(&self, other: &Column) -> Column {
1744 Self::from_expr(self.expr().clone() - other.expr().clone(), None)
1745 }
1746
1747 pub fn divide(&self, other: &Column) -> Column {
1749 Self::from_expr(self.expr().clone() / other.expr().clone(), None)
1750 }
1751
1752 pub fn mod_(&self, other: &Column) -> Column {
1754 Self::from_expr(self.expr().clone() % other.expr().clone(), None)
1755 }
1756
1757 pub fn sqrt(&self) -> Column {
1759 Self::from_expr(self.expr().clone().sqrt(), None)
1760 }
1761
1762 pub fn pow(&self, exp: i64) -> Column {
1764 use polars::prelude::*;
1765 Self::from_expr(self.expr().clone().pow(lit(exp)), None)
1766 }
1767
1768 pub fn pow_with(&self, exponent: &Column) -> Column {
1771 let args = [exponent.expr().clone()];
1772 let expr = self.expr().clone().map_many(
1773 move |cols| expect_col(crate::udfs::apply_pow_pyspark(cols)),
1774 &args,
1775 |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Float64)),
1776 );
1777 Self::from_expr(expr, None)
1778 }
1779
1780 pub fn power(&self, exp: i64) -> Column {
1782 self.pow(exp)
1783 }
1784
1785 pub fn exp(&self) -> Column {
1787 Self::from_expr(self.expr().clone().exp(), None)
1788 }
1789
1790 pub fn log(&self) -> Column {
1792 Self::from_expr(self.expr().clone().log(lit(std::f64::consts::E)), None)
1793 }
1794
1795 pub fn ln(&self) -> Column {
1797 self.log()
1798 }
1799
1800 pub fn sin(&self) -> Column {
1802 let expr = self.expr().clone().map(
1803 |s| expect_col(crate::udfs::apply_sin(s)),
1804 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1805 );
1806 Self::from_expr(expr, None)
1807 }
1808
1809 pub fn cos(&self) -> Column {
1811 let expr = self.expr().clone().map(
1812 |s| expect_col(crate::udfs::apply_cos(s)),
1813 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1814 );
1815 Self::from_expr(expr, None)
1816 }
1817
1818 pub fn tan(&self) -> Column {
1820 let expr = self.expr().clone().map(
1821 |s| expect_col(crate::udfs::apply_tan(s)),
1822 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1823 );
1824 Self::from_expr(expr, None)
1825 }
1826
1827 pub fn cot(&self) -> Column {
1829 let expr = self.expr().clone().map(
1830 |s| expect_col(crate::udfs::apply_cot(s)),
1831 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1832 );
1833 Self::from_expr(expr, None)
1834 }
1835
1836 pub fn csc(&self) -> Column {
1838 let expr = self.expr().clone().map(
1839 |s| expect_col(crate::udfs::apply_csc(s)),
1840 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1841 );
1842 Self::from_expr(expr, None)
1843 }
1844
1845 pub fn sec(&self) -> Column {
1847 let expr = self.expr().clone().map(
1848 |s| expect_col(crate::udfs::apply_sec(s)),
1849 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1850 );
1851 Self::from_expr(expr, None)
1852 }
1853
1854 pub fn asin(&self) -> Column {
1856 let expr = self.expr().clone().map(
1857 |s| expect_col(crate::udfs::apply_asin(s)),
1858 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1859 );
1860 Self::from_expr(expr, None)
1861 }
1862
1863 pub fn acos(&self) -> Column {
1865 let expr = self.expr().clone().map(
1866 |s| expect_col(crate::udfs::apply_acos(s)),
1867 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1868 );
1869 Self::from_expr(expr, None)
1870 }
1871
1872 pub fn atan(&self) -> Column {
1874 let expr = self.expr().clone().map(
1875 |s| expect_col(crate::udfs::apply_atan(s)),
1876 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1877 );
1878 Self::from_expr(expr, None)
1879 }
1880
1881 pub fn atan2(&self, x: &Column) -> Column {
1883 let args = [x.expr().clone()];
1884 let expr = self.expr().clone().map_many(
1885 |cols| expect_col(crate::udfs::apply_atan2(cols)),
1886 &args,
1887 |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Float64)),
1888 );
1889 Self::from_expr(expr, None)
1890 }
1891
1892 pub fn degrees(&self) -> Column {
1894 let expr = self.expr().clone().map(
1895 |s| expect_col(crate::udfs::apply_degrees(s)),
1896 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1897 );
1898 Self::from_expr(expr, None)
1899 }
1900
1901 pub fn to_degrees(&self) -> Column {
1903 self.degrees()
1904 }
1905
1906 pub fn radians(&self) -> Column {
1908 let expr = self.expr().clone().map(
1909 |s| expect_col(crate::udfs::apply_radians(s)),
1910 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1911 );
1912 Self::from_expr(expr, None)
1913 }
1914
1915 pub fn to_radians(&self) -> Column {
1917 self.radians()
1918 }
1919
1920 pub fn signum(&self) -> Column {
1922 let expr = self.expr().clone().map(
1923 |s| expect_col(crate::udfs::apply_signum(s)),
1924 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1925 );
1926 Self::from_expr(expr, None)
1927 }
1928
1929 pub fn cosh(&self) -> Column {
1931 let expr = self.expr().clone().map(
1932 |s| expect_col(crate::udfs::apply_cosh(s)),
1933 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1934 );
1935 Self::from_expr(expr, None)
1936 }
1937 pub fn sinh(&self) -> Column {
1939 let expr = self.expr().clone().map(
1940 |s| expect_col(crate::udfs::apply_sinh(s)),
1941 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1942 );
1943 Self::from_expr(expr, None)
1944 }
1945 pub fn tanh(&self) -> Column {
1947 let expr = self.expr().clone().map(
1948 |s| expect_col(crate::udfs::apply_tanh(s)),
1949 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1950 );
1951 Self::from_expr(expr, None)
1952 }
1953 pub fn acosh(&self) -> Column {
1955 let expr = self.expr().clone().map(
1956 |s| expect_col(crate::udfs::apply_acosh(s)),
1957 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1958 );
1959 Self::from_expr(expr, None)
1960 }
1961 pub fn asinh(&self) -> Column {
1963 let expr = self.expr().clone().map(
1964 |s| expect_col(crate::udfs::apply_asinh(s)),
1965 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1966 );
1967 Self::from_expr(expr, None)
1968 }
1969 pub fn atanh(&self) -> Column {
1971 let expr = self.expr().clone().map(
1972 |s| expect_col(crate::udfs::apply_atanh(s)),
1973 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1974 );
1975 Self::from_expr(expr, None)
1976 }
1977 pub fn cbrt(&self) -> Column {
1979 let expr = self.expr().clone().map(
1980 |s| expect_col(crate::udfs::apply_cbrt(s)),
1981 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1982 );
1983 Self::from_expr(expr, None)
1984 }
1985 pub fn expm1(&self) -> Column {
1987 let expr = self.expr().clone().map(
1988 |s| expect_col(crate::udfs::apply_expm1(s)),
1989 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1990 );
1991 Self::from_expr(expr, None)
1992 }
1993 pub fn log1p(&self) -> Column {
1995 let expr = self.expr().clone().map(
1996 |s| expect_col(crate::udfs::apply_log1p(s)),
1997 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1998 );
1999 Self::from_expr(expr, None)
2000 }
2001 pub fn log10(&self) -> Column {
2003 let expr = self.expr().clone().map(
2004 |s| expect_col(crate::udfs::apply_log10(s)),
2005 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
2006 );
2007 Self::from_expr(expr, None)
2008 }
2009 pub fn log2(&self) -> Column {
2011 let expr = self.expr().clone().map(
2012 |s| expect_col(crate::udfs::apply_log2(s)),
2013 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
2014 );
2015 Self::from_expr(expr, None)
2016 }
2017 pub fn rint(&self) -> Column {
2019 let expr = self.expr().clone().map(
2020 |s| expect_col(crate::udfs::apply_rint(s)),
2021 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
2022 );
2023 Self::from_expr(expr, None)
2024 }
2025
2026 pub fn hypot(&self, other: &Column) -> Column {
2028 let xx = self.expr().clone() * self.expr().clone();
2029 let yy = other.expr().clone() * other.expr().clone();
2030 Self::from_expr((xx + yy).sqrt(), None)
2031 }
2032
2033 pub fn cast_to(&self, type_name: &str) -> Result<Column, String> {
2035 crate::functions::cast(self, type_name)
2036 }
2037
2038 pub fn try_cast_to(&self, type_name: &str) -> Result<Column, String> {
2040 crate::functions::try_cast(self, type_name)
2041 }
2042
2043 pub fn is_nan(&self) -> Column {
2045 let expr = self.expr().clone().map(
2046 |s| expect_col(crate::udfs::apply_isnan_pyspark_parity(s)),
2047 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Boolean)),
2048 );
2049 Self::from_expr(expr, None)
2050 }
2051
2052 pub fn year(&self) -> Column {
2056 let name = format!("year({})", self.name());
2057 use polars::prelude::*;
2058 let parsed = self.expr().clone().map(
2059 |s| expect_col(crate::udfs::apply_string_to_date_format(s, None, false)),
2060 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Date)),
2061 );
2062 Self::from_expr(parsed.dt().year().alias(&name), Some(name))
2063 }
2064
2065 pub fn month(&self) -> Column {
2067 let name = format!("month({})", self.name());
2068 use polars::prelude::*;
2069 let parsed = self.expr().clone().map(
2070 |s| expect_col(crate::udfs::apply_string_to_date_format(s, None, false)),
2071 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Date)),
2072 );
2073 let month_expr = parsed.dt().month().cast(DataType::Int32);
2074 Self::from_expr(month_expr.alias(&name), Some(name))
2075 }
2076
2077 pub fn day(&self) -> Column {
2079 Self::from_expr(self.expr().clone().dt().day(), None)
2080 }
2081
2082 pub fn dayofmonth(&self) -> Column {
2084 let name = format!("dayofmonth({})", self.name());
2085 use polars::prelude::*;
2086 let parsed = self.expr().clone().map(
2087 |s| expect_col(crate::udfs::apply_string_to_date_format(s, None, false)),
2088 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Date)),
2089 );
2090 let day_expr = parsed.dt().day().cast(DataType::Int32);
2091 Self::from_expr(day_expr.alias(&name), Some(name))
2092 }
2093
2094 pub fn quarter(&self) -> Column {
2096 Self::from_expr(self.expr().clone().dt().quarter(), None)
2097 }
2098
2099 pub fn weekofyear(&self) -> Column {
2101 Self::from_expr(self.expr().clone().dt().week(), None)
2102 }
2103
2104 pub fn week(&self) -> Column {
2106 self.weekofyear()
2107 }
2108
2109 pub fn dayofweek(&self) -> Column {
2112 use polars::prelude::*;
2113 let parsed = self.expr().clone().map(
2114 |s| expect_col(crate::udfs::apply_string_to_date_format(s, None, false)),
2115 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Date)),
2116 );
2117 let w = parsed.dt().weekday().cast(DataType::Int32);
2118 let dayofweek = ((w % lit(7i32)) + lit(1i32)).cast(DataType::Int32);
2119 let name = format!("dayofweek({})", self.name());
2120 Self::from_expr(dayofweek.alias(&name), Some(name))
2121 }
2122
2123 pub fn dayofyear(&self) -> Column {
2125 Self::from_expr(
2126 self.expr().clone().dt().ordinal_day().cast(DataType::Int32),
2127 None,
2128 )
2129 }
2130
2131 pub fn to_date(&self) -> Column {
2133 use polars::prelude::DataType;
2134 Self::from_expr(self.expr().clone().cast(DataType::Date), None)
2135 }
2136
2137 pub fn date_format(&self, format: &str) -> Column {
2139 Self::from_expr(self.expr().clone().dt().strftime(format), None)
2140 }
2141
2142 pub fn hour(&self) -> Column {
2144 let expr = self.expr().clone().map(
2145 |s| expect_col(crate::udfs::apply_hour(s)),
2146 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int32)),
2147 );
2148 Self::from_expr(expr, None)
2149 }
2150
2151 pub fn minute(&self) -> Column {
2153 let expr = self.expr().clone().map(
2154 |s| expect_col(crate::udfs::apply_minute(s)),
2155 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int32)),
2156 );
2157 Self::from_expr(expr, None)
2158 }
2159
2160 pub fn second(&self) -> Column {
2162 let expr = self.expr().clone().map(
2163 |s| expect_col(crate::udfs::apply_second(s)),
2164 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int32)),
2165 );
2166 Self::from_expr(expr, None)
2167 }
2168
2169 pub fn extract(&self, field: &str) -> Column {
2171 use polars::prelude::*;
2172 let e = self.expr().clone();
2173 let expr = match field.trim().to_lowercase().as_str() {
2174 "year" => e.dt().year(),
2175 "month" => e.dt().month(),
2176 "day" => e.dt().day(),
2177 "hour" => e.dt().hour(),
2178 "minute" => e.dt().minute(),
2179 "second" => e.dt().second(),
2180 "quarter" => e.dt().quarter(),
2181 "week" | "weekofyear" => e.dt().week(),
2182 "dayofweek" | "dow" => {
2183 let w = e.dt().weekday();
2184 (w % lit(7i32)) + lit(1i32)
2185 }
2186 "dayofyear" | "doy" => e.dt().ordinal_day().cast(DataType::Int32),
2187 _ => e.dt().year(), };
2189 Self::from_expr(expr, None)
2190 }
2191
2192 pub fn unix_micros(&self) -> Column {
2194 use polars::prelude::*;
2195 Self::from_expr(self.expr().clone().cast(DataType::Int64), None)
2196 }
2197
2198 pub fn unix_millis(&self) -> Column {
2200 use polars::prelude::*;
2201 let micros = self.expr().clone().cast(DataType::Int64);
2202 Self::from_expr(micros / lit(1000i64), None)
2203 }
2204
2205 pub fn unix_seconds(&self) -> Column {
2207 use polars::prelude::*;
2208 let micros = self.expr().clone().cast(DataType::Int64);
2209 Self::from_expr(micros / lit(1_000_000i64), None)
2210 }
2211
2212 pub fn dayname(&self) -> Column {
2214 let expr = self.expr().clone().map(
2215 |s| expect_col(crate::udfs::apply_dayname(s)),
2216 |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
2217 );
2218 Self::from_expr(expr, None)
2219 }
2220
2221 pub fn weekday(&self) -> Column {
2223 let expr = self.expr().clone().map(
2224 |s| expect_col(crate::udfs::apply_weekday(s)),
2225 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int32)),
2226 );
2227 Self::from_expr(expr, None)
2228 }
2229
2230 pub fn date_add(&self, n: i32) -> Column {
2232 use polars::prelude::*;
2233 let date_expr = self.expr().clone().map(
2234 |s| expect_col(crate::udfs::apply_string_to_date_format(s, None, false)),
2235 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Date)),
2236 );
2237 let dur = duration(DurationArgs::new().with_days(lit(n as i64)));
2238 let name = format!("date_add({}, {n})", self.name());
2239 Self::from_expr((date_expr + dur).alias(&name), Some(name))
2240 }
2241
2242 pub fn date_sub(&self, n: i32) -> Column {
2244 use polars::prelude::*;
2245 let date_expr = self.expr().clone().map(
2246 |s| expect_col(crate::udfs::apply_string_to_date_format(s, None, false)),
2247 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Date)),
2248 );
2249 let dur = duration(DurationArgs::new().with_days(lit(n as i64)));
2250 let name = format!("date_sub({}, {n})", self.name());
2251 Self::from_expr((date_expr - dur).alias(&name), Some(name))
2252 }
2253
2254 pub fn datediff(&self, other: &Column) -> Column {
2256 use polars::prelude::*;
2257 let start = self.expr().clone().cast(DataType::Date);
2258 let end = other.expr().clone().cast(DataType::Date);
2259 let expr = (end - start).dt().total_days(false).cast(DataType::Int32);
2261 Self::from_expr(expr, None)
2262 }
2263
2264 pub fn last_day(&self) -> Column {
2266 Self::from_expr(self.expr().clone().dt().month_end(), None)
2267 }
2268
2269 pub fn timestampadd(&self, unit: &str, amount: &Column) -> Column {
2271 use polars::prelude::*;
2272 let ts = self.expr().clone();
2273 let amt = amount.expr().clone().cast(DataType::Int64);
2274 let dur = match unit.trim().to_uppercase().as_str() {
2275 "DAY" | "DAYS" => duration(DurationArgs::new().with_days(amt)),
2276 "HOUR" | "HOURS" => duration(DurationArgs::new().with_hours(amt)),
2277 "MINUTE" | "MINUTES" => duration(DurationArgs::new().with_minutes(amt)),
2278 "SECOND" | "SECONDS" => duration(DurationArgs::new().with_seconds(amt)),
2279 "WEEK" | "WEEKS" => duration(DurationArgs::new().with_weeks(amt)),
2280 _ => duration(DurationArgs::new().with_days(amt)),
2281 };
2282 Self::from_expr(ts + dur, None)
2283 }
2284
2285 pub fn timestampdiff(&self, unit: &str, other: &Column) -> Column {
2287 let start = self.expr().clone();
2288 let end = other.expr().clone();
2289 let diff = end - start;
2290 let expr = match unit.trim().to_uppercase().as_str() {
2291 "HOUR" | "HOURS" => diff.dt().total_hours(false),
2292 "MINUTE" | "MINUTES" => diff.dt().total_minutes(false),
2293 "SECOND" | "SECONDS" => diff.dt().total_seconds(false),
2294 "DAY" | "DAYS" => diff.dt().total_days(false),
2295 _ => diff.dt().total_days(false),
2296 };
2297 Self::from_expr(expr, None)
2298 }
2299
2300 pub fn from_utc_timestamp(&self, tz: &str) -> Column {
2302 let tz = tz.to_string();
2303 let expr = self.expr().clone().map(
2304 move |s| expect_col(crate::udfs::apply_from_utc_timestamp(s, &tz)),
2305 |_schema, field| Ok(field.clone()),
2306 );
2307 Self::from_expr(expr, None)
2308 }
2309
2310 pub fn to_utc_timestamp(&self, tz: &str) -> Column {
2312 let tz = tz.to_string();
2313 let expr = self.expr().clone().map(
2314 move |s| expect_col(crate::udfs::apply_to_utc_timestamp(s, &tz)),
2315 |_schema, field| Ok(field.clone()),
2316 );
2317 Self::from_expr(expr, None)
2318 }
2319
2320 pub fn trunc(&self, format: &str) -> Column {
2324 use polars::prelude::*;
2325 let polars_duration = pyspark_trunc_format_to_polars_duration(format);
2326 let duration = polars_duration.clone();
2327 let expr = self.expr().clone().map(
2328 move |c| expect_col(crate::udfs::apply_date_trunc(c, &duration)),
2329 |_schema, field| {
2330 Ok(Field::new(
2331 field.name().clone(),
2332 DataType::Datetime(TimeUnit::Microseconds, None),
2333 ))
2334 },
2335 );
2336 Self::from_expr(expr, Some(self.name().to_string()))
2337 }
2338
2339 pub fn add_months(&self, n: i32) -> Column {
2341 let expr = self.expr().clone().map(
2342 move |col| expect_col(crate::udfs::apply_add_months(col, n)),
2343 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Date)),
2344 );
2345 Self::from_expr(expr, None)
2346 }
2347
2348 pub fn months_between(&self, start: &Column, round_off: bool) -> Column {
2351 let args = [start.expr().clone()];
2352 let expr = self.expr().clone().map_many(
2353 move |cols| expect_col(crate::udfs::apply_months_between(cols, round_off)),
2354 &args,
2355 |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Float64)),
2356 );
2357 Self::from_expr(expr, None)
2358 }
2359
2360 pub fn next_day(&self, day_of_week: &str) -> Column {
2362 let day = day_of_week.to_string();
2363 let expr = self.expr().clone().map(
2364 move |col| expect_col(crate::udfs::apply_next_day(col, &day)),
2365 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Date)),
2366 );
2367 Self::from_expr(expr, None)
2368 }
2369
2370 pub fn unix_timestamp(&self, format: Option<&str>) -> Column {
2372 let fmt = format.map(String::from);
2373 let expr = self.expr().clone().map(
2374 move |col| expect_col(crate::udfs::apply_unix_timestamp(col, fmt.as_deref())),
2375 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
2376 );
2377 Self::from_expr(expr, None)
2378 }
2379
2380 pub fn from_unixtime(&self, format: Option<&str>) -> Column {
2382 let fmt = format.map(String::from);
2383 let expr = self.expr().clone().map(
2384 move |col| expect_col(crate::udfs::apply_from_unixtime(col, fmt.as_deref())),
2385 |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
2386 );
2387 Self::from_expr(expr, None)
2388 }
2389
2390 pub fn timestamp_seconds(&self) -> Column {
2392 let expr = (self.expr().clone().cast(DataType::Int64) * lit(1_000_000i64))
2393 .cast(DataType::Datetime(TimeUnit::Microseconds, None));
2394 Self::from_expr(expr, None)
2395 }
2396
2397 pub fn timestamp_millis(&self) -> Column {
2399 let expr = (self.expr().clone().cast(DataType::Int64) * lit(1000i64))
2400 .cast(DataType::Datetime(TimeUnit::Microseconds, None));
2401 Self::from_expr(expr, None)
2402 }
2403
2404 pub fn timestamp_micros(&self) -> Column {
2406 let expr = self
2407 .expr()
2408 .clone()
2409 .cast(DataType::Int64)
2410 .cast(DataType::Datetime(TimeUnit::Microseconds, None));
2411 Self::from_expr(expr, None)
2412 }
2413
2414 pub fn unix_date(&self) -> Column {
2416 let expr = self.expr().clone().map(
2417 |s| expect_col(crate::udfs::apply_unix_date(s)),
2418 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int32)),
2419 );
2420 Self::from_expr(expr, None)
2421 }
2422
2423 pub fn date_from_unix_date(&self) -> Column {
2425 let expr = self.expr().clone().map(
2426 |s| expect_col(crate::udfs::apply_date_from_unix_date(s)),
2427 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Date)),
2428 );
2429 Self::from_expr(expr, None)
2430 }
2431
2432 pub fn pmod(&self, divisor: &Column) -> Column {
2434 let args = [divisor.expr().clone()];
2435 let expr = self.expr().clone().map_many(
2436 |cols| expect_col(crate::udfs::apply_pmod(cols)),
2437 &args,
2438 |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Float64)),
2439 );
2440 Self::from_expr(expr, None)
2441 }
2442
2443 pub fn factorial(&self) -> Column {
2445 let expr = self.expr().clone().map(
2446 |s| expect_col(crate::udfs::apply_factorial(s)),
2447 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
2448 );
2449 Self::from_expr(expr, None)
2450 }
2451
2452 pub fn over(&self, partition_by: &[&str]) -> Column {
2457 let partition_exprs: Vec<Expr> = if partition_by.is_empty() {
2458 vec![lit(1i32)]
2459 } else {
2460 partition_by.iter().map(|s| col(*s)).collect()
2461 };
2462 Self::from_expr(self.expr().clone().over(partition_exprs), None)
2463 }
2464
2465 pub fn over_window(
2471 &self,
2472 partition_by: &[&str],
2473 order_by_encoded: &[String],
2474 use_running_aggregate: bool,
2475 is_full_partition_frame: bool,
2476 ) -> Result<Column, PolarsError> {
2477 if expr_is_or_contains_n_unique(self.expr()) && self.name.starts_with("count_distinct(") {
2479 return Err(PolarsError::InvalidOperation(
2480 "Distinct window functions are not supported".into(),
2481 ));
2482 }
2483 let partition_exprs: Vec<Expr> = if partition_by.is_empty() {
2484 vec![lit(1i32)]
2485 } else {
2486 partition_by.iter().map(|s| col(*s)).collect()
2487 };
2488
2489 if let Some(ref fl) = self.first_last_value {
2491 if fl.is_last && !order_by_encoded.is_empty() && !is_full_partition_frame {
2492 let mut order_exprs: Vec<Expr> = Vec::with_capacity(order_by_encoded.len());
2493 let mut descending_multi: Vec<bool> = Vec::with_capacity(order_by_encoded.len());
2494 for s in order_by_encoded.iter() {
2495 let s = s.trim();
2496 let (name, descending) = if let Some(stripped) = s.strip_prefix('-') {
2497 (stripped.trim(), true)
2498 } else {
2499 (s.trim(), false)
2500 };
2501 order_exprs.push(col(name));
2502 descending_multi.push(descending);
2503 }
2504 let default_opts = SortOptions {
2505 descending: descending_multi.first().copied().unwrap_or(false),
2506 nulls_last: descending_multi.first().copied().unwrap_or(false),
2507 ..Default::default()
2508 };
2509 let expr = fl.value_expr.clone().over_with_options(
2510 Some(partition_exprs),
2511 Some((order_exprs, default_opts)),
2512 WindowMapping::default(),
2513 )?;
2514 return Ok(Self::from_expr(expr, None));
2515 }
2516 }
2517
2518 let base_expr = if use_running_aggregate {
2519 if let Some(ref src) = self.source_for_running_mean {
2520 let sum_expr = col(src).cast(DataType::Float64).cum_sum(false);
2522 let count_expr = col(src).cum_count(false).cast(DataType::Float64);
2523 sum_expr / count_expr
2524 } else if let Some(ref src) = self.source_for_running {
2525 col(src).cast(DataType::Float64).cum_sum(false)
2528 } else if let Some(ref src) = self.source_for_running_count {
2529 col(src).cum_count(false).cast(DataType::Int64)
2531 } else {
2532 self.expr().clone()
2533 }
2534 } else {
2535 self.expr().clone()
2539 };
2540 let expr = if order_by_encoded.is_empty() {
2541 base_expr.over(partition_exprs)
2542 } else {
2543 let mut order_exprs: Vec<Expr> = Vec::with_capacity(order_by_encoded.len());
2546 let mut descending_multi: Vec<bool> = Vec::with_capacity(order_by_encoded.len());
2547 for s in order_by_encoded.iter() {
2548 let s = s.trim();
2549 let (name, descending) = if let Some(stripped) = s.strip_prefix('-') {
2550 (stripped.trim(), true)
2551 } else {
2552 (s, false)
2553 };
2554 order_exprs.push(col(name));
2555 descending_multi.push(descending);
2556 }
2557 let default_opts = SortOptions {
2559 descending: descending_multi.first().copied().unwrap_or(false),
2560 nulls_last: descending_multi.first().copied().unwrap_or(false),
2561 ..Default::default()
2562 };
2563 base_expr.over_with_options(
2564 Some(partition_exprs),
2565 Some((order_exprs, default_opts)),
2566 WindowMapping::default(),
2567 )?
2568 };
2569 Ok(Self::from_expr(expr, None))
2570 }
2571
2572 pub fn rank(&self, descending: bool) -> Column {
2574 let opts = RankOptions {
2575 method: RankMethod::Min,
2576 descending,
2577 };
2578 Self::from_expr(self.expr().clone().rank(opts, None), None)
2579 }
2580
2581 pub fn dense_rank(&self, descending: bool) -> Column {
2583 let opts = RankOptions {
2584 method: RankMethod::Dense,
2585 descending,
2586 };
2587 Self::from_expr(self.expr().clone().rank(opts, None), None)
2588 }
2589
2590 pub fn row_number(&self, descending: bool) -> Column {
2593 use polars::prelude::*;
2594 let opts = RankOptions {
2595 method: RankMethod::Ordinal,
2596 descending,
2597 };
2598 let rank_expr = self
2600 .expr()
2601 .clone()
2602 .cast(DataType::Float64)
2603 .fill_null(lit(if descending {
2604 f64::NEG_INFINITY
2605 } else {
2606 f64::INFINITY
2607 }))
2608 .rank(opts, None);
2609 Self::from_expr(rank_expr, None)
2610 }
2611
2612 pub fn row_number_over(
2615 partition_by: &[&str],
2616 order_by_encoded: &[String],
2617 ) -> Result<Column, PolarsError> {
2618 use polars::prelude::*;
2619 if order_by_encoded.is_empty() {
2620 return Err(PolarsError::InvalidOperation(
2621 "row_number_over: order_by_encoded cannot be empty".into(),
2622 ));
2623 }
2624 let partition_exprs: Vec<Expr> = if partition_by.is_empty() {
2625 vec![lit(1i32)]
2626 } else {
2627 partition_by.iter().map(|s| col(*s)).collect()
2628 };
2629 fn parse_order_key(s: &str) -> (&str, bool) {
2631 let s = s.trim();
2632 let descending = s.starts_with('-');
2633 let name = if descending {
2634 s.trim_start_matches('-').trim()
2635 } else {
2636 s
2637 };
2638 (name, descending)
2639 }
2640 let all_asc = order_by_encoded.iter().all(|s| !s.trim().starts_with('-'));
2641 let rank_expr = if order_by_encoded.len() == 1 {
2643 let (first_name, first_desc) = parse_order_key(order_by_encoded[0].trim());
2644 let order_col = col(first_name)
2646 .cast(DataType::Float64)
2647 .fill_null(lit(if first_desc {
2648 f64::NEG_INFINITY
2649 } else {
2650 f64::INFINITY
2651 }));
2652 let rank_input = if first_desc {
2653 order_col.neg()
2654 } else {
2655 order_col
2656 };
2657 let opts = RankOptions {
2658 method: RankMethod::Ordinal,
2659 descending: false,
2660 };
2661 rank_input.rank(opts, None)
2662 } else if all_asc {
2663 let struct_fields: Vec<Expr> = order_by_encoded
2664 .iter()
2665 .map(|s| col(parse_order_key(s).0))
2666 .collect();
2667 let opts = RankOptions {
2668 method: RankMethod::Ordinal,
2669 descending: false,
2670 };
2671 as_struct(struct_fields).rank(opts, None)
2672 } else {
2673 let struct_fields: Vec<Expr> = order_by_encoded
2675 .iter()
2676 .map(|s| {
2677 let (name, desc) = parse_order_key(s);
2678 if desc {
2679 (col(name)
2680 .cast(DataType::Float64)
2681 .fill_null(lit(f64::NEG_INFINITY)))
2682 .neg()
2683 } else {
2684 col(name)
2685 .cast(DataType::Float64)
2686 .fill_null(lit(f64::INFINITY))
2687 }
2688 })
2689 .collect();
2690 let opts = RankOptions {
2691 method: RankMethod::Ordinal,
2692 descending: false,
2693 };
2694 as_struct(struct_fields).rank(opts, None)
2695 };
2696 let expr = rank_expr.over(partition_exprs);
2697 Ok(Self::from_expr(expr, None))
2698 }
2699
2700 pub fn lag(&self, n: i64) -> Column {
2702 Self::from_expr(self.expr().clone().shift(polars::prelude::lit(n)), None)
2703 }
2704
2705 pub fn lead(&self, n: i64) -> Column {
2707 Self::from_expr(self.expr().clone().shift(polars::prelude::lit(-n)), None)
2708 }
2709
2710 pub fn first_value(&self) -> Column {
2713 let value_expr = self.expr().clone();
2714 Column {
2715 name: "first_value".to_string(),
2716 expr: value_expr.clone().first(),
2717 is_array_expr: false,
2718 deferred: None,
2719 udf_call: None,
2720 source_for_running: None,
2721 source_for_running_mean: None,
2722 first_last_value: Some(FirstLastValue {
2723 value_expr,
2724 is_last: false,
2725 }),
2726 source_for_running_count: None,
2727 }
2728 }
2729
2730 pub fn last_value(&self) -> Column {
2733 let value_expr = self.expr().clone();
2734 Column {
2735 name: "last_value".to_string(),
2736 expr: value_expr.clone().last(),
2737 is_array_expr: false,
2738 deferred: None,
2739 udf_call: None,
2740 source_for_running: None,
2741 source_for_running_mean: None,
2742 first_last_value: Some(FirstLastValue {
2743 value_expr,
2744 is_last: true,
2745 }),
2746 source_for_running_count: None,
2747 }
2748 }
2749
2750 pub fn percent_rank(&self, partition_by: &[&str], descending: bool) -> Column {
2752 use polars::prelude::*;
2753 let partition_exprs: Vec<Expr> = partition_by.iter().map(|s| col(*s)).collect();
2754 let opts = RankOptions {
2755 method: RankMethod::Min,
2756 descending,
2757 };
2758 let rank_expr = self
2759 .expr()
2760 .clone()
2761 .rank(opts, None)
2762 .over(partition_exprs.clone());
2763 let count_expr = self.expr().clone().count().over(partition_exprs.clone());
2764 let rank_f = (rank_expr - lit(1i64)).cast(DataType::Float64);
2765 let count_f = (count_expr - lit(1i64)).cast(DataType::Float64);
2766 let pct = when(count_f.clone().gt(lit(0.0)))
2769 .then(rank_f / count_f)
2770 .otherwise(lit(0.0));
2771 Self::from_expr(pct, None)
2772 }
2773
2774 pub fn cume_dist(&self, partition_by: &[&str], descending: bool) -> Column {
2776 use polars::prelude::*;
2777 let partition_exprs: Vec<Expr> = partition_by.iter().map(|s| col(*s)).collect();
2778 let opts = RankOptions {
2779 method: RankMethod::Ordinal,
2780 descending,
2781 };
2782 let row_num = self
2783 .expr()
2784 .clone()
2785 .rank(opts, None)
2786 .over(partition_exprs.clone());
2787 let count_expr = self.expr().clone().count().over(partition_exprs.clone());
2788 let count_f = count_expr.clone().cast(DataType::Float64);
2790 let cume = when(count_f.clone().eq(lit(0.0)))
2791 .then(lit(0.0))
2792 .otherwise(row_num.cast(DataType::Float64) / count_f);
2793 Self::from_expr(cume, None)
2794 }
2795
2796 pub fn ntile(&self, n: u32, partition_by: &[&str], descending: bool) -> Column {
2798 use polars::prelude::*;
2799 let partition_exprs: Vec<Expr> = if partition_by.is_empty() {
2800 vec![lit(1i32)]
2801 } else {
2802 partition_by.iter().map(|s| col(*s)).collect()
2803 };
2804 let opts = RankOptions {
2805 method: RankMethod::Ordinal,
2806 descending,
2807 };
2808 let rank_expr = self
2809 .expr()
2810 .clone()
2811 .rank(opts, None)
2812 .over(partition_exprs.clone());
2813 let count_expr = self.expr().clone().count().over(partition_exprs.clone());
2814 let n_expr = lit(n as f64);
2815 let rank_f = rank_expr.cast(DataType::Float64);
2816 let count_f = count_expr.cast(DataType::Float64);
2817 let bucket = when(count_f.clone().eq(lit(0.0))).then(lit(1.0)).otherwise(
2821 ((rank_f.clone() - lit(1.0)) * n_expr.clone() / count_f.clone()).floor() + lit(1.0),
2822 );
2823 let clamped = bucket.clip(lit(1.0), lit(n as f64));
2824 Self::from_expr(clamped.cast(DataType::Int32), None)
2825 }
2826
2827 pub fn nth_value(&self, n: i64, partition_by: &[&str], descending: bool) -> Column {
2829 use polars::prelude::*;
2830 let partition_exprs: Vec<Expr> = partition_by.iter().map(|s| col(*s)).collect();
2831 let opts = RankOptions {
2832 method: RankMethod::Ordinal,
2833 descending,
2834 };
2835 let rank_expr = self
2836 .expr()
2837 .clone()
2838 .rank(opts, None)
2839 .over(partition_exprs.clone());
2840 let cond_col = Self::from_expr(rank_expr.eq(lit(n)), None);
2841 let null_col = Self::from_expr(lit(NULL), None);
2842 let value_col = Self::from_expr(self.expr().clone(), None);
2843 let when_expr = crate::functions::when(&cond_col)
2844 .then(&value_col)
2845 .otherwise(&null_col)
2846 .into_expr();
2847 let windowed = when_expr.max().over(partition_exprs);
2848 Self::from_expr(windowed, None)
2849 }
2850
2851 pub fn array_size(&self) -> Column {
2853 use polars::prelude::*;
2854 Self::from_expr(
2855 self.expr().clone().list().len().cast(DataType::Int32),
2856 Some("size".to_string()),
2857 )
2858 }
2859
2860 pub fn cardinality(&self) -> Column {
2862 self.array_size()
2863 }
2864
2865 pub fn array_contains(&self, value: Expr) -> Column {
2867 use polars::prelude::*;
2868 let args = [value];
2869 let base_expr = self.expr().clone().map_many(
2870 |cols| expect_col(crate::udfs::apply_array_contains(cols)),
2871 &args,
2872 |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Boolean)),
2873 );
2874 let is_null = self.expr().clone().is_null();
2876 let expr = when(is_null)
2877 .then(lit(NULL))
2878 .otherwise(base_expr)
2879 .cast(DataType::Boolean);
2880 Self::from_expr(expr, None)
2881 }
2882
2883 pub fn array_join(&self, separator: &str) -> Column {
2885 use polars::prelude::*;
2886 let elem_to_str = col("").cast(DataType::String);
2889 let list_expr = self.expr().clone().list().eval(elem_to_str);
2890 let joined = list_expr.list().join(lit(separator.to_string()), false);
2891 Self::from_expr(joined, None)
2892 }
2893
2894 pub fn array_max(&self) -> Column {
2896 Self::from_expr(self.expr().clone().list().max(), None)
2897 }
2898
2899 pub fn array_min(&self) -> Column {
2901 Self::from_expr(self.expr().clone().list().min(), None)
2902 }
2903
2904 pub fn element_at(&self, index: i64) -> Column {
2906 use polars::prelude::*;
2907 let idx = if index >= 1 { index - 1 } else { index };
2909 Self::from_expr(self.expr().clone().list().get(lit(idx), true), None)
2910 }
2911
2912 pub fn get_item(&self, index: i64) -> Column {
2914 use polars::prelude::*;
2915 Self::from_expr(self.expr().clone().list().get(lit(index), true), None)
2916 }
2917
2918 pub fn get_field(&self, name: &str) -> Column {
2920 Self::from_expr(
2921 self.expr().clone().struct_().field_by_name(name),
2922 Some(name.to_string()),
2923 )
2924 }
2925
2926 pub fn with_field(&self, name: &str, value: &Column) -> Column {
2931 self.try_with_field(name, value)
2932 .expect("with_field: column must be struct type")
2933 }
2934
2935 pub fn try_with_field(
2940 &self,
2941 name: &str,
2942 value: &Column,
2943 ) -> Result<Column, polars::error::PolarsError> {
2944 use polars::prelude::PlSmallStr;
2945 let field_name = name.to_string();
2946 let field_name_schema = field_name.clone();
2947 let args = [value.expr().clone()];
2948 let expr = self.expr().clone().map_many(
2949 move |cols| {
2950 expect_col(crate::udfs::apply_struct_with_field(
2952 cols[0].clone(),
2953 cols[1].clone(),
2954 &field_name,
2955 ))
2956 },
2957 &args,
2958 move |_schema, fields| {
2959 let struct_field = &fields[0];
2960 let struct_dtype = struct_field.dtype();
2961 let inner: &[Field] = match struct_dtype {
2962 DataType::Struct(f) => f.as_ref(),
2963 _ => return Ok(struct_field.clone()),
2964 };
2965 let value_dtype = fields[1].dtype().clone();
2966 let known_value_dtype = if value_dtype.is_known() {
2967 value_dtype
2968 } else if let DataType::Unknown(uk) = &value_dtype {
2969 uk.materialize().unwrap_or(DataType::String)
2970 } else {
2971 DataType::String
2972 };
2973 let mut new_fields: Vec<Field> = inner.to_vec();
2974 let mut replaced = false;
2975 for f in &mut new_fields {
2976 if f.name.as_str() == field_name_schema {
2977 let dtype = if known_value_dtype.is_known() {
2980 known_value_dtype.clone()
2981 } else if f.dtype.is_known() {
2982 f.dtype.clone()
2983 } else {
2984 DataType::String
2985 };
2986 *f = Field::new(PlSmallStr::from(f.name.as_str()), dtype);
2987 replaced = true;
2988 break;
2989 }
2990 }
2991 if !replaced {
2992 new_fields.push(Field::new(
2993 PlSmallStr::from(field_name_schema.as_str()),
2994 known_value_dtype,
2995 ));
2996 }
2997 let out_dtype = DataType::Struct(new_fields);
2998 Ok(Field::new(struct_field.name().clone(), out_dtype))
2999 },
3000 );
3001 Ok(Self::from_expr(expr, None))
3002 }
3003
3004 pub fn array_sort(&self) -> Column {
3006 use polars::prelude::SortOptions;
3007 let opts = SortOptions {
3008 descending: false,
3009 nulls_last: true,
3010 ..Default::default()
3011 };
3012 Self::from_expr(self.expr().clone().list().sort(opts), None)
3013 }
3014
3015 pub fn array_distinct(&self) -> Column {
3017 let expr = self.expr().clone().map(
3018 |s| expect_col(crate::udfs::apply_array_distinct_first_order(s)),
3019 |_schema, field| {
3020 let new_name = format!("array_distinct({})", field.name());
3021 Ok(Field::new(new_name.into(), field.dtype().clone()))
3022 },
3023 );
3024 Self::from_expr(expr, None)
3025 }
3026
3027 pub fn mode(&self) -> Column {
3030 let vc = self
3034 .expr()
3035 .clone()
3036 .value_counts(true, false, "count", false);
3037 let first_struct = vc.first();
3038 let val_expr = first_struct.struct_().field_by_index(0);
3039 Self::from_expr(val_expr, Some("mode".to_string()))
3040 }
3041
3042 pub fn array_slice(&self, start: i64, length: Option<i64>) -> Column {
3044 use polars::prelude::*;
3045 let start_expr = lit((start - 1).max(0)); let length_expr = length.map(lit).unwrap_or_else(|| lit(i64::MAX));
3047 Self::from_expr(
3048 self.expr().clone().list().slice(start_expr, length_expr),
3049 None,
3050 )
3051 }
3052
3053 pub fn explode(&self) -> Column {
3055 use polars::prelude::ExplodeOptions;
3056 Self::from_expr(
3057 self.expr().clone().explode(ExplodeOptions {
3058 empty_as_null: false,
3059 keep_nulls: false,
3060 }),
3061 None,
3062 )
3063 }
3064
3065 pub fn explode_outer(&self) -> Column {
3067 use polars::prelude::ExplodeOptions;
3068 Self::from_expr(
3069 self.expr().clone().explode(ExplodeOptions {
3070 empty_as_null: true,
3071 keep_nulls: true,
3072 }),
3073 None,
3074 )
3075 }
3076
3077 pub fn posexplode_outer(&self) -> (Column, Column) {
3086 use polars::prelude::{ExplodeOptions, as_struct};
3087
3088 let opts = ExplodeOptions {
3089 empty_as_null: true,
3090 keep_nulls: true,
3091 };
3092
3093 let pos_inner = (col("").cum_count(false) - lit(1i64)).alias("pos");
3096 let val_inner = col("").alias("col");
3097 let struct_expr = as_struct(vec![pos_inner, val_inner]);
3098
3099 let list_struct_expr = self.expr().clone().list().eval(struct_expr);
3101 let struct_exploded = list_struct_expr.explode(opts);
3103
3104 let pos_expr = struct_exploded.clone().struct_().field_by_name("pos");
3105 let val_expr = struct_exploded.struct_().field_by_name("col");
3106
3107 (
3108 Self::from_expr(pos_expr, Some("pos".to_string())),
3109 Self::from_expr(val_expr, Some("col".to_string())),
3110 )
3111 }
3112
3113 pub fn arrays_zip(&self, other: &Column) -> Column {
3115 let args = [other.expr().clone()];
3116 let expr = self.expr().clone().map_many(
3117 |cols| expect_col(crate::udfs::apply_arrays_zip(cols)),
3118 &args,
3119 |_schema, fields| Ok(fields[0].clone()),
3120 );
3121 Self::from_expr(expr, None)
3122 }
3123
3124 pub fn arrays_overlap(&self, other: &Column) -> Column {
3126 use polars::prelude::*;
3127
3128 let args = [other.expr().clone()];
3129 let base_expr = self.expr().clone().map_many(
3130 |cols| expect_col(crate::udfs::apply_arrays_overlap(cols)),
3131 &args,
3132 |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Boolean)),
3133 );
3134
3135 let is_null = self
3138 .expr()
3139 .clone()
3140 .is_null()
3141 .or(other.expr().clone().is_null());
3142 let expr = polars::prelude::when(is_null)
3143 .then(lit(NULL))
3144 .otherwise(base_expr)
3145 .cast(DataType::Boolean);
3146
3147 Self::from_expr(expr, None)
3148 }
3149
3150 pub fn array_agg(&self) -> Column {
3152 Self::from_expr(self.expr().clone().implode(), None)
3153 }
3154
3155 pub fn array_position(&self, value: Expr) -> Column {
3158 use polars::prelude::{DataType, NULL};
3159 let cond = Self::from_expr(col("").eq(value), None);
3161 let then_val = Self::from_expr(col("").cum_count(false), None);
3162 let else_val = Self::from_expr(lit(NULL), None);
3163 let idx_expr = crate::functions::when(&cond)
3164 .then(&then_val)
3165 .otherwise(&else_val)
3166 .into_expr();
3167 let list_expr = self
3168 .expr()
3169 .clone()
3170 .list()
3171 .eval(idx_expr)
3172 .list()
3173 .min()
3174 .fill_null(lit(0i64))
3175 .cast(DataType::Int64);
3176 Self::from_expr(list_expr, Some("array_position".to_string()))
3177 }
3178
3179 pub fn array_compact(&self) -> Column {
3181 let list_expr = self.expr().clone().list().drop_nulls();
3182 Self::from_expr(list_expr, None)
3183 }
3184
3185 pub fn array_remove(&self, value: Expr) -> Column {
3188 use polars::prelude::NULL;
3189 let cond = Self::from_expr(col("").neq(value), None);
3191 let then_val = Self::from_expr(col(""), None);
3192 let else_val = Self::from_expr(lit(NULL), None);
3193 let elem_neq = crate::functions::when(&cond)
3194 .then(&then_val)
3195 .otherwise(&else_val)
3196 .into_expr();
3197 let list_expr = self
3198 .expr()
3199 .clone()
3200 .list()
3201 .eval(elem_neq)
3202 .list()
3203 .drop_nulls();
3204 Self::from_expr(list_expr, None)
3205 }
3206
3207 pub fn array_repeat(&self, n: i64) -> Column {
3209 let expr = self.expr().clone().map(
3210 move |c| expect_col(crate::udfs::apply_array_repeat(c, n)),
3211 |_schema, field| Ok(field.clone()),
3212 );
3213 Self::from_expr(expr, None)
3214 }
3215
3216 pub fn array_flatten(&self) -> Column {
3218 let expr = self.expr().clone().map(
3219 |s| expect_col(crate::udfs::apply_array_flatten(s)),
3220 |_schema, field| Ok(field.clone()),
3221 );
3222 Self::from_expr(expr, None)
3223 }
3224
3225 pub fn array_append(&self, elem: &Column) -> Column {
3227 let args = [elem.expr().clone()];
3228 let expr = self.expr().clone().map_many(
3229 |cols| expect_col(crate::udfs::apply_array_append(cols)),
3230 &args,
3231 |_schema, fields| Ok(fields[0].clone()),
3232 );
3233 Self::from_expr(expr, None)
3234 }
3235
3236 pub fn array_prepend(&self, elem: &Column) -> Column {
3238 let args = [elem.expr().clone()];
3239 let expr = self.expr().clone().map_many(
3240 |cols| expect_col(crate::udfs::apply_array_prepend(cols)),
3241 &args,
3242 |_schema, fields| Ok(fields[0].clone()),
3243 );
3244 Self::from_expr(expr, None)
3245 }
3246
3247 pub fn array_insert(&self, pos: &Column, elem: &Column) -> Column {
3249 let args = [pos.expr().clone(), elem.expr().clone()];
3250 let expr = self.expr().clone().map_many(
3251 |cols| expect_col(crate::udfs::apply_array_insert(cols)),
3252 &args,
3253 |_schema, fields| Ok(fields[0].clone()),
3254 );
3255 Self::from_expr(expr, None)
3256 }
3257
3258 pub fn array_except(&self, other: &Column) -> Column {
3260 let args = [other.expr().clone()];
3261 let expr = self.expr().clone().map_many(
3262 |cols| expect_col(crate::udfs::apply_array_except(cols)),
3263 &args,
3264 |_schema, fields| Ok(fields[0].clone()),
3265 );
3266 Self::from_expr(expr, None)
3267 }
3268
3269 pub fn array_intersect(&self, other: &Column) -> Column {
3271 let args = [other.expr().clone()];
3272 let expr = self.expr().clone().map_many(
3273 |cols| expect_col(crate::udfs::apply_array_intersect(cols)),
3274 &args,
3275 |_schema, fields| Ok(fields[0].clone()),
3276 );
3277 Self::from_expr(expr, None)
3278 }
3279
3280 pub fn array_union(&self, other: &Column) -> Column {
3282 let args = [other.expr().clone()];
3283 let expr = self.expr().clone().map_many(
3284 |cols| expect_col(crate::udfs::apply_array_union(cols)),
3285 &args,
3286 |_schema, fields| Ok(fields[0].clone()),
3287 );
3288 Self::from_expr(expr, None)
3289 }
3290
3291 pub fn zip_with(&self, other: &Column, merge: Expr) -> Column {
3294 let args = [other.expr().clone()];
3295 let zip_expr = self.expr().clone().map_many(
3296 |cols| expect_col(crate::udfs::apply_zip_arrays_to_struct(cols)),
3297 &args,
3298 |_schema, fields| {
3299 let left_inner = match &fields[0].dtype {
3300 DataType::List(inner) => *inner.clone(),
3301 _ => DataType::Unknown(Default::default()),
3302 };
3303 let right_inner = match fields.get(1).map(|f| &f.dtype) {
3304 Some(DataType::List(inner)) => *inner.clone(),
3305 _ => DataType::Unknown(Default::default()),
3306 };
3307 let struct_dtype = DataType::Struct(vec![
3308 Field::new("left".into(), left_inner),
3309 Field::new("right".into(), right_inner),
3310 ]);
3311 Ok(Field::new(
3312 fields[0].name().clone(),
3313 DataType::List(Box::new(struct_dtype)),
3314 ))
3315 },
3316 );
3317 let list_expr = zip_expr.list().eval(merge);
3318 Self::from_expr(list_expr, None)
3319 }
3320
3321 pub fn array_exists(&self, predicate: Expr) -> Column {
3323 let pred_expr = self.expr().clone().list().eval(predicate).list().any();
3324 Self::from_expr(pred_expr, Some("exists".to_string()))
3325 }
3326
3327 pub fn array_forall(&self, predicate: Expr) -> Column {
3329 let pred_expr = self.expr().clone().list().eval(predicate).list().all();
3330 Self::from_expr(pred_expr, Some("forall".to_string()))
3331 }
3332
3333 pub fn array_filter(&self, predicate: Expr) -> Column {
3335 use polars::prelude::NULL;
3336 let then_val = Self::from_expr(col(""), None);
3337 let else_val = Self::from_expr(lit(NULL), None);
3338 let elem_expr = crate::functions::when(&Self::from_expr(predicate, None))
3339 .then(&then_val)
3340 .otherwise(&else_val)
3341 .into_expr();
3342 let list_expr = self
3343 .expr()
3344 .clone()
3345 .list()
3346 .eval(elem_expr)
3347 .list()
3348 .drop_nulls();
3349 Self::from_expr(list_expr, None)
3350 }
3351
3352 pub fn array_transform(&self, f: Expr) -> Column {
3354 let list_expr = self.expr().clone().list().eval(f);
3355 Self::from_expr(list_expr, None)
3356 }
3357
3358 pub fn array_sum(&self) -> Column {
3360 Self::from_expr(self.expr().clone().list().sum(), None)
3361 }
3362
3363 pub fn array_aggregate(&self, zero: &Column) -> Column {
3365 let sum_expr = self.expr().clone().list().sum();
3366 Self::from_expr(sum_expr + zero.expr().clone(), None)
3367 }
3368
3369 pub fn array_mean(&self) -> Column {
3371 Self::from_expr(self.expr().clone().list().mean(), None)
3372 }
3373
3374 pub fn posexplode(&self) -> (Column, Column) {
3383 use polars::prelude::{ExplodeOptions, as_struct};
3384
3385 let opts = ExplodeOptions {
3386 empty_as_null: false,
3387 keep_nulls: false,
3388 };
3389
3390 let pos_inner = (col("").cum_count(false) - lit(1i64)).alias("pos");
3393 let val_inner = col("").alias("col");
3394 let struct_expr = as_struct(vec![pos_inner, val_inner]);
3395
3396 let list_struct_expr = self.expr().clone().list().eval(struct_expr);
3398 let struct_exploded = list_struct_expr.explode(opts);
3400
3401 let pos_expr = struct_exploded.clone().struct_().field_by_name("pos");
3402 let val_expr = struct_exploded.struct_().field_by_name("col");
3403
3404 (
3405 Self::from_expr(pos_expr, Some("pos".to_string())),
3406 Self::from_expr(val_expr, Some("col".to_string())),
3407 )
3408 }
3409
3410 pub fn map_keys(&self) -> Column {
3412 let elem_key = col("").struct_().field_by_name("key");
3413 let list_expr = self.expr().clone().list().eval(elem_key);
3414 Self::from_expr(list_expr, None)
3415 }
3416
3417 pub fn map_values(&self) -> Column {
3419 let elem_val = col("").struct_().field_by_name("value");
3420 let list_expr = self.expr().clone().list().eval(elem_val);
3421 Self::from_expr(list_expr, None)
3422 }
3423
3424 pub fn map_entries(&self) -> Column {
3426 Self::from_expr(self.expr().clone(), None)
3427 }
3428
3429 pub fn map_from_arrays(&self, values: &Column) -> Column {
3431 let args = [values.expr().clone()];
3432 let expr = self.expr().clone().map_many(
3433 |cols| expect_col(crate::udfs::apply_map_from_arrays(cols)),
3434 &args,
3435 |_schema, fields| Ok(fields[0].clone()),
3436 );
3437 Self::from_expr(expr, None)
3438 }
3439
3440 pub fn map_concat(&self, other: &Column) -> Column {
3442 let args = [other.expr().clone()];
3443 let expr = self.expr().clone().map_many(
3444 |cols| expect_col(crate::udfs::apply_map_concat(cols)),
3445 &args,
3446 |_schema, fields| Ok(fields[0].clone()),
3447 );
3448 Self::from_expr(expr, None)
3449 }
3450
3451 pub fn transform_keys(&self, key_expr: Expr) -> Column {
3453 use polars::prelude::as_struct;
3454 let value = col("").struct_().field_by_name("value");
3455 let new_struct = as_struct(vec![key_expr.alias("key"), value.alias("value")]);
3456 let list_expr = self.expr().clone().list().eval(new_struct);
3457 Self::from_expr(list_expr, None)
3458 }
3459
3460 pub fn transform_values(&self, value_expr: Expr) -> Column {
3462 use polars::prelude::as_struct;
3463 let key = col("").struct_().field_by_name("key");
3464 let new_struct = as_struct(vec![key.alias("key"), value_expr.alias("value")]);
3465 let list_expr = self.expr().clone().list().eval(new_struct);
3466 Self::from_expr(list_expr, None)
3467 }
3468
3469 pub fn map_zip_with(&self, other: &Column, merge: Expr) -> Column {
3472 use polars::prelude::as_struct;
3473 let args = [other.expr().clone()];
3474 let zip_expr = self.expr().clone().map_many(
3475 |cols| expect_col(crate::udfs::apply_map_zip_to_struct(cols)),
3476 &args,
3477 |_schema, fields| {
3478 let list_inner = match &fields[0].dtype {
3479 DataType::List(inner) => *inner.clone(),
3480 _ => return Ok(fields[0].clone()),
3481 };
3482 let (key_dtype, value_dtype) = match &list_inner {
3483 DataType::Struct(struct_fields) => {
3484 let k = struct_fields
3485 .iter()
3486 .find(|f| f.name.as_str() == "key")
3487 .map(|f| f.dtype.clone())
3488 .unwrap_or(DataType::String);
3489 let v = struct_fields
3490 .iter()
3491 .find(|f| f.name.as_str() == "value")
3492 .map(|f| f.dtype.clone())
3493 .unwrap_or(DataType::String);
3494 (k, v)
3495 }
3496 _ => (DataType::String, DataType::String),
3497 };
3498 let out_struct = DataType::Struct(vec![
3499 Field::new("key".into(), key_dtype),
3500 Field::new("value1".into(), value_dtype.clone()),
3501 Field::new("value2".into(), value_dtype),
3502 ]);
3503 Ok(Field::new(
3504 fields[0].name().clone(),
3505 DataType::List(Box::new(out_struct)),
3506 ))
3507 },
3508 );
3509 let key_field = col("").struct_().field_by_name("key").alias("key");
3510 let value_field = merge.alias("value");
3511 let merge_expr = as_struct(vec![key_field, value_field]);
3512 let list_expr = zip_expr.list().eval(merge_expr);
3513 Self::from_expr(list_expr, None)
3514 }
3515
3516 pub fn map_filter(&self, predicate: Expr) -> Column {
3519 use polars::prelude::NULL;
3520 let then_val = Self::from_expr(col(""), None);
3521 let else_val = Self::from_expr(lit(NULL), None);
3522 let elem_expr = crate::functions::when(&Self::from_expr(predicate, None))
3523 .then(&then_val)
3524 .otherwise(&else_val)
3525 .into_expr();
3526 let list_expr = self
3527 .expr()
3528 .clone()
3529 .list()
3530 .eval(elem_expr)
3531 .list()
3532 .drop_nulls();
3533 Self::from_expr(list_expr, None)
3534 }
3535
3536 pub fn map_from_entries(&self) -> Column {
3538 Self::from_expr(self.expr().clone(), None)
3539 }
3540
3541 pub fn map_contains_key(&self, key: &Column) -> Column {
3543 let args = [key.expr().clone()];
3544 let expr = self.expr().clone().map_many(
3545 |cols| expect_col(crate::udfs::apply_map_contains_key(cols)),
3546 &args,
3547 |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Boolean)),
3548 );
3549 Self::from_expr(expr, None)
3550 }
3551
3552 pub fn get(&self, key: &Column) -> Column {
3554 let args = [key.expr().clone()];
3555 let expr = self.expr().clone().map_many(
3556 |cols| expect_col(crate::udfs::apply_get(cols)),
3557 &args,
3558 |_schema, fields| {
3559 let dtype = &fields[0].dtype;
3560 let value_dtype = match dtype {
3561 DataType::List(inner) => match inner.as_ref() {
3562 DataType::Struct(struct_fields) => struct_fields
3563 .iter()
3564 .find(|f| f.name == "value")
3565 .map(|f| f.dtype.clone())
3566 .unwrap_or(DataType::String),
3567 _ => DataType::String,
3568 },
3569 DataType::Struct(struct_fields) => struct_fields
3570 .first()
3571 .map(|f| f.dtype.clone())
3572 .unwrap_or(DataType::String),
3573 _ => DataType::String,
3574 };
3575 Ok(Field::new(fields[0].name().clone(), value_dtype))
3576 },
3577 );
3578 Self::from_expr(expr, None)
3579 }
3580
3581 pub fn get_json_object(&self, path: &str) -> Column {
3583 let path = path.to_string();
3584 let expr = self
3585 .expr()
3586 .clone()
3587 .map(
3588 move |s| expect_col(crate::udfs::apply_get_json_object(s, &path)),
3589 |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
3590 )
3591 .cast(DataType::String);
3592 Self::from_expr(expr, None)
3593 }
3594
3595 pub fn from_json(&self, schema: Option<polars::datatypes::DataType>) -> Column {
3597 use polars::prelude::DataType;
3598 let dtype = schema.unwrap_or(DataType::String);
3599 let out = self.expr().clone().str().json_decode(dtype);
3600 Self::from_expr(out, None)
3601 }
3602
3603 pub fn to_json(&self) -> Column {
3605 let out = self.expr().clone().struct_().json_encode();
3606 Self::from_expr(out, None)
3607 }
3608
3609 pub fn json_array_length(&self, path: &str) -> Column {
3611 let path = path.to_string();
3612 let expr = self.expr().clone().map(
3613 move |s| expect_col(crate::udfs::apply_json_array_length(s, &path)),
3614 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
3615 );
3616 Self::from_expr(expr, None)
3617 }
3618
3619 pub fn json_object_keys(&self) -> Column {
3621 let expr = self.expr().clone().map(
3622 |s| expect_col(crate::udfs::apply_json_object_keys(s)),
3623 |_schema, field| {
3624 Ok(Field::new(
3625 field.name().clone(),
3626 DataType::List(Box::new(DataType::String)),
3627 ))
3628 },
3629 );
3630 Self::from_expr(expr, None)
3631 }
3632
3633 pub fn json_tuple(&self, keys: &[&str]) -> Column {
3635 let keys_vec: Vec<String> = keys.iter().map(|s| (*s).to_string()).collect();
3636 let struct_fields: Vec<polars::datatypes::Field> = keys_vec
3637 .iter()
3638 .map(|k| polars::datatypes::Field::new(k.as_str().into(), DataType::String))
3639 .collect();
3640 let expr = self.expr().clone().map(
3641 move |s| expect_col(crate::udfs::apply_json_tuple(s, &keys_vec)),
3642 move |_schema, field| {
3643 Ok(Field::new(
3644 field.name().clone(),
3645 DataType::Struct(struct_fields.clone()),
3646 ))
3647 },
3648 );
3649 Self::from_expr(expr, None)
3650 }
3651
3652 pub fn from_csv(&self) -> Column {
3654 let expr = self.expr().clone().map(
3655 |s| expect_col(crate::udfs::apply_from_csv(s)),
3656 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Struct(vec![]))),
3657 );
3658 Self::from_expr(expr, None)
3659 }
3660
3661 pub fn to_csv(&self) -> Column {
3663 let expr = self.expr().clone().map(
3664 |s| expect_col(crate::udfs::apply_to_csv(s)),
3665 |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
3666 );
3667 Self::from_expr(expr, None)
3668 }
3669
3670 pub fn parse_url(&self, part: &str, key: Option<&str>) -> Column {
3673 let part = part.to_string();
3674 let key_owned = key.map(String::from);
3675 let expr = self.expr().clone().map(
3676 move |s| expect_col(crate::udfs::apply_parse_url(s, &part, key_owned.as_deref())),
3677 |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
3678 );
3679 Self::from_expr(expr, None)
3680 }
3681
3682 pub fn hash(&self) -> Column {
3684 let expr = self.expr().clone().map(
3685 |s| expect_col(crate::udfs::apply_hash_one(s)),
3686 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
3687 );
3688 Self::from_expr(expr, None)
3689 }
3690
3691 pub fn isin(&self, other: &Column) -> Column {
3693 let out = self.expr().clone().is_in(other.expr().clone(), false);
3694 Self::from_expr(out, None)
3695 }
3696
3697 pub fn url_decode(&self) -> Column {
3699 let expr = self.expr().clone().map(
3700 |s| expect_col(crate::udfs::apply_url_decode(s)),
3701 |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
3702 );
3703 Self::from_expr(expr, None)
3704 }
3705
3706 pub fn url_encode(&self) -> Column {
3708 let expr = self.expr().clone().map(
3709 |s| expect_col(crate::udfs::apply_url_encode(s)),
3710 |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
3711 );
3712 Self::from_expr(expr, None)
3713 }
3714
3715 pub fn shift_left(&self, n: i32) -> Column {
3717 use polars::prelude::*;
3718 let pow = lit(2i64).pow(lit(n as i64));
3719 Self::from_expr(
3720 (self.expr().clone().cast(DataType::Int64) * pow).cast(DataType::Int64),
3721 None,
3722 )
3723 }
3724
3725 pub fn shift_right(&self, n: i32) -> Column {
3727 use polars::prelude::*;
3728 let pow = lit(2i64).pow(lit(n as i64));
3729 Self::from_expr(
3730 (self.expr().clone().cast(DataType::Int64) / pow).cast(DataType::Int64),
3731 None,
3732 )
3733 }
3734
3735 pub fn shift_right_unsigned(&self, n: i32) -> Column {
3737 let expr = self.expr().clone().map(
3738 move |s| expect_col(crate::udfs::apply_shift_right_unsigned(s, n)),
3739 |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
3740 );
3741 Self::from_expr(expr, None)
3742 }
3743}
3744
#[cfg(test)]
mod tests {
    use super::Column;
    use polars::prelude::{IntoLazy, col, df, lit};

    /// Five-row frame without nulls: `a` = 1..=5, `b` = 10, 20, .., 50.
    fn test_df() -> polars::prelude::DataFrame {
        let frame = df!(
            "a" => &[1, 2, 3, 4, 5],
            "b" => &[10, 20, 30, 40, 50]
        );
        frame.unwrap()
    }

    /// Five-row frame where `a` has two nulls and `b` has three.
    fn test_df_with_nulls() -> polars::prelude::DataFrame {
        let frame = df!(
            "a" => &[Some(1), Some(2), None, Some(4), None],
            "b" => &[Some(10), None, Some(30), None, None]
        );
        frame.unwrap()
    }

    /// Number of rows of `frame` that survive filtering by `predicate`.
    fn surviving_rows(
        frame: polars::prelude::DataFrame,
        predicate: polars::prelude::Expr,
    ) -> usize {
        frame.lazy().filter(predicate).collect().unwrap().height()
    }

    /// Evaluates `a.eq_null_safe(b)` over `frame` and returns the per-row results.
    fn null_safe_eq_of_a_and_b(frame: polars::prelude::DataFrame) -> Vec<Option<bool>> {
        let lhs = Column::new("a".to_string());
        let rhs = Column::new("b".to_string());
        let comparison = lhs.eq_null_safe(&rhs);

        let evaluated = frame
            .lazy()
            .with_column(comparison.into_expr().alias("eq_null_safe"))
            .collect()
            .unwrap();

        let flags = evaluated.column("eq_null_safe").unwrap();
        let values: Vec<Option<bool>> = flags.bool().unwrap().into_iter().collect();
        values
    }

    #[test]
    fn test_column_new() {
        let age = Column::new("age".to_string());
        assert_eq!(age.name(), "age");
    }

    #[test]
    fn test_column_from_expr() {
        let named = Column::from_expr(col("test"), Some("test".to_string()));
        assert_eq!(named.name(), "test");
    }

    #[test]
    fn test_column_from_expr_default_name() {
        // Without an explicit name the placeholder "<expr>" is reported.
        let unnamed = Column::from_expr(col("test").gt(lit(5)), None);
        assert_eq!(unnamed.name(), "<expr>");
    }

    #[test]
    fn test_column_alias() {
        let original = Column::new("original".to_string());
        let renamed = original.alias("new_name");
        assert_eq!(renamed.name(), "new_name");
    }

    #[test]
    fn test_column_gt() {
        let a = Column::new("a".to_string());
        let predicate = a.gt(lit(3)).into_expr();
        // a > 3 keeps 4 and 5.
        assert_eq!(surviving_rows(test_df(), predicate), 2);
    }

    #[test]
    fn test_column_lt() {
        let a = Column::new("a".to_string());
        let predicate = a.lt(lit(3)).into_expr();
        // a < 3 keeps 1 and 2.
        assert_eq!(surviving_rows(test_df(), predicate), 2);
    }

    #[test]
    fn test_column_eq() {
        let a = Column::new("a".to_string());
        let predicate = a.eq(lit(3)).into_expr();
        // Only the row with a == 3.
        assert_eq!(surviving_rows(test_df(), predicate), 1);
    }

    #[test]
    fn test_column_neq() {
        let a = Column::new("a".to_string());
        let predicate = a.neq(lit(3)).into_expr();
        // Everything except a == 3.
        assert_eq!(surviving_rows(test_df(), predicate), 4);
    }

    #[test]
    fn test_column_gt_eq() {
        let a = Column::new("a".to_string());
        let predicate = a.gt_eq(lit(3)).into_expr();
        // a >= 3 keeps 3, 4 and 5.
        assert_eq!(surviving_rows(test_df(), predicate), 3);
    }

    #[test]
    fn test_column_lt_eq() {
        let a = Column::new("a".to_string());
        let predicate = a.lt_eq(lit(3)).into_expr();
        // a <= 3 keeps 1, 2 and 3.
        assert_eq!(surviving_rows(test_df(), predicate), 3);
    }

    #[test]
    fn test_column_is_null() {
        let a = Column::new("a".to_string());
        let predicate = a.is_null().into_expr();
        // `a` holds two nulls.
        assert_eq!(surviving_rows(test_df_with_nulls(), predicate), 2);
    }

    #[test]
    fn test_column_is_not_null() {
        let a = Column::new("a".to_string());
        let predicate = a.is_not_null().into_expr();
        // `a` holds three non-null values.
        assert_eq!(surviving_rows(test_df_with_nulls(), predicate), 3);
    }

    #[test]
    fn test_null_boolean_column_produces_null_bool_series() {
        let null_expr = Column::null_boolean().into_expr();
        let selected = test_df()
            .lazy()
            .select([null_expr.alias("null_bool")])
            .collect()
            .unwrap();

        let series = selected.column("null_bool").unwrap();
        // Boolean dtype, and every row is null.
        assert_eq!(series.dtype(), &polars::prelude::DataType::Boolean);
        assert_eq!(series.null_count(), series.len());
    }

    #[test]
    fn test_eq_null_safe_both_null() {
        let frame = df!(
            "a" => &[Some(1), None, Some(3)],
            "b" => &[Some(1), None, Some(4)]
        )
        .unwrap();

        let values = null_safe_eq_of_a_and_b(frame);

        // Equal values, null vs null (treated as equal), different values.
        assert_eq!(values, vec![Some(true), Some(true), Some(false)]);
    }

    #[test]
    fn test_eq_null_safe_one_null() {
        let frame = df!(
            "a" => &[Some(1), None, Some(3)],
            "b" => &[Some(1), Some(2), None]
        )
        .unwrap();

        let values = null_safe_eq_of_a_and_b(frame);

        // Equal values, null vs value, value vs null.
        assert_eq!(values, vec![Some(true), Some(false), Some(false)]);
    }
}