use super::DataFrame;
use crate::functions::SortOrder;
use polars::prelude::{
col, Expr, IntoLazy, IntoSeries, NamedFrom, PolarsError, Series, UnionArgs, UniqueKeepStrategy,
};
use std::collections::HashMap;
/// Projects `df` onto the named columns, in the order given.
///
/// Each name is first resolved with `DataFrame::resolve_column_name`, then
/// turned into a plain column expression for a lazy `select`.
///
/// # Errors
/// Returns the first error produced while resolving a column name.
pub fn select(
    df: &DataFrame,
    cols: Vec<&str>,
    case_sensitive: bool,
) -> Result<DataFrame, PolarsError> {
    let mut projection: Vec<Expr> = Vec::with_capacity(cols.len());
    for name in &cols {
        let actual = df.resolve_column_name(name)?;
        projection.push(col(actual.as_str()));
    }
    let projected = df.lazy_frame().select(&projection);
    Ok(super::DataFrame::from_lazy_with_options(projected, case_sensitive))
}
/// Projects `df` onto arbitrary expressions and makes the output names unique.
///
/// Column references inside each expression are first resolved with
/// `DataFrame::resolve_expr_column_names`. A projection with duplicate output
/// names is rejected by Polars. So when an expression's output name was
/// already emitted by an earlier expression, it is aliased to `<name>_<k>`,
/// where `k` is the smallest positive suffix not yet used by any output
/// column. This includes names the caller wrote explicitly, so for example
/// `a, a_1, a` becomes `a, a_1, a_2` instead of colliding on `a_1`.
/// Expressions whose output name cannot be determined are tracked under the
/// placeholder name `_`.
///
/// # Errors
/// Returns the first error from column-name resolution.
pub fn select_with_exprs(
    df: &DataFrame,
    exprs: Vec<Expr>,
    case_sensitive: bool,
) -> Result<DataFrame, PolarsError> {
    let exprs: Vec<Expr> = exprs
        .into_iter()
        .map(|e| df.resolve_expr_column_names(e))
        .collect::<Result<Vec<_>, _>>()?;
    // Every output name emitted so far, whether original or generated.
    let mut used: std::collections::HashSet<String> = std::collections::HashSet::new();
    // Next suffix to try for each base name, so repeated duplicates do not
    // rescan from 1 every time.
    let mut next_suffix: HashMap<String, u32> = HashMap::new();
    let exprs: Vec<Expr> = exprs
        .into_iter()
        .map(|e| {
            let base_name = polars_plan::utils::expr_output_name(&e)
                .map(|s| s.to_string())
                .unwrap_or_else(|_| "_".to_string());
            if used.insert(base_name.clone()) {
                // First occurrence: keep the expression's own name.
                return e;
            }
            let suffix = next_suffix.entry(base_name.clone()).or_insert(1);
            let final_name = loop {
                let candidate = format!("{}_{}", base_name, *suffix);
                *suffix += 1;
                if used.insert(candidate.clone()) {
                    break candidate;
                }
            };
            e.alias(final_name.as_str())
        })
        .collect();
    let lf = df.lazy_frame().select(&exprs);
    Ok(super::DataFrame::from_lazy_with_options(lf, case_sensitive))
}
/// Keeps only the rows of `df` for which `condition` evaluates to true.
///
/// The predicate is prepared in two steps before filtering:
/// 1. column references are resolved with `DataFrame::resolve_expr_column_names`;
/// 2. the result goes through `DataFrame::coerce_string_numeric_comparisons`.
///
/// # Errors
/// Propagates errors from either preparation step.
pub fn filter(
    df: &DataFrame,
    condition: Expr,
    case_sensitive: bool,
) -> Result<DataFrame, PolarsError> {
    let resolved = df.resolve_expr_column_names(condition)?;
    let predicate = df.coerce_string_numeric_comparisons(resolved)?;
    let kept = df.lazy_frame().filter(predicate);
    Ok(super::DataFrame::from_lazy_with_options(kept, case_sensitive))
}
/// Adds (or replaces) the column `column_name`, computed from `column`.
///
/// Columns carrying a deferred random generator (`Rand` / `Randn`) are
/// materialised eagerly. The frame is collected so the row count is known, a
/// series of that length is generated with the stored seed, and the result is
/// returned as an eager frame.
///
/// Every other column is added lazily, after resolving its column references
/// and coercing string/numeric comparisons.
///
/// # Errors
/// Propagates collection, resolution, coercion and `with_column` errors.
pub fn with_column(
    df: &DataFrame,
    column_name: &str,
    column: &crate::column::Column,
    case_sensitive: bool,
) -> Result<DataFrame, PolarsError> {
    if let Some(deferred) = column.deferred {
        // Random columns need the row count, so the frame is materialised here.
        let collected = df.collect_inner()?;
        let mut materialised = collected.as_ref().clone();
        let height = materialised.height();
        let generated = match deferred {
            crate::column::DeferredRandom::Rand(seed) => {
                crate::udfs::series_rand_n(column_name, height, seed)
            }
            crate::column::DeferredRandom::Randn(seed) => {
                crate::udfs::series_randn_n(column_name, height, seed)
            }
        };
        materialised.with_column(generated)?;
        return Ok(super::DataFrame::from_polars_with_options(
            materialised,
            case_sensitive,
        ));
    }
    let resolved = df.resolve_expr_column_names(column.expr().clone())?;
    let coerced = df.coerce_string_numeric_comparisons(resolved)?;
    let extended = df.lazy_frame().with_column(coerced.alias(column_name));
    Ok(super::DataFrame::from_lazy_with_options(extended, case_sensitive))
}
/// Sorts `df` by the named columns.
///
/// `ascending[i]` gives the direction for `column_names[i]`. Missing flags
/// default to ascending, and surplus flags are ignored. Column names are
/// resolved with `DataFrame::resolve_column_name`.
///
/// # Errors
/// Returns the first column-resolution error.
pub fn order_by(
    df: &DataFrame,
    column_names: Vec<&str>,
    ascending: Vec<bool>,
    case_sensitive: bool,
) -> Result<DataFrame, PolarsError> {
    use polars::prelude::*;
    // Exactly one direction flag per sort column.
    let mut ascending = ascending;
    ascending.resize(column_names.len(), true);
    let mut sort_keys: Vec<Expr> = Vec::with_capacity(column_names.len());
    for name in &column_names {
        let actual = df.resolve_column_name(name)?;
        sort_keys.push(col(actual.as_str()));
    }
    // Polars takes a "descending" flag, the inverse of the ascending flag.
    let descending: Vec<bool> = ascending.iter().map(|asc| !asc).collect();
    let sorted = df.lazy_frame().sort_by_exprs(
        sort_keys,
        SortMultipleOptions::new().with_order_descending_multi(descending),
    );
    Ok(super::DataFrame::from_lazy_with_options(sorted, case_sensitive))
}
/// Sorts `df` by a list of `SortOrder` specifications.
///
/// Each order contributes its expression (after column-name resolution), its
/// `descending` flag and its `nulls_last` flag. An empty list returns the frame
/// unchanged.
///
/// # Errors
/// Returns the first error from resolving an order's expression.
pub fn order_by_exprs(
    df: &DataFrame,
    sort_orders: Vec<SortOrder>,
    case_sensitive: bool,
) -> Result<DataFrame, PolarsError> {
    use polars::prelude::*;
    if sort_orders.is_empty() {
        return Ok(super::DataFrame::from_lazy_with_options(
            df.lazy_frame(),
            case_sensitive,
        ));
    }
    let mut keys: Vec<Expr> = Vec::with_capacity(sort_orders.len());
    let mut descending: Vec<bool> = Vec::with_capacity(sort_orders.len());
    let mut nulls_last: Vec<bool> = Vec::with_capacity(sort_orders.len());
    for order in &sort_orders {
        keys.push(df.resolve_expr_column_names(order.expr().clone())?);
        descending.push(order.descending);
        nulls_last.push(order.nulls_last);
    }
    let options = SortMultipleOptions::new()
        .with_order_descending_multi(descending)
        .with_nulls_last_multi(nulls_last);
    let sorted = df.lazy_frame().sort_by_exprs(keys, options);
    Ok(super::DataFrame::from_lazy_with_options(sorted, case_sensitive))
}
/// Vertically concatenates `left` and `right` (like SQL `UNION ALL`; no
/// de-duplication).
///
/// Both lazy plans go to `polars::prelude::concat` with default `UnionArgs`.
/// Any schema compatibility checks are whatever that function performs.
///
/// # Errors
/// Propagates errors from `concat`.
pub fn union(
    left: &DataFrame,
    right: &DataFrame,
    case_sensitive: bool,
) -> Result<DataFrame, PolarsError> {
    let inputs = [left.lazy_frame(), right.lazy_frame()];
    let combined = polars::prelude::concat(inputs, UnionArgs::default())?;
    Ok(super::DataFrame::from_lazy_with_options(combined, case_sensitive))
}
/// Unions `left` and `right` by matching column names instead of positions
/// (Spark `unionByName`).
///
/// Output columns are `left`'s columns in their original order. When
/// `allow_missing_columns` is true, columns that exist only in `right` are
/// appended after them.
///
/// Name matching between the two sides is exact when `case_sensitive` is true
/// and lowercase-insensitive otherwise. Matched columns are aliased to the
/// output name. A column absent on one side is filled with a null literal cast
/// to the other side's dtype for that name, or to `DataType::Null` when no
/// dtype is found.
///
/// # Errors
/// Returns `InvalidOperation` when `allow_missing_columns` is false and a
/// `left` column has no match in `right`.
///
/// NOTE(review): when `allow_missing_columns` is false, columns that exist
/// only in `right` are silently dropped rather than reported. Confirm that
/// this matches the intended Spark semantics.
pub fn union_by_name(
    left: &DataFrame,
    right: &DataFrame,
    allow_missing_columns: bool,
    case_sensitive: bool,
) -> Result<DataFrame, PolarsError> {
    use polars::prelude::*;
    let left_names = left.columns()?;
    let right_names = right.columns()?;
    // Name lookup in a column list, honouring `case_sensitive`: `contains` is a
    // yes/no test, and `resolve` returns the stored spelling of the match.
    let contains = |names: &[String], name: &str| -> bool {
        if case_sensitive {
            names.iter().any(|n| n.as_str() == name)
        } else {
            let name_lower = name.to_lowercase();
            names
                .iter()
                .any(|n| n.as_str().to_lowercase() == name_lower)
        }
    };
    let resolve = |names: &[String], name: &str| -> Option<String> {
        if case_sensitive {
            names.iter().find(|n| n.as_str() == name).cloned()
        } else {
            let name_lower = name.to_lowercase();
            names
                .iter()
                .find(|n| n.as_str().to_lowercase() == name_lower)
                .cloned()
        }
    };
    // Output schema: left columns first, then (if allowed) right-only columns.
    let all_columns: Vec<String> = if allow_missing_columns {
        let mut out = left_names.clone();
        for r in &right_names {
            if !contains(&out, r.as_str()) {
                out.push(r.clone());
            }
        }
        out
    } else {
        left_names.clone()
    };
    // Project `left` onto the output schema. The `None` arm is only hit for
    // right-only columns, so the null fill takes its dtype from `right`.
    let left_exprs: Vec<Expr> = all_columns
        .iter()
        .map(|c| match resolve(&left_names, c.as_str()) {
            Some(r) => col(r.as_str()).alias(c.as_str()),
            None => {
                let dtype = right
                    .get_column_dtype(c)
                    .unwrap_or(polars::prelude::DataType::Null);
                Expr::Literal(polars::prelude::LiteralValue::Null)
                    .cast(dtype)
                    .alias(c.as_str())
            }
        })
        .collect();
    // Project `right` onto the same schema. A left column missing on the right
    // is null-filled (with the left dtype) only when missing columns are allowed.
    let mut right_exprs: Vec<Expr> = Vec::with_capacity(all_columns.len());
    for c in &all_columns {
        let expr = match resolve(&right_names, c.as_str()) {
            Some(r) => col(r.as_str()).alias(c.as_str()),
            None if allow_missing_columns => {
                let dtype = left
                    .get_column_dtype(c)
                    .unwrap_or(polars::prelude::DataType::Null);
                Expr::Literal(polars::prelude::LiteralValue::Null)
                    .cast(dtype)
                    .alias(c.as_str())
            }
            None => {
                return Err(PolarsError::InvalidOperation(
                    format!(
                        "union_by_name: column '{}' missing in right DataFrame (allow_missing_columns=False)",
                        c
                    )
                    .into(),
                ));
            }
        };
        right_exprs.push(expr);
    }
    // Both projections now share the same column names and order.
    let lf1 = left.lazy_frame().select(&left_exprs);
    let lf2 = right.lazy_frame().select(&right_exprs);
    let out = polars::prelude::concat([lf1, lf2], UnionArgs::default())?;
    Ok(super::DataFrame::from_lazy_with_options(
        out,
        case_sensitive,
    ))
}
/// Removes duplicate rows, keeping the first occurrence of each.
///
/// With `subset`, rows are compared only on those columns, whose names are
/// resolved with `DataFrame::resolve_column_name`. Without it, rows are
/// compared on all columns.
///
/// # Errors
/// Returns the first column-resolution error.
pub fn distinct(
    df: &DataFrame,
    subset: Option<Vec<&str>>,
    case_sensitive: bool,
) -> Result<DataFrame, PolarsError> {
    let subset_names: Option<Vec<String>> = match subset {
        Some(names) => Some(
            names
                .iter()
                .map(|name| df.resolve_column_name(name))
                .collect::<Result<Vec<_>, _>>()?,
        ),
        None => None,
    };
    let deduped = df
        .lazy_frame()
        .unique(subset_names, UniqueKeepStrategy::First);
    Ok(super::DataFrame::from_lazy_with_options(deduped, case_sensitive))
}
/// Returns `df` without the named columns.
///
/// Each name is resolved with `DataFrame::resolve_column_name`, so a name that
/// does not resolve is an error. The remaining columns keep their original
/// order.
///
/// # Errors
/// Returns the first resolution error, or an error from listing the columns.
pub fn drop(
    df: &DataFrame,
    columns: Vec<&str>,
    case_sensitive: bool,
) -> Result<DataFrame, PolarsError> {
    let mut dropped: Vec<String> = Vec::with_capacity(columns.len());
    for name in &columns {
        dropped.push(df.resolve_column_name(name)?);
    }
    let keep: Vec<Expr> = df
        .columns()?
        .into_iter()
        .filter(|existing| !dropped.contains(existing))
        .map(|existing| col(existing.as_str()))
        .collect();
    let remaining = df.lazy_frame().select(&keep);
    Ok(super::DataFrame::from_lazy_with_options(remaining, case_sensitive))
}
/// Drops rows containing nulls in the considered columns (`subset`, or all
/// columns when `None`).
///
/// The rule is chosen as follows:
/// * `thresh = Some(k)`: keep rows with at least `k` non-null values in the
///   considered columns. This takes precedence over `how`.
/// * `how == "all"` (case-insensitive): drop rows where every considered
///   column is null.
/// * otherwise: drop rows with any null in the considered columns.
///
/// # Errors
/// Returns the first column-resolution error.
pub fn dropna(
    df: &DataFrame,
    subset: Option<Vec<&str>>,
    how: &str,
    thresh: Option<usize>,
    case_sensitive: bool,
) -> Result<DataFrame, PolarsError> {
    use polars::prelude::*;
    let target_cols: Vec<String> = if let Some(names) = &subset {
        names
            .iter()
            .map(|n| df.resolve_column_name(n))
            .collect::<Result<Vec<_>, _>>()?
    } else {
        df.columns()?
    };
    let col_exprs: Vec<Expr> = target_cols.iter().map(|c| col(c.as_str())).collect();
    let lf = df.lazy_frame();
    let filtered = match thresh {
        Some(min_non_null) => {
            // Per-row count of non-null cells: 0 + is_not_null(c1) + is_not_null(c2) + ...
            let non_null_count = col_exprs.iter().fold(lit(0i32), |acc, e| {
                acc + e.clone().is_not_null().cast(DataType::Int32)
            });
            lf.filter(non_null_count.gt_eq(lit(min_non_null as i32)))
        }
        None if how.eq_ignore_ascii_case("all") => {
            let any_present = col_exprs
                .into_iter()
                .fold(lit(false), |acc, e| acc.or(e.is_not_null()));
            lf.filter(any_present)
        }
        None => lf.drop_nulls(Some(col_exprs)),
    };
    Ok(super::DataFrame::from_lazy_with_options(filtered, case_sensitive))
}
/// Replaces nulls with `value_expr` in the `subset` columns, or in every
/// column when `subset` is `None`.
///
/// Subset names are resolved with `DataFrame::resolve_column_name`. Each
/// target column is overwritten by `col(name).fill_null(value_expr)`.
///
/// # Errors
/// Returns the first column-resolution error.
pub fn fillna(
    df: &DataFrame,
    value_expr: Expr,
    subset: Option<Vec<&str>>,
    case_sensitive: bool,
) -> Result<DataFrame, PolarsError> {
    use polars::prelude::*;
    let target_cols: Vec<String> = match subset {
        Some(names) => names
            .iter()
            .map(|n| df.resolve_column_name(n))
            .collect::<Result<Vec<_>, _>>()?,
        None => df.columns()?,
    };
    let fills: Vec<Expr> = target_cols
        .iter()
        .map(|name| col(name.as_str()).fill_null(value_expr.clone()))
        .collect();
    let filled = df.lazy_frame().with_columns(fills);
    Ok(super::DataFrame::from_lazy_with_options(filled, case_sensitive))
}
/// Returns at most the first `n` rows of `df`.
///
/// The lazy `slice` length is a `u32`. `n` is saturated to `u32::MAX` rather
/// than truncated with `as`, because truncation wrapped large limits: for
/// example, `n = 1 << 32` became a zero-row result.
pub fn limit(df: &DataFrame, n: usize, case_sensitive: bool) -> Result<DataFrame, PolarsError> {
    let len = u32::try_from(n).unwrap_or(u32::MAX);
    let lf = df.lazy_frame().slice(0, len);
    Ok(super::DataFrame::from_lazy_with_options(lf, case_sensitive))
}
/// Renames one column of `df` from `old_name` to `new_name`.
///
/// `old_name` is resolved with `DataFrame::resolve_column_name`. The lazy
/// rename runs with `strict = true`.
///
/// # Errors
/// Returns an error if `old_name` cannot be resolved.
pub fn with_column_renamed(
    df: &DataFrame,
    old_name: &str,
    new_name: &str,
    case_sensitive: bool,
) -> Result<DataFrame, PolarsError> {
    let existing = df.resolve_column_name(old_name)?;
    let renamed = df
        .lazy_frame()
        .rename([existing.as_str()], [new_name], true);
    Ok(super::DataFrame::from_lazy_with_options(renamed, case_sensitive))
}
/// Replaces values equal to `old_value` with `new_value` in one column.
///
/// The column is rewritten in place as
/// `when(col == old_value).then(new_value).otherwise(col)`. Values that do not
/// match are left unchanged.
///
/// # Errors
/// Returns an error if `column_name` cannot be resolved.
pub fn replace(
    df: &DataFrame,
    column_name: &str,
    old_value: Expr,
    new_value: Expr,
    case_sensitive: bool,
) -> Result<DataFrame, PolarsError> {
    use polars::prelude::*;
    let target = df.resolve_column_name(column_name)?;
    let current = col(target.as_str());
    let replaced = when(current.clone().eq(old_value))
        .then(new_value)
        .otherwise(current)
        .alias(target.as_str());
    let updated = df.lazy_frame().with_column(replaced);
    Ok(super::DataFrame::from_lazy_with_options(updated, case_sensitive))
}
/// Returns the Cartesian product of `left` and `right` (every left row paired
/// with every right row).
///
/// Uses the lazy `cross_join` with no explicit suffix (`None`).
pub fn cross_join(
    left: &DataFrame,
    right: &DataFrame,
    case_sensitive: bool,
) -> Result<DataFrame, PolarsError> {
    let product = left.lazy_frame().cross_join(right.lazy_frame(), None);
    Ok(super::DataFrame::from_lazy_with_options(product, case_sensitive))
}
/// Computes summary statistics for every numeric column of `df`.
///
/// The result has a leading `summary` column with the row labels `count`,
/// `mean`, `stddev`, `min` and `max`. It is followed by one string column per
/// numeric input column, keeping the input column's name. Non-numeric columns
/// are skipped. Statistics that evaluate to NaN or are unavailable (for
/// example, all-null input) are rendered as `"None"`.
///
/// When `df` has no numeric columns, the result contains only the `summary`
/// column. Previously a zero-length placeholder column was added next to the
/// five-row `summary` column, so `DataFrame::new` always failed with a height
/// mismatch.
///
/// # Errors
/// Propagates collection and casting errors.
pub fn describe(df: &DataFrame, case_sensitive: bool) -> Result<DataFrame, PolarsError> {
    use polars::prelude::*;
    let pl_df = df.collect_inner()?.as_ref().clone();
    let summary_col: Column = Series::new(
        "summary".into(),
        &["count", "mean", "stddev", "min", "max" as &str],
    )
    .into();
    let mut cols: Vec<Column> = vec![summary_col];
    for column in pl_df.get_columns() {
        let s = column.as_materialized_series();
        let dtype = s.dtype();
        if !dtype.is_numeric() {
            continue;
        }
        let count = s.len() as i64 - s.null_count() as i64;
        let mean_f = s.mean().unwrap_or(f64::NAN);
        // ddof = 1: sample standard deviation.
        let std_f = s.std(1).unwrap_or(f64::NAN);
        // min/max are taken on a Float64 view so every numeric dtype shares one path.
        let s_f64 = s.cast(&DataType::Float64)?;
        let ca = s_f64
            .f64()
            .map_err(|_| PolarsError::ComputeError("cast to f64 failed".into()))?;
        let min_f = ca.min().unwrap_or(f64::NAN);
        let max_f = ca.max().unwrap_or(f64::NAN);
        let is_float = matches!(dtype, DataType::Float64 | DataType::Float32);
        let count_s = count.to_string();
        let mean_s = describe_format_moment(mean_f);
        let std_s = describe_format_moment(std_f);
        let min_s = describe_format_extreme(min_f, is_float);
        let max_s = describe_format_extreme(max_f, is_float);
        let series = Series::new(
            s.name().clone(),
            [
                count_s.as_str(),
                mean_s.as_str(),
                std_s.as_str(),
                min_s.as_str(),
                max_s.as_str(),
            ],
        );
        cols.push(series.into());
    }
    let out_pl = polars::prelude::DataFrame::new(cols)?;
    Ok(super::DataFrame::from_polars_with_options(
        out_pl,
        case_sensitive,
    ))
}

/// Formats a mean or stddev for `describe`: one decimal place, or `"None"` for NaN.
fn describe_format_moment(v: f64) -> String {
    if v.is_nan() {
        "None".to_string()
    } else {
        format!("{:.1}", v)
    }
}

/// Formats a min or max for `describe`.
///
/// NaN renders as `"None"`. Integral values render as `x.0` for float columns
/// and as `x` for integer columns. Non-integral values use the default `f64`
/// display.
fn describe_format_extreme(v: f64, is_float: bool) -> String {
    if v.is_nan() {
        "None".to_string()
    } else if v.fract() == 0.0 && is_float {
        format!("{:.1}", v)
    } else if v.fract() == 0.0 {
        format!("{:.0}", v)
    } else {
        format!("{v}")
    }
}
/// Builds right-side join keys for a set operation that matches every column
/// of `left_names` against `right_names`.
///
/// Matching is exact when `case_sensitive` is true and compares `to_lowercase`
/// forms otherwise. The first match in `right_names` wins. `op` is used only
/// as the prefix of the error message.
///
/// # Errors
/// `ColumnNotFound` for the first left column with no counterpart on the right.
fn set_op_right_keys(
    left_names: &[String],
    right_names: &[String],
    case_sensitive: bool,
    op: &str,
) -> Result<Vec<Expr>, PolarsError> {
    left_names
        .iter()
        .map(|ln| {
            let found = if case_sensitive {
                right_names.iter().find(|rn| rn.as_str() == ln.as_str())
            } else {
                let ln_lower = ln.to_lowercase();
                right_names.iter().find(|rn| rn.to_lowercase() == ln_lower)
            };
            found.map(|rn| col(rn.as_str())).ok_or_else(|| {
                PolarsError::ColumnNotFound(
                    format!("{op}: column '{ln}' not found on right").into(),
                )
            })
        })
        .collect()
}

/// Rows of `left` that have no match in `right` on all of `left`'s columns.
///
/// Implemented as an anti-join on every column of `left`.
///
/// NOTE(review): Spark's `subtract` is `EXCEPT DISTINCT`. This anti-join does
/// not de-duplicate the surviving left rows. Rows containing nulls may also
/// never match, depending on the join's null-equality setting. Verify against
/// callers.
///
/// # Errors
/// `ColumnNotFound` if a `left` column has no counterpart in `right`.
pub fn subtract(
    left: &DataFrame,
    right: &DataFrame,
    case_sensitive: bool,
) -> Result<DataFrame, PolarsError> {
    use polars::prelude::*;
    let left_names = left.columns()?;
    let right_names = right.columns()?;
    let right_on = set_op_right_keys(&left_names, &right_names, case_sensitive, "subtract")?;
    let left_on: Vec<Expr> = left_names.iter().map(|n| col(n.as_str())).collect();
    let right_lf = right.lazy_frame();
    let anti = left
        .lazy_frame()
        .join(right_lf, left_on, right_on, JoinArgs::new(JoinType::Anti));
    Ok(super::DataFrame::from_lazy_with_options(anti, case_sensitive))
}

/// Distinct rows of `left` that also appear in `right`, matched on all of
/// `left`'s columns.
///
/// Implemented as a semi-join followed by `unique` (keep first).
///
/// # Errors
/// `ColumnNotFound` if a `left` column has no counterpart in `right`.
pub fn intersect(
    left: &DataFrame,
    right: &DataFrame,
    case_sensitive: bool,
) -> Result<DataFrame, PolarsError> {
    use polars::prelude::*;
    let left_names = left.columns()?;
    let right_names = right.columns()?;
    let right_on = set_op_right_keys(&left_names, &right_names, case_sensitive, "intersect")?;
    let left_on: Vec<Expr> = left_names.iter().map(|n| col(n.as_str())).collect();
    let left_lf = left.lazy_frame();
    let semi = left_lf
        .join(right.lazy_frame(), left_on, right_on, JoinArgs::new(JoinType::Semi))
        .unique(None, UniqueKeepStrategy::First);
    Ok(super::DataFrame::from_lazy_with_options(semi, case_sensitive))
}
/// Draws a random sample of `round(height * fraction)` rows from `df`.
///
/// The count is capped at the frame's height. A negative or NaN fraction
/// yields zero rows, because the float-to-`usize` cast saturates to 0. Rows
/// are picked via `Series::sample_n` over row indices, using
/// `with_replacement`, shuffling, and the optional `seed`.
///
/// A zero-row result, including the result for an empty input, is
/// `head(0)` of the input and keeps its schema. Previously an empty input
/// returned `DataFrame::empty()`, which has no columns.
///
/// # Errors
/// Propagates collection, sampling and `take` errors.
pub fn sample(
    df: &DataFrame,
    with_replacement: bool,
    fraction: f64,
    seed: Option<u64>,
    case_sensitive: bool,
) -> Result<DataFrame, PolarsError> {
    use polars::prelude::Series;
    let pl = df.collect_inner()?;
    let n = pl.height();
    // `as usize` saturates: negative or NaN products become 0. When n == 0 the
    // product is 0 (or NaN), so empty inputs also take the zero-row path below.
    let take_n = ((n as f64 * fraction).round() as usize).min(n);
    if take_n == 0 {
        return Ok(super::DataFrame::from_lazy_with_options(
            pl.as_ref().head(Some(0)).lazy(),
            case_sensitive,
        ));
    }
    let idx_series = Series::new("idx".into(), (0..n).map(|i| i as u32).collect::<Vec<_>>());
    let sampled_idx = idx_series.sample_n(take_n, with_replacement, true, seed)?;
    let idx_ca = sampled_idx
        .u32()
        .map_err(|_| PolarsError::ComputeError("sample: expected u32 indices".into()))?;
    let pl_df = pl.as_ref().take(idx_ca)?;
    Ok(super::DataFrame::from_polars_with_options(
        pl_df,
        case_sensitive,
    ))
}
/// Randomly partitions the rows of `df` into `weights.len()` frames.
///
/// Weights are normalised by their sum. Each row independently draws an `f64`
/// from a `StdRng` seeded with `seed` (or `0` when `None`, so splits are
/// reproducible). The row goes to the first bucket whose cumulative weight
/// exceeds the draw, or to the last bucket if none does.
///
/// Buckets that receive no rows are returned as zero-row frames with `df`'s
/// schema. This includes every bucket when `df` itself is empty; previously
/// that case returned schema-less `DataFrame::empty()` frames and ignored
/// `case_sensitive`.
///
/// Returns an empty vector when `weights` is empty or sums to `<= 0`.
///
/// NOTE(review): negative weights are not rejected and make the cumulative
/// bounds non-monotonic.
///
/// # Errors
/// Propagates collection and `take` errors.
pub fn random_split(
    df: &DataFrame,
    weights: &[f64],
    seed: Option<u64>,
    case_sensitive: bool,
) -> Result<Vec<DataFrame>, PolarsError> {
    use polars::prelude::Series;
    use rand::Rng;
    use rand::SeedableRng;
    let total: f64 = weights.iter().sum();
    if total <= 0.0 || weights.is_empty() {
        return Ok(Vec::new());
    }
    let collected = df.collect_inner()?;
    let pl = collected.as_ref();
    let n = pl.height();
    // Cumulative normalised upper bound of each bucket.
    let mut cum = Vec::with_capacity(weights.len());
    let mut acc = 0.0_f64;
    for w in weights {
        acc += w / total;
        cum.push(acc);
    }
    let mut rng = rand::rngs::StdRng::seed_from_u64(seed.unwrap_or(0));
    let mut bucket_indices: Vec<Vec<u32>> = (0..weights.len()).map(|_| Vec::new()).collect();
    for i in 0..n {
        let r: f64 = rng.gen();
        // Floating-point rounding can leave the last bound just below 1.0;
        // anything past it falls into the last bucket.
        let bucket = cum
            .iter()
            .position(|&c| r < c)
            .unwrap_or(weights.len().saturating_sub(1));
        bucket_indices[bucket].push(i as u32);
    }
    let mut out = Vec::with_capacity(weights.len());
    for indices in bucket_indices {
        if indices.is_empty() {
            // head(0) keeps the schema while holding no rows.
            out.push(super::DataFrame::from_polars_with_options(
                pl.head(Some(0)),
                case_sensitive,
            ));
        } else {
            let idx_series = Series::new("idx".into(), indices);
            let idx_ca = idx_series.u32().map_err(|_| {
                PolarsError::ComputeError("random_split: expected u32 indices".into())
            })?;
            let taken = pl.take(idx_ca)?;
            out.push(super::DataFrame::from_polars_with_options(
                taken,
                case_sensitive,
            ));
        }
    }
    Ok(out)
}
/// Stratified sampling: for each `(value, fraction)` pair, samples that
/// fraction of the rows whose `col_name` equals `value`.
///
/// Sampling is without replacement and uses `sample` with the same `seed` for
/// every stratum. Rows whose key matches no listed value are not included.
/// An empty `fractions` list yields a zero-row frame with `df`'s schema.
///
/// # Errors
/// Propagates resolution, collection, sampling and `vstack` errors.
pub fn sample_by(
    df: &DataFrame,
    col_name: &str,
    fractions: &[(Expr, f64)],
    seed: Option<u64>,
    case_sensitive: bool,
) -> Result<DataFrame, PolarsError> {
    use polars::prelude::*;
    if fractions.is_empty() {
        return Ok(super::DataFrame::from_lazy_with_options(
            df.lazy_frame().slice(0, 0),
            case_sensitive,
        ));
    }
    let resolved = df.resolve_column_name(col_name)?;
    let key = col(resolved.as_str());
    let mut strata: Vec<polars::prelude::DataFrame> = Vec::with_capacity(fractions.len());
    for (value_expr, frac) in fractions {
        let matching = df
            .lazy_frame()
            .filter(key.clone().eq(value_expr.clone()))
            .collect()?;
        if matching.height() == 0 {
            strata.push(matching.head(Some(0)));
        } else {
            let sampled = sample(
                &super::DataFrame::from_polars_with_options(matching, case_sensitive),
                false,
                *frac,
                seed,
                case_sensitive,
            )?;
            strata.push(sampled.collect_inner()?.as_ref().clone());
        }
    }
    let mut remaining = strata.into_iter();
    let mut out = remaining
        .next()
        .ok_or_else(|| PolarsError::ComputeError("sample_by: no parts".into()))?;
    for part in remaining {
        out.vstack_mut(&part)?;
    }
    Ok(super::DataFrame::from_polars_with_options(
        out,
        case_sensitive,
    ))
}
/// Returns an eager frame holding only the first row of `df`.
///
/// The result is empty if `df` has no rows.
pub fn first(df: &DataFrame, case_sensitive: bool) -> Result<DataFrame, PolarsError> {
    let collected = df.collect_inner()?;
    let first_row = collected.as_ref().head(Some(1));
    Ok(super::DataFrame::from_polars_with_options(
        first_row,
        case_sensitive,
    ))
}
/// Spark `head(n)`: the first `n` rows of `df`. This delegates to `limit`.
pub fn head(df: &DataFrame, n: usize, case_sensitive: bool) -> Result<DataFrame, PolarsError> {
    let first_rows = limit(df, n, case_sensitive)?;
    Ok(first_rows)
}
/// Spark `take(n)`: the first `n` rows of `df`. This delegates to `limit`.
pub fn take(df: &DataFrame, n: usize, case_sensitive: bool) -> Result<DataFrame, PolarsError> {
    let first_rows = limit(df, n, case_sensitive)?;
    Ok(first_rows)
}
/// Returns an eager frame with the last `n` rows of `df`.
///
/// If `n` exceeds the frame's height, all rows are returned.
pub fn tail(df: &DataFrame, n: usize, case_sensitive: bool) -> Result<DataFrame, PolarsError> {
    let collected = df.collect_inner()?;
    let frame = collected.as_ref();
    // Saturating: when n > height, start at row 0.
    let start = frame.height().saturating_sub(n);
    let last_rows = frame.slice(start as i64, n);
    Ok(super::DataFrame::from_polars_with_options(
        last_rows,
        case_sensitive,
    ))
}
/// Reports whether `df` has no rows.
///
/// If counting the rows fails, the frame is treated as empty (returns `true`).
pub fn is_empty(df: &DataFrame) -> bool {
    matches!(df.count(), Ok(0) | Err(_))
}
pub fn to_df(
df: &DataFrame,
names: &[&str],
case_sensitive: bool,
) -> Result<DataFrame, PolarsError> {
let cols = df.columns()?;
if names.len() != cols.len() {
return Err(PolarsError::ComputeError(
format!(
"toDF: expected {} column names, got {}",
cols.len(),
names.len()
)
.into(),
));
}
let pl_df = df.collect_inner()?;
let mut pl_df = pl_df.as_ref().clone();
for (old, new) in cols.iter().zip(names.iter()) {
pl_df.rename(old.as_str(), (*new).into())?;
}
Ok(super::DataFrame::from_polars_with_options(
pl_df,
case_sensitive,
))
}
/// Converts one Polars `AnyValue` into a JSON value for `to_json`.
///
/// Handled directly: null, boolean, and the `Int8`, `Int32`, `Int64`,
/// `UInt32` and `UInt64` integer variants. Also handled: `Float32`/`Float64`,
/// where non-finite floats become JSON `null`, and strings. Any other variant
/// falls back to its `Debug` representation as a JSON string.
///
/// `UInt64` and `Float32` used to hit that fallback and were emitted as
/// strings like `"UInt64(5)"`.
fn any_value_to_serde_value(av: &polars::prelude::AnyValue) -> serde_json::Value {
    use polars::prelude::AnyValue;
    use serde_json::Number;
    match av {
        AnyValue::Null => serde_json::Value::Null,
        AnyValue::Boolean(v) => serde_json::Value::Bool(*v),
        AnyValue::Int8(v) => serde_json::Value::Number(Number::from(*v as i64)),
        AnyValue::Int32(v) => serde_json::Value::Number(Number::from(*v)),
        AnyValue::Int64(v) => serde_json::Value::Number(Number::from(*v)),
        AnyValue::UInt32(v) => serde_json::Value::Number(Number::from(*v)),
        AnyValue::UInt64(v) => serde_json::Value::Number(Number::from(*v)),
        AnyValue::Float32(v) => Number::from_f64(f64::from(*v))
            .map(serde_json::Value::Number)
            .unwrap_or(serde_json::Value::Null),
        AnyValue::Float64(v) => Number::from_f64(*v)
            .map(serde_json::Value::Number)
            .unwrap_or(serde_json::Value::Null),
        AnyValue::String(v) => serde_json::Value::String(v.to_string()),
        _ => serde_json::Value::String(format!("{av:?}")),
    }
}
pub fn to_json(df: &DataFrame) -> Result<Vec<String>, PolarsError> {
use polars::prelude::*;
let collected = df.collect_inner()?;
let pl = collected.as_ref();
let names = pl.get_column_names();
let mut out = Vec::with_capacity(pl.height());
for r in 0..pl.height() {
let mut row = serde_json::Map::new();
for (i, name) in names.iter().enumerate() {
let col = pl
.get_columns()
.get(i)
.ok_or_else(|| PolarsError::ComputeError("to_json: column index".into()))?;
let series = col.as_materialized_series();
let av = series
.get(r)
.map_err(|e| PolarsError::ComputeError(e.to_string().into()))?;
row.insert(name.to_string(), any_value_to_serde_value(&av));
}
out.push(
serde_json::to_string(&row)
.map_err(|e| PolarsError::ComputeError(e.to_string().into()))?,
);
}
Ok(out)
}
/// Returns a fixed description of the execution backend.
///
/// The frame argument is ignored; no query plan is rendered.
pub fn explain(_df: &DataFrame) -> String {
    String::from("DataFrame (eager Polars backend)")
}
/// Renders `df`'s schema as a Spark-style tree.
///
/// The output is a `root` line followed by one ` |-- name: type` line per
/// field. Types without a dedicated label are shown as `string`.
///
/// # Errors
/// Propagates the error from `DataFrame::schema`.
pub fn print_schema(df: &DataFrame) -> Result<String, PolarsError> {
    let schema = df.schema()?;
    let body: String = schema
        .fields()
        .into_iter()
        .map(|field| {
            let label = match &field.data_type {
                crate::schema::DataType::String => "string",
                crate::schema::DataType::Integer => "int",
                crate::schema::DataType::Long => "bigint",
                crate::schema::DataType::Double => "double",
                crate::schema::DataType::Boolean => "boolean",
                crate::schema::DataType::Date => "date",
                crate::schema::DataType::Timestamp => "timestamp",
                _ => "string",
            };
            format!(" |-- {}: {}\n", field.name, label)
        })
        .collect();
    Ok(format!("root\n{body}"))
}
pub fn select_expr(
df: &DataFrame,
exprs: &[String],
case_sensitive: bool,
) -> Result<DataFrame, PolarsError> {
let mut cols = Vec::new();
for e in exprs {
let e = e.trim();
if let Some((left, right)) = e.split_once(" as ") {
let col_name = left.trim();
let _alias = right.trim();
cols.push(df.resolve_column_name(col_name)?);
} else {
cols.push(df.resolve_column_name(e)?);
}
}
let refs: Vec<&str> = cols.iter().map(|s| s.as_str()).collect();
select(df, refs, case_sensitive)
}
/// Selects every column whose name matches the regular expression `pattern`.
///
/// Spark's `colRegex` takes the regex wrapped in backticks (for example
/// ``"`(id|v)`"``), so one surrounding pair of backticks is stripped before
/// compiling. Bare patterns are used as-is.
///
/// Matching uses `Regex::is_match`, so the pattern may match anywhere in the
/// name. Anchor it with `^...$` for a full match. Matched columns keep their
/// original order.
///
/// # Errors
/// `ComputeError` if the pattern is invalid or matches no column.
pub fn col_regex(
    df: &DataFrame,
    pattern: &str,
    case_sensitive: bool,
) -> Result<DataFrame, PolarsError> {
    let regex_src = pattern
        .strip_prefix('`')
        .and_then(|p| p.strip_suffix('`'))
        .unwrap_or(pattern);
    let re = regex::Regex::new(regex_src).map_err(|e| {
        PolarsError::ComputeError(format!("colRegex: invalid pattern {pattern:?}: {e}").into())
    })?;
    let names = df.columns()?;
    let matched: Vec<&str> = names
        .iter()
        .filter(|n| re.is_match(n))
        .map(|s| s.as_str())
        .collect();
    if matched.is_empty() {
        return Err(PolarsError::ComputeError(
            format!("colRegex: no columns matched pattern {pattern:?}").into(),
        ));
    }
    select(df, matched, case_sensitive)
}
/// Applies `with_column` once per `(name, column)` pair, in order.
///
/// Because each step runs on the result of the previous one, later columns
/// can refer to columns added earlier. The input is collected once up front
/// and re-wrapped with `case_sensitive` before any column is added.
///
/// Fix: the recursive call previously read `with_column(¤t, ...)`. That is
/// mojibake for `&current` and does not compile.
///
/// # Errors
/// Propagates collection errors and the first `with_column` error.
pub fn with_columns(
    df: &DataFrame,
    exprs: &[(String, crate::column::Column)],
    case_sensitive: bool,
) -> Result<DataFrame, PolarsError> {
    let pl = df.collect_inner()?.as_ref().clone();
    let mut current = super::DataFrame::from_polars_with_options(pl, case_sensitive);
    for (name, column) in exprs {
        current = with_column(&current, name, column, case_sensitive)?;
    }
    Ok(current)
}
/// Renames several columns at once, given `(old_name, new_name)` pairs.
///
/// Every old name is resolved against the original frame, and all renames are
/// applied in one lazy `rename` call (with `strict = true`). The mapping is
/// therefore simultaneous: a swap such as `[(a, b), (b, a)]` works.
/// Previously one `rename` was chained per pair. After the first rename the
/// frame held two columns named `b`, which broke the swap.
///
/// # Errors
/// Returns the first error from resolving an old name.
pub fn with_columns_renamed(
    df: &DataFrame,
    renames: &[(String, String)],
    case_sensitive: bool,
) -> Result<DataFrame, PolarsError> {
    let mut old_names: Vec<String> = Vec::with_capacity(renames.len());
    let mut new_names: Vec<&str> = Vec::with_capacity(renames.len());
    for (old_name, new_name) in renames {
        old_names.push(df.resolve_column_name(old_name)?);
        new_names.push(new_name.as_str());
    }
    let mut lf = df.lazy_frame();
    if !old_names.is_empty() {
        let existing: Vec<&str> = old_names.iter().map(String::as_str).collect();
        lf = lf.rename(existing, new_names, true);
    }
    Ok(super::DataFrame::from_lazy_with_options(lf, case_sensitive))
}
/// Spark-style `df.na` accessor: missing-value helpers bound to one frame.
///
/// Each method forwards to the matching free function in this module and
/// passes the wrapped frame's `case_sensitive` flag.
pub struct DataFrameNa<'a> {
    // The frame every helper operates on.
    pub(crate) df: &'a DataFrame,
}
impl<'a> DataFrameNa<'a> {
    /// Fills nulls with `value` in `subset` (or all columns). See `fillna`.
    pub fn fill(&self, value: Expr, subset: Option<Vec<&str>>) -> Result<DataFrame, PolarsError> {
        let frame = self.df;
        fillna(frame, value, subset, frame.case_sensitive)
    }

    /// Replaces `old_value` with `new_value` in each column of `subset`, or in
    /// every column when `subset` is `None`. Columns are processed one after
    /// another with the free `replace` function.
    pub fn replace(
        &self,
        old_value: Expr,
        new_value: Expr,
        subset: Option<Vec<&str>>,
    ) -> Result<DataFrame, PolarsError> {
        let target_cols: Vec<String> = if let Some(names) = &subset {
            names.iter().map(|name| name.to_string()).collect()
        } else {
            self.df.columns()?
        };
        target_cols
            .iter()
            .try_fold(self.df.clone(), |acc, col_name| {
                replace(
                    &acc,
                    col_name.as_str(),
                    old_value.clone(),
                    new_value.clone(),
                    self.df.case_sensitive,
                )
            })
    }

    /// Drops rows with nulls according to `how` / `thresh`. See `dropna`.
    pub fn drop(
        &self,
        subset: Option<Vec<&str>>,
        how: &str,
        thresh: Option<usize>,
    ) -> Result<DataFrame, PolarsError> {
        let frame = self.df;
        dropna(frame, subset, how, thresh, frame.case_sensitive)
    }
}
/// Skips the first `n` rows of `df` and keeps the rows after them.
///
/// The slice length is `u32::MAX`, the largest this `slice` call accepts, so
/// in practice everything after the offset is kept.
pub fn offset(df: &DataFrame, n: usize, case_sensitive: bool) -> Result<DataFrame, PolarsError> {
    let start = n as i64;
    let remaining = df.lazy_frame().slice(start, u32::MAX);
    Ok(super::DataFrame::from_lazy_with_options(remaining, case_sensitive))
}
/// Spark `DataFrame.transform`: applies `f` to a clone of `df` and returns
/// whatever `f` returns, including its error.
pub fn transform<F>(df: &DataFrame, f: F) -> Result<DataFrame, PolarsError>
where
    F: FnOnce(DataFrame) -> Result<DataFrame, PolarsError>,
{
    f(df.clone())
}
/// Finds frequent values per column (Spark `freqItems`).
///
/// For each requested column, the result holds a single-row list column named
/// `<column>_freqItems`. It contains every distinct value whose count is at
/// least `ceil(support * rows)`. `support` is clamped to `[1e-4, 1.0]`. An
/// empty frame yields empty lists, and an empty `columns` slice yields a
/// zero-row frame.
///
/// Fix: columns are read with `as_materialized_series`. The previous
/// `as_series` returns `None` for columns that are not series-backed (for
/// example scalar columns), which made valid inputs fail with
/// "column not a series". The duplicated empty-frame path is merged into the
/// main loop.
///
/// # Errors
/// Propagates resolution, `value_counts` and `take` errors.
pub fn freq_items(
    df: &DataFrame,
    columns: &[&str],
    support: f64,
    case_sensitive: bool,
) -> Result<DataFrame, PolarsError> {
    use polars::prelude::SeriesMethods;
    if columns.is_empty() {
        return Ok(super::DataFrame::from_lazy_with_options(
            df.lazy_frame().slice(0, 0),
            case_sensitive,
        ));
    }
    let support = support.clamp(1e-4, 1.0);
    let collected = df.collect_inner()?;
    let pl_df = collected.as_ref();
    let n_total = pl_df.height() as f64;
    let threshold = (support * n_total).ceil() as u32;
    let mut out_series = Vec::with_capacity(columns.len());
    for col_name in columns {
        let resolved = df.resolve_column_name(col_name)?;
        let s = pl_df
            .column(resolved.as_str())?
            .as_materialized_series()
            .clone();
        let items = if n_total == 0.0 {
            // No rows: an empty list that still carries the column's dtype.
            s.head(Some(0))
        } else {
            let vc = s.value_counts(false, false, "counts".into(), false)?;
            let count_col = vc.column("counts").map_err(|_| {
                PolarsError::ComputeError("value_counts missing counts column".into())
            })?;
            let counts = count_col.u32().map_err(|_| {
                PolarsError::ComputeError("freq_items: counts column not u32".into())
            })?;
            let values_col = vc.column(s.name().as_str()).map_err(|_| {
                PolarsError::ComputeError("value_counts missing value column".into())
            })?;
            // Row positions (in `vc`) of values meeting the support threshold.
            let indices: Vec<u32> = counts
                .into_iter()
                .enumerate()
                .filter_map(|(i, c)| (c? >= threshold).then_some(i as u32))
                .collect();
            let idx_series = Series::new("idx".into(), indices);
            let idx_ca = idx_series.u32().map_err(|_| {
                PolarsError::ComputeError("freq_items: index series not u32".into())
            })?;
            values_col.as_materialized_series().take(idx_ca)?
        };
        let list_chunked = polars::prelude::ListChunked::from_iter([items].into_iter())
            .with_name(format!("{resolved}_freqItems").into());
        out_series.push(list_chunked.into_series().into());
    }
    let out_df = polars::prelude::DataFrame::new(out_series)?;
    Ok(super::DataFrame::from_polars_with_options(
        out_df,
        case_sensitive,
    ))
}
/// Computes quantiles of one numeric column.
///
/// Despite the name, the quantiles are computed over the full column with
/// linear interpolation (`QuantileMethod::Linear`) after casting it to
/// `Float64`. The result is a single `quantile` column with one row per
/// probability, in input order. A quantile that is unavailable (for example,
/// all values null) is `NaN`. An empty `probabilities` slice returns an empty
/// `quantile` column without touching `column`.
///
/// Fix: the column is read with `as_materialized_series`. The previous
/// `as_series` returns `None` for non-series-backed (for example scalar)
/// columns.
///
/// # Errors
/// Propagates resolution, cast and quantile errors.
pub fn approx_quantile(
    df: &DataFrame,
    column: &str,
    probabilities: &[f64],
    case_sensitive: bool,
) -> Result<DataFrame, PolarsError> {
    use polars::prelude::{ChunkQuantile, QuantileMethod};
    if probabilities.is_empty() {
        return Ok(super::DataFrame::from_polars_with_options(
            polars::prelude::DataFrame::new(vec![Series::new(
                "quantile".into(),
                Vec::<f64>::new(),
            )
            .into()])?,
            case_sensitive,
        ));
    }
    let resolved = df.resolve_column_name(column)?;
    let collected = df.collect_inner()?;
    let s_f64 = collected
        .column(resolved.as_str())?
        .as_materialized_series()
        .cast(&polars::prelude::DataType::Float64)?;
    let ca = s_f64
        .f64()
        .map_err(|_| PolarsError::ComputeError("approx_quantile: need numeric column".into()))?;
    let mut quantiles = Vec::with_capacity(probabilities.len());
    for &p in probabilities {
        let q = ca.quantile(p, QuantileMethod::Linear)?;
        quantiles.push(q.unwrap_or(f64::NAN));
    }
    let out_df =
        polars::prelude::DataFrame::new(vec![Series::new("quantile".into(), quantiles).into()])?;
    Ok(super::DataFrame::from_polars_with_options(
        out_df,
        case_sensitive,
    ))
}
/// Counts occurrences of each `(col1, col2)` value pair.
///
/// The result is in long format: one row per observed pair, with the two key
/// columns and a `count` column.
///
/// NOTE(review): Spark's `crosstab` returns a pivoted contingency table (one
/// column per distinct `col2` value). Confirm that the long format is
/// intended.
///
/// # Errors
/// Propagates resolution, collection and aggregation errors.
pub fn crosstab(
    df: &DataFrame,
    col1: &str,
    col2: &str,
    case_sensitive: bool,
) -> Result<DataFrame, PolarsError> {
    use polars::prelude::*;
    let first_key = df.resolve_column_name(col1)?;
    let second_key = df.resolve_column_name(col2)?;
    let keys = [col(first_key.as_str()), col(second_key.as_str())];
    let collected = df.collect_inner()?;
    let counts = collected
        .as_ref()
        .clone()
        .lazy()
        .group_by(keys)
        .agg([len().alias("count")])
        .collect()?;
    Ok(super::DataFrame::from_polars_with_options(
        counts,
        case_sensitive,
    ))
}
/// Unpivots `value_vars` into long format (Spark `melt` / `unpivot`).
///
/// For each value column, the id columns are kept and two columns are added:
/// `variable` holds the value column's name and `value` holds its data. The
/// per-column pieces are stacked vertically, and the output columns are
/// ordered `id_vars..., variable, value`. An empty `value_vars` returns a
/// zero-row frame with the input's schema.
///
/// # Errors
/// Propagates resolution, selection, rename and `vstack` errors. The last
/// fails if the value columns have incompatible dtypes.
pub fn melt(
    df: &DataFrame,
    id_vars: &[&str],
    value_vars: &[&str],
    case_sensitive: bool,
) -> Result<DataFrame, PolarsError> {
    use polars::prelude::*;
    let collected = df.collect_inner()?;
    let pl_df = collected.as_ref();
    if value_vars.is_empty() {
        return Ok(super::DataFrame::from_polars_with_options(
            pl_df.head(Some(0)),
            case_sensitive,
        ));
    }
    let mut id_resolved: Vec<String> = Vec::with_capacity(id_vars.len());
    for name in id_vars {
        id_resolved.push(df.resolve_column_name(name)?.to_string());
    }
    let mut value_resolved: Vec<String> = Vec::with_capacity(value_vars.len());
    for name in value_vars {
        value_resolved.push(df.resolve_column_name(name)?.to_string());
    }
    let id_refs: Vec<&str> = id_resolved.iter().map(String::as_str).collect();
    // Build every per-variable piece first, then stack them.
    let pieces = value_resolved
        .iter()
        .map(|vname| -> Result<polars::prelude::DataFrame, PolarsError> {
            let mut select_cols = id_refs.clone();
            select_cols.push(vname.as_str());
            let mut piece = pl_df.select(select_cols)?;
            let labels = Series::new("variable".into(), vec![vname.as_str(); piece.height()]);
            piece.with_column(labels)?;
            piece.rename(vname.as_str(), "value".into())?;
            Ok(piece)
        })
        .collect::<Result<Vec<_>, PolarsError>>()?;
    let mut pieces = pieces.into_iter();
    let mut stacked = pieces
        .next()
        .ok_or_else(|| PolarsError::ComputeError("melt: no value columns".into()))?;
    for piece in pieces {
        stacked.vstack_mut(&piece)?;
    }
    let mut col_order = id_refs;
    col_order.extend(["variable", "value"]);
    let out = stacked.select(col_order)?;
    Ok(super::DataFrame::from_polars_with_options(
        out,
        case_sensitive,
    ))
}
/// Spark `exceptAll` entry point. It currently delegates to `subtract`.
///
/// NOTE(review): Spark's `exceptAll` has multiset semantics: each matching
/// right row cancels one left row, and other duplicates are kept. `subtract`
/// is an anti-join that removes every matching left row, so results differ
/// when duplicates are present.
pub fn except_all(
    left: &DataFrame,
    right: &DataFrame,
    case_sensitive: bool,
) -> Result<DataFrame, PolarsError> {
    let remaining = subtract(left, right, case_sensitive)?;
    Ok(remaining)
}
/// Spark `intersectAll` entry point. It currently delegates to `intersect`.
///
/// NOTE(review): Spark's `intersectAll` keeps duplicates, up to the minimum
/// multiplicity on either side. `intersect` de-duplicates its result with
/// `unique`, so duplicates are lost.
pub fn intersect_all(
    left: &DataFrame,
    right: &DataFrame,
    case_sensitive: bool,
) -> Result<DataFrame, PolarsError> {
    let common = intersect(left, right, case_sensitive)?;
    Ok(common)
}
#[cfg(test)]
mod tests {
    use super::{distinct, drop, dropna, first, head, limit, offset};
    use crate::{DataFrame, SparkSession};

    /// Builds a frame with the given rows and column names from the shared test session.
    fn make_df(rows: Vec<(i64, i64, String)>, names: Vec<&str>) -> DataFrame {
        let spark = SparkSession::builder()
            .app_name("transform_tests")
            .get_or_create();
        spark.create_dataframe(rows, names).unwrap()
    }

    /// Three rows: (1, 10, "a"), (2, 20, "b"), (3, 30, "c").
    fn test_df() -> DataFrame {
        let rows = (1i64..=3)
            .zip(["a", "b", "c"])
            .map(|(id, label)| (id, id * 10, label.to_string()))
            .collect();
        make_df(rows, vec!["id", "v", "label"])
    }

    #[test]
    fn limit_zero() {
        let out = limit(&test_df(), 0, false).unwrap();
        assert_eq!(out.count().unwrap(), 0);
    }

    #[test]
    fn limit_more_than_rows() {
        let out = limit(&test_df(), 10, false).unwrap();
        assert_eq!(out.count().unwrap(), 3);
    }

    #[test]
    fn distinct_on_empty() {
        let empty = make_df(Vec::new(), vec!["a", "b", "c"]);
        let out = distinct(&empty, None, false).unwrap();
        assert_eq!(out.count().unwrap(), 0);
    }

    #[test]
    fn first_returns_one_row() {
        let out = first(&test_df(), false).unwrap();
        assert_eq!(out.count().unwrap(), 1);
    }

    #[test]
    fn head_n() {
        let out = head(&test_df(), 2, false).unwrap();
        assert_eq!(out.count().unwrap(), 2);
    }

    #[test]
    fn offset_skip_first() {
        let out = offset(&test_df(), 1, false).unwrap();
        assert_eq!(out.count().unwrap(), 2);
    }

    #[test]
    fn offset_beyond_length_returns_empty() {
        let out = offset(&test_df(), 10, false).unwrap();
        assert_eq!(out.count().unwrap(), 0);
    }

    #[test]
    fn drop_column() {
        let out = drop(&test_df(), vec!["v"], false).unwrap();
        let cols = out.columns().unwrap();
        assert!(!cols.iter().any(|c| c == "v"));
        assert_eq!(out.count().unwrap(), 3);
    }

    #[test]
    fn dropna_all_columns() {
        // The fixture has no nulls, so nothing is dropped.
        let out = dropna(&test_df(), None, "any", None, false).unwrap();
        assert_eq!(out.count().unwrap(), 3);
    }
}