use polars::prelude::{
col, lit, DataType, Expr, GetOutput, ListNameSpaceExtension, PlSmallStr, RankMethod,
RankOptions, TimeUnit,
};
use std::sync::Arc;
/// Translates a SQL `LIKE` pattern into an anchored regular expression.
///
/// `%` becomes `.*`, `_` becomes `.`, and regex metacharacters are backslash-escaped so
/// they match literally. When `escape_char` is given, the character following it is taken
/// literally (an escaped `%` or `_` is not a wildcard). An escape character with nothing
/// after it is itself emitted as a literal.
///
/// The result is wrapped in `^...$`.
fn like_pattern_to_regex(pattern: &str, escape_char: Option<char>) -> String {
    // Characters that need a backslash to match literally in a regex.
    const REGEX_META: &str = "\\.*+?[](){}^$|";

    // Appends `ch` as a literal, escaping it only when it is a regex metacharacter.
    // Escaping unconditionally would turn e.g. a literal `d` into the class `\d`.
    fn push_literal(out: &mut String, ch: char) {
        if REGEX_META.contains(ch) {
            out.push('\\');
        }
        out.push(ch);
    }

    let mut out = String::with_capacity(pattern.len() * 2);
    let mut chars = pattern.chars();
    // `while let` (not `for`) because the escape branch pulls the next char itself.
    while let Some(c) = chars.next() {
        if escape_char == Some(c) {
            // Escaped character, or the escape char itself when it ends the pattern.
            push_literal(&mut out, chars.next().unwrap_or(c));
        } else {
            match c {
                '%' => out.push_str(".*"),
                '_' => out.push('.'),
                _ => push_literal(&mut out, c),
            }
        }
    }
    format!("^{out}$")
}
/// Marker stored in `Column::deferred` by the `Column::from_rand` / `Column::from_randn`
/// constructors. Each variant carries the optional seed passed to that constructor.
#[derive(Debug, Clone, Copy)]
pub enum DeferredRandom {
/// Recorded by `Column::from_rand`; holds its `seed` argument.
Rand(Option<u64>),
/// Recorded by `Column::from_randn`; holds its `seed` argument.
Randn(Option<u64>),
}
/// A named wrapper around a Polars [`Expr`].
#[derive(Debug, Clone)]
pub struct Column {
    /// Display name, returned by `Column::name`.
    name: String,
    /// Underlying Polars expression, returned by `Column::expr` / `Column::into_expr`.
    expr: Expr,
    /// Populated by `from_rand` / `from_randn` and carried over by `alias`.
    pub(crate) deferred: Option<DeferredRandom>,
    /// UDF name and argument columns, populated by `from_udf_call`.
    pub(crate) udf_call: Option<(String, Vec<Column>)>,
}
impl Column {
pub fn new(name: String) -> Self {
Column {
name: name.clone(),
expr: col(&name),
deferred: None,
udf_call: None,
}
}
pub fn from_expr(expr: Expr, name: Option<String>) -> Self {
let display_name = name.unwrap_or_else(|| "<expr>".to_string());
Column {
name: display_name,
expr,
deferred: None,
udf_call: None,
}
}
pub fn from_udf_call(name: String, args: Vec<Column>) -> Self {
Column {
name: format!("{name}()"),
expr: lit(0i32), deferred: None,
udf_call: Some((name, args)),
}
}
pub fn from_rand(seed: Option<u64>) -> Self {
let expr = lit(1i64).cum_sum(false).map(
move |c| crate::udfs::apply_rand_with_seed(c, seed),
GetOutput::from_type(DataType::Float64),
);
Column {
name: "rand".to_string(),
expr,
deferred: Some(DeferredRandom::Rand(seed)),
udf_call: None,
}
}
pub fn from_randn(seed: Option<u64>) -> Self {
let expr = lit(1i64).cum_sum(false).map(
move |c| crate::udfs::apply_randn_with_seed(c, seed),
GetOutput::from_type(DataType::Float64),
);
Column {
name: "randn".to_string(),
expr,
deferred: Some(DeferredRandom::Randn(seed)),
udf_call: None,
}
}
pub fn expr(&self) -> &Expr {
&self.expr
}
pub fn into_expr(self) -> Expr {
self.expr
}
/// Returns the column's display name.
pub fn name(&self) -> &str {
    self.name.as_str()
}
pub fn alias(&self, name: &str) -> Column {
Column {
name: name.to_string(),
expr: self.expr.clone().alias(name),
deferred: self.deferred,
udf_call: self.udf_call.clone(),
}
}
pub fn asc(&self) -> crate::functions::SortOrder {
crate::functions::asc(self)
}
pub fn asc_nulls_first(&self) -> crate::functions::SortOrder {
crate::functions::asc_nulls_first(self)
}
pub fn asc_nulls_last(&self) -> crate::functions::SortOrder {
crate::functions::asc_nulls_last(self)
}
pub fn desc(&self) -> crate::functions::SortOrder {
crate::functions::desc(self)
}
pub fn desc_nulls_first(&self) -> crate::functions::SortOrder {
crate::functions::desc_nulls_first(self)
}
pub fn desc_nulls_last(&self) -> crate::functions::SortOrder {
crate::functions::desc_nulls_last(self)
}
pub fn is_null(&self) -> Column {
Column {
name: format!("({} IS NULL)", self.name),
expr: self.expr.clone().is_null(),
deferred: None,
udf_call: None,
}
}
pub fn is_not_null(&self) -> Column {
Column {
name: format!("({} IS NOT NULL)", self.name),
expr: self.expr.clone().is_not_null(),
deferred: None,
udf_call: None,
}
}
pub fn isnull(&self) -> Column {
self.is_null()
}
pub fn isnotnull(&self) -> Column {
self.is_not_null()
}
/// Builds a null literal cast to `Boolean`.
fn null_boolean_expr() -> Expr {
    use polars::prelude::*;
    let null_literal = lit(NULL);
    null_literal.cast(DataType::Boolean)
}
/// SQL `LIKE` match.
///
/// `pattern` (with optional `escape_char`) is converted to an anchored regex by
/// `like_pattern_to_regex` and evaluated with [`Column::regexp_like`].
pub fn like(&self, pattern: &str, escape_char: Option<char>) -> Column {
    let regex = like_pattern_to_regex(pattern, escape_char);
    // Pass the generated regex by reference (the original argument was a mis-encoded token).
    self.regexp_like(&regex)
}
/// Case-insensitive `LIKE`: the regex from `like_pattern_to_regex` is prefixed with `(?i)`
/// and evaluated with Polars' `str().contains`.
pub fn ilike(&self, pattern: &str, escape_char: Option<char>) -> Column {
    use polars::prelude::*;
    let anchored = like_pattern_to_regex(pattern, escape_char);
    let case_insensitive = format!("(?i){}", anchored);
    let matched = self
        .expr()
        .clone()
        .str()
        .contains(lit(case_insensitive), false);
    Self::from_expr(matched, None)
}
/// Equality that yields a null boolean when either operand is null, otherwise `self == other`.
pub fn eq_pyspark(&self, other: &Column) -> Column {
    let (lhs, rhs) = (self.expr(), other.expr());
    let any_null = Self::from_expr(lhs.clone().is_null().or(rhs.clone().is_null()), None);
    let null_result = Self::from_expr(Self::null_boolean_expr(), None);
    let compared = Self::from_expr(lhs.clone().eq(rhs.clone()), None);
    let guarded = crate::functions::when(&any_null)
        .then(&null_result)
        .otherwise(&compared);
    Self::from_expr(guarded.into_expr(), None)
}
/// Inequality that yields a null boolean when either operand is null, otherwise
/// `self != other`.
pub fn ne_pyspark(&self, other: &Column) -> Column {
    let (lhs, rhs) = (self.expr(), other.expr());
    let any_null = Self::from_expr(lhs.clone().is_null().or(rhs.clone().is_null()), None);
    let null_result = Self::from_expr(Self::null_boolean_expr(), None);
    let compared = Self::from_expr(lhs.clone().neq(rhs.clone()), None);
    let guarded = crate::functions::when(&any_null)
        .then(&null_result)
        .otherwise(&compared);
    Self::from_expr(guarded.into_expr(), None)
}
/// Null-safe equality: true when both sides are null, false when exactly one is null,
/// otherwise the plain equality of the (coerced) operands.
pub fn eq_null_safe(&self, other: &Column) -> Column {
    use crate::functions::{lit_bool, when};
    let coerced = crate::type_coercion::coerce_for_pyspark_eq_null_safe(
        self.expr().clone(),
        other.expr().clone(),
    );
    // Fall back to the uncoerced expressions when coercion reports an error.
    let (lhs, rhs) = match coerced {
        Ok(pair) => pair,
        Err(_) => (self.expr().clone(), other.expr().clone()),
    };
    let lhs_null = lhs.clone().is_null();
    let rhs_null = rhs.clone().is_null();
    let both_null = Self::from_expr(lhs_null.clone().and(rhs_null.clone()), None);
    let one_null = Self::from_expr(lhs_null.or(rhs_null), None);
    let plain_eq = Self::from_expr(lhs.eq(rhs), None);
    let when_not_both = when(&one_null)
        .then(&lit_bool(false))
        .otherwise(&plain_eq);
    when(&both_null)
        .then(&lit_bool(true))
        .otherwise(&when_not_both)
}
/// `self > other`, yielding a null boolean when either operand is null.
pub fn gt_pyspark(&self, other: &Column) -> Column {
    let (lhs, rhs) = (self.expr(), other.expr());
    let any_null = Self::from_expr(lhs.clone().is_null().or(rhs.clone().is_null()), None);
    let null_result = Self::from_expr(Self::null_boolean_expr(), None);
    let compared = Self::from_expr(lhs.clone().gt(rhs.clone()), None);
    let guarded = crate::functions::when(&any_null)
        .then(&null_result)
        .otherwise(&compared);
    Self::from_expr(guarded.into_expr(), None)
}
/// `self >= other`, yielding a null boolean when either operand is null.
pub fn ge_pyspark(&self, other: &Column) -> Column {
    let (lhs, rhs) = (self.expr(), other.expr());
    let any_null = Self::from_expr(lhs.clone().is_null().or(rhs.clone().is_null()), None);
    let null_result = Self::from_expr(Self::null_boolean_expr(), None);
    let compared = Self::from_expr(lhs.clone().gt_eq(rhs.clone()), None);
    let guarded = crate::functions::when(&any_null)
        .then(&null_result)
        .otherwise(&compared);
    Self::from_expr(guarded.into_expr(), None)
}
/// `self < other`, yielding a null boolean when either operand is null.
pub fn lt_pyspark(&self, other: &Column) -> Column {
    let (lhs, rhs) = (self.expr(), other.expr());
    let any_null = Self::from_expr(lhs.clone().is_null().or(rhs.clone().is_null()), None);
    let null_result = Self::from_expr(Self::null_boolean_expr(), None);
    let compared = Self::from_expr(lhs.clone().lt(rhs.clone()), None);
    let guarded = crate::functions::when(&any_null)
        .then(&null_result)
        .otherwise(&compared);
    Self::from_expr(guarded.into_expr(), None)
}
/// `self <= other`, yielding a null boolean when either operand is null.
pub fn le_pyspark(&self, other: &Column) -> Column {
    let (lhs, rhs) = (self.expr(), other.expr());
    let any_null = Self::from_expr(lhs.clone().is_null().or(rhs.clone().is_null()), None);
    let null_result = Self::from_expr(Self::null_boolean_expr(), None);
    let compared = Self::from_expr(lhs.clone().lt_eq(rhs.clone()), None);
    let guarded = crate::functions::when(&any_null)
        .then(&null_result)
        .otherwise(&compared);
    Self::from_expr(guarded.into_expr(), None)
}
/// `self > other` using the Polars `gt` expression.
pub fn gt(&self, other: Expr) -> Column {
    let cmp = self.expr().clone().gt(other);
    Self::from_expr(cmp, None)
}
/// `self >= other` using the Polars `gt_eq` expression.
pub fn gt_eq(&self, other: Expr) -> Column {
    let cmp = self.expr().clone().gt_eq(other);
    Self::from_expr(cmp, None)
}
/// `self < other` using the Polars `lt` expression.
pub fn lt(&self, other: Expr) -> Column {
    let cmp = self.expr().clone().lt(other);
    Self::from_expr(cmp, None)
}
/// `self <= other` using the Polars `lt_eq` expression.
pub fn lt_eq(&self, other: Expr) -> Column {
    let cmp = self.expr().clone().lt_eq(other);
    Self::from_expr(cmp, None)
}
/// `self == other` using the Polars `eq` expression.
pub fn eq(&self, other: Expr) -> Column {
    let cmp = self.expr().clone().eq(other);
    Self::from_expr(cmp, None)
}
/// `self != other` using the Polars `neq` expression.
pub fn neq(&self, other: Expr) -> Column {
    let cmp = self.expr().clone().neq(other);
    Self::from_expr(cmp, None)
}
/// Upper-cases string values (`str().to_uppercase()`).
pub fn upper(&self) -> Column {
    let uppercased = self.expr().clone().str().to_uppercase();
    Self::from_expr(uppercased, None)
}
/// Lower-cases string values (`str().to_lowercase()`).
pub fn lower(&self) -> Column {
    let lowercased = self.expr().clone().str().to_lowercase();
    Self::from_expr(lowercased, None)
}
pub fn lcase(&self) -> Column {
self.lower()
}
pub fn ucase(&self) -> Column {
self.upper()
}
/// Substring with a 1-based `start` and optional `length` (in characters).
///
/// * `start > 0` selects from character `start` (1-based); `start == 0` is treated like 1.
/// * `start < 0` counts from the end of the string, clamped to the beginning.
/// * `length` of `None` takes the rest of the string; a `length` below 1 yields `""`.
pub fn substr(&self, start: i64, length: Option<i64>) -> Column {
    use polars::prelude::*;
    if length.is_some_and(|l| l < 1) {
        return Self::from_expr(lit(""), None);
    }
    let offset_expr = if start >= 0 {
        // Positive positions are 1-based. Position 0 also maps to offset 0; previously it
        // fell into the "from end" branch and produced an empty string.
        lit((start - 1).max(0))
    } else {
        let len_chars = self.expr().clone().str().len_chars();
        let from_end = len_chars + lit(start);
        // Clamp to the start of the string when |start| exceeds its length.
        when(from_end.clone().lt(lit(0i64)))
            .then(lit(0i64))
            .otherwise(from_end)
    };
    let length_expr = length.map(lit).unwrap_or_else(|| lit(i64::MAX));
    Self::from_expr(
        self.expr().clone().str().slice(offset_expr, length_expr),
        None,
    )
}
/// String length in characters (`str().len_chars()`).
pub fn length(&self) -> Column {
    let char_count = self.expr().clone().str().len_chars();
    Self::from_expr(char_count, None)
}
/// String length in bits: the byte length cast to `Int32`, times 8.
pub fn bit_length(&self) -> Column {
    use polars::prelude::*;
    let byte_count = self.expr().clone().str().len_bytes().cast(DataType::Int32);
    let bit_count = byte_count * lit(8i32);
    Self::from_expr(bit_count, None)
}
/// String length in bytes, cast to `Int32`.
pub fn octet_length(&self) -> Column {
    use polars::prelude::*;
    let byte_count = self.expr().clone().str().len_bytes().cast(DataType::Int32);
    Self::from_expr(byte_count, None)
}
pub fn char_length(&self) -> Column {
self.length()
}
pub fn character_length(&self) -> Column {
self.length()
}
/// Maps the expression through `crate::udfs::apply_encode` with the given `charset`;
/// the declared output type is `String`.
pub fn encode(&self, charset: &str) -> Column {
    let charset = charset.to_owned();
    let output = GetOutput::from_type(DataType::String);
    let mapped = self
        .expr()
        .clone()
        .map(move |s| crate::udfs::apply_encode(s, &charset), output);
    Self::from_expr(mapped, None)
}
/// Maps the expression through `crate::udfs::apply_decode` with the given `charset`;
/// the declared output type is `String`.
pub fn decode(&self, charset: &str) -> Column {
    let charset = charset.to_owned();
    let output = GetOutput::from_type(DataType::String);
    let mapped = self
        .expr()
        .clone()
        .map(move |s| crate::udfs::apply_decode(s, &charset), output);
    Self::from_expr(mapped, None)
}
/// Maps the expression through `crate::udfs::apply_to_binary` with format `fmt`;
/// the declared output type is `String`.
pub fn to_binary(&self, fmt: &str) -> Column {
    let fmt = fmt.to_owned();
    let output = GetOutput::from_type(DataType::String);
    let mapped = self
        .expr()
        .clone()
        .map(move |s| crate::udfs::apply_to_binary(s, &fmt), output);
    Self::from_expr(mapped, None)
}
/// Maps the expression through `crate::udfs::apply_try_to_binary` with format `fmt`;
/// the declared output type is `String`.
pub fn try_to_binary(&self, fmt: &str) -> Column {
    let fmt = fmt.to_owned();
    let output = GetOutput::from_type(DataType::String);
    let mapped = self
        .expr()
        .clone()
        .map(move |s| crate::udfs::apply_try_to_binary(s, &fmt), output);
    Self::from_expr(mapped, None)
}
/// Maps the expression through `crate::udfs::apply_aes_encrypt` with `key`;
/// the declared output type is `String`.
pub fn aes_encrypt(&self, key: &str) -> Column {
    let key = key.to_owned();
    let output = GetOutput::from_type(DataType::String);
    let mapped = self
        .expr()
        .clone()
        .map(move |s| crate::udfs::apply_aes_encrypt(s, &key), output);
    Self::from_expr(mapped, None)
}
/// Maps the expression through `crate::udfs::apply_aes_decrypt` with `key`;
/// the declared output type is `String`.
pub fn aes_decrypt(&self, key: &str) -> Column {
    let key = key.to_owned();
    let output = GetOutput::from_type(DataType::String);
    let mapped = self
        .expr()
        .clone()
        .map(move |s| crate::udfs::apply_aes_decrypt(s, &key), output);
    Self::from_expr(mapped, None)
}
/// Maps the expression through `crate::udfs::apply_try_aes_decrypt` with `key`;
/// the declared output type is `String`.
pub fn try_aes_decrypt(&self, key: &str) -> Column {
    let key = key.to_owned();
    let output = GetOutput::from_type(DataType::String);
    let mapped = self
        .expr()
        .clone()
        .map(move |s| crate::udfs::apply_try_aes_decrypt(s, &key), output);
    Self::from_expr(mapped, None)
}
/// Maps the expression through `crate::udfs::apply_typeof`; the declared output type is
/// `String`.
pub fn typeof_(&self) -> Column {
    let output = GetOutput::from_type(DataType::String);
    let mapped = self.expr().clone().map(crate::udfs::apply_typeof, output);
    Self::from_expr(mapped, None)
}
/// Strips space, tab, newline and carriage return from both ends of each string.
pub fn trim(&self) -> Column {
    use polars::prelude::*;
    let whitespace = lit(" \t\n\r");
    let stripped = self.expr().clone().str().strip_chars(whitespace);
    Self::from_expr(stripped, None)
}
/// Strips space, tab, newline and carriage return from the start of each string.
pub fn ltrim(&self) -> Column {
    use polars::prelude::*;
    let whitespace = lit(" \t\n\r");
    let stripped = self.expr().clone().str().strip_chars_start(whitespace);
    Self::from_expr(stripped, None)
}
/// Strips space, tab, newline and carriage return from the end of each string.
pub fn rtrim(&self) -> Column {
    use polars::prelude::*;
    let whitespace = lit(" \t\n\r");
    let stripped = self.expr().clone().str().strip_chars_end(whitespace);
    Self::from_expr(stripped, None)
}
/// Strips the characters in `trim_str` from both ends; defaults to space, tab, newline
/// and carriage return when `trim_str` is `None`.
pub fn btrim(&self, trim_str: Option<&str>) -> Column {
    use polars::prelude::*;
    let strip_set = match trim_str {
        Some(custom) => custom,
        None => " \t\n\r",
    };
    let stripped = self.expr().clone().str().strip_chars(lit(strip_set));
    Self::from_expr(stripped, None)
}
/// 1-based position of `substr`, searching from 1-based position `pos`.
///
/// Returns the literal 1 for an empty `substr`, and 0 where the search result is null.
pub fn locate(&self, substr: &str, pos: i64) -> Column {
    use polars::prelude::*;
    if substr.is_empty() {
        return Self::from_expr(lit(1i64), None);
    }
    let skip = (pos - 1).max(0);
    let tail = self.expr().clone().str().slice(lit(skip), lit(i64::MAX));
    let hit = tail
        .str()
        .find_literal(lit(substr.to_string()))
        .cast(DataType::Int64);
    // Shift the offset within the tail back to a 1-based position in the full string.
    let one_based = (hit + lit(skip + 1)).fill_null(lit(0i64));
    Self::from_expr(one_based, None)
}
/// Maps the expression through `crate::udfs::apply_conv(_, from_base, to_base)`;
/// the declared output type is `String`.
pub fn conv(&self, from_base: i32, to_base: i32) -> Column {
    let output = GetOutput::from_type(DataType::String);
    let mapped = self
        .expr()
        .clone()
        .map(move |s| crate::udfs::apply_conv(s, from_base, to_base), output);
    Self::from_expr(mapped, None)
}
/// Maps the expression through `crate::udfs::apply_hex`; the declared output type is `String`.
pub fn hex(&self) -> Column {
    let output = GetOutput::from_type(DataType::String);
    Self::from_expr(self.expr().clone().map(crate::udfs::apply_hex, output), None)
}
/// Maps the expression through `crate::udfs::apply_unhex`; the declared output type is
/// `String`.
pub fn unhex(&self) -> Column {
    let output = GetOutput::from_type(DataType::String);
    Self::from_expr(self.expr().clone().map(crate::udfs::apply_unhex, output), None)
}
/// Maps the expression through `crate::udfs::apply_bin`; the declared output type is `String`.
pub fn bin(&self) -> Column {
    let output = GetOutput::from_type(DataType::String);
    Self::from_expr(self.expr().clone().map(crate::udfs::apply_bin, output), None)
}
/// Maps the expression through `crate::udfs::apply_getbit(_, pos)`; the declared output
/// type is `Int64`.
pub fn getbit(&self, pos: i64) -> Column {
    let output = GetOutput::from_type(DataType::Int64);
    let mapped = self
        .expr()
        .clone()
        .map(move |s| crate::udfs::apply_getbit(s, pos), output);
    Self::from_expr(mapped, None)
}
/// Casts `self` to `Int64` and combines it with `other` via `crate::udfs::apply_bit_and`;
/// the declared output type is `Int64`.
pub fn bit_and(&self, other: &Column) -> Column {
    let lhs = self.expr().clone().cast(DataType::Int64);
    let rhs = [other.expr().clone()];
    let output = GetOutput::from_type(DataType::Int64);
    Self::from_expr(lhs.map_many(crate::udfs::apply_bit_and, &rhs, output), None)
}
/// Casts `self` to `Int64` and combines it with `other` via `crate::udfs::apply_bit_or`;
/// the declared output type is `Int64`.
pub fn bit_or(&self, other: &Column) -> Column {
    let lhs = self.expr().clone().cast(DataType::Int64);
    let rhs = [other.expr().clone()];
    let output = GetOutput::from_type(DataType::Int64);
    Self::from_expr(lhs.map_many(crate::udfs::apply_bit_or, &rhs, output), None)
}
/// Casts `self` to `Int64` and combines it with `other` via `crate::udfs::apply_bit_xor`;
/// the declared output type is `Int64`.
pub fn bit_xor(&self, other: &Column) -> Column {
    let lhs = self.expr().clone().cast(DataType::Int64);
    let rhs = [other.expr().clone()];
    let output = GetOutput::from_type(DataType::Int64);
    Self::from_expr(lhs.map_many(crate::udfs::apply_bit_xor, &rhs, output), None)
}
/// Maps the expression through `crate::udfs::apply_bit_count`; the declared output type is
/// `Int64`.
pub fn bit_count(&self) -> Column {
    let output = GetOutput::from_type(DataType::Int64);
    Self::from_expr(self.expr().clone().map(crate::udfs::apply_bit_count, output), None)
}
/// Maps the expression through `crate::udfs::apply_assert_true`, passing the optional
/// `err_msg`; the output type is declared the same as the input.
pub fn assert_true(&self, err_msg: Option<&str>) -> Column {
    // Own the message so the closure does not borrow from the caller.
    let msg: Option<String> = err_msg.map(str::to_owned);
    let checked = self.expr().clone().map(
        move |c| crate::udfs::apply_assert_true(c, msg.as_deref()),
        GetOutput::same_type(),
    );
    Self::from_expr(checked, None)
}
/// Computes `-1 - x` on the `Int64`-cast input, cast to `Int64`.
pub fn bitwise_not(&self) -> Column {
    let value = self.expr().clone().cast(DataType::Int64);
    let flipped = (lit(-1i64) - value).cast(DataType::Int64);
    Self::from_expr(flipped, None)
}
/// Maps the expression through `crate::udfs::apply_str_to_map` with the given pair and
/// key/value delimiters; the output type is declared the same as the input.
pub fn str_to_map(&self, pair_delim: &str, key_value_delim: &str) -> Column {
    let pair_delim = pair_delim.to_owned();
    let key_value_delim = key_value_delim.to_owned();
    let mapped = self.expr().clone().map(
        move |s| crate::udfs::apply_str_to_map(s, &pair_delim, &key_value_delim),
        GetOutput::same_type(),
    );
    Self::from_expr(mapped, None)
}
/// Reports whether `pattern` contains a look-ahead (`(?=`, `(?!`) or look-behind
/// (`(?<=`, `(?<!`) group opener. The scan is purely textual (byte-wise).
fn pattern_has_lookaround(pattern: &str) -> bool {
    let bytes = pattern.as_bytes();
    bytes.windows(3).enumerate().any(|(start, window)| {
        if window[0] != b'(' || window[1] != b'?' {
            return false;
        }
        match window[2] {
            b'=' | b'!' => true,
            // `(?<` is a look-behind only when followed by `=` or `!`.
            b'<' => matches!(bytes.get(start + 3), Some(b'=') | Some(b'!')),
            _ => false,
        }
    })
}
/// Extracts capture group `group_index` of `pattern`.
///
/// Patterns flagged by `pattern_has_lookaround` go through
/// `crate::udfs::apply_regexp_extract_lookaround`; all others use Polars' `str().extract`.
pub fn regexp_extract(&self, pattern: &str, group_index: usize) -> Column {
    use polars::prelude::*;
    let pat = pattern.to_string();
    let base = self.expr().clone();
    let extracted = if Self::pattern_has_lookaround(pattern) {
        base.map(
            move |s| crate::udfs::apply_regexp_extract_lookaround(s, &pat, group_index),
            GetOutput::from_type(DataType::String),
        )
    } else {
        base.str().extract(lit(pat), group_index)
    };
    Self::from_expr(extracted, None)
}
/// Replaces every match of the regex `pattern` with `replacement`.
///
/// Uses `replace_all` (not `replace`, which only substitutes the first match) so that
/// all occurrences are rewritten, matching Spark's `regexp_replace`.
pub fn regexp_replace(&self, pattern: &str, replacement: &str) -> Column {
    use polars::prelude::*;
    let pat = pattern.to_string();
    let rep = replacement.to_string();
    Self::from_expr(
        self.expr()
            .clone()
            .str()
            // `false`: treat `pat` as a regex, not a literal.
            .replace_all(lit(pat), lit(rep), false),
        None,
    )
}
/// First `n` characters of each string; a negative `n` is treated as 0.
pub fn left(&self, n: i64) -> Column {
    use polars::prelude::*;
    // Clamp in i64 directly; the previous i64 -> u32 -> i64 round trip wrapped
    // counts above `u32::MAX`.
    let take = n.max(0);
    Self::from_expr(
        self.expr().clone().str().slice(lit(0i64), lit(take)),
        None,
    )
}
/// Last `n` characters of each string; a negative `n` is treated as 0.
pub fn right(&self, n: i64) -> Column {
    use polars::prelude::*;
    let take = lit(n.max(0));
    let total = self.expr().clone().str().len_chars().cast(DataType::Int64);
    // Start offset is `len - n`, clamped to 0 for strings shorter than `n`.
    let surplus = total - take.clone();
    let start = when(surplus.clone().lt_eq(lit(0i64)))
        .then(lit(0i64))
        .otherwise(surplus);
    Self::from_expr(self.expr().clone().str().slice(start, take), None)
}
/// Replaces every literal occurrence of `search` with `replacement`.
pub fn replace(&self, search: &str, replacement: &str) -> Column {
    use polars::prelude::*;
    let needle = lit(search.to_string());
    let substitute = lit(replacement.to_string());
    // `true`: match `search` literally rather than as a regex.
    let replaced = self.expr().clone().str().replace_all(needle, substitute, true);
    Self::from_expr(replaced, None)
}
/// Applies [`Column::replace`] for each `(search, replacement)` pair, in order.
pub fn replace_many(&self, pairs: &[(String, String)]) -> Column {
    pairs
        .iter()
        .fold(self.clone(), |acc, (search, replacement)| {
            acc.replace(search, replacement)
        })
}
/// Tests whether each string starts with `prefix` (`str().starts_with`).
pub fn startswith(&self, prefix: &str) -> Column {
    use polars::prelude::*;
    let needle = lit(prefix.to_string());
    let test = self.expr().clone().str().starts_with(needle);
    Self::from_expr(test, None)
}
/// Tests whether each string ends with `suffix` (`str().ends_with`).
pub fn endswith(&self, suffix: &str) -> Column {
    use polars::prelude::*;
    let needle = lit(suffix.to_string());
    let test = self.expr().clone().str().ends_with(needle);
    Self::from_expr(test, None)
}
/// Tests whether each string contains `substring` as a literal (non-regex) substring.
///
/// Uses `contains_literal`: the two-argument `str().contains(pat, strict)` interprets
/// `pat` as a regular expression, so characters such as `.` or `(` in `substring` would
/// not be matched literally.
pub fn contains(&self, substring: &str) -> Column {
    use polars::prelude::*;
    Self::from_expr(
        self.expr()
            .clone()
            .str()
            .contains_literal(lit(substring.to_string())),
        None,
    )
}
/// Splits each string on `delimiter`.
///
/// A positive `limit` routes through `crate::udfs::apply_split_with_limit` (declared
/// `List(String)` output); otherwise Polars' `str().split` is used.
pub fn split(&self, delimiter: &str, limit: Option<i32>) -> Column {
    use polars::prelude::*;
    let delim = delimiter.to_string();
    let expr = match limit {
        Some(lim) if lim > 0 => self.expr().clone().map(
            move |col| crate::udfs::apply_split_with_limit(col, &delim, lim),
            GetOutput::from_type(DataType::List(Box::new(DataType::String))),
        ),
        _ => self.expr().clone().str().split(lit(delim)),
    };
    Self::from_expr(expr, None)
}
/// Currently lower-cases the string (`str().to_lowercase()`).
///
/// NOTE(review): despite the name, no word-initial capitalisation is applied here —
/// confirm whether this is an intentional fallback.
pub fn initcap(&self) -> Column {
    let lowered = self.expr().clone().str().to_lowercase();
    Self::from_expr(lowered, None)
}
/// Extracts all matches of `pattern` (`str().extract_all`).
pub fn regexp_extract_all(&self, pattern: &str) -> Column {
    use polars::prelude::*;
    let pat = lit(pattern.to_string());
    let matches = self.expr().clone().str().extract_all(pat);
    Self::from_expr(matches, None)
}
/// Tests each string against `pattern` using Polars' `str().contains`.
pub fn regexp_like(&self, pattern: &str) -> Column {
    use polars::prelude::*;
    let pat = lit(pattern.to_string());
    let test = self.expr().clone().str().contains(pat, false);
    Self::from_expr(test, None)
}
/// Counts matches of `pattern` in each string (`str().count_matches`), cast to `Int64`.
pub fn regexp_count(&self, pattern: &str) -> Column {
    use polars::prelude::*;
    let pat = lit(pattern.to_string());
    let count = self
        .expr()
        .clone()
        .str()
        .count_matches(pat, false)
        .cast(DataType::Int64);
    Self::from_expr(count, None)
}
pub fn regexp_substr(&self, pattern: &str) -> Column {
self.regexp_extract(pattern, 0)
}
/// Maps the expression through `crate::udfs::apply_regexp_instr` with `pattern` and the
/// group index (0 when `group_idx` is `None`); the declared output type is `Int64`.
pub fn regexp_instr(&self, pattern: &str, group_idx: Option<usize>) -> Column {
    let group = match group_idx {
        Some(g) => g,
        None => 0,
    };
    let pattern = pattern.to_owned();
    let output = GetOutput::from_type(DataType::Int64);
    let mapped = self.expr().clone().map(
        move |s| crate::udfs::apply_regexp_instr(s, pattern.clone(), group),
        output,
    );
    Self::from_expr(mapped, None)
}
/// Combines `self` with `set_column` via `crate::udfs::apply_find_in_set`; the declared
/// output type is `Int64`.
pub fn find_in_set(&self, set_column: &Column) -> Column {
    let rhs = [set_column.expr().clone()];
    let output = GetOutput::from_type(DataType::Int64);
    let mapped = self
        .expr()
        .clone()
        .map_many(crate::udfs::apply_find_in_set, &rhs, output);
    Self::from_expr(mapped, None)
}
/// Repeats each string `n` times; a non-positive `n` yields the empty string.
///
/// Implemented as `repeat_by(n)` followed by joining the resulting list with `""`.
pub fn repeat(&self, n: i32) -> Column {
    use polars::prelude::*;
    // A negative count must not wrap to a huge `u32` (as `n as u32` would); use 0.
    let times = u32::try_from(n).unwrap_or(0);
    Self::from_expr(
        self.expr()
            .clone()
            .repeat_by(lit(times))
            .list()
            .join(lit(""), false),
        None,
    )
}
/// Reverses each string (`str().reverse()`).
pub fn reverse(&self) -> Column {
    let reversed = self.expr().clone().str().reverse();
    Self::from_expr(reversed, None)
}
/// 1-based position of the first literal occurrence of `substr`; 0 where the search
/// result is null.
pub fn instr(&self, substr: &str) -> Column {
    use polars::prelude::*;
    let needle = lit(substr.to_string());
    let hit = self
        .expr()
        .clone()
        .str()
        .find_literal(needle)
        .cast(DataType::Int64);
    let one_based = (hit + lit(1i64)).fill_null(lit(0i64));
    Self::from_expr(one_based, None)
}
pub fn lpad(&self, length: i32, pad: &str) -> Column {
let pad_str = if pad.is_empty() { " " } else { pad };
let fill = pad_str.chars().next().unwrap_or(' ');
Self::from_expr(
self.expr().clone().str().pad_start(length as usize, fill),
None,
)
}
pub fn rpad(&self, length: i32, pad: &str) -> Column {
let pad_str = if pad.is_empty() { " " } else { pad };
let fill = pad_str.chars().next().unwrap_or(' ');
Self::from_expr(
self.expr().clone().str().pad_end(length as usize, fill),
None,
)
}
/// Replaces each character of `from_str` with the character at the same index in `to_str`;
/// characters of `from_str` without a counterpart are removed.
///
/// NOTE(review): the replacements are applied one after another, so a later mapping also
/// rewrites characters produced by an earlier one (e.g. swapping `"ab"` for `"ba"`) —
/// confirm whether simultaneous translation is required.
pub fn translate(&self, from_str: &str, to_str: &str) -> Column {
    use polars::prelude::*;
    let mut targets = to_str.chars();
    let translated = from_str.chars().fold(self.expr().clone(), |acc, from_ch| {
        // Missing counterpart => replace with the empty string (deletion).
        let to = targets.next().map(String::from).unwrap_or_default();
        acc.str().replace_all(lit(from_ch.to_string()), lit(to), true)
    });
    Self::from_expr(translated, None)
}
/// Masks characters by class using successive regex replacements:
/// `[A-Z]` -> `upper_char` (default `X`), `[a-z]` -> `lower_char` (default `x`),
/// `\d` -> `digit_char` (default `n`), and, only when `other_char` is given,
/// `[^A-Za-z0-9]` -> `other_char`.
///
/// NOTE(review): the passes run sequentially, so a custom mask character that itself
/// falls into a later class is masked again — confirm this is acceptable.
pub fn mask(
    &self,
    upper_char: Option<char>,
    lower_char: Option<char>,
    digit_char: Option<char>,
    other_char: Option<char>,
) -> Column {
    use polars::prelude::*;
    let passes = [
        ("[A-Z]", upper_char.unwrap_or('X').to_string()),
        ("[a-z]", lower_char.unwrap_or('x').to_string()),
        (r"\d", digit_char.unwrap_or('n').to_string()),
    ];
    let mut masked = self.expr().clone();
    for (class, replacement) in passes {
        masked = masked
            .str()
            .replace_all(lit(class.to_string()), lit(replacement), false);
    }
    if let Some(other) = other_char {
        masked = masked.str().replace_all(
            lit("[^A-Za-z0-9]".to_string()),
            lit(other.to_string()),
            false,
        );
    }
    Self::from_expr(masked, None)
}
/// Returns part `part_num` of each string split on `delimiter`.
///
/// `part_num == 0` yields a null literal. Positive values are 1-based; negative values are
/// passed through as list indices. A missing part becomes `""`, while a null input stays
/// null. The delimiter `"|"` is routed through `crate::udfs::apply_split_part_regex`.
pub fn split_part(&self, delimiter: &str, part_num: i64) -> Column {
    use polars::prelude::*;
    if part_num == 0 {
        return Self::from_expr(Expr::Literal(LiteralValue::Null), None);
    }
    let delim = delimiter.to_string();
    let picked = if delimiter == "|" {
        self.expr().clone().map(
            move |col| crate::udfs::apply_split_part_regex(col, &delim, part_num),
            GetOutput::from_type(DataType::String),
        )
    } else {
        let index = if part_num > 0 { part_num - 1 } else { part_num };
        self.expr()
            .clone()
            .str()
            .split(lit(delim))
            .list()
            .get(lit(index), true)
    };
    let expr = when(self.expr().clone().is_null())
        .then(Expr::Literal(LiteralValue::Null))
        .otherwise(picked.fill_null(lit("")));
    Self::from_expr(expr, None)
}
/// Keeps the first `count` delimiter-separated parts when `count > 0`, otherwise the last
/// `|count|` parts, re-joined with `delimiter`.
pub fn substring_index(&self, delimiter: &str, count: i64) -> Column {
    use polars::prelude::*;
    let delim = delimiter.to_string();
    let n = count.unsigned_abs() as i64;
    let parts = self.expr().clone().str().split(lit(delim.clone()));
    let kept = if count > 0 {
        parts.list().slice(lit(0i64), lit(n))
    } else {
        let total = parts.clone().list().len();
        let exceeds = total.clone().gt(lit(n));
        // Take the trailing `n` parts, or everything when there are at most `n`.
        let start = when(exceeds.clone())
            .then(total.clone() - lit(n))
            .otherwise(lit(0i64));
        let take = when(exceeds).then(lit(n)).otherwise(total);
        parts.list().slice(start, take)
    };
    Self::from_expr(kept.list().join(lit(delim), false), None)
}
/// Maps the expression through `crate::udfs::apply_soundex`; the output type is declared
/// the same as the input.
pub fn soundex(&self) -> Column {
    let output = GetOutput::same_type();
    Self::from_expr(self.expr().clone().map(crate::udfs::apply_soundex, output), None)
}
/// Combines `self` with `other` via `crate::udfs::apply_levenshtein`; the declared output
/// type is `Int64`.
pub fn levenshtein(&self, other: &Column) -> Column {
    let rhs = [other.expr().clone()];
    let output = GetOutput::from_type(DataType::Int64);
    let distance = self
        .expr()
        .clone()
        .map_many(crate::udfs::apply_levenshtein, &rhs, output);
    Self::from_expr(distance, None)
}
/// Maps the expression through `crate::udfs::apply_crc32`; the declared output type is
/// `Int64`.
pub fn crc32(&self) -> Column {
    let output = GetOutput::from_type(DataType::Int64);
    Self::from_expr(self.expr().clone().map(crate::udfs::apply_crc32, output), None)
}
/// Maps the expression through `crate::udfs::apply_xxhash64`; the declared output type is
/// `Int64`.
pub fn xxhash64(&self) -> Column {
    let output = GetOutput::from_type(DataType::Int64);
    Self::from_expr(self.expr().clone().map(crate::udfs::apply_xxhash64, output), None)
}
/// Maps the expression through `crate::udfs::apply_ascii`; the declared output type is
/// `Int32`.
pub fn ascii(&self) -> Column {
    let output = GetOutput::from_type(DataType::Int32);
    Self::from_expr(self.expr().clone().map(crate::udfs::apply_ascii, output), None)
}
/// Maps the expression through `crate::udfs::apply_format_number(_, decimals)`; the
/// declared output type is `String`.
pub fn format_number(&self, decimals: u32) -> Column {
    let output = GetOutput::from_type(DataType::String);
    let formatted = self
        .expr()
        .clone()
        .map(move |s| crate::udfs::apply_format_number(s, decimals), output);
    Self::from_expr(formatted, None)
}
/// Maps the expression through `crate::udfs::apply_char`; the declared output type is
/// `String`.
pub fn char(&self) -> Column {
    let output = GetOutput::from_type(DataType::String);
    Self::from_expr(self.expr().clone().map(crate::udfs::apply_char, output), None)
}
pub fn chr(&self) -> Column {
self.char()
}
/// Maps the expression through `crate::udfs::apply_base64`; the output type is declared
/// the same as the input.
pub fn base64(&self) -> Column {
    let output = GetOutput::same_type();
    Self::from_expr(self.expr().clone().map(crate::udfs::apply_base64, output), None)
}
/// Maps the expression through `crate::udfs::apply_unbase64`; the output type is declared
/// the same as the input.
pub fn unbase64(&self) -> Column {
    let output = GetOutput::same_type();
    Self::from_expr(self.expr().clone().map(crate::udfs::apply_unbase64, output), None)
}
/// Maps the expression through `crate::udfs::apply_sha1`; the output type is declared the
/// same as the input.
pub fn sha1(&self) -> Column {
    let output = GetOutput::same_type();
    Self::from_expr(self.expr().clone().map(crate::udfs::apply_sha1, output), None)
}
/// Maps the expression through `crate::udfs::apply_sha2(_, bit_length)`; the output type
/// is declared the same as the input.
pub fn sha2(&self, bit_length: i32) -> Column {
    let output = GetOutput::same_type();
    let hashed = self
        .expr()
        .clone()
        .map(move |s| crate::udfs::apply_sha2(s, bit_length), output);
    Self::from_expr(hashed, None)
}
/// Maps the expression through `crate::udfs::apply_md5`; the output type is declared the
/// same as the input.
pub fn md5(&self) -> Column {
    let output = GetOutput::same_type();
    Self::from_expr(self.expr().clone().map(crate::udfs::apply_md5, output), None)
}
/// Replaces `length` characters of each string, starting at 1-based position `pos`, with
/// `replace`.
///
/// * `pos` below 1 is treated as 1.
/// * A negative `length` means "the character length of `replace`" (Spark's default of
///   `-1`); previously it was clamped to 0, which inserted without replacing anything.
/// * The remainder of the string after the replaced span is kept in full.
pub fn overlay(&self, replace: &str, pos: i64, length: i64) -> Column {
    use polars::prelude::*;
    let pos = pos.max(1);
    let replaced_len = if length < 0 {
        i64::try_from(replace.chars().count()).unwrap_or(i64::MAX)
    } else {
        length
    };
    let prefix_len = pos - 1;
    let suffix_start = prefix_len.saturating_add(replaced_len);
    let left = self
        .expr()
        .clone()
        .str()
        .slice(lit(0i64), lit(prefix_len));
    let mid = lit(replace.to_string());
    // `i64::MAX` keeps the whole tail; a fixed cap would truncate very long strings.
    let right = self
        .expr()
        .clone()
        .str()
        .slice(lit(suffix_start), lit(i64::MAX));
    let exprs = [left, mid, right];
    let concat_expr = polars::prelude::concat_str(&exprs, "", false);
    Self::from_expr(concat_expr, None)
}
/// Absolute value (`Expr::abs`).
pub fn abs(&self) -> Column {
    let magnitude = self.expr().clone().abs();
    Self::from_expr(magnitude, None)
}
/// Rounds up (`Expr::ceil`).
pub fn ceil(&self) -> Column {
    let rounded_up = self.expr().clone().ceil();
    Self::from_expr(rounded_up, None)
}
pub fn ceiling(&self) -> Column {
self.ceil()
}
/// Rounds down (`Expr::floor`).
pub fn floor(&self) -> Column {
    let rounded_down = self.expr().clone().floor();
    Self::from_expr(rounded_down, None)
}
/// Maps the expression through `crate::udfs::apply_round(_, decimals)`; the declared
/// output type is `Float64`.
pub fn round(&self, decimals: u32) -> Column {
    let output = GetOutput::from_type(DataType::Float64);
    let rounded = self
        .expr()
        .clone()
        .map(move |s| crate::udfs::apply_round(s, decimals), output);
    Self::from_expr(rounded, None)
}
/// Maps the expression through `crate::udfs::apply_bround(_, scale)`; the declared output
/// type is `Float64`.
pub fn bround(&self, scale: i32) -> Column {
    let output = GetOutput::from_type(DataType::Float64);
    let rounded = self
        .expr()
        .clone()
        .map(move |s| crate::udfs::apply_bround(s, scale), output);
    Self::from_expr(rounded, None)
}
/// Multiplies the expression by `-1`.
pub fn negate(&self) -> Column {
    use polars::prelude::*;
    let negated = self.expr().clone() * lit(-1);
    Self::from_expr(negated, None)
}
pub fn multiply_pyspark(&self, other: &Column) -> Column {
use polars::prelude::GetOutput;
let args = [other.expr().clone()];
let expr = self.expr().clone().map_many(
crate::udfs::apply_pyspark_multiply,
&args,
GetOutput::from_type(DataType::Float64),
);
Self::from_expr(expr, None)
}
pub fn add_pyspark(&self, other: &Column) -> Column {
use polars::prelude::GetOutput;
let args = [other.expr().clone()];
let expr = self.expr().clone().map_many(
crate::udfs::apply_pyspark_add,
&args,
GetOutput::from_type(DataType::Float64),
);
Self::from_expr(expr, None)
}
pub fn subtract_pyspark(&self, other: &Column) -> Column {
use polars::prelude::GetOutput;
let args = [other.expr().clone()];
let expr = self.expr().clone().map_many(
crate::udfs::apply_pyspark_subtract,
&args,
GetOutput::from_type(DataType::Float64),
);
Self::from_expr(expr, None)
}
pub fn divide_pyspark(&self, other: &Column) -> Column {
use polars::prelude::GetOutput;
let args = [other.expr().clone()];
let expr = self.expr().clone().map_many(
crate::udfs::apply_pyspark_divide,
&args,
GetOutput::from_type(DataType::Float64),
);
Self::from_expr(expr, None)
}
pub fn mod_pyspark(&self, other: &Column) -> Column {
use polars::prelude::GetOutput;
let args = [other.expr().clone()];
let expr = self.expr().clone().map_many(
crate::udfs::apply_pyspark_mod,
&args,
GetOutput::from_type(DataType::Float64),
);
Self::from_expr(expr, None)
}
/// `self * other` using the Polars expression operator.
pub fn multiply(&self, other: &Column) -> Column {
    let (lhs, rhs) = (self.expr().clone(), other.expr().clone());
    Self::from_expr(lhs * rhs, None)
}
/// `self + other` using the Polars expression operator.
pub fn add(&self, other: &Column) -> Column {
    let (lhs, rhs) = (self.expr().clone(), other.expr().clone());
    Self::from_expr(lhs + rhs, None)
}
/// `self - other` using the Polars expression operator.
pub fn subtract(&self, other: &Column) -> Column {
    let (lhs, rhs) = (self.expr().clone(), other.expr().clone());
    Self::from_expr(lhs - rhs, None)
}
/// `self / other` using the Polars expression operator.
pub fn divide(&self, other: &Column) -> Column {
    let (lhs, rhs) = (self.expr().clone(), other.expr().clone());
    Self::from_expr(lhs / rhs, None)
}
/// `self % other` using the Polars expression operator.
pub fn mod_(&self, other: &Column) -> Column {
    let (lhs, rhs) = (self.expr().clone(), other.expr().clone());
    Self::from_expr(lhs % rhs, None)
}
/// Square root (`Expr::sqrt`).
pub fn sqrt(&self) -> Column {
    let root = self.expr().clone().sqrt();
    Self::from_expr(root, None)
}
/// Raises the expression to the constant integer power `exp`.
pub fn pow(&self, exp: i64) -> Column {
    use polars::prelude::*;
    let exponent = lit(exp);
    Self::from_expr(self.expr().clone().pow(exponent), None)
}
/// Raises the expression to the power given by the `exponent` column.
pub fn pow_with(&self, exponent: &Column) -> Column {
    let power = exponent.expr().clone();
    Self::from_expr(self.expr().clone().pow(power), None)
}
pub fn power(&self, exp: i64) -> Column {
self.pow(exp)
}
/// Exponential (`Expr::exp`).
pub fn exp(&self) -> Column {
    let exponential = self.expr().clone().exp();
    Self::from_expr(exponential, None)
}
/// Natural logarithm: `Expr::log` with base `e`.
pub fn log(&self) -> Column {
    let base = std::f64::consts::E;
    Self::from_expr(self.expr().clone().log(base), None)
}
pub fn ln(&self) -> Column {
self.log()
}
/// Maps the expression through `crate::udfs::apply_sin`; the declared output type is
/// `Float64`.
pub fn sin(&self) -> Column {
    let output = GetOutput::from_type(DataType::Float64);
    Self::from_expr(self.expr().clone().map(crate::udfs::apply_sin, output), None)
}
/// Maps the expression through `crate::udfs::apply_cos`; the declared output type is
/// `Float64`.
pub fn cos(&self) -> Column {
    let output = GetOutput::from_type(DataType::Float64);
    Self::from_expr(self.expr().clone().map(crate::udfs::apply_cos, output), None)
}
/// Maps the expression through `crate::udfs::apply_tan`; the declared output type is
/// `Float64`.
pub fn tan(&self) -> Column {
    let output = GetOutput::from_type(DataType::Float64);
    Self::from_expr(self.expr().clone().map(crate::udfs::apply_tan, output), None)
}
/// Maps the expression through `crate::udfs::apply_cot`; the declared output type is
/// `Float64`.
pub fn cot(&self) -> Column {
    let output = GetOutput::from_type(DataType::Float64);
    Self::from_expr(self.expr().clone().map(crate::udfs::apply_cot, output), None)
}
/// Maps the expression through `crate::udfs::apply_csc`; the declared output type is
/// `Float64`.
pub fn csc(&self) -> Column {
    let output = GetOutput::from_type(DataType::Float64);
    Self::from_expr(self.expr().clone().map(crate::udfs::apply_csc, output), None)
}
/// Maps the expression through `crate::udfs::apply_sec`; the declared output type is
/// `Float64`.
pub fn sec(&self) -> Column {
    let output = GetOutput::from_type(DataType::Float64);
    Self::from_expr(self.expr().clone().map(crate::udfs::apply_sec, output), None)
}
/// Maps the expression through `crate::udfs::apply_asin`; the declared output type is
/// `Float64`.
pub fn asin(&self) -> Column {
    let output = GetOutput::from_type(DataType::Float64);
    Self::from_expr(self.expr().clone().map(crate::udfs::apply_asin, output), None)
}
/// Maps the expression through `crate::udfs::apply_acos`; the declared output type is
/// `Float64`.
pub fn acos(&self) -> Column {
    let output = GetOutput::from_type(DataType::Float64);
    Self::from_expr(self.expr().clone().map(crate::udfs::apply_acos, output), None)
}
/// Maps the expression through `crate::udfs::apply_atan`; the declared output type is
/// `Float64`.
pub fn atan(&self) -> Column {
    let output = GetOutput::from_type(DataType::Float64);
    Self::from_expr(self.expr().clone().map(crate::udfs::apply_atan, output), None)
}
/// Combines `self` with `x` via `crate::udfs::apply_atan2`; the declared output type is
/// `Float64`.
pub fn atan2(&self, x: &Column) -> Column {
    let rhs = [x.expr().clone()];
    let output = GetOutput::from_type(DataType::Float64);
    let angle = self
        .expr()
        .clone()
        .map_many(crate::udfs::apply_atan2, &rhs, output);
    Self::from_expr(angle, None)
}
/// Maps the expression through `crate::udfs::apply_degrees`; the declared output type is
/// `Float64`.
pub fn degrees(&self) -> Column {
    let output = GetOutput::from_type(DataType::Float64);
    Self::from_expr(self.expr().clone().map(crate::udfs::apply_degrees, output), None)
}
pub fn to_degrees(&self) -> Column {
self.degrees()
}
/// Maps the expression through `crate::udfs::apply_radians`; the declared output type is
/// `Float64`.
pub fn radians(&self) -> Column {
    let output = GetOutput::from_type(DataType::Float64);
    Self::from_expr(self.expr().clone().map(crate::udfs::apply_radians, output), None)
}
pub fn to_radians(&self) -> Column {
self.radians()
}
/// Maps the expression through `crate::udfs::apply_signum`; the declared output type is
/// `Float64`.
pub fn signum(&self) -> Column {
    let output = GetOutput::from_type(DataType::Float64);
    Self::from_expr(self.expr().clone().map(crate::udfs::apply_signum, output), None)
}
/// Maps this column through `crate::udfs::apply_cosh`; the result is declared as `Float64`.
pub fn cosh(&self) -> Column {
    let out_type = GetOutput::from_type(DataType::Float64);
    let mapped = self.expr().clone().map(crate::udfs::apply_cosh, out_type);
    Self::from_expr(mapped, None)
}
/// Maps this column through `crate::udfs::apply_sinh`; the result is declared as `Float64`.
pub fn sinh(&self) -> Column {
    let out_type = GetOutput::from_type(DataType::Float64);
    let mapped = self.expr().clone().map(crate::udfs::apply_sinh, out_type);
    Self::from_expr(mapped, None)
}
/// Maps this column through `crate::udfs::apply_tanh`; the result is declared as `Float64`.
pub fn tanh(&self) -> Column {
    let out_type = GetOutput::from_type(DataType::Float64);
    let mapped = self.expr().clone().map(crate::udfs::apply_tanh, out_type);
    Self::from_expr(mapped, None)
}
/// Maps this column through `crate::udfs::apply_acosh`; the result is declared as `Float64`.
pub fn acosh(&self) -> Column {
    let out_type = GetOutput::from_type(DataType::Float64);
    let mapped = self.expr().clone().map(crate::udfs::apply_acosh, out_type);
    Self::from_expr(mapped, None)
}
/// Maps this column through `crate::udfs::apply_asinh`; the result is declared as `Float64`.
pub fn asinh(&self) -> Column {
    let out_type = GetOutput::from_type(DataType::Float64);
    let mapped = self.expr().clone().map(crate::udfs::apply_asinh, out_type);
    Self::from_expr(mapped, None)
}
/// Maps this column through `crate::udfs::apply_atanh`; the result is declared as `Float64`.
pub fn atanh(&self) -> Column {
    let out_type = GetOutput::from_type(DataType::Float64);
    let mapped = self.expr().clone().map(crate::udfs::apply_atanh, out_type);
    Self::from_expr(mapped, None)
}
/// Maps this column through `crate::udfs::apply_cbrt`; the result is declared as `Float64`.
pub fn cbrt(&self) -> Column {
    let out_type = GetOutput::from_type(DataType::Float64);
    let mapped = self.expr().clone().map(crate::udfs::apply_cbrt, out_type);
    Self::from_expr(mapped, None)
}
/// Maps this column through `crate::udfs::apply_expm1`; the result is declared as `Float64`.
pub fn expm1(&self) -> Column {
    let out_type = GetOutput::from_type(DataType::Float64);
    let mapped = self.expr().clone().map(crate::udfs::apply_expm1, out_type);
    Self::from_expr(mapped, None)
}
/// Maps this column through `crate::udfs::apply_log1p`; the result is declared as `Float64`.
pub fn log1p(&self) -> Column {
    let out_type = GetOutput::from_type(DataType::Float64);
    let mapped = self.expr().clone().map(crate::udfs::apply_log1p, out_type);
    Self::from_expr(mapped, None)
}
/// Maps this column through `crate::udfs::apply_log10`; the result is declared as `Float64`.
pub fn log10(&self) -> Column {
    let out_type = GetOutput::from_type(DataType::Float64);
    let mapped = self.expr().clone().map(crate::udfs::apply_log10, out_type);
    Self::from_expr(mapped, None)
}
/// Maps this column through `crate::udfs::apply_log2`; the result is declared as `Float64`.
pub fn log2(&self) -> Column {
    let out_type = GetOutput::from_type(DataType::Float64);
    let mapped = self.expr().clone().map(crate::udfs::apply_log2, out_type);
    Self::from_expr(mapped, None)
}
/// Maps this column through `crate::udfs::apply_rint`; the result is declared as `Float64`.
pub fn rint(&self) -> Column {
    let out_type = GetOutput::from_type(DataType::Float64);
    let mapped = self.expr().clone().map(crate::udfs::apply_rint, out_type);
    Self::from_expr(mapped, None)
}
/// Builds `sqrt(self * self + other * other)` as a single expression.
pub fn hypot(&self, other: &Column) -> Column {
    let lhs = self.expr().clone();
    let rhs = other.expr().clone();
    let sum_of_squares = (lhs.clone() * lhs) + (rhs.clone() * rhs);
    Self::from_expr(sum_of_squares.sqrt(), None)
}
/// Casts this column to the type named by `type_name`.
///
/// Thin wrapper that forwards to `crate::functions::cast` and returns its `Result` unchanged;
/// the accepted type names and the error text are defined by that function.
pub fn cast_to(&self, type_name: &str) -> Result<Column, String> {
crate::functions::cast(self, type_name)
}
/// Thin wrapper that forwards to `crate::functions::try_cast` and returns its `Result` unchanged.
///
/// NOTE(review): presumably a cast that yields null instead of failing per value — confirm
/// against `crate::functions::try_cast`.
pub fn try_cast_to(&self, type_name: &str) -> Result<Column, String> {
crate::functions::try_cast(self, type_name)
}
/// Boolean column produced by polars' `is_nan` on this expression.
pub fn is_nan(&self) -> Column {
    let nan_mask = self.expr().clone().is_nan();
    Self::from_expr(nan_mask, None)
}
/// Year component, taken with polars' `dt().year()`.
pub fn year(&self) -> Column {
    let part = self.expr().clone().dt().year();
    Self::from_expr(part, None)
}
/// Month component, taken with polars' `dt().month()`.
pub fn month(&self) -> Column {
    let part = self.expr().clone().dt().month();
    Self::from_expr(part, None)
}
/// Day-of-month component, taken with polars' `dt().day()`.
pub fn day(&self) -> Column {
    let part = self.expr().clone().dt().day();
    Self::from_expr(part, None)
}
/// Alias for [`Column::day`]; returns exactly what that method returns.
pub fn dayofmonth(&self) -> Column {
self.day()
}
/// Quarter component, taken with polars' `dt().quarter()`.
pub fn quarter(&self) -> Column {
    let part = self.expr().clone().dt().quarter();
    Self::from_expr(part, None)
}
/// Week number, taken with polars' `dt().week()`.
pub fn weekofyear(&self) -> Column {
    let part = self.expr().clone().dt().week();
    Self::from_expr(part, None)
}
/// Alias for [`Column::weekofyear`]; returns exactly what that method returns.
pub fn week(&self) -> Column {
self.weekofyear()
}
/// Day of week computed as `weekday() % 7 + 1`.
///
/// The remapping sends a weekday value of 7 to 1 and every other value `w` to `w + 1`.
/// NOTE(review): assumes polars' `weekday()` numbers days 1..=7 starting on Monday — confirm.
pub fn dayofweek(&self) -> Column {
    let weekday = self.expr().clone().dt().weekday();
    Self::from_expr((weekday % lit(7i32)) + lit(1i32), None)
}
/// Ordinal day of the year (`dt().ordinal_day()`), cast to `Int32`.
pub fn dayofyear(&self) -> Column {
    let ordinal = self.expr().clone().dt().ordinal_day();
    Self::from_expr(ordinal.cast(DataType::Int32), None)
}
pub fn to_date(&self) -> Column {
use polars::prelude::DataType;
Self::from_expr(self.expr().clone().cast(DataType::Date), None)
}
/// Formats this temporal column as text using polars' `dt().strftime(format)`.
///
/// `format` is handed to `strftime` unchanged.
pub fn date_format(&self, format: &str) -> Column {
    let rendered = self.expr().clone().dt().strftime(format);
    Self::from_expr(rendered, None)
}
pub fn hour(&self) -> Column {
use polars::prelude::GetOutput;
let expr = self.expr().clone().map(
crate::udfs::apply_hour,
GetOutput::from_type(DataType::Int32),
);
Self::from_expr(expr, None)
}
pub fn minute(&self) -> Column {
use polars::prelude::GetOutput;
let expr = self.expr().clone().map(
crate::udfs::apply_minute,
GetOutput::from_type(DataType::Int32),
);
Self::from_expr(expr, None)
}
pub fn second(&self) -> Column {
use polars::prelude::GetOutput;
let expr = self.expr().clone().map(
crate::udfs::apply_second,
GetOutput::from_type(DataType::Int32),
);
Self::from_expr(expr, None)
}
/// Extracts the date/time component named by `field`.
///
/// `field` is trimmed and lower-cased before matching. Recognised names are `year`, `month`,
/// `day`, `hour`, `minute`, `second`, `quarter`, `week`/`weekofyear`, `dayofweek`/`dow` and
/// `dayofyear`/`doy`. Any other name silently falls back to the year component.
pub fn extract(&self, field: &str) -> Column {
    use polars::prelude::*;
    let source = self.expr().clone();
    let key = field.trim().to_lowercase();
    let part = match key.as_str() {
        "year" => source.dt().year(),
        "month" => source.dt().month(),
        "day" => source.dt().day(),
        "hour" => source.dt().hour(),
        "minute" => source.dt().minute(),
        "second" => source.dt().second(),
        "quarter" => source.dt().quarter(),
        "week" | "weekofyear" => source.dt().week(),
        // Same `weekday() % 7 + 1` remapping that `dayofweek` applies.
        "dayofweek" | "dow" => (source.dt().weekday() % lit(7i32)) + lit(1i32),
        "dayofyear" | "doy" => source.dt().ordinal_day().cast(DataType::Int32),
        // Unrecognised field names are not reported; they yield the year.
        _ => source.dt().year(),
    };
    Self::from_expr(part, None)
}
/// Casts this column to `Int64`.
///
/// NOTE(review): the name implies the input is a microsecond-precision timestamp, so the raw
/// integer is microseconds since the epoch — confirm against callers.
pub fn unix_micros(&self) -> Column {
    use polars::prelude::*;
    let raw = self.expr().clone().cast(DataType::Int64);
    Self::from_expr(raw, None)
}
/// Casts this column to `Int64` and divides the raw value by 1000.
///
/// NOTE(review): treats the raw integer as microseconds — confirm the input time unit.
pub fn unix_millis(&self) -> Column {
    use polars::prelude::*;
    let raw = self.expr().clone().cast(DataType::Int64);
    let divisor = lit(1000i64);
    Self::from_expr(raw / divisor, None)
}
/// Casts this column to `Int64` and divides the raw value by 1,000,000.
///
/// NOTE(review): treats the raw integer as microseconds — confirm the input time unit.
pub fn unix_seconds(&self) -> Column {
    use polars::prelude::*;
    let raw = self.expr().clone().cast(DataType::Int64);
    let divisor = lit(1_000_000i64);
    Self::from_expr(raw / divisor, None)
}
/// Maps this column through `crate::udfs::apply_dayname`; the result is declared as `String`.
pub fn dayname(&self) -> Column {
    let out_type = GetOutput::from_type(DataType::String);
    let mapped = self.expr().clone().map(crate::udfs::apply_dayname, out_type);
    Self::from_expr(mapped, None)
}
/// Maps this column through `crate::udfs::apply_weekday`; the result is declared as `Int32`.
pub fn weekday(&self) -> Column {
    let out_type = GetOutput::from_type(DataType::Int32);
    let mapped = self.expr().clone().map(crate::udfs::apply_weekday, out_type);
    Self::from_expr(mapped, None)
}
/// Casts this column to `Date` and adds a duration of `n` days.
pub fn date_add(&self, n: i32) -> Column {
    use polars::prelude::*;
    let days = duration(DurationArgs::new().with_days(lit(i64::from(n))));
    let as_date = self.expr().clone().cast(DataType::Date);
    Self::from_expr(as_date + days, None)
}
/// Casts this column to `Date` and subtracts a duration of `n` days.
pub fn date_sub(&self, n: i32) -> Column {
    use polars::prelude::*;
    let days = duration(DurationArgs::new().with_days(lit(i64::from(n))));
    let as_date = self.expr().clone().cast(DataType::Date);
    Self::from_expr(as_date - days, None)
}
/// Whole days from this column to `other`, i.e. `other - self`.
///
/// Both sides are cast to `Date` first; the difference is reported with `dt().total_days()`.
pub fn datediff(&self, other: &Column) -> Column {
    use polars::prelude::*;
    let from = self.expr().clone().cast(DataType::Date);
    let to = other.expr().clone().cast(DataType::Date);
    let elapsed = to - from;
    Self::from_expr(elapsed.dt().total_days(), None)
}
/// Rolls each value to the end of its month with polars' `dt().month_end()`.
pub fn last_day(&self) -> Column {
    let month_end = self.expr().clone().dt().month_end();
    Self::from_expr(month_end, None)
}
/// Adds `amount` units of `unit` to this column.
///
/// `unit` is trimmed and upper-cased; `DAY`, `HOUR`, `MINUTE`, `SECOND` and `WEEK` are
/// recognised in singular or plural form. Any other unit is treated as days. `amount` is cast
/// to `Int64` before it is turned into a duration.
pub fn timestampadd(&self, unit: &str, amount: &Column) -> Column {
    use polars::prelude::*;
    let quantity = amount.expr().clone().cast(DataType::Int64);
    let unit_key = unit.trim().to_uppercase();
    let blank = DurationArgs::new();
    let args = match unit_key.as_str() {
        "HOUR" | "HOURS" => blank.with_hours(quantity),
        "MINUTE" | "MINUTES" => blank.with_minutes(quantity),
        "SECOND" | "SECONDS" => blank.with_seconds(quantity),
        "WEEK" | "WEEKS" => blank.with_weeks(quantity),
        // "DAY"/"DAYS" and every unrecognised unit are interpreted as days.
        _ => blank.with_days(quantity),
    };
    Self::from_expr(self.expr().clone() + duration(args), None)
}
/// Difference `other - self`, expressed in `unit`.
///
/// `unit` is trimmed and upper-cased; `HOUR`, `MINUTE`, `SECOND` and `DAY` are recognised in
/// singular or plural form. Any other unit is reported in whole days.
pub fn timestampdiff(&self, unit: &str, other: &Column) -> Column {
    let elapsed = other.expr().clone() - self.expr().clone();
    let unit_key = unit.trim().to_uppercase();
    let total = match unit_key.as_str() {
        "HOUR" | "HOURS" => elapsed.dt().total_hours(),
        "MINUTE" | "MINUTES" => elapsed.dt().total_minutes(),
        "SECOND" | "SECONDS" => elapsed.dt().total_seconds(),
        // "DAY"/"DAYS" and every unrecognised unit report whole days.
        _ => elapsed.dt().total_days(),
    };
    Self::from_expr(total, None)
}
/// Maps this column through `crate::udfs::apply_from_utc_timestamp` with the zone name `tz`.
///
/// The declared output dtype is the same as the input dtype.
pub fn from_utc_timestamp(&self, tz: &str) -> Column {
    let zone = tz.to_owned();
    let out_type = GetOutput::same_type();
    let shifted = self.expr().clone().map(
        move |s| crate::udfs::apply_from_utc_timestamp(s, &zone),
        out_type,
    );
    Self::from_expr(shifted, None)
}
/// Maps this column through `crate::udfs::apply_to_utc_timestamp` with the zone name `tz`.
///
/// The declared output dtype is the same as the input dtype.
pub fn to_utc_timestamp(&self, tz: &str) -> Column {
    let zone = tz.to_owned();
    let out_type = GetOutput::same_type();
    let shifted = self.expr().clone().map(
        move |s| crate::udfs::apply_to_utc_timestamp(s, &zone),
        out_type,
    );
    Self::from_expr(shifted, None)
}
/// Truncates this temporal column with polars' `dt().truncate`.
///
/// `format` is passed through as a string literal, so it must be in the form `truncate` expects.
pub fn trunc(&self, format: &str) -> Column {
    use polars::prelude::*;
    let every = lit(format.to_string());
    let truncated = self.expr().clone().dt().truncate(every);
    Self::from_expr(truncated, None)
}
/// Maps this column through `crate::udfs::apply_add_months` with `n`; result declared as `Date`.
pub fn add_months(&self, n: i32) -> Column {
    let out_type = GetOutput::from_type(DataType::Date);
    let shifted = self
        .expr()
        .clone()
        .map(move |col| crate::udfs::apply_add_months(col, n), out_type);
    Self::from_expr(shifted, None)
}
/// Combines this column with `start` through `crate::udfs::apply_months_between`.
///
/// This column is passed first and `start` second; `round_off` is forwarded to the UDF. The
/// result is declared as `Float64`.
pub fn months_between(&self, start: &Column, round_off: bool) -> Column {
    let out_type = GetOutput::from_type(DataType::Float64);
    let months = self.expr().clone().map_many(
        move |cols| crate::udfs::apply_months_between(cols, round_off),
        &[start.expr().clone()],
        out_type,
    );
    Self::from_expr(months, None)
}
/// Maps this column through `crate::udfs::apply_next_day` with `day_of_week`.
///
/// The result is declared as `Date`.
pub fn next_day(&self, day_of_week: &str) -> Column {
    let target = day_of_week.to_owned();
    let out_type = GetOutput::from_type(DataType::Date);
    let advanced = self
        .expr()
        .clone()
        .map(move |col| crate::udfs::apply_next_day(col, &target), out_type);
    Self::from_expr(advanced, None)
}
/// Maps this column through `crate::udfs::apply_unix_timestamp`; result declared as `Int64`.
///
/// `format` is forwarded to the UDF as an optional pattern (`None` is passed through as-is).
pub fn unix_timestamp(&self, format: Option<&str>) -> Column {
    let pattern: Option<String> = format.map(str::to_owned);
    let out_type = GetOutput::from_type(DataType::Int64);
    let epoch = self.expr().clone().map(
        move |col| crate::udfs::apply_unix_timestamp(col, pattern.as_deref()),
        out_type,
    );
    Self::from_expr(epoch, None)
}
/// Maps this column through `crate::udfs::apply_from_unixtime`; result declared as `String`.
///
/// `format` is forwarded to the UDF as an optional pattern (`None` is passed through as-is).
pub fn from_unixtime(&self, format: Option<&str>) -> Column {
    let pattern: Option<String> = format.map(str::to_owned);
    let out_type = GetOutput::from_type(DataType::String);
    let rendered = self.expr().clone().map(
        move |col| crate::udfs::apply_from_unixtime(col, pattern.as_deref()),
        out_type,
    );
    Self::from_expr(rendered, None)
}
/// Interprets this column as seconds: casts to `Int64`, multiplies by 1,000,000 and casts the
/// product to a microsecond-precision `Datetime` without a time zone.
pub fn timestamp_seconds(&self) -> Column {
    let seconds = self.expr().clone().cast(DataType::Int64);
    let micros = seconds * lit(1_000_000i64);
    let stamp = micros.cast(DataType::Datetime(TimeUnit::Microseconds, None));
    Self::from_expr(stamp, None)
}
/// Interprets this column as milliseconds: casts to `Int64`, multiplies by 1000 and casts the
/// product to a microsecond-precision `Datetime` without a time zone.
pub fn timestamp_millis(&self) -> Column {
    let millis = self.expr().clone().cast(DataType::Int64);
    let micros = millis * lit(1000i64);
    let stamp = micros.cast(DataType::Datetime(TimeUnit::Microseconds, None));
    Self::from_expr(stamp, None)
}
/// Casts this column to `Int64` and then to a microsecond-precision `Datetime` without a
/// time zone.
pub fn timestamp_micros(&self) -> Column {
    let micros = self.expr().clone().cast(DataType::Int64);
    let stamp = micros.cast(DataType::Datetime(TimeUnit::Microseconds, None));
    Self::from_expr(stamp, None)
}
/// Maps this column through `crate::udfs::apply_unix_date`; the result is declared as `Int32`.
pub fn unix_date(&self) -> Column {
    let out_type = GetOutput::from_type(DataType::Int32);
    let mapped = self.expr().clone().map(crate::udfs::apply_unix_date, out_type);
    Self::from_expr(mapped, None)
}
/// Maps this column through `crate::udfs::apply_date_from_unix_date`; result declared as `Date`.
pub fn date_from_unix_date(&self) -> Column {
    let out_type = GetOutput::from_type(DataType::Date);
    let mapped = self
        .expr()
        .clone()
        .map(crate::udfs::apply_date_from_unix_date, out_type);
    Self::from_expr(mapped, None)
}
/// Combines this column with `divisor` through `crate::udfs::apply_pmod`.
///
/// This column is passed first and `divisor` second; the result is declared as `Float64`.
pub fn pmod(&self, divisor: &Column) -> Column {
    let out_type = GetOutput::from_type(DataType::Float64);
    let remainder = self.expr().clone().map_many(
        crate::udfs::apply_pmod,
        &[divisor.expr().clone()],
        out_type,
    );
    Self::from_expr(remainder, None)
}
/// Maps this column through `crate::udfs::apply_factorial`; the result is declared as `Int64`.
pub fn factorial(&self) -> Column {
    let out_type = GetOutput::from_type(DataType::Int64);
    let mapped = self.expr().clone().map(crate::udfs::apply_factorial, out_type);
    Self::from_expr(mapped, None)
}
/// Evaluates this expression as a window over the columns named in `partition_by`.
pub fn over(&self, partition_by: &[&str]) -> Column {
    let mut keys: Vec<Expr> = Vec::with_capacity(partition_by.len());
    for name in partition_by {
        keys.push(col(*name));
    }
    Self::from_expr(self.expr().clone().over(keys), None)
}
/// Ranks values with `RankMethod::Min`; `descending` flips the ordering. No seed is supplied.
pub fn rank(&self, descending: bool) -> Column {
    let ranked = self.expr().clone().rank(
        RankOptions {
            method: RankMethod::Min,
            descending,
        },
        None,
    );
    Self::from_expr(ranked, None)
}
/// Ranks values with `RankMethod::Dense`; `descending` flips the ordering. No seed is supplied.
pub fn dense_rank(&self, descending: bool) -> Column {
    let ranked = self.expr().clone().rank(
        RankOptions {
            method: RankMethod::Dense,
            descending,
        },
        None,
    );
    Self::from_expr(ranked, None)
}
/// Ranks values with `RankMethod::Ordinal`; `descending` flips the ordering. No seed is supplied.
pub fn row_number(&self, descending: bool) -> Column {
    let numbered = self.expr().clone().rank(
        RankOptions {
            method: RankMethod::Ordinal,
            descending,
        },
        None,
    );
    Self::from_expr(numbered, None)
}
pub fn lag(&self, n: i64) -> Column {
Self::from_expr(self.expr().clone().shift(polars::prelude::lit(n)), None)
}
pub fn lead(&self, n: i64) -> Column {
Self::from_expr(self.expr().clone().shift(polars::prelude::lit(-n)), None)
}
/// First value of this expression, via polars' `first()`.
pub fn first_value(&self) -> Column {
    let head = self.expr().clone().first();
    Self::from_expr(head, None)
}
/// Last value of this expression, via polars' `last()`.
pub fn last_value(&self) -> Column {
    let tail = self.expr().clone().last();
    Self::from_expr(tail, None)
}
pub fn percent_rank(&self, partition_by: &[&str], descending: bool) -> Column {
use polars::prelude::*;
let partition_exprs: Vec<Expr> = partition_by.iter().map(|s| col(*s)).collect();
let opts = RankOptions {
method: RankMethod::Min,
descending,
};
let rank_expr = self
.expr()
.clone()
.rank(opts, None)
.over(partition_exprs.clone());
let count_expr = self.expr().clone().count().over(partition_exprs.clone());
let rank_f = (rank_expr - lit(1i64)).cast(DataType::Float64);
let count_f = (count_expr - lit(1i64)).cast(DataType::Float64);
let pct = rank_f / count_f;
Self::from_expr(pct, None)
}
/// Cumulative distribution within each partition: `row_number / count` as `Float64`.
///
/// `row_number` is an ordinal rank of this column's values, windowed over the columns named in
/// `partition_by`; `descending` flips the ordering. `count` is this column's `count()` over
/// the same partition.
pub fn cume_dist(&self, partition_by: &[&str], descending: bool) -> Column {
    use polars::prelude::*;
    let partition_exprs: Vec<Expr> = partition_by.iter().map(|s| col(*s)).collect();
    let opts = RankOptions {
        method: RankMethod::Ordinal,
        descending,
    };
    let row_num = self
        .expr()
        .clone()
        .rank(opts, None)
        .over(partition_exprs.clone());
    let count_expr = self.expr().clone().count().over(partition_exprs);
    // Cast BEFORE dividing (as `ntile` and `percent_rank` do): rank and count are integer
    // typed, so dividing first would discard the fraction this function exists to report.
    let cume = row_num.cast(DataType::Float64) / count_expr.cast(DataType::Float64);
    Self::from_expr(cume, None)
}
/// Assigns each row a bucket number in `1..=n` within its partition.
///
/// The bucket is `ceil(row_number * n / count)`, clipped to `[1, n]` and cast to `Int32`.
/// `row_number` is an ordinal rank of this column's values over `partition_by` (`descending`
/// flips the ordering) and `count` is this column's `count()` over the same partition.
pub fn ntile(&self, n: u32, partition_by: &[&str], descending: bool) -> Column {
    use polars::prelude::*;
    let mut keys: Vec<Expr> = Vec::with_capacity(partition_by.len());
    for name in partition_by {
        keys.push(col(*name));
    }
    let buckets = f64::from(n);
    let ordinal = self
        .expr()
        .clone()
        .rank(
            RankOptions {
                method: RankMethod::Ordinal,
                descending,
            },
            None,
        )
        .over(keys.clone())
        .cast(DataType::Float64);
    let total = self
        .expr()
        .clone()
        .count()
        .over(keys.clone())
        .cast(DataType::Float64);
    let raw_bucket = (ordinal * lit(buckets) / total).ceil();
    let bounded = raw_bucket.clip(lit(1.0), lit(buckets));
    Self::from_expr(bounded.cast(DataType::Int32), None)
}
/// Value of the row whose ordinal rank equals `n` within each partition.
///
/// Rows are ranked ordinally by this column's values over `partition_by` (`descending` flips
/// the ordering). Every row except the one ranked `n` is replaced by null, and the `max()` of
/// that masked column over the partition is returned.
pub fn nth_value(&self, n: i64, partition_by: &[&str], descending: bool) -> Column {
    use polars::prelude::*;
    let mut keys: Vec<Expr> = Vec::with_capacity(partition_by.len());
    for name in partition_by {
        keys.push(col(*name));
    }
    let ordinal = self
        .expr()
        .clone()
        .rank(
            RankOptions {
                method: RankMethod::Ordinal,
                descending,
            },
            None,
        )
        .over(keys.clone());
    let is_nth = Self::from_expr(ordinal.eq(lit(n)), None);
    let missing = Self::from_expr(Expr::Literal(LiteralValue::Null), None);
    let current = Self::from_expr(self.expr().clone(), None);
    let masked = crate::functions::when(&is_nth)
        .then(&current)
        .otherwise(&missing)
        .into_expr();
    // Only the n-th row is non-null, so the windowed max spreads it across the partition.
    Self::from_expr(masked.max().over(keys), None)
}
/// Length of each list (`list().len()`), cast to `Int32`; the resulting column is named `size`.
pub fn array_size(&self) -> Column {
    use polars::prelude::*;
    let length = self.expr().clone().list().len().cast(DataType::Int32);
    Self::from_expr(length, Some("size".to_string()))
}
/// Alias for [`Column::array_size`]; returns exactly what that method returns.
pub fn cardinality(&self) -> Column {
self.array_size()
}
/// Tests each list for `value` using polars' `list().contains`.
pub fn array_contains(&self, value: Expr) -> Column {
    let found = self.expr().clone().list().contains(value);
    Self::from_expr(found, None)
}
/// Joins the elements of each list with `separator` using polars' `list().join`.
///
/// The second `join` argument is passed as `false`.
/// NOTE(review): presumably that flag controls null handling — confirm in the polars docs.
pub fn array_join(&self, separator: &str) -> Column {
    use polars::prelude::*;
    let delimiter = lit(separator.to_string());
    let joined = self.expr().clone().list().join(delimiter, false);
    Self::from_expr(joined, None)
}
/// Maximum element of each list, via polars' `list().max()`.
pub fn array_max(&self) -> Column {
    let largest = self.expr().clone().list().max();
    Self::from_expr(largest, None)
}
/// Minimum element of each list, via polars' `list().min()`.
pub fn array_min(&self) -> Column {
    let smallest = self.expr().clone().list().min();
    Self::from_expr(smallest, None)
}
/// Fetches one element from each list using a 1-based `index`.
///
/// An `index` of 1 or more is shifted down by one before it reaches `list().get`; zero and
/// negative values are passed through unchanged. The `true` flag is handed to `get` as its
/// second argument.
/// NOTE(review): presumably that flag makes out-of-range lookups yield null — confirm.
pub fn element_at(&self, index: i64) -> Column {
    use polars::prelude::*;
    let zero_based = match index {
        i if i >= 1 => i - 1,
        i => i,
    };
    let picked = self.expr().clone().list().get(lit(zero_based), true);
    Self::from_expr(picked, None)
}
/// Fetches the element at `index` from each list; `index` goes to `list().get` unchanged.
///
/// The `true` flag is handed to `get` as its second argument.
pub fn get_item(&self, index: i64) -> Column {
    use polars::prelude::*;
    let position = lit(index);
    let picked = self.expr().clone().list().get(position, true);
    Self::from_expr(picked, None)
}
/// Selects the struct field called `name`; the resulting column carries that same name.
pub fn get_field(&self, name: &str) -> Column {
    let field = self.expr().clone().struct_().field_by_name(name);
    Self::from_expr(field, Some(name.to_string()))
}
/// Panicking variant of [`Column::try_with_field`].
///
/// # Panics
///
/// Panics with the message `with_field: column must be struct type` when
/// `try_with_field` returns an error.
pub fn with_field(&self, name: &str, value: &Column) -> Column {
self.try_with_field(name, value)
.expect("with_field: column must be struct type")
}
/// Builds a struct expression that keeps every field and adds `value` under the name `name`.
///
/// # Errors
///
/// Returns whatever error polars' `struct_().with_fields` reports.
pub fn try_with_field(
    &self,
    name: &str,
    value: &Column,
) -> Result<Column, polars::error::PolarsError> {
    // `*` selects all current fields; the aliased value is passed alongside them.
    let keep_all = Expr::Field(Arc::from([PlSmallStr::from("*")]));
    let added = value.expr().clone().alias(name);
    self.expr()
        .clone()
        .struct_()
        .with_fields(vec![keep_all, added])
        .map(|updated| Self::from_expr(updated, None))
}
/// Sorts each list in ascending order with nulls placed last.
///
/// All other sort options keep their defaults.
pub fn array_sort(&self) -> Column {
    use polars::prelude::SortOptions;
    let sorted = self.expr().clone().list().sort(SortOptions {
        descending: false,
        nulls_last: true,
        ..Default::default()
    });
    Self::from_expr(sorted, None)
}
/// Maps this column through `crate::udfs::apply_array_distinct_first_order`.
///
/// The declared output dtype is the same as the input dtype.
pub fn array_distinct(&self) -> Column {
    let out_type = GetOutput::same_type();
    let deduped = self
        .expr()
        .clone()
        .map(crate::udfs::apply_array_distinct_first_order, out_type);
    Self::from_expr(deduped, None)
}
/// Takes the first entry of `value_counts` and returns its first struct field as `mode`.
///
/// `value_counts` is called with `(true, false, "count", false)`.
/// NOTE(review): presumably the leading `true` sorts by descending count, which would make
/// the first entry the most frequent value — confirm against the polars signature.
pub fn mode(&self) -> Column {
    let counts = self
        .expr()
        .clone()
        .value_counts(true, false, "count", false);
    let top_value = counts.first().struct_().field_by_index(0);
    Self::from_expr(top_value, Some("mode".to_string()))
}
/// Slices each list using a 1-based `start` and an optional `length`.
///
/// A `start` of 1 or more is converted to a 0-based offset. Zero and negative values are
/// passed through unchanged, so a negative `start` counts from the end of the list instead of
/// being clamped to the beginning — the same index convention `element_at` uses. When
/// `length` is `None`, `i64::MAX` is used so the slice runs to the end of the list.
pub fn array_slice(&self, start: i64, length: Option<i64>) -> Column {
    use polars::prelude::*;
    // 1-based -> 0-based for positive starts; negative offsets are understood by
    // `list().slice` as "from the end".
    let offset = if start >= 1 { start - 1 } else { start };
    let start_expr = lit(offset);
    let length_expr = length.map(lit).unwrap_or_else(|| lit(i64::MAX));
    Self::from_expr(
        self.expr().clone().list().slice(start_expr, length_expr),
        None,
    )
}
/// Expands each list into one row per element, via polars' `explode()`.
pub fn explode(&self) -> Column {
    let exploded = self.expr().clone().explode();
    Self::from_expr(exploded, None)
}
/// Builds the same `explode()` expression as [`Column::explode`]; no extra handling is added.
pub fn explode_outer(&self) -> Column {
    let exploded = self.expr().clone().explode();
    Self::from_expr(exploded, None)
}
/// Alias for [`Column::posexplode`]; returns the same `(pos, col)` pair with no extra handling.
pub fn posexplode_outer(&self) -> (Column, Column) {
self.posexplode()
}
/// Combines this column with `other` through `crate::udfs::apply_arrays_zip`.
///
/// This column is passed first and `other` second; the declared output dtype is the same as
/// the input dtype.
pub fn arrays_zip(&self, other: &Column) -> Column {
    let out_type = GetOutput::same_type();
    let zipped = self.expr().clone().map_many(
        crate::udfs::apply_arrays_zip,
        &[other.expr().clone()],
        out_type,
    );
    Self::from_expr(zipped, None)
}
/// Combines this column with `other` through `crate::udfs::apply_arrays_overlap`.
///
/// This column is passed first and `other` second; the result is declared as `Boolean`.
pub fn arrays_overlap(&self, other: &Column) -> Column {
    let out_type = GetOutput::from_type(DataType::Boolean);
    let overlaps = self.expr().clone().map_many(
        crate::udfs::apply_arrays_overlap,
        &[other.expr().clone()],
        out_type,
    );
    Self::from_expr(overlaps, None)
}
/// Collects this column's values into a list, via polars' `implode()`.
pub fn array_agg(&self) -> Column {
    let gathered = self.expr().clone().implode();
    Self::from_expr(gathered, None)
}
/// Position of the first element equal to `value` in each list, or 0 when nothing matches.
///
/// Each element is replaced by its `cum_count(false)` when it equals `value` and by null
/// otherwise; the minimum of those markers is taken per list, nulls become 0, and the result
/// is cast to `Int64`. The resulting column is named `array_position`.
pub fn array_position(&self, value: Expr) -> Column {
    use polars::prelude::{DataType, NULL};
    let is_match = Self::from_expr(col("").eq(value), None);
    let running_pos = Self::from_expr(col("").cum_count(false), None);
    let no_match = Self::from_expr(lit(NULL), None);
    let marker = crate::functions::when(&is_match)
        .then(&running_pos)
        .otherwise(&no_match)
        .into_expr();
    let first_hit = self.expr().clone().list().eval(marker, false).list().min();
    let position = first_hit.fill_null(lit(0i64)).cast(DataType::Int64);
    Self::from_expr(position, Some("array_position".to_string()))
}
/// Removes null elements from each list, via polars' `list().drop_nulls()`.
pub fn array_compact(&self) -> Column {
    Self::from_expr(self.expr().clone().list().drop_nulls(), None)
}
/// Removes the elements equal to `value` from each list.
///
/// Elements for which `element != value` holds are kept; all others are replaced by null and
/// then dropped with `drop_nulls()`. Elements that were already null are dropped as well.
pub fn array_remove(&self, value: Expr) -> Column {
    use polars::prelude::NULL;
    let differs = Self::from_expr(col("").neq(value), None);
    let keep = Self::from_expr(col(""), None);
    let discard = Self::from_expr(lit(NULL), None);
    let masked_elem = crate::functions::when(&differs)
        .then(&keep)
        .otherwise(&discard)
        .into_expr();
    let masked = self.expr().clone().list().eval(masked_elem, false);
    Self::from_expr(masked.list().drop_nulls(), None)
}
/// Maps this column through `crate::udfs::apply_array_repeat` with the count `n`.
///
/// The declared output dtype is the same as the input dtype.
pub fn array_repeat(&self, n: i64) -> Column {
    let out_type = GetOutput::same_type();
    let repeated = self
        .expr()
        .clone()
        .map(move |c| crate::udfs::apply_array_repeat(c, n), out_type);
    Self::from_expr(repeated, None)
}
/// Maps this column through `crate::udfs::apply_array_flatten`.
///
/// The declared output dtype is the same as the input dtype.
pub fn array_flatten(&self) -> Column {
    let out_type = GetOutput::same_type();
    let flattened = self.expr().clone().map(crate::udfs::apply_array_flatten, out_type);
    Self::from_expr(flattened, None)
}
/// Combines this column with `elem` through `crate::udfs::apply_array_append`.
///
/// This column is passed first and `elem` second; the declared output dtype is the same as
/// the input dtype.
pub fn array_append(&self, elem: &Column) -> Column {
    let out_type = GetOutput::same_type();
    let appended = self.expr().clone().map_many(
        crate::udfs::apply_array_append,
        &[elem.expr().clone()],
        out_type,
    );
    Self::from_expr(appended, None)
}
/// Combines this column with `elem` through `crate::udfs::apply_array_prepend`.
///
/// This column is passed first and `elem` second; the declared output dtype is the same as
/// the input dtype.
pub fn array_prepend(&self, elem: &Column) -> Column {
    let out_type = GetOutput::same_type();
    let prepended = self.expr().clone().map_many(
        crate::udfs::apply_array_prepend,
        &[elem.expr().clone()],
        out_type,
    );
    Self::from_expr(prepended, None)
}
/// Combines this column with `pos` and `elem` through `crate::udfs::apply_array_insert`.
///
/// The UDF receives this column, then `pos`, then `elem`; the declared output dtype is the
/// same as the input dtype.
pub fn array_insert(&self, pos: &Column, elem: &Column) -> Column {
    let out_type = GetOutput::same_type();
    let extra_inputs = [pos.expr().clone(), elem.expr().clone()];
    let inserted = self.expr().clone().map_many(
        crate::udfs::apply_array_insert,
        &extra_inputs,
        out_type,
    );
    Self::from_expr(inserted, None)
}
/// Combines this column with `other` through `crate::udfs::apply_array_except`.
///
/// This column is passed first and `other` second; the declared output dtype is the same as
/// the input dtype.
pub fn array_except(&self, other: &Column) -> Column {
    let out_type = GetOutput::same_type();
    let remaining = self.expr().clone().map_many(
        crate::udfs::apply_array_except,
        &[other.expr().clone()],
        out_type,
    );
    Self::from_expr(remaining, None)
}
/// Combines this column with `other` through `crate::udfs::apply_array_intersect`.
///
/// This column is passed first and `other` second; the declared output dtype is the same as
/// the input dtype.
pub fn array_intersect(&self, other: &Column) -> Column {
    let out_type = GetOutput::same_type();
    let common = self.expr().clone().map_many(
        crate::udfs::apply_array_intersect,
        &[other.expr().clone()],
        out_type,
    );
    Self::from_expr(common, None)
}
/// Combines this column with `other` through `crate::udfs::apply_array_union`.
///
/// This column is passed first and `other` second; the declared output dtype is the same as
/// the input dtype.
pub fn array_union(&self, other: &Column) -> Column {
    let out_type = GetOutput::same_type();
    let merged = self.expr().clone().map_many(
        crate::udfs::apply_array_union,
        &[other.expr().clone()],
        out_type,
    );
    Self::from_expr(merged, None)
}
/// Zips this column with `other` and evaluates `merge` on every zipped element.
///
/// The zip step is `crate::udfs::apply_zip_arrays_to_struct` (this column first, `other`
/// second, output dtype declared as unchanged); `merge` is then run through `list().eval`.
pub fn zip_with(&self, other: &Column, merge: Expr) -> Column {
    let zipped = self.expr().clone().map_many(
        crate::udfs::apply_zip_arrays_to_struct,
        &[other.expr().clone()],
        GetOutput::same_type(),
    );
    Self::from_expr(zipped.list().eval(merge, false), None)
}
/// Evaluates `predicate` on every list element and reduces with `list().any()`.
///
/// The resulting column is named `exists`.
pub fn array_exists(&self, predicate: Expr) -> Column {
    let per_element = self.expr().clone().list().eval(predicate, false);
    let any_true = per_element.list().any();
    Self::from_expr(any_true, Some("exists".to_string()))
}
/// Evaluates `predicate` on every list element and reduces with `list().all()`.
///
/// The resulting column is named `forall`.
pub fn array_forall(&self, predicate: Expr) -> Column {
    let per_element = self.expr().clone().list().eval(predicate, false);
    let all_true = per_element.list().all();
    Self::from_expr(all_true, Some("forall".to_string()))
}
/// Keeps only the list elements for which `predicate` holds.
///
/// Elements that fail the predicate are replaced by null and then removed with
/// `drop_nulls()`, so elements that were already null are removed too.
pub fn array_filter(&self, predicate: Expr) -> Column {
    use polars::prelude::NULL;
    let condition = Self::from_expr(predicate, None);
    let keep = Self::from_expr(col(""), None);
    let discard = Self::from_expr(lit(NULL), None);
    let masked_elem = crate::functions::when(&condition)
        .then(&keep)
        .otherwise(&discard)
        .into_expr();
    let masked = self.expr().clone().list().eval(masked_elem, false);
    Self::from_expr(masked.list().drop_nulls(), None)
}
/// Evaluates `f` on every element of each list, via `list().eval(f, false)`.
pub fn array_transform(&self, f: Expr) -> Column {
    Self::from_expr(self.expr().clone().list().eval(f, false), None)
}
/// Sum of the elements of each list, via polars' `list().sum()`.
pub fn array_sum(&self) -> Column {
    let total = self.expr().clone().list().sum();
    Self::from_expr(total, None)
}
/// Sum of the elements of each list plus `zero`.
///
/// Only summation is implemented here; the method takes no merge function.
pub fn array_aggregate(&self, zero: &Column) -> Column {
    let total = self.expr().clone().list().sum();
    let seeded = total + zero.expr().clone();
    Self::from_expr(seeded, None)
}
/// Mean of the elements of each list, via polars' `list().mean()`.
pub fn array_mean(&self) -> Column {
    let average = self.expr().clone().list().mean();
    Self::from_expr(average, None)
}
/// Explodes each list into a `(pos, col)` pair of columns.
///
/// `pos` is the per-list `cum_count(false)` of the elements, exploded; `col` is the exploded
/// values. The two columns are named `pos` and `col` respectively.
pub fn posexplode(&self) -> (Column, Column) {
    let running_count = col("").cum_count(false);
    let positions = self
        .expr()
        .clone()
        .list()
        .eval(running_count, false)
        .explode();
    let values = self.expr().clone().explode();
    let pos_col = Self::from_expr(positions, Some("pos".to_string()));
    let val_col = Self::from_expr(values, Some("col".to_string()));
    (pos_col, val_col)
}
/// Extracts the `key` field from every struct element of each list.
pub fn map_keys(&self) -> Column {
    let key_of_entry = col("").struct_().field_by_name("key");
    Self::from_expr(self.expr().clone().list().eval(key_of_entry, false), None)
}
/// Extracts the `value` field from every struct element of each list.
pub fn map_values(&self) -> Column {
    let value_of_entry = col("").struct_().field_by_name("value");
    Self::from_expr(self.expr().clone().list().eval(value_of_entry, false), None)
}
/// Returns this column's expression unchanged, wrapped in a new unnamed `Column`.
pub fn map_entries(&self) -> Column {
    let unchanged = self.expr().clone();
    Self::from_expr(unchanged, None)
}
/// Combines this column with `values` through `crate::udfs::apply_map_from_arrays`.
///
/// This column is passed first and `values` second; the declared output dtype is the same as
/// the input dtype.
pub fn map_from_arrays(&self, values: &Column) -> Column {
    let out_type = GetOutput::same_type();
    let paired = self.expr().clone().map_many(
        crate::udfs::apply_map_from_arrays,
        &[values.expr().clone()],
        out_type,
    );
    Self::from_expr(paired, None)
}
/// Combines this column with `other` through `crate::udfs::apply_map_concat`.
///
/// This column is passed first and `other` second; the declared output dtype is the same as
/// the input dtype.
pub fn map_concat(&self, other: &Column) -> Column {
    let out_type = GetOutput::same_type();
    let merged = self.expr().clone().map_many(
        crate::udfs::apply_map_concat,
        &[other.expr().clone()],
        out_type,
    );
    Self::from_expr(merged, None)
}
/// Rebuilds every `{key, value}` entry with `key_expr` as the new key.
///
/// `key_expr` is evaluated per entry; the existing `value` field is carried over unchanged.
pub fn transform_keys(&self, key_expr: Expr) -> Column {
    use polars::prelude::as_struct;
    let old_value = col("").struct_().field_by_name("value");
    let entry = as_struct(vec![key_expr.alias("key"), old_value.alias("value")]);
    Self::from_expr(self.expr().clone().list().eval(entry, false), None)
}
/// Rebuilds every `{key, value}` entry with `value_expr` as the new value.
///
/// `value_expr` is evaluated per entry; the existing `key` field is carried over unchanged.
pub fn transform_values(&self, value_expr: Expr) -> Column {
    use polars::prelude::as_struct;
    let old_key = col("").struct_().field_by_name("key");
    let entry = as_struct(vec![old_key.alias("key"), value_expr.alias("value")]);
    Self::from_expr(self.expr().clone().list().eval(entry, false), None)
}
/// Zips this column with `other` and rebuilds each entry as `{key, value: merge}`.
///
/// The zip step is `crate::udfs::apply_map_zip_to_struct` (this column first, `other` second,
/// output dtype declared as unchanged). For every zipped element the `key` field is kept and
/// `merge` is evaluated to produce the new `value`.
pub fn map_zip_with(&self, other: &Column, merge: Expr) -> Column {
    use polars::prelude::as_struct;
    let zipped = self.expr().clone().map_many(
        crate::udfs::apply_map_zip_to_struct,
        &[other.expr().clone()],
        GetOutput::same_type(),
    );
    let entry = as_struct(vec![
        col("").struct_().field_by_name("key").alias("key"),
        merge.alias("value"),
    ]);
    Self::from_expr(zipped.list().eval(entry, false), None)
}
/// Keeps only the entries for which `predicate` holds.
///
/// Entries that fail the predicate are replaced by null and then removed with
/// `drop_nulls()`, so entries that were already null are removed too.
pub fn map_filter(&self, predicate: Expr) -> Column {
    use polars::prelude::NULL;
    let condition = Self::from_expr(predicate, None);
    let keep_entry = Self::from_expr(col(""), None);
    let drop_entry = Self::from_expr(lit(NULL), None);
    let masked_entry = crate::functions::when(&condition)
        .then(&keep_entry)
        .otherwise(&drop_entry)
        .into_expr();
    let masked = self.expr().clone().list().eval(masked_entry, false);
    Self::from_expr(masked.list().drop_nulls(), None)
}
/// Returns this column's expression unchanged, wrapped in a new unnamed `Column`.
pub fn map_from_entries(&self) -> Column {
    let unchanged = self.expr().clone();
    Self::from_expr(unchanged, None)
}
/// Combines this column with `key` through `crate::udfs::apply_map_contains_key`.
///
/// This column is passed first and `key` second; the result is declared as `Boolean`.
pub fn map_contains_key(&self, key: &Column) -> Column {
    let out_type = GetOutput::from_type(DataType::Boolean);
    let present = self.expr().clone().map_many(
        crate::udfs::apply_map_contains_key,
        &[key.expr().clone()],
        out_type,
    );
    Self::from_expr(present, None)
}
/// Combines this column with `key` through `crate::udfs::apply_get`.
///
/// This column is passed first and `key` second; the declared output dtype is the same as
/// the input dtype.
pub fn get(&self, key: &Column) -> Column {
    let out_type = GetOutput::same_type();
    let looked_up = self.expr().clone().map_many(
        crate::udfs::apply_get,
        &[key.expr().clone()],
        out_type,
    );
    Self::from_expr(looked_up, None)
}
pub fn get_json_object(&self, path: &str) -> Column {
let path_expr = polars::prelude::lit(path.to_string());
let out = self.expr().clone().str().json_path_match(path_expr);
Self::from_expr(out, None)
}
/// Decodes each JSON string with polars' `str().json_decode`.
///
/// `schema` is forwarded as the first argument; the second argument is always `None`.
pub fn from_json(&self, schema: Option<polars::datatypes::DataType>) -> Column {
    let decoded = self.expr().clone().str().json_decode(schema, None);
    Self::from_expr(decoded, None)
}
/// Encodes each struct value as JSON text, via polars' `struct_().json_encode()`.
pub fn to_json(&self) -> Column {
    let encoded = self.expr().clone().struct_().json_encode();
    Self::from_expr(encoded, None)
}
/// Maps this column through `crate::udfs::apply_json_array_length` with `path`.
///
/// The result is declared as `Int64`.
pub fn json_array_length(&self, path: &str) -> Column {
    let json_path = path.to_owned();
    let out_type = GetOutput::from_type(DataType::Int64);
    let length = self.expr().clone().map(
        move |s| crate::udfs::apply_json_array_length(s, &json_path),
        out_type,
    );
    Self::from_expr(length, None)
}
/// Maps this column through `crate::udfs::apply_json_object_keys`.
///
/// The result is declared as a list of `String`.
pub fn json_object_keys(&self) -> Column {
    let list_of_strings = DataType::List(Box::new(DataType::String));
    let out_type = GetOutput::from_type(list_of_strings);
    let keys = self
        .expr()
        .clone()
        .map(crate::udfs::apply_json_object_keys, out_type);
    Self::from_expr(keys, None)
}
/// Maps this column through `crate::udfs::apply_json_tuple` with the given `keys`.
///
/// The result is declared as a struct with one `String` field per key, in the order given.
pub fn json_tuple(&self, keys: &[&str]) -> Column {
    let owned_keys: Vec<String> = keys.iter().map(|k| (*k).to_owned()).collect();
    let mut fields: Vec<polars::datatypes::Field> = Vec::with_capacity(owned_keys.len());
    for key in &owned_keys {
        fields.push(polars::datatypes::Field::new(
            key.as_str().into(),
            DataType::String,
        ));
    }
    let out_type = GetOutput::from_type(DataType::Struct(fields));
    let extracted = self.expr().clone().map(
        move |s| crate::udfs::apply_json_tuple(s, &owned_keys),
        out_type,
    );
    Self::from_expr(extracted, None)
}
/// Maps this column through `crate::udfs::apply_from_csv`.
///
/// The result is declared as a struct with no fields.
pub fn from_csv(&self) -> Column {
    let out_type = GetOutput::from_type(DataType::Struct(Vec::new()));
    let parsed = self.expr().clone().map(crate::udfs::apply_from_csv, out_type);
    Self::from_expr(parsed, None)
}
/// Maps this column through `crate::udfs::apply_to_csv`; the result is declared as `String`.
pub fn to_csv(&self) -> Column {
    let out_type = GetOutput::from_type(DataType::String);
    let rendered = self.expr().clone().map(crate::udfs::apply_to_csv, out_type);
    Self::from_expr(rendered, None)
}
/// Maps this column through `crate::udfs::apply_parse_url` with `part` and the optional `key`.
///
/// The result is declared as `String`.
pub fn parse_url(&self, part: &str, key: Option<&str>) -> Column {
    let wanted_part = part.to_owned();
    let wanted_key: Option<String> = key.map(str::to_owned);
    let out_type = GetOutput::from_type(DataType::String);
    let extracted = self.expr().clone().map(
        move |s| crate::udfs::apply_parse_url(s, &wanted_part, wanted_key.as_deref()),
        out_type,
    );
    Self::from_expr(extracted, None)
}
/// Maps this column through `crate::udfs::apply_hash_one`; the result is declared as `Int64`.
pub fn hash(&self) -> Column {
    let out_type = GetOutput::from_type(DataType::Int64);
    let hashed = self.expr().clone().map(crate::udfs::apply_hash_one, out_type);
    Self::from_expr(hashed, None)
}
/// Membership test of this column's values in `other`, via polars' `is_in`.
pub fn isin(&self, other: &Column) -> Column {
    let candidates = other.expr().clone();
    Self::from_expr(self.expr().clone().is_in(candidates), None)
}
/// Maps this column through `crate::udfs::apply_url_decode`; result declared as `String`.
pub fn url_decode(&self) -> Column {
    let out_type = GetOutput::from_type(DataType::String);
    let decoded = self.expr().clone().map(crate::udfs::apply_url_decode, out_type);
    Self::from_expr(decoded, None)
}
/// Maps this column through `crate::udfs::apply_url_encode`; result declared as `String`.
pub fn url_encode(&self) -> Column {
    let out_type = GetOutput::from_type(DataType::String);
    let encoded = self.expr().clone().map(crate::udfs::apply_url_encode, out_type);
    Self::from_expr(encoded, None)
}
/// Left shift expressed arithmetically: casts to `Int64`, multiplies by `2^n`, and casts the
/// product back to `Int64`.
pub fn shift_left(&self, n: i32) -> Column {
    use polars::prelude::*;
    let factor = lit(2i64).pow(lit(i64::from(n)));
    let widened = self.expr().clone().cast(DataType::Int64);
    let shifted = (widened * factor).cast(DataType::Int64);
    Self::from_expr(shifted, None)
}
/// Right shift expressed arithmetically: casts to `Int64`, divides by `2^n`, and casts the
/// quotient back to `Int64`.
///
/// NOTE(review): for negative inputs a division may round differently from an arithmetic
/// bit shift — confirm the intended behaviour.
pub fn shift_right(&self, n: i32) -> Column {
    use polars::prelude::*;
    let factor = lit(2i64).pow(lit(i64::from(n)));
    let widened = self.expr().clone().cast(DataType::Int64);
    let shifted = (widened / factor).cast(DataType::Int64);
    Self::from_expr(shifted, None)
}
/// Maps this column through `crate::udfs::apply_shift_right_unsigned` with the shift `n`.
///
/// The result is declared as `Int64`.
pub fn shift_right_unsigned(&self, n: i32) -> Column {
    let out_type = GetOutput::from_type(DataType::Int64);
    let shifted = self.expr().clone().map(
        move |s| crate::udfs::apply_shift_right_unsigned(s, n),
        out_type,
    );
    Self::from_expr(shifted, None)
}
}
#[cfg(test)]
mod tests {
    use super::Column;
    use polars::prelude::{col, df, lit, DataFrame, IntoLazy};

    /// Five-row frame with non-null integer columns `a` (1..=5) and `b` (10..=50).
    fn test_df() -> DataFrame {
        df!(
            "a" => &[1, 2, 3, 4, 5],
            "b" => &[10, 20, 30, 40, 50]
        )
        .unwrap()
    }

    /// Five-row frame whose `a` and `b` columns both contain nulls.
    fn test_df_with_nulls() -> DataFrame {
        df!(
            "a" => &[Some(1), Some(2), None, Some(4), None],
            "b" => &[Some(10), None, Some(30), None, None]
        )
        .unwrap()
    }

    /// Shorthand for the column `a` used by the comparison tests.
    fn col_a() -> Column {
        Column::new("a".to_string())
    }

    /// Filters `frame` by `predicate` and reports how many rows survive.
    fn rows_matching(frame: DataFrame, predicate: Column) -> usize {
        let kept = frame.lazy().filter(predicate.into_expr()).collect().unwrap();
        kept.height()
    }

    /// Evaluates `a <=> b` on `frame` and returns the resulting boolean values in row order.
    fn eq_null_safe_values(frame: DataFrame) -> Vec<Option<bool>> {
        let lhs = Column::new("a".to_string());
        let rhs = Column::new("b".to_string());
        let compared = lhs.eq_null_safe(&rhs);
        let out = frame
            .lazy()
            .with_column(compared.into_expr().alias("eq_null_safe"))
            .collect()
            .unwrap();
        let flags = out.column("eq_null_safe").unwrap();
        let values: Vec<Option<bool>> = flags.bool().unwrap().into_iter().collect();
        values
    }

    #[test]
    fn test_column_new() {
        assert_eq!(Column::new("age".to_string()).name(), "age");
    }

    #[test]
    fn test_column_from_expr() {
        let named = Column::from_expr(col("test"), Some("test".to_string()));
        assert_eq!(named.name(), "test");
    }

    #[test]
    fn test_column_from_expr_default_name() {
        let unnamed = Column::from_expr(col("test").gt(lit(5)), None);
        assert_eq!(unnamed.name(), "<expr>");
    }

    #[test]
    fn test_column_alias() {
        let renamed = Column::new("original".to_string()).alias("new_name");
        assert_eq!(renamed.name(), "new_name");
    }

    #[test]
    fn test_column_gt() {
        // a > 3 keeps 4 and 5.
        assert_eq!(rows_matching(test_df(), col_a().gt(lit(3))), 2);
    }

    #[test]
    fn test_column_lt() {
        // a < 3 keeps 1 and 2.
        assert_eq!(rows_matching(test_df(), col_a().lt(lit(3))), 2);
    }

    #[test]
    fn test_column_eq() {
        // a == 3 keeps a single row.
        assert_eq!(rows_matching(test_df(), col_a().eq(lit(3))), 1);
    }

    #[test]
    fn test_column_neq() {
        // a != 3 keeps everything except the row holding 3.
        assert_eq!(rows_matching(test_df(), col_a().neq(lit(3))), 4);
    }

    #[test]
    fn test_column_gt_eq() {
        // a >= 3 keeps 3, 4 and 5.
        assert_eq!(rows_matching(test_df(), col_a().gt_eq(lit(3))), 3);
    }

    #[test]
    fn test_column_lt_eq() {
        // a <= 3 keeps 1, 2 and 3.
        assert_eq!(rows_matching(test_df(), col_a().lt_eq(lit(3))), 3);
    }

    #[test]
    fn test_column_is_null() {
        // Column `a` holds two nulls.
        assert_eq!(rows_matching(test_df_with_nulls(), col_a().is_null()), 2);
    }

    #[test]
    fn test_column_is_not_null() {
        // Column `a` holds three non-null values.
        assert_eq!(rows_matching(test_df_with_nulls(), col_a().is_not_null()), 3);
    }

    #[test]
    fn test_eq_null_safe_both_null() {
        let frame = df!(
            "a" => &[Some(1), None, Some(3)],
            "b" => &[Some(1), None, Some(4)]
        )
        .unwrap();
        let values = eq_null_safe_values(frame);
        assert_eq!(values[0], Some(true));
        // Two nulls compare as equal under null-safe equality.
        assert_eq!(values[1], Some(true));
        assert_eq!(values[2], Some(false));
    }

    #[test]
    fn test_eq_null_safe_one_null() {
        let frame = df!(
            "a" => &[Some(1), None, Some(3)],
            "b" => &[Some(1), Some(2), None]
        )
        .unwrap();
        let values = eq_null_safe_values(frame);
        assert_eq!(values[0], Some(true));
        // A null on exactly one side compares as not equal.
        assert_eq!(values[1], Some(false));
        assert_eq!(values[2], Some(false));
    }
}