use crate::column::Column;
use crate::dataframe::DataFrame;
use polars::prelude::*;
/// A sort key: the expression to order by plus direction and null-placement flags.
#[derive(Debug, Clone)]
pub struct SortOrder {
/// Expression the rows are ordered by.
pub(crate) expr: Expr,
/// `true` for descending order, `false` for ascending.
pub(crate) descending: bool,
/// `true` when nulls should sort after non-null values.
pub(crate) nulls_last: bool,
}
impl SortOrder {
    /// Borrow the expression this sort key orders by.
    pub fn expr(&self) -> &Expr {
        let Self { expr, .. } = self;
        expr
    }
}
/// Ascending sort key over `column`, with `nulls_last` set to `false`.
pub fn asc(column: &Column) -> SortOrder {
    let expr = column.expr().clone();
    SortOrder {
        expr,
        nulls_last: false,
        descending: false,
    }
}
/// Ascending sort key over `column` with nulls placed first (`nulls_last = false`).
pub fn asc_nulls_first(column: &Column) -> SortOrder {
    let expr = column.expr().clone();
    SortOrder {
        expr,
        nulls_last: false,
        descending: false,
    }
}
/// Ascending sort key over `column` with nulls placed last (`nulls_last = true`).
pub fn asc_nulls_last(column: &Column) -> SortOrder {
    let expr = column.expr().clone();
    SortOrder {
        expr,
        nulls_last: true,
        descending: false,
    }
}
/// Descending sort key over `column`, with `nulls_last` set to `true`.
pub fn desc(column: &Column) -> SortOrder {
    let expr = column.expr().clone();
    SortOrder {
        expr,
        nulls_last: true,
        descending: true,
    }
}
/// Descending sort key over `column` with nulls placed first (`nulls_last = false`).
pub fn desc_nulls_first(column: &Column) -> SortOrder {
    let expr = column.expr().clone();
    SortOrder {
        expr,
        nulls_last: false,
        descending: true,
    }
}
/// Descending sort key over `column` with nulls placed last (`nulls_last = true`).
pub fn desc_nulls_last(column: &Column) -> SortOrder {
    let expr = column.expr().clone();
    SortOrder {
        expr,
        nulls_last: true,
        descending: true,
    }
}
/// Map a textual type name to a polars [`DataType`].
///
/// The name is trimmed and lower-cased before matching. Anything of the form
/// `decimal(...)` maps to `Float64`.
///
/// # Errors
/// Returns `Err` with an `unknown type name` message for unrecognised names.
pub fn parse_type_name(name: &str) -> Result<DataType, String> {
    let normalized = name.trim().to_lowercase();
    let dtype = match normalized.as_str() {
        decimal if decimal.starts_with("decimal(") && decimal.contains(')') => DataType::Float64,
        "int" | "integer" => DataType::Int32,
        "long" | "bigint" => DataType::Int64,
        "float" => DataType::Float32,
        "double" => DataType::Float64,
        "string" | "str" => DataType::String,
        "boolean" | "bool" => DataType::Boolean,
        "date" => DataType::Date,
        "timestamp" => DataType::Datetime(TimeUnit::Microseconds, None),
        _ => return Err(format!("unknown type name: {name}")),
    };
    Ok(dtype)
}
/// Build a [`Column`] from a column name.
pub fn col(name: &str) -> Column {
    let owned_name = String::from(name);
    Column::new(owned_name)
}
/// Placeholder `grouping` indicator: always the `i32` literal `0`, named `"grouping"`.
///
/// The `column` argument is accepted but not consulted.
pub fn grouping(column: &Column) -> Column {
    let _ = column;
    let zero = lit(0i32);
    Column::from_expr(zero, Some(String::from("grouping")))
}
/// Placeholder `grouping_id`: always the `i64` literal `0`, named `"grouping_id"`.
///
/// The `_columns` argument is not consulted.
pub fn grouping_id(_columns: &[Column]) -> Column {
    let zero = lit(0i64);
    Column::from_expr(zero, Some(String::from("grouping_id")))
}
/// Wrap an `i32` literal in an unnamed [`Column`].
pub fn lit_i32(value: i32) -> Column {
    Column::from_expr(lit(value), None)
}
/// Wrap an `i64` literal in an unnamed [`Column`].
pub fn lit_i64(value: i64) -> Column {
    Column::from_expr(lit(value), None)
}
/// Wrap an `f64` literal in an unnamed [`Column`].
pub fn lit_f64(value: f64) -> Column {
    Column::from_expr(lit(value), None)
}
/// Wrap a `bool` literal in an unnamed [`Column`].
pub fn lit_bool(value: bool) -> Column {
    Column::from_expr(lit(value), None)
}
/// Wrap a string literal in an unnamed [`Column`].
pub fn lit_str(value: &str) -> Column {
    Column::from_expr(lit(value), None)
}
/// Apply the polars `count` aggregation to the column's expression, named `"count"`.
pub fn count(col: &Column) -> Column {
    let aggregated = col.expr().clone().count();
    Column::from_expr(aggregated, Some(String::from("count")))
}
/// Apply the polars `sum` aggregation to the column's expression, named `"sum"`.
pub fn sum(col: &Column) -> Column {
    let aggregated = col.expr().clone().sum();
    Column::from_expr(aggregated, Some(String::from("sum")))
}
/// Apply the polars `mean` aggregation to the column's expression, named `"avg"`.
pub fn avg(col: &Column) -> Column {
    let aggregated = col.expr().clone().mean();
    Column::from_expr(aggregated, Some(String::from("avg")))
}
/// Alias for `avg`; the result therefore carries the name `"avg"`.
pub fn mean(col: &Column) -> Column {
avg(col)
}
/// Apply the polars `max` aggregation to the column's expression, named `"max"`.
pub fn max(col: &Column) -> Column {
    let aggregated = col.expr().clone().max();
    Column::from_expr(aggregated, Some(String::from("max")))
}
/// Apply the polars `min` aggregation to the column's expression, named `"min"`.
pub fn min(col: &Column) -> Column {
    let aggregated = col.expr().clone().min();
    Column::from_expr(aggregated, Some(String::from("min")))
}
/// First value of the column's expression.
///
/// When `ignorenulls` is `true`, nulls are dropped before taking the first
/// value, so the result is the first non-null value (or null if there is none).
/// When `false`, the first value is returned as-is, null or not.
pub fn first(col: &Column, ignorenulls: bool) -> Column {
    let values = col.expr().clone();
    let picked = if ignorenulls {
        // Previously the flag was discarded; honour it by skipping nulls.
        values.drop_nulls().first()
    } else {
        values.first()
    };
    Column::from_expr(picked, None)
}
/// Some value of the column's expression (implemented as the first one).
///
/// When `ignorenulls` is `true`, nulls are dropped first so a non-null value
/// is returned whenever one exists. When `false`, the first value is returned
/// as-is, null or not.
pub fn any_value(col: &Column, ignorenulls: bool) -> Column {
    let values = col.expr().clone();
    let picked = if ignorenulls {
        // Previously the flag was discarded; honour it by skipping nulls.
        values.drop_nulls().first()
    } else {
        values.first()
    };
    Column::from_expr(picked, None)
}
/// Cast the column's expression to `Int64` and sum it, named `"count_if"`.
///
/// NOTE(review): presumably `col` is boolean so the sum counts `true` rows — confirm with callers.
pub fn count_if(col: &Column) -> Column {
    use polars::prelude::DataType;
    let as_int = col.expr().clone().cast(DataType::Int64);
    Column::from_expr(as_int.sum(), Some(String::from("count_if")))
}
/// Apply the polars `sum` aggregation to the column's expression, named `"try_sum"`.
pub fn try_sum(col: &Column) -> Column {
    let aggregated = col.expr().clone().sum();
    Column::from_expr(aggregated, Some(String::from("try_sum")))
}
/// Apply the polars `mean` aggregation to the column's expression, named `"try_avg"`.
pub fn try_avg(col: &Column) -> Column {
    let aggregated = col.expr().clone().mean();
    Column::from_expr(aggregated, Some(String::from("try_avg")))
}
/// Value of `value_col` taken from the row that sorts first when
/// `(ord_col, value_col)` structs are ordered descending.
///
/// The pair is packed into a struct with fields `_ord` and `_val`, sorted,
/// and the `_val` field of the first struct is extracted.
pub fn max_by(value_col: &Column, ord_col: &Column) -> Column {
    use polars::prelude::{as_struct, SortOptions};
    let ord_field = ord_col.expr().clone().alias("_ord");
    let val_field = value_col.expr().clone().alias("_val");
    let descending = SortOptions::default().with_order_descending(true);
    let winner = as_struct(vec![ord_field, val_field])
        .sort(descending)
        .first();
    Column::from_expr(winner.struct_().field_by_name("_val"), None)
}
/// Value of `value_col` taken from the row that sorts first when
/// `(ord_col, value_col)` structs are ordered with default sort options.
///
/// The pair is packed into a struct with fields `_ord` and `_val`, sorted,
/// and the `_val` field of the first struct is extracted.
pub fn min_by(value_col: &Column, ord_col: &Column) -> Column {
    use polars::prelude::{as_struct, SortOptions};
    let ord_field = ord_col.expr().clone().alias("_ord");
    let val_field = value_col.expr().clone().alias("_val");
    let winner = as_struct(vec![ord_field, val_field])
        .sort(SortOptions::default())
        .first();
    Column::from_expr(winner.struct_().field_by_name("_val"), None)
}
/// Implode the column's expression into a list, named `"collect_list"`.
pub fn collect_list(col: &Column) -> Column {
    let gathered = col.expr().clone().implode();
    Column::from_expr(gathered, Some(String::from("collect_list")))
}
/// Take the unique values of the column's expression and implode them into a
/// list, named `"collect_set"`.
pub fn collect_set(col: &Column) -> Column {
    let distinct = col.expr().clone().unique();
    Column::from_expr(distinct.implode(), Some(String::from("collect_set")))
}
/// Apply polars `all(true)` to the column's expression, named `"bool_and"`.
pub fn bool_and(col: &Column) -> Column {
    let conjunction = col.expr().clone().all(true);
    Column::from_expr(conjunction, Some(String::from("bool_and")))
}
/// Apply polars `all(true)` to the column's expression, named `"every"`.
pub fn every(col: &Column) -> Column {
    let conjunction = col.expr().clone().all(true);
    Column::from_expr(conjunction, Some(String::from("every")))
}
/// Standard deviation with `ddof = 1`, named `"stddev"`.
pub fn stddev(col: &Column) -> Column {
    let spread = col.expr().clone().std(1);
    Column::from_expr(spread, Some(String::from("stddev")))
}
/// Variance with `ddof = 1`, named `"variance"`.
pub fn variance(col: &Column) -> Column {
    let spread = col.expr().clone().var(1);
    Column::from_expr(spread, Some(String::from("variance")))
}
/// Standard deviation with `ddof = 0`, named `"stddev_pop"`.
pub fn stddev_pop(col: &Column) -> Column {
    let spread = col.expr().clone().std(0);
    Column::from_expr(spread, Some(String::from("stddev_pop")))
}
/// Alias for `stddev`; the result therefore carries the name `"stddev"`.
pub fn stddev_samp(col: &Column) -> Column {
stddev(col)
}
/// Alias for `stddev`; the result therefore carries the name `"stddev"`.
pub fn std(col: &Column) -> Column {
stddev(col)
}
/// Variance with `ddof = 0`, named `"var_pop"`.
pub fn var_pop(col: &Column) -> Column {
    let spread = col.expr().clone().var(0);
    Column::from_expr(spread, Some(String::from("var_pop")))
}
/// Alias for `variance`; the result therefore carries the name `"variance"`.
pub fn var_samp(col: &Column) -> Column {
variance(col)
}
/// The 0.5 quantile with linear interpolation, named `"median"`.
pub fn median(col: &Column) -> Column {
    use polars::prelude::QuantileMethod;
    let halfway = lit(0.5);
    let middle = col.expr().clone().quantile(halfway, QuantileMethod::Linear);
    Column::from_expr(middle, Some(String::from("median")))
}
/// Quantile at `percentage` using linear interpolation, named
/// `approx_percentile(<percentage>)`.
///
/// `_accuracy` is accepted but not used.
pub fn approx_percentile(col: &Column, percentage: f64, _accuracy: Option<i32>) -> Column {
    use polars::prelude::QuantileMethod;
    let label = format!("approx_percentile({percentage})");
    let quantile = col
        .expr()
        .clone()
        .quantile(lit(percentage), QuantileMethod::Linear);
    Column::from_expr(quantile, Some(label))
}
/// Alias for `approx_percentile`, forwarding all arguments unchanged.
pub fn percentile_approx(col: &Column, percentage: f64, accuracy: Option<i32>) -> Column {
approx_percentile(col, percentage, accuracy)
}
/// Calls `.mode()` on a clone of `col`.
pub fn mode(col: &Column) -> Column {
    let owned = col.clone();
    owned.mode()
}
/// Number of unique values (`n_unique`) cast to `Int64`, named `"count_distinct"`.
pub fn count_distinct(col: &Column) -> Column {
    use polars::prelude::DataType;
    let distinct = col.expr().clone().n_unique();
    Column::from_expr(
        distinct.cast(DataType::Int64),
        Some(String::from("count_distinct")),
    )
}
/// Number of unique values (`n_unique`) cast to `Int64`, named
/// `"approx_count_distinct"`. The count is exact; `_rsd` is not used.
pub fn approx_count_distinct(col: &Column, _rsd: Option<f64>) -> Column {
    use polars::prelude::DataType;
    let distinct = col.expr().clone().n_unique();
    Column::from_expr(
        distinct.cast(DataType::Int64),
        Some(String::from("approx_count_distinct")),
    )
}
/// Kurtosis of the column cast to `Float64`, calling polars
/// `kurtosis(true, true)`; named `"kurtosis"`.
pub fn kurtosis(col: &Column) -> Column {
    let as_float = col.expr().clone().cast(DataType::Float64);
    let shape = as_float.kurtosis(true, true);
    Column::from_expr(shape, Some(String::from("kurtosis")))
}
/// Skewness of the column cast to `Float64`, calling polars `skew(true)`;
/// named `"skewness"`.
pub fn skewness(col: &Column) -> Column {
    let as_float = col.expr().clone().cast(DataType::Float64);
    let shape = as_float.skew(true);
    Column::from_expr(shape, Some(String::from("skewness")))
}
/// Population covariance of two named columns as a polars expression.
///
/// Computed as `(Σab − Σa·Σb / n) / n` over the rows where **both** values are
/// non-null after casting to `Float64`. Restricting every term to the same
/// rows keeps `n`, `Σa`, `Σb` and `Σab` consistent when nulls are present,
/// and casting before summing avoids integer overflow in the sums.
/// With no complete pairs the result is `NaN` (0/0).
pub fn covar_pop_expr(col1: &str, col2: &str) -> Expr {
    use polars::prelude::col as pl_col;
    let a = pl_col(col1).cast(DataType::Float64);
    let b = pl_col(col2).cast(DataType::Float64);
    // Pairwise-complete mask: rows where both sides are present.
    let paired = a.clone().is_not_null().and(b.clone().is_not_null());
    let n = a
        .clone()
        .filter(paired.clone())
        .count()
        .cast(DataType::Float64);
    let sum_ab = (a.clone() * b.clone()).filter(paired.clone()).sum();
    let sum_a = a.filter(paired.clone()).sum();
    let sum_b = b.filter(paired).sum();
    (sum_ab - sum_a * sum_b / n.clone()) / n
}
/// Population covariance of two columns, named `"covar_pop"`.
///
/// Computed as `(Σab − Σa·Σb / n) / n` over the rows where **both** values are
/// non-null after casting to `Float64`. Restricting every term to the same
/// rows keeps `n`, `Σa`, `Σb` and `Σab` consistent when nulls are present,
/// and casting before summing avoids integer overflow in the sums.
/// With no complete pairs the result is `NaN` (0/0).
pub fn covar_pop(col1: &Column, col2: &Column) -> Column {
    let a = col1.expr().clone().cast(DataType::Float64);
    let b = col2.expr().clone().cast(DataType::Float64);
    // Pairwise-complete mask: rows where both sides are present.
    let paired = a.clone().is_not_null().and(b.clone().is_not_null());
    let n = a
        .clone()
        .filter(paired.clone())
        .count()
        .cast(DataType::Float64);
    let sum_ab = (a.clone() * b.clone()).filter(paired.clone()).sum();
    let sum_a = a.filter(paired.clone()).sum();
    let sum_b = b.filter(paired).sum();
    let e = (sum_ab - sum_a * sum_b / n.clone()) / n;
    Column::from_expr(e, Some("covar_pop".to_string()))
}
/// Pearson correlation of two columns, named `"corr"`.
///
/// Built from sample covariance and sample standard deviations over the rows
/// where **both** values are non-null after casting to `Float64`, so all sums
/// and the pair count `n` refer to the same rows. Casting before summing
/// avoids integer overflow. Yields `NaN` when fewer than two complete pairs
/// exist.
pub fn corr(col1: &Column, col2: &Column) -> Column {
    use polars::prelude::{lit, when};
    let a = col1.expr().clone().cast(DataType::Float64);
    let b = col2.expr().clone().cast(DataType::Float64);
    // Pairwise-complete mask: rows where both sides are present.
    let paired = a.clone().is_not_null().and(b.clone().is_not_null());
    let n = a
        .clone()
        .filter(paired.clone())
        .count()
        .cast(DataType::Float64);
    let n1 = n.clone() - lit(1.0);
    let sum_ab = (a.clone() * b.clone()).filter(paired.clone()).sum();
    let sum_a2 = (a.clone() * a.clone()).filter(paired.clone()).sum();
    let sum_b2 = (b.clone() * b.clone()).filter(paired.clone()).sum();
    let sum_a = a.filter(paired.clone()).sum();
    let sum_b = b.filter(paired).sum();
    let cov_samp = (sum_ab - sum_a.clone() * sum_b.clone() / n.clone()) / n1.clone();
    let var_a = (sum_a2 - sum_a.clone() * sum_a / n.clone()) / n1.clone();
    let var_b = (sum_b2 - sum_b.clone() * sum_b / n.clone()) / n1;
    let e = when(n.gt(lit(1.0)))
        .then(cov_samp / (var_a.sqrt() * var_b.sqrt()))
        .otherwise(lit(f64::NAN));
    Column::from_expr(e, Some("corr".to_string()))
}
/// Sample covariance of two named columns as a polars expression.
///
/// Computed as `(Σab − Σa·Σb / n) / (n − 1)` over the rows where **both**
/// values are non-null after casting to `Float64`, so every term refers to
/// the same rows. Casting before summing avoids integer overflow. Yields
/// `NaN` when fewer than two complete pairs exist.
pub fn covar_samp_expr(col1: &str, col2: &str) -> Expr {
    use polars::prelude::{col as pl_col, lit, when};
    let a = pl_col(col1).cast(DataType::Float64);
    let b = pl_col(col2).cast(DataType::Float64);
    // Pairwise-complete mask: rows where both sides are present.
    let paired = a.clone().is_not_null().and(b.clone().is_not_null());
    let n = a
        .clone()
        .filter(paired.clone())
        .count()
        .cast(DataType::Float64);
    let n1 = n.clone() - lit(1.0);
    let sum_ab = (a.clone() * b.clone()).filter(paired.clone()).sum();
    let sum_a = a.filter(paired.clone()).sum();
    let sum_b = b.filter(paired).sum();
    when(n.clone().gt(lit(1.0)))
        .then((sum_ab - sum_a * sum_b / n) / n1)
        .otherwise(lit(f64::NAN))
}
/// Pearson correlation of two named columns as a polars expression.
///
/// Built from sample covariance and sample standard deviations over the rows
/// where **both** values are non-null after casting to `Float64`, so all sums
/// and the pair count `n` refer to the same rows. Casting before summing
/// avoids integer overflow. Yields `NaN` when fewer than two complete pairs
/// exist.
pub fn corr_expr(col1: &str, col2: &str) -> Expr {
    use polars::prelude::{col as pl_col, lit, when};
    let a = pl_col(col1).cast(DataType::Float64);
    let b = pl_col(col2).cast(DataType::Float64);
    // Pairwise-complete mask: rows where both sides are present.
    let paired = a.clone().is_not_null().and(b.clone().is_not_null());
    let n = a
        .clone()
        .filter(paired.clone())
        .count()
        .cast(DataType::Float64);
    let n1 = n.clone() - lit(1.0);
    let sum_ab = (a.clone() * b.clone()).filter(paired.clone()).sum();
    let sum_a2 = (a.clone() * a.clone()).filter(paired.clone()).sum();
    let sum_b2 = (b.clone() * b.clone()).filter(paired.clone()).sum();
    let sum_a = a.filter(paired.clone()).sum();
    let sum_b = b.filter(paired).sum();
    let cov_samp = (sum_ab - sum_a.clone() * sum_b.clone() / n.clone()) / n1.clone();
    let var_a = (sum_a2 - sum_a.clone() * sum_a / n.clone()) / n1.clone();
    let var_b = (sum_b2 - sum_b.clone() * sum_b / n.clone()) / n1;
    when(n.gt(lit(1.0)))
        .then(cov_samp / (var_a.sqrt() * var_b.sqrt()))
        .otherwise(lit(f64::NAN))
}
/// Shared building blocks for the `regr_*` expressions.
///
/// Both columns are cast to `Float64` and restricted to rows where neither is
/// null. Returns `(n, Σx, Σy, Σx², Σy², Σxy)` where `n` is the pair count
/// cast to `Float64`.
fn regr_cond_and_sums(y_col: &str, x_col: &str) -> (Expr, Expr, Expr, Expr, Expr, Expr) {
    use polars::prelude::col as pl_col;
    let ys = pl_col(y_col).cast(DataType::Float64);
    let xs = pl_col(x_col).cast(DataType::Float64);
    let both_present = ys.clone().is_not_null().and(xs.clone().is_not_null());
    // Sum of an expression over the complete pairs only.
    let paired_sum = |e: Expr| e.filter(both_present.clone()).sum();
    let pair_count = ys
        .clone()
        .filter(both_present.clone())
        .count()
        .cast(DataType::Float64);
    let total_x = paired_sum(xs.clone());
    let total_y = paired_sum(ys.clone());
    let total_xx = paired_sum(xs.clone() * xs.clone());
    let total_yy = paired_sum(ys.clone() * ys.clone());
    let total_xy = paired_sum(xs * ys);
    (pair_count, total_x, total_y, total_xx, total_yy, total_xy)
}
/// Number of rows where both `y_col` and `x_col` are non-null, as `Float64`.
pub fn regr_count_expr(y_col: &str, x_col: &str) -> Expr {
    regr_cond_and_sums(y_col, x_col).0
}
/// Mean of `x_col` over complete `(y, x)` pairs; `NaN` when there are none.
pub fn regr_avgx_expr(y_col: &str, x_col: &str) -> Expr {
    use polars::prelude::{lit, when};
    let (pairs, total_x, ..) = regr_cond_and_sums(y_col, x_col);
    let has_pairs = pairs.clone().gt(lit(0.0));
    when(has_pairs)
        .then(total_x / pairs)
        .otherwise(lit(f64::NAN))
}
/// Mean of `y_col` over complete `(y, x)` pairs; `NaN` when there are none.
pub fn regr_avgy_expr(y_col: &str, x_col: &str) -> Expr {
    use polars::prelude::{lit, when};
    let (pairs, _, total_y, ..) = regr_cond_and_sums(y_col, x_col);
    let has_pairs = pairs.clone().gt(lit(0.0));
    when(has_pairs)
        .then(total_y / pairs)
        .otherwise(lit(f64::NAN))
}
/// `Σx² − (Σx)² / n` over complete pairs; `NaN` when there are none.
pub fn regr_sxx_expr(y_col: &str, x_col: &str) -> Expr {
    use polars::prelude::{lit, when};
    let (pairs, total_x, _, total_xx, ..) = regr_cond_and_sums(y_col, x_col);
    let has_pairs = pairs.clone().gt(lit(0.0));
    let centered = total_xx - total_x.clone() * total_x / pairs;
    when(has_pairs).then(centered).otherwise(lit(f64::NAN))
}
/// `Σy² − (Σy)² / n` over complete pairs; `NaN` when there are none.
pub fn regr_syy_expr(y_col: &str, x_col: &str) -> Expr {
    use polars::prelude::{lit, when};
    let (pairs, _, total_y, _, total_yy, _) = regr_cond_and_sums(y_col, x_col);
    let has_pairs = pairs.clone().gt(lit(0.0));
    let centered = total_yy - total_y.clone() * total_y / pairs;
    when(has_pairs).then(centered).otherwise(lit(f64::NAN))
}
/// `Σxy − Σx·Σy / n` over complete pairs; `NaN` when there are none.
pub fn regr_sxy_expr(y_col: &str, x_col: &str) -> Expr {
    use polars::prelude::{lit, when};
    let (pairs, total_x, total_y, _, _, total_xy) = regr_cond_and_sums(y_col, x_col);
    let has_pairs = pairs.clone().gt(lit(0.0));
    let centered = total_xy - total_x * total_y / pairs;
    when(has_pairs).then(centered).otherwise(lit(f64::NAN))
}
/// Least-squares slope `Sxy / Sxx` over complete pairs.
///
/// Yields `NaN` unless there is more than one pair and `Sxx > 0`.
pub fn regr_slope_expr(y_col: &str, x_col: &str) -> Expr {
    use polars::prelude::{lit, when};
    let (pairs, total_x, total_y, total_xx, _, total_xy) = regr_cond_and_sums(y_col, x_col);
    let sxx = total_xx - total_x.clone() * total_x.clone() / pairs.clone();
    let sxy = total_xy - total_x * total_y / pairs.clone();
    let defined = pairs.gt(lit(1.0)).and(sxx.clone().gt(lit(0.0)));
    when(defined).then(sxy / sxx).otherwise(lit(f64::NAN))
}
/// Least-squares intercept `mean(y) − slope · mean(x)` over complete pairs.
///
/// Yields `NaN` unless there is more than one pair and `Sxx > 0`.
pub fn regr_intercept_expr(y_col: &str, x_col: &str) -> Expr {
    use polars::prelude::{lit, when};
    let (pairs, total_x, total_y, total_xx, _, total_xy) = regr_cond_and_sums(y_col, x_col);
    let sxx = total_xx - total_x.clone() * total_x.clone() / pairs.clone();
    let sxy = total_xy - total_x.clone() * total_y.clone() / pairs.clone();
    let mean_y = total_y / pairs.clone();
    let mean_x = total_x / pairs.clone();
    let defined = pairs.gt(lit(1.0)).and(sxx.clone().gt(lit(0.0)));
    let intercept = mean_y - (sxy / sxx) * mean_x;
    when(defined).then(intercept).otherwise(lit(f64::NAN))
}
/// Coefficient of determination `Sxy² / (Sxx · Syy)` over complete pairs.
///
/// Yields `NaN` unless both `Sxx > 0` and `Syy > 0`.
pub fn regr_r2_expr(y_col: &str, x_col: &str) -> Expr {
    use polars::prelude::{lit, when};
    let (pairs, total_x, total_y, total_xx, total_yy, total_xy) =
        regr_cond_and_sums(y_col, x_col);
    let sxx = total_xx - total_x.clone() * total_x.clone() / pairs.clone();
    let syy = total_yy - total_y.clone() * total_y.clone() / pairs.clone();
    let sxy = total_xy - total_x * total_y / pairs;
    let defined = sxx.clone().gt(lit(0.0)).and(syy.clone().gt(lit(0.0)));
    let r_squared = sxy.clone() * sxy / (sxx * syy);
    when(defined).then(r_squared).otherwise(lit(f64::NAN))
}
pub fn when(condition: &Column) -> WhenBuilder {
WhenBuilder::new(condition.expr().clone())
}
/// `value` where `condition` holds, a null literal otherwise.
pub fn when_then_otherwise_null(condition: &Column, value: &Column) -> Column {
    use polars::prelude::*;
    let predicate = condition.expr().clone();
    let on_true = value.expr().clone();
    let branch = polars::prelude::when(predicate)
        .then(on_true)
        .otherwise(Expr::Literal(LiteralValue::Null));
    crate::column::Column::from_expr(branch, None)
}
/// First stage of the conditional builder: holds the condition until `then` is called.
pub struct WhenBuilder {
/// Predicate expression captured by `when`.
condition: Expr,
}
impl WhenBuilder {
    /// Wrap a predicate expression.
    fn new(condition: Expr) -> Self {
        Self { condition }
    }

    /// Attach the value produced when the condition holds.
    pub fn then(self, value: &Column) -> ThenBuilder {
        use polars::prelude::*;
        let Self { condition } = self;
        let on_true = value.expr().clone();
        ThenBuilder::new(polars::prelude::when(condition).then(on_true))
    }

    /// Always panics: `otherwise` is only valid after `then`.
    ///
    /// # Panics
    /// Unconditionally, with a message describing the correct call order.
    pub fn otherwise(self, _value: &Column) -> Column {
        panic!("when().otherwise() requires .then() to be called first. Use when(cond).then(val1).otherwise(val2)");
    }
}
/// Builder state after at least one `when(..).then(..)` pair has been supplied.
pub struct ThenBuilder {
/// Underlying polars builder, either a single or a chained when/then.
state: WhenThenState,
}
/// The two polars builder types a `ThenBuilder` can wrap.
enum WhenThenState {
/// Exactly one when/then pair so far.
Single(Box<polars::prelude::Then>),
/// Two or more when/then pairs chained together.
Chained(Box<polars::prelude::ChainedThen>),
}
/// Builder state after an additional `when` has been chained onto a `ThenBuilder`.
pub struct ChainedWhenBuilder {
/// Underlying polars chained-when builder awaiting its `then` value.
inner: polars::prelude::ChainedWhen,
}
impl ThenBuilder {
    /// Wrap a single polars when/then.
    fn new(when_then: polars::prelude::Then) -> Self {
        let state = WhenThenState::Single(Box::new(when_then));
        Self { state }
    }

    /// Wrap a chained polars when/then.
    fn new_chained(chained: polars::prelude::ChainedThen) -> Self {
        let state = WhenThenState::Chained(Box::new(chained));
        Self { state }
    }

    /// Chain a further condition onto the expression built so far.
    pub fn when(self, condition: &Column) -> ChainedWhenBuilder {
        let predicate = condition.expr().clone();
        let inner = match self.state {
            WhenThenState::Single(single) => single.when(predicate),
            WhenThenState::Chained(chain) => chain.when(predicate),
        };
        ChainedWhenBuilder { inner }
    }

    /// Finish the expression with the value used when no condition matched.
    pub fn otherwise(self, value: &Column) -> Column {
        let fallback = value.expr().clone();
        let finished = match self.state {
            WhenThenState::Single(single) => single.otherwise(fallback),
            WhenThenState::Chained(chain) => chain.otherwise(fallback),
        };
        crate::column::Column::from_expr(finished, None)
    }
}
impl ChainedWhenBuilder {
    /// Attach the value for the most recently chained condition.
    pub fn then(self, value: &Column) -> ThenBuilder {
        let on_true = value.expr().clone();
        let chained = self.inner.then(on_true);
        ThenBuilder::new_chained(chained)
    }
}
/// Calls `.upper()` on a clone of `column`.
pub fn upper(column: &Column) -> Column {
    let owned = column.clone();
    owned.upper()
}
/// Calls `.lower()` on a clone of `column`.
pub fn lower(column: &Column) -> Column {
    let owned = column.clone();
    owned.lower()
}
/// Calls `.substr(start, length)` on a clone of `column`.
pub fn substring(column: &Column, start: i64, length: Option<i64>) -> Column {
    let owned = column.clone();
    owned.substr(start, length)
}
/// Calls `.length()` on a clone of `column`.
pub fn length(column: &Column) -> Column {
    let owned = column.clone();
    owned.length()
}
/// Calls `.trim()` on a clone of `column`.
pub fn trim(column: &Column) -> Column {
    let owned = column.clone();
    owned.trim()
}
/// Calls `.ltrim()` on a clone of `column`.
pub fn ltrim(column: &Column) -> Column {
    let owned = column.clone();
    owned.ltrim()
}
/// Calls `.rtrim()` on a clone of `column`.
pub fn rtrim(column: &Column) -> Column {
    let owned = column.clone();
    owned.rtrim()
}
/// Calls `.btrim(trim_str)` on a clone of `column`.
pub fn btrim(column: &Column, trim_str: Option<&str>) -> Column {
    let owned = column.clone();
    owned.btrim(trim_str)
}
/// Calls `.locate(substr, pos)` on a clone of `column`.
pub fn locate(substr: &str, column: &Column, pos: i64) -> Column {
    let owned = column.clone();
    owned.locate(substr, pos)
}
/// Calls `.conv(from_base, to_base)` on a clone of `column`.
pub fn conv(column: &Column, from_base: i32, to_base: i32) -> Column {
    let owned = column.clone();
    owned.conv(from_base, to_base)
}
/// Calls `.hex()` on a clone of `column`.
pub fn hex(column: &Column) -> Column {
    let owned = column.clone();
    owned.hex()
}
/// Calls `.unhex()` on a clone of `column`.
pub fn unhex(column: &Column) -> Column {
    let owned = column.clone();
    owned.unhex()
}
/// Calls `.encode(charset)` on a clone of `column`.
pub fn encode(column: &Column, charset: &str) -> Column {
    let owned = column.clone();
    owned.encode(charset)
}
/// Calls `.decode(charset)` on a clone of `column`.
pub fn decode(column: &Column, charset: &str) -> Column {
    let owned = column.clone();
    owned.decode(charset)
}
/// Calls `.to_binary(fmt)` on a clone of `column`.
pub fn to_binary(column: &Column, fmt: &str) -> Column {
    let owned = column.clone();
    owned.to_binary(fmt)
}
/// Calls `.try_to_binary(fmt)` on a clone of `column`.
pub fn try_to_binary(column: &Column, fmt: &str) -> Column {
    let owned = column.clone();
    owned.try_to_binary(fmt)
}
/// Calls `.aes_encrypt(key)` on a clone of `column`.
pub fn aes_encrypt(column: &Column, key: &str) -> Column {
    let owned = column.clone();
    owned.aes_encrypt(key)
}
/// Calls `.aes_decrypt(key)` on a clone of `column`.
pub fn aes_decrypt(column: &Column, key: &str) -> Column {
    let owned = column.clone();
    owned.aes_decrypt(key)
}
/// Calls `.try_aes_decrypt(key)` on a clone of `column`.
pub fn try_aes_decrypt(column: &Column, key: &str) -> Column {
    let owned = column.clone();
    owned.try_aes_decrypt(key)
}
/// Calls `.bin()` on a clone of `column`.
pub fn bin(column: &Column) -> Column {
    let owned = column.clone();
    owned.bin()
}
/// Calls `.getbit(pos)` on a clone of `column`.
pub fn getbit(column: &Column, pos: i64) -> Column {
    let owned = column.clone();
    owned.getbit(pos)
}
/// Calls `.bit_and(right)` on a clone of `left`.
pub fn bit_and(left: &Column, right: &Column) -> Column {
    let lhs = left.clone();
    lhs.bit_and(right)
}
/// Calls `.bit_or(right)` on a clone of `left`.
pub fn bit_or(left: &Column, right: &Column) -> Column {
    let lhs = left.clone();
    lhs.bit_or(right)
}
/// Calls `.bit_xor(right)` on a clone of `left`.
pub fn bit_xor(left: &Column, right: &Column) -> Column {
    let lhs = left.clone();
    lhs.bit_xor(right)
}
/// Calls `.bit_count()` on a clone of `column`.
pub fn bit_count(column: &Column) -> Column {
    let owned = column.clone();
    owned.bit_count()
}
/// Calls `.bitwise_not()` on a clone of `column`.
pub fn bitwise_not(column: &Column) -> Column {
    let owned = column.clone();
    owned.bitwise_not()
}
/// Cast the column's expression to `Int32` and wrap it in an unnamed column.
///
/// NOTE(review): no modulo or offset is applied here — confirm this matches
/// the intended bitmap bit-position semantics.
pub fn bitmap_bit_position(column: &Column) -> Column {
    use polars::prelude::DataType;
    let as_int = column.expr().clone().cast(DataType::Int32);
    Column::from_expr(as_int, None)
}
/// Cast the column's expression to `Int64` and divide by `32768`.
pub fn bitmap_bucket_number(column: &Column) -> Column {
    use polars::prelude::DataType;
    let bucket_size = lit(32768i64);
    let as_long = column.expr().clone().cast(DataType::Int64);
    Column::from_expr(as_long / bucket_size, None)
}
/// Map `crate::udfs::apply_bitmap_count` over the column, declaring an
/// `Int64` output.
pub fn bitmap_count(column: &Column) -> Column {
    use polars::prelude::{DataType, GetOutput};
    let output = GetOutput::from_type(DataType::Int64);
    let counted = column
        .expr()
        .clone()
        .map(crate::udfs::apply_bitmap_count, output);
    Column::from_expr(counted, None)
}
/// Implode the column into a list and map
/// `crate::udfs::apply_bitmap_construct_agg` over it, declaring a `Binary`
/// output. Returns the raw polars expression.
pub fn bitmap_construct_agg(column: &Column) -> polars::prelude::Expr {
    use polars::prelude::{DataType, GetOutput};
    let gathered = column.expr().clone().implode();
    let output = GetOutput::from_type(DataType::Binary);
    gathered.map(crate::udfs::apply_bitmap_construct_agg, output)
}
/// Implode the column into a list and map `crate::udfs::apply_bitmap_or_agg`
/// over it, declaring a `Binary` output. Returns the raw polars expression.
pub fn bitmap_or_agg(column: &Column) -> polars::prelude::Expr {
    use polars::prelude::{DataType, GetOutput};
    let gathered = column.expr().clone().implode();
    let output = GetOutput::from_type(DataType::Binary);
    gathered.map(crate::udfs::apply_bitmap_or_agg, output)
}
/// Alias for `getbit`, forwarding both arguments unchanged.
pub fn bit_get(column: &Column, pos: i64) -> Column {
getbit(column, pos)
}
/// Calls `.assert_true(err_msg)` on a clone of `column`.
pub fn assert_true(column: &Column, err_msg: Option<&str>) -> Column {
    let owned = column.clone();
    owned.assert_true(err_msg)
}
/// Column whose evaluation always fails with a `ComputeError` carrying
/// `message`; named `"raise_error"`.
///
/// Built by mapping an always-failing closure over the literal `0i64`.
pub fn raise_error(message: &str) -> Column {
    let text = message.to_owned();
    let output = GetOutput::from_type(DataType::Int64);
    let failing = lit(0i64).map(
        move |_col| -> PolarsResult<Option<polars::prelude::Column>> {
            Err(PolarsError::ComputeError(text.clone().into()))
        },
        output,
    );
    Column::from_expr(failing, Some(String::from("raise_error")))
}
/// Returns a clone of `df`; no other work is done here.
pub fn broadcast(df: &DataFrame) -> DataFrame {
    Clone::clone(df)
}
/// Constant `i32` literal `0`, named `"spark_partition_id"`.
pub fn spark_partition_id() -> Column {
    let label = String::from("spark_partition_id");
    Column::from_expr(lit(0i32), Some(label))
}
/// Constant empty-string literal, named `"input_file_name"`.
pub fn input_file_name() -> Column {
    let label = String::from("input_file_name");
    Column::from_expr(lit(""), Some(label))
}
/// Constant `i64` literal `0`, named `"monotonically_increasing_id"`.
///
/// NOTE(review): this yields the same value for every row, not an
/// increasing id — confirm whether callers replace it downstream.
pub fn monotonically_increasing_id() -> Column {
    let label = String::from("monotonically_increasing_id");
    Column::from_expr(lit(0i64), Some(label))
}
/// Constant string literal `"spark_catalog"`, named `"current_catalog"`.
pub fn current_catalog() -> Column {
    let label = String::from("current_catalog");
    Column::from_expr(lit("spark_catalog"), Some(label))
}
/// Constant string literal `"default"`, named `"current_database"`.
pub fn current_database() -> Column {
    let label = String::from("current_database");
    Column::from_expr(lit("default"), Some(label))
}
/// Constant string literal `"default"`, named `"current_schema"`.
pub fn current_schema() -> Column {
    let label = String::from("current_schema");
    Column::from_expr(lit("default"), Some(label))
}
/// Constant string literal `"unknown"`, named `"current_user"`.
pub fn current_user() -> Column {
    let label = String::from("current_user");
    Column::from_expr(lit("unknown"), Some(label))
}
/// Constant string literal `"unknown"`, named `"user"`.
pub fn user() -> Column {
    let label = String::from("user");
    Column::from_expr(lit("unknown"), Some(label))
}
/// Delegates to `Column::from_rand`, forwarding the optional `seed`.
pub fn rand(seed: Option<u64>) -> Column {
Column::from_rand(seed)
}
/// Delegates to `Column::from_randn`, forwarding the optional `seed`.
pub fn randn(seed: Option<u64>) -> Column {
Column::from_randn(seed)
}
/// Build a column that applies the registered Rust UDF `name` to `cols`.
///
/// The UDF is looked up in the current thread's session registry, honouring
/// the session's case-sensitivity setting. One argument is applied with
/// `Expr::map`; several with `Expr::map_many`. The declared output type is
/// `String`, and the resulting column is named `"<name>()"`.
///
/// # Errors
/// Returns `PolarsError::InvalidOperation` when there is no thread session,
/// when no UDF with that name is registered, or when `cols` is empty
/// (previously an empty slice panicked on `exprs[0]`).
pub fn call_udf(name: &str, cols: &[Column]) -> Result<Column, PolarsError> {
    use polars::prelude::Column as PlColumn;
    let session = crate::session::get_thread_udf_session().ok_or_else(|| {
        PolarsError::InvalidOperation(
            "call_udf: no session. Use SparkSession.builder().get_or_create() first.".into(),
        )
    })?;
    let case_sensitive = session.is_case_sensitive();
    let udf = session
        .udf_registry
        .get_rust_udf(name, case_sensitive)
        .ok_or_else(|| {
            PolarsError::InvalidOperation(format!("call_udf: UDF '{name}' not found").into())
        })?;
    let mut exprs = cols.iter().map(|c| c.expr().clone());
    // The first expression anchors the map; without one there is nothing to map over.
    let first = exprs.next().ok_or_else(|| {
        PolarsError::InvalidOperation(
            format!("call_udf: UDF '{name}' requires at least one column argument").into(),
        )
    })?;
    let rest: Vec<Expr> = exprs.collect();
    let output_type = DataType::String;
    let udf = udf.clone();
    let expr = if rest.is_empty() {
        first.map(
            move |c| {
                let s = c.take_materialized_series();
                udf.apply(&[s])
                    .map(|out| Some(PlColumn::new("_".into(), out)))
            },
            GetOutput::from_type(output_type),
        )
    } else {
        first.map_many(
            move |columns| {
                let series: Vec<Series> = columns
                    .iter_mut()
                    .map(|c| std::mem::take(c).take_materialized_series())
                    .collect();
                udf.apply(&series)
                    .map(|out| Some(PlColumn::new("_".into(), out)))
            },
            &rest,
            GetOutput::from_type(output_type),
        )
    };
    Ok(Column::from_expr(expr, Some(format!("{name}()"))))
}
/// Calls `.arrays_overlap(right)` on a clone of `left`.
pub fn arrays_overlap(left: &Column, right: &Column) -> Column {
    let lhs = left.clone();
    lhs.arrays_overlap(right)
}
/// Calls `.arrays_zip(right)` on a clone of `left`.
pub fn arrays_zip(left: &Column, right: &Column) -> Column {
    let lhs = left.clone();
    lhs.arrays_zip(right)
}
/// Calls `.explode_outer()` on a clone of `column`.
pub fn explode_outer(column: &Column) -> Column {
    let owned = column.clone();
    owned.explode_outer()
}
/// Calls `.posexplode_outer()` on a clone of `column`, returning its pair of columns.
pub fn posexplode_outer(column: &Column) -> (Column, Column) {
    let owned = column.clone();
    owned.posexplode_outer()
}
/// Calls `.array_agg()` on a clone of `column`.
pub fn array_agg(column: &Column) -> Column {
    let owned = column.clone();
    owned.array_agg()
}
/// Calls `.transform_keys(key_expr)` on a clone of `column`.
pub fn transform_keys(column: &Column, key_expr: Expr) -> Column {
    let owned = column.clone();
    owned.transform_keys(key_expr)
}
/// Calls `.transform_values(value_expr)` on a clone of `column`.
pub fn transform_values(column: &Column, value_expr: Expr) -> Column {
    let owned = column.clone();
    owned.transform_values(value_expr)
}
/// Calls `.str_to_map(..)` on a clone of `column`.
///
/// `pair_delim` defaults to `","` and `key_value_delim` to `":"` when `None`.
pub fn str_to_map(
    column: &Column,
    pair_delim: Option<&str>,
    key_value_delim: Option<&str>,
) -> Column {
    let owned = column.clone();
    owned.str_to_map(pair_delim.unwrap_or(","), key_value_delim.unwrap_or(":"))
}
/// Calls `.regexp_extract(pattern, group_index)` on a clone of `column`.
pub fn regexp_extract(column: &Column, pattern: &str, group_index: usize) -> Column {
    let owned = column.clone();
    owned.regexp_extract(pattern, group_index)
}
/// Calls `.regexp_replace(pattern, replacement)` on a clone of `column`.
pub fn regexp_replace(column: &Column, pattern: &str, replacement: &str) -> Column {
    let owned = column.clone();
    owned.regexp_replace(pattern, replacement)
}
/// Calls `.split(delimiter, limit)` on a clone of `column`.
pub fn split(column: &Column, delimiter: &str, limit: Option<i32>) -> Column {
    let owned = column.clone();
    owned.split(delimiter, limit)
}
/// Calls `.initcap()` on a clone of `column`.
pub fn initcap(column: &Column) -> Column {
    let owned = column.clone();
    owned.initcap()
}
/// Calls `.regexp_extract_all(pattern)` on a clone of `column`.
pub fn regexp_extract_all(column: &Column, pattern: &str) -> Column {
    let owned = column.clone();
    owned.regexp_extract_all(pattern)
}
/// Calls `.regexp_like(pattern)` on a clone of `column`.
pub fn regexp_like(column: &Column, pattern: &str) -> Column {
    let owned = column.clone();
    owned.regexp_like(pattern)
}
/// Calls `.regexp_count(pattern)` on a clone of `column`.
pub fn regexp_count(column: &Column, pattern: &str) -> Column {
    let owned = column.clone();
    owned.regexp_count(pattern)
}
/// Calls `.regexp_substr(pattern)` on a clone of `column`.
pub fn regexp_substr(column: &Column, pattern: &str) -> Column {
    let owned = column.clone();
    owned.regexp_substr(pattern)
}
/// Calls `.split_part(delimiter, part_num)` on a clone of `column`.
pub fn split_part(column: &Column, delimiter: &str, part_num: i64) -> Column {
    let owned = column.clone();
    owned.split_part(delimiter, part_num)
}
/// Calls `.regexp_instr(pattern, group_idx)` on a clone of `column`.
pub fn regexp_instr(column: &Column, pattern: &str, group_idx: Option<usize>) -> Column {
    let owned = column.clone();
    owned.regexp_instr(pattern, group_idx)
}
/// Calls `.find_in_set(set_column)` on a clone of `str_column`.
pub fn find_in_set(str_column: &Column, set_column: &Column) -> Column {
    let needle = str_column.clone();
    needle.find_in_set(set_column)
}
/// Format `columns` row-wise with `format` via
/// `crate::udfs::apply_format_string`, declaring a `String` output.
///
/// The first column anchors the `map_many`; the rest are passed as extra
/// arguments.
///
/// # Panics
/// Panics if `columns` is empty.
pub fn format_string(format: &str, columns: &[&Column]) -> Column {
    use polars::prelude::*;
    let (head, tail) = match columns.split_first() {
        Some(parts) => parts,
        None => panic!("format_string needs at least one column"),
    };
    let template = format.to_string();
    let extra: Vec<Expr> = tail.iter().map(|c| c.expr().clone()).collect();
    let formatted = head.expr().clone().map_many(
        move |cols| crate::udfs::apply_format_string(cols, &template),
        &extra,
        GetOutput::from_type(DataType::String),
    );
    crate::column::Column::from_expr(formatted, None)
}
/// Alias for `format_string`; panics under the same condition (empty `columns`).
pub fn printf(format: &str, columns: &[&Column]) -> Column {
format_string(format, columns)
}
/// Calls `.repeat(n)` on a clone of `column`.
pub fn repeat(column: &Column, n: i32) -> Column {
    let owned = column.clone();
    owned.repeat(n)
}
/// Calls `.reverse()` on a clone of `column`.
pub fn reverse(column: &Column) -> Column {
    let owned = column.clone();
    owned.reverse()
}
/// Calls `.instr(substr)` on a clone of `column`.
pub fn instr(column: &Column, substr: &str) -> Column {
    let owned = column.clone();
    owned.instr(substr)
}
/// Calls `.instr(substr)` on a clone of `column`; same as `instr` with the
/// argument order swapped.
pub fn position(substr: &str, column: &Column) -> Column {
    let owned = column.clone();
    owned.instr(substr)
}
/// Calls `.ascii()` on a clone of `column`.
pub fn ascii(column: &Column) -> Column {
    let owned = column.clone();
    owned.ascii()
}
/// Calls `.format_number(decimals)` on a clone of `column`.
pub fn format_number(column: &Column, decimals: u32) -> Column {
    let owned = column.clone();
    owned.format_number(decimals)
}
/// Calls `.overlay(replace, pos, length)` on a clone of `column`.
pub fn overlay(column: &Column, replace: &str, pos: i64, length: i64) -> Column {
    let owned = column.clone();
    owned.overlay(replace, pos, length)
}
/// Calls `.char()` on a clone of `column`.
pub fn char(column: &Column) -> Column {
    let owned = column.clone();
    owned.char()
}
/// Calls `.chr()` on a clone of `column`.
pub fn chr(column: &Column) -> Column {
    let owned = column.clone();
    owned.chr()
}
/// Calls `.base64()` on a clone of `column`.
pub fn base64(column: &Column) -> Column {
    let owned = column.clone();
    owned.base64()
}
/// Calls `.unbase64()` on a clone of `column`.
pub fn unbase64(column: &Column) -> Column {
    let owned = column.clone();
    owned.unbase64()
}
/// Calls `.sha1()` on a clone of `column`.
pub fn sha1(column: &Column) -> Column {
    let owned = column.clone();
    owned.sha1()
}
/// Calls `.sha2(bit_length)` on a clone of `column`.
pub fn sha2(column: &Column, bit_length: i32) -> Column {
    let owned = column.clone();
    owned.sha2(bit_length)
}
/// Calls `.md5()` on a clone of `column`.
pub fn md5(column: &Column) -> Column {
    let owned = column.clone();
    owned.md5()
}
/// Calls `.lpad(length, pad)` on a clone of `column`.
pub fn lpad(column: &Column, length: i32, pad: &str) -> Column {
    let owned = column.clone();
    owned.lpad(length, pad)
}
/// Calls `.rpad(length, pad)` on a clone of `column`.
pub fn rpad(column: &Column, length: i32, pad: &str) -> Column {
    let owned = column.clone();
    owned.rpad(length, pad)
}
/// Calls `.translate(from_str, to_str)` on a clone of `column`.
pub fn translate(column: &Column, from_str: &str, to_str: &str) -> Column {
    let owned = column.clone();
    owned.translate(from_str, to_str)
}
/// Calls `.mask(..)` on a clone of `column`, forwarding the four optional
/// replacement characters unchanged.
pub fn mask(
    column: &Column,
    upper_char: Option<char>,
    lower_char: Option<char>,
    digit_char: Option<char>,
    other_char: Option<char>,
) -> Column {
    let owned = column.clone();
    owned.mask(upper_char, lower_char, digit_char, other_char)
}
/// Calls `.substring_index(delimiter, count)` on a clone of `column`.
pub fn substring_index(column: &Column, delimiter: &str, count: i64) -> Column {
    let owned = column.clone();
    owned.substring_index(delimiter, count)
}
/// Calls `.left(n)` on a clone of `column`.
pub fn left(column: &Column, n: i64) -> Column {
    let owned = column.clone();
    owned.left(n)
}
/// Calls `.right(n)` on a clone of `column`.
pub fn right(column: &Column, n: i64) -> Column {
    let owned = column.clone();
    owned.right(n)
}
/// Calls `.replace(search, replacement)` on a clone of `column`.
pub fn replace(column: &Column, search: &str, replacement: &str) -> Column {
    let owned = column.clone();
    owned.replace(search, replacement)
}
/// Calls `.startswith(prefix)` on a clone of `column`.
pub fn startswith(column: &Column, prefix: &str) -> Column {
    let owned = column.clone();
    owned.startswith(prefix)
}
/// Calls `.endswith(suffix)` on a clone of `column`.
pub fn endswith(column: &Column, suffix: &str) -> Column {
    let owned = column.clone();
    owned.endswith(suffix)
}
/// Calls `.contains(substring)` on a clone of `column`.
pub fn contains(column: &Column, substring: &str) -> Column {
    let owned = column.clone();
    owned.contains(substring)
}
/// Calls `.like(pattern, escape_char)` on a clone of `column`.
pub fn like(column: &Column, pattern: &str, escape_char: Option<char>) -> Column {
    let owned = column.clone();
    owned.like(pattern, escape_char)
}
/// Calls `.ilike(pattern, escape_char)` on a clone of `column`.
pub fn ilike(column: &Column, pattern: &str, escape_char: Option<char>) -> Column {
    let owned = column.clone();
    owned.ilike(pattern, escape_char)
}
/// Calls `.regexp_like(pattern)` on a clone of `column`.
pub fn rlike(column: &Column, pattern: &str) -> Column {
    let owned = column.clone();
    owned.regexp_like(pattern)
}
/// Alias for `rlike`, forwarding both arguments unchanged.
pub fn regexp(column: &Column, pattern: &str) -> Column {
rlike(column, pattern)
}
/// Calls `.soundex()` on a clone of `column`.
pub fn soundex(column: &Column) -> Column {
    let owned = column.clone();
    owned.soundex()
}
/// Calls `.levenshtein(other)` on a clone of `column`.
pub fn levenshtein(column: &Column, other: &Column) -> Column {
    let owned = column.clone();
    owned.levenshtein(other)
}
/// Calls `.crc32()` on a clone of `column`.
pub fn crc32(column: &Column) -> Column {
    let owned = column.clone();
    owned.crc32()
}
/// Calls `.xxhash64()` on a clone of `column`.
pub fn xxhash64(column: &Column) -> Column {
    let owned = column.clone();
    owned.xxhash64()
}
/// Calls `.abs()` on a clone of `column`.
pub fn abs(column: &Column) -> Column {
    let owned = column.clone();
    owned.abs()
}
/// Calls `.ceil()` on a clone of `column`.
pub fn ceil(column: &Column) -> Column {
    let owned = column.clone();
    owned.ceil()
}
/// Calls `.floor()` on a clone of `column`.
pub fn floor(column: &Column) -> Column {
    let owned = column.clone();
    owned.floor()
}
/// Calls `.round(decimals)` on a clone of `column`.
pub fn round(column: &Column, decimals: u32) -> Column {
    let owned = column.clone();
    owned.round(decimals)
}
/// Calls `.bround(scale)` on a clone of `column`.
pub fn bround(column: &Column, scale: i32) -> Column {
    let owned = column.clone();
    owned.bround(scale)
}
/// Calls `.negate()` on a clone of `column`.
pub fn negate(column: &Column) -> Column {
    let owned = column.clone();
    owned.negate()
}
/// Alias for `negate`.
pub fn negative(column: &Column) -> Column {
negate(column)
}
/// Identity: returns a clone of `column` unchanged.
pub fn positive(column: &Column) -> Column {
    Clone::clone(column)
}
/// Calls `.cot()` on a clone of `column`.
pub fn cot(column: &Column) -> Column {
    let owned = column.clone();
    owned.cot()
}
/// Calls `.csc()` on a clone of `column`.
pub fn csc(column: &Column) -> Column {
    let owned = column.clone();
    owned.csc()
}
/// Calls `.sec()` on a clone of `column`.
pub fn sec(column: &Column) -> Column {
    let owned = column.clone();
    owned.sec()
}
/// Constant column holding Euler's number (`std::f64::consts::E`), named `"e"`.
pub fn e() -> Column {
    let value = std::f64::consts::E;
    Column::from_expr(lit(value), Some(String::from("e")))
}
/// Constant column holding π (`std::f64::consts::PI`), named `"pi"`.
pub fn pi() -> Column {
    let value = std::f64::consts::PI;
    Column::from_expr(lit(value), Some(String::from("pi")))
}
/// Calls `.sqrt()` on a clone of `column`.
pub fn sqrt(column: &Column) -> Column {
    let owned = column.clone();
    owned.sqrt()
}
/// Calls `.pow(exp)` on a clone of `column`.
pub fn pow(column: &Column, exp: i64) -> Column {
    let owned = column.clone();
    owned.pow(exp)
}
/// Calls `.exp()` on a clone of `column`.
pub fn exp(column: &Column) -> Column {
    let owned = column.clone();
    owned.exp()
}
/// Calls `.log()` on a clone of `column`.
pub fn log(column: &Column) -> Column {
    let owned = column.clone();
    owned.log()
}
/// Logarithm of the column's expression in the given `base`, via polars
/// `Expr::log(base)`; the result is unnamed.
pub fn log_with_base(column: &Column, base: f64) -> Column {
    let logged = column.expr().clone().log(base);
    crate::column::Column::from_expr(logged, None)
}
/// Calls `.sin()` on a clone of `column`.
pub fn sin(column: &Column) -> Column {
    let owned = column.clone();
    owned.sin()
}
/// Calls `.cos()` on a clone of `column`.
pub fn cos(column: &Column) -> Column {
    let owned = column.clone();
    owned.cos()
}
/// Calls `.tan()` on a clone of `column`.
pub fn tan(column: &Column) -> Column {
    let owned = column.clone();
    owned.tan()
}
/// Calls `.asin()` on a clone of `column`.
pub fn asin(column: &Column) -> Column {
    let owned = column.clone();
    owned.asin()
}
/// Calls `.acos()` on a clone of `column`.
pub fn acos(column: &Column) -> Column {
    let owned = column.clone();
    owned.acos()
}
/// Calls `.atan()` on a clone of `column`.
pub fn atan(column: &Column) -> Column {
    let owned = column.clone();
    owned.atan()
}
/// Calls `.atan2(x)` on a clone of `y`.
pub fn atan2(y: &Column, x: &Column) -> Column {
    let ordinate = y.clone();
    ordinate.atan2(x)
}
/// Calls `.degrees()` on a clone of `column`.
pub fn degrees(column: &Column) -> Column {
    let owned = column.clone();
    owned.degrees()
}
/// Calls `.radians()` on a clone of `column`.
pub fn radians(column: &Column) -> Column {
    let owned = column.clone();
    owned.radians()
}
/// Calls `.signum()` on a clone of `column`.
pub fn signum(column: &Column) -> Column {
    let owned = column.clone();
    owned.signum()
}
/// Alias for `signum`.
pub fn sign(column: &Column) -> Column {
signum(column)
}
pub fn cast(column: &Column, type_name: &str) -> Result<Column, String> {
let dtype = parse_type_name(type_name)?;
if dtype == DataType::Boolean {
use polars::prelude::GetOutput;
let expr = column.expr().clone().map(
|col| crate::udfs::apply_string_to_boolean(col, true),
GetOutput::from_type(DataType::Boolean),
);
return Ok(Column::from_expr(expr, None));
}
if dtype == DataType::Date {
use polars::prelude::GetOutput;
let expr = column.expr().clone().map(
|col| crate::udfs::apply_string_to_date(col, true),
GetOutput::from_type(DataType::Date),
);
return Ok(Column::from_expr(expr, None));
}
if dtype == DataType::Int32 || dtype == DataType::Int64 {
use polars::prelude::GetOutput;
let target = dtype.clone();
let expr = column.expr().clone().map(
move |col| crate::udfs::apply_string_to_int(col, false, target.clone()),
GetOutput::from_type(dtype),
);
return Ok(Column::from_expr(expr, None));
}
if dtype == DataType::Float64 {
use polars::prelude::GetOutput;
let expr = column.expr().clone().map(
|col| crate::udfs::apply_string_to_double(col, true),
GetOutput::from_type(DataType::Float64),
);
return Ok(Column::from_expr(expr, None));
}
Ok(Column::from_expr(
column.expr().clone().strict_cast(dtype),
None,
))
}
/// Converts `column` to the type named by `type_name` using the non-strict conversion paths.
///
/// Boolean, date, 32/64-bit integer and double targets are routed through the
/// string-aware helpers in `crate::udfs`, each invoked with its boolean flag set
/// to `false`. Every other target falls back to the plain Polars `cast`.
///
/// # Errors
/// Propagates the error from [`parse_type_name`] when `type_name` is not recognised.
pub fn try_cast(column: &Column, type_name: &str) -> Result<Column, String> {
    use polars::prelude::GetOutput;
    let dtype = parse_type_name(type_name)?;
    let source = column.expr().clone();
    let converted = match dtype {
        DataType::Boolean => source.map(
            |col| crate::udfs::apply_string_to_boolean(col, false),
            GetOutput::from_type(DataType::Boolean),
        ),
        DataType::Date => source.map(
            |col| crate::udfs::apply_string_to_date(col, false),
            GetOutput::from_type(DataType::Date),
        ),
        DataType::Int32 | DataType::Int64 => {
            // The closure needs its own copy of the target type; the original is
            // consumed by the output-type declaration.
            let target = dtype.clone();
            source.map(
                move |col| crate::udfs::apply_string_to_int(col, false, target.clone()),
                GetOutput::from_type(dtype),
            )
        }
        DataType::Float64 => source.map(
            |col| crate::udfs::apply_string_to_double(col, false),
            GetOutput::from_type(DataType::Float64),
        ),
        _ => source.cast(dtype),
    };
    Ok(Column::from_expr(converted, None))
}
/// Renders `column` as text.
///
/// With a `format`, the pattern is translated by `crate::udfs::pyspark_format_to_chrono`
/// and applied through `date_format`; without one, the column is cast to `"string"`.
///
/// # Errors
/// Returns whatever error [`cast`] produces on the no-format path.
pub fn to_char(column: &Column, format: Option<&str>) -> Result<Column, String> {
    if let Some(fmt) = format {
        let chrono_pattern = crate::udfs::pyspark_format_to_chrono(fmt);
        let formatted = column.clone().date_format(&chrono_pattern);
        Ok(formatted)
    } else {
        cast(column, "string")
    }
}
/// Alias for [`to_char`]: forwards `column` and `format` unchanged.
pub fn to_varchar(column: &Column, format: Option<&str>) -> Result<Column, String> {
to_char(column, format)
}
/// Converts `column` to a number by calling [`cast`] with the `"double"` type name.
///
/// The `_format` argument is accepted but not used by this implementation.
pub fn to_number(column: &Column, _format: Option<&str>) -> Result<Column, String> {
cast(column, "double")
}
/// Converts `column` to a number by calling [`try_cast`] with the `"double"` type name.
///
/// The `_format` argument is accepted but not used by this implementation.
pub fn try_to_number(column: &Column, _format: Option<&str>) -> Result<Column, String> {
try_cast(column, "double")
}
/// Parses `column` into a microsecond-precision datetime column (no time zone).
///
/// The optional `format` is copied into the mapping closure and handed to
/// `crate::udfs::apply_to_timestamp_format` with its boolean flag set to `true`.
/// This function itself always returns `Ok`.
pub fn to_timestamp(column: &Column, format: Option<&str>) -> Result<Column, String> {
    use polars::prelude::{DataType, GetOutput, TimeUnit};
    let pattern: Option<String> = format.map(|s| s.to_string());
    let output = GetOutput::from_type(DataType::Datetime(TimeUnit::Microseconds, None));
    let parsed = column.expr().clone().map(
        move |s| crate::udfs::apply_to_timestamp_format(s, pattern.as_deref(), true),
        output,
    );
    Ok(crate::column::Column::from_expr(parsed, None))
}
/// Parses `column` into a microsecond-precision datetime column (no time zone).
///
/// Same pipeline as [`to_timestamp`], except that
/// `crate::udfs::apply_to_timestamp_format` is called with its boolean flag set
/// to `false`. This function itself always returns `Ok`.
pub fn try_to_timestamp(column: &Column, format: Option<&str>) -> Result<Column, String> {
    use polars::prelude::*;
    let pattern: Option<String> = format.map(|s| s.to_string());
    let output = GetOutput::from_type(DataType::Datetime(TimeUnit::Microseconds, None));
    let parsed = column.expr().clone().map(
        move |s| crate::udfs::apply_to_timestamp_format(s, pattern.as_deref(), false),
        output,
    );
    Ok(crate::column::Column::from_expr(parsed, None))
}
/// Converts `column` to a microsecond-precision timestamp.
///
/// With a `format`, values are mapped through
/// `crate::udfs::apply_to_timestamp_ltz_format` (boolean flag `true`); without one,
/// the column is cast with `crate::cast(column, "timestamp")`.
///
/// # Errors
/// Returns whatever error `crate::cast` produces on the no-format path.
pub fn to_timestamp_ltz(column: &Column, format: Option<&str>) -> Result<Column, String> {
    use polars::prelude::{DataType, GetOutput, TimeUnit};
    if let Some(fmt) = format {
        let pattern = fmt.to_string();
        let parsed = column.expr().clone().map(
            move |s| crate::udfs::apply_to_timestamp_ltz_format(s, Some(&pattern), true),
            GetOutput::from_type(DataType::Datetime(TimeUnit::Microseconds, None)),
        );
        return Ok(crate::column::Column::from_expr(parsed, None));
    }
    crate::cast(column, "timestamp")
}
/// Converts `column` to a microsecond-precision timestamp.
///
/// With a `format`, values are mapped through
/// `crate::udfs::apply_to_timestamp_ntz_format` (boolean flag `true`); without one,
/// the column is cast with `crate::cast(column, "timestamp")`.
///
/// # Errors
/// Returns whatever error `crate::cast` produces on the no-format path.
pub fn to_timestamp_ntz(column: &Column, format: Option<&str>) -> Result<Column, String> {
    use polars::prelude::{DataType, GetOutput, TimeUnit};
    if let Some(fmt) = format {
        let pattern = fmt.to_string();
        let parsed = column.expr().clone().map(
            move |s| crate::udfs::apply_to_timestamp_ntz_format(s, Some(&pattern), true),
            GetOutput::from_type(DataType::Datetime(TimeUnit::Microseconds, None)),
        );
        return Ok(crate::column::Column::from_expr(parsed, None));
    }
    crate::cast(column, "timestamp")
}
/// Divides `left` by `right` as `Float64`, yielding null wherever the divisor equals zero.
///
/// Both operands are cast to `Float64` before the comparison and the division.
pub fn try_divide(left: &Column, right: &Column) -> Column {
    use polars::prelude::*;
    let numerator = left.expr().clone().cast(DataType::Float64);
    let denominator = right.expr().clone().cast(DataType::Float64);
    let divisor_is_zero = denominator.clone().eq(lit(0.0f64));
    let quotient = numerator / denominator;
    let guarded = polars::prelude::when(divisor_is_zero)
        .then(Expr::Literal(LiteralValue::Null))
        .otherwise(quotient);
    crate::column::Column::from_expr(guarded, None)
}
/// Combines `left` and `right` row-wise with `crate::udfs::apply_try_add`.
///
/// The output is declared to have the same type as the input (`GetOutput::same_type`).
pub fn try_add(left: &Column, right: &Column) -> Column {
    let rhs = [right.expr().clone()];
    let lhs = left.expr().clone();
    let combined = lhs.map_many(crate::udfs::apply_try_add, &rhs, GetOutput::same_type());
    Column::from_expr(combined, None)
}
/// Combines `left` and `right` row-wise with `crate::udfs::apply_try_subtract`.
///
/// The output is declared to have the same type as the input (`GetOutput::same_type`).
pub fn try_subtract(left: &Column, right: &Column) -> Column {
    let rhs = [right.expr().clone()];
    let lhs = left.expr().clone();
    let combined = lhs.map_many(crate::udfs::apply_try_subtract, &rhs, GetOutput::same_type());
    Column::from_expr(combined, None)
}
/// Combines `left` and `right` row-wise with `crate::udfs::apply_try_multiply`.
///
/// The output is declared to have the same type as the input (`GetOutput::same_type`).
pub fn try_multiply(left: &Column, right: &Column) -> Column {
    let rhs = [right.expr().clone()];
    let lhs = left.expr().clone();
    let combined = lhs.map_many(crate::udfs::apply_try_multiply, &rhs, GetOutput::same_type());
    Column::from_expr(combined, None)
}
/// Looks up the element at `index` by calling `element_at` on a clone of `column`.
///
/// This is the same call that [`element_at`] makes.
pub fn try_element_at(column: &Column, index: i64) -> Column {
    let source = column.clone();
    source.element_at(index)
}
/// Assigns each value to one of `num_bucket` equal-width buckets spanning `[min_val, max_val)`.
///
/// * values below `min_val` map to bucket `0`;
/// * values at or above `max_val` map to bucket `num_bucket + 1`;
/// * everything else maps to `floor((v - min_val) / width) + 1`, clipped to `1..=num_bucket`.
///
/// The input is cast to `Float64` before bucketing.
///
/// NOTE(review): when `min_val == max_val` the bucket width is `0.0`, so the
/// in-range branch divides by zero; callers should avoid that input.
///
/// # Panics
/// Panics when `num_bucket` is zero or negative.
pub fn width_bucket(value: &Column, min_val: f64, max_val: f64, num_bucket: i64) -> Column {
    use polars::prelude::*;
    assert!(
        num_bucket > 0,
        "width_bucket: num_bucket must be positive, got {}",
        num_bucket
    );
    let input = value.expr().clone().cast(DataType::Float64);
    let lower = lit(min_val);
    let upper = lit(max_val);
    let bucket_width = (max_val - min_val) / (num_bucket as f64);
    // Zero-based position inside the range, then shifted to one-based bucket numbers.
    let position = (input.clone() - lower.clone()) / lit(bucket_width);
    let one_based = position.floor().cast(DataType::Int64) + lit(1i64);
    let in_range = one_based.clip(lit(1i64), lit(num_bucket));
    let below_range = input.clone().lt(lower);
    let at_or_above_range = input.gt_eq(upper);
    let bucketed = polars::prelude::when(below_range)
        .then(lit(0i64))
        .when(at_or_above_range)
        .then(lit(num_bucket + 1))
        .otherwise(in_range);
    crate::column::Column::from_expr(bucketed, None)
}
/// Picks, per row, the column whose 1-based position equals the value of `index`.
///
/// Rows whose index matches no position yield null.
///
/// # Panics
/// Panics when `columns` is empty.
pub fn elt(index: &Column, columns: &[&Column]) -> Column {
    use polars::prelude::*;
    assert!(!columns.is_empty(), "elt requires at least one column");
    let selector = index.expr().clone();
    // Build the when/then chain from the last column inwards so that the first
    // column ends up as the outermost condition.
    let chain = columns.iter().enumerate().rev().fold(
        Expr::Literal(LiteralValue::Null),
        |fallback, (pos, candidate)| {
            let ordinal = (pos + 1) as i64;
            polars::prelude::when(selector.clone().eq(lit(ordinal)))
                .then(candidate.expr().clone())
                .otherwise(fallback)
        },
    );
    crate::column::Column::from_expr(chain, None)
}
/// Calls `bit_length` on a clone of `column` and returns the resulting column.
pub fn bit_length(column: &Column) -> Column {
    let source = column.clone();
    source.bit_length()
}
/// Calls `octet_length` on a clone of `column` and returns the resulting column.
pub fn octet_length(column: &Column) -> Column {
    let source = column.clone();
    source.octet_length()
}
/// Calls `char_length` on a clone of `column` and returns the resulting column.
pub fn char_length(column: &Column) -> Column {
    let source = column.clone();
    source.char_length()
}
/// Calls `character_length` on a clone of `column` and returns the resulting column.
pub fn character_length(column: &Column) -> Column {
    let source = column.clone();
    source.character_length()
}
/// Calls `typeof_` on a clone of `column` and returns the resulting column.
pub fn typeof_(column: &Column) -> Column {
    let source = column.clone();
    source.typeof_()
}
/// Calls `is_nan` on a clone of `column` and returns the resulting column.
pub fn isnan(column: &Column) -> Column {
    let source = column.clone();
    source.is_nan()
}
/// Folds the given columns pairwise with `crate::udfs::apply_greatest2`.
///
/// A single column is returned as a clone without any mapping.
///
/// # Errors
/// Returns an error string when `columns` is empty.
pub fn greatest(columns: &[&Column]) -> Result<Column, String> {
    match columns {
        [] => Err("greatest requires at least one column".to_string()),
        [only] => Ok((**only).clone()),
        [first, rest @ ..] => {
            let folded = rest.iter().fold(first.expr().clone(), |acc, next| {
                let args = [next.expr().clone()];
                acc.map_many(crate::udfs::apply_greatest2, &args, GetOutput::same_type())
            });
            Ok(Column::from_expr(folded, None))
        }
    }
}
/// Folds the given columns pairwise with `crate::udfs::apply_least2`.
///
/// A single column is returned as a clone without any mapping.
///
/// # Errors
/// Returns an error string when `columns` is empty.
pub fn least(columns: &[&Column]) -> Result<Column, String> {
    match columns {
        [] => Err("least requires at least one column".to_string()),
        [only] => Ok((**only).clone()),
        [first, rest @ ..] => {
            let folded = rest.iter().fold(first.expr().clone(), |acc, next| {
                let args = [next.expr().clone()];
                acc.map_many(crate::udfs::apply_least2, &args, GetOutput::same_type())
            });
            Ok(Column::from_expr(folded, None))
        }
    }
}
/// Calls `year` on a clone of `column` and returns the resulting column.
pub fn year(column: &Column) -> Column {
    let source = column.clone();
    source.year()
}
/// Calls `month` on a clone of `column` and returns the resulting column.
pub fn month(column: &Column) -> Column {
    let source = column.clone();
    source.month()
}
/// Calls `day` on a clone of `column` and returns the resulting column.
pub fn day(column: &Column) -> Column {
    let source = column.clone();
    source.day()
}
/// Maps `column` to a `Date` column via `crate::udfs::apply_string_to_date_format`.
///
/// The optional `format` is copied into the closure; the helper is called with
/// its boolean flag set to `false`. This function itself always returns `Ok`.
pub fn to_date(column: &Column, format: Option<&str>) -> Result<Column, String> {
    use polars::prelude::GetOutput;
    let pattern: Option<String> = format.map(|s| s.to_string());
    let output = GetOutput::from_type(DataType::Date);
    let parsed = column.expr().clone().map(
        move |col| crate::udfs::apply_string_to_date_format(col, pattern.as_deref(), false),
        output,
    );
    Ok(Column::from_expr(parsed, None))
}
/// Formats `column` using `format` after translating the pattern with
/// `crate::udfs::pyspark_format_to_chrono`.
pub fn date_format(column: &Column, format: &str) -> Column {
    let chrono_pattern = crate::udfs::pyspark_format_to_chrono(format);
    let source = column.clone();
    source.date_format(&chrono_pattern)
}
/// Returns a literal `Date` column holding today's date in UTC.
///
/// The date is captured when this function is called and stored as a day
/// offset from `crate::date_utils::epoch_naive_date()`.
pub fn current_date() -> Column {
    use polars::prelude::*;
    let today = chrono::Utc::now().date_naive();
    let since_epoch = today - crate::date_utils::epoch_naive_date();
    let day_number = since_epoch.num_days() as i32;
    let literal = Expr::Literal(LiteralValue::Date(day_number));
    crate::column::Column::from_expr(literal, None)
}
/// Returns a literal microsecond-precision datetime column holding the current UTC instant.
///
/// The instant is captured when this function is called.
pub fn current_timestamp() -> Column {
    use polars::prelude::*;
    let micros_since_epoch = chrono::Utc::now().timestamp_micros();
    let literal = Expr::Literal(LiteralValue::DateTime(
        micros_since_epoch,
        TimeUnit::Microseconds,
        None,
    ));
    crate::column::Column::from_expr(literal, None)
}
/// Alias for [`current_date`].
pub fn curdate() -> Column {
current_date()
}
/// Alias for [`current_timestamp`].
pub fn now() -> Column {
current_timestamp()
}
/// Alias for [`current_timestamp`]; the value is therefore taken from the UTC clock.
pub fn localtimestamp() -> Column {
current_timestamp()
}
/// Alias for [`datediff`]: forwards `end` and `start` in the same order.
pub fn date_diff(end: &Column, start: &Column) -> Column {
datediff(end, start)
}
/// Alias for [`date_add`]: forwards `column` and `n` unchanged.
pub fn dateadd(column: &Column, n: i32) -> Column {
date_add(column, n)
}
/// Calls `extract` with `field` on a clone of `column` and returns the resulting column.
pub fn extract(column: &Column, field: &str) -> Column {
    let source = column.clone();
    source.extract(field)
}
/// Alias for [`extract`]: forwards `column` and `field` unchanged.
pub fn date_part(column: &Column, field: &str) -> Column {
extract(column, field)
}
/// Alias for [`extract`]: forwards `column` and `field` unchanged.
pub fn datepart(column: &Column, field: &str) -> Column {
extract(column, field)
}
/// Calls `unix_micros` on a clone of `column` and returns the resulting column.
pub fn unix_micros(column: &Column) -> Column {
    let source = column.clone();
    source.unix_micros()
}
/// Calls `unix_millis` on a clone of `column` and returns the resulting column.
pub fn unix_millis(column: &Column) -> Column {
    let source = column.clone();
    source.unix_millis()
}
/// Calls `unix_seconds` on a clone of `column` and returns the resulting column.
pub fn unix_seconds(column: &Column) -> Column {
    let source = column.clone();
    source.unix_seconds()
}
/// Calls `dayname` on a clone of `column` and returns the resulting column.
pub fn dayname(column: &Column) -> Column {
    let source = column.clone();
    source.dayname()
}
/// Calls `weekday` on a clone of `column` and returns the resulting column.
pub fn weekday(column: &Column) -> Column {
    let source = column.clone();
    source.weekday()
}
/// Calls `hour` on a clone of `column` and returns the resulting column.
pub fn hour(column: &Column) -> Column {
    let source = column.clone();
    source.hour()
}
/// Calls `minute` on a clone of `column` and returns the resulting column.
pub fn minute(column: &Column) -> Column {
    let source = column.clone();
    source.minute()
}
/// Calls `second` on a clone of `column` and returns the resulting column.
pub fn second(column: &Column) -> Column {
    let source = column.clone();
    source.second()
}
/// Calls `date_add` with `n` on a clone of `column` and returns the resulting column.
pub fn date_add(column: &Column, n: i32) -> Column {
    let source = column.clone();
    source.date_add(n)
}
/// Calls `date_sub` with `n` on a clone of `column` and returns the resulting column.
pub fn date_sub(column: &Column, n: i32) -> Column {
    let source = column.clone();
    source.date_sub(n)
}
/// Computes the date difference for `end` and `start`.
///
/// The call is made on a clone of `start`, with `end` passed as the argument to
/// its `datediff` method.
pub fn datediff(end: &Column, start: &Column) -> Column {
    let receiver = start.clone();
    receiver.datediff(end)
}
/// Calls `last_day` on a clone of `column` and returns the resulting column.
pub fn last_day(column: &Column) -> Column {
    let source = column.clone();
    source.last_day()
}
/// Calls `trunc` with `format` on a clone of `column` and returns the resulting column.
pub fn trunc(column: &Column, format: &str) -> Column {
    let source = column.clone();
    source.trunc(format)
}
/// Same operation as [`trunc`], but with the arguments in the opposite order
/// (`format` first, then `column`).
pub fn date_trunc(format: &str, column: &Column) -> Column {
trunc(column, format)
}
/// Calls `quarter` on a clone of `column` and returns the resulting column.
pub fn quarter(column: &Column) -> Column {
    let source = column.clone();
    source.quarter()
}
/// Calls `weekofyear` on a clone of `column` and returns the resulting column.
pub fn weekofyear(column: &Column) -> Column {
    let source = column.clone();
    source.weekofyear()
}
/// Calls `dayofweek` on a clone of `column` and returns the resulting column.
pub fn dayofweek(column: &Column) -> Column {
    let source = column.clone();
    source.dayofweek()
}
/// Calls `dayofyear` on a clone of `column` and returns the resulting column.
pub fn dayofyear(column: &Column) -> Column {
    let source = column.clone();
    source.dayofyear()
}
/// Calls `add_months` with `n` on a clone of `column` and returns the resulting column.
pub fn add_months(column: &Column, n: i32) -> Column {
    let source = column.clone();
    source.add_months(n)
}
/// Computes the months between `end` and `start`.
///
/// The call is made on a clone of `end`, passing `start` and `round_off` to its
/// `months_between` method.
pub fn months_between(end: &Column, start: &Column, round_off: bool) -> Column {
    let receiver = end.clone();
    receiver.months_between(start, round_off)
}
/// Calls `next_day` with `day_of_week` on a clone of `column` and returns the resulting column.
pub fn next_day(column: &Column, day_of_week: &str) -> Column {
    let source = column.clone();
    source.next_day(day_of_week)
}
/// Returns a literal column holding the current UTC time as whole seconds since the Unix epoch.
///
/// The value is captured when this function is called.
pub fn unix_timestamp_now() -> Column {
    use polars::prelude::*;
    let epoch_seconds = chrono::Utc::now().timestamp();
    let literal = lit(epoch_seconds);
    crate::column::Column::from_expr(literal, None)
}
/// Calls `unix_timestamp` with the optional `format` on a clone of `column`.
pub fn unix_timestamp(column: &Column, format: Option<&str>) -> Column {
    let source = column.clone();
    source.unix_timestamp(format)
}
/// Alias for [`unix_timestamp`]: forwards `column` and `format` unchanged.
pub fn to_unix_timestamp(column: &Column, format: Option<&str>) -> Column {
unix_timestamp(column, format)
}
/// Calls `from_unixtime` with the optional `format` on a clone of `column`.
pub fn from_unixtime(column: &Column, format: Option<&str>) -> Column {
    let source = column.clone();
    source.from_unixtime(format)
}
/// Builds a `Date` column from separate `year`, `month` and `day` columns.
///
/// The three expressions are handed to `crate::udfs::apply_make_date`, with
/// `year` as the primary input followed by `month` and `day`.
pub fn make_date(year: &Column, month: &Column, day: &Column) -> Column {
    use polars::prelude::*;
    let trailing_parts = [month.expr().clone(), day.expr().clone()];
    let leading_part = year.expr().clone();
    let assembled = leading_part.map_many(
        crate::udfs::apply_make_date,
        &trailing_parts,
        GetOutput::from_type(DataType::Date),
    );
    crate::column::Column::from_expr(assembled, None)
}
/// Builds a microsecond-precision datetime column from separate component columns.
///
/// The component expressions are handed to `crate::udfs::apply_make_timestamp`
/// in the order year, month, day, hour, minute, second, together with the
/// optional `timezone` name. The declared output type carries no time zone.
pub fn make_timestamp(
    year: &Column,
    month: &Column,
    day: &Column,
    hour: &Column,
    minute: &Column,
    sec: &Column,
    timezone: Option<&str>,
) -> Column {
    use polars::prelude::*;
    let zone_name: Option<String> = timezone.map(|s| s.to_string());
    let trailing_parts = [
        month.expr().clone(),
        day.expr().clone(),
        hour.expr().clone(),
        minute.expr().clone(),
        sec.expr().clone(),
    ];
    let output = GetOutput::from_type(DataType::Datetime(TimeUnit::Microseconds, None));
    let assembled = year.expr().clone().map_many(
        move |cols| crate::udfs::apply_make_timestamp(cols, zone_name.as_deref()),
        &trailing_parts,
        output,
    );
    crate::column::Column::from_expr(assembled, None)
}
/// Calls `timestampadd` with `unit` and `amount` on a clone of `ts`.
pub fn timestampadd(unit: &str, amount: &Column, ts: &Column) -> Column {
    let receiver = ts.clone();
    receiver.timestampadd(unit, amount)
}
/// Calls `timestampdiff` with `unit` and `end` on a clone of `start`.
pub fn timestampdiff(unit: &str, start: &Column, end: &Column) -> Column {
    let receiver = start.clone();
    receiver.timestampdiff(unit, end)
}
/// Interval of `n` days: calls [`make_interval`] with `n` in the `days` slot and zero elsewhere.
pub fn days(n: i64) -> Column {
make_interval(0, 0, 0, n, 0, 0, 0)
}
/// Interval of `n` hours: calls [`make_interval`] with `n` in the `hours` slot and zero elsewhere.
pub fn hours(n: i64) -> Column {
make_interval(0, 0, 0, 0, n, 0, 0)
}
/// Interval of `n` minutes: calls [`make_interval`] with `n` in the `mins` slot and zero elsewhere.
pub fn minutes(n: i64) -> Column {
make_interval(0, 0, 0, 0, 0, n, 0)
}
/// Interval of `n` months: calls [`make_interval`] with `n` in the `months` slot and zero elsewhere.
pub fn months(n: i64) -> Column {
make_interval(0, n, 0, 0, 0, 0, 0)
}
/// Interval of `n` years: calls [`make_interval`] with `n` in the `years` slot and zero elsewhere.
pub fn years(n: i64) -> Column {
make_interval(n, 0, 0, 0, 0, 0, 0)
}
/// Calls `from_utc_timestamp` with `tz` on a clone of `column`.
pub fn from_utc_timestamp(column: &Column, tz: &str) -> Column {
    let source = column.clone();
    source.from_utc_timestamp(tz)
}
/// Calls `to_utc_timestamp` with `tz` on a clone of `column`.
pub fn to_utc_timestamp(column: &Column, tz: &str) -> Column {
    let source = column.clone();
    source.to_utc_timestamp(tz)
}
/// Maps `column` through `crate::udfs::apply_convert_timezone` with the given
/// source and target zone names.
///
/// Both names are copied into the closure; the output is declared to keep the
/// input type (`GetOutput::same_type`).
pub fn convert_timezone(source_tz: &str, target_tz: &str, column: &Column) -> Column {
    let from_zone = source_tz.to_string();
    let to_zone = target_tz.to_string();
    let converted = column.expr().clone().map(
        move |s| crate::udfs::apply_convert_timezone(s, &from_zone, &to_zone),
        GetOutput::same_type(),
    );
    crate::column::Column::from_expr(converted, None)
}
/// Returns a literal column containing the fixed string `"UTC"`.
pub fn current_timezone() -> Column {
    use polars::prelude::*;
    let zone_literal = lit("UTC");
    crate::column::Column::from_expr(zone_literal, None)
}
/// Builds a literal duration column from calendar-style components.
///
/// Years, months and weeks are folded into days using the fixed factors
/// 365, 30 and 7 respectively, so the result is an approximation for
/// year and month components.
pub fn make_interval(
    years: i64,
    months: i64,
    weeks: i64,
    days: i64,
    hours: i64,
    mins: i64,
    secs: i64,
) -> Column {
    use polars::prelude::*;
    let day_count = years * 365 + months * 30 + weeks * 7 + days;
    let parts = DurationArgs::new()
        .with_days(lit(day_count))
        .with_hours(lit(hours))
        .with_minutes(lit(mins))
        .with_seconds(lit(secs));
    crate::column::Column::from_expr(duration(parts), None)
}
/// Builds a literal duration column from day, hour, minute and second counts.
pub fn make_dt_interval(days: i64, hours: i64, minutes: i64, seconds: i64) -> Column {
    use polars::prelude::*;
    let parts = DurationArgs::new()
        .with_days(lit(days))
        .with_hours(lit(hours))
        .with_minutes(lit(minutes))
        .with_seconds(lit(seconds));
    crate::column::Column::from_expr(duration(parts), None)
}
/// Returns a literal integer column holding `years * 12 + months`.
pub fn make_ym_interval(years: i32, months: i32) -> Column {
    use polars::prelude::*;
    let month_count = years * 12 + months;
    let literal = lit(month_count);
    crate::column::Column::from_expr(literal, None)
}
/// Calls [`make_timestamp`] with the same component columns and no time zone (`None`).
pub fn make_timestamp_ntz(
year: &Column,
month: &Column,
day: &Column,
hour: &Column,
minute: &Column,
sec: &Column,
) -> Column {
make_timestamp(year, month, day, hour, minute, sec, None)
}
/// Calls `timestamp_seconds` on a clone of `column` and returns the resulting column.
pub fn timestamp_seconds(column: &Column) -> Column {
    let source = column.clone();
    source.timestamp_seconds()
}
/// Calls `timestamp_millis` on a clone of `column` and returns the resulting column.
pub fn timestamp_millis(column: &Column) -> Column {
    let source = column.clone();
    source.timestamp_millis()
}
/// Calls `timestamp_micros` on a clone of `column` and returns the resulting column.
pub fn timestamp_micros(column: &Column) -> Column {
    let source = column.clone();
    source.timestamp_micros()
}
/// Calls `unix_date` on a clone of `column` and returns the resulting column.
pub fn unix_date(column: &Column) -> Column {
    let source = column.clone();
    source.unix_date()
}
/// Calls `date_from_unix_date` on a clone of `column` and returns the resulting column.
pub fn date_from_unix_date(column: &Column) -> Column {
    let source = column.clone();
    source.date_from_unix_date()
}
/// Calls `pmod` with `divisor` on a clone of `dividend`.
pub fn pmod(dividend: &Column, divisor: &Column) -> Column {
    let receiver = dividend.clone();
    receiver.pmod(divisor)
}
/// Calls `factorial` on a clone of `column` and returns the resulting column.
pub fn factorial(column: &Column) -> Column {
    let source = column.clone();
    source.factorial()
}
/// Concatenates the given columns into one string column with no separator.
///
/// `concat_str` is called with `ignore_nulls = false`.
///
/// # Panics
/// Panics when `columns` is empty.
pub fn concat(columns: &[&Column]) -> Column {
    use polars::prelude::*;
    assert!(!columns.is_empty(), "concat requires at least one column");
    let mut parts: Vec<Expr> = Vec::with_capacity(columns.len());
    for c in columns {
        parts.push(c.expr().clone());
    }
    let joined = concat_str(&parts, "", false);
    crate::column::Column::from_expr(joined, None)
}
/// Joins the given columns into one string column, placing `separator` between values.
///
/// Null inputs are skipped (`ignore_nulls = true`) rather than turning the whole
/// row null, which matches PySpark's `concat_ws` semantics. Use [`concat`] when
/// null propagation is wanted.
///
/// # Panics
/// Panics when `columns` is empty.
pub fn concat_ws(separator: &str, columns: &[&Column]) -> Column {
    use polars::prelude::*;
    if columns.is_empty() {
        panic!("concat_ws requires at least one column");
    }
    let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
    // `true` = ignore nulls: a null operand contributes nothing (and no separator)
    // instead of nulling the entire result.
    crate::column::Column::from_expr(concat_str(&exprs, separator, true), None)
}
/// Calls `row_number(false)` on a clone of `column` and returns the resulting column.
pub fn row_number(column: &Column) -> Column {
    let source = column.clone();
    source.row_number(false)
}
/// Calls `rank` with the `descending` flag on a clone of `column`.
pub fn rank(column: &Column, descending: bool) -> Column {
    let source = column.clone();
    source.rank(descending)
}
/// Calls `dense_rank` with the `descending` flag on a clone of `column`.
pub fn dense_rank(column: &Column, descending: bool) -> Column {
    let source = column.clone();
    source.dense_rank(descending)
}
/// Calls `lag` with offset `n` on a clone of `column`.
pub fn lag(column: &Column, n: i64) -> Column {
    let source = column.clone();
    source.lag(n)
}
/// Calls `lead` with offset `n` on a clone of `column`.
pub fn lead(column: &Column, n: i64) -> Column {
    let source = column.clone();
    source.lead(n)
}
/// Calls `first_value` on a clone of `column` and returns the resulting column.
pub fn first_value(column: &Column) -> Column {
    let source = column.clone();
    source.first_value()
}
/// Calls `last_value` on a clone of `column` and returns the resulting column.
pub fn last_value(column: &Column) -> Column {
    let source = column.clone();
    source.last_value()
}
/// Calls `percent_rank` with `partition_by` and `descending` on a clone of `column`.
pub fn percent_rank(column: &Column, partition_by: &[&str], descending: bool) -> Column {
    let source = column.clone();
    source.percent_rank(partition_by, descending)
}
/// Calls `cume_dist` with `partition_by` and `descending` on a clone of `column`.
pub fn cume_dist(column: &Column, partition_by: &[&str], descending: bool) -> Column {
    let source = column.clone();
    source.cume_dist(partition_by, descending)
}
/// Calls `ntile` with `n`, `partition_by` and `descending` on a clone of `column`.
pub fn ntile(column: &Column, n: u32, partition_by: &[&str], descending: bool) -> Column {
    let source = column.clone();
    source.ntile(n, partition_by, descending)
}
/// Calls `nth_value` with `n`, `partition_by` and `descending` on a clone of `column`.
pub fn nth_value(column: &Column, n: i64, partition_by: &[&str], descending: bool) -> Column {
    let source = column.clone();
    source.nth_value(n, partition_by, descending)
}
/// Returns, per row, the first non-null value among `columns`.
///
/// The work is done by the Polars `coalesce` expression, which the function-local
/// glob import brings into scope ahead of this module's own `coalesce`.
///
/// # Panics
/// Panics when `columns` is empty.
pub fn coalesce(columns: &[&Column]) -> Column {
    use polars::prelude::*;
    assert!(!columns.is_empty(), "coalesce requires at least one column");
    let mut candidates: Vec<Expr> = Vec::with_capacity(columns.len());
    for c in columns {
        candidates.push(c.expr().clone());
    }
    let first_non_null = coalesce(&candidates);
    crate::column::Column::from_expr(first_non_null, None)
}
/// Two-argument form of [`coalesce`]: calls it with `column` followed by `value`.
pub fn nvl(column: &Column, value: &Column) -> Column {
coalesce(&[column, value])
}
/// Alias for [`nvl`]: forwards `column` and `value` unchanged.
pub fn ifnull(column: &Column, value: &Column) -> Column {
nvl(column, value)
}
/// Yields null where `column` equals `value`, and `column`'s own value otherwise.
pub fn nullif(column: &Column, value: &Column) -> Column {
    use polars::prelude::*;
    let original = column.expr().clone();
    let values_match = original.clone().eq(value.expr().clone());
    let result = when(values_match)
        .then(Expr::Literal(LiteralValue::Null))
        .otherwise(original);
    crate::column::Column::from_expr(result, None)
}
/// Yields `value` where `column` is NaN, and `column`'s own value otherwise.
pub fn nanvl(column: &Column, value: &Column) -> Column {
    use polars::prelude::*;
    let original = column.expr().clone();
    let replacement = value.expr().clone();
    let is_not_a_number = original.clone().is_nan();
    let result = when(is_not_a_number).then(replacement).otherwise(original);
    crate::column::Column::from_expr(result, None)
}
/// Yields `col2` where `col1` is not null, and `col3` otherwise.
pub fn nvl2(col1: &Column, col2: &Column, col3: &Column) -> Column {
    use polars::prelude::*;
    let has_value = col1.expr().clone().is_not_null();
    let if_present = col2.expr().clone();
    let if_absent = col3.expr().clone();
    let result = when(has_value).then(if_present).otherwise(if_absent);
    crate::column::Column::from_expr(result, None)
}
/// Alias for `substring`: forwards `column`, `start` and `length` unchanged.
pub fn substr(column: &Column, start: i64, length: Option<i64>) -> Column {
substring(column, start, length)
}
/// Alias for `pow`: forwards `column` and `exp` unchanged.
pub fn power(column: &Column, exp: i64) -> Column {
pow(column, exp)
}
/// Alias for the single-argument `log` function defined elsewhere in this module.
pub fn ln(column: &Column) -> Column {
log(column)
}
/// Alias for `ceil`.
pub fn ceiling(column: &Column) -> Column {
ceil(column)
}
/// Alias for `lower`.
pub fn lcase(column: &Column) -> Column {
lower(column)
}
/// Alias for `upper`.
pub fn ucase(column: &Column) -> Column {
upper(column)
}
/// Alias for [`day`].
pub fn dayofmonth(column: &Column) -> Column {
day(column)
}
/// Alias for `degrees`.
pub fn to_degrees(column: &Column) -> Column {
degrees(column)
}
/// Alias for `radians`.
pub fn to_radians(column: &Column) -> Column {
radians(column)
}
/// Calls `cosh` on a clone of `column` and returns the resulting column.
pub fn cosh(column: &Column) -> Column {
    let source = column.clone();
    source.cosh()
}
/// Calls `sinh` on a clone of `column` and returns the resulting column.
pub fn sinh(column: &Column) -> Column {
    let source = column.clone();
    source.sinh()
}
/// Calls `tanh` on a clone of `column` and returns the resulting column.
pub fn tanh(column: &Column) -> Column {
    let source = column.clone();
    source.tanh()
}
/// Calls `acosh` on a clone of `column` and returns the resulting column.
pub fn acosh(column: &Column) -> Column {
    let source = column.clone();
    source.acosh()
}
/// Calls `asinh` on a clone of `column` and returns the resulting column.
pub fn asinh(column: &Column) -> Column {
    let source = column.clone();
    source.asinh()
}
/// Calls `atanh` on a clone of `column` and returns the resulting column.
pub fn atanh(column: &Column) -> Column {
    let source = column.clone();
    source.atanh()
}
/// Calls `cbrt` on a clone of `column` and returns the resulting column.
pub fn cbrt(column: &Column) -> Column {
    let source = column.clone();
    source.cbrt()
}
/// Calls `expm1` on a clone of `column` and returns the resulting column.
pub fn expm1(column: &Column) -> Column {
    let source = column.clone();
    source.expm1()
}
/// Calls `log1p` on a clone of `column` and returns the resulting column.
pub fn log1p(column: &Column) -> Column {
    let source = column.clone();
    source.log1p()
}
/// Calls `log10` on a clone of `column` and returns the resulting column.
pub fn log10(column: &Column) -> Column {
    let source = column.clone();
    source.log10()
}
/// Calls `log2` on a clone of `column` and returns the resulting column.
pub fn log2(column: &Column) -> Column {
    let source = column.clone();
    source.log2()
}
/// Calls `rint` on a clone of `column` and returns the resulting column.
pub fn rint(column: &Column) -> Column {
    let source = column.clone();
    source.rint()
}
/// Computes `sqrt(x * x + y * y)` row-wise.
///
/// The squares are formed directly on the input expressions, without any cast.
pub fn hypot(x: &Column, y: &Column) -> Column {
    let squared = |c: &Column| c.expr().clone() * c.expr().clone();
    let sum_of_squares = squared(x) + squared(y);
    crate::column::Column::from_expr(sum_of_squares.sqrt(), None)
}
/// Calls `is_null` on a clone of `column` and returns the resulting column.
pub fn isnull(column: &Column) -> Column {
    let source = column.clone();
    source.is_null()
}
/// Calls `is_not_null` on a clone of `column` and returns the resulting column.
pub fn isnotnull(column: &Column) -> Column {
    let source = column.clone();
    source.is_not_null()
}
/// Packs the given columns into a single list column using `concat_list`.
///
/// With no columns, the result is a literal holding one empty `i64` list
/// (named `"array"`), reduced with `.first()`.
///
/// # Errors
/// Returns `PolarsError::ComputeError` when `concat_list` rejects the inputs.
pub fn array(columns: &[&Column]) -> Result<crate::column::Column, PolarsError> {
    use polars::prelude::*;
    let packed = if columns.is_empty() {
        let no_elements = Series::new("".into(), Vec::<i64>::new());
        let one_empty_list = ListChunked::from_iter([Some(no_elements)])
            .with_name("array".into())
            .into_series();
        lit(one_empty_list).first()
    } else {
        let members: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
        concat_list(members)
            .map_err(|e| PolarsError::ComputeError(format!("array concat_list: {e}").into()))?
    };
    Ok(crate::column::Column::from_expr(packed, None))
}
/// Calls `array_size` on a clone of `column` and returns the resulting column.
pub fn array_size(column: &Column) -> Column {
    let source = column.clone();
    source.array_size()
}
/// Calls `array_size` on a clone of `column`; the same call as [`array_size`].
pub fn size(column: &Column) -> Column {
    let source = column.clone();
    source.array_size()
}
/// Calls `cardinality` on a clone of `column` and returns the resulting column.
pub fn cardinality(column: &Column) -> Column {
    let source = column.clone();
    source.cardinality()
}
/// Calls `array_contains` on a clone of `column`, passing a clone of `value`'s expression.
pub fn array_contains(column: &Column, value: &Column) -> Column {
    let source = column.clone();
    let needle = value.expr().clone();
    source.array_contains(needle)
}
/// Calls `array_join` with `separator` on a clone of `column`.
pub fn array_join(column: &Column, separator: &str) -> Column {
    let source = column.clone();
    source.array_join(separator)
}
/// Calls `array_max` on a clone of `column` and returns the resulting column.
pub fn array_max(column: &Column) -> Column {
    let source = column.clone();
    source.array_max()
}
/// Calls `array_min` on a clone of `column` and returns the resulting column.
pub fn array_min(column: &Column) -> Column {
    let source = column.clone();
    source.array_min()
}
/// Calls `element_at` with `index` on a clone of `column`.
pub fn element_at(column: &Column, index: i64) -> Column {
    let source = column.clone();
    source.element_at(index)
}
/// Calls `array_sort` on a clone of `column` and returns the resulting column.
pub fn array_sort(column: &Column) -> Column {
    let source = column.clone();
    source.array_sort()
}
/// Calls `array_distinct` on a clone of `column` and returns the resulting column.
pub fn array_distinct(column: &Column) -> Column {
    let source = column.clone();
    source.array_distinct()
}
/// Calls `array_slice` with `start` and the optional `length` on a clone of `column`.
pub fn array_slice(column: &Column, start: i64, length: Option<i64>) -> Column {
    let source = column.clone();
    source.array_slice(start, length)
}
/// Produces a `List(Int64)` column from `start`, `stop` and an optional `step`.
///
/// The three inputs are packed into a struct with fields named `"0"`, `"1"`
/// and `"2"` and passed to `crate::udfs::apply_sequence`. When `step` is
/// omitted, the literal `1` is used.
pub fn sequence(start: &Column, stop: &Column, step: Option<&Column>) -> Column {
    use polars::prelude::{as_struct, lit, DataType, GetOutput};
    let step_field = match step {
        Some(c) => c.expr().clone().alias("2"),
        None => lit(1i64).alias("2"),
    };
    let start_field = start.expr().clone().alias("0");
    let stop_field = stop.expr().clone().alias("1");
    let bundled = as_struct(vec![start_field, stop_field, step_field]);
    let list_of_i64 = DataType::List(Box::new(DataType::Int64));
    let generated = bundled.map(
        crate::udfs::apply_sequence,
        GetOutput::from_type(list_of_i64),
    );
    crate::column::Column::from_expr(generated, None)
}
/// Maps `column` through `crate::udfs::apply_shuffle`, keeping the input type.
pub fn shuffle(column: &Column) -> Column {
    use polars::prelude::GetOutput;
    let source = column.expr().clone();
    let shuffled = source.map(crate::udfs::apply_shuffle, GetOutput::same_type());
    crate::column::Column::from_expr(shuffled, None)
}
/// Calls `explode` on a clone of `column`; the same call as [`explode`].
pub fn inline(column: &Column) -> Column {
    let source = column.clone();
    source.explode()
}
/// Calls `explode_outer` on a clone of `column` and returns the resulting column.
pub fn inline_outer(column: &Column) -> Column {
    let source = column.clone();
    source.explode_outer()
}
/// Calls `explode` on a clone of `column` and returns the resulting column.
pub fn explode(column: &Column) -> Column {
    let source = column.clone();
    source.explode()
}
/// Calls `array_position` on a clone of `column`, passing a clone of `value`'s expression.
pub fn array_position(column: &Column, value: &Column) -> Column {
    let source = column.clone();
    let needle = value.expr().clone();
    source.array_position(needle)
}
/// Calls `array_compact` on a clone of `column` and returns the resulting column.
pub fn array_compact(column: &Column) -> Column {
    let source = column.clone();
    source.array_compact()
}
/// Calls `array_remove` on a clone of `column`, passing a clone of `value`'s expression.
pub fn array_remove(column: &Column, value: &Column) -> Column {
    let source = column.clone();
    let unwanted = value.expr().clone();
    source.array_remove(unwanted)
}
/// Calls `array_repeat` with count `n` on a clone of `column`.
pub fn array_repeat(column: &Column, n: i64) -> Column {
    let source = column.clone();
    source.array_repeat(n)
}
/// Calls `array_flatten` on a clone of `column` and returns the resulting column.
pub fn array_flatten(column: &Column) -> Column {
    let source = column.clone();
    source.array_flatten()
}
/// Calls `array_exists` with the `predicate` expression on a clone of `column`.
pub fn array_exists(column: &Column, predicate: Expr) -> Column {
    let source = column.clone();
    source.array_exists(predicate)
}
/// Calls `array_forall` with the `predicate` expression on a clone of `column`.
pub fn array_forall(column: &Column, predicate: Expr) -> Column {
    let source = column.clone();
    source.array_forall(predicate)
}
/// Calls `array_filter` with the `predicate` expression on a clone of `column`.
pub fn array_filter(column: &Column, predicate: Expr) -> Column {
    let source = column.clone();
    source.array_filter(predicate)
}
/// Calls `array_transform` with the expression `f` on a clone of `column`.
pub fn array_transform(column: &Column, f: Expr) -> Column {
    let source = column.clone();
    source.array_transform(f)
}
/// Calls `array_sum` on a clone of `column` and returns the resulting column.
pub fn array_sum(column: &Column) -> Column {
    let source = column.clone();
    source.array_sum()
}
/// Calls `array_aggregate` with `zero` on a clone of `column`.
pub fn aggregate(column: &Column, zero: &Column) -> Column {
    let source = column.clone();
    source.array_aggregate(zero)
}
/// Calls `array_mean` on a clone of `column` and returns the resulting column.
pub fn array_mean(column: &Column) -> Column {
    let source = column.clone();
    source.array_mean()
}
/// Calls `posexplode` on a clone of `column` and returns the resulting pair of columns.
pub fn posexplode(column: &Column) -> (Column, Column) {
    let source = column.clone();
    source.posexplode()
}
/// Builds a map-like column, represented as a list of `{key, value}` structs,
/// from alternating key and value columns (`k1, v1, k2, v2, ...`).
///
/// With no arguments, the result is a literal holding one empty list of
/// `{key, value}` string structs (named `"create_map"`), reduced with `.first()`.
///
/// # Errors
/// Returns `PolarsError::ComputeError` when:
/// * an odd number of columns is supplied, since the trailing key would have no
///   value (previously it was silently dropped);
/// * the empty struct cannot be built, or `concat_list` rejects the inputs.
pub fn create_map(key_values: &[&Column]) -> Result<Column, PolarsError> {
    use polars::chunked_array::StructChunked;
    use polars::prelude::{as_struct, concat_list, lit, IntoSeries, ListChunked};
    if key_values.is_empty() {
        let key_s = Series::new("key".into(), Vec::<String>::new());
        let value_s = Series::new("value".into(), Vec::<String>::new());
        let fields: [&Series; 2] = [&key_s, &value_s];
        let empty_struct = StructChunked::from_series(
            polars::prelude::PlSmallStr::EMPTY,
            0,
            fields.iter().copied(),
        )
        .map_err(|e| PolarsError::ComputeError(format!("create_map empty struct: {e}").into()))?
        .into_series();
        let list_series = ListChunked::from_iter([Some(empty_struct)])
            .with_name("create_map".into())
            .into_series();
        let expr = lit(list_series).first();
        return Ok(crate::column::Column::from_expr(expr, None));
    }
    // Keys and values must come in pairs; reject a dangling key explicitly
    // instead of discarding it.
    if key_values.len() % 2 != 0 {
        return Err(PolarsError::ComputeError(
            format!(
                "create_map requires an even number of key/value columns, got {}",
                key_values.len()
            )
            .into(),
        ));
    }
    let struct_exprs: Vec<Expr> = key_values
        .chunks_exact(2)
        .map(|pair| {
            let k = pair[0].expr().clone().alias("key");
            let v = pair[1].expr().clone().alias("value");
            as_struct(vec![k, v])
        })
        .collect();
    let expr = concat_list(struct_exprs)
        .map_err(|e| PolarsError::ComputeError(format!("create_map concat_list: {e}").into()))?;
    Ok(crate::column::Column::from_expr(expr, None))
}
/// Calls `map_keys` on a clone of `column` and returns the resulting column.
pub fn map_keys(column: &Column) -> Column {
    let source = column.clone();
    source.map_keys()
}
/// Calls `map_values` on a clone of `column` and returns the resulting column.
pub fn map_values(column: &Column) -> Column {
    let source = column.clone();
    source.map_values()
}
/// Calls `map_entries` on a clone of `column` and returns the resulting column.
pub fn map_entries(column: &Column) -> Column {
    let source = column.clone();
    source.map_entries()
}
/// Calls `map_from_arrays` with `values` on a clone of `keys`.
pub fn map_from_arrays(keys: &Column, values: &Column) -> Column {
    let receiver = keys.clone();
    receiver.map_from_arrays(values)
}
/// Calls `map_concat` with `b` on a clone of `a`.
pub fn map_concat(a: &Column, b: &Column) -> Column {
    let receiver = a.clone();
    receiver.map_concat(b)
}
/// Calls `map_from_entries` on a clone of `column` and returns the resulting column.
pub fn map_from_entries(column: &Column) -> Column {
    let source = column.clone();
    source.map_from_entries()
}
/// Calls `map_contains_key` with `key` on a clone of `map_col`.
pub fn map_contains_key(map_col: &Column, key: &Column) -> Column {
    let receiver = map_col.clone();
    receiver.map_contains_key(key)
}
/// Calls `get` with `key` on a clone of `map_col`.
pub fn get(map_col: &Column, key: &Column) -> Column {
    let receiver = map_col.clone();
    receiver.get(key)
}
/// Calls `map_filter` with the `predicate` expression on a clone of `map_col`.
pub fn map_filter(map_col: &Column, predicate: Expr) -> Column {
    let receiver = map_col.clone();
    receiver.map_filter(predicate)
}
/// Calls `map_zip_with` with `map2` and the `merge` expression on a clone of `map1`.
pub fn map_zip_with(map1: &Column, map2: &Column, merge: Expr) -> Column {
    let receiver = map1.clone();
    receiver.map_zip_with(map2, merge)
}
/// Zips `left` with `right`, merging each pair by coalescing.
///
/// The merge expression reads the struct fields named `"left"` and `"right"`
/// from the column named `""` and takes the first non-null of the two via this
/// module's [`coalesce`].
pub fn zip_with_coalesce(left: &Column, right: &Column) -> Column {
    use polars::prelude::col;
    let left_value =
        crate::column::Column::from_expr(col("").struct_().field_by_name("left"), None);
    let right_value =
        crate::column::Column::from_expr(col("").struct_().field_by_name("right"), None);
    let first_non_null = coalesce(&[&left_value, &right_value]).into_expr();
    let merge = crate::column::Column::from_expr(first_non_null, None);
    left.clone().zip_with(right, merge.into_expr())
}
/// Zips `map1` with `map2`, merging each pair of values by coalescing.
///
/// The merge expression reads the struct fields named `"value1"` and `"value2"`
/// from the column named `""` and takes the first non-null of the two via this
/// module's [`coalesce`].
pub fn map_zip_with_coalesce(map1: &Column, map2: &Column) -> Column {
    use polars::prelude::col;
    let first_value =
        crate::column::Column::from_expr(col("").struct_().field_by_name("value1"), None);
    let second_value =
        crate::column::Column::from_expr(col("").struct_().field_by_name("value2"), None);
    let merge = coalesce(&[&first_value, &second_value]).into_expr();
    map1.clone().map_zip_with(map2, merge)
}
/// Filters `map_col` with a predicate that tests the struct field `"value"`
/// (read from the column named `""`) for being greater than `threshold`.
pub fn map_filter_value_gt(map_col: &Column, threshold: f64) -> Column {
    use polars::prelude::{col, lit};
    let entry_value = col("").struct_().field_by_name("value");
    let exceeds_threshold = entry_value.gt(lit(threshold));
    map_col.clone().map_filter(exceeds_threshold)
}
/// Packs the given columns into a single struct column with `as_struct`.
///
/// # Panics
/// Panics when `columns` is empty.
pub fn struct_(columns: &[&Column]) -> Column {
    use polars::prelude::as_struct;
    assert!(!columns.is_empty(), "struct requires at least one column");
    let mut fields: Vec<Expr> = Vec::with_capacity(columns.len());
    for c in columns {
        fields.push(c.expr().clone());
    }
    crate::column::Column::from_expr(as_struct(fields), None)
}
/// Packs `(name, column)` pairs into a single struct column, aliasing each
/// column's expression to its paired name.
///
/// # Panics
/// Panics when `pairs` is empty.
pub fn named_struct(pairs: &[(&str, &Column)]) -> Column {
    use polars::prelude::as_struct;
    assert!(
        !pairs.is_empty(),
        "named_struct requires at least one (name, column) pair"
    );
    let mut fields: Vec<Expr> = Vec::with_capacity(pairs.len());
    for &(field_name, field_col) in pairs {
        fields.push(field_col.expr().clone().alias(field_name));
    }
    crate::column::Column::from_expr(as_struct(fields), None)
}
/// Calls `array_append` with `elem` on a clone of `array`.
pub fn array_append(array: &Column, elem: &Column) -> Column {
    let receiver = array.clone();
    receiver.array_append(elem)
}
/// Calls `array_prepend` with `elem` on a clone of `array`.
pub fn array_prepend(array: &Column, elem: &Column) -> Column {
    let receiver = array.clone();
    receiver.array_prepend(elem)
}
/// Calls `array_insert` with `pos` and `elem` on a clone of `array`.
pub fn array_insert(array: &Column, pos: &Column, elem: &Column) -> Column {
    let receiver = array.clone();
    receiver.array_insert(pos, elem)
}
/// Calls `array_except` with `b` on a clone of `a`.
pub fn array_except(a: &Column, b: &Column) -> Column {
    let receiver = a.clone();
    receiver.array_except(b)
}
/// Calls `array_intersect` with `b` on a clone of `a`.
pub fn array_intersect(a: &Column, b: &Column) -> Column {
    let receiver = a.clone();
    receiver.array_intersect(b)
}
/// Calls `array_union` with `b` on a clone of `a`.
pub fn array_union(a: &Column, b: &Column) -> Column {
    let receiver = a.clone();
    receiver.array_union(b)
}
/// Calls `zip_with` with `right` and the `merge` expression on a clone of `left`.
pub fn zip_with(left: &Column, right: &Column, merge: Expr) -> Column {
    let receiver = left.clone();
    receiver.zip_with(right, merge)
}
/// Calls `get_json_object` with `path` on a clone of `column`.
pub fn get_json_object(column: &Column, path: &str) -> Column {
    let source = column.clone();
    source.get_json_object(path)
}
/// Calls `json_object_keys` on a clone of `column` and returns the resulting column.
pub fn json_object_keys(column: &Column) -> Column {
    let source = column.clone();
    source.json_object_keys()
}
/// Calls `json_tuple` with `keys` on a clone of `column`.
pub fn json_tuple(column: &Column, keys: &[&str]) -> Column {
    let source = column.clone();
    source.json_tuple(keys)
}
/// Calls `from_csv` on a clone of `column` and returns the resulting column.
pub fn from_csv(column: &Column) -> Column {
    let source = column.clone();
    source.from_csv()
}
/// Calls `to_csv` on a clone of `column` and returns the resulting column.
pub fn to_csv(column: &Column) -> Column {
    let source = column.clone();
    source.to_csv()
}
/// Returns a fixed schema string as a literal column named `"schema_of_csv"`.
///
/// The input column is not inspected; the result is always
/// `STRUCT<_c0: STRING, _c1: STRING>`.
pub fn schema_of_csv(_column: &Column) -> Column {
    let fixed_schema = "STRUCT<_c0: STRING, _c1: STRING>".to_string();
    let output_name = "schema_of_csv".to_string();
    Column::from_expr(lit(fixed_schema), Some(output_name))
}
/// Returns a fixed schema string as a literal column named `"schema_of_json"`.
///
/// The input column is not inspected; the result is always `STRUCT<>`.
pub fn schema_of_json(_column: &Column) -> Column {
    let fixed_schema = "STRUCT<>".to_string();
    let output_name = "schema_of_json".to_string();
    Column::from_expr(lit(fixed_schema), Some(output_name))
}
/// Calls `from_json` with the optional `schema` on a clone of `column`.
pub fn from_json(column: &Column, schema: Option<polars::datatypes::DataType>) -> Column {
    let source = column.clone();
    source.from_json(schema)
}
/// Calls `to_json` on a clone of `column` and returns the resulting column.
pub fn to_json(column: &Column) -> Column {
    let source = column.clone();
    source.to_json()
}
/// Calls `isin` with `other` on a clone of `column`.
pub fn isin(column: &Column, other: &Column) -> Column {
    let source = column.clone();
    source.isin(other)
}
/// Tests each value of `column` for membership in the given `i64` values.
///
/// The values are collected into a literal `Series` and passed to `is_in`.
pub fn isin_i64(column: &Column, values: &[i64]) -> Column {
    let candidates = Series::from_iter(values.iter().copied());
    let membership = column.expr().clone().is_in(lit(candidates));
    Column::from_expr(membership, None)
}
/// Tests each value of `column` for membership in the given string values.
///
/// The values are collected into a literal `Series` and passed to `is_in`.
pub fn isin_str(column: &Column, values: &[&str]) -> Column {
    let candidates: Series = Series::from_iter(values.iter().copied());
    let membership = column.expr().clone().is_in(lit(candidates));
    Column::from_expr(membership, None)
}
/// Calls `url_decode` on a clone of `column` and returns the resulting column.
pub fn url_decode(column: &Column) -> Column {
    let source = column.clone();
    source.url_decode()
}
/// Calls `url_encode` on a clone of `column` and returns the resulting column.
pub fn url_encode(column: &Column) -> Column {
    let source = column.clone();
    source.url_encode()
}
/// Calls `shift_left` with `n` on a clone of `column`.
pub fn shift_left(column: &Column, n: i32) -> Column {
    let source = column.clone();
    source.shift_left(n)
}
/// Calls `shift_right` with `n` on a clone of `column`.
pub fn shift_right(column: &Column, n: i32) -> Column {
    let source = column.clone();
    source.shift_right(n)
}
/// Calls `shift_right_unsigned` with `n` on a clone of `column`.
pub fn shift_right_unsigned(column: &Column, n: i32) -> Column {
    let source = column.clone();
    source.shift_right_unsigned(n)
}
/// Returns a literal column holding `"robin-sparkless-"` followed by the crate
/// version taken from `CARGO_PKG_VERSION` at compile time.
pub fn version() -> Column {
    const VERSION_BANNER: &str = concat!("robin-sparkless-", env!("CARGO_PKG_VERSION"));
    Column::from_expr(lit(VERSION_BANNER), None)
}
/// Calls `eq_null_safe` with `right` on a clone of `left`.
pub fn equal_null(left: &Column, right: &Column) -> Column {
    let receiver = left.clone();
    receiver.eq_null_safe(right)
}
/// Calls `json_array_length` with `path` on a clone of `column`.
pub fn json_array_length(column: &Column, path: &str) -> Column {
    let source = column.clone();
    source.json_array_length(path)
}
/// Calls `parse_url` with `part` and the optional `key` on a clone of `column`.
pub fn parse_url(column: &Column, part: &str, key: Option<&str>) -> Column {
    let source = column.clone();
    source.parse_url(part, key)
}
/// Hashes one or more columns into a single column.
///
/// * no columns: the literal `0i64`;
/// * one column: that column's own `hash` method;
/// * several columns: the columns are packed into a struct and mapped through
///   `crate::udfs::apply_hash_struct` (declared output `Int64`), and the result
///   is named after the first input column.
pub fn hash(columns: &[&Column]) -> Column {
    use polars::prelude::*;
    match columns {
        [] => crate::column::Column::from_expr(lit(0i64), None),
        [single] => (**single).clone().hash(),
        [first, ..] => {
            let fields: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
            let packed = polars::prelude::as_struct(fields);
            let output_name = first.name().to_string();
            let hashed = packed.map(
                crate::udfs::apply_hash_struct,
                GetOutput::from_type(DataType::Int64),
            );
            crate::column::Column::from_expr(hashed, Some(output_name))
        }
    }
}
/// Alias for [`struct_`]: packs `columns` into a struct column (and panics on an
/// empty slice, as `struct_` does).
pub fn stack(columns: &[&Column]) -> Column {
struct_(columns)
}
#[cfg(test)]
mod tests {
use super::*;
use polars::prelude::{df, IntoLazy};
// `col` keeps the supplied name on the resulting column.
#[test]
fn test_col_creates_column() {
    let named = col("test");
    assert_eq!(named.name(), "test");
}
// An `i32` literal column reports the placeholder name `<expr>`.
#[test]
fn test_lit_i32() {
    let literal = lit_i32(42);
    assert_eq!(literal.name(), "<expr>");
}
// An `i64` literal column reports the placeholder name `<expr>`.
#[test]
fn test_lit_i64() {
    let literal = lit_i64(123456789012345i64);
    assert_eq!(literal.name(), "<expr>");
}
// An `f64` literal column reports the placeholder name `<expr>`.
#[test]
fn test_lit_f64() {
    let literal = lit_f64(std::f64::consts::PI);
    assert_eq!(literal.name(), "<expr>");
}
// A boolean literal column reports the placeholder name `<expr>`.
#[test]
fn test_lit_bool() {
    let literal = lit_bool(true);
    assert_eq!(literal.name(), "<expr>");
}
// A string literal column reports the placeholder name `<expr>`.
#[test]
fn test_lit_str() {
    let literal = lit_str("hello");
    assert_eq!(literal.name(), "<expr>");
}
// `create_map(&[])` broadcasts an empty list to every row of the frame.
#[test]
fn test_create_map_empty() {
    let empty_map = create_map(&[]).unwrap();
    let frame = df!("id" => &[1i64, 2i64]).unwrap();
    let out = frame
        .lazy()
        .with_columns([empty_map.into_expr().alias("m")])
        .collect()
        .unwrap();
    assert_eq!(out.height(), 2);

    let map_column = out.column("m").unwrap();
    assert_eq!(map_column.len(), 2);

    let lists = map_column.list().unwrap();
    for row_idx in 0..2 {
        let entries = lists.get(row_idx).unwrap();
        assert_eq!(entries.len(), 0);
    }
}
// `count` names its result column "count".
#[test]
fn test_count_aggregation() {
    let input = col("value");
    let aggregated = count(&input);
    assert_eq!(aggregated.name(), "count");
}
// `sum` names its result column "sum".
#[test]
fn test_sum_aggregation() {
    let input = col("value");
    let aggregated = sum(&input);
    assert_eq!(aggregated.name(), "sum");
}
// `avg` names its result column "avg".
#[test]
fn test_avg_aggregation() {
    let input = col("value");
    let aggregated = avg(&input);
    assert_eq!(aggregated.name(), "avg");
}
// `max` names its result column "max".
#[test]
fn test_max_aggregation() {
    let input = col("value");
    let aggregated = max(&input);
    assert_eq!(aggregated.name(), "max");
}
#[test]
fn test_min_aggregation() {
let column = col("value");
let result = min(&column);
assert_eq!(result.name(), "min");
}
#[test]
fn test_when_then_otherwise() {
let df = df!(
"age" => &[15, 25, 35]
)
.unwrap();
let age_col = col("age");
let condition = age_col.gt(polars::prelude::lit(18));
let result = when(&condition)
.then(&lit_str("adult"))
.otherwise(&lit_str("minor"));
let result_df = df
.lazy()
.with_column(result.into_expr().alias("status"))
.collect()
.unwrap();
let status_col = result_df.column("status").unwrap();
let values: Vec<Option<&str>> = status_col.str().unwrap().into_iter().collect();
assert_eq!(values[0], Some("minor")); assert_eq!(values[1], Some("adult")); assert_eq!(values[2], Some("adult")); }
#[test]
fn test_coalesce_returns_first_non_null() {
let df = df!(
"a" => &[Some(1), None, None],
"b" => &[None, Some(2), None],
"c" => &[None, None, Some(3)]
)
.unwrap();
let col_a = col("a");
let col_b = col("b");
let col_c = col("c");
let result = coalesce(&[&col_a, &col_b, &col_c]);
let result_df = df
.lazy()
.with_column(result.into_expr().alias("coalesced"))
.collect()
.unwrap();
let coalesced_col = result_df.column("coalesced").unwrap();
let values: Vec<Option<i32>> = coalesced_col.i32().unwrap().into_iter().collect();
assert_eq!(values[0], Some(1)); assert_eq!(values[1], Some(2)); assert_eq!(values[2], Some(3)); }
#[test]
fn test_coalesce_with_literal_fallback() {
let df = df!(
"a" => &[Some(1), None],
"b" => &[None::<i32>, None::<i32>]
)
.unwrap();
let col_a = col("a");
let col_b = col("b");
let fallback = lit_i32(0);
let result = coalesce(&[&col_a, &col_b, &fallback]);
let result_df = df
.lazy()
.with_column(result.into_expr().alias("coalesced"))
.collect()
.unwrap();
let coalesced_col = result_df.column("coalesced").unwrap();
let values: Vec<Option<i32>> = coalesced_col.i32().unwrap().into_iter().collect();
assert_eq!(values[0], Some(1)); assert_eq!(values[1], Some(0)); }
#[test]
#[should_panic(expected = "coalesce requires at least one column")]
fn test_coalesce_empty_panics() {
let columns: [&Column; 0] = [];
let _ = coalesce(&columns);
}
#[test]
fn test_cast_double_string_column_strict_ok() {
let df = df!(
"s" => &["123", " 45.5 ", "0"]
)
.unwrap();
let s_col = col("s");
let cast_col = cast(&s_col, "double").unwrap();
let out = df
.lazy()
.with_column(cast_col.into_expr().alias("v"))
.collect()
.unwrap();
let v = out.column("v").unwrap();
let vals: Vec<Option<f64>> = v.f64().unwrap().into_iter().collect();
assert_eq!(vals, vec![Some(123.0), Some(45.5), Some(0.0)]);
}
#[test]
fn test_try_cast_double_string_column_invalid_to_null() {
let df = df!(
"s" => &["123", " 45.5 ", "abc", ""]
)
.unwrap();
let s_col = col("s");
let try_cast_col = try_cast(&s_col, "double").unwrap();
let out = df
.lazy()
.with_column(try_cast_col.into_expr().alias("v"))
.collect()
.unwrap();
let v = out.column("v").unwrap();
let vals: Vec<Option<f64>> = v.f64().unwrap().into_iter().collect();
assert_eq!(vals, vec![Some(123.0), Some(45.5), None, None]);
}
#[test]
fn test_to_number_and_try_to_number_numerics_and_strings() {
let df = df!(
"i" => &[1i32, 2, 3],
"f" => &[1.5f64, 2.5, 3.5],
"s" => &["10", "20.5", "xyz"]
)
.unwrap();
let i_col = col("i");
let f_col = col("f");
let s_col = col("s");
let to_number_i = to_number(&i_col, None).unwrap();
let to_number_f = to_number(&f_col, None).unwrap();
let try_to_number_s = try_to_number(&s_col, None).unwrap();
let out = df
.lazy()
.with_columns([
to_number_i.into_expr().alias("i_num"),
to_number_f.into_expr().alias("f_num"),
try_to_number_s.into_expr().alias("s_num"),
])
.collect()
.unwrap();
let i_num = out.column("i_num").unwrap();
let f_num = out.column("f_num").unwrap();
let s_num = out.column("s_num").unwrap();
let i_vals: Vec<Option<f64>> = i_num.f64().unwrap().into_iter().collect();
let f_vals: Vec<Option<f64>> = f_num.f64().unwrap().into_iter().collect();
let s_vals: Vec<Option<f64>> = s_num.f64().unwrap().into_iter().collect();
assert_eq!(i_vals, vec![Some(1.0), Some(2.0), Some(3.0)]);
assert_eq!(f_vals, vec![Some(1.5), Some(2.5), Some(3.5)]);
assert_eq!(s_vals, vec![Some(10.0), Some(20.5), None]);
}
}