use polars::prelude::{
DataType, Expr, Field, PolarsError, PolarsResult, RankMethod, RankOptions,
RollingOptionsFixedWindow, SortOptions, TimeUnit, WindowMapping, col, lit,
};
use polars_plan::dsl::AggExpr;
use std::ops::Neg;
/// Unwraps the `Option` inside a UDF result, turning a missing column into an error.
///
/// Any `Err` in `r` is passed through unchanged; `Ok(None)` becomes a
/// `PolarsError::ComputeError` carrying the message `"expected column"`.
#[inline]
pub(crate) fn expect_col(
    r: PolarsResult<Option<polars::prelude::Column>>,
) -> PolarsResult<polars::prelude::Column> {
    match r? {
        Some(column) => Ok(column),
        None => Err(PolarsError::ComputeError("expected column".into())),
    }
}
/// Translates a SQL `LIKE` pattern into an anchored regular expression.
///
/// * `%` becomes `.*` and `_` becomes `.`.
/// * A character preceded by `escape_char` is emitted as a literal.
/// * Every other regex metacharacter is backslash-escaped.
///
/// The result is prefixed with `(?s)` (dot matches newline) so that `%` and `_`
/// also match line breaks, which is how Spark compiles `LIKE` patterns.
/// A trailing `escape_char` with nothing after it is treated as a literal character.
fn like_pattern_to_regex(pattern: &str, escape_char: Option<char>) -> String {
    // Characters that carry meaning in a regex and therefore need a backslash.
    const REGEX_META: &str = "\\.*+?[](){}^$|";

    // Appends `c` as a literal, escaping it when it is a regex metacharacter.
    fn push_literal(out: &mut String, c: char) {
        if REGEX_META.contains(c) {
            out.push('\\');
        }
        out.push(c);
    }

    let mut out = String::with_capacity(pattern.len() * 2 + 6);
    out.push_str("(?s)^");
    let mut it = pattern.chars();
    // `while let` (not `for`) because the escape branch consumes the following char.
    while let Some(c) = it.next() {
        if escape_char == Some(c) {
            // Escaped character, or the escape character itself when it ends the pattern.
            push_literal(&mut out, it.next().unwrap_or(c));
            continue;
        }
        match c {
            '%' => out.push_str(".*"),
            '_' => out.push('.'),
            other => push_literal(&mut out, other),
        }
    }
    out.push('$');
    out
}
/// Maps a PySpark truncation unit name to a Polars duration string.
///
/// Matching is case-insensitive. Besides the long names, Spark's documented
/// short aliases (`yyyy`, `yy`, `mon`, `mm`, `dd`) and the sub-second units
/// are recognised. Any other input is returned unchanged so callers can pass
/// a Polars duration string straight through.
fn pyspark_trunc_format_to_polars_duration(format: &str) -> String {
    let duration = match format.to_lowercase().as_str() {
        "year" | "years" | "yyyy" | "yy" => "1y",
        "quarter" | "quarters" | "q" => "1q",
        "month" | "months" | "mon" | "mm" => "1mo",
        "week" | "weeks" | "wk" => "1w",
        "day" | "days" | "dd" => "1d",
        "hour" | "hours" => "1h",
        "minute" | "minutes" | "min" => "1m",
        "second" | "seconds" | "sec" => "1s",
        "millisecond" | "milliseconds" => "1ms",
        "microsecond" | "microseconds" => "1us",
        // Unknown unit: hand the caller's string back untouched.
        _ => return format.to_string(),
    };
    duration.to_string()
}
/// Marker recorded on columns built by `Column::from_rand` / `Column::from_randn`.
///
/// The payload is the optional seed that was passed to the constructor.
#[derive(Debug, Clone, Copy)]
pub enum DeferredRandom {
/// Created by `Column::from_rand`.
Rand(Option<u64>),
/// Created by `Column::from_randn`.
Randn(Option<u64>),
}
/// Extra information kept for first/last aggregations (see `Column::from_last_agg`).
#[derive(Debug, Clone)]
pub struct FirstLastValue {
/// The expression the aggregation is applied to, before `.last()` is taken.
pub value_expr: Expr,
/// `true` for a "last" aggregation; presumably `false` means "first" — TODO confirm.
pub is_last: bool,
}
/// Description of a range-based window aggregation.
///
/// NOTE(review): nothing in this part of the file constructs or consumes this
/// type; the field notes below are inferred from names and should be confirmed.
#[derive(Debug, Clone)]
pub struct RangeWindowSpec {
/// Presumably the partitioning column names.
pub partition_by: Vec<String>,
/// Presumably the name of the ordering column.
pub order_by: String,
/// Presumably the name of the column being aggregated.
pub value_col: String,
/// Presumably the lower frame bound — units and inclusivity to be confirmed.
pub start: i64,
/// Presumably the upper frame bound — units and inclusivity to be confirmed.
pub end: i64,
/// Aggregation to compute over the frame.
pub agg: RangeWindowAgg,
}
/// Aggregation kinds selectable in a `RangeWindowSpec`.
#[derive(Debug, Clone, Copy)]
pub enum RangeWindowAgg {
Sum,
Mean,
Count,
}
/// A named wrapper around a Polars `Expr`, plus optional metadata that other
/// parts of the crate read back from the column.
#[derive(Debug, Clone)]
pub struct Column {
// Display name; also used as the alias in `into_expr`.
name: String,
// `expr` is the wrapped Polars expression. `is_array_expr` is a crate-internal
// flag — NOTE(review): its meaning is not visible in this part of the file.
expr: Expr, pub(crate) is_array_expr: bool,
/// Set by `from_rand` / `from_randn`; records which generator and seed were requested.
pub deferred: Option<DeferredRandom>,
/// Set by `from_udf_call`: the UDF name and its argument columns.
pub udf_call: Option<(String, Vec<Column>)>,
/// NOTE(review): not written in this part of the file; presumably a source column name.
pub source_for_running: Option<String>,
/// NOTE(review): not written in this part of the file; presumably a source column name.
pub source_for_running_mean: Option<String>,
/// Set by `from_last_agg`; keeps the un-aggregated value expression.
pub first_last_value: Option<FirstLastValue>,
/// NOTE(review): not written in this part of the file; presumably a source column name.
pub source_for_running_count: Option<String>,
/// NOTE(review): not written in this part of the file; see `RangeWindowSpec`.
pub range_window_spec: Option<RangeWindowSpec>,
}
/// Reports whether `expr` is an `n_unique` aggregation, looking through any
/// number of enclosing `Cast` and `Alias` wrappers.
fn expr_is_or_contains_n_unique(expr: &Expr) -> bool {
    let mut current = expr;
    loop {
        // Peel one wrapper per iteration until something decisive is reached.
        current = match current {
            Expr::Agg(AggExpr::NUnique(_)) => return true,
            Expr::Cast { expr: inner, .. } => inner.as_ref(),
            Expr::Alias(inner, _) => inner.as_ref(),
            _ => return false,
        };
    }
}
impl Column {
pub fn new(name: String) -> Self {
Column {
name: name.clone(),
expr: col(&name),
is_array_expr: false,
deferred: None,
udf_call: None,
source_for_running: None,
source_for_running_mean: None,
first_last_value: None,
source_for_running_count: None,
range_window_spec: None,
}
}
pub fn from_expr(expr: Expr, name: Option<String>) -> Self {
let display_name = name.unwrap_or_else(|| "<expr>".to_string());
Column {
name: display_name,
expr,
is_array_expr: false,
deferred: None,
udf_call: None,
source_for_running: None,
source_for_running_mean: None,
first_last_value: None,
source_for_running_count: None,
range_window_spec: None,
}
}
pub fn from_last_agg(col: &Column) -> Self {
let value_expr = col.expr().clone();
let expr = value_expr.clone().last();
Column {
name: "last".to_string(),
expr,
is_array_expr: false,
deferred: None,
udf_call: None,
source_for_running: None,
source_for_running_mean: None,
first_last_value: Some(FirstLastValue {
value_expr,
is_last: true,
}),
source_for_running_count: None,
range_window_spec: None,
}
}
pub fn from_udf_call(name: String, args: Vec<Column>) -> Self {
Column {
name: format!("{name}()"),
expr: lit(0i32), is_array_expr: false,
deferred: None,
udf_call: Some((name, args)),
source_for_running: None,
source_for_running_mean: None,
first_last_value: None,
source_for_running_count: None,
range_window_spec: None,
}
}
pub fn from_rand(seed: Option<u64>) -> Self {
let expr = lit(1i64).cum_sum(false).map(
move |c| expect_col(crate::udfs::apply_rand_with_seed(c, seed)),
|_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
);
Column {
name: "rand".to_string(),
expr,
is_array_expr: false,
deferred: Some(DeferredRandom::Rand(seed)),
udf_call: None,
source_for_running: None,
source_for_running_mean: None,
first_last_value: None,
source_for_running_count: None,
range_window_spec: None,
}
}
pub fn from_randn(seed: Option<u64>) -> Self {
let expr = lit(1i64).cum_sum(false).map(
move |c| expect_col(crate::udfs::apply_randn_with_seed(c, seed)),
|_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
);
Column {
name: "randn".to_string(),
expr,
is_array_expr: false,
deferred: Some(DeferredRandom::Randn(seed)),
udf_call: None,
source_for_running: None,
source_for_running_mean: None,
first_last_value: None,
source_for_running_count: None,
range_window_spec: None,
}
}
pub fn expr(&self) -> &Expr {
&self.expr
}
pub fn into_expr(self) -> Expr {
self.expr.alias(&self.name)
}
/// Returns the column's display name.
pub fn name(&self) -> &str {
    self.name.as_str()
}
/// Returns the recorded UDF name together with the names of its argument
/// columns, or `None` when this column does not record a UDF call.
pub fn udf_call_info(&self) -> Option<(String, Vec<String>)> {
    let (udf_name, args) = self.udf_call.as_ref()?;
    let arg_names = args.iter().map(|arg| arg.name().to_string()).collect();
    Some((udf_name.clone(), arg_names))
}
/// Borrows the recorded UDF name and its argument columns, if any.
pub fn udf_call_with_args(&self) -> Option<(&str, &[Column])> {
    match &self.udf_call {
        Some((udf_name, args)) => Some((udf_name.as_str(), args.as_slice())),
        None => None,
    }
}
/// Serialises the column to a JSON string when its expression is a literal.
///
/// Returns `None` for non-literal expressions, when
/// `crate::dataframe::literal_value_to_serde_value` yields nothing, or when
/// JSON serialisation fails.
pub fn literal_as_json_string(&self) -> Option<String> {
    let Expr::Literal(lv) = &self.expr else {
        return None;
    };
    let value = crate::dataframe::literal_value_to_serde_value(lv)?;
    serde_json::to_string(&value).ok()
}
/// Like `udf_call_info`, but additionally returns, per argument, its JSON
/// literal form (`None` for arguments that are not literals).
pub fn udf_call_info_with_literals(
    &self,
) -> Option<(String, Vec<String>, Vec<Option<String>>)> {
    let (udf_name, args) = self.udf_call.as_ref()?;
    let mut arg_names = Vec::with_capacity(args.len());
    let mut literals = Vec::with_capacity(args.len());
    for arg in args {
        arg_names.push(arg.name().to_string());
        literals.push(arg.literal_as_json_string());
    }
    Some((udf_name.clone(), arg_names, literals))
}
pub fn alias(&self, name: &str) -> Column {
Column {
name: name.to_string(),
expr: self.expr.clone().alias(name),
is_array_expr: self.is_array_expr,
deferred: self.deferred,
udf_call: self.udf_call.clone(),
source_for_running: self.source_for_running.clone(),
source_for_running_mean: self.source_for_running_mean.clone(),
first_last_value: self.first_last_value.clone(),
source_for_running_count: self.source_for_running_count.clone(),
range_window_spec: self.range_window_spec.clone(),
}
}
pub fn asc(&self) -> crate::functions::SortOrder {
crate::functions::asc(self)
}
pub fn asc_nulls_first(&self) -> crate::functions::SortOrder {
crate::functions::asc_nulls_first(self)
}
pub fn asc_nulls_last(&self) -> crate::functions::SortOrder {
crate::functions::asc_nulls_last(self)
}
pub fn desc(&self) -> crate::functions::SortOrder {
crate::functions::desc(self)
}
pub fn desc_nulls_first(&self) -> crate::functions::SortOrder {
crate::functions::desc_nulls_first(self)
}
pub fn desc_nulls_last(&self) -> crate::functions::SortOrder {
crate::functions::desc_nulls_last(self)
}
pub fn is_null(&self) -> Column {
Column {
name: format!("({} IS NULL)", self.name),
expr: self.expr.clone().is_null(),
is_array_expr: false,
deferred: None,
udf_call: None,
source_for_running: None,
source_for_running_mean: None,
first_last_value: None,
source_for_running_count: None,
range_window_spec: None,
}
}
pub fn is_not_null(&self) -> Column {
Column {
name: format!("({} IS NOT NULL)", self.name),
expr: self.expr.clone().is_not_null(),
is_array_expr: false,
deferred: None,
udf_call: None,
source_for_running: None,
source_for_running_mean: None,
first_last_value: None,
source_for_running_count: None,
range_window_spec: None,
}
}
pub fn isnull(&self) -> Column {
self.is_null()
}
pub fn isnotnull(&self) -> Column {
self.is_not_null()
}
/// Expression for a null literal cast to `Boolean`.
fn null_boolean_expr() -> Expr {
    use polars::prelude::NULL;
    let untyped_null = lit(NULL);
    untyped_null.cast(DataType::Boolean)
}
/// SQL `LIKE` match against `pattern`.
///
/// The pattern is translated by `like_pattern_to_regex` (honouring the
/// optional `escape_char`) and then evaluated through `regexp_like`.
pub fn like(&self, pattern: &str, escape_char: Option<char>) -> Column {
    let regex = like_pattern_to_regex(pattern, escape_char);
    // The argument had been mangled to `®ex`; it is a borrow of `regex`.
    self.regexp_like(&regex)
}
/// Case-insensitive SQL `LIKE` match against `pattern`.
///
/// Uses the regex from `like_pattern_to_regex` prefixed with `(?i)`.
pub fn ilike(&self, pattern: &str, escape_char: Option<char>) -> Column {
    use polars::prelude::*;
    let mut regex = String::from("(?i)");
    regex.push_str(&like_pattern_to_regex(pattern, escape_char));
    let matched = self.expr().clone().str().contains(lit(regex), false);
    Self::from_expr(matched, None)
}
/// Equality comparison that yields a null boolean when either operand is null.
pub fn eq_pyspark(&self, other: &Column) -> Column {
    let lhs = self.expr().clone();
    let rhs = other.expr().clone();
    let either_null = Self::from_expr(lhs.clone().is_null().or(rhs.clone().is_null()), None);
    let null_result = Self::from_expr(Self::null_boolean_expr(), None);
    let compared = Self::from_expr(lhs.eq(rhs), None);
    let guarded = crate::functions::when(&either_null)
        .then(&null_result)
        .otherwise(&compared);
    Self::from_expr(guarded.into_expr(), None)
}
/// Inequality comparison that yields a null boolean when either operand is null.
pub fn ne_pyspark(&self, other: &Column) -> Column {
    let lhs = self.expr().clone();
    let rhs = other.expr().clone();
    let either_null = Self::from_expr(lhs.clone().is_null().or(rhs.clone().is_null()), None);
    let null_result = Self::from_expr(Self::null_boolean_expr(), None);
    let compared = Self::from_expr(lhs.neq(rhs), None);
    let guarded = crate::functions::when(&either_null)
        .then(&null_result)
        .otherwise(&compared);
    Self::from_expr(guarded.into_expr(), None)
}
/// Null-safe equality: true when both sides are null, false when exactly one
/// is null, and the ordinary equality result otherwise.
///
/// Operands are first passed through
/// `crate::type_coercion::coerce_for_pyspark_eq_null_safe`; if that fails the
/// original expressions are compared as-is.
pub fn eq_null_safe(&self, other: &Column) -> Column {
    use polars::prelude::{lit, when};
    let coerced = crate::type_coercion::coerce_for_pyspark_eq_null_safe(
        self.expr().clone(),
        other.expr().clone(),
    );
    let (lhs, rhs) = match coerced {
        Ok(pair) => pair,
        Err(_) => (self.expr().clone(), other.expr().clone()),
    };
    let lhs_null = lhs.clone().is_null();
    let rhs_null = rhs.clone().is_null();
    let both_null = lhs_null.clone().and(rhs_null.clone());
    let neither_null = lhs_null.not().and(rhs_null.not());
    let expr = when(both_null)
        .then(lit(true))
        .when(neither_null)
        .then(lhs.eq(rhs))
        .otherwise(lit(false));
    Column::from_expr(expr, None)
}
/// Column holding a null literal typed as `Boolean`.
pub fn null_boolean() -> Column {
    let expr = Self::null_boolean_expr();
    Column::from_expr(expr, None)
}
/// Column holding a null literal cast to the type named by `dtype`.
///
/// # Errors
/// Returns the error from `crate::functions::parse_type_name` when the type
/// name cannot be parsed.
pub fn lit_null(dtype: &str) -> Result<Column, String> {
    use polars::prelude::{NULL, lit};
    let target = crate::functions::parse_type_name(dtype)?;
    let typed_null = lit(NULL).cast(target);
    Ok(Column::from_expr(typed_null, None))
}
pub fn from_bool(b: bool) -> Column {
crate::functions::lit_bool(b)
}
pub fn from_i64(n: i64) -> Column {
crate::functions::lit_i64(n)
}
pub fn from_string(s: &str) -> Column {
crate::functions::lit_str(s)
}
/// `>` comparison that yields a null boolean when either operand is null.
pub fn gt_pyspark(&self, other: &Column) -> Column {
    let lhs = self.expr().clone();
    let rhs = other.expr().clone();
    let either_null = Self::from_expr(lhs.clone().is_null().or(rhs.clone().is_null()), None);
    let null_result = Self::from_expr(Self::null_boolean_expr(), None);
    let compared = Self::from_expr(lhs.gt(rhs), None);
    let guarded = crate::functions::when(&either_null)
        .then(&null_result)
        .otherwise(&compared);
    Self::from_expr(guarded.into_expr(), None)
}
/// `>=` comparison that yields a null boolean when either operand is null.
pub fn ge_pyspark(&self, other: &Column) -> Column {
    let lhs = self.expr().clone();
    let rhs = other.expr().clone();
    let either_null = Self::from_expr(lhs.clone().is_null().or(rhs.clone().is_null()), None);
    let null_result = Self::from_expr(Self::null_boolean_expr(), None);
    let compared = Self::from_expr(lhs.gt_eq(rhs), None);
    let guarded = crate::functions::when(&either_null)
        .then(&null_result)
        .otherwise(&compared);
    Self::from_expr(guarded.into_expr(), None)
}
/// `<` comparison that yields a null boolean when either operand is null.
pub fn lt_pyspark(&self, other: &Column) -> Column {
    let lhs = self.expr().clone();
    let rhs = other.expr().clone();
    let either_null = Self::from_expr(lhs.clone().is_null().or(rhs.clone().is_null()), None);
    let null_result = Self::from_expr(Self::null_boolean_expr(), None);
    let compared = Self::from_expr(lhs.lt(rhs), None);
    let guarded = crate::functions::when(&either_null)
        .then(&null_result)
        .otherwise(&compared);
    Self::from_expr(guarded.into_expr(), None)
}
/// `<=` comparison that yields a null boolean when either operand is null.
pub fn le_pyspark(&self, other: &Column) -> Column {
    let lhs = self.expr().clone();
    let rhs = other.expr().clone();
    let either_null = Self::from_expr(lhs.clone().is_null().or(rhs.clone().is_null()), None);
    let null_result = Self::from_expr(Self::null_boolean_expr(), None);
    let compared = Self::from_expr(lhs.lt_eq(rhs), None);
    let guarded = crate::functions::when(&either_null)
        .then(&null_result)
        .otherwise(&compared);
    Self::from_expr(guarded.into_expr(), None)
}
/// Plain Polars `>` comparison against `other`.
pub fn gt(&self, other: Expr) -> Column {
    let compared = self.expr().clone().gt(other);
    Self::from_expr(compared, None)
}
/// Plain Polars `>=` comparison against `other`.
pub fn gt_eq(&self, other: Expr) -> Column {
    let compared = self.expr().clone().gt_eq(other);
    Self::from_expr(compared, None)
}
/// Plain Polars `<` comparison against `other`.
pub fn lt(&self, other: Expr) -> Column {
    let compared = self.expr().clone().lt(other);
    Self::from_expr(compared, None)
}
/// Plain Polars `<=` comparison against `other`.
pub fn lt_eq(&self, other: Expr) -> Column {
    let compared = self.expr().clone().lt_eq(other);
    Self::from_expr(compared, None)
}
/// Inclusive range test: `lower <= self <= upper`.
///
/// Each bound is first run through
/// `crate::type_coercion::coerce_for_pyspark_comparison`. The left-hand type
/// is always passed as `String`; a bound's type is taken from its literal
/// value when it is a literal of known type and defaults to `String`
/// otherwise. If coercion fails, the un-coerced expressions are compared.
pub fn between(&self, lower: &Column, upper: &Column) -> Column {
    use crate::type_coercion::{CompareOp, coerce_for_pyspark_comparison};
    use polars::prelude::*;
    let left = self.expr().clone();
    let lower_expr = lower.expr().clone();
    let upper_expr = upper.expr().clone();
    // Data type of a literal expression, unless it is not a literal or its type is unknown.
    let infer_lit_type = |e: &Expr| -> Option<DataType> {
        if let Expr::Literal(lv) = e {
            let dt = lv.get_datatype();
            if matches!(dt, DataType::Unknown(_)) {
                None
            } else {
                Some(dt)
            }
        } else {
            None
        }
    };
    let lower_ty = infer_lit_type(&lower_expr).unwrap_or(DataType::String);
    let upper_ty = infer_lit_type(&upper_expr).unwrap_or(DataType::String);
    // Type reported for the left operand; the call sites had been mangled to `<,`.
    let left_ty = DataType::String;
    let (left_c, lower_c) = match coerce_for_pyspark_comparison(
        left.clone(),
        lower_expr.clone(),
        &left_ty,
        &lower_ty,
        &CompareOp::GtEq,
    ) {
        Ok((a, b)) => (a, b),
        Err(_) => (left, lower_expr),
    };
    // The already-coerced left side is fed into the second coercion as well.
    let (left_cc, upper_c) = match coerce_for_pyspark_comparison(
        left_c.clone(),
        upper_expr.clone(),
        &left_ty,
        &upper_ty,
        &CompareOp::LtEq,
    ) {
        Ok((a, b)) => (a, b),
        Err(_) => (left_c, upper_expr),
    };
    let ge = left_cc.clone().gt_eq(lower_c);
    let le = left_cc.lt_eq(upper_c);
    Self::from_expr(ge.and(le), None)
}
/// Plain Polars equality comparison against `other`.
pub fn eq(&self, other: Expr) -> Column {
    let compared = self.expr().clone().eq(other);
    Self::from_expr(compared, None)
}
/// Plain Polars inequality comparison against `other`.
pub fn neq(&self, other: Expr) -> Column {
    let compared = self.expr().clone().neq(other);
    Self::from_expr(compared, None)
}
/// Logical AND of this column with `other`.
pub fn and_(&self, other: &Column) -> Column {
    let rhs = other.expr().clone();
    let combined = self.expr().clone().and(rhs);
    Self::from_expr(combined, None)
}
/// Logical OR of this column with `other`.
pub fn or_(&self, other: &Column) -> Column {
    let rhs = other.expr().clone();
    let combined = self.expr().clone().or(rhs);
    Self::from_expr(combined, None)
}
/// Upper-cases the string column.
pub fn upper(&self) -> Column {
    let uppercased = self.expr().clone().str().to_uppercase();
    Self::from_expr(uppercased, None)
}
/// Lower-cases the string column.
pub fn lower(&self) -> Column {
    let lowercased = self.expr().clone().str().to_lowercase();
    Self::from_expr(lowercased, None)
}
pub fn lcase(&self) -> Column {
self.lower()
}
pub fn ucase(&self) -> Column {
self.upper()
}
/// Substring starting at the 1-based position `start`.
///
/// * `length < 1` yields an empty string (null input stays null).
/// * `start == 0` is treated like position 1.
/// * A negative `start` counts from the end of the string; positions before
///   the beginning are clamped to the beginning.
/// * `length == None` takes everything up to the end.
pub fn substr(&self, start: i64, length: Option<i64>) -> Column {
    use polars::prelude::*;
    let source = self.expr().clone();
    if matches!(length, Some(l) if l < 1) {
        let empty_or_null = when(source.is_null())
            .then(lit(NULL))
            .otherwise(lit(""));
        return Self::from_expr(empty_or_null, None);
    }
    let len_chars = source.clone().str().len_chars();
    // Zero-based offset handed to `slice`.
    let offset_expr = match start {
        0 => lit(0i64),
        s if s >= 1 => lit((s - 1).max(0)),
        s => {
            let from_end = len_chars + lit(s);
            when(from_end.clone().lt(lit(0i64)))
                .then(lit(0i64))
                .otherwise(from_end)
        }
    };
    let length_expr = match length {
        Some(l) => lit(l),
        None => lit(i64::MAX),
    };
    Self::from_expr(source.str().slice(offset_expr, length_expr), None)
}
/// Number of characters in each string.
pub fn length(&self) -> Column {
    let char_count = self.expr().clone().str().len_chars();
    Self::from_expr(char_count, None)
}
/// Length of each string in bits (byte length times eight), as `Int32`.
pub fn bit_length(&self) -> Column {
    use polars::prelude::*;
    let byte_count = self.expr().clone().str().len_bytes().cast(DataType::Int32);
    let bit_count = byte_count * lit(8i32);
    Self::from_expr(bit_count.cast(DataType::Int32), None)
}
/// Length of each string in bytes, as `Int32`.
pub fn octet_length(&self) -> Column {
    use polars::prelude::*;
    let byte_count = self.expr().clone().str().len_bytes();
    Self::from_expr(byte_count.cast(DataType::Int32), None)
}
pub fn char_length(&self) -> Column {
self.length()
}
pub fn character_length(&self) -> Column {
self.length()
}
/// Applies `crate::udfs::apply_encode` with the given `charset`.
///
/// The output dtype is declared as `String`.
pub fn encode(&self, charset: &str) -> Column {
    let charset = charset.to_owned();
    let source = self.expr().clone();
    let encoded = source.map(
        move |input| expect_col(crate::udfs::apply_encode(input, &charset)),
        |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
    );
    Self::from_expr(encoded, None)
}
/// Applies `crate::udfs::apply_decode` with the given `charset`.
///
/// The output dtype is declared as `String`.
pub fn decode(&self, charset: &str) -> Column {
    let charset = charset.to_owned();
    let source = self.expr().clone();
    let decoded = source.map(
        move |input| expect_col(crate::udfs::apply_decode(input, &charset)),
        |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
    );
    Self::from_expr(decoded, None)
}
/// Applies `crate::udfs::apply_to_binary` with the format name `fmt`.
///
/// The output dtype is declared as `String`.
pub fn to_binary(&self, fmt: &str) -> Column {
    let fmt = fmt.to_owned();
    let source = self.expr().clone();
    let converted = source.map(
        move |input| expect_col(crate::udfs::apply_to_binary(input, &fmt)),
        |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
    );
    Self::from_expr(converted, None)
}
/// Applies `crate::udfs::apply_try_to_binary` with the format name `fmt`.
///
/// The output dtype is declared as `String`.
pub fn try_to_binary(&self, fmt: &str) -> Column {
    let fmt = fmt.to_owned();
    let source = self.expr().clone();
    let converted = source.map(
        move |input| expect_col(crate::udfs::apply_try_to_binary(input, &fmt)),
        |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
    );
    Self::from_expr(converted, None)
}
/// Applies `crate::udfs::apply_aes_encrypt` with `key`.
///
/// The output dtype is declared as `String`.
pub fn aes_encrypt(&self, key: &str) -> Column {
    let key = key.to_owned();
    let source = self.expr().clone();
    let encrypted = source.map(
        move |input| expect_col(crate::udfs::apply_aes_encrypt(input, &key)),
        |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
    );
    Self::from_expr(encrypted, None)
}
/// Applies `crate::udfs::apply_aes_decrypt` with `key`.
///
/// The output dtype is declared as `String`.
pub fn aes_decrypt(&self, key: &str) -> Column {
    let key = key.to_owned();
    let source = self.expr().clone();
    let decrypted = source.map(
        move |input| expect_col(crate::udfs::apply_aes_decrypt(input, &key)),
        |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
    );
    Self::from_expr(decrypted, None)
}
/// Applies `crate::udfs::apply_try_aes_decrypt` with `key`.
///
/// The output dtype is declared as `String`.
pub fn try_aes_decrypt(&self, key: &str) -> Column {
    let key = key.to_owned();
    let source = self.expr().clone();
    let decrypted = source.map(
        move |input| expect_col(crate::udfs::apply_try_aes_decrypt(input, &key)),
        |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
    );
    Self::from_expr(decrypted, None)
}
/// Applies `crate::udfs::apply_typeof`; the output dtype is declared as `String`.
pub fn typeof_(&self) -> Column {
    let source = self.expr().clone();
    let type_name = source.map(
        |input| expect_col(crate::udfs::apply_typeof(input)),
        |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
    );
    Self::from_expr(type_name, None)
}
/// Strips space characters (`" "`) from both ends of each string.
pub fn trim(&self) -> Column {
    use polars::prelude::*;
    let stripped = self.expr().clone().str().strip_chars(lit(" "));
    Self::from_expr(stripped, None)
}
/// Strips space characters (`" "`) from the start of each string.
pub fn ltrim(&self) -> Column {
    use polars::prelude::*;
    let stripped = self.expr().clone().str().strip_chars_start(lit(" "));
    Self::from_expr(stripped, None)
}
/// Strips space characters (`" "`) from the end of each string.
pub fn rtrim(&self) -> Column {
    use polars::prelude::*;
    let stripped = self.expr().clone().str().strip_chars_end(lit(" "));
    Self::from_expr(stripped, None)
}
/// Strips the characters in `trim_str` from both ends of each string.
///
/// With `None`, a single space is used as the character set.
pub fn btrim(&self, trim_str: Option<&str>) -> Column {
    use polars::prelude::*;
    let chars = match trim_str {
        Some(set) => set,
        None => " ",
    };
    let stripped = self.expr().clone().str().strip_chars(lit(chars));
    Self::from_expr(stripped, None)
}
/// 1-based position of the first occurrence of `substr` at or after the
/// 1-based position `pos`; `0` when there is no match.
///
/// An empty `substr` always yields the literal `1`.
///
/// NOTE(review): the search starts after a character-based `slice`; confirm
/// that `find_literal` reports offsets in the same unit for non-ASCII input.
pub fn locate(&self, substr: &str, pos: i64) -> Column {
    use polars::prelude::*;
    if substr.is_empty() {
        return Self::from_expr(lit(1i64), None);
    }
    // Number of leading characters skipped before searching.
    let skip = (pos - 1).max(0);
    let tail = self.expr().clone().str().slice(lit(skip), lit(i64::MAX));
    let hit = tail
        .str()
        .find_literal(lit(substr.to_string()))
        .cast(DataType::Int64);
    // Shift back into the coordinates of the full string; no match becomes 0.
    let position = (hit + lit(skip + 1))
        .fill_null(lit(0i64))
        .cast(DataType::Int64);
    Self::from_expr(position, None)
}
/// Applies `crate::udfs::apply_conv` with the given bases.
///
/// The output dtype is declared as `String`.
pub fn conv(&self, from_base: i32, to_base: i32) -> Column {
    let source = self.expr().clone();
    let converted = source.map(
        move |input| expect_col(crate::udfs::apply_conv(input, from_base, to_base)),
        |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
    );
    Self::from_expr(converted, None)
}
/// Applies `crate::udfs::apply_hex`; the output dtype is declared as `String`.
pub fn hex(&self) -> Column {
    let source = self.expr().clone();
    let hexed = source.map(
        |input| expect_col(crate::udfs::apply_hex(input)),
        |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
    );
    Self::from_expr(hexed, None)
}
/// Applies `crate::udfs::apply_unhex`; the output dtype is declared as `String`.
pub fn unhex(&self) -> Column {
    let source = self.expr().clone();
    let unhexed = source.map(
        |input| expect_col(crate::udfs::apply_unhex(input)),
        |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
    );
    Self::from_expr(unhexed, None)
}
/// Applies `crate::udfs::apply_bin`; the output dtype is declared as `String`.
pub fn bin(&self) -> Column {
    let source = self.expr().clone();
    let binary_text = source.map(
        |input| expect_col(crate::udfs::apply_bin(input)),
        |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
    );
    Self::from_expr(binary_text, None)
}
/// Applies `crate::udfs::apply_getbit` for bit position `pos`.
///
/// The output dtype is declared as `Int64`.
pub fn getbit(&self, pos: i64) -> Column {
    let source = self.expr().clone();
    let bit = source.map(
        move |input| expect_col(crate::udfs::apply_getbit(input, pos)),
        |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
    );
    Self::from_expr(bit, None)
}
/// Applies `crate::udfs::apply_bit_and` to this column (cast to `Int64`) and `other`.
///
/// The output dtype is declared as `Int64`.
pub fn bit_and(&self, other: &Column) -> Column {
    let lhs = self.expr().clone().cast(DataType::Int64);
    let rhs = [other.expr().clone()];
    let combined = lhs.map_many(
        |cols| expect_col(crate::udfs::apply_bit_and(cols)),
        &rhs,
        |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Int64)),
    );
    Self::from_expr(combined, None)
}
/// Applies `crate::udfs::apply_bit_or` to this column (cast to `Int64`) and `other`.
///
/// The output dtype is declared as `Int64`.
pub fn bit_or(&self, other: &Column) -> Column {
    let lhs = self.expr().clone().cast(DataType::Int64);
    let rhs = [other.expr().clone()];
    let combined = lhs.map_many(
        |cols| expect_col(crate::udfs::apply_bit_or(cols)),
        &rhs,
        |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Int64)),
    );
    Self::from_expr(combined, None)
}
/// Applies `crate::udfs::apply_bit_xor` to this column (cast to `Int64`) and `other`.
///
/// The output dtype is declared as `Int64`.
pub fn bit_xor(&self, other: &Column) -> Column {
    let lhs = self.expr().clone().cast(DataType::Int64);
    let rhs = [other.expr().clone()];
    let combined = lhs.map_many(
        |cols| expect_col(crate::udfs::apply_bit_xor(cols)),
        &rhs,
        |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Int64)),
    );
    Self::from_expr(combined, None)
}
/// Applies `crate::udfs::apply_bit_count`; the output dtype is declared as `Int64`.
pub fn bit_count(&self) -> Column {
    let source = self.expr().clone();
    let counted = source.map(
        |input| expect_col(crate::udfs::apply_bit_count(input)),
        |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
    );
    Self::from_expr(counted, None)
}
/// Applies `crate::udfs::apply_assert_true`, passing along the optional `err_msg`.
///
/// The output field is declared to be the same as the input field.
pub fn assert_true(&self, err_msg: Option<&str>) -> Column {
    // Owned copy so the closure can outlive the borrowed message.
    let message: Option<String> = err_msg.map(str::to_owned);
    let source = self.expr().clone();
    let checked = source.map(
        move |input| expect_col(crate::udfs::apply_assert_true(input, message.as_deref())),
        |_schema, field| Ok(field.clone()),
    );
    Self::from_expr(checked, None)
}
pub fn bitwise_not(&self) -> Column {
use polars::prelude::Field;
let expr = self.expr().clone().map(
move |col| expect_col(crate::udfs::apply_coerce_to_int64_for_bitwise(col)),
|_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
);
let expr = (lit(-1i64) - expr).cast(DataType::Int64);
Self::from_expr(expr, None)
}
/// Logical NOT via `crate::udfs::apply_logical_not_boolean_only`.
///
/// Schema resolution fails with a `ComputeError` unless the input field is `Boolean`.
pub fn logical_not(&self) -> Column {
    let source = self.expr().clone();
    let negated = source.map(
        move |input| expect_col(crate::udfs::apply_logical_not_boolean_only(input)),
        |_schema, field| match field.dtype() {
            DataType::Boolean => Ok(field.clone()),
            _ => Err(PolarsError::ComputeError(
                "logical NOT (~) requires boolean type".into(),
            )),
        },
    );
    Self::from_expr(negated, None)
}
/// Applies `crate::udfs::apply_str_to_map` with the two delimiters.
///
/// NOTE(review): the output field is declared identical to the input field —
/// confirm this matches what the UDF actually returns.
pub fn str_to_map(&self, pair_delim: &str, key_value_delim: &str) -> Column {
    let pairs = pair_delim.to_owned();
    let key_value = key_value_delim.to_owned();
    let source = self.expr().clone();
    let mapped = source.map(
        move |input| expect_col(crate::udfs::apply_str_to_map(input, &pairs, &key_value)),
        |_schema, field| Ok(field.clone()),
    );
    Self::from_expr(mapped, None)
}
/// Reports whether `pattern` contains a look-ahead (`(?=`, `(?!`) or
/// look-behind (`(?<=`, `(?<!`) opener anywhere in its text.
///
/// This is a plain substring check; escapes and character classes are not interpreted.
fn pattern_has_lookaround(pattern: &str) -> bool {
    const OPENERS: [&str; 4] = ["(?=", "(?!", "(?<=", "(?<!"];
    OPENERS.iter().any(|&opener| pattern.contains(opener))
}
/// Extracts capture group `group_index` of `pattern` from each string.
///
/// Patterns containing look-around openers go through
/// `crate::udfs::apply_regexp_extract_lookaround`; all others through
/// `crate::udfs::apply_regexp_extract`. The output dtype is declared as `String`.
pub fn regexp_extract(&self, pattern: &str, group_index: usize) -> Column {
    use polars::prelude::*;
    let pat = pattern.to_string();
    let source = self.expr().clone();
    let extracted = if Self::pattern_has_lookaround(pattern) {
        source.map(
            move |s| {
                expect_col(crate::udfs::apply_regexp_extract_lookaround(
                    s,
                    &pat,
                    group_index,
                ))
            },
            |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
        )
    } else {
        source.map(
            move |s| expect_col(crate::udfs::apply_regexp_extract(s, &pat, group_index)),
            |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
        )
    };
    Self::from_expr(extracted, None)
}
/// Replaces every regex match of `pattern` with `replacement`.
///
/// The input is cast to `String` first.
pub fn regexp_replace(&self, pattern: &str, replacement: &str) -> Column {
    use polars::prelude::*;
    let as_text = self.expr().clone().cast(DataType::String);
    let replaced = as_text.str().replace_all(
        lit(pattern.to_string()),
        lit(replacement.to_string()),
        false,
    );
    Self::from_expr(replaced, None)
}
/// First `n` characters of each string; a negative `n` is treated as `0`.
pub fn left(&self, n: i64) -> Column {
    use polars::prelude::*;
    // Stay in i64: narrowing to u32 would silently truncate very large `n`.
    let len = n.max(0);
    Self::from_expr(
        self.expr().clone().str().slice(lit(0i64), lit(len)),
        None,
    )
}
/// Last `n` characters of each string; a negative `n` is treated as `0`.
///
/// Strings with at most `n` characters are sliced from their start.
pub fn right(&self, n: i64) -> Column {
    use polars::prelude::*;
    let take = lit(n.max(0));
    let total = self.expr().clone().str().len_chars().cast(DataType::Int64);
    // Characters that precede the requested suffix.
    let leading = total - take.clone();
    let start = when(leading.clone().lt_eq(lit(0i64)))
        .then(lit(0i64))
        .otherwise(leading);
    let suffix = self.expr().clone().str().slice(start, take);
    Self::from_expr(suffix, None)
}
/// Replaces every literal occurrence of `search` with `replacement`.
pub fn replace(&self, search: &str, replacement: &str) -> Column {
    use polars::prelude::*;
    let needle = lit(search.to_string());
    let substitute = lit(replacement.to_string());
    let replaced = self.expr().clone().str().replace_all(needle, substitute, true);
    Self::from_expr(replaced, None)
}
/// Applies `replace` once per `(search, replacement)` pair, in order.
///
/// With no pairs the result is a clone of this column.
pub fn replace_many(&self, pairs: &[(String, String)]) -> Column {
    pairs
        .iter()
        .fold(self.clone(), |acc, (search, replacement)| {
            acc.replace(search, replacement)
        })
}
/// True where the string starts with `prefix`.
pub fn startswith(&self, prefix: &str) -> Column {
    use polars::prelude::*;
    let needle = lit(prefix.to_string());
    let matched = self.expr().clone().str().starts_with(needle);
    Self::from_expr(matched, None)
}
/// True where the string ends with `suffix`.
pub fn endswith(&self, suffix: &str) -> Column {
    use polars::prelude::*;
    let needle = lit(suffix.to_string());
    let matched = self.expr().clone().str().ends_with(needle);
    Self::from_expr(matched, None)
}
/// True where the string contains `substring` (literal match, not a regex).
pub fn contains(&self, substring: &str) -> Column {
    use polars::prelude::*;
    let needle = lit(substring.to_string());
    let matched = self.expr().clone().str().contains(needle, true);
    Self::from_expr(matched, None)
}
/// Splits each string on `delimiter` into a list of strings.
///
/// A positive `limit` routes through `crate::udfs::apply_split_with_limit`
/// (declared output `List(String)`); otherwise the built-in `str().split` is used.
pub fn split(&self, delimiter: &str, limit: Option<i32>) -> Column {
    use polars::prelude::*;
    let source = self.expr().clone();
    let delim = delimiter.to_string();
    let pieces = match limit {
        Some(lim) if lim > 0 => source.map(
            move |col| expect_col(crate::udfs::apply_split_with_limit(col, &delim, lim)),
            |_schema, field| {
                Ok(Field::new(
                    field.name().clone(),
                    DataType::List(Box::new(DataType::String)),
                ))
            },
        ),
        _ => source.str().split(lit(delim)),
    };
    Self::from_expr(pieces, None)
}
/// Applies `crate::udfs::apply_initcap`; the output dtype is declared as `String`.
pub fn initcap(&self) -> Column {
    let source = self.expr().clone();
    let capitalised = source.map(
        |input| expect_col(crate::udfs::apply_initcap(input)),
        |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
    );
    Self::from_expr(capitalised, None)
}
/// Collects every match of the regex `pattern` in each string.
pub fn regexp_extract_all(&self, pattern: &str) -> Column {
    use polars::prelude::*;
    let regex = lit(pattern.to_string());
    let matches = self.expr().clone().str().extract_all(regex);
    Self::from_expr(matches, None)
}
/// Collects capture group `group_index` of every match of `pattern`.
///
/// Group `0` is handled by `regexp_extract_all`; other groups go through
/// `crate::udfs::apply_regexp_extract_all_group` (declared output `List(String)`).
pub fn regexp_extract_all_group(&self, pattern: &str, group_index: usize) -> Column {
    use polars::prelude::*;
    if group_index == 0 {
        return self.regexp_extract_all(pattern);
    }
    let pat = pattern.to_string();
    let source = self.expr().clone();
    let groups = source.map(
        move |s| {
            expect_col(crate::udfs::apply_regexp_extract_all_group(
                s,
                &pat,
                group_index,
            ))
        },
        |_schema, field| {
            Ok(Field::new(
                field.name().clone(),
                DataType::List(Box::new(DataType::String)),
            ))
        },
    );
    Self::from_expr(groups, None)
}
/// True where the string matches the regex `pattern`.
///
/// Patterns containing a look-around opener are evaluated by
/// `crate::udfs::apply_regexp_like_lookaround` (declared output `Boolean`);
/// all others use the built-in regex `str().contains`.
pub fn regexp_like(&self, pattern: &str) -> Column {
    use polars::prelude::*;
    const LOOKAROUND_OPENERS: [&str; 4] = ["(?=", "(?!", "(?<=", "(?<!"];
    let source = self.expr().clone();
    let pat = pattern.to_string();
    let needs_udf = LOOKAROUND_OPENERS
        .iter()
        .any(|&opener| pattern.contains(opener));
    let matched = if needs_udf {
        source.map(
            move |s| expect_col(crate::udfs::apply_regexp_like_lookaround(s, &pat)),
            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Boolean)),
        )
    } else {
        source.str().contains(lit(pat), false)
    };
    Self::from_expr(matched, None)
}
/// Number of regex matches of `pattern` in each string, as `Int64`.
pub fn regexp_count(&self, pattern: &str) -> Column {
    use polars::prelude::*;
    let regex = lit(pattern.to_string());
    let hits = self.expr().clone().str().count_matches(regex, false);
    Self::from_expr(hits.cast(DataType::Int64), None)
}
/// Shorthand for `regexp_extract(pattern, 0)`.
pub fn regexp_substr(&self, pattern: &str) -> Column {
    const WHOLE_MATCH: usize = 0;
    self.regexp_extract(pattern, WHOLE_MATCH)
}
/// Applies `crate::udfs::apply_regexp_instr` for `pattern` and the given
/// group (defaulting to `0`). The output dtype is declared as `Int64`.
pub fn regexp_instr(&self, pattern: &str, group_idx: Option<usize>) -> Column {
    let group = match group_idx {
        Some(g) => g,
        None => 0,
    };
    let owned_pattern = pattern.to_string();
    let source = self.expr().clone();
    let position = source.map(
        // The UDF takes the pattern by value, so hand it a fresh copy per call.
        move |s| expect_col(crate::udfs::apply_regexp_instr(s, owned_pattern.clone(), group)),
        |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
    );
    Self::from_expr(position, None)
}
/// Applies `crate::udfs::apply_find_in_set` to this column and `set_column`.
///
/// The output dtype is declared as `Int64`.
pub fn find_in_set(&self, set_column: &Column) -> Column {
    let needle = self.expr().clone();
    let haystack = [set_column.expr().clone()];
    let position = needle.map_many(
        |cols| expect_col(crate::udfs::apply_find_in_set(cols)),
        &haystack,
        |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Int64)),
    );
    Self::from_expr(position, None)
}
/// Repeats each string `n` times.
///
/// A negative `n` is treated as `0`, giving an empty string. Previously the
/// `as u32` cast turned a negative count into a very large one.
pub fn repeat(&self, n: i32) -> Column {
    use polars::prelude::*;
    // `try_from` only fails for negative values, which repeat zero times.
    let times = u32::try_from(n).unwrap_or(0);
    Self::from_expr(
        self.expr()
            .clone()
            .repeat_by(lit(times))
            .list()
            .join(lit(""), false),
        None,
    )
}
/// Reverses each string.
pub fn reverse(&self) -> Column {
    let reversed = self.expr().clone().str().reverse();
    Self::from_expr(reversed, None)
}
/// 1-based position of the first literal occurrence of `substr`; `0` when absent.
///
/// NOTE(review): the position is `find_literal`'s result plus one — confirm
/// the unit of that offset (bytes vs characters) for non-ASCII input.
pub fn instr(&self, substr: &str) -> Column {
    use polars::prelude::*;
    let needle = lit(substr.to_string());
    let hit = self.expr().clone().str().find_literal(needle);
    let position = (hit.cast(DataType::Int64) + lit(1i64)).fill_null(lit(0i64));
    Self::from_expr(position, None)
}
/// Left-pads each string to `length` characters.
///
/// Strings longer than `length` are shortened to their first `length`
/// characters, and a negative `length` is treated as `0`.
///
/// Only the first character of `pad` is used as the fill character; an empty
/// `pad` falls back to a space.
pub fn lpad(&self, length: i32, pad: &str) -> Column {
    let fill = pad.chars().next().unwrap_or(' ');
    let width = i64::from(length.max(0));
    let padded = self.expr().clone().str().pad_start(lit(width), fill);
    // Padding never shortens, so cut over-long values down to `width`.
    Self::from_expr(padded.str().slice(lit(0i64), lit(width)), None)
}
/// Right-pads each string to `length` characters.
///
/// Strings longer than `length` are shortened to their first `length`
/// characters, and a negative `length` is treated as `0`.
///
/// Only the first character of `pad` is used as the fill character; an empty
/// `pad` falls back to a space.
pub fn rpad(&self, length: i32, pad: &str) -> Column {
    let fill = pad.chars().next().unwrap_or(' ');
    let width = i64::from(length.max(0));
    let padded = self.expr().clone().str().pad_end(lit(width), fill);
    // Padding never shortens, so cut over-long values down to `width`.
    Self::from_expr(padded.str().slice(lit(0i64), lit(width)), None)
}
/// Replaces each character of `from_str` with the character at the same
/// position in `to_str`; characters without a counterpart are removed.
///
/// NOTE(review): replacements are applied one after another, so a character
/// produced by an earlier pair can be rewritten by a later pair
/// (e.g. from `"ab"` to `"ba"`). Confirm whether that is intended.
pub fn translate(&self, from_str: &str, to_str: &str) -> Column {
    use polars::prelude::*;
    let mut targets = to_str.chars();
    let mut translated = self.expr().clone();
    for source_char in from_str.chars() {
        // Runs out once `to_str` is shorter than `from_str`, giving "" (deletion).
        let target = targets.next().map(String::from).unwrap_or_default();
        translated = translated
            .str()
            .replace_all(lit(source_char.to_string()), lit(target), true);
    }
    Self::from_expr(translated, None)
}
/// Masks characters by class using regex replacements, applied in this order:
/// `[A-Z]` → `upper_char` (default `'X'`), `[a-z]` → `lower_char` (default `'x'`),
/// `\d` → `digit_char` (default `'n'`), and — only when `other_char` is given —
/// `[^A-Za-z0-9]` → `other_char`.
///
/// NOTE(review): because the steps run sequentially, a custom replacement
/// character that belongs to a later class will be replaced again.
pub fn mask(
    &self,
    upper_char: Option<char>,
    lower_char: Option<char>,
    digit_char: Option<char>,
    other_char: Option<char>,
) -> Column {
    use polars::prelude::*;
    let mut rules: Vec<(&str, String)> = vec![
        ("[A-Z]", upper_char.unwrap_or('X').to_string()),
        ("[a-z]", lower_char.unwrap_or('x').to_string()),
        (r"\d", digit_char.unwrap_or('n').to_string()),
    ];
    if let Some(other) = other_char {
        rules.push(("[^A-Za-z0-9]", other.to_string()));
    }
    let masked = rules
        .into_iter()
        .fold(self.expr().clone(), |acc, (class, substitute)| {
            acc.str()
                .replace_all(lit(class.to_string()), lit(substitute), false)
        });
    Self::from_expr(masked, None)
}
/// Returns the `part_num`-th piece (1-based; negative counts from the end)
/// of each string split on `delimiter`.
///
/// * `part_num == 0` yields a null literal.
/// * A null input stays null; a piece that does not exist becomes `""`.
/// * The delimiter `"|"` is routed through `crate::udfs::apply_split_part_regex`.
pub fn split_part(&self, delimiter: &str, part_num: i64) -> Column {
    use polars::prelude::*;
    if part_num == 0 {
        return Self::from_expr(lit(NULL), None);
    }
    let source = self.expr().clone();
    let piece = if delimiter == "|" {
        let pattern = delimiter.to_string();
        source.clone().map(
            move |col| expect_col(crate::udfs::apply_split_part_regex(col, &pattern, part_num)),
            |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
        )
    } else {
        // Positive parts are 1-based; negative parts already index from the end.
        let index = if part_num > 0 { part_num - 1 } else { part_num };
        source
            .clone()
            .str()
            .split(lit(delimiter.to_string()))
            .list()
            .get(lit(index), true)
    };
    let expr = when(source.is_null())
        .then(lit(NULL))
        .otherwise(piece.fill_null(lit("")));
    Self::from_expr(expr, None)
}
/// Keep the pieces before (or after) the `count`-th occurrence of `delimiter`.
///
/// The string is split on `delimiter`; for `count > 0` the first `count` pieces are re-joined
/// with the delimiter, otherwise the last `|count|` pieces are (all of them when there are
/// fewer than `|count|`). An empty delimiter produces an empty string for non-null input and
/// null for null input.
pub fn substring_index(&self, delimiter: &str, count: i64) -> Column {
    use polars::prelude::*;
    if delimiter.is_empty() {
        let blank = lit("").cast(DataType::String);
        let guarded = when(self.expr().clone().is_null())
            .then(lit(NULL))
            .otherwise(blank);
        return Self::from_expr(guarded, None);
    }
    let pieces = self.expr().clone().str().split(lit(delimiter.to_string()));
    let take = count.unsigned_abs() as i64;
    // Work out which window of the piece list to keep, then slice and re-join once.
    let (offset, length) = if count > 0 {
        (lit(0i64), lit(take))
    } else {
        let total = pieces.clone().list().len();
        let offset = when(total.clone().gt(lit(take)))
            .then(total.clone() - lit(take))
            .otherwise(lit(0i64));
        let length = when(total.clone().gt(lit(take)))
            .then(lit(take))
            .otherwise(total);
        (offset, length)
    };
    let rejoined = pieces
        .list()
        .slice(offset, length)
        .list()
        .join(lit(delimiter.to_string()), false);
    Self::from_expr(rejoined, None)
}
/// Soundex code of the column, computed by `crate::udfs::apply_soundex`.
/// The output field is declared identical to the input field.
pub fn soundex(&self) -> Column {
    let input = self.expr().clone();
    let mapped = input.map(
        |values| expect_col(crate::udfs::apply_soundex(values)),
        |_, f| Ok(f.clone()),
    );
    Self::from_expr(mapped, None)
}
/// Levenshtein distance between this column and `other`, computed by
/// `crate::udfs::apply_levenshtein`. The result is declared `Int64` and keeps this column's name.
pub fn levenshtein(&self, other: &Column) -> Column {
    let rhs = [other.expr().clone()];
    let mapped = self.expr().clone().map_many(
        |inputs| expect_col(crate::udfs::apply_levenshtein(inputs)),
        &rhs,
        |_, fs| Ok(Field::new(fs[0].name().clone(), DataType::Int64)),
    );
    Self::from_expr(mapped, None)
}
/// CRC32 checksum of the column via `crate::udfs::apply_crc32`; the output is declared `Int64`.
pub fn crc32(&self) -> Column {
    let input = self.expr().clone();
    let mapped = input.map(
        |values| expect_col(crate::udfs::apply_crc32(values)),
        |_, f| Ok(Field::new(f.name().clone(), DataType::Int64)),
    );
    Self::from_expr(mapped, None)
}
/// xxHash64 of the column via `crate::udfs::apply_xxhash64`; the output is declared `Int64`.
pub fn xxhash64(&self) -> Column {
    let input = self.expr().clone();
    let mapped = input.map(
        |values| expect_col(crate::udfs::apply_xxhash64(values)),
        |_, f| Ok(Field::new(f.name().clone(), DataType::Int64)),
    );
    Self::from_expr(mapped, None)
}
/// ASCII code of the column's values via `crate::udfs::apply_ascii`; the output is declared
/// `Int32`.
pub fn ascii(&self) -> Column {
    let input = self.expr().clone();
    let mapped = input.map(
        |values| expect_col(crate::udfs::apply_ascii(values)),
        |_, f| Ok(Field::new(f.name().clone(), DataType::Int32)),
    );
    Self::from_expr(mapped, None)
}
/// Format numbers with `decimals` decimal places via `crate::udfs::apply_format_number`.
/// The output is declared `String`.
pub fn format_number(&self, decimals: u32) -> Column {
    let input = self.expr().clone();
    let mapped = input.map(
        move |values| expect_col(crate::udfs::apply_format_number(values, decimals)),
        |_, f| Ok(Field::new(f.name().clone(), DataType::String)),
    );
    Self::from_expr(mapped, None)
}
/// Convert code values to characters via `crate::udfs::apply_char`; the output is declared
/// `String`.
pub fn char(&self) -> Column {
    let input = self.expr().clone();
    let mapped = input.map(
        |values| expect_col(crate::udfs::apply_char(values)),
        |_, f| Ok(Field::new(f.name().clone(), DataType::String)),
    );
    Self::from_expr(mapped, None)
}
pub fn chr(&self) -> Column {
self.char()
}
/// Base64-encode the column via `crate::udfs::apply_base64`.
/// The output field is declared identical to the input field.
pub fn base64(&self) -> Column {
    let input = self.expr().clone();
    let mapped = input.map(
        |values| expect_col(crate::udfs::apply_base64(values)),
        |_, f| Ok(f.clone()),
    );
    Self::from_expr(mapped, None)
}
/// Base64-decode the column via `crate::udfs::apply_unbase64`.
/// The output field is declared identical to the input field.
pub fn unbase64(&self) -> Column {
    let input = self.expr().clone();
    let mapped = input.map(
        |values| expect_col(crate::udfs::apply_unbase64(values)),
        |_, f| Ok(f.clone()),
    );
    Self::from_expr(mapped, None)
}
/// SHA-1 digest of the column via `crate::udfs::apply_sha1`.
/// The output field is declared identical to the input field.
pub fn sha1(&self) -> Column {
    let input = self.expr().clone();
    let mapped = input.map(
        |values| expect_col(crate::udfs::apply_sha1(values)),
        |_, f| Ok(f.clone()),
    );
    Self::from_expr(mapped, None)
}
/// SHA-2 digest of the column via `crate::udfs::apply_sha2`, with `bit_length` forwarded to the
/// UDF unchanged. The output field is declared identical to the input field.
pub fn sha2(&self, bit_length: i32) -> Column {
    let input = self.expr().clone();
    let mapped = input.map(
        move |values| expect_col(crate::udfs::apply_sha2(values, bit_length)),
        |_, f| Ok(f.clone()),
    );
    Self::from_expr(mapped, None)
}
/// MD5 digest of the column via `crate::udfs::apply_md5`.
/// The output field is declared identical to the input field.
pub fn md5(&self) -> Column {
    let input = self.expr().clone();
    let mapped = input.map(
        |values| expect_col(crate::udfs::apply_md5(values)),
        |_, f| Ok(f.clone()),
    );
    Self::from_expr(mapped, None)
}
pub fn overlay(&self, replace: &str, pos: i64, length: i64) -> Column {
use polars::prelude::*;
let pos = pos.max(1);
let replace_len = length.max(0);
let start_left = 0i64;
let len_left = (pos - 1).max(0);
let start_right = (pos - 1 + replace_len).max(0);
let len_right = 1_000_000i64; let left = self
.expr()
.clone()
.str()
.slice(lit(start_left), lit(len_left));
let mid = lit(replace.to_string());
let right = self
.expr()
.clone()
.str()
.slice(lit(start_right), lit(len_right));
let exprs = [left, mid, right];
let concat_expr = polars::prelude::concat_str(&exprs, "", false);
Self::from_expr(concat_expr, None)
}
/// Absolute value of the column.
pub fn abs(&self) -> Column {
    let magnitude = self.expr().clone().abs();
    Self::from_expr(magnitude, None)
}
/// Round up to the nearest integer and cast the result to `Int64`.
pub fn ceil(&self) -> Column {
    use polars::prelude::*;
    let rounded_up = self.expr().clone().ceil();
    Self::from_expr(rounded_up.cast(DataType::Int64), None)
}
pub fn ceiling(&self) -> Column {
self.ceil()
}
/// Round down to the nearest integer and cast the result to `Int64`.
pub fn floor(&self) -> Column {
    use polars::prelude::*;
    let rounded_down = self.expr().clone().floor();
    Self::from_expr(rounded_down.cast(DataType::Int64), None)
}
/// Round to `scale` decimal places via `crate::udfs::apply_round`; the output is declared
/// `Float64`.
pub fn round(&self, scale: i32) -> Column {
    let input = self.expr().clone();
    let mapped = input.map(
        move |values| expect_col(crate::udfs::apply_round(values, scale)),
        |_, f| Ok(Field::new(f.name().clone(), DataType::Float64)),
    );
    Self::from_expr(mapped, None)
}
/// Round to `scale` decimal places via `crate::udfs::apply_bround`; the output is declared
/// `Float64`.
pub fn bround(&self, scale: i32) -> Column {
    let input = self.expr().clone();
    let mapped = input.map(
        move |values| expect_col(crate::udfs::apply_bround(values, scale)),
        |_, f| Ok(Field::new(f.name().clone(), DataType::Float64)),
    );
    Self::from_expr(mapped, None)
}
/// Arithmetic negation, expressed as multiplication by the literal `-1`.
pub fn negate(&self) -> Column {
    use polars::prelude::*;
    let minus_one = lit(-1);
    let negated = self.expr().clone() * minus_one;
    Self::from_expr(negated, None)
}
/// Multiplication delegated to `crate::udfs::apply_pyspark_multiply`.
/// The result is declared `Float64` and keeps this column's name.
pub fn multiply_pyspark(&self, other: &Column) -> Column {
    let rhs = [other.expr().clone()];
    let mapped = self.expr().clone().map_many(
        |inputs| expect_col(crate::udfs::apply_pyspark_multiply(inputs)),
        &rhs,
        |_, fs| Ok(Field::new(fs[0].name().clone(), DataType::Float64)),
    );
    Self::from_expr(mapped, None)
}
/// Addition delegated to `crate::udfs::apply_pyspark_add`.
/// The result is declared `Float64` and keeps this column's name.
pub fn add_pyspark(&self, other: &Column) -> Column {
    let rhs = [other.expr().clone()];
    let mapped = self.expr().clone().map_many(
        |inputs| expect_col(crate::udfs::apply_pyspark_add(inputs)),
        &rhs,
        |_, fs| Ok(Field::new(fs[0].name().clone(), DataType::Float64)),
    );
    Self::from_expr(mapped, None)
}
/// Subtraction delegated to `crate::udfs::apply_pyspark_subtract`.
/// The result is declared `Float64` and keeps this column's name.
pub fn subtract_pyspark(&self, other: &Column) -> Column {
    let rhs = [other.expr().clone()];
    let mapped = self.expr().clone().map_many(
        |inputs| expect_col(crate::udfs::apply_pyspark_subtract(inputs)),
        &rhs,
        |_, fs| Ok(Field::new(fs[0].name().clone(), DataType::Float64)),
    );
    Self::from_expr(mapped, None)
}
/// Division delegated to `crate::udfs::apply_pyspark_divide`.
/// The result is declared `Float64` and keeps this column's name.
pub fn divide_pyspark(&self, other: &Column) -> Column {
    let rhs = [other.expr().clone()];
    let mapped = self.expr().clone().map_many(
        |inputs| expect_col(crate::udfs::apply_pyspark_divide(inputs)),
        &rhs,
        |_, fs| Ok(Field::new(fs[0].name().clone(), DataType::Float64)),
    );
    Self::from_expr(mapped, None)
}
/// Modulo delegated to `crate::udfs::apply_pyspark_mod`.
/// The result is declared `Float64` and keeps this column's name.
pub fn mod_pyspark(&self, other: &Column) -> Column {
    let rhs = [other.expr().clone()];
    let mapped = self.expr().clone().map_many(
        |inputs| expect_col(crate::udfs::apply_pyspark_mod(inputs)),
        &rhs,
        |_, fs| Ok(Field::new(fs[0].name().clone(), DataType::Float64)),
    );
    Self::from_expr(mapped, None)
}
/// Product of this column and `other` using the native expression `*` operator.
pub fn multiply(&self, other: &Column) -> Column {
    let lhs = self.expr().clone();
    let rhs = other.expr().clone();
    Self::from_expr(lhs * rhs, None)
}
/// Sum of this column and `other` using the native expression `+` operator.
pub fn add(&self, other: &Column) -> Column {
    let lhs = self.expr().clone();
    let rhs = other.expr().clone();
    Self::from_expr(lhs + rhs, None)
}
/// Difference `self - other` using the native expression `-` operator.
pub fn subtract(&self, other: &Column) -> Column {
    let lhs = self.expr().clone();
    let rhs = other.expr().clone();
    Self::from_expr(lhs - rhs, None)
}
/// Quotient `self / other` using the native expression `/` operator.
pub fn divide(&self, other: &Column) -> Column {
    let lhs = self.expr().clone();
    let rhs = other.expr().clone();
    Self::from_expr(lhs / rhs, None)
}
/// Remainder `self % other` using the native expression `%` operator.
pub fn mod_(&self, other: &Column) -> Column {
    let lhs = self.expr().clone();
    let rhs = other.expr().clone();
    Self::from_expr(lhs % rhs, None)
}
/// Square root of the column.
pub fn sqrt(&self) -> Column {
    let root = self.expr().clone().sqrt();
    Self::from_expr(root, None)
}
/// Raise the column to the constant integer power `exp`.
pub fn pow(&self, exp: i64) -> Column {
    use polars::prelude::*;
    let exponent = lit(exp);
    let raised = self.expr().clone().pow(exponent);
    Self::from_expr(raised, None)
}
/// Raise the column to a per-row `exponent` via `crate::udfs::apply_pow_pyspark`.
/// The result is declared `Float64` and keeps this column's name.
pub fn pow_with(&self, exponent: &Column) -> Column {
    let rhs = [exponent.expr().clone()];
    let mapped = self.expr().clone().map_many(
        |inputs| expect_col(crate::udfs::apply_pow_pyspark(inputs)),
        &rhs,
        |_, fs| Ok(Field::new(fs[0].name().clone(), DataType::Float64)),
    );
    Self::from_expr(mapped, None)
}
pub fn power(&self, exp: i64) -> Column {
self.pow(exp)
}
/// Exponential (`e^x`) of the column.
pub fn exp(&self) -> Column {
    let exponential = self.expr().clone().exp();
    Self::from_expr(exponential, None)
}
/// Natural logarithm: the logarithm of the column with base `e`.
pub fn log(&self) -> Column {
    let base = lit(std::f64::consts::E);
    let natural_log = self.expr().clone().log(base);
    Self::from_expr(natural_log, None)
}
pub fn ln(&self) -> Column {
self.log()
}
/// Sine via `crate::udfs::apply_sin`; the output is declared `Float64`.
pub fn sin(&self) -> Column {
    let input = self.expr().clone();
    let mapped = input.map(
        |values| expect_col(crate::udfs::apply_sin(values)),
        |_, f| Ok(Field::new(f.name().clone(), DataType::Float64)),
    );
    Self::from_expr(mapped, None)
}
/// Cosine via `crate::udfs::apply_cos`; the output is declared `Float64`.
pub fn cos(&self) -> Column {
    let input = self.expr().clone();
    let mapped = input.map(
        |values| expect_col(crate::udfs::apply_cos(values)),
        |_, f| Ok(Field::new(f.name().clone(), DataType::Float64)),
    );
    Self::from_expr(mapped, None)
}
/// Tangent via `crate::udfs::apply_tan`; the output is declared `Float64`.
pub fn tan(&self) -> Column {
    let input = self.expr().clone();
    let mapped = input.map(
        |values| expect_col(crate::udfs::apply_tan(values)),
        |_, f| Ok(Field::new(f.name().clone(), DataType::Float64)),
    );
    Self::from_expr(mapped, None)
}
/// Cotangent via `crate::udfs::apply_cot`; the output is declared `Float64`.
pub fn cot(&self) -> Column {
    let input = self.expr().clone();
    let mapped = input.map(
        |values| expect_col(crate::udfs::apply_cot(values)),
        |_, f| Ok(Field::new(f.name().clone(), DataType::Float64)),
    );
    Self::from_expr(mapped, None)
}
/// Cosecant via `crate::udfs::apply_csc`; the output is declared `Float64`.
pub fn csc(&self) -> Column {
    let input = self.expr().clone();
    let mapped = input.map(
        |values| expect_col(crate::udfs::apply_csc(values)),
        |_, f| Ok(Field::new(f.name().clone(), DataType::Float64)),
    );
    Self::from_expr(mapped, None)
}
/// Secant via `crate::udfs::apply_sec`; the output is declared `Float64`.
pub fn sec(&self) -> Column {
    let input = self.expr().clone();
    let mapped = input.map(
        |values| expect_col(crate::udfs::apply_sec(values)),
        |_, f| Ok(Field::new(f.name().clone(), DataType::Float64)),
    );
    Self::from_expr(mapped, None)
}
/// Arcsine via `crate::udfs::apply_asin`; the output is declared `Float64`.
pub fn asin(&self) -> Column {
    let input = self.expr().clone();
    let mapped = input.map(
        |values| expect_col(crate::udfs::apply_asin(values)),
        |_, f| Ok(Field::new(f.name().clone(), DataType::Float64)),
    );
    Self::from_expr(mapped, None)
}
/// Arccosine via `crate::udfs::apply_acos`; the output is declared `Float64`.
pub fn acos(&self) -> Column {
    let input = self.expr().clone();
    let mapped = input.map(
        |values| expect_col(crate::udfs::apply_acos(values)),
        |_, f| Ok(Field::new(f.name().clone(), DataType::Float64)),
    );
    Self::from_expr(mapped, None)
}
/// Arctangent via `crate::udfs::apply_atan`; the output is declared `Float64`.
pub fn atan(&self) -> Column {
    let input = self.expr().clone();
    let mapped = input.map(
        |values| expect_col(crate::udfs::apply_atan(values)),
        |_, f| Ok(Field::new(f.name().clone(), DataType::Float64)),
    );
    Self::from_expr(mapped, None)
}
/// Two-argument arctangent via `crate::udfs::apply_atan2`, with this column passed first and
/// `x` second. The result is declared `Float64` and keeps this column's name.
pub fn atan2(&self, x: &Column) -> Column {
    let rhs = [x.expr().clone()];
    let mapped = self.expr().clone().map_many(
        |inputs| expect_col(crate::udfs::apply_atan2(inputs)),
        &rhs,
        |_, fs| Ok(Field::new(fs[0].name().clone(), DataType::Float64)),
    );
    Self::from_expr(mapped, None)
}
/// Convert to degrees via `crate::udfs::apply_degrees`; the output is declared `Float64`.
pub fn degrees(&self) -> Column {
    let input = self.expr().clone();
    let mapped = input.map(
        |values| expect_col(crate::udfs::apply_degrees(values)),
        |_, f| Ok(Field::new(f.name().clone(), DataType::Float64)),
    );
    Self::from_expr(mapped, None)
}
pub fn to_degrees(&self) -> Column {
self.degrees()
}
/// Convert to radians via `crate::udfs::apply_radians`; the output is declared `Float64`.
pub fn radians(&self) -> Column {
    let input = self.expr().clone();
    let mapped = input.map(
        |values| expect_col(crate::udfs::apply_radians(values)),
        |_, f| Ok(Field::new(f.name().clone(), DataType::Float64)),
    );
    Self::from_expr(mapped, None)
}
pub fn to_radians(&self) -> Column {
self.radians()
}
/// Sign of the value via `crate::udfs::apply_signum`; the output is declared `Float64`.
pub fn signum(&self) -> Column {
    let input = self.expr().clone();
    let mapped = input.map(
        |values| expect_col(crate::udfs::apply_signum(values)),
        |_, f| Ok(Field::new(f.name().clone(), DataType::Float64)),
    );
    Self::from_expr(mapped, None)
}
/// Hyperbolic cosine via `crate::udfs::apply_cosh`; the output is declared `Float64`.
pub fn cosh(&self) -> Column {
    let input = self.expr().clone();
    let mapped = input.map(
        |values| expect_col(crate::udfs::apply_cosh(values)),
        |_, f| Ok(Field::new(f.name().clone(), DataType::Float64)),
    );
    Self::from_expr(mapped, None)
}
/// Hyperbolic sine via `crate::udfs::apply_sinh`; the output is declared `Float64`.
pub fn sinh(&self) -> Column {
    let input = self.expr().clone();
    let mapped = input.map(
        |values| expect_col(crate::udfs::apply_sinh(values)),
        |_, f| Ok(Field::new(f.name().clone(), DataType::Float64)),
    );
    Self::from_expr(mapped, None)
}
/// Hyperbolic tangent via `crate::udfs::apply_tanh`; the output is declared `Float64`.
pub fn tanh(&self) -> Column {
    let input = self.expr().clone();
    let mapped = input.map(
        |values| expect_col(crate::udfs::apply_tanh(values)),
        |_, f| Ok(Field::new(f.name().clone(), DataType::Float64)),
    );
    Self::from_expr(mapped, None)
}
/// Inverse hyperbolic cosine via `crate::udfs::apply_acosh`; the output is declared `Float64`.
pub fn acosh(&self) -> Column {
    let input = self.expr().clone();
    let mapped = input.map(
        |values| expect_col(crate::udfs::apply_acosh(values)),
        |_, f| Ok(Field::new(f.name().clone(), DataType::Float64)),
    );
    Self::from_expr(mapped, None)
}
/// Inverse hyperbolic sine via `crate::udfs::apply_asinh`; the output is declared `Float64`.
pub fn asinh(&self) -> Column {
    let input = self.expr().clone();
    let mapped = input.map(
        |values| expect_col(crate::udfs::apply_asinh(values)),
        |_, f| Ok(Field::new(f.name().clone(), DataType::Float64)),
    );
    Self::from_expr(mapped, None)
}
/// Inverse hyperbolic tangent via `crate::udfs::apply_atanh`; the output is declared `Float64`.
pub fn atanh(&self) -> Column {
    let input = self.expr().clone();
    let mapped = input.map(
        |values| expect_col(crate::udfs::apply_atanh(values)),
        |_, f| Ok(Field::new(f.name().clone(), DataType::Float64)),
    );
    Self::from_expr(mapped, None)
}
/// Cube root via `crate::udfs::apply_cbrt`; the output is declared `Float64`.
pub fn cbrt(&self) -> Column {
    let input = self.expr().clone();
    let mapped = input.map(
        |values| expect_col(crate::udfs::apply_cbrt(values)),
        |_, f| Ok(Field::new(f.name().clone(), DataType::Float64)),
    );
    Self::from_expr(mapped, None)
}
/// `exp(x) - 1` via `crate::udfs::apply_expm1`; the output is declared `Float64`.
pub fn expm1(&self) -> Column {
    let input = self.expr().clone();
    let mapped = input.map(
        |values| expect_col(crate::udfs::apply_expm1(values)),
        |_, f| Ok(Field::new(f.name().clone(), DataType::Float64)),
    );
    Self::from_expr(mapped, None)
}
/// `ln(1 + x)` via `crate::udfs::apply_log1p`; the output is declared `Float64`.
pub fn log1p(&self) -> Column {
    let input = self.expr().clone();
    let mapped = input.map(
        |values| expect_col(crate::udfs::apply_log1p(values)),
        |_, f| Ok(Field::new(f.name().clone(), DataType::Float64)),
    );
    Self::from_expr(mapped, None)
}
/// Base-10 logarithm via `crate::udfs::apply_log10`; the output is declared `Float64`.
pub fn log10(&self) -> Column {
    let input = self.expr().clone();
    let mapped = input.map(
        |values| expect_col(crate::udfs::apply_log10(values)),
        |_, f| Ok(Field::new(f.name().clone(), DataType::Float64)),
    );
    Self::from_expr(mapped, None)
}
/// Base-2 logarithm via `crate::udfs::apply_log2`; the output is declared `Float64`.
pub fn log2(&self) -> Column {
    let input = self.expr().clone();
    let mapped = input.map(
        |values| expect_col(crate::udfs::apply_log2(values)),
        |_, f| Ok(Field::new(f.name().clone(), DataType::Float64)),
    );
    Self::from_expr(mapped, None)
}
/// Round to an integral value via `crate::udfs::apply_rint`; the output is declared `Float64`.
pub fn rint(&self) -> Column {
    let input = self.expr().clone();
    let mapped = input.map(
        |values| expect_col(crate::udfs::apply_rint(values)),
        |_, f| Ok(Field::new(f.name().clone(), DataType::Float64)),
    );
    Self::from_expr(mapped, None)
}
pub fn hypot(&self, other: &Column) -> Column {
let xx = self.expr().clone() * self.expr().clone();
let yy = other.expr().clone() * other.expr().clone();
Self::from_expr((xx + yy).sqrt(), None)
}
/// Cast this column to the type named `type_name` by delegating to `crate::functions::cast`;
/// whatever error string that function produces is returned unchanged.
pub fn cast_to(&self, type_name: &str) -> Result<Column, String> {
    let source: &Column = self;
    crate::functions::cast(source, type_name)
}
/// Cast this column to the type named `type_name` by delegating to
/// `crate::functions::try_cast`; whatever error string that function produces is returned
/// unchanged.
pub fn try_cast_to(&self, type_name: &str) -> Result<Column, String> {
    let source: &Column = self;
    crate::functions::try_cast(source, type_name)
}
/// NaN test via `crate::udfs::apply_isnan_pyspark_parity`; the output is declared `Boolean`.
pub fn is_nan(&self) -> Column {
    let input = self.expr().clone();
    let mapped = input.map(
        |values| expect_col(crate::udfs::apply_isnan_pyspark_parity(values)),
        |_, f| Ok(Field::new(f.name().clone(), DataType::Boolean)),
    );
    Self::from_expr(mapped, None)
}
/// Year component. The input is first converted to a `Date` through
/// `crate::udfs::apply_string_to_date_format`, and the result is named `year(<column name>)`.
pub fn year(&self) -> Column {
    use polars::prelude::*;
    let label = format!("year({})", self.name());
    let as_date = self.expr().clone().map(
        |values| expect_col(crate::udfs::apply_string_to_date_format(values, None, false)),
        |_, f| Ok(Field::new(f.name().clone(), DataType::Date)),
    );
    let year_part = as_date.dt().year().alias(&label);
    Self::from_expr(year_part, Some(label))
}
/// Month component as `Int32`. The input is first converted to a `Date` through
/// `crate::udfs::apply_string_to_date_format`, and the result is named `month(<column name>)`.
pub fn month(&self) -> Column {
    use polars::prelude::*;
    let label = format!("month({})", self.name());
    let as_date = self.expr().clone().map(
        |values| expect_col(crate::udfs::apply_string_to_date_format(values, None, false)),
        |_, f| Ok(Field::new(f.name().clone(), DataType::Date)),
    );
    let month_part = as_date.dt().month().cast(DataType::Int32).alias(&label);
    Self::from_expr(month_part, Some(label))
}
/// Day-of-month component taken directly from the temporal namespace (`dt().day()`), without
/// the date parsing, `Int32` cast or renaming that [`Self::dayofmonth`] performs.
pub fn day(&self) -> Column {
    let day_part = self.expr().clone().dt().day();
    Self::from_expr(day_part, None)
}
/// Day of the month as `Int32`. The input is first converted to a `Date` through
/// `crate::udfs::apply_string_to_date_format`, and the result is named
/// `dayofmonth(<column name>)`.
pub fn dayofmonth(&self) -> Column {
    use polars::prelude::*;
    let label = format!("dayofmonth({})", self.name());
    let as_date = self.expr().clone().map(
        |values| expect_col(crate::udfs::apply_string_to_date_format(values, None, false)),
        |_, f| Ok(Field::new(f.name().clone(), DataType::Date)),
    );
    let day_part = as_date.dt().day().cast(DataType::Int32).alias(&label);
    Self::from_expr(day_part, Some(label))
}
/// Quarter of the year, taken from the temporal namespace (`dt().quarter()`).
pub fn quarter(&self) -> Column {
    let quarter_part = self.expr().clone().dt().quarter();
    Self::from_expr(quarter_part, None)
}
/// Week number of the year, taken from the temporal namespace (`dt().week()`).
pub fn weekofyear(&self) -> Column {
    let week_part = self.expr().clone().dt().week();
    Self::from_expr(week_part, None)
}
pub fn week(&self) -> Column {
self.weekofyear()
}
/// Day of the week as `Int32`, computed as `(weekday % 7) + 1` from `dt().weekday()`.
/// Presumably this turns an ISO Monday=1..Sunday=7 value into Sunday=1..Saturday=7 — confirm
/// against the polars `weekday` contract.
///
/// The input is first converted to a `Date` through
/// `crate::udfs::apply_string_to_date_format`, and the result is named
/// `dayofweek(<column name>)`.
pub fn dayofweek(&self) -> Column {
    use polars::prelude::*;
    let label = format!("dayofweek({})", self.name());
    let as_date = self.expr().clone().map(
        |values| expect_col(crate::udfs::apply_string_to_date_format(values, None, false)),
        |_, f| Ok(Field::new(f.name().clone(), DataType::Date)),
    );
    let weekday_number = as_date.dt().weekday().cast(DataType::Int32);
    let shifted = (weekday_number % lit(7i32)) + lit(1i32);
    let result = shifted.cast(DataType::Int32).alias(&label);
    Self::from_expr(result, Some(label))
}
/// Ordinal day of the year (`dt().ordinal_day()`), cast to `Int32`.
pub fn dayofyear(&self) -> Column {
    let ordinal = self.expr().clone().dt().ordinal_day();
    Self::from_expr(ordinal.cast(DataType::Int32), None)
}
/// Cast the column to `Date`.
pub fn to_date(&self) -> Column {
    use polars::prelude::DataType;
    let as_date = self.expr().clone().cast(DataType::Date);
    Self::from_expr(as_date, None)
}
/// Render a temporal column as text using the `strftime`-style pattern `format`, which is
/// passed to `dt().strftime` unchanged.
pub fn date_format(&self, format: &str) -> Column {
    let rendered = self.expr().clone().dt().strftime(format);
    Self::from_expr(rendered, None)
}
/// Hour component via `crate::udfs::apply_hour`; the output is declared `Int32`.
pub fn hour(&self) -> Column {
    let input = self.expr().clone();
    let mapped = input.map(
        |values| expect_col(crate::udfs::apply_hour(values)),
        |_, f| Ok(Field::new(f.name().clone(), DataType::Int32)),
    );
    Self::from_expr(mapped, None)
}
/// Minute component via `crate::udfs::apply_minute`; the output is declared `Int32`.
pub fn minute(&self) -> Column {
    let input = self.expr().clone();
    let mapped = input.map(
        |values| expect_col(crate::udfs::apply_minute(values)),
        |_, f| Ok(Field::new(f.name().clone(), DataType::Int32)),
    );
    Self::from_expr(mapped, None)
}
/// Second component via `crate::udfs::apply_second`; the output is declared `Int32`.
pub fn second(&self) -> Column {
    let input = self.expr().clone();
    let mapped = input.map(
        |values| expect_col(crate::udfs::apply_second(values)),
        |_, f| Ok(Field::new(f.name().clone(), DataType::Int32)),
    );
    Self::from_expr(mapped, None)
}
/// Extract a named date/time part from a temporal column.
///
/// `field` is trimmed and matched case-insensitively. Recognised names are `month`, `day`,
/// `hour`, `minute`, `second`, `quarter`, `week`/`weekofyear`, `dayofweek`/`dow` (computed as
/// `(weekday % 7) + 1`) and `dayofyear`/`doy` (cast to `Int32`). `year` and every
/// unrecognised name yield the year.
pub fn extract(&self, field: &str) -> Column {
    use polars::prelude::*;
    let part_name = field.trim().to_lowercase();
    let temporal = self.expr().clone();
    let part = match part_name.as_str() {
        "month" => temporal.dt().month(),
        "day" => temporal.dt().day(),
        "hour" => temporal.dt().hour(),
        "minute" => temporal.dt().minute(),
        "second" => temporal.dt().second(),
        "quarter" => temporal.dt().quarter(),
        "week" | "weekofyear" => temporal.dt().week(),
        "dayofweek" | "dow" => {
            let weekday_number = temporal.dt().weekday();
            (weekday_number % lit(7i32)) + lit(1i32)
        }
        "dayofyear" | "doy" => temporal.dt().ordinal_day().cast(DataType::Int32),
        // "year" and anything unknown fall back to the year component.
        _ => temporal.dt().year(),
    };
    Self::from_expr(part, None)
}
/// Cast the column to `Int64`.
/// NOTE(review): presumably the input is a microsecond-precision timestamp, so the integer is
/// microseconds since the epoch — confirm against callers.
pub fn unix_micros(&self) -> Column {
    use polars::prelude::*;
    let raw = self.expr().clone().cast(DataType::Int64);
    Self::from_expr(raw, None)
}
/// Cast the column to `Int64` and divide by 1000.
/// NOTE(review): presumably the input is a microsecond-precision timestamp, making the result
/// milliseconds since the epoch — confirm against callers.
pub fn unix_millis(&self) -> Column {
    use polars::prelude::*;
    let raw = self.expr().clone().cast(DataType::Int64);
    let micros_per_milli = lit(1000i64);
    Self::from_expr(raw / micros_per_milli, None)
}
/// Cast the column to `Int64` and divide by 1,000,000.
/// NOTE(review): presumably the input is a microsecond-precision timestamp, making the result
/// seconds since the epoch — confirm against callers.
pub fn unix_seconds(&self) -> Column {
    use polars::prelude::*;
    let raw = self.expr().clone().cast(DataType::Int64);
    let micros_per_second = lit(1_000_000i64);
    Self::from_expr(raw / micros_per_second, None)
}
/// Name of the weekday via `crate::udfs::apply_dayname`; the output is declared `String`.
pub fn dayname(&self) -> Column {
    let input = self.expr().clone();
    let mapped = input.map(
        |values| expect_col(crate::udfs::apply_dayname(values)),
        |_, f| Ok(Field::new(f.name().clone(), DataType::String)),
    );
    Self::from_expr(mapped, None)
}
/// Weekday number via `crate::udfs::apply_weekday`; the output is declared `Int32`.
pub fn weekday(&self) -> Column {
    let input = self.expr().clone();
    let mapped = input.map(
        |values| expect_col(crate::udfs::apply_weekday(values)),
        |_, f| Ok(Field::new(f.name().clone(), DataType::Int32)),
    );
    Self::from_expr(mapped, None)
}
/// Add `n` days to the date. The input is first converted to a `Date` through
/// `crate::udfs::apply_string_to_date_format`, and the result is named
/// `date_add(<column name>, <n>)`.
pub fn date_add(&self, n: i32) -> Column {
    use polars::prelude::*;
    let label = format!("date_add({}, {n})", self.name());
    let as_date = self.expr().clone().map(
        |values| expect_col(crate::udfs::apply_string_to_date_format(values, None, false)),
        |_, f| Ok(Field::new(f.name().clone(), DataType::Date)),
    );
    let day_span = duration(DurationArgs::new().with_days(lit(n as i64)));
    let shifted = (as_date + day_span).alias(&label);
    Self::from_expr(shifted, Some(label))
}
/// Subtract `n` days from the date. The input is first converted to a `Date` through
/// `crate::udfs::apply_string_to_date_format`, and the result is named
/// `date_sub(<column name>, <n>)`.
pub fn date_sub(&self, n: i32) -> Column {
    use polars::prelude::*;
    let label = format!("date_sub({}, {n})", self.name());
    let as_date = self.expr().clone().map(
        |values| expect_col(crate::udfs::apply_string_to_date_format(values, None, false)),
        |_, f| Ok(Field::new(f.name().clone(), DataType::Date)),
    );
    let day_span = duration(DurationArgs::new().with_days(lit(n as i64)));
    let shifted = (as_date - day_span).alias(&label);
    Self::from_expr(shifted, Some(label))
}
/// Whole days between two dates, as `Int32`.
///
/// Both columns are cast to `Date` and the result is `other - self`, i.e. this column is
/// treated as the start and `other` as the end.
/// NOTE(review): verify callers pass the operands in that order.
pub fn datediff(&self, other: &Column) -> Column {
    use polars::prelude::*;
    let from = self.expr().clone().cast(DataType::Date);
    let until = other.expr().clone().cast(DataType::Date);
    let elapsed = until - from;
    let days = elapsed.dt().total_days(false).cast(DataType::Int32);
    Self::from_expr(days, None)
}
/// Last day of the month containing each date (`dt().month_end()`).
pub fn last_day(&self) -> Column {
    let month_end = self.expr().clone().dt().month_end();
    Self::from_expr(month_end, None)
}
/// Add `amount` units of `unit` to the timestamp.
///
/// `unit` is trimmed and matched case-insensitively against `HOUR(S)`, `MINUTE(S)`,
/// `SECOND(S)` and `WEEK(S)`; `DAY(S)` and every unrecognised unit are treated as days.
/// `amount` is cast to `Int64` before it is used.
pub fn timestampadd(&self, unit: &str, amount: &Column) -> Column {
    use polars::prelude::*;
    let base = self.expr().clone();
    let quantity = amount.expr().clone().cast(DataType::Int64);
    let unit_name = unit.trim().to_uppercase();
    let span_args = match unit_name.as_str() {
        "HOUR" | "HOURS" => DurationArgs::new().with_hours(quantity),
        "MINUTE" | "MINUTES" => DurationArgs::new().with_minutes(quantity),
        "SECOND" | "SECONDS" => DurationArgs::new().with_seconds(quantity),
        "WEEK" | "WEEKS" => DurationArgs::new().with_weeks(quantity),
        // "DAY"/"DAYS" and anything unknown are interpreted as days.
        _ => DurationArgs::new().with_days(quantity),
    };
    Self::from_expr(base + duration(span_args), None)
}
/// Difference `other - self` expressed in whole `unit`s.
///
/// `unit` is trimmed and matched case-insensitively against `HOUR(S)`, `MINUTE(S)` and
/// `SECOND(S)`; `DAY(S)` and every unrecognised unit yield total days.
pub fn timestampdiff(&self, unit: &str, other: &Column) -> Column {
    let elapsed = other.expr().clone() - self.expr().clone();
    let unit_name = unit.trim().to_uppercase();
    let total = match unit_name.as_str() {
        "HOUR" | "HOURS" => elapsed.dt().total_hours(false),
        "MINUTE" | "MINUTES" => elapsed.dt().total_minutes(false),
        "SECOND" | "SECONDS" => elapsed.dt().total_seconds(false),
        // "DAY"/"DAYS" and anything unknown are reported in days.
        _ => elapsed.dt().total_days(false),
    };
    Self::from_expr(total, None)
}
/// Time-zone conversion delegated to `crate::udfs::apply_from_utc_timestamp` with zone `tz`.
/// The output field is declared identical to the input field.
pub fn from_utc_timestamp(&self, tz: &str) -> Column {
    let zone = tz.to_string();
    let mapped = self.expr().clone().map(
        move |values| expect_col(crate::udfs::apply_from_utc_timestamp(values, &zone)),
        |_, f| Ok(f.clone()),
    );
    Self::from_expr(mapped, None)
}
/// Time-zone conversion delegated to `crate::udfs::apply_to_utc_timestamp` with zone `tz`.
/// The output field is declared identical to the input field.
pub fn to_utc_timestamp(&self, tz: &str) -> Column {
    let zone = tz.to_string();
    let mapped = self.expr().clone().map(
        move |values| expect_col(crate::udfs::apply_to_utc_timestamp(values, &zone)),
        |_, f| Ok(f.clone()),
    );
    Self::from_expr(mapped, None)
}
/// Truncate a date/timestamp to the unit named by `format`.
///
/// `format` is translated by `pyspark_trunc_format_to_polars_duration` and handed to
/// `crate::udfs::apply_date_trunc`. The output is declared as a microsecond `Datetime`
/// without time zone, and the resulting column keeps this column's name.
pub fn trunc(&self, format: &str) -> Column {
    use polars::prelude::*;
    let every = pyspark_trunc_format_to_polars_duration(format);
    let truncated = self.expr().clone().map(
        move |values| expect_col(crate::udfs::apply_date_trunc(values, &every)),
        |_, f| {
            let dtype = DataType::Datetime(TimeUnit::Microseconds, None);
            Ok(Field::new(f.name().clone(), dtype))
        },
    );
    let label = self.name().to_string();
    Self::from_expr(truncated, Some(label))
}
/// Add `n` months via `crate::udfs::apply_add_months`; the output is declared `Date`.
pub fn add_months(&self, n: i32) -> Column {
    let input = self.expr().clone();
    let mapped = input.map(
        move |values| expect_col(crate::udfs::apply_add_months(values, n)),
        |_, f| Ok(Field::new(f.name().clone(), DataType::Date)),
    );
    Self::from_expr(mapped, None)
}
/// Months between this column and `start` via `crate::udfs::apply_months_between`, with
/// `round_off` forwarded to the UDF. The result is declared `Float64` and keeps this column's
/// name.
pub fn months_between(&self, start: &Column, round_off: bool) -> Column {
    let rhs = [start.expr().clone()];
    let mapped = self.expr().clone().map_many(
        move |inputs| expect_col(crate::udfs::apply_months_between(inputs, round_off)),
        &rhs,
        |_, fs| Ok(Field::new(fs[0].name().clone(), DataType::Float64)),
    );
    Self::from_expr(mapped, None)
}
/// Next date falling on `day_of_week`, computed by `crate::udfs::apply_next_day`; the output
/// is declared `Date`.
pub fn next_day(&self, day_of_week: &str) -> Column {
    let target_day = day_of_week.to_string();
    let mapped = self.expr().clone().map(
        move |values| expect_col(crate::udfs::apply_next_day(values, &target_day)),
        |_, f| Ok(Field::new(f.name().clone(), DataType::Date)),
    );
    Self::from_expr(mapped, None)
}
/// Unix timestamp via `crate::udfs::apply_unix_timestamp`, with the optional `format` pattern
/// forwarded to the UDF. The output is declared `Int64`.
pub fn unix_timestamp(&self, format: Option<&str>) -> Column {
    let pattern: Option<String> = format.map(str::to_owned);
    let mapped = self.expr().clone().map(
        move |values| expect_col(crate::udfs::apply_unix_timestamp(values, pattern.as_deref())),
        |_, f| Ok(Field::new(f.name().clone(), DataType::Int64)),
    );
    Self::from_expr(mapped, None)
}
/// Format Unix time as text via `crate::udfs::apply_from_unixtime`, with the optional `format`
/// pattern forwarded to the UDF. The output is declared `String`.
pub fn from_unixtime(&self, format: Option<&str>) -> Column {
    let pattern: Option<String> = format.map(str::to_owned);
    let mapped = self.expr().clone().map(
        move |values| expect_col(crate::udfs::apply_from_unixtime(values, pattern.as_deref())),
        |_, f| Ok(Field::new(f.name().clone(), DataType::String)),
    );
    Self::from_expr(mapped, None)
}
/// Interpret the column as seconds: cast to `Int64`, multiply by 1,000,000 and cast to a
/// microsecond `Datetime` without time zone.
pub fn timestamp_seconds(&self) -> Column {
    let micros = self.expr().clone().cast(DataType::Int64) * lit(1_000_000i64);
    let stamped = micros.cast(DataType::Datetime(TimeUnit::Microseconds, None));
    Self::from_expr(stamped, None)
}
/// Interpret the column as milliseconds: cast to `Int64`, multiply by 1000 and cast to a
/// microsecond `Datetime` without time zone.
pub fn timestamp_millis(&self) -> Column {
    let micros = self.expr().clone().cast(DataType::Int64) * lit(1000i64);
    let stamped = micros.cast(DataType::Datetime(TimeUnit::Microseconds, None));
    Self::from_expr(stamped, None)
}
/// Interpret the column as microseconds: cast to `Int64`, then to a microsecond `Datetime`
/// without time zone.
pub fn timestamp_micros(&self) -> Column {
    let micros = self.expr().clone().cast(DataType::Int64);
    let stamped = micros.cast(DataType::Datetime(TimeUnit::Microseconds, None));
    Self::from_expr(stamped, None)
}
/// Date-to-integer conversion via `crate::udfs::apply_unix_date`; the output is declared
/// `Int32`.
pub fn unix_date(&self) -> Column {
    let input = self.expr().clone();
    let mapped = input.map(
        |values| expect_col(crate::udfs::apply_unix_date(values)),
        |_, f| Ok(Field::new(f.name().clone(), DataType::Int32)),
    );
    Self::from_expr(mapped, None)
}
/// Integer-to-date conversion via `crate::udfs::apply_date_from_unix_date`; the output is
/// declared `Date`.
pub fn date_from_unix_date(&self) -> Column {
    let input = self.expr().clone();
    let mapped = input.map(
        |values| expect_col(crate::udfs::apply_date_from_unix_date(values)),
        |_, f| Ok(Field::new(f.name().clone(), DataType::Date)),
    );
    Self::from_expr(mapped, None)
}
/// Positive modulo of this column by `divisor`, computed by `crate::udfs::apply_pmod`.
/// The result is declared `Float64` and keeps this column's name.
pub fn pmod(&self, divisor: &Column) -> Column {
    let rhs = [divisor.expr().clone()];
    let mapped = self.expr().clone().map_many(
        |inputs| expect_col(crate::udfs::apply_pmod(inputs)),
        &rhs,
        |_, fs| Ok(Field::new(fs[0].name().clone(), DataType::Float64)),
    );
    Self::from_expr(mapped, None)
}
/// Factorial via `crate::udfs::apply_factorial`; the output is declared `Int64`.
pub fn factorial(&self) -> Column {
    let input = self.expr().clone();
    let mapped = input.map(
        |values| expect_col(crate::udfs::apply_factorial(values)),
        |_, f| Ok(Field::new(f.name().clone(), DataType::Int64)),
    );
    Self::from_expr(mapped, None)
}
/// Evaluate this expression as a window over the given partition columns.
///
/// With no partition columns the constant literal `1` is used as the sole partition key, so
/// all rows share one partition.
pub fn over(&self, partition_by: &[&str]) -> Column {
    let keys: Vec<Expr> = match partition_by {
        [] => vec![lit(1i32)],
        names => names.iter().map(|name| col(*name)).collect(),
    };
    let windowed = self.expr().clone().over(keys);
    Self::from_expr(windowed, None)
}
pub fn over_window(
&self,
partition_by: &[&str],
order_by_encoded: &[String],
use_running_aggregate: bool,
is_full_partition_frame: bool,
frame: Option<(&str, i64, i64)>,
) -> Result<Column, PolarsError> {
const UNBOUNDED_PRECEDING: i64 = i64::MIN;
const UNBOUNDED_FOLLOWING: i64 = i64::MAX;
if expr_is_or_contains_n_unique(self.expr()) && self.name.starts_with("count_distinct(") {
return Err(PolarsError::InvalidOperation(
"Distinct window functions are not supported".into(),
));
}
let partition_exprs: Vec<Expr> = if partition_by.is_empty() {
vec![lit(1i32)]
} else {
partition_by.iter().map(|s| col(*s)).collect()
};
if let Some(ref fl) = self.first_last_value {
if fl.is_last && !order_by_encoded.is_empty() && !is_full_partition_frame {
let mut order_exprs: Vec<Expr> = Vec::with_capacity(order_by_encoded.len());
let mut descending_multi: Vec<bool> = Vec::with_capacity(order_by_encoded.len());
for s in order_by_encoded.iter() {
let s = s.trim();
let (name, descending) = if let Some(stripped) = s.strip_prefix('-') {
(stripped.trim(), true)
} else {
(s.trim(), false)
};
order_exprs.push(col(name));
descending_multi.push(descending);
}
let default_opts = SortOptions {
descending: descending_multi.first().copied().unwrap_or(false),
nulls_last: descending_multi.first().copied().unwrap_or(false),
..Default::default()
};
let expr = fl.value_expr.clone().over_with_options(
Some(partition_exprs),
Some((order_exprs, default_opts)),
WindowMapping::default(),
)?;
return Ok(Self::from_expr(expr, None));
}
}
let has_rolling_source = self.source_for_running.is_some()
|| self.source_for_running_mean.is_some()
|| self.source_for_running_count.is_some();
let use_rolling = frame.is_some_and(|(kind, start, end)| {
kind == "rows"
&& start != UNBOUNDED_PRECEDING
&& end != UNBOUNDED_FOLLOWING
&& use_running_aggregate
&& has_rolling_source
});
if use_rolling {
let (_kind, start, end) = frame.expect("use_rolling implies frame");
let window_size = (end - start + 1) as usize;
let center = start < 0 && end > 0;
let rolling_opts = RollingOptionsFixedWindow {
window_size,
min_periods: 1,
center,
..Default::default()
};
let mut order_exprs: Vec<Expr> = Vec::with_capacity(order_by_encoded.len());
let mut descending_multi: Vec<bool> = Vec::with_capacity(order_by_encoded.len());
for s in order_by_encoded.iter() {
let s = s.trim();
let (name, descending) = if let Some(stripped) = s.strip_prefix('-') {
(stripped.trim(), true)
} else {
(s, false)
};
order_exprs.push(col(name));
descending_multi.push(descending);
}
let default_opts = SortOptions {
descending: descending_multi.first().copied().unwrap_or(false),
nulls_last: descending_multi.first().copied().unwrap_or(false),
..Default::default()
};
let over_opts = (
Some(partition_exprs.clone()),
Some((order_exprs, default_opts)),
);
let expr = if let Some(ref src) = self.source_for_running_mean {
col(src)
.cast(DataType::Float64)
.rolling_mean(rolling_opts.clone())
.over_with_options(over_opts.0, over_opts.1, WindowMapping::default())?
} else if let Some(ref src) = self.source_for_running {
col(src)
.cast(DataType::Float64)
.rolling_sum(rolling_opts.clone())
.over_with_options(over_opts.0, over_opts.1, WindowMapping::default())?
} else if let Some(ref src) = self.source_for_running_count {
col(src)
.is_not_null()
.cast(DataType::Int64)
.rolling_sum(rolling_opts)
.over_with_options(over_opts.0, over_opts.1, WindowMapping::default())?
} else {
unreachable!("use_rolling requires has_rolling_source")
};
return Ok(Self::from_expr(expr, None));
}
if let Some((kind, start, end)) = frame {
if kind == "range" && order_by_encoded.len() == 1 && has_rolling_source {
let order_name = order_by_encoded[0]
.trim()
.strip_prefix('-')
.unwrap_or(order_by_encoded[0].trim())
.to_string();
let partition_by: Vec<String> =
partition_by.iter().map(|s| (*s).to_string()).collect();
let (value_col, agg) = if let Some(ref src) = self.source_for_running_mean {
(src.clone(), RangeWindowAgg::Mean)
} else if let Some(ref src) = self.source_for_running {
(src.clone(), RangeWindowAgg::Sum)
} else if let Some(ref src) = self.source_for_running_count {
(src.clone(), RangeWindowAgg::Count)
} else {
unreachable!("has_rolling_source")
};
let spec = RangeWindowSpec {
partition_by,
order_by: order_name,
value_col,
start,
end,
agg,
};
let mut c =
Self::from_expr(lit(0i64).cast(DataType::Float64), Some(self.name.clone()));
c.range_window_spec = Some(spec);
return Ok(c);
}
}
let base_expr = if use_running_aggregate {
if let Some(ref src) = self.source_for_running_mean {
let sum_expr = col(src).cast(DataType::Float64).cum_sum(false);
let count_expr = col(src).cum_count(false).cast(DataType::Float64);
sum_expr / count_expr
} else if let Some(ref src) = self.source_for_running {
col(src).cast(DataType::Float64).cum_sum(false)
} else if let Some(ref src) = self.source_for_running_count {
col(src).cum_count(false).cast(DataType::Int64)
} else {
self.expr().clone()
}
} else {
self.expr().clone()
};
let expr = if order_by_encoded.is_empty() {
base_expr.over(partition_exprs)
} else {
let mut order_exprs: Vec<Expr> = Vec::with_capacity(order_by_encoded.len());
let mut descending_multi: Vec<bool> = Vec::with_capacity(order_by_encoded.len());
for s in order_by_encoded.iter() {
let s = s.trim();
let (name, descending) = if let Some(stripped) = s.strip_prefix('-') {
(stripped.trim(), true)
} else {
(s, false)
};
order_exprs.push(col(name));
descending_multi.push(descending);
}
let default_opts = SortOptions {
descending: descending_multi.first().copied().unwrap_or(false),
nulls_last: descending_multi.first().copied().unwrap_or(false),
..Default::default()
};
base_expr.over_with_options(
Some(partition_exprs),
Some((order_exprs, default_opts)),
WindowMapping::default(),
)?
};
Ok(Self::from_expr(expr, None))
}
/// Rank using `RankMethod::Min`, in descending order when `descending` is `true`.
pub fn rank(&self, descending: bool) -> Column {
    let ranked = self.expr().clone().rank(
        RankOptions {
            method: RankMethod::Min,
            descending,
        },
        None,
    );
    Self::from_expr(ranked, None)
}
/// Rank using `RankMethod::Dense`, in descending order when `descending` is `true`.
pub fn dense_rank(&self, descending: bool) -> Column {
    let ranked = self.expr().clone().rank(
        RankOptions {
            method: RankMethod::Dense,
            descending,
        },
        None,
    );
    Self::from_expr(ranked, None)
}
/// Row number: an ordinal rank (`RankMethod::Ordinal`) of the column.
///
/// The values are cast to `Float64` and nulls are replaced by `+inf` (ascending) or `-inf`
/// (descending) before ranking.
pub fn row_number(&self, descending: bool) -> Column {
    use polars::prelude::*;
    let null_stand_in = if descending {
        f64::NEG_INFINITY
    } else {
        f64::INFINITY
    };
    let sortable = self
        .expr()
        .clone()
        .cast(DataType::Float64)
        .fill_null(lit(null_stand_in));
    let ordinal = RankOptions {
        method: RankMethod::Ordinal,
        descending,
    };
    Self::from_expr(sortable.rank(ordinal, None), None)
}
/// Builds a row-number expression over a partition, ordered by one or more keys.
///
/// Each entry of `order_by_encoded` is a column name; a leading `-` marks
/// the key as descending. An empty `partition_by` partitions by the
/// constant `lit(1i32)`.
///
/// Ranking strategy (always `Ordinal`, ascending):
/// * one key: the key is cast to `Float64`, nulls are filled with an
///   infinity, and descending keys are negated;
/// * several keys, all ascending: the raw columns are ranked as a struct;
/// * several keys, at least one descending: every key gets the single-key
///   treatment and the results are ranked as a struct.
///
/// # Errors
/// Returns `PolarsError::InvalidOperation` when `order_by_encoded` is empty.
pub fn row_number_over(
    partition_by: &[&str],
    order_by_encoded: &[String],
) -> Result<Column, PolarsError> {
    use polars::prelude::*;
    // Splits an encoded key into (column name, descending flag).
    fn parse_order_key(s: &str) -> (&str, bool) {
        let key = s.trim();
        if key.starts_with('-') {
            (key.trim_start_matches('-').trim(), true)
        } else {
            (key, false)
        }
    }
    if order_by_encoded.is_empty() {
        return Err(PolarsError::InvalidOperation(
            "row_number_over: order_by_encoded cannot be empty".into(),
        ));
    }
    let partition_exprs: Vec<Expr> = if partition_by.is_empty() {
        vec![lit(1i32)]
    } else {
        partition_by.iter().map(|s| col(*s)).collect()
    };
    let ordinal_ascending = || RankOptions {
        method: RankMethod::Ordinal,
        descending: false,
    };
    // Float key with nulls replaced by an infinity; descending keys are
    // negated so that an ascending rank can be used throughout.
    let sortable_key = |name: &str, desc: bool| -> Expr {
        let filled = col(name).cast(DataType::Float64).fill_null(lit(if desc {
            f64::NEG_INFINITY
        } else {
            f64::INFINITY
        }));
        if desc { filled.neg() } else { filled }
    };
    let any_descending = order_by_encoded
        .iter()
        .any(|s| s.trim().starts_with('-'));
    let rank_expr = match order_by_encoded {
        [only_key] => {
            let (name, desc) = parse_order_key(only_key);
            sortable_key(name, desc).rank(ordinal_ascending(), None)
        }
        keys if !any_descending => {
            let plain_fields: Vec<Expr> =
                keys.iter().map(|s| col(parse_order_key(s).0)).collect();
            as_struct(plain_fields).rank(ordinal_ascending(), None)
        }
        keys => {
            let normalized_fields: Vec<Expr> = keys
                .iter()
                .map(|s| {
                    let (name, desc) = parse_order_key(s);
                    sortable_key(name, desc)
                })
                .collect();
            as_struct(normalized_fields).rank(ordinal_ascending(), None)
        }
    };
    Ok(Self::from_expr(rank_expr.over(partition_exprs), None))
}
pub fn lag(&self, n: i64) -> Column {
Self::from_expr(self.expr().clone().shift(polars::prelude::lit(n)), None)
}
pub fn lead(&self, n: i64) -> Column {
Self::from_expr(self.expr().clone().shift(polars::prelude::lit(-n)), None)
}
pub fn first_value(&self) -> Column {
let value_expr = self.expr().clone();
Column {
name: "first_value".to_string(),
expr: value_expr.clone().first(),
is_array_expr: false,
deferred: None,
udf_call: None,
source_for_running: None,
source_for_running_mean: None,
first_last_value: Some(FirstLastValue {
value_expr,
is_last: false,
}),
source_for_running_count: None,
range_window_spec: None,
}
}
pub fn last_value(&self) -> Column {
let value_expr = self.expr().clone();
Column {
name: "last_value".to_string(),
expr: value_expr.clone().last(),
is_array_expr: false,
deferred: None,
udf_call: None,
source_for_running: None,
source_for_running_mean: None,
first_last_value: Some(FirstLastValue {
value_expr,
is_last: true,
}),
source_for_running_count: None,
range_window_spec: None,
}
}
/// Computes `(rank - 1) / (count - 1)` of this column within each partition.
///
/// `rank` uses the `Min` method in the direction given by `descending`;
/// `count` is this column's `count()` over the same partition. When
/// `count - 1` is not positive the result is `0.0`.
///
/// An empty `partition_by` treats the whole frame as one partition by
/// windowing over the constant `lit(1i32)`, as `ntile` and
/// `row_number_over` do. Without that fallback the window would be built
/// with no partition key at all.
pub fn percent_rank(&self, partition_by: &[&str], descending: bool) -> Column {
    use polars::prelude::*;
    let partition_exprs: Vec<Expr> = if partition_by.is_empty() {
        vec![lit(1i32)]
    } else {
        partition_by.iter().map(|s| col(*s)).collect()
    };
    let opts = RankOptions {
        method: RankMethod::Min,
        descending,
    };
    let rank_expr = self
        .expr()
        .clone()
        .rank(opts, None)
        .over(partition_exprs.clone());
    let count_expr = self.expr().clone().count().over(partition_exprs);
    let rank_f = (rank_expr - lit(1i64)).cast(DataType::Float64);
    let count_f = (count_expr - lit(1i64)).cast(DataType::Float64);
    // Guard the single-row partition, where the denominator would be zero.
    let pct = when(count_f.clone().gt(lit(0.0)))
        .then(rank_f / count_f)
        .otherwise(lit(0.0));
    Self::from_expr(pct, None)
}
/// Computes the cumulative distribution of this column within each partition.
///
/// The value for a row is the number of rows ordered at or before it
/// (ties included) divided by this column's `count()` over the partition.
/// A zero count yields `0.0`.
///
/// The rank uses the `Max` method so that tied rows all receive the same
/// fraction; an ordinal rank would give each tied row a different value.
///
/// An empty `partition_by` treats the whole frame as one partition by
/// windowing over the constant `lit(1i32)`, as `ntile` and
/// `row_number_over` do.
pub fn cume_dist(&self, partition_by: &[&str], descending: bool) -> Column {
    use polars::prelude::*;
    let partition_exprs: Vec<Expr> = if partition_by.is_empty() {
        vec![lit(1i32)]
    } else {
        partition_by.iter().map(|s| col(*s)).collect()
    };
    let opts = RankOptions {
        method: RankMethod::Max,
        descending,
    };
    let rows_at_or_before = self
        .expr()
        .clone()
        .rank(opts, None)
        .over(partition_exprs.clone());
    let count_f = self
        .expr()
        .clone()
        .count()
        .over(partition_exprs)
        .cast(DataType::Float64);
    let cume = when(count_f.clone().eq(lit(0.0)))
        .then(lit(0.0))
        .otherwise(rows_at_or_before.cast(DataType::Float64) / count_f);
    Self::from_expr(cume, None)
}
/// Assigns each row of a partition to one of `n` buckets, numbered from 1.
///
/// Rows are ordered by this column (cast to `Float64`, nulls replaced by
/// negative infinity) using an ordinal rank in the direction given by
/// `descending`. With `count` rows in the partition, `q = count / n` and
/// `r = count % n`, the first `r` buckets hold `q + 1` rows and the
/// remaining buckets hold `q` rows. For 6 rows and `n = 4` the buckets
/// are 1, 1, 2, 2, 3, 4.
///
/// An empty `partition_by` windows over the constant `lit(1i32)`.
/// `n == 0` is treated as `1` to avoid dividing by zero. The result is
/// cast to `Int32`.
pub fn ntile(&self, n: u32, partition_by: &[&str], descending: bool) -> Column {
    use polars::prelude::*;
    let partition_exprs: Vec<Expr> = if partition_by.is_empty() {
        vec![lit(1i32)]
    } else {
        partition_by.iter().map(|s| col(*s)).collect()
    };
    let opts = RankOptions {
        method: RankMethod::Ordinal,
        descending,
    };
    let filled_expr = self
        .expr()
        .clone()
        .cast(DataType::Float64)
        .fill_null(lit(f64::NEG_INFINITY));
    let rank_f = filled_expr
        .rank(opts, None)
        .over(partition_exprs.clone())
        .cast(DataType::Float64);
    let count_f = len().over(partition_exprs).cast(DataType::Float64);
    let n_f = lit(f64::from(n.max(1)));
    // q: size of a small bucket; r: how many buckets get one extra row.
    let small_size = (count_f.clone() / n_f.clone()).floor();
    let extra_buckets = count_f - small_size.clone() * n_f.clone();
    let large_size = small_size.clone() + lit(1.0);
    // Number of rows covered by the large buckets.
    let large_rows = extra_buckets.clone() * large_size.clone();
    let zero_based = rank_f - lit(1.0);
    let in_large = (zero_based.clone() / large_size).floor() + lit(1.0);
    // When q is 0 this branch divides by zero, but then every row falls in
    // the large-bucket branch, so the non-finite value is never selected.
    let in_small = extra_buckets
        + ((zero_based.clone() - large_rows.clone()) / small_size).floor()
        + lit(1.0);
    let bucket = when(zero_based.lt(large_rows))
        .then(in_large)
        .otherwise(in_small);
    let clamped = bucket.clip(lit(1.0), n_f);
    Self::from_expr(clamped.cast(DataType::Int32), None)
}
/// Returns, for every row, the value of this column at ordinal rank `n` within its partition.
///
/// Rows are ranked with the `Ordinal` method in the direction given by
/// `descending`. The value is kept where the rank equals `n` and replaced
/// by null elsewhere; `max()` over the partition then broadcasts that one
/// surviving value. If no row has rank `n` the result is null.
///
/// An empty `partition_by` treats the whole frame as one partition by
/// windowing over the constant `lit(1i32)`, as `ntile` and
/// `row_number_over` do.
pub fn nth_value(&self, n: i64, partition_by: &[&str], descending: bool) -> Column {
    use polars::prelude::*;
    let partition_exprs: Vec<Expr> = if partition_by.is_empty() {
        vec![lit(1i32)]
    } else {
        partition_by.iter().map(|s| col(*s)).collect()
    };
    let opts = RankOptions {
        method: RankMethod::Ordinal,
        descending,
    };
    let rank_expr = self
        .expr()
        .clone()
        .rank(opts, None)
        .over(partition_exprs.clone());
    let cond_col = Self::from_expr(rank_expr.eq(lit(n)), None);
    let null_col = Self::from_expr(lit(NULL), None);
    let value_col = Self::from_expr(self.expr().clone(), None);
    let when_expr = crate::functions::when(&cond_col)
        .then(&value_col)
        .otherwise(&null_col)
        .into_expr();
    let windowed = when_expr.max().over(partition_exprs);
    Self::from_expr(windowed, None)
}
/// Returns the list length as `Int32`, or `-1` where this column is null.
///
/// The resulting column is named `size`.
pub fn array_size(&self) -> Column {
    use polars::prelude::*;
    let source = self.expr().clone();
    let element_count = source.clone().list().len().cast(DataType::Int32);
    let size = when(source.is_null())
        .then(lit(-1i32))
        .otherwise(element_count)
        .cast(DataType::Int32);
    Self::from_expr(size, Some("size".to_string()))
}
pub fn cardinality(&self) -> Column {
self.array_size()
}
/// Tests whether each list contains `value`, via `crate::udfs::apply_array_contains`.
///
/// Rows where this column is null yield null instead of the UDF result.
/// The output is cast to `Boolean`.
pub fn array_contains(&self, value: Expr) -> Column {
    use polars::prelude::*;
    let source = self.expr().clone();
    let contains = source.clone().map_many(
        |inputs| expect_col(crate::udfs::apply_array_contains(inputs)),
        &[value],
        |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Boolean)),
    );
    let guarded = when(source.is_null())
        .then(lit(NULL))
        .otherwise(contains)
        .cast(DataType::Boolean);
    Self::from_expr(guarded, None)
}
/// Joins the elements of each list into one string separated by `separator`.
///
/// Elements are first cast to `String`. When `null_replacement` is given,
/// null elements are replaced by it before joining; otherwise the join is
/// asked to ignore nulls.
pub fn array_join(&self, separator: &str, null_replacement: Option<&str>) -> Column {
    use polars::prelude::*;
    let as_text = col("").cast(DataType::String);
    let (per_element, ignore_nulls) = match null_replacement {
        Some(repl) => (as_text.fill_null(lit(repl)), false),
        None => (as_text, true),
    };
    let stringified = self.expr().clone().list().eval(per_element);
    let joined = stringified
        .list()
        .join(lit(separator.to_string()), ignore_nulls);
    Self::from_expr(joined, None)
}
/// Returns the maximum element of each list (`list().max()`).
pub fn array_max(&self) -> Column {
    let largest = self.expr().clone().list().max();
    Self::from_expr(largest, None)
}
/// Returns the minimum element of each list (`list().min()`).
pub fn array_min(&self) -> Column {
    let smallest = self.expr().clone().list().min();
    Self::from_expr(smallest, None)
}
/// Fetches one element from each list using a 1-based `index`.
///
/// Indices `>= 1` are converted to 0-based; all other values (zero and
/// negatives) are passed to `list().get` unchanged. The `true` flag is
/// forwarded as the second argument of `get`.
pub fn element_at(&self, index: i64) -> Column {
    use polars::prelude::*;
    let position = match index {
        one_based if one_based >= 1 => one_based - 1,
        other => other,
    };
    let picked = self.expr().clone().list().get(lit(position), true);
    Self::from_expr(picked, None)
}
/// Fetches the element at `index` from each list, passing `index` to `list().get` as-is.
pub fn get_item(&self, index: i64) -> Column {
    use polars::prelude::*;
    let picked = self.expr().clone().list().get(lit(index), true);
    Self::from_expr(picked, None)
}
/// Extracts the struct field called `name`; the resulting column carries that name.
pub fn get_field(&self, name: &str) -> Column {
    let field_expr = self.expr().clone().struct_().field_by_name(name);
    Self::from_expr(field_expr, Some(name.to_string()))
}
/// Panicking wrapper around [`Self::try_with_field`].
///
/// # Panics
/// Panics with "with_field: column must be struct type" if
/// `try_with_field` returns an error.
pub fn with_field(&self, name: &str, value: &Column) -> Column {
    let outcome = self.try_with_field(name, value);
    outcome.expect("with_field: column must be struct type")
}
/// Adds or replaces the struct field `name` with `value`.
///
/// The data is produced by `crate::udfs::apply_struct_with_field`. The
/// declared output schema is computed here:
/// * if the input is not a struct, its field is returned unchanged;
/// * the new field's dtype is the value's dtype when known, else the
///   materialized form of an `Unknown` dtype, else `String`;
/// * a field with the same name is replaced in place, otherwise the new
///   field is appended.
///
/// This function itself always returns `Ok`; any failure surfaces when
/// the expression is evaluated.
pub fn try_with_field(
    &self,
    name: &str,
    value: &Column,
) -> Result<Column, polars::error::PolarsError> {
    use polars::prelude::PlSmallStr;
    let udf_field_name = name.to_string();
    let schema_field_name = udf_field_name.clone();
    let expr = self.expr().clone().map_many(
        move |cols| {
            let struct_col = cols[0].clone();
            let value_col = cols[1].clone();
            expect_col(crate::udfs::apply_struct_with_field(
                struct_col,
                value_col,
                &udf_field_name,
            ))
        },
        &[value.expr().clone()],
        move |_schema, fields| {
            let struct_field = &fields[0];
            let existing: &[Field] = match struct_field.dtype() {
                DataType::Struct(members) => members.as_ref(),
                _ => return Ok(struct_field.clone()),
            };
            // Resolve the value dtype to something concrete.
            let incoming = fields[1].dtype().clone();
            let resolved = if incoming.is_known() {
                incoming
            } else if let DataType::Unknown(kind) = &incoming {
                kind.materialize().unwrap_or(DataType::String)
            } else {
                DataType::String
            };
            let mut members: Vec<Field> = existing.to_vec();
            let slot = members
                .iter()
                .position(|f| f.name.as_str() == schema_field_name);
            match slot {
                Some(i) => {
                    let dtype = if resolved.is_known() {
                        resolved.clone()
                    } else if members[i].dtype.is_known() {
                        members[i].dtype.clone()
                    } else {
                        DataType::String
                    };
                    let kept_name = PlSmallStr::from(members[i].name.as_str());
                    members[i] = Field::new(kept_name, dtype);
                }
                None => members.push(Field::new(
                    PlSmallStr::from(schema_field_name.as_str()),
                    resolved,
                )),
            }
            Ok(Field::new(
                struct_field.name().clone(),
                DataType::Struct(members),
            ))
        },
    );
    Ok(Self::from_expr(expr, None))
}
/// Sorts the elements of each list.
///
/// `asc = true` sorts ascending with `nulls_last = false`; `asc = false`
/// sorts descending with `nulls_last = true`.
pub fn array_sort(&self, asc: bool) -> Column {
    use polars::prelude::SortOptions;
    let reversed = !asc;
    let sorted = self.expr().clone().list().sort(SortOptions {
        descending: reversed,
        nulls_last: reversed,
        ..Default::default()
    });
    Self::from_expr(sorted, None)
}
/// Applies `crate::udfs::apply_array_distinct_first_order` to this column.
///
/// The output field keeps the input dtype and is named
/// `array_distinct(<input name>)`.
pub fn array_distinct(&self) -> Column {
    let deduplicated = self.expr().clone().map(
        |input| expect_col(crate::udfs::apply_array_distinct_first_order(input)),
        |_schema, field| {
            let renamed = format!("array_distinct({})", field.name());
            Ok(Field::new(renamed.into(), field.dtype().clone()))
        },
    );
    Self::from_expr(deduplicated, None)
}
/// Picks the value of the first entry produced by `value_counts`.
///
/// `value_counts` is called with arguments `(true, false, "count", false)`;
/// field 0 of the first resulting struct is returned in a column named `mode`.
pub fn mode(&self) -> Column {
    let counts = self
        .expr()
        .clone()
        .value_counts(true, false, "count", false);
    let top_value = counts.first().struct_().field_by_index(0);
    Self::from_expr(top_value, Some("mode".to_string()))
}
/// Slices each list starting at a 1-based `start` for up to `length` elements.
///
/// * `start < 0` counts from the end of the list (`len + start`), floored at 0.
/// * otherwise the offset is `start - 1`, floored at 0.
/// * `length = None` uses `i64::MAX` as the slice length.
pub fn array_slice(&self, start: i64, length: Option<i64>) -> Column {
    use polars::prelude::*;
    let source = self.expr().clone();
    let list_len = source.clone().list().len().cast(DataType::Int64);
    let from_end = list_len + lit(start);
    let negative_offset = when(from_end.clone().lt(0))
        .then(lit(0i64))
        .otherwise(from_end);
    let offset = when(lit(start).lt(0))
        .then(negative_offset)
        .otherwise(lit((start - 1).max(0)));
    let take = match length {
        Some(count) => lit(count),
        None => lit(i64::MAX),
    };
    Self::from_expr(source.list().slice(offset, take), None)
}
/// Explodes this column with `empty_as_null: false` and `keep_nulls: false`.
pub fn explode(&self) -> Column {
    use polars::prelude::ExplodeOptions;
    let options = ExplodeOptions {
        empty_as_null: false,
        keep_nulls: false,
    };
    let exploded = self.expr().clone().explode(options);
    Self::from_expr(exploded, None)
}
/// Explodes this column with `empty_as_null: true` and `keep_nulls: true`.
pub fn explode_outer(&self) -> Column {
    use polars::prelude::ExplodeOptions;
    let options = ExplodeOptions {
        empty_as_null: true,
        keep_nulls: true,
    };
    let exploded = self.expr().clone().explode(options);
    Self::from_expr(exploded, None)
}
/// Explodes each list into `(pos, col)` pairs, keeping null and empty lists.
///
/// Every element is wrapped in a struct holding its 0-based position
/// (`pos`) and its value (`col`). The list of structs is exploded with
/// `empty_as_null: true` and `keep_nulls: true`, and the two struct fields
/// are returned as separate columns named `pos` and `col`.
///
/// The position is a running count over `is_null()` of the element.
/// `is_null()` never yields null itself, so every element is counted;
/// `cum_count` on the raw element skips nulls and would repeat a position
/// after each null element.
pub fn posexplode_outer(&self) -> (Column, Column) {
    use polars::prelude::{ExplodeOptions, as_struct};
    let opts = ExplodeOptions {
        empty_as_null: true,
        keep_nulls: true,
    };
    let pos_inner = (col("").is_null().cum_count(false) - lit(1i64)).alias("pos");
    let val_inner = col("").alias("col");
    let struct_expr = as_struct(vec![pos_inner, val_inner]);
    let list_struct_expr = self.expr().clone().list().eval(struct_expr);
    let struct_exploded = list_struct_expr.explode(opts);
    let pos_expr = struct_exploded.clone().struct_().field_by_name("pos");
    let val_expr = struct_exploded.struct_().field_by_name("col");
    (
        Self::from_expr(pos_expr, Some("pos".to_string())),
        Self::from_expr(val_expr, Some("col".to_string())),
    )
}
/// Combines this column with `other` through `crate::udfs::apply_arrays_zip`.
///
/// The declared output field is the first input's field, unchanged.
pub fn arrays_zip(&self, other: &Column) -> Column {
    let zipped = self.expr().clone().map_many(
        |inputs| expect_col(crate::udfs::apply_arrays_zip(inputs)),
        &[other.expr().clone()],
        |_schema, fields| Ok(fields[0].clone()),
    );
    Self::from_expr(zipped, None)
}
/// Tests whether this list and `other` overlap, via `crate::udfs::apply_arrays_overlap`.
///
/// If either input is null the result is null. The output is cast to `Boolean`.
pub fn arrays_overlap(&self, other: &Column) -> Column {
    use polars::prelude::*;
    let left = self.expr().clone();
    let right = other.expr().clone();
    let overlap = left.clone().map_many(
        |inputs| expect_col(crate::udfs::apply_arrays_overlap(inputs)),
        &[right.clone()],
        |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Boolean)),
    );
    let either_null = left.is_null().or(right.is_null());
    let guarded = polars::prelude::when(either_null)
        .then(lit(NULL))
        .otherwise(overlap)
        .cast(DataType::Boolean);
    Self::from_expr(guarded, None)
}
/// Collects this column's values into a list (`implode()`).
pub fn array_agg(&self) -> Column {
    let collected = self.expr().clone().implode();
    Self::from_expr(collected, None)
}
/// Returns the 1-based position of the first element equal to `value`, or 0.
///
/// Inside each list, matching elements are replaced by their 1-based
/// position and all others by null; the minimum of those positions is the
/// first match. A missing minimum is filled with `0` and the result is
/// cast to `Int64`. The resulting column is named `array_position`.
///
/// The position is a running count over `is_null()` of the element.
/// `is_null()` never yields null itself, so every element is counted;
/// `cum_count` on the raw element skips nulls and would report a position
/// that is too small when nulls precede the match.
pub fn array_position(&self, value: Expr) -> Column {
    use polars::prelude::{DataType, NULL};
    let cond = Self::from_expr(col("").eq(value), None);
    let then_val = Self::from_expr(col("").is_null().cum_count(false), None);
    let else_val = Self::from_expr(lit(NULL), None);
    let idx_expr = crate::functions::when(&cond)
        .then(&then_val)
        .otherwise(&else_val)
        .into_expr();
    let list_expr = self
        .expr()
        .clone()
        .list()
        .eval(idx_expr)
        .list()
        .min()
        .fill_null(lit(0i64))
        .cast(DataType::Int64);
    Self::from_expr(list_expr, Some("array_position".to_string()))
}
/// Removes null elements from each list (`list().drop_nulls()`).
pub fn array_compact(&self) -> Column {
    let without_nulls = self.expr().clone().list().drop_nulls();
    Self::from_expr(without_nulls, None)
}
/// Removes every element equal to `value` from each list.
///
/// Elements are filtered inside the list instead of being masked to null
/// and then dropped, so null elements already present in the list are
/// kept. The comparison uses `neq_missing`, which treats null as an
/// ordinary value, so a null element compared with a non-null `value`
/// counts as "not equal" and survives.
pub fn array_remove(&self, value: Expr) -> Column {
    let keep = col("").neq_missing(value);
    let list_expr = self.expr().clone().list().eval(col("").filter(keep));
    Self::from_expr(list_expr, None)
}
/// Applies `crate::udfs::apply_array_repeat` with repeat count `n`.
///
/// The declared output field is the input field, unchanged.
pub fn array_repeat(&self, n: i64) -> Column {
    let repeated = self.expr().clone().map(
        move |input| expect_col(crate::udfs::apply_array_repeat(input, n)),
        |_schema, field| Ok(field.clone()),
    );
    Self::from_expr(repeated, None)
}
/// Applies `crate::udfs::apply_array_flatten` to this column.
///
/// The declared output field is the input field, unchanged.
pub fn array_flatten(&self) -> Column {
    let flattened = self.expr().clone().map(
        |input| expect_col(crate::udfs::apply_array_flatten(input)),
        |_schema, field| Ok(field.clone()),
    );
    Self::from_expr(flattened, None)
}
/// Applies `crate::udfs::apply_array_append` to this column and `elem`.
///
/// The declared output field is the first input's field, unchanged.
pub fn array_append(&self, elem: &Column) -> Column {
    let appended = self.expr().clone().map_many(
        |inputs| expect_col(crate::udfs::apply_array_append(inputs)),
        &[elem.expr().clone()],
        |_schema, fields| Ok(fields[0].clone()),
    );
    Self::from_expr(appended, None)
}
/// Applies `crate::udfs::apply_array_prepend` to this column and `elem`.
///
/// The declared output field is the first input's field, unchanged.
pub fn array_prepend(&self, elem: &Column) -> Column {
    let prepended = self.expr().clone().map_many(
        |inputs| expect_col(crate::udfs::apply_array_prepend(inputs)),
        &[elem.expr().clone()],
        |_schema, fields| Ok(fields[0].clone()),
    );
    Self::from_expr(prepended, None)
}
/// Applies `crate::udfs::apply_array_insert` to this column, `pos` and `elem` (in that order).
///
/// The declared output field is the first input's field, unchanged.
pub fn array_insert(&self, pos: &Column, elem: &Column) -> Column {
    let inserted = self.expr().clone().map_many(
        |inputs| expect_col(crate::udfs::apply_array_insert(inputs)),
        &[pos.expr().clone(), elem.expr().clone()],
        |_schema, fields| Ok(fields[0].clone()),
    );
    Self::from_expr(inserted, None)
}
/// Applies `crate::udfs::apply_array_except` to this column and `other`.
///
/// The declared output field is the first input's field, unchanged.
pub fn array_except(&self, other: &Column) -> Column {
    let difference = self.expr().clone().map_many(
        |inputs| expect_col(crate::udfs::apply_array_except(inputs)),
        &[other.expr().clone()],
        |_schema, fields| Ok(fields[0].clone()),
    );
    Self::from_expr(difference, None)
}
/// Applies `crate::udfs::apply_array_intersect` to this column and `other`.
///
/// The declared output field is the first input's field, unchanged.
pub fn array_intersect(&self, other: &Column) -> Column {
    let common = self.expr().clone().map_many(
        |inputs| expect_col(crate::udfs::apply_array_intersect(inputs)),
        &[other.expr().clone()],
        |_schema, fields| Ok(fields[0].clone()),
    );
    Self::from_expr(common, None)
}
/// Applies `crate::udfs::apply_array_union` to this column and `other`.
///
/// The declared output field is the first input's field, unchanged.
pub fn array_union(&self, other: &Column) -> Column {
    let combined = self.expr().clone().map_many(
        |inputs| expect_col(crate::udfs::apply_array_union(inputs)),
        &[other.expr().clone()],
        |_schema, fields| Ok(fields[0].clone()),
    );
    Self::from_expr(combined, None)
}
pub fn zip_with(&self, other: &Column, merge: Expr) -> Column {
let args = [other.expr().clone()];
let zip_expr = self.expr().clone().map_many(
|cols| expect_col(crate::udfs::apply_zip_arrays_to_struct(cols)),
&args,
|_schema, fields| {
let left_inner = match &fields[0].dtype {
DataType::List(inner) => *inner.clone(),
_ => DataType::Unknown(Default::default()),
};
let right_inner = match fields.get(1).map(|f| &f.dtype) {
Some(DataType::List(inner)) => *inner.clone(),
_ => DataType::Unknown(Default::default()),
};
let struct_dtype = DataType::Struct(vec![
Field::new("left".into(), left_inner),
Field::new("right".into(), right_inner),
]);
Ok(Field::new(
fields[0].name().clone(),
DataType::List(Box::new(struct_dtype)),
))
},
);
let list_expr = zip_expr.list().eval(merge);
Self::from_expr(list_expr, None)
}
/// Evaluates `predicate` on each list element and reduces with `list().any()`.
///
/// The resulting column is named `exists`.
pub fn array_exists(&self, predicate: Expr) -> Column {
    let per_element = self.expr().clone().list().eval(predicate);
    Self::from_expr(per_element.list().any(), Some("exists".to_string()))
}
/// Evaluates `predicate` on each list element and reduces with `list().all()`.
///
/// The resulting column is named `forall`.
pub fn array_forall(&self, predicate: Expr) -> Column {
    let per_element = self.expr().clone().list().eval(predicate);
    Self::from_expr(per_element.list().all(), Some("forall".to_string()))
}
/// Keeps only the list elements for which `predicate` holds.
///
/// `predicate` is evaluated per element, with the element addressed as
/// `col("")`. Elements are filtered inside the list instead of being
/// masked to null and then dropped, so a null element that satisfies the
/// predicate (for example an "is null" test) stays in the result.
pub fn array_filter(&self, predicate: Expr) -> Column {
    let list_expr = self
        .expr()
        .clone()
        .list()
        .eval(col("").filter(predicate));
    Self::from_expr(list_expr, None)
}
/// Evaluates `f` on every element of each list (`list().eval(f)`).
pub fn array_transform(&self, f: Expr) -> Column {
    let mapped = self.expr().clone().list().eval(f);
    Self::from_expr(mapped, None)
}
/// Sums the elements of each list (`list().sum()`).
pub fn array_sum(&self) -> Column {
    let total = self.expr().clone().list().sum();
    Self::from_expr(total, None)
}
/// Adds `zero` to the sum of each list's elements (`list().sum() + zero`).
pub fn array_aggregate(&self, zero: &Column) -> Column {
    let total = self.expr().clone().list().sum();
    let seeded = total + zero.expr().clone();
    Self::from_expr(seeded, None)
}
/// Averages the elements of each list (`list().mean()`).
pub fn array_mean(&self) -> Column {
    let average = self.expr().clone().list().mean();
    Self::from_expr(average, None)
}
/// Explodes each list into `(pos, col)` pairs.
///
/// Every element is wrapped in a struct holding its 0-based position
/// (`pos`) and its value (`col`). The list of structs is exploded with
/// `empty_as_null: false` and `keep_nulls: false`, and the two struct
/// fields are returned as separate columns named `pos` and `col`.
///
/// The position is a running count over `is_null()` of the element.
/// `is_null()` never yields null itself, so every element is counted;
/// `cum_count` on the raw element skips nulls and would repeat a position
/// after each null element.
pub fn posexplode(&self) -> (Column, Column) {
    use polars::prelude::{ExplodeOptions, as_struct};
    let opts = ExplodeOptions {
        empty_as_null: false,
        keep_nulls: false,
    };
    let pos_inner = (col("").is_null().cum_count(false) - lit(1i64)).alias("pos");
    let val_inner = col("").alias("col");
    let struct_expr = as_struct(vec![pos_inner, val_inner]);
    let list_struct_expr = self.expr().clone().list().eval(struct_expr);
    let struct_exploded = list_struct_expr.explode(opts);
    let pos_expr = struct_exploded.clone().struct_().field_by_name("pos");
    let val_expr = struct_exploded.struct_().field_by_name("col");
    (
        Self::from_expr(pos_expr, Some("pos".to_string())),
        Self::from_expr(val_expr, Some("col".to_string())),
    )
}
/// Extracts the `key` struct field from every element of each list.
pub fn map_keys(&self) -> Column {
    let keys = self
        .expr()
        .clone()
        .list()
        .eval(col("").struct_().field_by_name("key"));
    Self::from_expr(keys, None)
}
/// Extracts the `value` struct field from every element of each list.
pub fn map_values(&self) -> Column {
    let values = self
        .expr()
        .clone()
        .list()
        .eval(col("").struct_().field_by_name("value"));
    Self::from_expr(values, None)
}
/// Returns a column wrapping a clone of this column's expression, unchanged.
pub fn map_entries(&self) -> Column {
    let same = self.expr().clone();
    Self::from_expr(same, None)
}
/// Applies `crate::udfs::apply_map_from_arrays` to this column and `values`.
///
/// The declared output field is the first input's field, unchanged.
pub fn map_from_arrays(&self, values: &Column) -> Column {
    let mapped = self.expr().clone().map_many(
        |inputs| expect_col(crate::udfs::apply_map_from_arrays(inputs)),
        &[values.expr().clone()],
        |_schema, fields| Ok(fields[0].clone()),
    );
    Self::from_expr(mapped, None)
}
/// Applies `crate::udfs::apply_map_concat` to this column and `other`.
///
/// The declared output field is the first input's field, unchanged.
pub fn map_concat(&self, other: &Column) -> Column {
    let merged = self.expr().clone().map_many(
        |inputs| expect_col(crate::udfs::apply_map_concat(inputs)),
        &[other.expr().clone()],
        |_schema, fields| Ok(fields[0].clone()),
    );
    Self::from_expr(merged, None)
}
/// Rebuilds every list element as `{key: key_expr, value: <existing value field>}`.
pub fn transform_keys(&self, key_expr: Expr) -> Column {
    use polars::prelude::as_struct;
    let kept_value = col("").struct_().field_by_name("value").alias("value");
    let entry = as_struct(vec![key_expr.alias("key"), kept_value]);
    Self::from_expr(self.expr().clone().list().eval(entry), None)
}
/// Rebuilds every list element as `{key: <existing key field>, value: value_expr}`.
pub fn transform_values(&self, value_expr: Expr) -> Column {
    use polars::prelude::as_struct;
    let kept_key = col("").struct_().field_by_name("key").alias("key");
    let entry = as_struct(vec![kept_key, value_expr.alias("value")]);
    Self::from_expr(self.expr().clone().list().eval(entry), None)
}
/// Zips this map with `other` by key and evaluates `merge` to produce the new values.
///
/// `crate::udfs::apply_map_zip_to_struct` produces the zipped list. Its
/// declared dtype is a list of `{key, value1, value2}` structs, where the
/// key and value dtypes are taken from this column's `key` / `value`
/// struct fields (defaulting to `String`); both `value1` and `value2` use
/// this column's value dtype. If this column is not a list, its field is
/// declared unchanged. Each zipped element is then rebuilt as
/// `{key, value: merge}`.
pub fn map_zip_with(&self, other: &Column, merge: Expr) -> Column {
    use polars::prelude::as_struct;
    let zipped = self.expr().clone().map_many(
        |inputs| expect_col(crate::udfs::apply_map_zip_to_struct(inputs)),
        &[other.expr().clone()],
        |_schema, fields| {
            let DataType::List(entry_dtype) = &fields[0].dtype else {
                return Ok(fields[0].clone());
            };
            let (key_dtype, value_dtype) = match entry_dtype.as_ref() {
                DataType::Struct(entry_fields) => {
                    // Dtype of the named entry field, defaulting to String.
                    let dtype_of = |wanted: &str| {
                        entry_fields
                            .iter()
                            .find(|f| f.name.as_str() == wanted)
                            .map(|f| f.dtype.clone())
                            .unwrap_or(DataType::String)
                    };
                    (dtype_of("key"), dtype_of("value"))
                }
                _ => (DataType::String, DataType::String),
            };
            let zipped_entry = DataType::Struct(vec![
                Field::new("key".into(), key_dtype),
                Field::new("value1".into(), value_dtype.clone()),
                Field::new("value2".into(), value_dtype),
            ]);
            Ok(Field::new(
                fields[0].name().clone(),
                DataType::List(Box::new(zipped_entry)),
            ))
        },
    );
    let merged_entry = as_struct(vec![
        col("").struct_().field_by_name("key").alias("key"),
        merge.alias("value"),
    ]);
    Self::from_expr(zipped.list().eval(merged_entry), None)
}
/// Keeps only the list elements for which `predicate` holds.
///
/// Elements where the predicate does not hold are replaced by null, and
/// nulls are then dropped from each list.
pub fn map_filter(&self, predicate: Expr) -> Column {
    use polars::prelude::NULL;
    let condition = Self::from_expr(predicate, None);
    let keep_entry = Self::from_expr(col(""), None);
    let discard_entry = Self::from_expr(lit(NULL), None);
    let masked_entry = crate::functions::when(&condition)
        .then(&keep_entry)
        .otherwise(&discard_entry)
        .into_expr();
    let masked = self.expr().clone().list().eval(masked_entry);
    Self::from_expr(masked.list().drop_nulls(), None)
}
/// Returns a column wrapping a clone of this column's expression, unchanged.
pub fn map_from_entries(&self) -> Column {
    let same = self.expr().clone();
    Self::from_expr(same, None)
}
/// Applies `crate::udfs::apply_map_contains_key` to this column and `key`.
///
/// The declared output is a `Boolean` field named after the first input.
pub fn map_contains_key(&self, key: &Column) -> Column {
    let found = self.expr().clone().map_many(
        |inputs| expect_col(crate::udfs::apply_map_contains_key(inputs)),
        &[key.expr().clone()],
        |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Boolean)),
    );
    Self::from_expr(found, None)
}
/// Looks up `key` in this column via `crate::udfs::apply_get`.
///
/// Declared output dtype:
/// * list of structs: the dtype of the `value` field, or `String` if absent;
/// * struct: the dtype of its first field, or `String` if it has none;
/// * anything else: `String`.
pub fn get(&self, key: &Column) -> Column {
    let looked_up = self.expr().clone().map_many(
        |inputs| expect_col(crate::udfs::apply_get(inputs)),
        &[key.expr().clone()],
        |_schema, fields| {
            let value_dtype = match &fields[0].dtype {
                DataType::List(inner) => {
                    if let DataType::Struct(entry_fields) = inner.as_ref() {
                        entry_fields
                            .iter()
                            .find(|f| f.name == "value")
                            .map(|f| f.dtype.clone())
                            .unwrap_or(DataType::String)
                    } else {
                        DataType::String
                    }
                }
                DataType::Struct(members) => members
                    .first()
                    .map(|f| f.dtype.clone())
                    .unwrap_or(DataType::String),
                _ => DataType::String,
            };
            Ok(Field::new(fields[0].name().clone(), value_dtype))
        },
    );
    Self::from_expr(looked_up, None)
}
/// Applies `crate::udfs::apply_get_json_object` with `path`, then casts to `String`.
///
/// The declared output is a `String` field named after the input.
pub fn get_json_object(&self, path: &str) -> Column {
    let owned_path = path.to_string();
    let extracted = self.expr().clone().map(
        move |input| expect_col(crate::udfs::apply_get_json_object(input, &owned_path)),
        |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
    );
    Self::from_expr(extracted.cast(DataType::String), None)
}
/// Decodes this string column as JSON into `schema`, or into `String` when `schema` is `None`.
pub fn from_json(&self, schema: Option<polars::datatypes::DataType>) -> Column {
    use polars::prelude::DataType;
    let target = match schema {
        Some(dtype) => dtype,
        None => DataType::String,
    };
    let decoded = self.expr().clone().str().json_decode(target);
    Self::from_expr(decoded, None)
}
/// Encodes this struct column as JSON (`struct_().json_encode()`).
pub fn to_json(&self) -> Column {
    let encoded = self.expr().clone().struct_().json_encode();
    Self::from_expr(encoded, None)
}
/// Applies `crate::udfs::apply_json_array_length` with `path`.
///
/// The declared output is an `Int64` field named after the input.
pub fn json_array_length(&self, path: &str) -> Column {
    let owned_path = path.to_string();
    let length = self.expr().clone().map(
        move |input| expect_col(crate::udfs::apply_json_array_length(input, &owned_path)),
        |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
    );
    Self::from_expr(length, None)
}
/// Applies `crate::udfs::apply_json_object_keys` to this column.
///
/// The declared output is a `List(String)` field named after the input.
pub fn json_object_keys(&self) -> Column {
    let keys = self.expr().clone().map(
        |input| expect_col(crate::udfs::apply_json_object_keys(input)),
        |_schema, field| {
            let list_of_strings = DataType::List(Box::new(DataType::String));
            Ok(Field::new(field.name().clone(), list_of_strings))
        },
    );
    Self::from_expr(keys, None)
}
/// Applies `crate::udfs::apply_json_tuple` with the given `keys`.
///
/// The declared output is a struct with one `String` field per key, in
/// the order given, named after the input.
pub fn json_tuple(&self, keys: &[&str]) -> Column {
    let owned_keys: Vec<String> = keys.iter().map(|key| (*key).to_string()).collect();
    let mut output_fields: Vec<polars::datatypes::Field> =
        Vec::with_capacity(owned_keys.len());
    for key in &owned_keys {
        output_fields.push(polars::datatypes::Field::new(
            key.as_str().into(),
            DataType::String,
        ));
    }
    let extracted = self.expr().clone().map(
        move |input| expect_col(crate::udfs::apply_json_tuple(input, &owned_keys)),
        move |_schema, field| {
            let struct_dtype = DataType::Struct(output_fields.clone());
            Ok(Field::new(field.name().clone(), struct_dtype))
        },
    );
    Self::from_expr(extracted, None)
}
/// Applies `crate::udfs::apply_from_csv` to this column.
///
/// The declared output is a struct field with no members, named after the input.
pub fn from_csv(&self) -> Column {
    let parsed = self.expr().clone().map(
        |input| expect_col(crate::udfs::apply_from_csv(input)),
        |_schema, field| {
            let empty_struct = DataType::Struct(vec![]);
            Ok(Field::new(field.name().clone(), empty_struct))
        },
    );
    Self::from_expr(parsed, None)
}
/// Applies `crate::udfs::apply_to_csv` to this column.
///
/// The declared output is a `String` field named after the input.
pub fn to_csv(&self) -> Column {
    let rendered = self.expr().clone().map(
        |input| expect_col(crate::udfs::apply_to_csv(input)),
        |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
    );
    Self::from_expr(rendered, None)
}
/// Applies `crate::udfs::apply_parse_url` with `part` and an optional `key`.
///
/// The declared output is a `String` field named after the input.
pub fn parse_url(&self, part: &str, key: Option<&str>) -> Column {
    let owned_part = part.to_string();
    let owned_key: Option<String> = key.map(String::from);
    let parsed = self.expr().clone().map(
        move |input| {
            expect_col(crate::udfs::apply_parse_url(
                input,
                &owned_part,
                owned_key.as_deref(),
            ))
        },
        |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
    );
    Self::from_expr(parsed, None)
}
/// Applies `crate::udfs::apply_hash_one` to this column.
///
/// The declared output is an `Int64` field named after the input.
pub fn hash(&self) -> Column {
    let hashed = self.expr().clone().map(
        |input| expect_col(crate::udfs::apply_hash_one(input)),
        |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
    );
    Self::from_expr(hashed, None)
}
/// Tests membership of this column's values in `other` (`is_in(other, false)`).
pub fn isin(&self, other: &Column) -> Column {
    let candidates = other.expr().clone();
    let membership = self.expr().clone().is_in(candidates, false);
    Self::from_expr(membership, None)
}
/// Applies `crate::udfs::apply_url_decode` to this column.
///
/// The declared output is a `String` field named after the input.
pub fn url_decode(&self) -> Column {
    let decoded = self.expr().clone().map(
        |input| expect_col(crate::udfs::apply_url_decode(input)),
        |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
    );
    Self::from_expr(decoded, None)
}
/// Applies `crate::udfs::apply_url_encode` to this column.
///
/// The declared output is a `String` field named after the input.
pub fn url_encode(&self) -> Column {
    let encoded = self.expr().clone().map(
        |input| expect_col(crate::udfs::apply_url_encode(input)),
        |_schema, field| Ok(Field::new(field.name().clone(), DataType::String)),
    );
    Self::from_expr(encoded, None)
}
/// Emulates a left shift by multiplying the `Int64`-cast value by `2^n`.
///
/// The product is cast to `Int64`.
pub fn shift_left(&self, n: i32) -> Column {
    use polars::prelude::*;
    let factor = lit(2i64).pow(lit(n as i64));
    let widened = self.expr().clone().cast(DataType::Int64);
    let shifted = (widened * factor).cast(DataType::Int64);
    Self::from_expr(shifted, None)
}
/// Emulates an arithmetic (sign-preserving) right shift by `n` bits.
///
/// The value is cast to `Int64` and floor-divided by `2^n`. Floor division
/// is required for negative inputs: an arithmetic shift rounds toward
/// negative infinity (`-5 >> 1 == -3`), which plain division followed by
/// a cast does not reproduce.
pub fn shift_right(&self, n: i32) -> Column {
    use polars::prelude::*;
    let pow = lit(2i64).pow(lit(n as i64));
    Self::from_expr(
        self.expr()
            .clone()
            .cast(DataType::Int64)
            .floor_div(pow)
            .cast(DataType::Int64),
        None,
    )
}
/// Applies `crate::udfs::apply_shift_right_unsigned` with shift amount `n`.
///
/// The declared output is an `Int64` field named after the input.
pub fn shift_right_unsigned(&self, n: i32) -> Column {
    let shifted = self.expr().clone().map(
        move |input| expect_col(crate::udfs::apply_shift_right_unsigned(input, n)),
        |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
    );
    Self::from_expr(shifted, None)
}
}
#[cfg(test)]
mod tests {
    use super::Column;
    use polars::prelude::{DataFrame, Expr, IntoLazy, col, df, lit};

    /// Five fully populated rows: `a` = 1..=5, `b` = 10..=50.
    fn test_df() -> DataFrame {
        df!(
            "a" => &[1, 2, 3, 4, 5],
            "b" => &[10, 20, 30, 40, 50]
        )
        .unwrap()
    }

    /// Five rows where `a` has two nulls and `b` has three.
    fn test_df_with_nulls() -> DataFrame {
        df!(
            "a" => &[Some(1), Some(2), None, Some(4), None],
            "b" => &[Some(10), None, Some(30), None, None]
        )
        .unwrap()
    }

    /// Number of rows of `frame` that pass `predicate`.
    fn matching_rows(frame: DataFrame, predicate: Expr) -> usize {
        frame.lazy().filter(predicate).collect().unwrap().height()
    }

    /// Evaluates `a <=> b` on `frame` and returns the per-row results.
    fn eq_null_safe_values(frame: DataFrame) -> Vec<Option<bool>> {
        let left = Column::new("a".to_string());
        let right = Column::new("b".to_string());
        let comparison = left.eq_null_safe(&right);
        let evaluated = frame
            .lazy()
            .with_column(comparison.into_expr().alias("eq_null_safe"))
            .collect()
            .unwrap();
        let flags = evaluated.column("eq_null_safe").unwrap();
        flags.bool().unwrap().into_iter().collect()
    }

    #[test]
    fn test_column_new() {
        assert_eq!(Column::new("age".to_string()).name(), "age");
    }

    #[test]
    fn test_column_from_expr() {
        let named = Column::from_expr(col("test"), Some("test".to_string()));
        assert_eq!(named.name(), "test");
    }

    #[test]
    fn test_column_from_expr_default_name() {
        let unnamed = Column::from_expr(col("test").gt(lit(5)), None);
        assert_eq!(unnamed.name(), "<expr>");
    }

    #[test]
    fn test_column_alias() {
        let renamed = Column::new("original".to_string()).alias("new_name");
        assert_eq!(renamed.name(), "new_name");
    }

    #[test]
    fn test_column_gt() {
        let predicate = Column::new("a".to_string()).gt(lit(3));
        assert_eq!(matching_rows(test_df(), predicate.into_expr()), 2);
    }

    #[test]
    fn test_column_lt() {
        let predicate = Column::new("a".to_string()).lt(lit(3));
        assert_eq!(matching_rows(test_df(), predicate.into_expr()), 2);
    }

    #[test]
    fn test_column_eq() {
        let predicate = Column::new("a".to_string()).eq(lit(3));
        assert_eq!(matching_rows(test_df(), predicate.into_expr()), 1);
    }

    #[test]
    fn test_column_neq() {
        let predicate = Column::new("a".to_string()).neq(lit(3));
        assert_eq!(matching_rows(test_df(), predicate.into_expr()), 4);
    }

    #[test]
    fn test_column_gt_eq() {
        let predicate = Column::new("a".to_string()).gt_eq(lit(3));
        assert_eq!(matching_rows(test_df(), predicate.into_expr()), 3);
    }

    #[test]
    fn test_column_lt_eq() {
        let predicate = Column::new("a".to_string()).lt_eq(lit(3));
        assert_eq!(matching_rows(test_df(), predicate.into_expr()), 3);
    }

    #[test]
    fn test_column_is_null() {
        let predicate = Column::new("a".to_string()).is_null();
        assert_eq!(
            matching_rows(test_df_with_nulls(), predicate.into_expr()),
            2
        );
    }

    #[test]
    fn test_column_is_not_null() {
        let predicate = Column::new("a".to_string()).is_not_null();
        assert_eq!(
            matching_rows(test_df_with_nulls(), predicate.into_expr()),
            3
        );
    }

    #[test]
    fn test_null_boolean_column_produces_null_bool_series() {
        let null_bool = Column::null_boolean().into_expr().alias("null_bool");
        let out = test_df().lazy().select([null_bool]).collect().unwrap();
        let series = out.column("null_bool").unwrap();
        assert_eq!(series.dtype(), &polars::prelude::DataType::Boolean);
        assert_eq!(series.null_count(), series.len());
    }

    #[test]
    fn test_eq_null_safe_both_null() {
        let frame = df!(
            "a" => &[Some(1), None, Some(3)],
            "b" => &[Some(1), None, Some(4)]
        )
        .unwrap();
        let values = eq_null_safe_values(frame);
        assert_eq!(values[0], Some(true));
        // Null compared with null is treated as equal.
        assert_eq!(values[1], Some(true));
        assert_eq!(values[2], Some(false));
    }

    #[test]
    fn test_eq_null_safe_one_null() {
        let frame = df!(
            "a" => &[Some(1), None, Some(3)],
            "b" => &[Some(1), Some(2), None]
        )
        .unwrap();
        let values = eq_null_safe_values(frame);
        assert_eq!(values[0], Some(true));
        assert_eq!(values[1], Some(false));
        assert_eq!(values[2], Some(false));
    }
}