use super::compute_err;
use chrono::{Datelike, TimeZone};
use chrono_tz::Tz;
use polars::prelude::*;
use regex::Regex;
use std::borrow::Cow;
use std::ops::Not;
/// Splits `s` on matches of `re`, producing at most `limit` pieces.
///
/// * `limit == 0` or `limit == usize::MAX` means "unlimited": every match splits.
/// * `limit == 1` returns the whole input as a single piece.
/// * Otherwise, after `limit - 1` splits, the remainder of `s` (including any further
///   matches) becomes the last piece.
///
/// `Regex::splitn` has exactly these semantics for `limit >= 1`. Delegating to it also
/// avoids pre-allocating `limit` slots. With a large user-supplied limit (e.g. a Spark
/// split limit of `i32::MAX`), that pre-allocation attempted a multi-gigabyte
/// allocation for every row.
fn split_str_by_regex_limit(s: &str, re: &Regex, limit: usize) -> Vec<String> {
    if limit == 0 || limit == usize::MAX {
        re.split(s).map(str::to_string).collect()
    } else {
        re.splitn(s, limit).map(str::to_string).collect()
    }
}
/// Splits `s` into individual characters, following Spark's semantics for an empty
/// split pattern.
///
/// At most `limit` pieces are produced. If the limit is smaller than the number of
/// characters, the last piece holds the unsplit remainder. A `limit` of 0, or any limit
/// at least the character count, gives one piece per character. An empty `s` yields a
/// single empty piece.
fn split_str_into_chars_limit(s: &str, limit: usize) -> Vec<&str> {
    if s.is_empty() {
        return vec![""];
    }
    let char_count = s.chars().count();
    let pieces = if limit == 0 || limit > char_count {
        char_count
    } else {
        limit
    };
    let mut out = Vec::with_capacity(pieces);
    let mut rest = s;
    for _ in 1..pieces {
        // `rest` is non-empty here: fewer than `char_count` characters have been taken.
        let first_len = rest.chars().next().map_or(0, char::len_utf8);
        let (head, tail) = rest.split_at(first_len);
        out.push(head);
        rest = tail;
    }
    out.push(rest);
    out
}

/// Splits every string of `column` on the regex `delimiter`, honouring a Spark-style
/// `limit`, and returns a `List[String]` column with the same name.
///
/// * `limit <= 0`: no limit; every match splits.
/// * `limit > 0`: at most `limit` pieces; the last one holds the unsplit remainder.
/// * Empty `delimiter`: the string is split into single characters, and the same limit
///   applies. `str::split("")` would instead emit empty pieces before the first and
///   after the last character, and would ignore the limit.
///
/// Null strings produce null lists.
///
/// # Errors
/// Fails if the column is not a string column or `delimiter` is not a valid regex.
pub fn apply_split_with_limit(
    column: Column,
    delimiter: &str,
    limit: i32,
) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let ca = series
        .str()
        .map_err(|e| compute_err("split_with_limit", e))?;
    // Non-positive limits mean "unlimited".
    let n = if limit <= 0 {
        usize::MAX
    } else {
        limit as usize
    };
    // The empty pattern is handled by character splitting, not by the regex engine.
    let re = if delimiter.is_empty() {
        None
    } else {
        Some(Regex::new(delimiter).map_err(|e| compute_err("split_with_limit pattern", e))?)
    };
    let values_capacity = ca.len().saturating_mul(64);
    let mut builder =
        ListStringChunkedBuilder::new(name.as_str().into(), ca.len(), values_capacity);
    for opt_s in ca.into_iter() {
        match (opt_s, re.as_ref()) {
            (None, _) => builder.append_null(),
            (Some(s), None) => {
                builder.append_values_iter(split_str_into_chars_limit(s, n).into_iter())
            }
            (Some(s), Some(re)) => {
                let parts = split_str_by_regex_limit(s, re, n);
                builder.append_values_iter(parts.iter().map(String::as_str));
            }
        }
    }
    let out = builder.finish();
    Ok(Some(Column::new(name, out.into_series())))
}
/// Selects the 1-based `part_num`-th item of `parts`.
///
/// A negative `part_num` counts from the end (-1 is the last item). Returns `None` when
/// the index is out of range or `part_num == 0`. All index arithmetic is checked, so
/// nothing depends on wrapping or truncating `as usize` casts.
fn nth_split_part<'a>(
    mut parts: impl Iterator<Item = &'a str>,
    part_num: i64,
) -> Option<&'a str> {
    if part_num > 0 {
        // Counting from the front needs no buffering.
        let idx = usize::try_from(part_num - 1).ok()?;
        parts.nth(idx)
    } else if part_num < 0 {
        let all: Vec<&str> = parts.collect();
        let from_end = usize::try_from(part_num.unsigned_abs()).ok()?;
        all.len()
            .checked_sub(from_end)
            .and_then(|idx| all.get(idx).copied())
    } else {
        None
    }
}

/// Returns the `part_num`-th piece of each string split on the regex `pattern`
/// (Spark `split_part` style, 1-based).
///
/// * `part_num > 0` counts from the start (1 = first piece).
/// * `part_num < 0` counts from the end (-1 = last piece).
/// * An out-of-range index, including `part_num == 0`, yields an empty string.
/// * An empty `pattern` does not split, as in Spark: the whole string is the only
///   piece. Splitting on an empty regex would otherwise produce empty pieces around
///   every character.
///
/// Null inputs stay null.
///
/// # Errors
/// Fails if the column is not a string column or `pattern` is not a valid regex.
pub fn apply_split_part_regex(
    column: Column,
    pattern: &str,
    part_num: i64,
) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let ca = series
        .str()
        .map_err(|e| compute_err("split_part_regex", e))?;
    let re = if pattern.is_empty() {
        None
    } else {
        Some(Regex::new(pattern).map_err(|e| compute_err("split_part_regex pattern", e))?)
    };
    let out = StringChunked::from_iter_options(
        name.as_str().into(),
        ca.into_iter().map(|opt_s| {
            opt_s.map(|s| {
                let part = match &re {
                    // Unsplit string: it is both the first and the last part.
                    None => (part_num == 1 || part_num == -1).then_some(s),
                    Some(re) => nth_split_part(re.split(s), part_num),
                };
                part.unwrap_or_default().to_string()
            })
        }),
    );
    Ok(Some(Column::new(name, out.into_series())))
}
/// Builds a string column with the input's name and length, where every row holds
/// `value`.
///
/// The input column's values are ignored; only its name and length are used.
pub fn apply_literal_string_repeat(column: Column, value: &str) -> PolarsResult<Option<Column>> {
    let col_name = column.field().into_owned().name;
    let row_count = column.take_materialized_series().len();
    let rows = std::iter::repeat_with(|| Some(value.to_string())).take(row_count);
    let filled = StringChunked::from_iter_options(col_name.as_str().into(), rows);
    Ok(Some(Column::new(col_name, filled.into_series())))
}
/// Returns the American Soundex code of `s`, truncated to at most four characters.
///
/// An empty input maps to a borrowed empty string without calling the encoder.
fn soundex_one(s: &str) -> Cow<'_, str> {
    use soundex::american_soundex;
    match s {
        "" => Cow::Borrowed(""),
        word => {
            let encoded = american_soundex(word);
            let truncated: String = encoded.chars().take(4).collect();
            Cow::Owned(truncated)
        }
    }
}
pub fn apply_isnan_pyspark_parity(column: Column) -> PolarsResult<Option<Column>> {
let name = column.field().into_owned().name;
let series = column.take_materialized_series();
let out = match series.dtype() {
DataType::Float32 | DataType::Float64 => {
let bool_ca = series.is_nan().map_err(|e| compute_err("isnan", e))?;
let out = BooleanChunked::from_iter_options(
name.as_str().into(),
bool_ca
.into_iter()
.map(|opt_b| Some(opt_b.unwrap_or(false))),
);
out.into_series()
}
DataType::String => {
let ca = series.str().map_err(|e| compute_err("isnan", e))?;
let out = BooleanChunked::from_iter_options(
name.as_str().into(),
ca.into_iter().map(|opt_s| {
Some(
opt_s
.map(|s| s.trim().eq_ignore_ascii_case("nan"))
.unwrap_or(false),
)
}),
);
out.into_series()
}
_ => BooleanChunked::from_iter_options(
name.as_str().into(),
(0..series.len()).map(|_| Some(false)),
)
.into_series(),
};
Ok(Some(Column::new(name, out)))
}
/// Replaces every string in `column` with its four-character Soundex code; nulls stay
/// null.
///
/// # Errors
/// Fails if the column is not a string column.
pub fn apply_soundex(column: Column) -> PolarsResult<Option<Column>> {
    let col_name = column.field().into_owned().name;
    let materialized = column.take_materialized_series();
    let encoded: StringChunked = materialized
        .str()
        .map_err(|e| compute_err("soundex", e))?
        .apply_values(soundex_one);
    Ok(Some(Column::new(col_name, encoded.into_series())))
}
/// Computes the CRC-32 checksum of each string's UTF-8 bytes as an `Int64` column.
///
/// The unsigned 32-bit checksum is widened losslessly to `i64`; null inputs stay null.
///
/// # Errors
/// Fails if the column is not a string column.
pub fn apply_crc32(column: Column) -> PolarsResult<Option<Column>> {
    use crc32fast::Hasher;
    let col_name = column.field().into_owned().name;
    let materialized = column.take_materialized_series();
    let strings = materialized.str().map_err(|e| compute_err("crc32", e))?;
    let checksum = |text: &str| -> i64 {
        let mut hasher = Hasher::new();
        hasher.update(text.as_bytes());
        i64::from(hasher.finalize())
    };
    let sums = Int64Chunked::from_iter_options(
        col_name.as_str().into(),
        strings.into_iter().map(|value| value.map(checksum)),
    );
    Ok(Some(Column::new(col_name, sums.into_series())))
}
/// Seed passed to XXH64 by `apply_xxhash64`. Spark's `xxhash64` also uses 42.
const XXH64_SEED: u64 = 42;

/// Computes the 64-bit xxHash (seed `XXH64_SEED`) of each string's UTF-8 bytes. The
/// unsigned digest is reinterpreted as `i64`.
///
/// A null input produces the seed value itself rather than null, so the output never
/// contains nulls. The fallback is derived from `XXH64_SEED` rather than repeating the
/// literal, so the two cannot drift apart.
///
/// # Errors
/// Fails if the column is not a string column.
pub fn apply_xxhash64(column: Column) -> PolarsResult<Option<Column>> {
    use std::hash::Hasher;
    use twox_hash::XxHash64;
    // A null value contributes nothing to the hash, leaving it at the seed.
    const NULL_HASH: i64 = XXH64_SEED as i64;
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let ca = series.str().map_err(|e| compute_err("xxhash64", e))?;
    let out = Int64Chunked::from_iter_options(
        name.as_str().into(),
        ca.into_iter().map(|opt_s| {
            Some(
                opt_s
                    .map(|s| {
                        let mut hasher = XxHash64::with_seed(XXH64_SEED);
                        hasher.write(s.as_bytes());
                        hasher.finish() as i64
                    })
                    .unwrap_or(NULL_HASH),
            )
        }),
    );
    Ok(Some(Column::new(name, out.into_series())))
}
/// Title-cases `s`: the first character of every word is upper-cased and the rest are
/// lower-cased.
///
/// A "word" is a maximal run of alphabetic or numeric characters. Every other character
/// is copied unchanged and starts a new word.
fn initcap_str(s: &str) -> Cow<'_, str> {
    let mut titled = String::with_capacity(s.len());
    let mut at_boundary = true;
    for ch in s.chars() {
        let is_word_char = ch.is_alphabetic() || ch.is_numeric();
        if !is_word_char {
            titled.push(ch);
            at_boundary = true;
            continue;
        }
        if at_boundary {
            titled.extend(ch.to_uppercase());
        } else {
            titled.extend(ch.to_lowercase());
        }
        at_boundary = false;
    }
    Cow::Owned(titled)
}
/// Title-cases every string in `column` using `initcap_str`; nulls stay null.
///
/// # Errors
/// Fails if the column is not a string column.
pub fn apply_initcap(column: Column) -> PolarsResult<Option<Column>> {
    let col_name = column.field().into_owned().name;
    let materialized = column.take_materialized_series();
    let titled = materialized
        .str()
        .map_err(|e| compute_err("initcap", e))?
        .apply_values(initcap_str);
    Ok(Some(Column::new(col_name, titled.into_series())))
}
/// Computes the Levenshtein edit distance (via `strsim::levenshtein`) between the
/// strings of `columns[0]` and `columns[1]`, row by row, as an `Int64` column.
///
/// A column of length 1 is broadcast against the other. The output length follows the
/// usual broadcasting rule: equal lengths stay as-is, and a length-1 side takes the
/// other side's length, which may be 0. If either value in a row is null, the result
/// is null.
///
/// # Errors
/// Fails if fewer than two columns are given, either is not a string column, or the
/// lengths are different and neither is 1. Previously such inputs indexed past the
/// shorter column, and `ChunkedArray::get` panicked.
pub fn apply_levenshtein(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
    use strsim::levenshtein;
    if columns.len() < 2 {
        return Err(PolarsError::ComputeError(
            "levenshtein needs two columns".into(),
        ));
    }
    let name = columns[0].field().into_owned().name;
    let a_series = std::mem::take(&mut columns[0]).take_materialized_series();
    let b_series = std::mem::take(&mut columns[1]).take_materialized_series();
    let a_ca = a_series.str().map_err(|e| compute_err("levenshtein", e))?;
    let b_ca = b_series.str().map_err(|e| compute_err("levenshtein", e))?;
    let len_a = a_ca.len();
    let len_b = b_ca.len();
    let len = if len_a == len_b {
        len_a
    } else if len_a == 1 {
        len_b
    } else if len_b == 1 {
        len_a
    } else {
        return Err(PolarsError::ComputeError(
            format!("levenshtein: cannot broadcast columns of length {len_a} and {len_b}")
                .into(),
        ));
    };
    let out = Int64Chunked::from_iter_options(
        name.as_str().into(),
        (0..len).map(|i| {
            // Length-1 columns are broadcast; every index is now in bounds for both.
            let ia = if len_a == 1 { 0 } else { i };
            let ib = if len_b == 1 { 0 } else { i };
            match (a_ca.get(ia), b_ca.get(ib)) {
                (Some(a), Some(b)) => Some(levenshtein(a, b) as i64),
                _ => None,
            }
        }),
    );
    Ok(Some(Column::new(name, out.into_series())))
}
/// Flattens one level of nesting: each row of a list-of-lists column becomes the
/// concatenation of its sub-lists, in order.
///
/// Null sub-lists are skipped instead of nulling the whole row, because `.flatten()`
/// over the amortized iterator drops `None` entries. A row with no remaining elements
/// becomes an empty series of the flattened element type.
///
/// NOTE(review): the per-row series handed to the closure is itself list-typed here.
/// Confirm against the pinned polars version that `as_list().amortized_iter()` yields
/// the individual sub-lists (rather than wrapping them again), so the output dtype
/// really is the flattened one.
pub fn apply_array_flatten(column: Column) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let list_ca = series.list().map_err(|e| compute_err("array_flatten", e))?;
    // Element type used for empty rows: the innermost type for a list of lists,
    // otherwise the list's inner type unchanged.
    let inner_dtype = match list_ca.inner_dtype() {
        DataType::List(inner) => *inner.clone(),
        other => other.clone(),
    };
    let out = list_ca.try_apply_amortized(|amort_s| {
        let s = amort_s.as_ref();
        let list_s = s.as_list();
        if list_s.is_empty() {
            return Ok(Series::new_empty(PlSmallStr::EMPTY, &inner_dtype));
        }
        // Owned copies of every non-null entry; concatenated below.
        let mut acc: Vec<Series> = Vec::new();
        for elem in list_s.amortized_iter().flatten() {
            acc.push(elem.deep_clone());
        }
        if acc.is_empty() {
            Ok(Series::new_empty(PlSmallStr::EMPTY, &inner_dtype))
        } else {
            let mut result = acc.remove(0);
            for s in acc {
                result.extend(&s)?;
            }
            Ok(result)
        }
    })?;
    Ok(Some(Column::new(name, out.into_series())))
}
/// For each list row, returns the list of its 0-based element positions
/// (`[0, 1, ..., len - 1]`). Null rows stay null.
///
/// # Errors
/// Fails if the column is not a list column.
pub fn apply_posexplode_positions(column: Column) -> PolarsResult<Option<Column>> {
    let col_name = column.field().into_owned().name;
    let materialized = column.take_materialized_series();
    let lists = materialized
        .list()
        .map_err(|e| compute_err("posexplode_positions", e))?;
    let row_count = lists.len();
    let mut builder = ListPrimitiveChunkedBuilder::<Int64Type>::new(
        col_name.as_str().into(),
        row_count,
        row_count.saturating_mul(4),
        DataType::Int64,
    );
    for row in lists.into_iter() {
        let Some(inner) = row else {
            builder.append_null();
            continue;
        };
        let element_count = inner.len();
        builder.append_iter((0..element_count).map(|pos| Some(pos as i64)));
    }
    Ok(Some(Column::new(col_name, builder.finish().into_series())))
}
/// Removes duplicate elements from each list row, keeping the first occurrence of each
/// value in its original order (Spark `array_distinct`).
///
/// Each candidate is compared, via `Series::get(0)` (i.e. `AnyValue` equality), against
/// every element already kept. The cost is therefore quadratic in the row's length. A
/// row with no elements becomes an empty series of the list's inner dtype.
///
/// # Errors
/// Fails if the column is not a list column or concatenating the kept elements fails.
pub fn apply_array_distinct_first_order(column: Column) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let list_ca = series
        .list()
        .map_err(|e| compute_err("array_distinct", e))?;
    let inner_dtype = list_ca.inner_dtype().clone();
    let out = list_ca.try_apply_amortized(|amort_s| {
        let list_s = amort_s.as_ref().as_list();
        // Elements kept so far, in first-occurrence order.
        let mut result: Vec<Series> = Vec::new();
        for elem in list_s.amortized_iter().flatten() {
            let taken = elem.deep_clone();
            // Linear scan over the kept elements; a read error compares as `None`.
            let is_dup = result.iter().any(|s| s.get(0).ok() == taken.get(0).ok());
            if !is_dup {
                result.push(taken);
            }
        }
        if result.is_empty() {
            Ok(Series::new_empty(PlSmallStr::EMPTY, &inner_dtype))
        } else {
            let mut combined = result.remove(0);
            for s in result {
                combined.extend(&s)?;
            }
            Ok(combined)
        }
    })?;
    Ok(Some(Column::new(name, out.into_series())))
}
/// Tests, row by row, whether the list in `columns[0]` contains the value in
/// `columns[1]` (Spark `array_contains`).
///
/// The value column is cast to the list's inner dtype and broadcast when it has length
/// 1. Values and elements are compared by their `series_to_set_key` string. A null list
/// row yields null. If no value entry exists for a row (the `_` arm), the result is
/// `false`.
///
/// NOTE(review): a null search value presumably arrives as `Some(AnyValue::Null)` and is
/// then compared like any other key rather than producing null; confirm this is the
/// intended parity.
///
/// # Errors
/// Fails if fewer than two columns are given, the first is not a list, or the cast to
/// the inner dtype fails.
pub fn apply_array_contains(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
    use polars::prelude::{AnyValue, BooleanChunked};
    if columns.len() < 2 {
        return Err(PolarsError::ComputeError(
            "array_contains needs two columns (array, value)".into(),
        ));
    }
    let name = columns[0].field().into_owned().name;
    let list_series = std::mem::take(&mut columns[0]).take_materialized_series();
    let value_series = std::mem::take(&mut columns[1]).take_materialized_series();
    let list_nulls = list_series.is_null();
    let list_ca = list_series
        .list()
        .map_err(|e| compute_err("array_contains", e))?;
    let inner_dtype = list_ca.inner_dtype().clone();
    let value_casted = value_series.cast(&inner_dtype)?;
    let elem_len = value_casted.len();
    // Search values materialized once up front; indexed per row below.
    let elem_vec: Vec<Option<AnyValue>> = (0..elem_len).map(|i| value_casted.get(i).ok()).collect();
    let mut results: Vec<Option<bool>> = Vec::with_capacity(list_ca.len());
    for (row_idx, opt_list) in list_ca.amortized_iter().enumerate() {
        let row_is_null = list_nulls.get(row_idx).unwrap_or(false);
        if opt_list.is_none() || row_is_null {
            results.push(None);
            continue;
        }
        // Broadcast a single search value across all rows.
        let ei = if elem_len == 1 { 0 } else { row_idx };
        let contains = match (opt_list, elem_vec.get(ei)) {
            (Some(amort), Some(Some(av))) => {
                let val_series = any_value_to_single_series(av.clone(), &inner_dtype)?;
                let val_key = series_to_set_key(&val_series);
                let list_s = amort.as_ref().as_list();
                let mut found = false;
                for e in list_s.amortized_iter().flatten() {
                    let key = series_to_set_key(&e.deep_clone());
                    if key == val_key {
                        found = true;
                        break;
                    }
                }
                Some(found)
            }
            _ => Some(false),
        };
        results.push(contains);
    }
    let out = BooleanChunked::from_iter_options(name.as_str().into(), results.into_iter());
    Ok(Some(Column::new(name, out.into_series())))
}
/// Returns an all-null column of `dtype` with the input column's name and length.
///
/// The input's values are never read.
pub fn apply_literal_null_list_repeat(
    column: Column,
    dtype: &DataType,
) -> PolarsResult<Option<Column>> {
    use polars::prelude::Series;
    let field = column.field().into_owned();
    let nulls = Series::full_null(field.name.clone(), column.len(), dtype);
    Ok(Some(Column::new(field.name, nulls)))
}
/// Repeats values `n` times per row; a negative `n` is treated as 0.
///
/// * Non-list column: each row's value becomes a list holding `n` copies of it (Spark
///   `array_repeat`). For `n <= 0` the list is empty. Previously the repetition was
///   seeded with one copy, so `n = 0` still produced a one-element list.
/// * List column: each element of a row's list is repeated `n` times in place, e.g.
///   `[1, 2]` with `n = 2` becomes `[1, 1, 2, 2]`.
///
/// # Errors
/// Fails if building a single-value series or appending to the list builder fails.
pub fn apply_array_repeat(column: Column, n: i64) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let n_usize = n.max(0) as usize;
    if !matches!(series.dtype(), DataType::List(_)) {
        use polars::chunked_array::builder::get_list_builder;
        let inner_dtype = series.dtype().clone();
        let len = series.len();
        let mut builder = get_list_builder(&inner_dtype, 64, len, name.as_str().into());
        for i in 0..len {
            let opt_av = series.get(i);
            let elem_series = match opt_av {
                Ok(av) => any_value_to_single_series(av, &inner_dtype)?,
                Err(_) => Series::new_empty(PlSmallStr::EMPTY, &inner_dtype),
            };
            // Start from an empty slice of the element (same dtype) and append exactly
            // `n_usize` copies, so `n == 0` yields `[]`.
            let mut repeated = elem_series.slice(0, 0);
            for _ in 0..n_usize {
                repeated.extend(&elem_series)?;
            }
            builder
                .append_series(&repeated)
                .map_err(|e| compute_err("array_repeat scalar", e))?;
        }
        let out = builder.finish().into_series();
        return Ok(Some(Column::new(name, out)));
    }
    let list_ca = series.list().map_err(|e| compute_err("array_repeat", e))?;
    let inner_dtype = list_ca.inner_dtype().clone();
    let out = list_ca.try_apply_amortized(move |amort_s| {
        let list_s = amort_s.as_ref().as_list();
        let mut repeated: Vec<Series> = Vec::new();
        for elem in list_s.amortized_iter().flatten() {
            let taken = elem.deep_clone();
            for _ in 0..n_usize {
                repeated.push(taken.clone());
            }
        }
        let mut parts = repeated.into_iter();
        match parts.next() {
            None => Ok(Series::new_empty(PlSmallStr::EMPTY, &inner_dtype)),
            Some(mut result) => {
                for s in parts {
                    result.extend(&s)?;
                }
                Ok(result)
            }
        }
    })?;
    Ok(Some(Column::new(name, out.into_series())))
}
/// Wraps a single `AnyValue` into an unnamed one-row `Series` of `dtype`, converting
/// non-strictly.
fn any_value_to_single_series(av: AnyValue, dtype: &DataType) -> PolarsResult<Series> {
    let values = [av];
    let strict = false;
    Series::from_any_values_and_dtype(PlSmallStr::EMPTY, &values, dtype, strict)
}
/// Appends the matching value of `columns[1]` to the end of each list row of
/// `columns[0]` (Spark `array_append`).
///
/// The element column is cast to the list's inner dtype and broadcast when it has
/// length 1. If no element entry exists for a row, the row is returned unchanged. Null
/// list rows stay null.
///
/// `try_apply_amortized` only invokes its closure for non-null list rows, so a bare call
/// counter points at the wrong element after the first null list row. The counter is
/// therefore mapped back to the real row index.
///
/// # Errors
/// Fails if fewer than two columns are given, the first is not a list, or the element
/// column cannot be cast to the list's inner dtype.
pub fn apply_array_append(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
    use std::cell::RefCell;
    if columns.len() < 2 {
        return Err(PolarsError::ComputeError(
            "array_append needs two columns (array, element)".into(),
        ));
    }
    let name = columns[0].field().into_owned().name;
    let list_series = std::mem::take(&mut columns[0]).take_materialized_series();
    let elem_series = std::mem::take(&mut columns[1]).take_materialized_series();
    let list_ca = list_series
        .list()
        .map_err(|e| compute_err("array_append", e))?;
    let inner_dtype = list_ca.inner_dtype().clone();
    let elem_casted = elem_series.cast(&inner_dtype)?;
    let elem_len = elem_casted.len();
    let elem_vec: Vec<Option<AnyValue>> = (0..elem_len).map(|i| elem_casted.get(i).ok()).collect();
    // Row indices of the non-null list rows: the k-th closure call handles valid_rows[k].
    let list_valid = list_series.is_not_null();
    let valid_rows: Vec<usize> = list_valid
        .into_iter()
        .enumerate()
        .filter_map(|(row, valid)| valid.unwrap_or(false).then_some(row))
        .collect();
    let call_idx = RefCell::new(0usize);
    let out = list_ca.try_apply_amortized(|amort_s| {
        let call = *call_idx.borrow();
        *call_idx.borrow_mut() += 1;
        let row = valid_rows.get(call).copied().unwrap_or(call);
        let ei = if elem_len == 1 { 0 } else { row };
        let list_s = amort_s.as_ref().as_list();
        let mut acc: Vec<Series> = Vec::new();
        for e in list_s.amortized_iter().flatten() {
            acc.push(e.deep_clone());
        }
        if let Some(Some(av)) = elem_vec.get(ei) {
            acc.push(any_value_to_single_series(av.clone(), &inner_dtype)?);
        }
        let mut parts = acc.into_iter();
        match parts.next() {
            None => Ok(Series::new_empty(PlSmallStr::EMPTY, &inner_dtype)),
            Some(mut result) => {
                for s in parts {
                    result.extend(&s)?;
                }
                Ok(result)
            }
        }
    })?;
    Ok(Some(Column::new(name, out.into_series())))
}
/// Prepends the matching value of `columns[1]` to the front of each list row of
/// `columns[0]` (Spark `array_prepend`).
///
/// The element column is cast to the list's inner dtype and broadcast when it has
/// length 1. If no element entry exists for a row, the row is returned unchanged. Null
/// list rows stay null.
///
/// `try_apply_amortized` only invokes its closure for non-null list rows, so a bare call
/// counter points at the wrong element after the first null list row. The counter is
/// therefore mapped back to the real row index.
///
/// # Errors
/// Fails if fewer than two columns are given, the first is not a list, or the element
/// column cannot be cast to the list's inner dtype.
pub fn apply_array_prepend(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
    use std::cell::RefCell;
    if columns.len() < 2 {
        return Err(PolarsError::ComputeError(
            "array_prepend needs two columns (array, element)".into(),
        ));
    }
    let name = columns[0].field().into_owned().name;
    let list_series = std::mem::take(&mut columns[0]).take_materialized_series();
    let elem_series = std::mem::take(&mut columns[1]).take_materialized_series();
    let list_ca = list_series
        .list()
        .map_err(|e| compute_err("array_prepend", e))?;
    let inner_dtype = list_ca.inner_dtype().clone();
    let elem_casted = elem_series.cast(&inner_dtype)?;
    let elem_len = elem_casted.len();
    let elem_vec: Vec<Option<AnyValue>> = (0..elem_len).map(|i| elem_casted.get(i).ok()).collect();
    // Row indices of the non-null list rows: the k-th closure call handles valid_rows[k].
    let list_valid = list_series.is_not_null();
    let valid_rows: Vec<usize> = list_valid
        .into_iter()
        .enumerate()
        .filter_map(|(row, valid)| valid.unwrap_or(false).then_some(row))
        .collect();
    let call_idx = RefCell::new(0usize);
    let out = list_ca.try_apply_amortized(|amort_s| {
        let call = *call_idx.borrow();
        *call_idx.borrow_mut() += 1;
        let row = valid_rows.get(call).copied().unwrap_or(call);
        let ei = if elem_len == 1 { 0 } else { row };
        let list_s = amort_s.as_ref().as_list();
        let mut acc: Vec<Series> = Vec::new();
        if let Some(Some(av)) = elem_vec.get(ei) {
            acc.push(any_value_to_single_series(av.clone(), &inner_dtype)?);
        }
        for e in list_s.amortized_iter().flatten() {
            acc.push(e.deep_clone());
        }
        let mut parts = acc.into_iter();
        match parts.next() {
            None => Ok(Series::new_empty(PlSmallStr::EMPTY, &inner_dtype)),
            Some(mut result) => {
                for s in parts {
                    result.extend(&s)?;
                }
                Ok(result)
            }
        }
    })?;
    Ok(Some(Column::new(name, out.into_series())))
}
/// Inserts the value of `columns[2]` into each list row of `columns[0]` at the 1-based
/// position given by `columns[1]` (Spark `array_insert` style).
///
/// Position rules:
/// * A positive position `p` inserts before the current `p`-th element. Positions past
///   the end are clamped to an append.
/// * A negative position counts from the end: -1 appends, and positions before the
///   start are clamped to the front.
/// * 0 and null positions behave like 1.
///
/// The position and element columns are broadcast when they have length 1. If no
/// element entry exists for a row, the row is returned unchanged. Null list rows stay
/// null.
///
/// `try_apply_amortized` only invokes its closure for non-null list rows, so the call
/// counter is mapped back to the real row index. This keeps positions and elements
/// aligned after null list rows.
///
/// # Errors
/// Fails if fewer than three columns are given, the first is not a list, a cast fails,
/// or an element cannot be converted to the list's inner dtype. Conversion errors used
/// to be silently ignored; they now propagate like in `array_append`.
pub fn apply_array_insert(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
    use std::cell::RefCell;
    if columns.len() < 3 {
        return Err(PolarsError::ComputeError(
            "array_insert needs three columns (array, position, element)".into(),
        ));
    }
    let name = columns[0].field().into_owned().name;
    let list_series = std::mem::take(&mut columns[0]).take_materialized_series();
    let pos_series = std::mem::take(&mut columns[1]).take_materialized_series();
    let elem_series = std::mem::take(&mut columns[2]).take_materialized_series();
    let list_ca = list_series
        .list()
        .map_err(|e| compute_err("array_insert", e))?;
    let inner_dtype = list_ca.inner_dtype().clone();
    let pos_ca = pos_series
        .cast(&DataType::Int64)?
        .i64()
        .map_err(|e| compute_err("array_insert: position column", e))?
        .clone();
    let elem_casted = elem_series.cast(&inner_dtype)?;
    let pos_len = pos_ca.len();
    let pos_vec: Vec<i64> = (0..pos_len).map(|i| pos_ca.get(i).unwrap_or(1)).collect();
    let elem_len = elem_casted.len();
    let elem_vec: Vec<Option<AnyValue>> = (0..elem_len).map(|i| elem_casted.get(i).ok()).collect();
    // Row indices of the non-null list rows: the k-th closure call handles valid_rows[k].
    let list_valid = list_series.is_not_null();
    let valid_rows: Vec<usize> = list_valid
        .into_iter()
        .enumerate()
        .filter_map(|(row, valid)| valid.unwrap_or(false).then_some(row))
        .collect();
    let call_idx = RefCell::new(0usize);
    let out = list_ca.try_apply_amortized(|amort_s| {
        let call = *call_idx.borrow();
        *call_idx.borrow_mut() += 1;
        let row = valid_rows.get(call).copied().unwrap_or(call);
        let pi = if pos_len == 1 { 0 } else { row };
        let ei = if elem_len == 1 { 0 } else { row };
        let list_s = amort_s.as_ref().as_list();
        let pos_val = pos_vec.get(pi).copied().unwrap_or(1);
        let mut acc: Vec<Series> = Vec::new();
        for e in list_s.amortized_iter().flatten() {
            acc.push(e.deep_clone());
        }
        let len = acc.len() as i64;
        // Convert the 1-based (or negative, from-the-end) position to a clamped
        // 0-based insertion index in `0..=len`.
        let pos = if pos_val < 0 {
            (len + pos_val + 1).max(0).min(len) as usize
        } else {
            ((pos_val - 1).max(0)).min(len) as usize
        };
        if let Some(av) = elem_vec.get(ei).and_then(|o| o.as_ref()) {
            acc.insert(pos, any_value_to_single_series(av.clone(), &inner_dtype)?);
        }
        let mut parts = acc.into_iter();
        match parts.next() {
            None => Ok(Series::new_empty(PlSmallStr::EMPTY, &inner_dtype)),
            Some(mut result) => {
                for s in parts {
                    result.extend(&s)?;
                }
                Ok(result)
            }
        }
    })?;
    Ok(Some(Column::new(name, out.into_series())))
}
/// Builds a string key used to compare list elements for set-like operations.
///
/// A one-row series is keyed by the `Debug` rendering of its single `AnyValue`. Any
/// other series, or one whose value cannot be read, falls back to the series' `Display`
/// text.
fn series_to_set_key(s: &Series) -> String {
    let single_value = if s.len() == 1 { s.get(0).ok() } else { None };
    match single_value {
        Some(av) => format!("{:?}", av),
        None => std::string::ToString::to_string(s),
    }
}
/// Per row, returns the distinct elements of the first list that do not occur in the
/// second list (Spark `array_except`), in first-occurrence order.
///
/// Elements are compared by their `series_to_set_key` string. If either list in a row
/// is null, the output row is null. Rows are paired by zipping the two columns, so
/// iteration stops at the end of the shorter column.
///
/// # Errors
/// Fails if fewer than two columns are given, either input is not a list, or
/// concatenating/appending a row fails.
pub fn apply_array_except(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
    if columns.len() < 2 {
        return Err(PolarsError::ComputeError(
            "array_except needs two columns (array1, array2)".into(),
        ));
    }
    let name = columns[0].field().into_owned().name;
    let a_series = std::mem::take(&mut columns[0]).take_materialized_series();
    let b_series = std::mem::take(&mut columns[1]).take_materialized_series();
    let a_ca = a_series
        .list()
        .map_err(|e| compute_err("array_except", e))?;
    let b_ca = b_series
        .list()
        .map_err(|e| compute_err("array_except", e))?;
    let inner_dtype = a_ca.inner_dtype().clone();
    let mut builder = polars::chunked_array::builder::get_list_builder(
        &inner_dtype,
        64,
        a_ca.len(),
        name.as_str().into(),
    );
    for (opt_a, opt_b) in a_ca.amortized_iter().zip(b_ca.amortized_iter()) {
        match (opt_a, opt_b) {
            (Some(a_amort), Some(b_amort)) => {
                let a_list = a_amort.as_ref().as_list();
                let b_list = b_amort.as_ref().as_list();
                // Keys of everything in the second list: these are excluded.
                let b_keys: std::collections::HashSet<String> = b_list
                    .amortized_iter()
                    .flatten()
                    .map(|e| series_to_set_key(&e.deep_clone()))
                    .collect();
                let mut acc: Vec<Series> = Vec::new();
                // `seen` deduplicates the kept elements of the first list.
                let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
                for e in a_list.amortized_iter().flatten() {
                    let s = e.deep_clone();
                    let key = series_to_set_key(&s);
                    if !b_keys.contains(&key) && seen.insert(key) {
                        acc.push(s);
                    }
                }
                let result = if acc.is_empty() {
                    Series::new_empty(PlSmallStr::EMPTY, &inner_dtype)
                } else {
                    let mut r = acc.remove(0);
                    for s in acc {
                        r.extend(&s)?;
                    }
                    r
                };
                builder.append_series(&result)?;
            }
            _ => builder.append_null(),
        }
    }
    Ok(Some(Column::new(name, builder.finish().into_series())))
}
/// Per row, returns whether the two lists share at least one element
/// (Spark `arrays_overlap`).
///
/// Elements are compared by their `series_to_set_key` string. If either list in a row
/// is null, the result is `false` (never null). Rows are paired by zipping the two
/// columns, so iteration stops at the end of the shorter column.
///
/// # Errors
/// Fails if fewer than two columns are given or either input is not a list.
pub fn apply_arrays_overlap(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
    if columns.len() < 2 {
        return Err(PolarsError::ComputeError(
            "arrays_overlap needs two columns (array1, array2)".into(),
        ));
    }
    let name = columns[0].field().into_owned().name;
    let a_series = std::mem::take(&mut columns[0]).take_materialized_series();
    let b_series = std::mem::take(&mut columns[1]).take_materialized_series();
    let a_ca = a_series
        .list()
        .map_err(|e| compute_err("arrays_overlap", e))?;
    let b_ca = b_series
        .list()
        .map_err(|e| compute_err("arrays_overlap", e))?;
    let mut results: Vec<bool> = Vec::with_capacity(a_ca.len());
    for (opt_a, opt_b) in a_ca.amortized_iter().zip(b_ca.amortized_iter()) {
        let overlap = match (opt_a, opt_b) {
            (Some(a_amort), Some(b_amort)) => {
                let a_list = a_amort.as_ref().as_list();
                let b_list = b_amort.as_ref().as_list();
                let a_keys: std::collections::HashSet<String> = a_list
                    .amortized_iter()
                    .flatten()
                    .map(|e| series_to_set_key(&e.deep_clone()))
                    .collect();
                let b_keys: std::collections::HashSet<String> = b_list
                    .amortized_iter()
                    .flatten()
                    .map(|e| series_to_set_key(&e.deep_clone()))
                    .collect();
                !a_keys.is_disjoint(&b_keys)
            }
            // A null list on either side counts as "no overlap".
            _ => false,
        };
        results.push(overlap);
    }
    let out =
        BooleanChunked::from_iter_options(name.as_str().into(), results.into_iter().map(Some));
    Ok(Some(Column::new(name, out.into_series())))
}
pub fn apply_arrays_zip(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
if columns.len() < 2 {
return Err(PolarsError::ComputeError(
"arrays_zip needs at least two columns".into(),
));
}
let name = columns[0].field().into_owned().name;
let n = columns.len();
let mut series_vec: Vec<Series> = Vec::with_capacity(n);
for col in columns.iter_mut() {
series_vec.push(std::mem::take(col).take_materialized_series());
}
let list_cas: Vec<_> = series_vec
.iter()
.map(|s| s.list().map_err(|e| compute_err("arrays_zip", e)))
.collect::<PolarsResult<Vec<_>>>()?;
let len = list_cas[0].len();
let inner_dtype = list_cas[0].inner_dtype().clone();
use polars::chunked_array::StructChunked;
use polars::chunked_array::builder::get_list_builder;
use polars::datatypes::Field;
let struct_fields: Vec<Field> = (0..n)
.map(|i| Field::new(format!("field_{i}").into(), inner_dtype.clone()))
.collect();
let out_struct = DataType::Struct(struct_fields);
let mut builder = get_list_builder(&out_struct, 64, len, name.as_str().into());
for row_idx in 0..len {
let mut max_len = 0usize;
let mut row_lists: Vec<Vec<Series>> = Vec::with_capacity(n);
for ca in &list_cas {
let opt_amort = ca.amortized_iter().nth(row_idx).flatten();
if let Some(amort) = opt_amort {
let list_s = amort.as_ref().as_list();
let elems: Vec<Series> = list_s
.amortized_iter()
.flatten()
.map(|e| e.deep_clone())
.collect();
max_len = max_len.max(elems.len());
row_lists.push(elems);
} else {
row_lists.push(vec![]);
}
}
if max_len == 0 {
builder.append_null();
} else {
let mut struct_parts: Vec<Vec<Series>> =
(0..n).map(|_| Vec::with_capacity(max_len)).collect();
for i in 0..max_len {
for (j, lst) in row_lists.iter().enumerate() {
let elem = lst
.get(i)
.cloned()
.unwrap_or_else(|| Series::full_null(PlSmallStr::EMPTY, 1, &inner_dtype));
struct_parts[j].push(elem);
}
}
let field_series: Vec<Series> = struct_parts
.into_iter()
.enumerate()
.map(|(j, mut parts)| {
let mut r = parts.remove(0);
for s in parts {
let _ = r.extend(&s);
}
r.with_name(format!("field_{j}").as_str().into())
})
.collect();
let field_refs: Vec<&Series> = field_series.iter().collect();
let st =
StructChunked::from_series(PlSmallStr::EMPTY, max_len, field_refs.iter().copied())
.map_err(|e| compute_err("arrays_zip struct", e))?
.into_series();
builder.append_series(&st)?;
}
}
Ok(Some(Column::new(name, builder.finish().into_series())))
}
/// Per row, returns the distinct elements of the first list that also occur in the
/// second list (Spark `array_intersect`), in first-occurrence order.
///
/// Elements are compared by their `series_to_set_key` string. If either list in a row
/// is null, the output row is null. Rows are paired by zipping the two columns, so
/// iteration stops at the end of the shorter column.
///
/// # Errors
/// Fails if fewer than two columns are given, either input is not a list, or
/// concatenating/appending a row fails.
pub fn apply_array_intersect(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
    if columns.len() < 2 {
        return Err(PolarsError::ComputeError(
            "array_intersect needs two columns (array1, array2)".into(),
        ));
    }
    let name = columns[0].field().into_owned().name;
    let a_series = std::mem::take(&mut columns[0]).take_materialized_series();
    let b_series = std::mem::take(&mut columns[1]).take_materialized_series();
    let a_ca = a_series
        .list()
        .map_err(|e| compute_err("array_intersect", e))?;
    let b_ca = b_series
        .list()
        .map_err(|e| compute_err("array_intersect", e))?;
    let inner_dtype = a_ca.inner_dtype().clone();
    let mut builder = polars::chunked_array::builder::get_list_builder(
        &inner_dtype,
        64,
        a_ca.len(),
        name.as_str().into(),
    );
    for (opt_a, opt_b) in a_ca.amortized_iter().zip(b_ca.amortized_iter()) {
        match (opt_a, opt_b) {
            (Some(a_amort), Some(b_amort)) => {
                let a_list = a_amort.as_ref().as_list();
                let b_list = b_amort.as_ref().as_list();
                // Keys present in the second list: only these are kept.
                let b_keys: std::collections::HashSet<String> = b_list
                    .amortized_iter()
                    .flatten()
                    .map(|e| series_to_set_key(&e.deep_clone()))
                    .collect();
                let mut acc: Vec<Series> = Vec::new();
                // `seen` deduplicates the kept elements of the first list.
                let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
                for e in a_list.amortized_iter().flatten() {
                    let s = e.deep_clone();
                    let key = series_to_set_key(&s);
                    if b_keys.contains(&key) && seen.insert(key) {
                        acc.push(s);
                    }
                }
                let result = if acc.is_empty() {
                    Series::new_empty(PlSmallStr::EMPTY, &inner_dtype)
                } else {
                    let mut r = acc.remove(0);
                    for s in acc {
                        r.extend(&s)?;
                    }
                    r
                };
                builder.append_series(&result)?;
            }
            _ => builder.append_null(),
        }
    }
    Ok(Some(Column::new(name, builder.finish().into_series())))
}
/// Per row, returns the distinct elements of both lists: first-list elements first, then
/// new elements from the second list, each in first-occurrence order
/// (Spark `array_union`).
///
/// Elements are compared by their `series_to_set_key` string. If either list in a row
/// is null, the output row is null. Rows are paired by zipping the two columns, so
/// iteration stops at the end of the shorter column.
///
/// # Errors
/// Fails if fewer than two columns are given, either input is not a list, or
/// concatenating/appending a row fails.
pub fn apply_array_union(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
    if columns.len() < 2 {
        return Err(PolarsError::ComputeError(
            "array_union needs two columns (array1, array2)".into(),
        ));
    }
    let name = columns[0].field().into_owned().name;
    let a_series = std::mem::take(&mut columns[0]).take_materialized_series();
    let b_series = std::mem::take(&mut columns[1]).take_materialized_series();
    let a_ca = a_series.list().map_err(|e| compute_err("array_union", e))?;
    let b_ca = b_series.list().map_err(|e| compute_err("array_union", e))?;
    let inner_dtype = a_ca.inner_dtype().clone();
    let mut builder = polars::chunked_array::builder::get_list_builder(
        &inner_dtype,
        64,
        a_ca.len(),
        name.as_str().into(),
    );
    for (opt_a, opt_b) in a_ca.amortized_iter().zip(b_ca.amortized_iter()) {
        match (opt_a, opt_b) {
            (Some(a_amort), Some(b_amort)) => {
                let a_list = a_amort.as_ref().as_list();
                let b_list = b_amort.as_ref().as_list();
                // Shared across both lists so an element is emitted only once overall.
                let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
                let mut acc: Vec<Series> = Vec::new();
                for e in a_list.amortized_iter().flatten() {
                    let s = e.deep_clone();
                    let key = series_to_set_key(&s);
                    if seen.insert(key) {
                        acc.push(s);
                    }
                }
                for e in b_list.amortized_iter().flatten() {
                    let s = e.deep_clone();
                    let key = series_to_set_key(&s);
                    if seen.insert(key) {
                        acc.push(s);
                    }
                }
                let result = if acc.is_empty() {
                    Series::new_empty(PlSmallStr::EMPTY, &inner_dtype)
                } else {
                    let mut r = acc.remove(0);
                    for s in acc {
                        r.extend(&s)?;
                    }
                    r
                };
                builder.append_series(&result)?;
            }
            _ => builder.append_null(),
        }
    }
    Ok(Some(Column::new(name, builder.finish().into_series())))
}
/// Parses each string into a map, represented as a list of `{key, value}` string structs
/// (Spark `str_to_map`).
///
/// The string is split on the literal `pair_delim`. Each piece is split once on the
/// literal `key_value_delim`:
/// * A piece without the key/value delimiter becomes `key` with an empty-string value.
/// * Empty pieces are skipped.
/// * Duplicate keys are all kept, in input order.
///
/// Null strings, and strings that yield no pairs, become null rows.
///
/// # Errors
/// Fails if the column is not a string column or building a row's struct fails.
pub fn apply_str_to_map(
    column: Column,
    pair_delim: &str,
    key_value_delim: &str,
) -> PolarsResult<Option<Column>> {
    use polars::chunked_array::StructChunked;
    use polars::chunked_array::builder::get_list_builder;
    use polars::datatypes::Field;
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let ca = series.str().map_err(|e| compute_err("str_to_map", e))?;
    let out_struct = DataType::Struct(vec![
        Field::new("key".into(), DataType::String),
        Field::new("value".into(), DataType::String),
    ]);
    let mut builder = get_list_builder(&out_struct, 64, ca.len(), name.as_str().into());
    for opt_s in ca.into_iter() {
        if let Some(s) = opt_s {
            let pairs: Vec<(String, String)> = s
                .split(pair_delim)
                .filter_map(|part| {
                    // Split on the first key/value delimiter only; the value keeps the rest.
                    let kv: Vec<&str> = part.splitn(2, key_value_delim).collect();
                    if kv.len() >= 2 {
                        Some((kv[0].to_string(), kv[1].to_string()))
                    } else if kv.len() == 1 && !kv[0].is_empty() {
                        Some((kv[0].to_string(), String::new()))
                    } else {
                        None
                    }
                })
                .collect();
            if pairs.is_empty() {
                builder.append_null();
            } else {
                let keys: Vec<String> = pairs.iter().map(|(k, _)| k.clone()).collect();
                let vals: Vec<String> = pairs.iter().map(|(_, v)| v.clone()).collect();
                let k_series = Series::new("key".into(), keys);
                let v_series = Series::new("value".into(), vals);
                let fields: [&Series; 2] = [&k_series, &v_series];
                let st = StructChunked::from_series(
                    PlSmallStr::EMPTY,
                    pairs.len(),
                    fields.iter().copied(),
                )
                .map_err(|e| compute_err("str_to_map struct", e))?
                .into_series();
                builder.append_series(&st)?;
            }
        } else {
            builder.append_null();
        }
    }
    Ok(Some(Column::new(name, builder.finish().into_series())))
}
/// Merges map columns (lists of `{key, value}` structs) row by row (Spark `map_concat`).
///
/// Every given column is merged, in order. Previously, columns after the second were
/// silently ignored despite the "at least two columns" contract. When a key appears
/// more than once, the entry from the later map wins. Entries are keyed by the
/// `Display` text of the key series. The merged entries are emitted in the sorted order
/// of those keys (a `BTreeMap`), not insertion order.
///
/// Null maps are skipped. A row whose merged map is empty becomes null. The output has
/// as many rows as the shortest input, matching the previous two-column `zip`. The key
/// and value dtypes come from the first column's struct fields, defaulting to `String`.
///
/// # Errors
/// Fails if fewer than two columns are given, an input is not a list, an entry is not a
/// struct with `key`/`value` fields, or building a row fails.
pub fn apply_map_concat(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
    use polars::chunked_array::StructChunked;
    use polars::chunked_array::builder::get_list_builder;
    use polars::datatypes::Field;
    use std::collections::BTreeMap;
    if columns.len() < 2 {
        return Err(PolarsError::ComputeError(
            "map_concat needs at least two columns".into(),
        ));
    }
    let name = columns[0].field().into_owned().name;
    let series_vec: Vec<Series> = columns
        .iter_mut()
        .map(|col| std::mem::take(col).take_materialized_series())
        .collect();
    let list_cas = series_vec
        .iter()
        .map(|s| s.list().map_err(|e| compute_err("map_concat", e)))
        .collect::<PolarsResult<Vec<_>>>()?;
    let struct_dtype = list_cas[0].inner_dtype().clone();
    let (key_dtype, value_dtype) = match &struct_dtype {
        DataType::Struct(fields) => {
            let k = fields
                .iter()
                .find(|f| f.name == "key")
                .map(|f| f.dtype.clone())
                .unwrap_or(DataType::String);
            let v = fields
                .iter()
                .find(|f| f.name == "value")
                .map(|f| f.dtype.clone())
                .unwrap_or(DataType::String);
            (k, v)
        }
        _ => (DataType::String, DataType::String),
    };
    let out_struct = DataType::Struct(vec![
        Field::new("key".into(), key_dtype),
        Field::new("value".into(), value_dtype),
    ]);
    let n = list_cas.iter().map(|ca| ca.len()).min().unwrap_or(0);
    let mut builder = get_list_builder(&out_struct, 64, n, name.as_str().into());
    // One amortized iterator per input, advanced once per output row.
    let mut row_iters: Vec<_> = list_cas.iter().map(|ca| ca.amortized_iter()).collect();
    for _ in 0..n {
        let mut merged: BTreeMap<String, (Series, Series)> = BTreeMap::new();
        for it in row_iters.iter_mut() {
            if let Some(amort) = it.next().flatten() {
                let list_s = amort.as_ref().as_list();
                for elem in list_s.amortized_iter().flatten() {
                    let s = elem.deep_clone();
                    let st = s
                        .struct_()
                        .map_err(|e| compute_err("map_concat struct", e))?;
                    let k_s = st
                        .field_by_name("key")
                        .map_err(|e| compute_err("map_concat key", e))?;
                    let v_s = st
                        .field_by_name("value")
                        .map_err(|e| compute_err("map_concat value", e))?;
                    let key = std::string::ToString::to_string(&k_s);
                    // Later maps overwrite earlier entries with the same key.
                    merged.insert(key, (k_s, v_s));
                }
            }
        }
        let mut row_structs: Vec<Series> = Vec::with_capacity(merged.len());
        for (_, (k_s, v_s)) in merged {
            let len = k_s.len();
            let fields: [&Series; 2] = [&k_s, &v_s];
            let st = StructChunked::from_series(PlSmallStr::EMPTY, len, fields.iter().copied())
                .map_err(|e| compute_err("map_concat build", e))?
                .into_series();
            row_structs.push(st);
        }
        let mut parts = row_structs.into_iter();
        match parts.next() {
            None => builder.append_null(),
            Some(mut combined) => {
                for s in parts {
                    combined.extend(&s)?;
                }
                builder.append_series(&combined)?;
            }
        }
    }
    Ok(Some(Column::new(name, builder.finish().into_series())))
}
/// Per row, returns whether the map (a list of `{key, value}` structs) in `columns[0]`
/// has an entry whose key matches `columns[1]` (Spark `map_contains_key`).
///
/// The key column is cast to `String` and broadcast when it has length 1. Keys are
/// compared by their `AnyValue::to_string()` text. A null map row yields `false`, never
/// null. A missing key entry compares as the empty string.
///
/// NOTE(review): only the search key is cast to `String` before rendering; map keys are
/// rendered from their own dtype. For non-string map keys the two texts may differ, so
/// confirm the intended comparison.
///
/// # Errors
/// Fails if fewer than two columns are given, the first is not a list, or the key
/// column cannot be cast to `String`.
pub fn apply_map_contains_key(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
    if columns.len() < 2 {
        return Err(PolarsError::ComputeError(
            "map_contains_key needs two columns (map, key)".into(),
        ));
    }
    let name = columns[0].field().into_owned().name;
    let map_series = std::mem::take(&mut columns[0]).take_materialized_series();
    let key_series = std::mem::take(&mut columns[1]).take_materialized_series();
    let map_ca = map_series
        .list()
        .map_err(|e| compute_err("map_contains_key", e))?;
    let key_str = key_series.cast(&DataType::String)?;
    let key_vec: Vec<String> = (0..key_str.len())
        .map(|i| key_str.get(i).map(|av| av.to_string()).unwrap_or_default())
        .collect();
    let key_len = key_vec.len();
    let mut results: Vec<bool> = Vec::with_capacity(map_ca.len());
    for (i, opt_amort) in map_ca.amortized_iter().enumerate() {
        // Broadcast a single search key across all rows.
        let target = key_vec
            .get(if key_len == 1 { 0 } else { i })
            .map(|s| s.as_str())
            .unwrap_or("");
        let mut found = false;
        if let Some(amort) = opt_amort {
            let list_s = amort.as_ref().as_list();
            for elem in list_s.amortized_iter().flatten() {
                let s = elem.deep_clone();
                // Entries that are not structs, or have no `key` field, are skipped.
                if let Ok(st) = s.struct_() {
                    if let Ok(k) = st.field_by_name("key") {
                        let k_str: String =
                            k.get(0).ok().map(|av| av.to_string()).unwrap_or_default();
                        if k_str == target {
                            found = true;
                            break;
                        }
                    }
                }
            }
        }
        results.push(found);
    }
    let out =
        BooleanChunked::from_iter_options(name.as_str().into(), results.into_iter().map(Some));
    Ok(Some(Column::new(name, out.into_series())))
}
/// Parses `s` (after trimming) as a JSON object.
///
/// If `s` is instead a JSON string literal, its contents are parsed recursively, which unwraps
/// doubly-encoded objects such as `"{\"a\":1}"`. Any other JSON value, or invalid JSON, yields
/// `None`.
fn parse_json_object_str(s: &str) -> Option<serde_json::Map<String, serde_json::Value>> {
    let parsed = serde_json::from_str::<serde_json::Value>(s.trim()).ok()?;
    match parsed {
        serde_json::Value::Object(map) => Some(map),
        serde_json::Value::String(inner) => parse_json_object_str(&inner),
        _ => None,
    }
}
fn json_value_to_option_string(v: &serde_json::Value) -> Option<String> {
match v {
serde_json::Value::Null => None,
serde_json::Value::String(s) => Some(s.clone()),
other => Some(other.to_string()),
}
}
/// Chooses an output dtype able to hold every non-null JSON value in `values`.
///
/// * all numbers, all integral (fit `i64`) -> `Int64`
/// * all numbers, at least one non-`i64`   -> `Float64` (integers widen losslessly via `as_f64`)
/// * all booleans                          -> `Boolean`
/// * anything else (strings, arrays, objects, mixed kinds, or no values) -> `String`
///
/// Previously only the first non-null value was inspected. Later values of another kind were
/// then silently nulled by the typed conversion, e.g. `[1, 2.5]` became `[1, null]`.
fn infer_dtype_from_json_values(values: &[Option<serde_json::Value>]) -> DataType {
    let mut saw_int = false;
    let mut saw_float = false;
    let mut saw_bool = false;
    let mut saw_other = false;
    for value in values.iter().flatten() {
        match value {
            serde_json::Value::Null => {}
            serde_json::Value::Number(n) if n.is_i64() => saw_int = true,
            serde_json::Value::Number(_) => saw_float = true,
            serde_json::Value::Bool(_) => saw_bool = true,
            _ => saw_other = true,
        }
    }
    let saw_number = saw_int || saw_float;
    if saw_other || (saw_number && saw_bool) {
        DataType::String
    } else if saw_float {
        DataType::Float64
    } else if saw_int {
        DataType::Int64
    } else if saw_bool {
        DataType::Boolean
    } else {
        DataType::String
    }
}
pub fn apply_get(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
if columns.len() < 2 {
return Err(PolarsError::ComputeError(
"get needs two columns (map, key)".into(),
));
}
let name = columns[0].field().into_owned().name;
let mut map_series = std::mem::take(&mut columns[0]).take_materialized_series();
let key_series = std::mem::take(&mut columns[1]).take_materialized_series();
let key_str_series = key_series
.cast(&DataType::String)
.map_err(|e| compute_err("get cast key to string", e))?;
let key_utf8 = key_str_series
.str()
.map_err(|e| compute_err("get key utf8", e))?;
let key_len = key_utf8.len();
let map_len = map_series.len();
if let DataType::Struct(field_metas) = map_series.dtype() {
let st = map_series
.struct_()
.map_err(|e| compute_err("get struct", e))?;
let fields_series = st.fields_as_series();
use std::collections::HashMap;
let mut key_to_field_and_row: HashMap<String, (usize, usize)> = HashMap::new();
for (field_idx, meta) in field_metas.iter().enumerate() {
let series = &fields_series[field_idx];
for row_idx in 0..series.len() {
if let Ok(av) = series.get(row_idx) {
if !matches!(av, polars::prelude::AnyValue::Null) {
key_to_field_and_row
.entry(meta.name.to_string())
.or_insert((field_idx, row_idx));
break;
}
}
}
}
let value_dtype = field_metas
.first()
.map(|f| f.dtype.clone())
.unwrap_or(DataType::String);
let out_len = map_len.max(key_len);
let mut result_series: Vec<Series> = Vec::with_capacity(out_len);
for i in 0..out_len {
let key_idx = if key_len == 1 { 0 } else { i };
let key_name = key_utf8.get(key_idx).unwrap_or("");
let one_series =
if let Some((field_idx, rep_row)) = key_to_field_and_row.get(key_name).copied() {
let series = &fields_series[field_idx];
let av = series
.get(rep_row)
.unwrap_or(polars::prelude::AnyValue::Null);
any_value_to_single_series(av, &value_dtype)?
} else {
Series::full_null(PlSmallStr::EMPTY, 1, &value_dtype)
};
result_series.push(one_series);
}
let mut out = result_series.remove(0);
for s in result_series {
out.extend(&s)?;
}
return Ok(Some(Column::new(name, out)));
}
let map_len_initial = map_len;
if map_len_initial == 1 && key_len > 1 {
let indices: Vec<u32> = (0..key_len).map(|_| 0u32).collect();
let idx_ca = UInt32Chunked::from_vec("".into(), indices);
map_series = map_series
.take(&idx_ca)
.map_err(|e| compute_err("get broadcast map", e))?;
}
if map_series.dtype() == &DataType::String {
let map_ca = map_series
.str()
.map_err(|e| compute_err("get map str", e))?;
let out_len = map_ca.len();
let values: Vec<Option<serde_json::Value>> = (0..out_len)
.map(|i| {
let key_idx = if key_len == 1 { 0 } else { i };
let key = key_utf8.get(key_idx).unwrap_or("");
let map_s = map_ca.get(i).unwrap_or("");
let obj = parse_json_object_str(map_s);
obj.and_then(|m| m.get(key).or(m.get(key.trim())).cloned())
})
.collect();
let value_dtype = infer_dtype_from_json_values(&values);
let out_series: Series = match &value_dtype {
DataType::Int64 => Int64Chunked::from_iter_options(
name.as_str().into(),
values.iter().map(|v| v.as_ref().and_then(|j| j.as_i64())),
)
.into_series(),
DataType::Float64 => Float64Chunked::from_iter_options(
name.as_str().into(),
values.iter().map(|v| v.as_ref().and_then(|j| j.as_f64())),
)
.into_series(),
DataType::Boolean => BooleanChunked::from_iter_options(
name.as_str().into(),
values.iter().map(|v| v.as_ref().and_then(|j| j.as_bool())),
)
.into_series(),
_ => StringChunked::from_iter_options(
name.as_str().into(),
values
.iter()
.map(|v| v.as_ref().and_then(json_value_to_option_string)),
)
.into_series(),
};
return Ok(Some(Column::new(name, out_series)));
}
let map_ca = map_series.list().map_err(|e| compute_err("get", e))?;
let map_len = map_ca.len();
let value_dtype = match map_ca.inner_dtype() {
DataType::Struct(fields) => fields
.iter()
.find(|f| f.name == "value")
.map(|f| f.dtype.clone())
.unwrap_or(DataType::String),
_ => DataType::String,
};
let mut result_series: Vec<Series> = Vec::with_capacity(map_len);
for (i, opt_amort) in map_ca.amortized_iter().enumerate() {
let key_idx = if key_len == 1 { 0 } else { i };
let target = key_utf8.get(key_idx).unwrap_or("");
let target_trim = target.trim();
let mut found: Option<Series> = None;
if let Some(amort) = opt_amort {
let list_s = amort.as_ref().as_list();
for elem in list_s.amortized_iter().flatten() {
let s = elem.deep_clone();
if let Ok(st) = s.struct_() {
if let Ok(k) = st.field_by_name("key") {
let k_str: String = k
.get(0)
.ok()
.map(|av| match av {
polars::prelude::AnyValue::String(s) => s.to_string(),
_ => av.to_string(),
})
.unwrap_or_default();
if k_str.trim() == target_trim {
if let Ok(v) = st.field_by_name("value") {
found = Some(v);
}
break;
}
}
}
}
}
result_series
.push(found.unwrap_or_else(|| Series::full_null(PlSmallStr::EMPTY, 1, &value_dtype)));
}
let mut out = result_series.remove(0);
for s in result_series {
out.extend(&s)?;
}
Ok(Some(Column::new(name, out)))
}
/// Returns the code point of the first character of each string as `Int32`.
///
/// Mirrors Spark's `ascii`: an empty string yields `0` (previously null), and a null input
/// stays null.
///
/// # Errors
/// Fails when the column is not a string column.
pub fn apply_ascii(column: Column) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let ca = series.str().map_err(|e| compute_err("ascii", e))?;
    let out = Int32Chunked::from_iter_options(
        name.as_str().into(),
        ca.into_iter()
            .map(|opt_s| opt_s.map(|s| s.chars().next().map_or(0, |c| c as i32))),
    );
    Ok(Some(Column::new(name, out.into_series())))
}
/// Formats numbers like Spark's `format_number`: `decimals` fractional digits and a `,`
/// thousands separator in the integer part (`1234567.891, 2` -> `"1,234,567.89"`).
///
/// `Float32` values are widened to `f64` first; this is exact, and the digits printed at a
/// fixed precision are unchanged. Other dtypes are cast to `Float64`. NaN and infinities are
/// emitted as Rust formats them, without grouping. Null stays null.
///
/// # Errors
/// Fails when the column cannot be cast to `Float64`.
pub fn apply_format_number(column: Column, decimals: u32) -> PolarsResult<Option<Column>> {
    /// `v` with `prec` decimals and `,` inserted every three integer digits.
    fn format_grouped(v: f64, prec: usize) -> String {
        let raw = format!("{v:.prec$}");
        if !v.is_finite() {
            return raw;
        }
        let (sign, unsigned) = match raw.strip_prefix('-') {
            Some(rest) => ("-", rest),
            None => ("", raw.as_str()),
        };
        let (int_part, frac_part) = match unsigned.split_once('.') {
            Some((int_part, frac)) => (int_part, Some(frac)),
            None => (unsigned, None),
        };
        let mut out = String::with_capacity(raw.len() + int_part.len() / 3);
        out.push_str(sign);
        // `int_part` is ASCII digits, so byte length equals digit count.
        for (pos, digit) in int_part.chars().enumerate() {
            if pos > 0 && (int_part.len() - pos) % 3 == 0 {
                out.push(',');
            }
            out.push(digit);
        }
        if let Some(frac) = frac_part {
            out.push('.');
            out.push_str(frac);
        }
        out
    }

    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let prec = decimals as usize;
    let out: StringChunked = match series.dtype() {
        DataType::Float32 => {
            let ca = series.f32().map_err(|e| compute_err("format_number", e))?;
            StringChunked::from_iter_options(
                name.as_str().into(),
                ca.into_iter()
                    .map(|opt_v| opt_v.map(|v| format_grouped(f64::from(v), prec))),
            )
        }
        _ => {
            let f64_series = series
                .cast(&DataType::Float64)
                .map_err(|e| compute_err("format_number cast", e))?;
            let ca = f64_series
                .f64()
                .map_err(|e| compute_err("format_number", e))?;
            StringChunked::from_iter_options(
                name.as_str().into(),
                ca.into_iter().map(|opt_v| opt_v.map(|v| format_grouped(v, prec))),
            )
        }
    };
    Ok(Some(Column::new(name, out.into_series())))
}
fn series_value_at_as_string(s: &Series, i: usize) -> Option<String> {
match s.dtype() {
DataType::String => s.str().ok()?.get(i).map(|v| v.to_string()),
DataType::Int32 => s.i32().ok()?.get(i).map(|v| v.to_string()),
DataType::Int64 => s.i64().ok()?.get(i).map(|v| v.to_string()),
DataType::Float32 => s.f32().ok()?.get(i).map(|v| v.to_string()),
DataType::Float64 => s.f64().ok()?.get(i).map(|v| v.to_string()),
DataType::Boolean => s.bool().ok()?.get(i).map(|v| v.to_string()),
_ => s.get(i).ok().map(|av| av.to_string()),
}
}
pub fn apply_format_string(columns: &mut [Column], format: &str) -> PolarsResult<Option<Column>> {
let n = columns.len();
if n == 0 {
return Err(PolarsError::ComputeError(
"format_string needs at least one column".into(),
));
}
let name = columns[0].field().into_owned().name;
let mut series_vec: Vec<Series> = Vec::with_capacity(n);
for col in columns.iter_mut() {
series_vec.push(std::mem::take(col).take_materialized_series());
}
let len = series_vec[0].len();
let mut result = Vec::with_capacity(len);
for i in 0..len {
let values: Vec<Option<String>> = series_vec
.iter()
.map(|s| series_value_at_as_string(s, i))
.collect();
let out = format_string_row(format, &values);
result.push(out);
}
let out = StringChunked::from_iter_options(name.as_str().into(), result.into_iter());
Ok(Some(Column::new(name, out.into_series())))
}
/// Formats one row for `format_string`, a printf / `java.util.Formatter`-style template.
///
/// `values` holds the row's arguments already rendered as strings (`None` = SQL null). They are
/// consumed left to right, one per conversion.
///
/// Syntax: `%[flags][width][.precision][h|l|L]conversion`.
/// * Flags: `-` left-justify, `0` zero-pad after any sign or radix prefix, `+` / space add a
///   sign to non-negative numbers, `#` adds the `0` / `0x` / `0X` prefix for `o` / `x` / `X`.
/// * Conversions:
///   * `s` / `S`: text, truncated to `precision` characters; `S` upper-cases.
///   * `d` / `i` / `o` / `x` / `X`: integers.
///   * `f`: fixed-point.
///   * `e` / `E`: scientific with a signed exponent of at least two digits (`1.5e+03`).
///   * `g` / `G`: general, Java rules — `precision` significant digits, trailing zeros kept.
/// * Float conversions default to precision 6.
/// * `%%` emits `%` and consumes no argument; so does `%<width>%`, padded to `width`.
///
/// A null argument renders as `null`, and a numeric argument that does not parse is emitted
/// as-is. Non-finite floats render as `NaN` / `Infinity`, like Java.
///
/// Returns `None` when the template ends mid-specifier, uses an unknown conversion, or needs
/// more arguments than `values` holds.
fn format_string_row(format: &str, values: &[Option<String>]) -> Option<String> {
    const NULL_STR: &str = "null";

    /// Consumes a run of ASCII digits from `chars`.
    fn take_digits(chars: &mut std::iter::Peekable<std::str::Chars<'_>>) -> String {
        let mut digits = String::new();
        while let Some(&ch) = chars.peek() {
            if !ch.is_ascii_digit() {
                break;
            }
            digits.push(ch);
            chars.next();
        }
        digits
    }

    /// Appends `prefix` followed by `body`, padded to `width` characters.
    fn push_padded(
        out: &mut String,
        prefix: &str,
        body: &str,
        width: Option<usize>,
        left: bool,
        zero: bool,
    ) {
        let used = prefix.chars().count() + body.chars().count();
        let fill = width.unwrap_or(0).saturating_sub(used);
        if left {
            out.push_str(prefix);
            out.push_str(body);
            out.extend(std::iter::repeat(' ').take(fill));
        } else if zero {
            // Zeros go between the sign/radix prefix and the digits: "-0042", "0x00ff".
            out.push_str(prefix);
            out.extend(std::iter::repeat('0').take(fill));
            out.push_str(body);
        } else {
            out.extend(std::iter::repeat(' ').take(fill));
            out.push_str(prefix);
            out.push_str(body);
        }
    }

    /// Scientific notation with a signed exponent of at least two digits: `1.500000e+03`.
    fn exponent_notation(m: f64, prec: usize) -> String {
        let raw = format!("{m:.prec$e}");
        match raw.split_once('e') {
            Some((mantissa, exp)) => {
                let exp: i32 = exp.parse().unwrap_or(0);
                let sign = if exp < 0 { '-' } else { '+' };
                format!("{mantissa}e{sign}{:02}", exp.unsigned_abs())
            }
            None => raw,
        }
    }

    /// `%g`: `prec` significant digits (minimum 1). Uses fixed notation when the rounded
    /// decimal exponent lies in `[-4, prec)`, otherwise scientific notation.
    fn general_notation(m: f64, prec: usize) -> String {
        let sig = prec.max(1);
        let sci_decimals = sig - 1;
        if m == 0.0 {
            return format!("{m:.sci_decimals$}");
        }
        // Format in scientific form first so the exponent reflects rounding (9.9999999 -> 1.0e1).
        let sci = format!("{m:.sci_decimals$e}");
        let exp10: i64 = sci
            .split_once('e')
            .and_then(|(_, e)| e.parse().ok())
            .unwrap_or(0);
        if exp10 >= -4 && exp10 < sig as i64 {
            let decimals = (sig as i64 - 1 - exp10) as usize;
            format!("{m:.decimals$}")
        } else {
            exponent_notation(m, sci_decimals)
        }
    }

    let mut out = String::with_capacity(format.len());
    let mut next_arg = 0usize;
    let mut chars = format.chars().peekable();
    while let Some(c) = chars.next() {
        if c != '%' {
            out.push(c);
            continue;
        }
        if chars.peek() == Some(&'%') {
            chars.next();
            out.push('%');
            continue;
        }
        let mut flags = String::new();
        while let Some(&ch) = chars.peek() {
            if !"-+ 0#".contains(ch) {
                break;
            }
            flags.push(ch);
            chars.next();
        }
        let width = take_digits(&mut chars).parse::<usize>().ok();
        let precision = if chars.peek() == Some(&'.') {
            chars.next();
            take_digits(&mut chars).parse::<usize>().ok()
        } else {
            None
        };
        // C length modifiers carry no meaning here; skip them.
        while matches!(chars.peek().copied(), Some('h' | 'l' | 'L')) {
            chars.next();
        }
        let spec = chars.next()?;
        let left = flags.contains('-');
        let zero = flags.contains('0') && !left;
        if spec == '%' {
            push_padded(&mut out, "", "%", width, left, false);
            continue;
        }
        let value = values.get(next_arg)?.as_deref();
        next_arg += 1;
        match spec {
            's' | 'S' => {
                let text = value.unwrap_or(NULL_STR);
                let text: String = match precision {
                    Some(p) => text.chars().take(p).collect(),
                    None => text.to_string(),
                };
                let text = if spec == 'S' { text.to_uppercase() } else { text };
                push_padded(&mut out, "", &text, width, left, false);
            }
            'd' | 'i' | 'o' | 'x' | 'X' => {
                let Some(raw) = value else {
                    push_padded(&mut out, "", NULL_STR, width, left, false);
                    continue;
                };
                let Ok(n) = raw.parse::<i64>() else {
                    push_padded(&mut out, "", raw, width, left, false);
                    continue;
                };
                // `unsigned_abs` avoids the overflow that `-n` hits for i64::MIN.
                let magnitude = n.unsigned_abs();
                let digits = match spec {
                    'o' => format!("{magnitude:o}"),
                    'x' => format!("{magnitude:x}"),
                    'X' => format!("{magnitude:X}"),
                    _ => magnitude.to_string(),
                };
                let mut prefix = String::new();
                if n < 0 {
                    prefix.push('-');
                } else if matches!(spec, 'd' | 'i') {
                    if flags.contains('+') {
                        prefix.push('+');
                    } else if flags.contains(' ') {
                        prefix.push(' ');
                    }
                }
                if flags.contains('#') {
                    match spec {
                        'o' => prefix.push('0'),
                        'x' => prefix.push_str("0x"),
                        'X' => prefix.push_str("0X"),
                        _ => {}
                    }
                }
                push_padded(&mut out, &prefix, &digits, width, left, zero);
            }
            'f' | 'e' | 'E' | 'g' | 'G' => {
                let Some(raw) = value else {
                    push_padded(&mut out, "", NULL_STR, width, left, false);
                    continue;
                };
                let Ok(v) = raw.parse::<f64>() else {
                    push_padded(&mut out, "", raw, width, left, false);
                    continue;
                };
                let sign = if v.is_nan() {
                    ""
                } else if v.is_sign_negative() {
                    "-"
                } else if flags.contains('+') {
                    "+"
                } else if flags.contains(' ') {
                    " "
                } else {
                    ""
                };
                let (body, pad_zero) = if v.is_finite() {
                    let m = v.abs();
                    let prec = precision.unwrap_or(6);
                    let body = match spec {
                        'f' => format!("{m:.prec$}"),
                        'e' | 'E' => exponent_notation(m, prec),
                        _ => general_notation(m, prec),
                    };
                    (body, zero)
                } else if v.is_nan() {
                    ("NaN".to_string(), false)
                } else {
                    ("Infinity".to_string(), false)
                };
                let body = if spec.is_ascii_uppercase() {
                    body.to_uppercase()
                } else {
                    body
                };
                push_padded(&mut out, sign, &body, width, left, pad_zero);
            }
            _ => return None,
        }
    }
    Some(out)
}
/// Spark `find_in_set(str, set)`: the 1-based position of `str` in the comma-separated `set`.
///
/// Returns 0 when `str` is absent or itself contains a comma, and null when either input is
/// null. A length-1 input (e.g. a literal set) is broadcast against the other column. The old
/// `zip` truncated the output to a single row in that case. Otherwise rows are paired up to
/// the shorter length.
///
/// # Errors
/// Fails when fewer than two columns are supplied or either is not a string column.
pub fn apply_find_in_set(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
    if columns.len() < 2 {
        return Err(PolarsError::ComputeError(
            "find_in_set needs two columns".into(),
        ));
    }
    let name = columns[0].field().into_owned().name;
    let str_series = std::mem::take(&mut columns[0]).take_materialized_series();
    let set_series = std::mem::take(&mut columns[1]).take_materialized_series();
    let str_ca = str_series
        .str()
        .map_err(|e| compute_err("find_in_set", e))?;
    let set_ca = set_series
        .str()
        .map_err(|e| compute_err("find_in_set", e))?;
    let (str_len, set_len) = (str_ca.len(), set_ca.len());
    let out_len = match (str_len, set_len) {
        (1, n) | (n, 1) => n,
        (a, b) => a.min(b),
    };
    let out = Int64Chunked::from_iter_options(
        name.as_str().into(),
        (0..out_len).map(|i| {
            let s = str_ca.get(if str_len == 1 { 0 } else { i })?;
            let set_str = set_ca.get(if set_len == 1 { 0 } else { i })?;
            if s.contains(',') {
                return Some(0i64);
            }
            let position = set_str
                .split(',')
                .position(|part| part == s)
                .map_or(0, |idx| idx as i64 + 1);
            Some(position)
        }),
    );
    Ok(Some(Column::new(name, out.into_series())))
}
pub fn apply_regexp_extract_lookaround(
column: Column,
pattern: &str,
group_index: usize,
) -> PolarsResult<Option<Column>> {
use fancy_regex::Regex;
let name = column.field().into_owned().name;
let series = column.take_materialized_series();
let ca = series.str().map_err(|e| compute_err("regexp_extract", e))?;
let re = Regex::new(pattern).map_err(|e| {
compute_err(
&format!("regexp_extract invalid regex (lookaround) '{pattern}'"),
e,
)
})?;
let out = StringChunked::from_iter_options(
name.as_str().into(),
ca.into_iter().map(|opt_s| {
opt_s.map(|s| {
let caps_opt = re.captures(s).ok().flatten();
if let Some(caps) = caps_opt {
if let Some(m) = caps.get(group_index) {
return m.as_str().to_string();
}
}
String::new()
})
}),
);
Ok(Some(Column::new(name, out.into_series())))
}
pub fn apply_regexp_extract(
column: Column,
pattern: &str,
group_index: usize,
) -> PolarsResult<Option<Column>> {
use polars::prelude::*;
use regex::Regex;
let name = column.field().into_owned().name;
let series = column.take_materialized_series();
let casted = if series.dtype() == &DataType::String {
series
} else {
series
.cast(&DataType::String)
.map_err(|e| compute_err("regexp_extract cast", e))?
};
let ca = casted.str().map_err(|e| compute_err("regexp_extract", e))?;
let re = Regex::new(pattern)
.map_err(|e| compute_err(&format!("regexp_extract invalid regex '{pattern}'"), e))?;
let out = StringChunked::from_iter_options(
name.as_str().into(),
ca.into_iter().map(|opt_s| {
opt_s.map(|s| {
let caps_opt = re.captures(s);
if let Some(caps) = caps_opt {
if let Some(m) = caps.get(group_index) {
return m.as_str().to_string();
}
}
String::new()
})
}),
);
Ok(Some(Column::new(name, out.into_series())))
}
pub fn apply_regexp_extract_all_group(
column: Column,
pattern: &str,
group_index: usize,
) -> PolarsResult<Option<Column>> {
use fancy_regex::Regex;
use polars::prelude::*;
let name = column.field().into_owned().name;
let series = column.take_materialized_series();
let ca = series
.str()
.map_err(|e| compute_err("regexp_extract_all", e))?;
let re = Regex::new(pattern)
.map_err(|e| compute_err(&format!("regexp_extract_all invalid regex '{pattern}'"), e))?;
let mut out: ListChunked = ca
.into_iter()
.map(|opt_s| {
opt_s.map(|s| {
let mut vals: Vec<String> = Vec::new();
for caps in re.captures_iter(s).flatten() {
if let Some(m) = caps.get(group_index) {
vals.push(m.as_str().to_string());
}
}
Series::new(PlSmallStr::EMPTY, vals)
})
})
.collect();
out.rename(name.clone());
Ok(Some(Column::new(name, out.into_series())))
}
pub fn apply_regexp_like_lookaround(column: Column, pattern: &str) -> PolarsResult<Option<Column>> {
use fancy_regex::Regex;
use polars::prelude::*;
let name = column.field().into_owned().name;
let series = column.take_materialized_series();
let casted = if series.dtype() == &DataType::String {
series
} else {
series
.cast(&DataType::String)
.map_err(|e| compute_err("rlike cast", e))?
};
let ca = casted.str().map_err(|e| compute_err("rlike", e))?;
let re = Regex::new(pattern)
.map_err(|e| compute_err(&format!("rlike invalid regex '{pattern}'"), e))?;
let out = BooleanChunked::from_iter_options(
name.as_str().into(),
ca.into_iter()
.map(|opt_s| opt_s.map(|s| re.is_match(s).unwrap_or(false))),
);
Ok(Some(Column::new(name, out.into_series())))
}
/// Spark `regexp_instr`: 1-based position where `pattern` first matches.
///
/// With `group_idx == 0` the start of the whole match is reported, otherwise the start of that
/// capture group. Like Spark, no match (or a group that did not participate) yields `0`;
/// previously this returned null. Positions count characters rather than UTF-8 bytes, so
/// non-ASCII text reports the position a user sees. Null input stays null.
///
/// # Errors
/// Fails when the column is not a string column or `pattern` is not a valid regex.
pub fn apply_regexp_instr(
    column: Column,
    pattern: String,
    group_idx: usize,
) -> PolarsResult<Option<Column>> {
    use regex::Regex;
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let ca = series.str().map_err(|e| compute_err("regexp_instr", e))?;
    let re = Regex::new(&pattern)
        .map_err(|e| compute_err(&format!("regexp_instr invalid regex '{pattern}'"), e))?;
    let out = Int64Chunked::from_iter_options(
        name.as_str().into(),
        ca.into_iter().map(|opt_s| {
            opt_s.map(|s| {
                let m = if group_idx == 0 {
                    re.find(s)
                } else {
                    re.captures(s).and_then(|cap| cap.get(group_idx))
                };
                // `m.start()` is a byte offset at a char boundary; convert it to a char count.
                m.map_or(0, |m| s[..m.start()].chars().count() as i64 + 1)
            })
        }),
    );
    Ok(Some(Column::new(name, out.into_series())))
}
/// Base64-encodes the UTF-8 bytes of each string with the standard, padded alphabet. Null stays
/// null.
///
/// # Errors
/// Fails when the column is not a string column.
pub fn apply_base64(column: Column) -> PolarsResult<Option<Column>> {
    use base64::Engine;
    let engine = &base64::engine::general_purpose::STANDARD;
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let strings = series.str().map_err(|e| compute_err("base64", e))?;
    let encoded = strings
        .into_iter()
        .map(|opt_s| opt_s.map(|s| engine.encode(s.as_bytes())));
    let out = StringChunked::from_iter_options(name.as_str().into(), encoded);
    Ok(Some(Column::new(name, out.into_series())))
}
/// Decodes standard, padded base64 text and interprets the bytes as UTF-8.
///
/// Rows that are not valid base64, or whose bytes are not valid UTF-8, become null. Null stays
/// null.
///
/// # Errors
/// Fails when the column is not a string column.
pub fn apply_unbase64(column: Column) -> PolarsResult<Option<Column>> {
    use base64::Engine;
    let engine = &base64::engine::general_purpose::STANDARD;
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let encoded = series.str().map_err(|e| compute_err("unbase64", e))?;
    let decode_one = |text: &str| -> Option<String> {
        let bytes = engine.decode(text.as_bytes()).ok()?;
        String::from_utf8(bytes).ok()
    };
    let decoded = encoded.into_iter().map(|opt_s| opt_s.and_then(&decode_one));
    let out = StringChunked::from_iter_options(name.as_str().into(), decoded);
    Ok(Some(Column::new(name, out.into_series())))
}
/// Lower-case hex SHA-1 digest of each string's UTF-8 bytes. Null stays null.
///
/// # Errors
/// Fails when the column is not a string column.
pub fn apply_sha1(column: Column) -> PolarsResult<Option<Column>> {
    use sha1::Digest;
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let strings = series.str().map_err(|e| compute_err("sha1", e))?;
    let hex_digest = |s: &str| format!("{:x}", sha1::Sha1::digest(s.as_bytes()));
    let digests = strings.into_iter().map(|opt_s| opt_s.map(hex_digest));
    let out = StringChunked::from_iter_options(name.as_str().into(), digests);
    Ok(Some(Column::new(name, out.into_series())))
}
/// Spark `sha2`: lower-case hex SHA-2 digest of each string's UTF-8 bytes.
///
/// `bit_length` selects the variant: 224, 256 (0 also means 256), 384 or 512. Any other value
/// yields null, as in Spark. Previously 224 and every unsupported length silently produced a
/// SHA-256 digest. Null stays null.
///
/// # Errors
/// Fails when the column is not a string column.
pub fn apply_sha2(column: Column, bit_length: i32) -> PolarsResult<Option<Column>> {
    use sha2::Digest;
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let ca = series.str().map_err(|e| compute_err("sha2", e))?;
    let out = StringChunked::from_iter_options(
        name.as_str().into(),
        ca.into_iter().map(|opt_s| {
            opt_s.and_then(|s| {
                let bytes = s.as_bytes();
                match bit_length {
                    224 => Some(format!("{:x}", sha2::Sha224::digest(bytes))),
                    0 | 256 => Some(format!("{:x}", sha2::Sha256::digest(bytes))),
                    384 => Some(format!("{:x}", sha2::Sha384::digest(bytes))),
                    512 => Some(format!("{:x}", sha2::Sha512::digest(bytes))),
                    _ => None,
                }
            })
        }),
    );
    Ok(Some(Column::new(name, out.into_series())))
}
/// Lower-case hex MD5 digest of each string's UTF-8 bytes. Null stays null.
///
/// # Errors
/// Fails when the column is not a string column.
pub fn apply_md5(column: Column) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let strings = series.str().map_err(|e| compute_err("md5", e))?;
    let hex_digest = |s: &str| {
        let digest = md5::compute(s.as_bytes());
        format!("{:x}", digest)
    };
    let digests = strings.into_iter().map(|opt_s| opt_s.map(hex_digest));
    let out = StringChunked::from_iter_options(name.as_str().into(), digests);
    Ok(Some(Column::new(name, out.into_series())))
}
/// Spark `encode(str, charset)`: encodes each string in `charset` and returns the bytes as
/// lower-case hex (this module's binary representation).
///
/// The charset name is matched case-insensitively:
/// * `US-ASCII` / `ASCII`: characters outside the range become `?`.
/// * `ISO-8859-1` / `LATIN1`: characters outside the range become `?`.
/// * `UTF-16BE` and `UTF-16LE`.
/// * `UTF-16`: a big-endian byte-order mark followed by big-endian units.
/// * `UTF-8` and any unrecognised name: UTF-8 bytes.
///
/// Previously the charset was ignored and UTF-8 was always used. Null stays null.
///
/// # Errors
/// Fails when the column is not a string column.
pub fn apply_encode(column: Column, charset: &str) -> PolarsResult<Option<Column>> {
    /// Encodes `s` in the lower-cased `charset`.
    fn encode_str(s: &str, charset: &str) -> Vec<u8> {
        match charset {
            "us-ascii" | "ascii" => s
                .chars()
                .map(|c| if c.is_ascii() { c as u8 } else { b'?' })
                .collect(),
            "iso-8859-1" | "latin1" | "latin-1" => s
                .chars()
                .map(|c| u8::try_from(u32::from(c)).unwrap_or(b'?'))
                .collect(),
            "utf-16be" => s.encode_utf16().flat_map(|u| u.to_be_bytes()).collect(),
            "utf-16le" => s.encode_utf16().flat_map(|u| u.to_le_bytes()).collect(),
            "utf-16" => {
                let mut out = Vec::with_capacity(2 + 2 * s.len());
                // NOTE(review): the BOM is omitted for empty input; confirm against Java's
                // `String.getBytes("UTF-16")`.
                if !s.is_empty() {
                    out.extend_from_slice(&[0xFE, 0xFF]);
                }
                out.extend(s.encode_utf16().flat_map(|u| u.to_be_bytes()));
                out
            }
            _ => s.as_bytes().to_vec(),
        }
    }

    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let ca = series.str().map_err(|e| compute_err("encode", e))?;
    let cs = charset.to_lowercase();
    let out = StringChunked::from_iter_options(
        name.as_str().into(),
        ca.into_iter()
            .map(|opt_s| opt_s.map(|s| hex::encode(encode_str(s, &cs)))),
    );
    Ok(Some(Column::new(name, out.into_series())))
}
/// Spark `decode(bin, charset)`: hex-decodes each value and interprets the bytes in `charset`.
///
/// The charset name is matched case-insensitively:
/// * `US-ASCII` / `ASCII`: bytes >= 0x80 become U+FFFD.
/// * `ISO-8859-1` / `LATIN1`: each byte is its own code point.
/// * `UTF-16BE` and `UTF-16LE`.
/// * `UTF-16`: a leading byte-order mark selects the endianness; big-endian is the default.
///
/// For the UTF-16 variants, invalid units and a dangling odd byte become U+FFFD.
/// `UTF-8` and any unrecognised name keep the previous behaviour: invalid UTF-8 yields null.
/// Input that is not valid hex yields null, and null stays null.
///
/// # Errors
/// Fails when the column is not a string column.
pub fn apply_decode(column: Column, charset: &str) -> PolarsResult<Option<Column>> {
    /// Decodes UTF-16 code units assembled from byte pairs by `to_unit`.
    fn decode_utf16_bytes(bytes: &[u8], to_unit: fn([u8; 2]) -> u16) -> String {
        let units = bytes.chunks_exact(2).map(|pair| to_unit([pair[0], pair[1]]));
        let mut out: String = char::decode_utf16(units)
            .map(|r| r.unwrap_or(char::REPLACEMENT_CHARACTER))
            .collect();
        if bytes.len() % 2 == 1 {
            out.push(char::REPLACEMENT_CHARACTER);
        }
        out
    }

    /// Interprets `bytes` in the lower-cased `charset`.
    fn decode_bytes(bytes: Vec<u8>, charset: &str) -> Option<String> {
        match charset {
            "us-ascii" | "ascii" => Some(
                bytes
                    .iter()
                    .map(|&b| {
                        if b.is_ascii() {
                            char::from(b)
                        } else {
                            char::REPLACEMENT_CHARACTER
                        }
                    })
                    .collect(),
            ),
            "iso-8859-1" | "latin1" | "latin-1" => {
                Some(bytes.iter().map(|&b| char::from(b)).collect())
            }
            "utf-16be" => Some(decode_utf16_bytes(&bytes, u16::from_be_bytes)),
            "utf-16le" => Some(decode_utf16_bytes(&bytes, u16::from_le_bytes)),
            "utf-16" => Some(match bytes.as_slice() {
                [0xFF, 0xFE, rest @ ..] => decode_utf16_bytes(rest, u16::from_le_bytes),
                [0xFE, 0xFF, rest @ ..] => decode_utf16_bytes(rest, u16::from_be_bytes),
                other => decode_utf16_bytes(other, u16::from_be_bytes),
            }),
            _ => String::from_utf8(bytes).ok(),
        }
    }

    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let ca = series.str().map_err(|e| compute_err("decode", e))?;
    let cs = charset.to_lowercase();
    let out = StringChunked::from_iter_options(
        name.as_str().into(),
        ca.into_iter().map(|opt_s| {
            opt_s.and_then(|s| {
                let bytes = hex::decode(s.as_bytes()).ok()?;
                decode_bytes(bytes, &cs)
            })
        }),
    );
    Ok(Some(Column::new(name, out.into_series())))
}
pub fn apply_to_binary(column: Column, fmt: &str) -> PolarsResult<Option<Column>> {
let name = column.field().into_owned().name;
let series = column.take_materialized_series();
let ca = series.str().map_err(|e| compute_err("to_binary", e))?;
let fmt_lower = fmt.to_lowercase();
let out = StringChunked::from_iter_options(
name.as_str().into(),
ca.into_iter().map(|opt_s| {
opt_s.and_then(|s| {
let hex_str = match fmt_lower.as_str() {
"hex" => {
hex::decode(s.as_bytes()).ok()?;
Some(s.to_string())
}
"utf-8" | "utf8" => Some(hex::encode(s.as_bytes())),
_ => Some(hex::encode(s.as_bytes())),
};
hex_str
})
}),
);
Ok(Some(Column::new(name, out.into_series())))
}
/// Lenient `try_to_binary`. `apply_to_binary` already turns unparseable input into null, so
/// this simply forwards to it.
pub fn apply_try_to_binary(column: Column, fmt: &str) -> PolarsResult<Option<Column>> {
    let converted = apply_to_binary(column, fmt)?;
    Ok(converted)
}
/// Zero-pads or truncates `key` to 16 bytes. This is the module's historical AES-128 key
/// derivation, kept for keys that are not 32 bytes long.
fn aes_gcm_key_16(key: &[u8]) -> [u8; 16] {
    let mut out = [0u8; 16];
    let n = key.len().min(16);
    out[..n].copy_from_slice(&key[..n]);
    out
}

/// Encrypts `plaintext` with AES-GCM under a fresh random 12-byte nonce.
///
/// Returns hex(nonce || ciphertext || tag), or `None` if encryption fails. A 32-byte key
/// selects AES-256-GCM; previously such keys were silently truncated to 16 bytes. Any other
/// length keeps the lenient legacy behaviour of `aes_gcm_key_16` with AES-128-GCM.
/// NOTE(review): Spark also accepts 24-byte keys (AES-192) and rejects all other lengths.
fn aes_gcm_encrypt_one(plaintext: &[u8], key: &[u8]) -> Option<String> {
    use aes_gcm::aead::generic_array::GenericArray;
    use aes_gcm::aead::{Aead, KeyInit};
    use aes_gcm::{Aes128Gcm, Aes256Gcm};
    use rand::RngCore;
    let mut nonce_bytes = [0u8; 12];
    rand::thread_rng().fill_bytes(&mut nonce_bytes);
    let nonce = GenericArray::from_slice(&nonce_bytes);
    let ciphertext = if key.len() == 32 {
        Aes256Gcm::new_from_slice(key)
            .ok()?
            .encrypt(nonce, plaintext)
            .ok()?
    } else {
        let key_arr = aes_gcm_key_16(key);
        Aes128Gcm::new(GenericArray::from_slice(&key_arr))
            .encrypt(nonce, plaintext)
            .ok()?
    };
    let mut out = Vec::with_capacity(nonce_bytes.len() + ciphertext.len());
    out.extend_from_slice(&nonce_bytes);
    out.extend(ciphertext);
    Some(hex::encode(out))
}

/// Reverses `aes_gcm_encrypt_one`: `hex_input` must be hex(nonce(12) || ciphertext || tag(16)).
///
/// The key-length rule matches encryption: 32 bytes selects AES-256-GCM, any other length uses
/// the zero-padded or truncated AES-128 key. Returns `None` for bad hex, input that is too
/// short, authentication failure, or plaintext that is not UTF-8.
fn aes_gcm_decrypt_one(hex_input: &str, key: &[u8]) -> Option<String> {
    use aes_gcm::aead::generic_array::GenericArray;
    use aes_gcm::aead::{Aead, KeyInit};
    use aes_gcm::{Aes128Gcm, Aes256Gcm};
    let bytes = hex::decode(hex_input.as_bytes()).ok()?;
    // Need the 12-byte nonce plus at least the 16-byte authentication tag.
    if bytes.len() < 12 + 16 {
        return None;
    }
    let (nonce_bytes, ct) = bytes.split_at(12);
    let nonce = GenericArray::from_slice(nonce_bytes);
    let plaintext = if key.len() == 32 {
        Aes256Gcm::new_from_slice(key).ok()?.decrypt(nonce, ct).ok()?
    } else {
        let key_arr = aes_gcm_key_16(key);
        Aes128Gcm::new(GenericArray::from_slice(&key_arr))
            .decrypt(nonce, ct)
            .ok()?
    };
    String::from_utf8(plaintext).ok()
}
/// Encrypts each string with `aes_gcm_encrypt_one` under `key` (its UTF-8 bytes).
///
/// Rows that fail to encrypt become null, and null stays null.
///
/// # Errors
/// Fails when the column is not a string column.
pub fn apply_aes_encrypt(column: Column, key: &str) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let plaintexts = column.take_materialized_series();
    let strings = plaintexts
        .str()
        .map_err(|e| compute_err("aes_encrypt", e))?;
    let encrypt = |s: &str| aes_gcm_encrypt_one(s.as_bytes(), key.as_bytes());
    let ciphertexts = strings.into_iter().map(|opt_s| opt_s.and_then(&encrypt));
    let out = StringChunked::from_iter_options(name.as_str().into(), ciphertexts);
    Ok(Some(Column::new(name, out.into_series())))
}
/// Decrypts hex ciphertexts produced by `apply_aes_encrypt` with `key` (its UTF-8 bytes).
///
/// Rows that fail to decrypt become null, and null stays null.
///
/// # Errors
/// Fails when the column is not a string column.
pub fn apply_aes_decrypt(column: Column, key: &str) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let ciphertexts = column.take_materialized_series();
    let strings = ciphertexts
        .str()
        .map_err(|e| compute_err("aes_decrypt", e))?;
    let decrypt = |s: &str| aes_gcm_decrypt_one(s, key.as_bytes());
    let plaintexts = strings.into_iter().map(|opt_s| opt_s.and_then(&decrypt));
    let out = StringChunked::from_iter_options(name.as_str().into(), plaintexts);
    Ok(Some(Column::new(name, out.into_series())))
}
/// Lenient `try_aes_decrypt`. `apply_aes_decrypt` already maps failures to null, so this
/// simply forwards to it.
pub fn apply_try_aes_decrypt(column: Column, key: &str) -> PolarsResult<Option<Column>> {
    let decrypted = apply_aes_decrypt(column, key)?;
    Ok(decrypted)
}
/// Converts each integer code point to a one-character string.
///
/// The input is cast to `Int64`. Values that are negative, exceed `u32`, exceed U+10FFFF, or
/// are surrogate code points yield `""`. Previously `v as u32` wrapped some of them into range,
/// e.g. 2^32 + 65 became "A". Null stays null.
/// NOTE(review): Spark's `char`/`chr` instead returns the character for `n % 256` (and "" for
/// negative n). Confirm which semantics this project wants.
///
/// # Errors
/// Fails when the column cannot be cast to `Int64`.
pub fn apply_char(column: Column) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let to_char = |v: i64| -> String {
        u32::try_from(v)
            .ok()
            .and_then(char::from_u32)
            .map(String::from)
            .unwrap_or_default()
    };
    // Int32 -> Int64 is lossless, and an Int64 input is returned unchanged, so a single path
    // covers every integer dtype.
    let i64_series = series
        .cast(&DataType::Int64)
        .map_err(|e| compute_err("char cast", e))?;
    let ca = i64_series.i64().map_err(|e| compute_err("char", e))?;
    let out = StringChunked::from_iter_options(
        name.as_str().into(),
        ca.into_iter().map(|opt_v| opt_v.map(to_char)),
    );
    Ok(Some(Column::new(name, out.into_series())))
}
/// Casts `series` to `Date`, then to its `Int32` physical day counts, and returns an owned
/// `Int32Chunked`.
///
/// Counts are presumably relative to the date that `epoch_naive_date` returns; the
/// conversion helpers nearby assume this.
fn date_series_to_days(series: &Series) -> PolarsResult<Int32Chunked> {
    let as_date = series.cast(&DataType::Date)?;
    let as_days = as_date.cast(&DataType::Int32)?;
    let days = as_days
        .i32()
        .map_err(|e| compute_err("date_series_to_days", e))?;
    Ok(days.clone())
}
/// Returns the date `days` days after `epoch_naive_date()`, or `None` if it leaves chrono's
/// range.
fn days_to_naive_date(days: i32) -> Option<chrono::NaiveDate> {
    let offset = chrono::TimeDelta::days(i64::from(days));
    robin_sparkless_core::date_utils::epoch_naive_date().checked_add_signed(offset)
}
/// Whole days from `epoch_naive_date()` to `d`; negative for earlier dates. Inverse of
/// `days_to_naive_date`.
fn naivedate_to_days(d: chrono::NaiveDate) -> i32 {
    let epoch = robin_sparkless_core::date_utils::epoch_naive_date();
    let elapsed = d.signed_duration_since(epoch);
    elapsed.num_days() as i32
}
/// Shifts each date by `n` calendar months; a negative `n` moves backwards.
///
/// chrono's `checked_add_months` / `checked_sub_months` clamp the day to the end of a shorter
/// target month. Results outside chrono's range, and nulls, become null. Output dtype is `Date`.
///
/// # Errors
/// Fails when the column cannot be cast to `Date`.
pub fn apply_add_months(column: Column, n: i32) -> PolarsResult<Option<Column>> {
    use chrono::Months;
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let days = date_series_to_days(&series)?;
    let magnitude = n.unsigned_abs();
    let shift = |day_count: i32| -> Option<i32> {
        let start = days_to_naive_date(day_count)?;
        let step = Months::new(magnitude);
        let moved = if n < 0 {
            start.checked_sub_months(step)?
        } else {
            start.checked_add_months(step)?
        };
        Some(naivedate_to_days(moved))
    };
    let shifted = Int32Chunked::from_iter_options(
        name.as_str().into(),
        days.into_iter().map(|opt_d| opt_d.and_then(&shift)),
    );
    let out_series = shifted.into_series().cast(&DataType::Date)?;
    Ok(Some(Column::new(name, out_series)))
}
/// Parses a day-of-week name into Spark's numbering (Sunday = 1 … Saturday = 7).
///
/// Matching is case-insensitive and ignores surrounding whitespace. Like Spark's `next_day`, it
/// accepts the full name, the three-letter abbreviation, and the two-letter abbreviation
/// ("SU", "MO", "TU", "WE", "TH", "FR", "SA"). Previously the two-letter forms were rejected.
/// Returns `None` for anything else.
fn parse_day_of_week(s: &str) -> Option<u32> {
    let day = match s.trim().to_lowercase().as_str() {
        "su" | "sun" | "sunday" => 1,
        "mo" | "mon" | "monday" => 2,
        "tu" | "tue" | "tuesday" => 3,
        "we" | "wed" | "wednesday" => 4,
        "th" | "thu" | "thursday" => 5,
        "fr" | "fri" | "friday" => 6,
        "sa" | "sat" | "saturday" => 7,
        _ => return None,
    };
    Some(day)
}
fn chrono_weekday_to_dayofweek(w: chrono::Weekday) -> u32 {
match w {
chrono::Weekday::Mon => 2,
chrono::Weekday::Tue => 3,
chrono::Weekday::Wed => 4,
chrono::Weekday::Thu => 5,
chrono::Weekday::Fri => 6,
chrono::Weekday::Sat => 7,
chrono::Weekday::Sun => 1,
}
}
/// Spark `next_day`: for each date, the first date strictly after it that falls on
/// `day_of_week`.
///
/// A date that is already on that weekday moves a full week ahead. Nulls, and results outside
/// chrono's range, become null. Output dtype is `Date`.
///
/// # Errors
/// Fails when `day_of_week` is not a recognised day name, or the column cannot be cast to
/// `Date`.
pub fn apply_next_day(column: Column, day_of_week: &str) -> PolarsResult<Option<Column>> {
    let Some(target) = parse_day_of_week(day_of_week) else {
        return Err(PolarsError::ComputeError(
            format!("next_day: invalid day '{day_of_week}'").into(),
        ));
    };
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let days = date_series_to_days(&series)?;
    let advance = |day_count: i32| -> Option<i32> {
        let date = days_to_naive_date(day_count)?;
        let current = chrono_weekday_to_dayofweek(date.weekday());
        // Both values lie in 1..=7, so `target + 7 - current` cannot underflow.
        let ahead = (target + 7 - current) % 7;
        let step = if ahead == 0 { 7 } else { ahead };
        let next = date.checked_add_signed(chrono::TimeDelta::days(i64::from(step)))?;
        Some(naivedate_to_days(next))
    };
    let advanced = Int32Chunked::from_iter_options(
        name.as_str().into(),
        days.into_iter().map(|opt_d| opt_d.and_then(&advance)),
    );
    let out_series = advanced.into_series().cast(&DataType::Date)?;
    Ok(Some(Column::new(name, out_series)))
}
/// Three-letter weekday name of each date ("Mon", "Tue", …), from chrono's `Weekday` display.
/// Null stays null.
///
/// # Errors
/// Fails when the column cannot be cast to `Date`.
pub fn apply_dayname(column: Column) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let days = date_series_to_days(&column.take_materialized_series())?;
    let day_names = days.into_iter().map(|opt_d| {
        let date = days_to_naive_date(opt_d?)?;
        Some(date.weekday().to_string())
    });
    let out = StringChunked::from_iter_options(name.as_str().into(), day_names);
    Ok(Some(Column::new(name, out.into_series())))
}
/// Spark `weekday`: 0 for Monday through 6 for Sunday, as `Int32`. Null stays null.
///
/// # Errors
/// Fails when the column cannot be cast to `Date`.
pub fn apply_weekday(column: Column) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let days = date_series_to_days(&column.take_materialized_series())?;
    let indices = days.into_iter().map(|opt_d| {
        let date = days_to_naive_date(opt_d?)?;
        Some(date.weekday().num_days_from_monday() as i32)
    });
    let out = Int32Chunked::from_iter_options(name.as_str().into(), indices);
    Ok(Some(Column::new(name, out.into_series())))
}
pub fn apply_months_between(
columns: &mut [Column],
round_off: bool,
) -> PolarsResult<Option<Column>> {
if columns.len() < 2 {
return Err(PolarsError::ComputeError(
"months_between needs two columns (end, start)".into(),
));
}
let name = columns[0].field().into_owned().name;
let end_series = std::mem::take(&mut columns[0]).take_materialized_series();
let start_series = std::mem::take(&mut columns[1]).take_materialized_series();
let end_ca = date_series_to_days(&end_series)?;
let start_ca = date_series_to_days(&start_series)?;
let out = end_ca
.into_iter()
.zip(&start_ca)
.map(|(oe, os)| match (oe, os) {
(Some(e), Some(s)) => {
let end_d = days_to_naive_date(e)?;
let start_d = days_to_naive_date(s)?;
let month_diff = (end_d.year() - start_d.year()) * 12
+ (end_d.month() as i32 - start_d.month() as i32);
let day_diff = end_d.day() as i32 - start_d.day() as i32;
let months = month_diff as f64 + (day_diff as f64 / 31.0);
Some(if round_off {
(months * 1e8).round() / 1e8
} else {
months
})
}
_ => None,
});
let out = Float64Chunked::from_iter_options(name.as_str().into(), out);
Ok(Some(Column::new(name, out.into_series())))
}
/// PySpark `pow(base, exp)` as `Float64`; string inputs are parsed by `float_series_to_f64`.
///
/// Uses Java `Math.pow` semantics, which Spark follows. It matches IEEE/C `pow` except that a
/// NaN exponent always gives NaN and `(±1)^±inf` is NaN. (The old zero-base special case
/// returned +inf for `0^NaN` and dropped the sign of `-0.0`.)
///
/// A length-1 column is broadcast against the other, including when its single value is null.
/// The old code then fell through to `zip` and returned a single row. Otherwise rows pair up to
/// the shorter length. Nulls yield null.
///
/// # Errors
/// Fails when fewer than two columns are supplied or an input cannot be converted to `Float64`.
pub fn apply_pow_pyspark(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
    /// `base^exp` with Java `Math.pow` edge cases.
    fn java_pow(base: f64, exp: f64) -> f64 {
        if exp.is_nan() || (exp.is_infinite() && base.abs() == 1.0) {
            f64::NAN
        } else {
            base.powf(exp)
        }
    }

    if columns.len() < 2 {
        return Err(PolarsError::ComputeError(
            "pow_pyspark needs base and exponent columns".into(),
        ));
    }
    let name = columns[0].field().into_owned().name;
    let base_series = std::mem::take(&mut columns[0]).take_materialized_series();
    let exp_series = std::mem::take(&mut columns[1]).take_materialized_series();
    let base_ca = float_series_to_f64(&base_series)?;
    let exp_ca = float_series_to_f64(&exp_series)?;
    let pow_opt = |b: Option<f64>, e: Option<f64>| Some(java_pow(b?, e?));
    let out = if exp_ca.len() == 1 && base_ca.len() != 1 {
        let e = exp_ca.get(0);
        Float64Chunked::from_iter_options(
            name.as_str().into(),
            base_ca.into_iter().map(|b| pow_opt(b, e)),
        )
    } else if base_ca.len() == 1 && exp_ca.len() != 1 {
        let b = base_ca.get(0);
        Float64Chunked::from_iter_options(
            name.as_str().into(),
            exp_ca.into_iter().map(|e| pow_opt(b, e)),
        )
    } else {
        Float64Chunked::from_iter_options(
            name.as_str().into(),
            base_ca
                .into_iter()
                .zip(&exp_ca)
                .map(|(b, e)| pow_opt(b, e)),
        )
    };
    Ok(Some(Column::new(name, out.into_series())))
}
/// Converts `series` to an owned `Float64Chunked`.
///
/// String columns are parsed per value with `parse_str_to_double`; unparseable values become
/// null, and the original series name is kept. Every other dtype goes through a polars cast to
/// `Float64`.
fn float_series_to_f64(series: &Series) -> PolarsResult<Float64Chunked> {
    match series.dtype() {
        DataType::String => {
            let strings = series
                .str()
                .map_err(|e| compute_err("float_series_to_f64", e))?;
            let parsed = Float64Chunked::from_iter_options(
                series.name().as_str().into(),
                strings
                    .into_iter()
                    .map(|opt_s| opt_s.and_then(parse_str_to_double)),
            );
            Ok(parsed)
        }
        _ => {
            let as_f64 = series.cast(&DataType::Float64)?;
            let ca = as_f64
                .f64()
                .map_err(|e| compute_err("float_series_to_f64", e))?;
            Ok(ca.clone())
        }
    }
}
/// Element-wise sine; values are interpreted as radians.
/// The input is converted with `float_series_to_f64`.
pub fn apply_sin(column: Column) -> PolarsResult<Option<Column>> {
    let col_name = column.field().into_owned().name;
    let values = float_series_to_f64(&column.take_materialized_series())?;
    let sines = values.apply_values(|v| v.sin());
    Ok(Some(Column::new(col_name, sines.into_series())))
}
/// Element-wise cosine; values are interpreted as radians.
/// The input is converted with `float_series_to_f64`.
pub fn apply_cos(column: Column) -> PolarsResult<Option<Column>> {
    let col_name = column.field().into_owned().name;
    let values = float_series_to_f64(&column.take_materialized_series())?;
    let cosines = values.apply_values(|v| v.cos());
    Ok(Some(Column::new(col_name, cosines.into_series())))
}
/// Rounds `x` to `scale` decimal places using HALF_EVEN ("banker's") rounding.
///
/// A negative `scale` rounds to the left of the decimal point. If the scaled
/// fractional part is within `1e-10` of one half, the value is treated as an
/// exact tie. This stops binary representation error (e.g. `2.675` stored as
/// `2.67499999…`) from defeating tie handling.
///
/// Edge cases:
/// * NaN and ±infinity are returned unchanged.
/// * If `x * 10^scale` is not finite (the scaling overflowed, or `10^scale` is
///   itself infinite), `x` has no digits at that precision to round away, so it
///   is returned as-is.
/// * If `10^scale` underflows to zero, every finite value rounds to `0.0`
///   rather than producing NaN from `0.0 / 0.0`.
fn bround_one(x: f64, scale: i32) -> f64 {
    if x.is_nan() || x.is_infinite() {
        return x;
    }
    let factor = 10_f64.powi(scale);
    if factor == 0.0 {
        return 0.0;
    }
    let scaled = x * factor;
    if !scaled.is_finite() {
        return x;
    }
    let rounded =
        if scaled.fract().abs() > 0.5_f64 - 1e-10 && scaled.fract().abs() < 0.5_f64 + 1e-10 {
            // Tie: keep the even neighbour. `trunc` moves toward zero, so the
            // odd case steps one unit away from zero (in the sign of `scaled`).
            // The `as i64` cast is exact here because any value with a
            // fractional part is below 2^52 in magnitude.
            let floor_val = scaled.trunc();
            if floor_val as i64 % 2 == 0 {
                floor_val
            } else {
                floor_val + scaled.signum()
            }
        } else {
            scaled.round()
        };
    rounded / factor
}
/// Re-encodes the integer written in `s` (base `from_base`) into base `to_base`.
///
/// Both bases must lie in `2..=36`. Surrounding whitespace is trimmed and the
/// text is parsed as a signed `i64`. The output uses lower-case digits and a
/// leading `-` for negative values. Returns `None` for an invalid base, empty
/// input, or text that does not parse as an `i64` in `from_base`.
fn conv_one(s: &str, from_base: i32, to_base: i32) -> Option<String> {
    const DIGITS: &[u8] = b"0123456789abcdefghijklmnopqrstuvwxyz";
    let base_ok = |b: i32| (2..=36).contains(&b);
    if !(base_ok(from_base) && base_ok(to_base)) {
        return None;
    }
    let digits_in = s.trim();
    if digits_in.is_empty() {
        return None;
    }
    let value = i64::from_str_radix(digits_in, from_base as u32).ok()?;
    if value == 0 {
        return Some(String::from("0"));
    }
    let radix = u64::from(to_base as u32);
    // `unsigned_abs` also handles i64::MIN, whose magnitude does not fit in i64.
    let mut rest = value.unsigned_abs();
    // Digits come out least-significant first; the sign is pushed last so one
    // reversal gives the final order.
    let mut reversed: Vec<char> = Vec::new();
    while rest != 0 {
        reversed.push(char::from(DIGITS[(rest % radix) as usize]));
        rest /= radix;
    }
    if value < 0 {
        reversed.push('-');
    }
    Some(reversed.into_iter().rev().collect())
}
pub fn apply_conv(column: Column, from_base: i32, to_base: i32) -> PolarsResult<Option<Column>> {
use std::borrow::Cow;
let name = column.field().into_owned().name;
let series = column.take_materialized_series();
let out = if series.dtype() == &DataType::String {
let ca = series.str().map_err(|e| compute_err("conv", e))?;
ca.apply(|opt_s| opt_s.and_then(|s| conv_one(s, from_base, to_base).map(Cow::Owned)))
.into_series()
} else if series.dtype() == &DataType::Int64 {
let ca = series.i64().map_err(|e| compute_err("conv", e))?;
let to_b = to_base as u32;
const DIGITS: &[u8] = b"0123456789abcdefghijklmnopqrstuvwxyz";
let format_int = |n: i64| -> Option<String> {
if !(2..=36).contains(&to_b) {
return None;
}
if n == 0 {
return Some("0".to_string());
}
let mut result = String::new();
let mut val = if n < 0 {
result.push('-');
n.unsigned_abs()
} else {
n as u64
};
let mut buf = String::new();
while val > 0 {
buf.push(DIGITS[(val % to_b as u64) as usize] as char);
val /= to_b as u64;
}
result.push_str(&buf.chars().rev().collect::<String>());
Some(result)
};
let out_ca = StringChunked::from_iter_options(
name.as_str().into(),
ca.into_iter().map(|opt| opt.and_then(format_int)),
);
out_ca.into_series()
} else {
let s_str = series.cast(&DataType::String)?;
let ca = s_str
.str()
.map_err(|e| PolarsError::ComputeError(e.to_string().into()))?;
ca.apply(|opt_s| opt_s.and_then(|s| conv_one(s, from_base, to_base).map(Cow::Owned)))
.into_series()
};
Ok(Some(Column::new(name, out)))
}
/// Hex-encodes a column.
///
/// * String values are encoded byte by byte as upper-case, two-digit hex.
/// * Every other dtype is cast to `Int64` and printed with `{:X}`. For a
///   negative number this prints its 64-bit two's-complement pattern.
///
/// Nulls stay null. Cast failures are propagated.
pub fn apply_hex(column: Column) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let encoded = if series.dtype() == &DataType::String {
        let text = series
            .str()
            .map_err(|e| PolarsError::ComputeError(e.to_string().into()))?;
        StringChunked::from_iter_options(
            name.as_str().into(),
            text.into_iter().map(|opt| {
                opt.map(|s| {
                    s.bytes()
                        .map(|byte| format!("{byte:02X}"))
                        .collect::<String>()
                })
            }),
        )
    } else {
        // Int64, Int32 and every other non-string dtype share this path.
        let as_i64 = series.cast(&DataType::Int64)?;
        let ints = as_i64
            .i64()
            .map_err(|e| PolarsError::ComputeError(e.to_string().into()))?;
        StringChunked::from_iter_options(
            name.as_str().into(),
            ints.into_iter().map(|opt| opt.map(|n| format!("{n:X}"))),
        )
    };
    Ok(Some(Column::new(name, encoded.into_series())))
}
pub fn apply_unhex(column: Column) -> PolarsResult<Option<Column>> {
use std::borrow::Cow;
let name = column.field().into_owned().name;
let series = column.take_materialized_series();
let ca = series.str().map_err(|e| compute_err("unhex", e))?;
let unhex_one = |s: &str| -> Option<Vec<u8>> {
let s = s.trim();
let chars: Vec<char> = if s.len() % 2 == 1 {
s.chars().skip(1).collect()
} else {
s.chars().collect()
};
let mut bytes = Vec::with_capacity(chars.len() / 2);
for chunk in chars.chunks(2) {
let hex_pair: String = chunk.iter().collect();
let byte = u8::from_str_radix(&hex_pair, 16).ok()?;
bytes.push(byte);
}
Some(bytes)
};
let out = ca
.apply(|opt_s| {
opt_s.and_then(|s| {
unhex_one(s)
.and_then(|b| String::from_utf8(b).ok())
.map(Cow::Owned)
})
})
.into_series();
Ok(Some(Column::new(name, out)))
}
/// Renders each value in binary after casting the column to `Int64`.
/// Negative numbers are shown as their 64-bit two's-complement pattern, which
/// is what `{:b}` prints for signed integers. Nulls stay null.
pub fn apply_bin(column: Column) -> PolarsResult<Option<Column>> {
    let col_name = column.field().into_owned().name;
    let as_i64 = column.take_materialized_series().cast(&DataType::Int64)?;
    let ints = as_i64
        .i64()
        .map_err(|e| PolarsError::ComputeError(e.to_string().into()))?;
    let binary_digits = ints.into_iter().map(|v| v.map(|n| format!("{n:b}")));
    let out = StringChunked::from_iter_options(col_name.as_str().into(), binary_digits);
    Ok(Some(Column::new(col_name, out.into_series())))
}
/// Returns bit `pos` (0 = least significant) of each value as `0`/`1`, after
/// casting the column to `Int64`. Nulls stay null.
///
/// `pos` is clamped to `0..=63`:
/// * Negative positions read bit 0.
/// * Positions of 64 or more read bit 63. That is the value every higher bit
///   takes under two's-complement sign extension.
///
/// Clamping also keeps `n >> pos` from overflowing the shift, which would
/// panic in debug builds.
///
/// NOTE(review): Spark's `getbit` reports an error for positions outside
/// 0..=63. Confirm that lenient clamping is the intended behaviour.
pub fn apply_getbit(column: Column, pos: i64) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let s = series.cast(&DataType::Int64)?;
    let ca = s
        .i64()
        .map_err(|e| PolarsError::ComputeError(e.to_string().into()))?;
    let pos = pos.clamp(0, 63);
    let out_ca = Int64Chunked::from_iter_options(
        name.as_str().into(),
        // Arithmetic shift, then mask: yields 0 or 1 for negatives too.
        ca.into_iter().map(|opt| opt.map(|n| (n >> pos) & 1)),
    );
    Ok(Some(Column::new(name, out_ca.into_series())))
}
/// Counts the set bits of each value after casting to `Int64`. Negative numbers
/// are counted in their two's-complement form (e.g. `-1` has 64 set bits).
/// Nulls stay null.
pub fn apply_bit_count(column: Column) -> PolarsResult<Option<Column>> {
    let col_name = column.field().into_owned().name;
    let as_i64 = column.take_materialized_series().cast(&DataType::Int64)?;
    let ints = as_i64
        .i64()
        .map_err(|e| PolarsError::ComputeError(e.to_string().into()))?;
    let popcounts = ints
        .into_iter()
        .map(|v| v.map(|bits| bits.count_ones() as i64));
    let out = Int64Chunked::from_iter_options(col_name.as_str().into(), popcounts);
    Ok(Some(Column::new(col_name, out.into_series())))
}
/// Fails unless every value in the boolean column is `true`. A null counts as
/// a failure.
///
/// On success, returns an all-null boolean column of the same length.
///
/// # Errors
/// * `ComputeError` with `err_msg`, or a default message naming the column,
///   when any value is `false` or null.
/// * `ComputeError` if the column is not boolean.
pub fn apply_assert_true(column: Column, err_msg: Option<&str>) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let flags = series
        .bool()
        .map_err(|e| PolarsError::ComputeError(e.to_string().into()))?;
    let row_count = flags.len();
    let all_true = flags.into_iter().all(|flag| flag == Some(true));
    if !all_true {
        let msg = match err_msg {
            Some(custom) => custom.to_string(),
            None => format!("assert_true failed on column '{name}'"),
        };
        return Err(PolarsError::ComputeError(msg.into()));
    }
    let nulls = BooleanChunked::from_iter_options(name.as_str().into(), (0..row_count).map(|_| None));
    Ok(Some(Column::new(name, nulls.into_series())))
}
pub fn apply_rand_with_seed(column: Column, seed: Option<u64>) -> PolarsResult<Option<Column>> {
use rand::Rng;
use rand::SeedableRng;
let name = column.field().into_owned().name;
let series = column.take_materialized_series();
let n = series.len();
let values: Vec<f64> = if let Some(s) = seed {
let mut rng = rand::rngs::StdRng::seed_from_u64(s);
(0..n).map(|_| rng.r#gen::<f64>()).collect()
} else {
let mut rng = rand::thread_rng();
(0..n).map(|_| rng.r#gen::<f64>()).collect()
};
let out = Float64Chunked::from_vec(name.as_str().into(), values);
Ok(Some(Column::new(name, out.into_series())))
}
pub fn apply_randn_with_seed(column: Column, seed: Option<u64>) -> PolarsResult<Option<Column>> {
use rand::SeedableRng;
use rand_distr::Distribution;
let name = column.field().into_owned().name;
let series = column.take_materialized_series();
let n = series.len();
let dist = rand_distr::StandardNormal;
let values: Vec<f64> = if let Some(s) = seed {
let mut rng = rand::rngs::StdRng::seed_from_u64(s);
(0..n).map(|_| dist.sample(&mut rng)).collect()
} else {
let mut rng = rand::thread_rng();
(0..n).map(|_| dist.sample(&mut rng)).collect()
};
let out = Float64Chunked::from_vec(name.as_str().into(), values);
Ok(Some(Column::new(name, out.into_series())))
}
/// Builds a `Float64` series named `name` holding `n` samples from rand's
/// standard `f64` distribution.
///
/// `Some(seed)` seeds a `StdRng` so the sequence is reproducible; `None` draws
/// from the thread-local RNG.
pub fn series_rand_n(name: &str, n: usize, seed: Option<u64>) -> Series {
    use rand::Rng;
    use rand::SeedableRng;
    let samples: Vec<f64> = match seed {
        Some(s) => {
            let mut rng = rand::rngs::StdRng::seed_from_u64(s);
            std::iter::repeat_with(|| rng.r#gen::<f64>()).take(n).collect()
        }
        None => {
            let mut rng = rand::thread_rng();
            std::iter::repeat_with(|| rng.r#gen::<f64>()).take(n).collect()
        }
    };
    Float64Chunked::from_vec(name.into(), samples).into_series()
}
/// Builds a `Float64` series named `name` holding `n` samples from
/// `rand_distr::StandardNormal`.
///
/// `Some(seed)` seeds a `StdRng` so the sequence is reproducible; `None` draws
/// from the thread-local RNG.
pub fn series_randn_n(name: &str, n: usize, seed: Option<u64>) -> Series {
    use rand::SeedableRng;
    use rand_distr::Distribution;
    let normal = rand_distr::StandardNormal;
    let samples: Vec<f64> = match seed {
        Some(s) => {
            let mut rng = rand::rngs::StdRng::seed_from_u64(s);
            std::iter::repeat_with(|| normal.sample(&mut rng)).take(n).collect()
        }
        None => {
            let mut rng = rand::thread_rng();
            std::iter::repeat_with(|| normal.sample(&mut rng)).take(n).collect()
        }
    };
    Float64Chunked::from_vec(name.into(), samples).into_series()
}
/// Element-wise bitwise AND of the first two columns, after casting both to
/// `Int64`. A null on either side gives null. The two sides are zipped, so the
/// output length is the shorter input's.
///
/// # Errors
/// `ComputeError` if fewer than two columns are given; cast failures propagate.
pub fn apply_bit_and(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
    let [lhs_col, rhs_col, ..] = columns else {
        return Err(PolarsError::ComputeError(
            "bit_and needs two columns".into(),
        ));
    };
    let name = lhs_col.field().into_owned().name;
    let lhs_series = std::mem::take(lhs_col).take_materialized_series();
    let rhs_series = std::mem::take(rhs_col).take_materialized_series();
    let lhs = lhs_series.cast(&DataType::Int64)?;
    let rhs = rhs_series.cast(&DataType::Int64)?;
    let lhs = lhs
        .i64()
        .map_err(|e| PolarsError::ComputeError(e.to_string().into()))?;
    let rhs = rhs
        .i64()
        .map_err(|e| PolarsError::ComputeError(e.to_string().into()))?;
    let anded = lhs
        .into_iter()
        .zip(rhs)
        .map(|(l, r)| l.zip(r).map(|(x, y)| x & y));
    let out = Int64Chunked::from_iter_options(name.as_str().into(), anded);
    Ok(Some(Column::new(name, out.into_series())))
}
/// Element-wise bitwise OR of the first two columns, after casting both to
/// `Int64`. A null on either side gives null. The two sides are zipped, so the
/// output length is the shorter input's.
///
/// # Errors
/// `ComputeError` if fewer than two columns are given; cast failures propagate.
pub fn apply_bit_or(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
    let [lhs_col, rhs_col, ..] = columns else {
        return Err(PolarsError::ComputeError("bit_or needs two columns".into()));
    };
    let name = lhs_col.field().into_owned().name;
    let lhs_series = std::mem::take(lhs_col).take_materialized_series();
    let rhs_series = std::mem::take(rhs_col).take_materialized_series();
    let lhs = lhs_series.cast(&DataType::Int64)?;
    let rhs = rhs_series.cast(&DataType::Int64)?;
    let lhs = lhs
        .i64()
        .map_err(|e| PolarsError::ComputeError(e.to_string().into()))?;
    let rhs = rhs
        .i64()
        .map_err(|e| PolarsError::ComputeError(e.to_string().into()))?;
    let ored = lhs
        .into_iter()
        .zip(rhs)
        .map(|(l, r)| l.zip(r).map(|(x, y)| x | y));
    let out = Int64Chunked::from_iter_options(name.as_str().into(), ored);
    Ok(Some(Column::new(name, out.into_series())))
}
/// Casts the column to `Int64` so it can feed the bitwise kernels.
///
/// If the cast itself fails, an all-null `Int64` column of the same length is
/// returned instead of the error.
pub fn apply_coerce_to_int64_for_bitwise(column: Column) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let out = match series.cast(&DataType::Int64) {
        Ok(as_i64) => as_i64,
        Err(_) => {
            Series::new_null(PlSmallStr::from(name.as_str()), series.len())
                .cast(&DataType::Int64)?
        }
    };
    Ok(Some(Column::new(name, out)))
}
/// Element-wise bitwise XOR of the first two columns, after casting both to
/// `Int64`. A null on either side gives null. The two sides are zipped, so the
/// output length is the shorter input's.
///
/// # Errors
/// `ComputeError` if fewer than two columns are given; cast failures propagate.
pub fn apply_bit_xor(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
    let [lhs_col, rhs_col, ..] = columns else {
        return Err(PolarsError::ComputeError(
            "bit_xor needs two columns".into(),
        ));
    };
    let name = lhs_col.field().into_owned().name;
    let lhs_series = std::mem::take(lhs_col).take_materialized_series();
    let rhs_series = std::mem::take(rhs_col).take_materialized_series();
    let lhs = lhs_series.cast(&DataType::Int64)?;
    let rhs = rhs_series.cast(&DataType::Int64)?;
    let lhs = lhs
        .i64()
        .map_err(|e| PolarsError::ComputeError(e.to_string().into()))?;
    let rhs = rhs
        .i64()
        .map_err(|e| PolarsError::ComputeError(e.to_string().into()))?;
    let xored = lhs
        .into_iter()
        .zip(rhs)
        .map(|(l, r)| l.zip(r).map(|(x, y)| x ^ y));
    let out = Int64Chunked::from_iter_options(name.as_str().into(), xored);
    Ok(Some(Column::new(name, out.into_series())))
}
/// Rounds every value to `scale` decimal places. Ties go away from zero, via
/// `f64::round`. A negative `scale` rounds to the left of the decimal point.
///
/// Edge cases:
/// * NaN and ±infinity pass through unchanged.
/// * If `x * 10^scale` is not finite (the scaling overflowed, or `10^scale` is
///   infinite), the value is returned untouched instead of becoming inf/NaN.
/// * If `10^scale` underflows to zero, finite values round to `0.0` instead of
///   producing NaN from `0.0 / 0.0`.
///
/// # Errors
/// Propagates failures from converting the column to `f64`.
pub fn apply_round(column: Column, scale: i32) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let ca = float_series_to_f64(&series)?;
    let factor = 10_f64.powi(scale);
    let round_one = move |x: f64| -> f64 {
        if factor == 0.0 {
            return if x.is_finite() { 0.0 } else { x };
        }
        let scaled = x * factor;
        if !scaled.is_finite() {
            // NaN/±inf input, or |x| so large that scaling overflowed: there
            // are no digits at the requested precision left to round away.
            return x;
        }
        scaled.round() / factor
    };
    let out = ca.apply_values(round_one).into_series();
    Ok(Some(Column::new(name, out)))
}
/// Rounds every value to `scale` decimal places with HALF_EVEN rounding.
/// Each value goes through `bround_one`.
pub fn apply_bround(column: Column, scale: i32) -> PolarsResult<Option<Column>> {
    let col_name = column.field().into_owned().name;
    let values = float_series_to_f64(&column.take_materialized_series())?;
    let rounded = values.apply_values(move |x| bround_one(x, scale));
    Ok(Some(Column::new(col_name, rounded.into_series())))
}
/// Element-wise cotangent, computed as `1 / tan(x)` with `x` in radians.
pub fn apply_cot(column: Column) -> PolarsResult<Option<Column>> {
    let col_name = column.field().into_owned().name;
    let values = float_series_to_f64(&column.take_materialized_series())?;
    let cot = values.apply_values(|v| v.tan().recip());
    Ok(Some(Column::new(col_name, cot.into_series())))
}
/// Element-wise cosecant, computed as `1 / sin(x)` with `x` in radians.
pub fn apply_csc(column: Column) -> PolarsResult<Option<Column>> {
    let col_name = column.field().into_owned().name;
    let values = float_series_to_f64(&column.take_materialized_series())?;
    let csc = values.apply_values(|v| v.sin().recip());
    Ok(Some(Column::new(col_name, csc.into_series())))
}
/// Element-wise secant, computed as `1 / cos(x)` with `x` in radians.
pub fn apply_sec(column: Column) -> PolarsResult<Option<Column>> {
    let col_name = column.field().into_owned().name;
    let values = float_series_to_f64(&column.take_materialized_series())?;
    let sec = values.apply_values(|v| v.cos().recip());
    Ok(Some(Column::new(col_name, sec.into_series())))
}
/// Element-wise tangent; values are interpreted as radians.
pub fn apply_tan(column: Column) -> PolarsResult<Option<Column>> {
    let col_name = column.field().into_owned().name;
    let values = float_series_to_f64(&column.take_materialized_series())?;
    let tangents = values.apply_values(|v| v.tan());
    Ok(Some(Column::new(col_name, tangents.into_series())))
}
/// Element-wise arcsine in radians; inputs outside `[-1, 1]` give NaN.
pub fn apply_asin(column: Column) -> PolarsResult<Option<Column>> {
    let col_name = column.field().into_owned().name;
    let values = float_series_to_f64(&column.take_materialized_series())?;
    let angles = values.apply_values(|v| v.asin());
    Ok(Some(Column::new(col_name, angles.into_series())))
}
/// Element-wise arccosine in radians; inputs outside `[-1, 1]` give NaN.
pub fn apply_acos(column: Column) -> PolarsResult<Option<Column>> {
    let col_name = column.field().into_owned().name;
    let values = float_series_to_f64(&column.take_materialized_series())?;
    let angles = values.apply_values(|v| v.acos());
    Ok(Some(Column::new(col_name, angles.into_series())))
}
/// Element-wise arctangent in radians.
pub fn apply_atan(column: Column) -> PolarsResult<Option<Column>> {
    let col_name = column.field().into_owned().name;
    let values = float_series_to_f64(&column.take_materialized_series())?;
    let angles = values.apply_values(|v| v.atan());
    Ok(Some(Column::new(col_name, angles.into_series())))
}
/// Element-wise `atan2(y, x)`. The first column is `y` and the second is `x`.
///
/// A null on either side gives null. The columns are zipped, so the output
/// length is the shorter input's.
///
/// # Errors
/// `ComputeError` if fewer than two columns are given; conversion errors from
/// `float_series_to_f64` propagate.
pub fn apply_atan2(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
    let [y_col, x_col, ..] = columns else {
        return Err(PolarsError::ComputeError(
            "atan2 needs two columns (y, x)".into(),
        ));
    };
    let name = y_col.field().into_owned().name;
    let y_series = std::mem::take(y_col).take_materialized_series();
    let x_series = std::mem::take(x_col).take_materialized_series();
    let ys = float_series_to_f64(&y_series)?;
    let xs = float_series_to_f64(&x_series)?;
    let angles = ys
        .into_iter()
        .zip(&xs)
        .map(|(oy, ox)| oy.zip(ox).map(|(y, x)| y.atan2(x)));
    let out = Float64Chunked::from_iter_options(name.as_str().into(), angles);
    Ok(Some(Column::new(name, out.into_series())))
}
/// Converts radians to degrees element-wise.
pub fn apply_degrees(column: Column) -> PolarsResult<Option<Column>> {
    let col_name = column.field().into_owned().name;
    let values = float_series_to_f64(&column.take_materialized_series())?;
    let degrees = values.apply_values(f64::to_degrees);
    Ok(Some(Column::new(col_name, degrees.into_series())))
}
/// Converts degrees to radians element-wise.
pub fn apply_radians(column: Column) -> PolarsResult<Option<Column>> {
    let col_name = column.field().into_owned().name;
    let values = float_series_to_f64(&column.take_materialized_series())?;
    let radians = values.apply_values(f64::to_radians);
    Ok(Some(Column::new(col_name, radians.into_series())))
}
/// Element-wise sign, following Java's `Math.signum` as used by Spark's
/// `signum`.
///
/// * Positive values map to `1.0`; negative values map to `-1.0`.
/// * `+0.0`, `-0.0` and NaN are returned unchanged. NaN no longer collapses to
///   `0.0`, and `-0.0` keeps its sign.
pub fn apply_signum(column: Column) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let ca = float_series_to_f64(&series)?;
    let out = ca
        .apply_values(|v| {
            // `f64::signum` maps +0.0 to 1.0 and -0.0 to -1.0, so zeros
            // (and NaN) must be returned before calling it.
            if v == 0.0 || v.is_nan() {
                v
            } else {
                v.signum()
            }
        })
        .into_series();
    Ok(Some(Column::new(name, out)))
}
/// Element-wise hyperbolic cosine.
pub fn apply_cosh(column: Column) -> PolarsResult<Option<Column>> {
    let col_name = column.field().into_owned().name;
    let values = float_series_to_f64(&column.take_materialized_series())?;
    let result = values.apply_values(|v| v.cosh());
    Ok(Some(Column::new(col_name, result.into_series())))
}
/// Element-wise hyperbolic sine.
pub fn apply_sinh(column: Column) -> PolarsResult<Option<Column>> {
    let col_name = column.field().into_owned().name;
    let values = float_series_to_f64(&column.take_materialized_series())?;
    let result = values.apply_values(|v| v.sinh());
    Ok(Some(Column::new(col_name, result.into_series())))
}
/// Element-wise hyperbolic tangent.
pub fn apply_tanh(column: Column) -> PolarsResult<Option<Column>> {
    let col_name = column.field().into_owned().name;
    let values = float_series_to_f64(&column.take_materialized_series())?;
    let result = values.apply_values(|v| v.tanh());
    Ok(Some(Column::new(col_name, result.into_series())))
}
/// Element-wise inverse hyperbolic cosine.
pub fn apply_acosh(column: Column) -> PolarsResult<Option<Column>> {
    let col_name = column.field().into_owned().name;
    let values = float_series_to_f64(&column.take_materialized_series())?;
    let result = values.apply_values(|v| v.acosh());
    Ok(Some(Column::new(col_name, result.into_series())))
}
/// Element-wise inverse hyperbolic sine.
pub fn apply_asinh(column: Column) -> PolarsResult<Option<Column>> {
    let col_name = column.field().into_owned().name;
    let values = float_series_to_f64(&column.take_materialized_series())?;
    let result = values.apply_values(|v| v.asinh());
    Ok(Some(Column::new(col_name, result.into_series())))
}
/// Element-wise inverse hyperbolic tangent.
pub fn apply_atanh(column: Column) -> PolarsResult<Option<Column>> {
    let col_name = column.field().into_owned().name;
    let values = float_series_to_f64(&column.take_materialized_series())?;
    let result = values.apply_values(|v| v.atanh());
    Ok(Some(Column::new(col_name, result.into_series())))
}
/// Element-wise cube root (defined for negative inputs as well).
pub fn apply_cbrt(column: Column) -> PolarsResult<Option<Column>> {
    let col_name = column.field().into_owned().name;
    let values = float_series_to_f64(&column.take_materialized_series())?;
    let result = values.apply_values(|v| v.cbrt());
    Ok(Some(Column::new(col_name, result.into_series())))
}
/// Element-wise `e^x - 1`. `exp_m1` stays accurate when `x` is near zero.
pub fn apply_expm1(column: Column) -> PolarsResult<Option<Column>> {
    let col_name = column.field().into_owned().name;
    let values = float_series_to_f64(&column.take_materialized_series())?;
    let result = values.apply_values(|v| v.exp_m1());
    Ok(Some(Column::new(col_name, result.into_series())))
}
/// Element-wise `ln(1 + x)`. `ln_1p` stays accurate when `x` is near zero.
pub fn apply_log1p(column: Column) -> PolarsResult<Option<Column>> {
    let col_name = column.field().into_owned().name;
    let values = float_series_to_f64(&column.take_materialized_series())?;
    let result = values.apply_values(|v| v.ln_1p());
    Ok(Some(Column::new(col_name, result.into_series())))
}
/// Element-wise base-10 logarithm.
pub fn apply_log10(column: Column) -> PolarsResult<Option<Column>> {
    let col_name = column.field().into_owned().name;
    let values = float_series_to_f64(&column.take_materialized_series())?;
    let result = values.apply_values(|v| v.log10());
    Ok(Some(Column::new(col_name, result.into_series())))
}
/// Element-wise base-2 logarithm.
pub fn apply_log2(column: Column) -> PolarsResult<Option<Column>> {
    let col_name = column.field().into_owned().name;
    let values = float_series_to_f64(&column.take_materialized_series())?;
    let result = values.apply_values(|v| v.log2());
    Ok(Some(Column::new(col_name, result.into_series())))
}
/// Rounds each value to the nearest integer, with ties going to the even
/// neighbour. This matches Java's `Math.rint`, which Spark's `rint` uses:
/// `rint(2.5) == 2.0` and `rint(3.5) == 4.0`.
///
/// `f64::round` rounds ties away from zero (`2.5 -> 3.0`), so
/// `f64::round_ties_even` is used instead. NaN and ±infinity pass through.
pub fn apply_rint(column: Column) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let ca = float_series_to_f64(&series)?;
    let out = ca.apply_values(f64::round_ties_even).into_series();
    Ok(Some(Column::new(name, out)))
}
/// Row-wise maximum of two columns. Nulls are skipped, so the result is null
/// only when both inputs are null.
///
/// The comparison type is chosen from the input dtypes:
/// * either side `Float64`/`Float32` → compare as `f64`. A NaN on either side
///   wins, because Spark orders NaN above every other double.
/// * otherwise either side `Int64`/`Int32` → cast both to `Int64`.
/// * otherwise either side `String` → cast both to `String` and compare
///   lexicographically.
/// * anything else → compare as `f64`.
///
/// The two sides are zipped, so the output length is the shorter input's.
///
/// # Errors
/// `ComputeError` if fewer than two columns are given; cast and conversion
/// failures propagate.
pub fn apply_greatest2(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
    /// Applies `pick` when both sides are present, otherwise keeps whichever
    /// side is non-null.
    fn combine<T: Copy>(a: Option<T>, b: Option<T>, pick: impl Fn(T, T) -> T) -> Option<T> {
        match (a, b) {
            (Some(x), Some(y)) => Some(pick(x, y)),
            (x, None) => x,
            (None, y) => y,
        }
    }
    if columns.len() < 2 {
        return Err(PolarsError::ComputeError(
            "greatest2 needs two columns".into(),
        ));
    }
    let name = columns[0].field().into_owned().name;
    let a_series = std::mem::take(&mut columns[0]).take_materialized_series();
    let b_series = std::mem::take(&mut columns[1]).take_materialized_series();
    let dtypes = [a_series.dtype(), b_series.dtype()];
    // Float32 is checked too: otherwise a Float32/Int pair would go to the
    // Int64 branch and truncate the fractional part.
    let any_float = dtypes
        .iter()
        .any(|dt| matches!(dt, DataType::Float64 | DataType::Float32));
    let any_int = dtypes
        .iter()
        .any(|dt| matches!(dt, DataType::Int64 | DataType::Int32));
    let any_str = dtypes.iter().any(|dt| matches!(dt, DataType::String));
    let out = if any_float || !(any_int || any_str) {
        let a = float_series_to_f64(&a_series)?;
        let b = float_series_to_f64(&b_series)?;
        // `f64::max` would discard a NaN operand; Spark treats NaN as largest.
        let max_nan_wins = |x: f64, y: f64| {
            if x.is_nan() || y.is_nan() {
                f64::NAN
            } else {
                x.max(y)
            }
        };
        Float64Chunked::from_iter_options(
            name.as_str().into(),
            a.into_iter()
                .zip(&b)
                .map(|(oa, ob)| combine(oa, ob, max_nan_wins)),
        )
        .into_series()
    } else if any_int {
        let a = a_series.cast(&DataType::Int64)?;
        let b = b_series.cast(&DataType::Int64)?;
        let ca_a = a.i64().map_err(|e| compute_err("greatest", e))?;
        let ca_b = b.i64().map_err(|e| compute_err("greatest", e))?;
        Int64Chunked::from_iter_options(
            name.as_str().into(),
            ca_a.into_iter()
                .zip(ca_b)
                .map(|(oa, ob)| combine(oa, ob, |x, y| x.max(y))),
        )
        .into_series()
    } else {
        let a = a_series.cast(&DataType::String)?;
        let b = b_series.cast(&DataType::String)?;
        let ca_a = a.str().map_err(|e| compute_err("greatest", e))?;
        let ca_b = b.str().map_err(|e| compute_err("greatest", e))?;
        StringChunked::from_iter_options(
            name.as_str().into(),
            ca_a.into_iter()
                .zip(ca_b)
                .map(|(oa, ob)| combine(oa, ob, |x, y| if x >= y { x } else { y })),
        )
        .into_series()
    };
    Ok(Some(Column::new(name, out)))
}
/// Row-wise minimum of two columns. Nulls are skipped, so the result is null
/// only when both inputs are null.
///
/// The comparison type is chosen from the input dtypes:
/// * either side `Float64`/`Float32` → compare as `f64`. `f64::min` returns
///   the non-NaN operand, consistent with NaN ordering above every double.
/// * otherwise either side `Int64`/`Int32` → cast both to `Int64`.
/// * otherwise either side `String` → cast both to `String` and compare
///   lexicographically.
/// * anything else → compare as `f64`.
///
/// The two sides are zipped, so the output length is the shorter input's.
///
/// # Errors
/// `ComputeError` if fewer than two columns are given; cast and conversion
/// failures propagate.
pub fn apply_least2(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
    /// Applies `pick` when both sides are present, otherwise keeps whichever
    /// side is non-null.
    fn combine<T: Copy>(a: Option<T>, b: Option<T>, pick: impl Fn(T, T) -> T) -> Option<T> {
        match (a, b) {
            (Some(x), Some(y)) => Some(pick(x, y)),
            (x, None) => x,
            (None, y) => y,
        }
    }
    if columns.len() < 2 {
        return Err(PolarsError::ComputeError("least2 needs two columns".into()));
    }
    let name = columns[0].field().into_owned().name;
    let a_series = std::mem::take(&mut columns[0]).take_materialized_series();
    let b_series = std::mem::take(&mut columns[1]).take_materialized_series();
    let dtypes = [a_series.dtype(), b_series.dtype()];
    // Float32 is checked too: otherwise a Float32/Int pair would go to the
    // Int64 branch and truncate the fractional part.
    let any_float = dtypes
        .iter()
        .any(|dt| matches!(dt, DataType::Float64 | DataType::Float32));
    let any_int = dtypes
        .iter()
        .any(|dt| matches!(dt, DataType::Int64 | DataType::Int32));
    let any_str = dtypes.iter().any(|dt| matches!(dt, DataType::String));
    let out = if any_float || !(any_int || any_str) {
        let a = float_series_to_f64(&a_series)?;
        let b = float_series_to_f64(&b_series)?;
        Float64Chunked::from_iter_options(
            name.as_str().into(),
            a.into_iter()
                .zip(&b)
                .map(|(oa, ob)| combine(oa, ob, |x: f64, y: f64| x.min(y))),
        )
        .into_series()
    } else if any_int {
        let a = a_series.cast(&DataType::Int64)?;
        let b = b_series.cast(&DataType::Int64)?;
        let ca_a = a.i64().map_err(|e| compute_err("least", e))?;
        let ca_b = b.i64().map_err(|e| compute_err("least", e))?;
        Int64Chunked::from_iter_options(
            name.as_str().into(),
            ca_a.into_iter()
                .zip(ca_b)
                .map(|(oa, ob)| combine(oa, ob, |x, y| x.min(y))),
        )
        .into_series()
    } else {
        let a = a_series.cast(&DataType::String)?;
        let b = b_series.cast(&DataType::String)?;
        let ca_a = a.str().map_err(|e| compute_err("least", e))?;
        let ca_b = b.str().map_err(|e| compute_err("least", e))?;
        StringChunked::from_iter_options(
            name.as_str().into(),
            ca_a.into_iter()
                .zip(ca_b)
                .map(|(oa, ob)| combine(oa, ob, |x, y| if x <= y { x } else { y })),
        )
        .into_series()
    };
    Ok(Some(Column::new(name, out)))
}
/// Builds a map column, represented as a list of `{key, value}` structs, by
/// pairing a keys list column with a values list column row by row.
///
/// * A row whose keys list or values list is null becomes null.
/// * Within a row, elements are paired positionally via `zip`, so the shorter
///   list bounds the pairing. Pairs where either side is missing are skipped.
/// * Struct fields are explicitly named `key` and `value`. This matches the
///   declared list dtype and the names other map kernels look up. It also
///   prevents a duplicate-field clash when both element series share a name.
///
/// # Errors
/// `ComputeError` if fewer than two columns are given, either column is not a
/// list, or building a row struct / appending to the output fails.
pub fn apply_map_from_arrays(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
    use polars::chunked_array::StructChunked;
    use polars::chunked_array::builder::get_list_builder;
    use polars::datatypes::Field;
    if columns.len() < 2 {
        return Err(PolarsError::ComputeError(
            "map_from_arrays needs two columns (keys, values)".into(),
        ));
    }
    let name = columns[0].field().into_owned().name;
    let keys_series = std::mem::take(&mut columns[0]).take_materialized_series();
    let values_series = std::mem::take(&mut columns[1]).take_materialized_series();
    let keys_ca = keys_series
        .list()
        .map_err(|e| compute_err("map_from_arrays keys", e))?;
    let values_ca = values_series
        .list()
        .map_err(|e| compute_err("map_from_arrays values", e))?;
    let key_dtype = keys_ca.inner_dtype().clone();
    let value_dtype = values_ca.inner_dtype().clone();
    let struct_dtype = DataType::Struct(vec![
        Field::new("key".into(), key_dtype),
        Field::new("value".into(), value_dtype),
    ]);
    let mut builder = get_list_builder(&struct_dtype, 64, keys_ca.len(), name.as_str().into());
    for (opt_k, opt_v) in keys_ca.amortized_iter().zip(values_ca.amortized_iter()) {
        let (Some(k_amort), Some(v_amort)) = (opt_k, opt_v) else {
            builder.append_null();
            continue;
        };
        let k_list = k_amort.as_ref().as_list();
        let v_list = v_amort.as_ref().as_list();
        // Struct entries for this row, concatenated as they are produced.
        let mut row: Option<Series> = None;
        for (opt_ke, opt_ve) in k_list.amortized_iter().zip(v_list.amortized_iter()) {
            let (Some(ke), Some(ve)) = (opt_ke, opt_ve) else {
                continue;
            };
            let mut ke_s = ke.deep_clone();
            let mut ve_s = ve.deep_clone();
            // Struct field names come from the series names; force them to
            // match `struct_dtype` instead of inheriting whatever the element
            // series happen to be called.
            ke_s.rename("key".into());
            ve_s.rename("value".into());
            let len = ke_s.len();
            let fields: [&Series; 2] = [&ke_s, &ve_s];
            let st = StructChunked::from_series(PlSmallStr::EMPTY, len, fields.iter().copied())
                .map_err(|e| compute_err("struct", e))?
                .into_series();
            row = Some(match row.take() {
                Some(mut acc) => {
                    acc.extend(&st)?;
                    acc
                }
                None => st,
            });
        }
        let row = row.unwrap_or_else(|| Series::new_empty(PlSmallStr::EMPTY, &struct_dtype));
        builder
            .append_series(&row)
            .map_err(|e| compute_err("builder", e))?;
    }
    let out = builder.finish().into_series();
    Ok(Some(Column::new(name, out)))
}
pub fn apply_zip_arrays_to_struct(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
use polars::chunked_array::StructChunked;
use polars::chunked_array::builder::get_list_builder;
use polars::datatypes::Field;
if columns.len() < 2 {
return Err(PolarsError::ComputeError(
"zip_arrays_to_struct needs two columns (left, right)".into(),
));
}
let name = columns[0].field().into_owned().name;
let a_series = std::mem::take(&mut columns[0]).take_materialized_series();
let b_series = std::mem::take(&mut columns[1]).take_materialized_series();
let a_ca = a_series
.list()
.map_err(|e| compute_err("zip_with left", e))?;
let b_ca = b_series
.list()
.map_err(|e| compute_err("zip_with right", e))?;
let left_dtype = a_ca.inner_dtype().clone();
let right_dtype = b_ca.inner_dtype().clone();
let struct_dtype = DataType::Struct(vec![
Field::new("left".into(), left_dtype.clone()),
Field::new("right".into(), right_dtype.clone()),
]);
let mut builder = get_list_builder(&struct_dtype, 64, a_ca.len(), name.as_str().into());
for (opt_a, opt_b) in a_ca.amortized_iter().zip(b_ca.amortized_iter()) {
match (opt_a, opt_b) {
(Some(a_amort), Some(b_amort)) => {
let a_list = a_amort.as_ref().as_list();
let b_list = b_amort.as_ref().as_list();
let a_elems: Vec<Series> = a_list
.amortized_iter()
.flatten()
.map(|amort| amort.deep_clone())
.collect();
let b_elems: Vec<Series> = b_list
.amortized_iter()
.flatten()
.map(|amort| amort.deep_clone())
.collect();
let max_len = a_elems.len().max(b_elems.len());
let mut row_structs: Vec<Series> = Vec::new();
for i in 0..max_len {
let left_s = a_elems.get(i).cloned();
let right_s = b_elems.get(i).cloned();
let (mut left_series, mut right_series) = match (left_s, right_s) {
(Some(l), Some(r)) => (l, r),
(Some(l), None) => {
let r = Series::from_any_values_and_dtype(
PlSmallStr::EMPTY,
&[AnyValue::Null],
&right_dtype,
false,
)
.map_err(|e| compute_err("zip null", e))?;
(l, r)
}
(None, Some(r)) => {
let l = Series::from_any_values_and_dtype(
PlSmallStr::EMPTY,
&[AnyValue::Null],
&left_dtype,
false,
)
.map_err(|e| compute_err("zip null", e))?;
(l, r)
}
(None, None) => {
let mut l = Series::from_any_values_and_dtype(
PlSmallStr::EMPTY,
&[AnyValue::Null],
&left_dtype,
false,
)
.map_err(|e| compute_err("zip null", e))?;
l.rename("left".into());
let mut r = Series::from_any_values_and_dtype(
PlSmallStr::EMPTY,
&[AnyValue::Null],
&right_dtype,
false,
)
.map_err(|e| compute_err("zip null", e))?;
r.rename("right".into());
(l, r)
}
};
left_series.rename("left".into());
right_series.rename("right".into());
let len = left_series.len();
let fields: [&Series; 2] = [&left_series, &right_series];
let st =
StructChunked::from_series(PlSmallStr::EMPTY, len, fields.iter().copied())
.map_err(|e| compute_err("zip struct", e))?
.into_series();
row_structs.push(st);
}
if row_structs.is_empty() {
builder
.append_series(&Series::new_empty(PlSmallStr::EMPTY, &struct_dtype))
.map_err(|e| compute_err("zip builder", e))?;
} else {
let mut combined = row_structs.remove(0);
for s in row_structs {
combined.extend(&s)?;
}
builder
.append_series(&combined)
.map_err(|e| compute_err("zip builder", e))?;
}
}
_ => builder.append_null(),
}
}
let out = builder.finish().into_series();
Ok(Some(Column::new(name, out)))
}
/// Merges two map columns (lists of `{key, value}` structs) row by row into a list of
/// `{key, value1, value2}` structs.
///
/// For each row, every key present in either input map appears once in the output. `value1` comes
/// from the first map and `value2` from the second; a side that lacks the key gets a typed null.
/// A null list on either side yields a null output row.
///
/// # Errors
/// Fails when fewer than two columns are supplied, when either input is not a list column, or when
/// building the per-row structs fails.
pub fn apply_map_zip_to_struct(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
    use polars::chunked_array::StructChunked;
    use polars::chunked_array::builder::get_list_builder;
    use polars::datatypes::Field;
    use std::collections::BTreeMap;
    if columns.len() < 2 {
        return Err(PolarsError::ComputeError(
            "map_zip_to_struct needs two columns (map1, map2)".into(),
        ));
    }
    let name = columns[0].field().into_owned().name;
    let a_series = std::mem::take(&mut columns[0]).take_materialized_series();
    let b_series = std::mem::take(&mut columns[1]).take_materialized_series();
    let a_ca = a_series
        .list()
        .map_err(|e| compute_err("map_zip_with map1", e))?;
    let b_ca = b_series
        .list()
        .map_err(|e| compute_err("map_zip_with map2", e))?;
    // Output key/value dtypes are taken from map1's struct fields only; a missing field (or a
    // non-struct inner dtype) falls back to String.
    // NOTE(review): map2's value dtype is assumed to match map1's — confirm against callers.
    let struct_dtype_in = a_ca.inner_dtype().clone();
    let (key_dtype, value_dtype) = match &struct_dtype_in {
        DataType::Struct(fields) => {
            let k = fields
                .iter()
                .find(|f| f.name == "key")
                .map(|f| f.dtype.clone())
                .unwrap_or(DataType::String);
            let v = fields
                .iter()
                .find(|f| f.name == "value")
                .map(|f| f.dtype.clone())
                .unwrap_or(DataType::String);
            (k, v)
        }
        _ => (DataType::String, DataType::String),
    };
    let out_struct_dtype = DataType::Struct(vec![
        Field::new("key".into(), key_dtype.clone()),
        Field::new("value1".into(), value_dtype.clone()),
        Field::new("value2".into(), value_dtype.clone()),
    ]);
    let mut builder = get_list_builder(&out_struct_dtype, 64, a_ca.len(), name.as_str().into());
    for (opt_a, opt_b) in a_ca.amortized_iter().zip(b_ca.amortized_iter()) {
        match (opt_a, opt_b) {
            (Some(a_amort), Some(b_amort)) => {
                let a_list = a_amort.as_ref().as_list();
                let b_list = b_amort.as_ref().as_list();
                // Keys are matched by their `ToString` rendering of the one-element key Series;
                // the BTreeMap therefore also orders the output entries by that rendering.
                // NOTE(review): assumes equal keys render identically in both maps — confirm.
                let mut key_to_vals: BTreeMap<String, (Series, Option<Series>, Option<Series>)> =
                    BTreeMap::new();
                // Pass 1: map1 entries fill slot `.1`. Elements that are not structs or lack a
                // `key`/`value` field are skipped; a repeated key overwrites the earlier value.
                for elem in a_list.amortized_iter().flatten() {
                    let s = elem.deep_clone();
                    if let Ok(st) = s.struct_() {
                        if let (Ok(k), Ok(v)) = (st.field_by_name("key"), st.field_by_name("value"))
                        {
                            let key_str: String = std::string::ToString::to_string(&k);
                            key_to_vals
                                .entry(key_str.clone())
                                .or_insert_with(|| (k.clone(), None, None))
                                .1 = Some(v);
                        }
                    }
                }
                // Pass 2: map2 entries fill slot `.2`; the key Series from map1 is kept when the
                // key was already present.
                for elem in b_list.amortized_iter().flatten() {
                    let s = elem.deep_clone();
                    if let Ok(st) = s.struct_() {
                        if let (Ok(k), Ok(v)) = (st.field_by_name("key"), st.field_by_name("value"))
                        {
                            let key_str: String = std::string::ToString::to_string(&k);
                            key_to_vals
                                .entry(key_str.clone())
                                .or_insert_with(|| (k.clone(), None, None))
                                .2 = Some(v);
                        }
                    }
                }
                // Build one single-row `{key, value1, value2}` struct per merged key.
                let mut row_structs: Vec<Series> = Vec::new();
                for (_, (k_series, v1_opt, v2_opt)) in key_to_vals {
                    let mut k_renamed = k_series;
                    k_renamed.rename("key".into());
                    // A side without this key gets a one-row null of the value dtype.
                    let v1_fallback = || {
                        Series::from_any_values_and_dtype(
                            PlSmallStr::EMPTY,
                            &[AnyValue::Null],
                            &value_dtype,
                            false,
                        )
                        .map_err(|e| compute_err("map_zip null fallback", e))
                    };
                    let mut v1_series = match v1_opt {
                        Some(s) => s,
                        None => v1_fallback()?,
                    };
                    v1_series.rename("value1".into());
                    let v2_fallback = || {
                        Series::from_any_values_and_dtype(
                            PlSmallStr::EMPTY,
                            &[AnyValue::Null],
                            &value_dtype,
                            false,
                        )
                        .map_err(|e| compute_err("map_zip null fallback", e))
                    };
                    let mut v2_series = match v2_opt {
                        Some(s) => s,
                        None => v2_fallback()?,
                    };
                    v2_series.rename("value2".into());
                    let len = k_renamed.len();
                    let fields: [&Series; 3] = [&k_renamed, &v1_series, &v2_series];
                    let st =
                        StructChunked::from_series(PlSmallStr::EMPTY, len, fields.iter().copied())
                            .map_err(|e| compute_err("map_zip struct", e))?
                            .into_series();
                    row_structs.push(st);
                }
                // Concatenate the per-key structs into this row's list; two empty maps give an
                // empty (not null) list.
                if row_structs.is_empty() {
                    builder
                        .append_series(&Series::new_empty(PlSmallStr::EMPTY, &out_struct_dtype))
                        .map_err(|e| compute_err("map_zip builder", e))?;
                } else {
                    let mut combined = row_structs.remove(0);
                    for s in row_structs {
                        combined.extend(&s)?;
                    }
                    builder
                        .append_series(&combined)
                        .map_err(|e| compute_err("map_zip builder", e))?;
                }
            }
            // Null on either side → null output row.
            _ => builder.append_null(),
        }
    }
    let out = builder.finish().into_series();
    Ok(Some(Column::new(name, out)))
}
/// Returns a String column where every row holds the `Debug` rendering of the input's polars dtype.
///
/// NOTE(review): this is polars' type name (e.g. `Int64`), which may differ from Spark's
/// `typeof` names (e.g. `bigint`) — confirm this is intended.
pub fn apply_typeof(column: Column) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let type_label = format!("{:?}", series.dtype());
    let row_count = series.len();
    let labels = StringChunked::from_iter_options(
        name.as_str().into(),
        std::iter::repeat_n(Some(type_label), row_count),
    );
    Ok(Some(Column::new(name, labels.into_series())))
}
/// Borrows both series as `Int64` and returns owned copies.
///
/// No casting is done. Either input not being `Int64` is reported as an error tagged with `ctx`.
fn binary_series_i64(
    a: &Series,
    b: &Series,
    ctx: &str,
) -> PolarsResult<(Int64Chunked, Int64Chunked)> {
    let as_i64 = |s: &Series| s.i64().cloned().map_err(|e| compute_err(ctx, e));
    let left = as_i64(a)?;
    let right = as_i64(b)?;
    Ok((left, right))
}
/// Borrows both series as `Int32` and returns owned copies.
///
/// No casting is done. Either input not being `Int32` is reported as an error tagged with `ctx`.
fn binary_series_i32(
    a: &Series,
    b: &Series,
    ctx: &str,
) -> PolarsResult<(Int32Chunked, Int32Chunked)> {
    let as_i32 = |s: &Series| s.i32().cloned().map_err(|e| compute_err(ctx, e));
    let left = as_i32(a)?;
    let right = as_i32(b)?;
    Ok((left, right))
}
/// Casts both series to `Float64` and returns the resulting chunked arrays.
///
/// Cast failures propagate as-is. Failures while extracting the `Float64` view are tagged with `ctx`.
fn binary_series_f64(
    a: &Series,
    b: &Series,
    ctx: &str,
) -> PolarsResult<(Float64Chunked, Float64Chunked)> {
    let to_f64 = |s: &Series| -> PolarsResult<Float64Chunked> {
        let as_float = s.cast(&DataType::Float64)?;
        as_float.f64().cloned().map_err(|e| compute_err(ctx, e))
    };
    let left = to_f64(a)?;
    let right = to_f64(b)?;
    Ok((left, right))
}
/// Coerces a series to `Float64` the way PySpark arithmetic coerces its operands.
///
/// How each input dtype is handled:
/// - String values are trimmed and parsed. Blank or unparsable strings become null.
/// - A `Null`-typed series becomes an all-null column of the same length.
/// - Every other dtype is cast to `Float64`.
///
/// Errors are tagged with `ctx`.
fn series_to_f64_pyspark(s: &Series, ctx: &str) -> PolarsResult<Float64Chunked> {
    fn parse_lenient(raw: &str) -> Option<f64> {
        let trimmed = raw.trim();
        if trimmed.is_empty() {
            None
        } else {
            trimmed.parse::<f64>().ok()
        }
    }

    match s.dtype() {
        DataType::Null => Ok(Float64Chunked::full_null(PlSmallStr::EMPTY, s.len())),
        DataType::String => {
            let strings = s.str().map_err(|e| compute_err(ctx, e))?;
            let parsed = Float64Chunked::from_iter_options(
                s.name().as_str().into(),
                strings.into_iter().map(|opt| opt.and_then(parse_lenient)),
            );
            Ok(parsed)
        }
        _ => {
            let as_float = s
                .cast(&DataType::Float64)
                .map_err(|e| compute_err(ctx, e))?;
            as_float.f64().cloned().map_err(|e| compute_err(ctx, e))
        }
    }
}
/// Applies a PySpark-style binary arithmetic operator elementwise to the first two columns.
///
/// Both operands are coerced with `series_to_f64_pyspark`, so strings are trimmed and parsed and
/// unparsable strings become null. A length-1 operand is broadcast against the other. The result is
/// null wherever either operand is null, and it is named after the first column.
///
/// # Errors
/// Fails in three cases:
/// - fewer than two columns are supplied;
/// - an operand cannot be coerced to `Float64`;
/// - the operands have different lengths and neither has length 1 (previously `zip` silently
///   truncated the result to the shorter input).
#[allow(clippy::useless_conversion)]
fn pyspark_binary_arith(
    columns: &mut [Column],
    ctx: &str,
    op: fn(f64, f64) -> f64,
) -> PolarsResult<Option<Column>> {
    if columns.len() < 2 {
        return Err(PolarsError::ComputeError(
            format!("{ctx} needs two columns").into(),
        ));
    }
    let name = columns[0].field().into_owned().name;
    let a_s = std::mem::take(&mut columns[0]).take_materialized_series();
    let b_s = std::mem::take(&mut columns[1]).take_materialized_series();
    let mut ca_a = series_to_f64_pyspark(&a_s, ctx)?;
    let mut ca_b = series_to_f64_pyspark(&b_s, ctx)?;
    let len_a = ca_a.len();
    let len_b = ca_b.len();
    if len_a == 1 && len_b > 1 {
        let val = ca_a.get(0);
        ca_a = Float64Chunked::from_iter_options(
            name.as_str().into(),
            std::iter::repeat_n(val, len_b),
        );
    } else if len_b == 1 && len_a > 1 {
        let val = ca_b.get(0);
        ca_b = Float64Chunked::from_iter_options(
            name.as_str().into(),
            std::iter::repeat_n(val, len_a),
        );
    } else if len_a != len_b && len_a != 1 && len_b != 1 {
        // Neither side can be broadcast; zipping would silently drop rows.
        return Err(PolarsError::ComputeError(
            format!("{ctx}: operands have mismatched lengths ({len_a} vs {len_b})").into(),
        ));
    }
    let out = Float64Chunked::from_iter_options(
        name.as_str().into(),
        ca_a.into_iter()
            .zip(ca_b.into_iter())
            .map(|(oa, ob)| match (oa, ob) {
                (Some(a), Some(b)) => Some(op(a, b)),
                _ => None,
            }),
    )
    .into_series();
    Ok(Some(Column::new(name, out)))
}
/// PySpark `+` on two columns.
///
/// Coercion, null propagation and scalar broadcast are handled by `pyspark_binary_arith`.
pub fn apply_pyspark_add(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
    pyspark_binary_arith(columns, "pyspark_add", <f64 as std::ops::Add>::add)
}
/// PySpark `-` on two columns (first minus second).
///
/// Coercion, null propagation and scalar broadcast are handled by `pyspark_binary_arith`.
pub fn apply_pyspark_subtract(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
    pyspark_binary_arith(columns, "pyspark_subtract", <f64 as std::ops::Sub>::sub)
}
/// PySpark `*` on two columns.
///
/// Coercion, null propagation and scalar broadcast are handled by `pyspark_binary_arith`.
pub fn apply_pyspark_multiply(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
    pyspark_binary_arith(columns, "pyspark_multiply", <f64 as std::ops::Mul>::mul)
}
#[allow(clippy::useless_conversion)] pub fn apply_pyspark_divide(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
if columns.len() < 2 {
return Err(PolarsError::ComputeError(
"pyspark_divide needs two columns".into(),
));
}
let name = columns[0].field().into_owned().name;
let a_s = std::mem::take(&mut columns[0]).take_materialized_series();
let b_s = std::mem::take(&mut columns[1]).take_materialized_series();
let mut ca_a = series_to_f64_pyspark(&a_s, "pyspark_divide")?;
let mut ca_b = series_to_f64_pyspark(&b_s, "pyspark_divide")?;
let len_a = ca_a.len();
let len_b = ca_b.len();
if len_a == 1 && len_b > 1 {
let val = ca_a.get(0);
ca_a = Float64Chunked::from_iter_options(
name.as_str().into(),
std::iter::repeat_n(val, len_b),
);
} else if len_b == 1 && len_a > 1 {
let val = ca_b.get(0);
ca_b = Float64Chunked::from_iter_options(
name.as_str().into(),
std::iter::repeat_n(val, len_a),
);
}
let out = Float64Chunked::from_iter_options(
name.as_str().into(),
ca_a.into_iter()
.zip(ca_b.into_iter())
.map(|(oa, ob)| match (oa, ob) {
(Some(a), Some(b)) => {
if b == 0.0 {
None } else {
Some(a / b)
}
}
_ => None,
}),
)
.into_series();
Ok(Some(Column::new(name, out)))
}
pub fn apply_pyspark_mod(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
pyspark_binary_arith(columns, "pyspark_mod", |a, b| a % b)
}
/// `try_add`: addition that yields null instead of overflowing.
///
/// Same-typed `Int64` or `Int32` operands use checked integer addition, keeping their dtype.
/// Any other combination is cast to `Float64` and added normally. Nulls propagate.
pub fn apply_try_add(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
    let [lhs, rhs, ..] = columns else {
        return Err(PolarsError::ComputeError(
            "try_add needs two columns".into(),
        ));
    };
    let name = lhs.field().into_owned().name;
    let a_s = std::mem::take(lhs).take_materialized_series();
    let b_s = std::mem::take(rhs).take_materialized_series();
    let out = match (a_s.dtype(), b_s.dtype()) {
        (DataType::Int64, DataType::Int64) => {
            let (left, right) = binary_series_i64(&a_s, &b_s, "try_add")?;
            let sums = left.iter().zip(right.iter()).map(|pair| match pair {
                (Some(x), Some(y)) => x.checked_add(y),
                _ => None,
            });
            Int64Chunked::from_iter_options(name.as_str().into(), sums).into_series()
        }
        (DataType::Int32, DataType::Int32) => {
            let (left, right) = binary_series_i32(&a_s, &b_s, "try_add")?;
            let sums = left.iter().zip(right.iter()).map(|pair| match pair {
                (Some(x), Some(y)) => x.checked_add(y),
                _ => None,
            });
            Int32Chunked::from_iter_options(name.as_str().into(), sums).into_series()
        }
        _ => {
            let (left, right) = binary_series_f64(&a_s, &b_s, "try_add")?;
            let sums = left.iter().zip(right.iter()).map(|pair| match pair {
                (Some(x), Some(y)) => Some(x + y),
                _ => None,
            });
            Float64Chunked::from_iter_options(name.as_str().into(), sums).into_series()
        }
    };
    Ok(Some(Column::new(name, out)))
}
/// `try_subtract`: subtraction (first minus second) that yields null instead of overflowing.
///
/// Same-typed `Int64` or `Int32` operands use checked integer subtraction, keeping their dtype.
/// Any other combination is cast to `Float64`. Nulls propagate.
pub fn apply_try_subtract(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
    let [lhs, rhs, ..] = columns else {
        return Err(PolarsError::ComputeError(
            "try_subtract needs two columns".into(),
        ));
    };
    let name = lhs.field().into_owned().name;
    let a_s = std::mem::take(lhs).take_materialized_series();
    let b_s = std::mem::take(rhs).take_materialized_series();
    let out = match (a_s.dtype(), b_s.dtype()) {
        (DataType::Int64, DataType::Int64) => {
            let (left, right) = binary_series_i64(&a_s, &b_s, "try_subtract")?;
            let diffs = left.iter().zip(right.iter()).map(|pair| match pair {
                (Some(x), Some(y)) => x.checked_sub(y),
                _ => None,
            });
            Int64Chunked::from_iter_options(name.as_str().into(), diffs).into_series()
        }
        (DataType::Int32, DataType::Int32) => {
            let (left, right) = binary_series_i32(&a_s, &b_s, "try_subtract")?;
            let diffs = left.iter().zip(right.iter()).map(|pair| match pair {
                (Some(x), Some(y)) => x.checked_sub(y),
                _ => None,
            });
            Int32Chunked::from_iter_options(name.as_str().into(), diffs).into_series()
        }
        _ => {
            let (left, right) = binary_series_f64(&a_s, &b_s, "try_subtract")?;
            let diffs = left.iter().zip(right.iter()).map(|pair| match pair {
                (Some(x), Some(y)) => Some(x - y),
                _ => None,
            });
            Float64Chunked::from_iter_options(name.as_str().into(), diffs).into_series()
        }
    };
    Ok(Some(Column::new(name, out)))
}
/// `try_multiply`: multiplication that yields null instead of overflowing.
///
/// Same-typed `Int64` or `Int32` operands use checked integer multiplication, keeping their dtype.
/// Any other combination is cast to `Float64`. Nulls propagate.
pub fn apply_try_multiply(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
    let [lhs, rhs, ..] = columns else {
        return Err(PolarsError::ComputeError(
            "try_multiply needs two columns".into(),
        ));
    };
    let name = lhs.field().into_owned().name;
    let a_s = std::mem::take(lhs).take_materialized_series();
    let b_s = std::mem::take(rhs).take_materialized_series();
    let out = match (a_s.dtype(), b_s.dtype()) {
        (DataType::Int64, DataType::Int64) => {
            let (left, right) = binary_series_i64(&a_s, &b_s, "try_multiply")?;
            let products = left.iter().zip(right.iter()).map(|pair| match pair {
                (Some(x), Some(y)) => x.checked_mul(y),
                _ => None,
            });
            Int64Chunked::from_iter_options(name.as_str().into(), products).into_series()
        }
        (DataType::Int32, DataType::Int32) => {
            let (left, right) = binary_series_i32(&a_s, &b_s, "try_multiply")?;
            let products = left.iter().zip(right.iter()).map(|pair| match pair {
                (Some(x), Some(y)) => x.checked_mul(y),
                _ => None,
            });
            Int32Chunked::from_iter_options(name.as_str().into(), products).into_series()
        }
        _ => {
            let (left, right) = binary_series_f64(&a_s, &b_s, "try_multiply")?;
            let products = left.iter().zip(right.iter()).map(|pair| match pair {
                (Some(x), Some(y)) => Some(x * y),
                _ => None,
            });
            Float64Chunked::from_iter_options(name.as_str().into(), products).into_series()
        }
    };
    Ok(Some(Column::new(name, out)))
}
/// Removes `SimpleDateFormat`-style quoting.
///
/// A quoted `'text'` becomes `text`, and a doubled `''` outside a quoted section becomes a single
/// `'`. An unterminated quote runs to the end of the input.
fn unquote_simple_date_format(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    let mut chars = s.chars().peekable();
    while let Some(c) = chars.next() {
        if c == '\'' {
            if chars.peek() == Some(&'\'') {
                chars.next();
                out.push('\'');
            } else {
                for q in chars.by_ref() {
                    if q == '\'' {
                        break;
                    }
                    out.push(q);
                }
            }
        } else {
            out.push(c);
        }
    }
    out
}

/// Translates a Spark/Java datetime pattern into a chrono `strftime` format string.
///
/// Supported letters: `yyyy`→`%Y`, `MM`→`%m`, `dd`→`%d`, `HH`→`%H`, `mm`→`%M`, `ss`→`%S`,
/// `EEEE`→`%A`, and `E`/`EE`/`EEE`→`%a`.
///
/// The pattern is scanned as runs of identical letters, so adjacent runs such as `MMmm` cannot
/// interfere with each other. Leftover letters in a run, and unsupported letters, are kept
/// verbatim as chrono literals.
///
/// Quoted sections are decoded with `unquote_simple_date_format` and emitted as literal text. They
/// are never pattern-substituted: `'Ends'` stays `Ends`, it does not become `%ands`.
///
/// A literal `%` is escaped as `%%` so chrono does not treat it as a specifier.
pub(crate) fn pyspark_format_to_chrono(s: &str) -> String {
    // Appends one literal character, escaping `%` for chrono.
    fn push_literal(out: &mut String, ch: char) {
        if ch == '%' {
            out.push_str("%%");
        } else {
            out.push(ch);
        }
    }

    let mut out = String::with_capacity(s.len() + 8);
    let mut chars = s.char_indices().peekable();
    while let Some((start, c)) = chars.next() {
        if c == '\'' {
            // Find where this quoted section ends (a doubled `''` is a one-character escape), then
            // decode it with the shared unquoting rules.
            let end = match chars.peek() {
                Some(&(i, '\'')) => {
                    chars.next();
                    i + 1
                }
                _ => chars
                    .by_ref()
                    .find(|&(_, q)| q == '\'')
                    .map_or(s.len(), |(i, _)| i + 1),
            };
            for lit in unquote_simple_date_format(&s[start..end]).chars() {
                push_literal(&mut out, lit);
            }
            continue;
        }

        let mut run = 1usize;
        while chars.next_if(|&(_, next)| next == c).is_some() {
            run += 1;
        }

        let (width, spec) = match c {
            'y' => (4, "%Y"),
            'M' => (2, "%m"),
            'd' => (2, "%d"),
            'H' => (2, "%H"),
            'm' => (2, "%M"),
            's' => (2, "%S"),
            'E' => {
                // Every EEEE is the full weekday name; any shorter remainder is the abbreviation.
                out.push_str(&"%A".repeat(run / 4));
                if run % 4 != 0 {
                    out.push_str("%a");
                }
                continue;
            }
            _ => {
                for _ in 0..run {
                    push_literal(&mut out, c);
                }
                continue;
            }
        };
        out.push_str(&spec.repeat(run / width));
        for _ in 0..run % width {
            out.push(c);
        }
    }
    out
}
pub fn apply_unix_timestamp(column: Column, format: Option<&str>) -> PolarsResult<Option<Column>> {
use chrono::{DateTime, NaiveDateTime, Utc};
let chrono_fmt = format
.map(pyspark_format_to_chrono)
.unwrap_or_else(|| "%Y-%m-%d %H:%M:%S".to_string());
let name = column.field().into_owned().name;
let series = column.take_materialized_series();
let out = match series.dtype() {
DataType::String => {
let ca = series.str().map_err(|e| compute_err("unix_timestamp", e))?;
Int64Chunked::from_iter_options(
name.as_str().into(),
ca.into_iter().map(|opt_s| {
opt_s.and_then(|s| {
NaiveDateTime::parse_from_str(s, &chrono_fmt)
.ok()
.map(|ndt| DateTime::<Utc>::from_naive_utc_and_offset(ndt, Utc).timestamp())
})
}),
)
}
DataType::Datetime(_, _) => {
let casted = series
.cast(&DataType::Int64)
.map_err(|e| compute_err("unix_timestamp datetime cast", e))?;
let ca = casted
.i64()
.map_err(|e| compute_err("unix_timestamp datetime", e))?;
Int64Chunked::from_iter_options(
name.as_str().into(),
ca.into_iter().map(|opt_us| opt_us.map(|us| us / 1_000_000)),
)
}
DataType::Date => {
let casted = series
.cast(&DataType::Int32)
.map_err(|e| compute_err("unix_timestamp date cast", e))?;
let ca = casted
.i32()
.map_err(|e| compute_err("unix_timestamp date", e))?;
Int64Chunked::from_iter_options(
name.as_str().into(),
ca.into_iter().map(|opt_days| opt_days.map(|d| (d as i64) * 86_400)),
)
}
_ => {
return Err(PolarsError::ComputeError(
format!(
"unix_timestamp: invalid series dtype: expected `String`, got `{}` for series with name `{}`",
series.dtype(),
name.as_str()
)
.into(),
))
}
};
Ok(Some(Column::new(name, out.into_series())))
}
pub fn apply_from_unixtime(column: Column, format: Option<&str>) -> PolarsResult<Option<Column>> {
use chrono::{DateTime, Utc};
let chrono_fmt = format
.map(pyspark_format_to_chrono)
.unwrap_or_else(|| "%Y-%m-%d %H:%M:%S".to_string());
let name = column.field().into_owned().name;
let series = column.take_materialized_series();
let casted = series
.cast(&DataType::Int64)
.map_err(|e| compute_err("from_unixtime cast", e))?;
let ca = casted.i64().map_err(|e| compute_err("from_unixtime", e))?;
let out = StringChunked::from_iter_options(
name.as_str().into(),
ca.into_iter().map(|opt_secs| {
opt_secs.and_then(|secs| {
DateTime::<Utc>::from_timestamp(secs, 0)
.map(|dt| dt.format(&chrono_fmt).to_string())
})
}),
);
Ok(Some(Column::new(name, out.into_series())))
}
/// Builds a microsecond `Datetime` from six columns: year, month, day, hour, minute, second.
///
/// All inputs are cast to `Int32`; seconds are therefore whole seconds. Null handling is
/// asymmetric:
/// - a null year, month or day yields null;
/// - a null hour, minute or second is treated as 0.
///
/// Invalid calendar or clock values yield null.
///
/// With `timezone`, the fields are local wall-clock time in that zone, and ambiguous or
/// nonexistent local times yield null. Without it, the fields are taken as UTC.
///
/// Length-1 inputs (such as literals) are broadcast to the other inputs' length.
///
/// # Errors
/// Fails in these cases:
/// - fewer than six columns are supplied;
/// - `timezone` is not a valid IANA zone name;
/// - a cast fails;
/// - inputs have different lengths that are not 1. Previously this panicked with an
///   out-of-bounds `get`.
pub fn apply_make_timestamp(
    columns: &mut [Column],
    timezone: Option<&str>,
) -> PolarsResult<Option<Column>> {
    use chrono::{NaiveDate, Utc};
    use polars::datatypes::TimeUnit;
    if columns.len() < 6 {
        return Err(PolarsError::ComputeError(
            "make_timestamp needs six columns (year, month, day, hour, min, sec)".into(),
        ));
    }
    let tz: Option<Tz> = timezone
        .map(|s| {
            s.parse()
                .map_err(|_| PolarsError::ComputeError(format!("invalid timezone: {s}").into()))
        })
        .transpose()?;
    let name = columns[0].field().into_owned().name;
    let series: Vec<Series> = (0..6)
        .map(|i| std::mem::take(&mut columns[i]).take_materialized_series())
        .collect();
    let ca: Vec<Int32Chunked> = series
        .iter()
        .map(|s| {
            let c = s.cast(&DataType::Int32)?;
            Ok(c.i32()
                .map_err(|e| compute_err("make_timestamp", e))?
                .clone())
        })
        .collect::<PolarsResult<Vec<_>>>()?;
    // Output length: the first input whose length is not 1, else 1 (all scalars).
    let len = ca.iter().map(|c| c.len()).find(|&l| l != 1).unwrap_or(1);
    if ca.iter().any(|c| c.len() != len && c.len() != 1) {
        return Err(PolarsError::ComputeError(
            "make_timestamp: input columns have mismatched lengths".into(),
        ));
    }
    // Row accessor that broadcasts length-1 inputs.
    let at = |idx: usize, row: usize| {
        let c = &ca[idx];
        c.get(if c.len() == 1 { 0 } else { row })
    };
    let out = Int64Chunked::from_iter_options(
        name.as_str().into(),
        (0..len).map(|i| {
            let y = at(0, i)?;
            let m = at(1, i)?;
            let d = at(2, i)?;
            let h = at(3, i).unwrap_or(0);
            let min = at(4, i).unwrap_or(0);
            let s = at(5, i).unwrap_or(0);
            let date = NaiveDate::from_ymd_opt(y, m as u32, d as u32)?;
            let naive = date.and_hms_opt(h as u32, min as u32, s as u32)?;
            match &tz {
                Some(tz) => tz.from_local_datetime(&naive).single().map(
                    |dt_tz: chrono::DateTime<Tz>| dt_tz.with_timezone(&Utc).timestamp_micros(),
                ),
                None => Some(naive.and_utc().timestamp_micros()),
            }
        }),
    );
    let out_series = out
        .into_series()
        .cast(&DataType::Datetime(TimeUnit::Microseconds, None))?;
    Ok(Some(Column::new(name, out_series)))
}
/// Returns `true` when `expr` is a plain column reference, possibly wrapped in any number of
/// aliases.
pub(crate) fn is_simple_column_ref(expr: &polars::prelude::Expr) -> bool {
    use polars::prelude::Expr as PlExpr;
    match expr {
        PlExpr::Alias(inner, _) => is_simple_column_ref(inner.as_ref()),
        PlExpr::Column(_) => true,
        _ => false,
    }
}
pub fn apply_to_timestamp_format(
column: Column,
format: Option<&str>,
strict: bool,
use_recent_null: bool,
) -> PolarsResult<Option<Column>> {
use chrono::NaiveDateTime;
use polars::datatypes::TimeUnit;
let name = column.field().into_owned().name;
let series = column.take_materialized_series();
if series.dtype() != &DataType::String {
let dtype = series.dtype();
match dtype {
DataType::Datetime(_, _) => {
let out_series = series.cast(&DataType::Datetime(TimeUnit::Microseconds, None))?;
return Ok(Some(Column::new(name, out_series)));
}
DataType::Date => {
let out_series = series.cast(&DataType::Datetime(TimeUnit::Microseconds, None))?;
return Ok(Some(Column::new(name, out_series)));
}
DataType::Int32 | DataType::Int64 => {
let casted = series.cast(&DataType::Int64)?;
let ca = casted.i64().map_err(|e| compute_err("to_timestamp", e))?;
let out = Int64Chunked::from_iter_options(
name.as_str().into(),
ca.into_iter().map(|opt_s| opt_s.map(|secs| secs * 1_000_000)),
);
let out_series = out
.into_series()
.cast(&DataType::Datetime(TimeUnit::Microseconds, None))?;
return Ok(Some(Column::new(name, out_series)));
}
DataType::Float64 => {
let ca = series.f64().map_err(|e| compute_err("to_timestamp", e))?;
let out = Int64Chunked::from_iter_options(
name.as_str().into(),
ca.into_iter().map(|opt_s| {
opt_s.map(|secs| (secs * 1_000_000.0).round() as i64)
}),
);
let out_series = out
.into_series()
.cast(&DataType::Datetime(TimeUnit::Microseconds, None))?;
return Ok(Some(Column::new(name, out_series)));
}
DataType::Boolean => {
let len = series.len();
let out = Int64Chunked::from_iter_options(
name.as_str().into(),
(0..len).map(|_| None::<i64>),
);
let out_series = out
.into_series()
.cast(&DataType::Datetime(TimeUnit::Microseconds, None))?;
return Ok(Some(Column::new(name, out_series)));
}
_ => {
return Err(PolarsError::ComputeError(
"to_timestamp requires StringType, TimestampType, IntegerType, LongType, DateType, or DoubleType"
.into(),
))
}
}
}
let chrono_fmt = format
.map(pyspark_format_to_chrono)
.unwrap_or_else(|| "%Y-%m-%d %H:%M:%S".to_string());
let strict_iso_no_fraction = format
.map(|f| f.trim() == "yyyy-MM-dd'T'HH:mm:ss")
.unwrap_or(false);
let ref_ts = chrono::Utc::now();
let recent_cutoff = ref_ts - chrono::Duration::days(31);
let chrono_fmt_alt = chrono_fmt.replace('T', " ");
let ca = series.str().map_err(|e| compute_err("to_timestamp", e))?;
let out = Int64Chunked::from_iter_options(
name.as_str().into(),
ca.into_iter().map(|opt_s| {
opt_s.and_then(|s| {
let s = s.trim();
if strict_iso_no_fraction && (s.len() != 19 || s.contains('.')) {
return None;
}
let ndt = NaiveDateTime::parse_from_str(s, &chrono_fmt)
.or_else(|_| NaiveDateTime::parse_from_str(s, &chrono_fmt_alt))
.ok()?;
let parsed_utc = ndt.and_utc();
if use_recent_null && parsed_utc >= recent_cutoff && parsed_utc <= ref_ts {
return None;
}
Some(parsed_utc.timestamp_micros())
})
}),
);
let out_series = out
.into_series()
.cast(&DataType::Datetime(TimeUnit::Microseconds, None))?;
if strict {
Ok(Some(Column::new(name, out_series)))
} else {
Ok(Some(Column::new(name, out_series)))
}
}
pub fn apply_to_timestamp_strip_fraction_recent_null(
column: Column,
format: &str,
ref_ts: chrono::DateTime<chrono::Utc>,
) -> PolarsResult<Option<Column>> {
use chrono::NaiveDateTime;
let name = column.field().into_owned().name;
let series = column.take_materialized_series();
if series.dtype() != &DataType::String {
return Err(PolarsError::ComputeError(
"to_timestamp fused path requires StringType".into(),
));
}
let chrono_fmt = pyspark_format_to_chrono(format);
let chrono_fmt_alt = chrono_fmt.replace('T', " ");
let ca = series.str().map_err(|e| compute_err("to_timestamp", e))?;
let recent_cutoff = ref_ts - chrono::Duration::days(31);
let out = Int64Chunked::from_iter_options(
name.as_str().into(),
ca.into_iter().map(|opt_s| {
opt_s.and_then(|s| {
let s = s.trim();
let stripped = if let Some(dot) = s.find('.') {
&s[..dot]
} else {
s
};
let ndt = NaiveDateTime::parse_from_str(stripped, &chrono_fmt)
.or_else(|_| NaiveDateTime::parse_from_str(stripped, &chrono_fmt_alt))
.ok()?;
let parsed_utc = ndt.and_utc();
if parsed_utc >= recent_cutoff && parsed_utc <= ref_ts {
return None;
}
Some(parsed_utc.timestamp_micros())
})
}),
);
let out_series = out
.into_series()
.cast(&DataType::Datetime(TimeUnit::Microseconds, None))?;
Ok(Some(Column::new(name, out_series)))
}
/// Heuristically reports whether a timestamp string ends with an explicit UTC offset.
///
/// Recognised suffixes (after trimming):
/// - `Z` or `z`;
/// - `±HHMM`;
/// - `±HH:MM` (previously missed, which made ISO strings like `…T10:00:00+05:30` unparsable);
/// - the legacy form: a sign followed by five digits.
///
/// The check works on bytes, so non-ASCII input no longer panics on a non-char-boundary slice.
fn has_tz_offset(s: &str) -> bool {
    let bytes = s.trim().as_bytes();
    if matches!(bytes.last(), Some(b'Z' | b'z')) {
        return true;
    }
    let is_sign = |b: u8| b == b'+' || b == b'-';
    let all_digits = |tail: &[u8]| tail.iter().all(u8::is_ascii_digit);
    if bytes.len() >= 5 {
        let tail = &bytes[bytes.len() - 5..];
        if is_sign(tail[0]) && all_digits(&tail[1..]) {
            return true;
        }
    }
    if bytes.len() >= 6 {
        let tail = &bytes[bytes.len() - 6..];
        if is_sign(tail[0]) {
            let colon_form = tail[3] == b':' && all_digits(&tail[1..3]) && all_digits(&tail[4..]);
            let legacy_form = all_digits(&tail[1..]);
            if colon_form || legacy_form {
                return true;
            }
        }
    }
    false
}
fn parse_timestamp_string_with_offset(s: &str) -> Option<i64> {
use chrono::DateTime;
let s = s.trim();
let s_normalized = if s.ends_with('Z') || s.ends_with("z") {
format!("{}+0000", s.trim_end_matches(['Z', 'z']))
} else {
s.to_string()
};
let s_ref = s_normalized.as_str();
let with_tz = DateTime::parse_from_str(s_ref, "%Y-%m-%dT%H:%M:%S%.f%z")
.or_else(|_| DateTime::parse_from_str(s_ref, "%Y-%m-%dT%H:%M:%S%z"))
.or_else(|_| DateTime::parse_from_str(s_ref, "%Y-%m-%d %H:%M:%S%.f%z"))
.or_else(|_| DateTime::parse_from_str(s_ref, "%Y-%m-%d %H:%M:%S%z"));
if let Ok(dt_offset) = with_tz {
return Some(dt_offset.with_timezone(&chrono::Utc).timestamp_micros());
}
None
}
/// Flexible timestamp parse with no session time zone.
///
/// Equivalent to calling `parse_timestamp_string_flexible_with_tz(s, None)`, which reads naive
/// strings as UTC.
#[allow(dead_code)]
pub(crate) fn parse_timestamp_string_flexible(s: &str) -> Option<i64> {
    let no_session_zone: Option<&str> = None;
    parse_timestamp_string_flexible_with_tz(s, no_session_zone)
}
/// Parses a timestamp string leniently and returns UTC epoch microseconds.
///
/// The steps are tried in order:
/// 1. If the string looks like it carries an explicit offset, the offset-aware parse is tried
///    first.
/// 2. Otherwise, or if that fails, a trailing `Z`/`z` or numeric offset is removed.
/// 3. The remainder is parsed as `YYYY-MM-DD[T| ]HH:MM:SS[.fraction]` or `YYYY-MM-DD`.
///
/// The naive result is interpreted in `session_tz` (default UTC). Ambiguous or nonexistent local
/// times, or an unparsable zone name, yield `None`.
///
/// Fractional seconds are truncated to microseconds. Previously, more than 10 digits silently
/// dropped the whole fraction, and more than 24 digits overflowed `10_i64.pow`. Offset stripping
/// now works on bytes, so non-ASCII input can no longer panic on a non-char-boundary slice.
pub(crate) fn parse_timestamp_string_flexible_with_tz(
    s: &str,
    session_tz: Option<&str>,
) -> Option<i64> {
    use chrono::{NaiveDate, NaiveDateTime, TimeZone};

    // Removes one trailing numeric offset: `±HHMM`, or the legacy sign + five digits form.
    fn strip_numeric_offset(s: &str) -> &str {
        let bytes = s.as_bytes();
        let is_offset = |tail: &[u8]| {
            matches!(tail[0], b'+' | b'-') && tail[1..].iter().all(u8::is_ascii_digit)
        };
        for width in [5usize, 6] {
            if bytes.len() >= width && is_offset(&bytes[bytes.len() - width..]) {
                // The tail is pure ASCII, so this cut is on a char boundary.
                return &s[..s.len() - width];
            }
        }
        s
    }

    let s = s.trim();
    if has_tz_offset(s) {
        if let Some(utc_micros) = parse_timestamp_string_with_offset(s) {
            return Some(utc_micros);
        }
    }
    let s_no_tz = strip_numeric_offset(
        s.strip_suffix('Z')
            .or_else(|| s.strip_suffix("z"))
            .unwrap_or(s),
    );
    let to_utc_micros = |ndt: NaiveDateTime| {
        let tz_str = session_tz.unwrap_or("UTC");
        if tz_str.eq_ignore_ascii_case("UTC") {
            Some(ndt.and_utc().timestamp_micros())
        } else {
            tz_str.parse::<Tz>().ok().and_then(|tz| {
                tz.from_local_datetime(&ndt)
                    .single()
                    .map(|dt| dt.with_timezone(&chrono::Utc).timestamp_micros())
            })
        }
    };
    if let Some((base, subsec)) = s_no_tz.split_once('.') {
        // Keep at most six fractional digits (microsecond precision); extra digits are truncated.
        let digits: String = subsec
            .chars()
            .take_while(|c| c.is_ascii_digit())
            .take(6)
            .collect();
        if let Ok(ndt) = NaiveDateTime::parse_from_str(base, "%Y-%m-%dT%H:%M:%S")
            .or_else(|_| NaiveDateTime::parse_from_str(base, "%Y-%m-%d %H:%M:%S"))
        {
            let micros = if digits.is_empty() {
                0_i64
            } else {
                // 1..=6 ASCII digits: the parse cannot fail and the scale factor fits easily.
                digits.parse::<i64>().unwrap_or(0) * 10_i64.pow(6 - digits.len() as u32)
            };
            return to_utc_micros(ndt + chrono::TimeDelta::microseconds(micros));
        }
    }
    const FORMATS: &[&str] = &["%Y-%m-%dT%H:%M:%S", "%Y-%m-%d %H:%M:%S", "%Y-%m-%d"];
    for fmt in FORMATS {
        if let Ok(ndt) = NaiveDateTime::parse_from_str(s_no_tz, fmt) {
            return to_utc_micros(ndt);
        }
    }
    if let Ok(d) = NaiveDate::parse_from_str(s_no_tz, "%Y-%m-%d") {
        return to_utc_micros(d.and_hms_opt(0, 0, 0)?);
    }
    None
}
/// Normalises a series to a microsecond `Datetime` with no time zone.
///
/// String values are parsed with `parse_timestamp_string_flexible_with_tz` in the thread's
/// session time zone, and unparsable strings become null. Every other dtype is cast directly.
fn series_to_datetime_micros(series: &Series) -> PolarsResult<Series> {
    use polars::datatypes::TimeUnit;
    let target = DataType::Datetime(TimeUnit::Microseconds, None);
    if series.dtype() != &DataType::String {
        return series.cast(&target);
    }
    let session_tz = crate::get_thread_session_time_zone();
    let zone = session_tz.as_str();
    let strings = series.str().map_err(|e| compute_err("date on string", e))?;
    let micros = Int64Chunked::from_iter_options(
        series.name().as_str().into(),
        strings
            .into_iter()
            .map(|opt| opt.and_then(|raw| parse_timestamp_string_flexible_with_tz(raw, Some(zone)))),
    );
    micros.into_series().cast(&target)
}
/// Hour of day (0-23) of a UTC epoch-microsecond instant, as seen in the zone `tz_str`.
///
/// `"UTC"` is matched case-insensitively and skips zone lookup. Returns `None` for an unknown zone
/// name or an out-of-range instant.
fn utc_micros_to_hour_in_tz(micros: i64, tz_str: &str) -> Option<i32> {
    use chrono::Timelike;
    let instant: chrono::DateTime<chrono::Utc> = chrono::Utc.timestamp_micros(micros).single()?;
    let hour = if tz_str.eq_ignore_ascii_case("UTC") {
        instant.hour()
    } else {
        let zone: Tz = tz_str.parse().ok()?;
        instant.with_timezone(&zone).hour()
    };
    Some(hour as i32)
}
/// Minute of hour (0-59) of a UTC epoch-microsecond instant, as seen in the zone `tz_str`.
///
/// `"UTC"` is matched case-insensitively and skips zone lookup. Returns `None` for an unknown zone
/// name or an out-of-range instant.
fn utc_micros_to_minute_in_tz(micros: i64, tz_str: &str) -> Option<i32> {
    use chrono::Timelike;
    let instant: chrono::DateTime<chrono::Utc> = chrono::Utc.timestamp_micros(micros).single()?;
    let minute = if tz_str.eq_ignore_ascii_case("UTC") {
        instant.minute()
    } else {
        let zone: Tz = tz_str.parse().ok()?;
        instant.with_timezone(&zone).minute()
    };
    Some(minute as i32)
}
/// Second of minute of a UTC epoch-microsecond instant, as seen in the zone `tz_str`.
///
/// `"UTC"` is matched case-insensitively and skips zone lookup. Returns `None` for an unknown zone
/// name or an out-of-range instant.
fn utc_micros_to_second_in_tz(micros: i64, tz_str: &str) -> Option<i32> {
    use chrono::Timelike;
    let instant: chrono::DateTime<chrono::Utc> = chrono::Utc.timestamp_micros(micros).single()?;
    let second = if tz_str.eq_ignore_ascii_case("UTC") {
        instant.second()
    } else {
        let zone: Tz = tz_str.parse().ok()?;
        instant.with_timezone(&zone).second()
    };
    Some(second as i32)
}
/// Extracts the hour (`Int32`) of each timestamp in the thread's session time zone.
///
/// String inputs are parsed first, and nulls and unparsable values stay null.
pub fn apply_hour(column: Column) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let as_datetime = series_to_datetime_micros(&column.take_materialized_series())?;
    let instants = as_datetime.datetime().map_err(|e| compute_err("hour", e))?;
    let session_tz = crate::get_thread_session_time_zone();
    let zone = session_tz.as_str();
    let hours = Int32Chunked::from_iter_options(
        name.as_str().into(),
        instants
            .phys
            .iter()
            .map(|opt_us| opt_us.and_then(|us| utc_micros_to_hour_in_tz(us, zone))),
    );
    Ok(Some(Column::new(name, hours.into_series())))
}
/// Extracts the minute (`Int32`) of each timestamp in the thread's session time zone.
///
/// String inputs are parsed first, and nulls and unparsable values stay null.
pub fn apply_minute(column: Column) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let as_datetime = series_to_datetime_micros(&column.take_materialized_series())?;
    let instants = as_datetime.datetime().map_err(|e| compute_err("minute", e))?;
    let session_tz = crate::get_thread_session_time_zone();
    let zone = session_tz.as_str();
    let minutes = Int32Chunked::from_iter_options(
        name.as_str().into(),
        instants
            .phys
            .iter()
            .map(|opt_us| opt_us.and_then(|us| utc_micros_to_minute_in_tz(us, zone))),
    );
    Ok(Some(Column::new(name, minutes.into_series())))
}
/// Extracts the second (`Int32`) of each timestamp in the thread's session time zone.
///
/// String inputs are parsed first, and nulls and unparsable values stay null.
pub fn apply_second(column: Column) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let as_datetime = series_to_datetime_micros(&column.take_materialized_series())?;
    let instants = as_datetime.datetime().map_err(|e| compute_err("second", e))?;
    let session_tz = crate::get_thread_session_time_zone();
    let zone = session_tz.as_str();
    let seconds = Int32Chunked::from_iter_options(
        name.as_str().into(),
        instants
            .phys
            .iter()
            .map(|opt_us| opt_us.and_then(|us| utc_micros_to_second_in_tz(us, zone))),
    );
    Ok(Some(Column::new(name, seconds.into_series())))
}
/// Truncates timestamps (evaluated in UTC) down to the start of a calendar unit.
///
/// `polars_duration` is matched case-insensitively and must be one of:
/// - `1y` — year;
/// - `1mo` — month;
/// - `1w` — ISO week, starting Monday;
/// - `1d` — day;
/// - `1h` — hour;
/// - `1m` — minute;
/// - `1s` — second.
///
/// Sub-second precision is always dropped. String inputs are parsed first.
///
/// The truncated value is built directly from its components. The previous format-then-parse
/// round-trip returned null for years outside 0..=9999.
///
/// # Errors
/// Fails for an unsupported duration or when the input cannot be converted to a datetime.
pub fn apply_date_trunc(column: Column, polars_duration: &str) -> PolarsResult<Option<Column>> {
    use chrono::{Datelike, NaiveDate, NaiveDateTime, TimeZone, Timelike};
    use polars::datatypes::TimeUnit;

    // Calendar unit the timestamp is truncated to.
    #[derive(Clone, Copy)]
    enum TruncUnit {
        Year,
        Month,
        Week,
        Day,
        Hour,
        Minute,
        Second,
    }

    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let dt_series = series_to_datetime_micros(&series)?;
    let ca = dt_series
        .datetime()
        .map_err(|e| compute_err("date_trunc", e))?;
    let unit = match polars_duration.to_lowercase().as_str() {
        "1y" => TruncUnit::Year,
        "1mo" => TruncUnit::Month,
        "1w" => TruncUnit::Week,
        "1d" => TruncUnit::Day,
        "1h" => TruncUnit::Hour,
        "1m" => TruncUnit::Minute,
        "1s" => TruncUnit::Second,
        _ => {
            return Err(PolarsError::ComputeError(
                format!("date_trunc: unsupported duration {:?}", polars_duration).into(),
            ));
        }
    };
    let truncate = |ndt: NaiveDateTime| -> Option<NaiveDateTime> {
        let date = ndt.date();
        let (day, hour, minute, second) = match unit {
            TruncUnit::Year => (NaiveDate::from_ymd_opt(date.year(), 1, 1)?, 0, 0, 0),
            TruncUnit::Month => (date.with_day(1)?, 0, 0, 0),
            TruncUnit::Week => (date.week(chrono::Weekday::Mon).first_day(), 0, 0, 0),
            TruncUnit::Day => (date, 0, 0, 0),
            TruncUnit::Hour => (date, ndt.hour(), 0, 0),
            TruncUnit::Minute => (date, ndt.hour(), ndt.minute(), 0),
            TruncUnit::Second => (date, ndt.hour(), ndt.minute(), ndt.second()),
        };
        day.and_hms_opt(hour, minute, second)
    };
    let out = Int64Chunked::from_iter_options(
        name.as_str().into(),
        ca.phys.iter().map(|opt_us: Option<i64>| {
            opt_us.and_then(|t_us| {
                let ndt = chrono::Utc.timestamp_micros(t_us).single()?.naive_utc();
                truncate(ndt).map(|tr| tr.and_utc().timestamp_micros())
            })
        }),
    )
    .into_series()
    .cast(&DataType::Datetime(TimeUnit::Microseconds, None))?;
    Ok(Some(Column::new(name, out)))
}
pub fn apply_make_date(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
use chrono::NaiveDate;
if columns.len() < 3 {
return Err(PolarsError::ComputeError(
"make_date needs three columns (year, month, day)".into(),
));
}
let name = columns[0].field().into_owned().name;
let y_series = std::mem::take(&mut columns[0]).take_materialized_series();
let m_series = std::mem::take(&mut columns[1]).take_materialized_series();
let d_series = std::mem::take(&mut columns[2]).take_materialized_series();
let y_ca = y_series
.cast(&DataType::Int32)?
.i32()
.map_err(|e| compute_err("make_date", e))?
.clone();
let m_ca = m_series
.cast(&DataType::Int32)?
.i32()
.map_err(|e| compute_err("make_date", e))?
.clone();
let d_ca = d_series
.cast(&DataType::Int32)?
.i32()
.map_err(|e| compute_err("make_date", e))?
.clone();
let out = Int32Chunked::from_iter_options(
name.as_str().into(),
y_ca.into_iter()
.zip(&m_ca)
.zip(&d_ca)
.map(|((oy, om), od)| match (oy, om, od) {
(Some(y), Some(m), Some(d)) => {
NaiveDate::from_ymd_opt(y, m as u32, d as u32).map(naivedate_to_days)
}
_ => None,
}),
);
let out_series = out.into_series().cast(&DataType::Date)?;
Ok(Some(Column::new(name, out_series)))
}
/// Converts a column to `Date` and returns its day count as `Int32`.
///
/// This relies on the Date physical value being days since the Unix epoch.
pub fn apply_unix_date(column: Column) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let day_numbers = column
        .take_materialized_series()
        .cast(&DataType::Date)?
        .cast(&DataType::Int32)?;
    Ok(Some(Column::new(name, day_numbers)))
}
/// Inverse of `unix_date`: casts a day-count column to `Int32`, then reinterprets it as `Date`.
pub fn apply_date_from_unix_date(column: Column) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let dates = column
        .take_materialized_series()
        .cast(&DataType::Int32)?
        .cast(&DataType::Date)?;
    Ok(Some(Column::new(name, dates)))
}
/// Spark `pmod(a, n)`: positive modulus over `Float64`.
///
/// The result is `r = a % n`, or `(r + n) % n` when `r < 0`, which is exactly Spark's definition.
/// The previous `r + |n|` was wrong for negative divisors: pmod(-7, -3) must be -1, not 2. It also
/// returned `n` instead of 0 when rounding made `r + n == n`.
///
/// A null operand or a zero divisor yields null.
///
/// # Errors
/// Fails when fewer than two columns are supplied or an input cannot be converted to `Float64`.
pub fn apply_pmod(columns: &mut [Column]) -> PolarsResult<Option<Column>> {
    if columns.len() < 2 {
        return Err(PolarsError::ComputeError("pmod needs two columns".into()));
    }
    let name = columns[0].field().into_owned().name;
    let a_series = std::mem::take(&mut columns[0]).take_materialized_series();
    let b_series = std::mem::take(&mut columns[1]).take_materialized_series();
    let a = float_series_to_f64(&a_series)?;
    let b = float_series_to_f64(&b_series)?;
    let out = Float64Chunked::from_iter_options(
        name.as_str().into(),
        a.into_iter().zip(&b).map(|(oa, ob)| match (oa, ob) {
            (Some(x), Some(y)) if y != 0.0 => {
                let r = x % y;
                Some(if r < 0.0 { (r + y) % y } else { r })
            }
            _ => None,
        }),
    );
    Ok(Some(Column::new(name, out.into_series())))
}
/// Returns `n!` for `0 <= n <= 20`, and `None` for negative `n` or `n > 20`.
///
/// `20!` (2 432 902 008 176 640 000) is the largest factorial that fits in an `i64`.
/// The multiplication is still overflow-checked.
fn factorial_u64(n: i64) -> Option<i64> {
    if !(0..=20).contains(&n) {
        return None;
    }
    (1..=n).try_fold(1_i64, |product, k| product.checked_mul(k))
}
/// Validates `tz_str` as an IANA timezone name and returns `column` unchanged.
///
/// NOTE(review): no timestamp shifting happens here; only the timezone name is checked.
/// Confirm that the actual UTC→local conversion is performed elsewhere.
///
/// # Errors
/// Returns `ComputeError("invalid timezone: …")` when `tz_str` is not a known `chrono_tz::Tz`.
pub fn apply_from_utc_timestamp(column: Column, tz_str: &str) -> PolarsResult<Option<Column>> {
    match tz_str.parse::<Tz>() {
        Ok(_) => Ok(Some(column)),
        Err(_) => Err(PolarsError::ComputeError(
            format!("invalid timezone: {tz_str}").into(),
        )),
    }
}
/// Validates `tz_str` as an IANA timezone name and returns `column` unchanged.
///
/// NOTE(review): no timestamp shifting happens here; only the timezone name is checked.
/// Confirm that the actual local→UTC conversion is performed elsewhere.
///
/// # Errors
/// Returns `ComputeError("invalid timezone: …")` when `tz_str` is not a known `chrono_tz::Tz`.
pub fn apply_to_utc_timestamp(column: Column, tz_str: &str) -> PolarsResult<Option<Column>> {
    if tz_str.parse::<Tz>().is_err() {
        return Err(PolarsError::ComputeError(
            format!("invalid timezone: {tz_str}").into(),
        ));
    }
    Ok(Some(column))
}
/// Returns `column` unchanged; both timezone arguments are ignored.
///
/// NOTE(review): this is a pass-through and does not validate or apply `_source_tz` /
/// `_target_tz`. Confirm that the conversion is handled by the caller.
pub fn apply_convert_timezone(
    column: Column,
    _source_tz: &str,
    _target_tz: &str,
) -> PolarsResult<Option<Column>> {
    let unchanged = Some(column);
    Ok(unchanged)
}
/// Computes `n!` per row as `Int64`.
///
/// The input is cast to `Int64`. Null input, negative `n` and `n > 20` (which would
/// overflow `i64`) produce null (see `factorial_u64`).
///
/// # Errors
/// Fails when the input cannot be cast to `Int64`.
pub fn apply_factorial(column: Column) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let as_i64 = column
        .take_materialized_series()
        .cast(&DataType::Int64)
        .map_err(|e| compute_err("factorial cast", e))?;
    let values = as_i64.i64().map_err(|e| compute_err("factorial", e))?;
    let factorials = values.into_iter().map(|n| n.and_then(factorial_u64));
    let out = Int64Chunked::from_iter_options(name.as_str().into(), factorials);
    Ok(Some(Column::new(name, out.into_series())))
}
/// URL-decodes each string the way `java.net.URLDecoder` (used by Spark's `url_decode`) does.
///
/// `+` becomes a space and `%XX` escapes are decoded as UTF-8. Nulls stay null, and a
/// value whose decoded bytes are not valid UTF-8 becomes null. `+` is replaced *before*
/// percent-decoding so that an escaped plus (`%2B`) still decodes to a literal `+`. This
/// also makes the function invert `apply_url_encode`, which emits `+` for spaces.
///
/// # Errors
/// Fails when the input column is not a string column.
pub fn apply_url_decode(column: Column) -> PolarsResult<Option<Column>> {
    use percent_encoding::percent_decode_str;
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let ca = series.str().map_err(|e| compute_err("url_decode", e))?;
    let out = StringChunked::from_iter_options(
        name.as_str().into(),
        ca.into_iter().map(|opt_s| {
            opt_s.and_then(|s| {
                // Form encoding: '+' stands for a space.
                let plus_as_space = s.replace('+', " ");
                percent_decode_str(&plus_as_space)
                    .decode_utf8()
                    .ok()
                    .map(|c| c.into_owned())
            })
        }),
    );
    Ok(Some(Column::new(name, out.into_series())))
}
pub fn apply_url_encode(column: Column) -> PolarsResult<Option<Column>> {
use percent_encoding::{NON_ALPHANUMERIC, utf8_percent_encode};
let name = column.field().into_owned().name;
let series = column.take_materialized_series();
let ca = series.str().map_err(|e| compute_err("url_encode", e))?;
let out = StringChunked::from_iter_options(
name.as_str().into(),
ca.into_iter().map(|opt_s| {
opt_s.map(|s| {
utf8_percent_encode(s, NON_ALPHANUMERIC)
.to_string()
.replace("%20", "+")
})
}),
);
Ok(Some(Column::new(name, out.into_series())))
}
/// Logical (zero-filling) right shift of each value by `n` bits, like Java's `>>>` on `long`.
///
/// The input is cast to `Int64` and nulls stay null. The shift distance is taken modulo 64,
/// matching Java's `n & 63`. As a result, `n >= 64` or a negative `n` no longer trips Rust's
/// shift-overflow check (a panic in debug builds).
///
/// NOTE(review): Spark applies `>>>` on 32-bit ints for `IntegerType` input. Here every
/// input is widened to `Int64` first, so results for negative Int32 values differ — confirm.
///
/// # Errors
/// Fails when the input cannot be cast to `Int64`.
pub fn apply_shift_right_unsigned(column: Column, n: i32) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let s = series.cast(&DataType::Int64)?;
    let ca = s
        .i64()
        .map_err(|e| compute_err("shift_right_unsigned", e))?;
    // Reinterpreting the i32 as u32 keeps the low 6 bits equal to Java's `n & 63`;
    // `wrapping_shr` then masks the distance instead of overflowing.
    let shift = n as u32;
    let out = Int64Chunked::from_iter_options(
        name.as_str().into(),
        ca.into_iter()
            .map(|opt_v| opt_v.map(|v| (v as u64).wrapping_shr(shift) as i64)),
    );
    Ok(Some(Column::new(name, out.into_series())))
}
/// Follows `path_steps` from `v` and returns a clone of the node reached.
///
/// For each step, a non-empty key descends into that object member, and then an index
/// (if present) selects that array element. Returns `None` as soon as a member or element
/// is missing or a step is applied to the wrong kind of JSON value.
fn get_json_object_walk(
    v: &serde_json::Value,
    path_steps: &[(String, Option<usize>)],
) -> Option<serde_json::Value> {
    path_steps
        .iter()
        .try_fold(v, |node, (key, idx)| {
            let node = if key.is_empty() { node } else { node.get(key)? };
            match idx {
                Some(i) => node.as_array()?.get(*i),
                None => Some(node),
            }
        })
        .cloned()
}
fn get_json_object_value_to_string(v: &serde_json::Value) -> Option<String> {
match v {
serde_json::Value::Null => None,
serde_json::Value::String(s) => Some(s.clone()),
serde_json::Value::Number(n) => Some(n.to_string()),
serde_json::Value::Bool(b) => Some(b.to_string()),
serde_json::Value::Object(_) | serde_json::Value::Array(_) => serde_json::to_string(v).ok(),
}
}
/// Parses a simplified JSONPath (e.g. `$.a.b[0][1]`) into steps for `get_json_object_walk`.
///
/// Leading `$` and `.` characters are stripped. The rest is split on `.`, and each segment
/// is trimmed. A segment `name[i][j]…` produces a key step `(name, None)` (omitted when
/// `name` is empty), followed by one index step `("", Some(i))` per bracket group. Chained
/// groups such as `[0][1]` were previously dropped entirely.
///
/// Bracket contents that are not non-negative integers are skipped. Parsing of a segment
/// stops at an unterminated `[` or at text after a `]` that does not open another group.
/// Keys containing `.` and quoted names (`['a.b']`) are not supported. An empty path
/// yields no steps (the root).
fn parse_json_path(path: &str) -> Vec<(String, Option<usize>)> {
    let path = path.trim_start_matches('$').trim_start_matches('.');
    if path.is_empty() {
        return vec![];
    }
    let mut steps = Vec::new();
    for part in path.split('.') {
        let part = part.trim();
        let Some(bracket) = part.find('[') else {
            steps.push((part.to_string(), None));
            continue;
        };
        let key = &part[..bracket];
        if !key.is_empty() {
            steps.push((key.to_string(), None));
        }
        // Consume every `[n]` group in turn so `a[0][1]` yields two index steps.
        let mut rest = &part[bracket..];
        while let Some(inner) = rest.strip_prefix('[') {
            let Some(close) = inner.find(']') else {
                break;
            };
            if let Ok(i) = inner[..close].trim().parse::<usize>() {
                steps.push((String::new(), Some(i)));
            }
            rest = &inner[close + 1..];
        }
    }
    steps
}
/// Extracts the value at `path` from each JSON string, rendered as text.
///
/// `path` is parsed once with `parse_json_path`. Each row is parsed as JSON, walked with
/// `get_json_object_walk`, and rendered with `get_json_object_value_to_string`. Null input,
/// invalid JSON, a missing path or a JSON `null` all produce null. The result is a
/// `String` column keeping the input name.
///
/// # Errors
/// Fails when the input column is not a string column.
pub fn apply_get_json_object(column: Column, path: &str) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let ca = series
        .str()
        .map_err(|e| compute_err("get_json_object", e))?;
    let steps = parse_json_path(path);
    let extracted = ca.into_iter().map(|opt_doc| {
        let doc = opt_doc?;
        let root: serde_json::Value = serde_json::from_str(doc).ok()?;
        let found = get_json_object_walk(&root, &steps)?;
        get_json_object_value_to_string(&found)
    });
    let out = StringChunked::from_iter_options(name.as_str().into(), extracted);
    Ok(Some(Column::new(name, out.into_series())))
}
pub fn apply_json_array_length(column: Column, path: &str) -> PolarsResult<Option<Column>> {
let name = column.field().into_owned().name;
let series = column.take_materialized_series();
let ca = series
.str()
.map_err(|e| compute_err("json_array_length", e))?;
let path = path.trim_start_matches('$').trim_start_matches('.');
let path_parts: Vec<&str> = if path.is_empty() {
vec![]
} else {
path.split('.').collect()
};
let out = Int64Chunked::from_iter_options(
name.as_str().into(),
ca.into_iter().map(|opt_s| {
opt_s.and_then(|s| {
let v: serde_json::Value = serde_json::from_str(s).ok()?;
let mut current = &v;
for part in &path_parts {
current = current.get(part)?;
}
current.as_array().map(|a| a.len() as i64)
})
}),
);
Ok(Some(Column::new(name, out.into_series())))
}
pub fn apply_json_object_keys(column: Column) -> PolarsResult<Option<Column>> {
let name = column.field().into_owned().name;
let series = column.take_materialized_series();
let ca = series
.str()
.map_err(|e| compute_err("json_object_keys", e))?;
let out: ListChunked = ca
.into_iter()
.map(|opt_s| {
opt_s.and_then(|s| {
let v: serde_json::Value = serde_json::from_str(s).ok()?;
let obj = v.as_object()?;
let keys: Vec<String> = obj.keys().map(String::from).collect();
Some(Series::new("".into(), keys))
})
})
.collect();
Ok(Some(Column::new(name, out.into_series())))
}
pub fn apply_json_tuple(column: Column, keys: &[String]) -> PolarsResult<Option<Column>> {
let name = column.field().into_owned().name;
let series = column.take_materialized_series();
let ca = series.str().map_err(|e| compute_err("json_tuple", e))?;
let keys = keys.to_vec();
let mut columns_per_key: Vec<Vec<Option<String>>> =
(0..keys.len()).map(|_| Vec::new()).collect();
for opt_s in ca.into_iter() {
for (i, key) in keys.iter().enumerate() {
let val = opt_s.and_then(|s| {
let v: serde_json::Value = serde_json::from_str(s).ok()?;
let obj = v.as_object()?;
obj.get(key).and_then(|x| x.as_str()).map(String::from)
});
columns_per_key[i].push(val);
}
}
let field_series: Vec<Series> = keys
.iter()
.zip(columns_per_key.iter())
.map(|(k, vals)| Series::new(k.as_str().into(), vals.clone()))
.collect();
let out_df = DataFrame::new_infer_height(field_series.into_iter().map(|s| s.into()).collect())?;
let out_struct = out_df.into_struct(name.as_str().into());
Ok(Some(Column::new(name, out_struct.into_series())))
}
/// Splits each string on `,` into a struct of 32 `String` fields named `_c0` … `_c31`.
///
/// This is a plain comma split with no quoting or escaping. Rows with fewer than 32
/// parts get nulls for the missing fields, and parts beyond the 32nd are dropped. A null
/// input row yields a null in every field.
///
/// # Errors
/// Fails when the input column is not a string column or the struct cannot be assembled.
pub fn apply_from_csv(column: Column) -> PolarsResult<Option<Column>> {
    const MAX_COLS: usize = 32;
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let ca = series.str().map_err(|e| compute_err("from_csv", e))?;
    let mut cells: Vec<Vec<Option<String>>> =
        (0..MAX_COLS).map(|_| Vec::with_capacity(ca.len())).collect();
    for opt_line in ca.into_iter() {
        let mut parts = opt_line.into_iter().flat_map(|line| line.split(','));
        for cell_column in cells.iter_mut() {
            cell_column.push(parts.next().map(str::to_string));
        }
    }
    let fields: Vec<Column> = cells
        .into_iter()
        .enumerate()
        .map(|(i, vals)| Series::new(format!("_c{i}").into(), vals).into())
        .collect();
    let out_df = DataFrame::new_infer_height(fields)?;
    let out_series = out_df.into_struct(name.as_str().into()).into_series();
    Ok(Some(Column::new(name, out_series)))
}
/// Casts the column to `String` and keeps its name.
///
/// NOTE(review): this is a plain polars cast, not CSV serialization of struct fields —
/// confirm that it is sufficient for the intended `to_csv` semantics.
///
/// # Errors
/// Fails when polars cannot cast the input dtype to `String`.
pub fn apply_to_csv(column: Column) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let as_text = column.take_materialized_series().cast(&DataType::String)?;
    Ok(Some(Column::new(name, as_text)))
}
pub fn apply_parse_url(
column: Column,
part: &str,
key: Option<&str>,
) -> PolarsResult<Option<Column>> {
use url::Url;
let name = column.field().into_owned().name;
let series = column.take_materialized_series();
let ca = series.str().map_err(|e| compute_err("parse_url", e))?;
let part_upper = part.trim().to_uppercase();
let key_owned = key.map(String::from);
let out = StringChunked::from_iter_options(
name.as_str().into(),
ca.into_iter().map(|opt_s| {
opt_s.and_then(|s| {
let u = Url::parse(s).ok()?;
let out: Option<String> = match part_upper.as_str() {
"PROTOCOL" | "PROT" => Some(u.scheme().to_string()),
"HOST" => u.host_str().map(String::from),
"PATH" | "FILE" | "PATHNAME" => Some(u.path().to_string()),
"QUERY" | "REF" | "QUERYSTRING" => {
if let Some(ref k) = key_owned {
u.query_pairs()
.find(|(name, _)| name.as_ref() == k.as_str())
.map(|(_, value)| value.into_owned())
} else {
u.query().map(String::from)
}
}
"USERINFO" => Some(format!("{}:{}", u.username(), u.password().unwrap_or(""))),
"AUTHORITY" => u.host_str().map(|h| h.to_string()),
_ => None,
};
out
})
}),
);
Ok(Some(Column::new(name, out.into_series())))
}
/// Hashes every row with `series_to_hash_iter` and returns an `Int64` column of the results.
pub fn apply_hash_one(column: Column) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let hashes = series_to_hash_iter(column.take_materialized_series());
    let hashed = Int64Chunked::from_iter_options(name.as_str().into(), hashes);
    Ok(Some(Column::new(name, hashed.into_series())))
}
pub fn apply_hash_struct(column: Column) -> PolarsResult<Option<Column>> {
let name = column.field().into_owned().name;
let series = column.take_materialized_series();
let out = Int64Chunked::from_iter_options(name.as_str().into(), series_to_hash_iter(series));
Ok(Some(Column::new(name, out.into_series())))
}
/// Lazily yields a 32-bit MurmurHash3 (seed 0) of each row, sign-extended to `i64`.
///
/// Each row's bytes come from `any_value_to_hash_bytes`. A row that cannot be read, or a
/// hashing failure, yields `None`.
///
/// NOTE(review): Spark's `hash` uses seed 42 and its own byte layout, so these values are
/// not expected to match Spark — confirm whether that matters to callers.
fn series_to_hash_iter(series: Series) -> impl Iterator<Item = Option<i64>> {
    let row_count = series.len();
    (0..row_count).map(move |row| {
        let value = series.get(row).ok()?;
        let mut reader = std::io::Cursor::new(any_value_to_hash_bytes(&value));
        let hash = murmur3::murmur3_32(&mut reader, 0).ok()?;
        Some(i64::from(hash as i32))
    })
}
/// Encodes a single value as the byte string that gets hashed.
///
/// Null is a single `0` byte and booleans are one byte. 32/64-bit integers and floats use
/// their little-endian bytes (floats via their bit pattern). Strings and binaries use
/// their raw bytes. Every other variant uses the bytes of its `Display` rendering.
fn any_value_to_hash_bytes(av: &polars::datatypes::AnyValue) -> Vec<u8> {
    use polars::datatypes::AnyValue as AV;
    match av {
        AV::Null => vec![0],
        AV::Boolean(flag) => vec![u8::from(*flag)],
        AV::Int32(x) => x.to_le_bytes().to_vec(),
        AV::Int64(x) => x.to_le_bytes().to_vec(),
        AV::UInt32(x) => x.to_le_bytes().to_vec(),
        AV::UInt64(x) => x.to_le_bytes().to_vec(),
        AV::Float32(x) => x.to_bits().to_le_bytes().to_vec(),
        AV::Float64(x) => x.to_bits().to_le_bytes().to_vec(),
        AV::String(text) => text.as_bytes().to_vec(),
        AV::Binary(bytes) => bytes.to_vec(),
        other => other.to_string().into_bytes(),
    }
}
/// Builds one `List[Int64]` row per input row, emulating Spark's `sequence(start, stop[, step])`.
///
/// The input must be a struct column with fields `"0"` (start) and `"1"` (stop), plus an
/// optional `"2"` (step). Each field is cast to `Int64`. For every row the output is the
/// inclusive progression `start, start + step, …` up to `stop`.
///
/// * When the step field is absent or null, the step is `1` if `start <= stop` and `-1`
///   otherwise (Spark's default), so `sequence(5, 1)` yields `[5, 4, 3, 2, 1]`.
///   Previously the default was always `1`, which produced an empty list here.
/// * A step pointing away from `stop` yields an empty list.
/// * A null start/stop or a zero step yields a null row.
/// * Generation stops before an `i64` overflow instead of panicking (debug) or wrapping
///   into an endless loop (release).
///
/// # Errors
/// Fails when the input is not a struct, lacks fields `"0"`/`"1"`, or a field cannot be
/// cast to `Int64`.
pub fn apply_sequence(column: Column) -> PolarsResult<Option<Column>> {
    use polars::chunked_array::builder::get_list_builder;

    /// Inclusive progression from `start` towards `stop` by `step` (`step != 0`); it ends
    /// early rather than overflowing `i64`.
    fn progression(start: i64, stop: i64, step: i64) -> Vec<i64> {
        let mut vals = Vec::new();
        let mut v = start;
        while (step > 0 && v <= stop) || (step < 0 && v >= stop) {
            vals.push(v);
            match v.checked_add(step) {
                Some(next) => v = next,
                None => break,
            }
        }
        vals
    }

    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let st = series
        .struct_()
        .map_err(|e| compute_err("sequence: expected struct column", e))?;
    let start_s = st
        .field_by_name("0")
        .map_err(|e| compute_err("sequence field 0", e))?;
    let stop_s = st
        .field_by_name("1")
        .map_err(|e| compute_err("sequence field 1", e))?;
    // The step field is optional.
    let step_s = st.field_by_name("2").ok();
    let start_series = start_s
        .cast(&DataType::Int64)
        .map_err(|e| compute_err("sequence start", e))?;
    let stop_series = stop_s
        .cast(&DataType::Int64)
        .map_err(|e| compute_err("sequence stop", e))?;
    let step_series_opt: Option<Series> = step_s
        .as_ref()
        .map(|s| s.cast(&DataType::Int64))
        .transpose()
        .map_err(|e| compute_err("sequence step", e))?;
    let start_ca = start_series.i64().map_err(|e| compute_err("sequence", e))?;
    let stop_ca = stop_series.i64().map_err(|e| compute_err("sequence", e))?;
    let step_ca = step_series_opt.as_ref().and_then(|s| s.i64().ok());
    let n = start_ca.len();
    let mut builder = get_list_builder(&DataType::Int64, 64, n, name.as_str().into());
    for i in 0..n {
        let start_v = start_ca.get(i);
        let stop_v = stop_ca.get(i);
        // Missing/null step: count up when start <= stop, down otherwise (Spark default).
        let step_v: Option<i64> = step_ca.as_ref().and_then(|ca| ca.get(i)).or_else(|| {
            let descending = matches!((start_v, stop_v), (Some(s), Some(e)) if s > e);
            Some(if descending { -1 } else { 1 })
        });
        match (start_v, stop_v, step_v) {
            (Some(s), Some(e), Some(step)) if step != 0 => {
                let series = Series::new("".into(), progression(s, e, step));
                builder.append_series(&series)?;
            }
            _ => builder.append_null(),
        }
    }
    Ok(Some(Column::new(name, builder.finish().into_series())))
}
fn series_set_nulls_where(series: &Series, mask: &BooleanChunked) -> PolarsResult<Series> {
use polars::chunked_array::ops::ChunkSet;
let name = series.name().to_string();
let dtype = series.dtype().clone();
let len = series.len();
let mask_len = mask.len();
let mask_to_use: BooleanChunked = if mask_len == len {
mask.clone()
} else if mask_len > len && len == 1 {
mask.clone()
} else {
mask.clone()
};
let masked = match dtype {
DataType::Int64 => series
.i64()
.map_err(|e| compute_err("with_field mask i64", e))?
.set(&mask_to_use, None)
.map_err(|e| compute_err("with_field set i64", e))?
.into_series(),
DataType::Int32 => series
.i32()
.map_err(|e| compute_err("with_field mask i32", e))?
.set(&mask_to_use, None)
.map_err(|e| compute_err("with_field set i32", e))?
.into_series(),
DataType::String => {
let ca = series
.str()
.map_err(|e| compute_err("with_field mask str", e))?;
let n = mask_to_use.len();
let mut opts: Vec<Option<String>> = Vec::with_capacity(n);
for i in 0..n {
let set_null = mask_to_use.get(i).unwrap_or(false);
let val = ca
.get(if i < ca.len() { i } else { 0 })
.map(|s| s.to_string());
opts.push(if set_null { None } else { val });
}
StringChunked::from_iter_options(name.as_str().into(), opts.into_iter()).into_series()
}
DataType::Float64 => series
.f64()
.map_err(|e| compute_err("with_field mask f64", e))?
.set(&mask_to_use, None)
.map_err(|e| compute_err("with_field set f64", e))?
.into_series(),
DataType::Boolean => series
.bool()
.map_err(|e| compute_err("with_field mask bool", e))?
.set(&mask_to_use, None)
.map_err(|e| compute_err("with_field set bool", e))?
.into_series(),
DataType::Unknown(uk) => {
if let Some(dt) = uk.materialize() {
let casted = series
.cast(&dt)
.map_err(|e| compute_err("with_field cast unknown", e))?;
return series_set_nulls_where(&casted, mask);
}
series.clone()
}
DataType::Struct(_) => {
use polars::chunked_array::StructChunked;
let st = series
.struct_()
.map_err(|e| compute_err("with_field mask struct", e))?;
let len = st.len();
let fields_series = st.fields_as_series();
let masked_fields: Vec<Series> = fields_series
.iter()
.map(|s| series_set_nulls_where(s, mask).unwrap_or_else(|_| s.clone()))
.collect();
let out = StructChunked::from_series(name.as_str().into(), len, masked_fields.iter())
.map_err(|e| compute_err("with_field rebuild struct", e))?;
out.into_series()
}
DataType::List(_) => {
series.clone()
}
_ => {
if let Ok(casted) = series.cast(&DataType::String) {
return series_set_nulls_where(&casted, mask);
}
series.clone()
}
};
Ok(masked.with_name(PlSmallStr::from(name.as_str())))
}
/// Returns a copy of a struct column with field `field_name` set to `value_col`.
///
/// If a field named `field_name` already exists, it is replaced in place. Otherwise the
/// value is appended as the last field. `value_col` is materialized first when its dtype
/// is `Unknown`. When its length differs from the struct's, it is broadcast by repeating
/// its row 0.
///
/// A row counts as "null" when the struct row itself is null or when every existing
/// field in that row is null. A struct with no fields has no such rows. In null rows the
/// new value and every other field are set to null.
///
/// NOTE(review): treating "all fields null" as a null struct also nulls the new value
/// for non-null structs whose fields all happen to be null — confirm this is intended.
///
/// # Errors
/// Fails when `struct_col` is not a struct, or when materializing or broadcasting the
/// value fails. This presumably includes an empty `value_col` against a non-empty struct,
/// since row 0 is taken. It also fails when the output struct cannot be built. Masking
/// failures are not errors: the unmasked series is used instead.
pub fn apply_struct_with_field(
    struct_col: Column,
    value_col: Column,
    field_name: &str,
) -> PolarsResult<Option<Column>> {
    use polars::chunked_array::StructChunked;
    let name = struct_col.field().into_owned().name;
    let struct_series = struct_col.take_materialized_series();
    let st = struct_series
        .struct_()
        .map_err(|e| compute_err("with_field: expected struct column", e))?;
    let len = st.len();
    // Rows to treat as null: explicit struct nulls OR rows where every field is null.
    let struct_null: BooleanChunked = {
        let explicit_null = struct_series.is_null();
        let fields_series_for_mask = st.fields_as_series();
        // AND together each field's is_null mask; with zero fields fall back to all-false.
        let all_fields_null: BooleanChunked = fields_series_for_mask
            .iter()
            .map(|s| s.is_null())
            .fold(None, |acc: Option<BooleanChunked>, m| {
                Some(match acc {
                    Some(a) => a & m,
                    None => m,
                })
            })
            .unwrap_or_else(|| {
                BooleanChunked::from_iter_options(PlSmallStr::EMPTY, (0..len).map(|_| Some(false)))
            });
        explicit_null | all_fields_null
    };
    let fields_series = st.fields_as_series();
    let mut value_series = value_col.take_materialized_series();
    // A literal may arrive with an Unknown dtype; give it a concrete dtype when possible.
    if let DataType::Unknown(uk) = value_series.dtype() {
        if let Some(dt) = uk.materialize() {
            value_series = value_series
                .cast(&dt)
                .map_err(|e| compute_err("with_field materialize literal", e))?;
        }
    }
    // Broadcast: repeat row 0 `len` times when the value's length differs from the struct's.
    if value_series.len() != len {
        let indices: Vec<u32> = (0..len).map(|_| 0u32).collect();
        let idx_ca = UInt32Chunked::from_vec("".into(), indices);
        value_series = value_series
            .take(&idx_ca)
            .map_err(|e| compute_err("with_field broadcast literal", e))?;
    }
    // Null out the new value in null-struct rows. String (and still-Unknown) values are
    // handled inline via a String conversion; other dtypes go through
    // series_set_nulls_where, falling back to the unmasked value on error.
    let value_masked: Series = match value_series.dtype() {
        DataType::String | DataType::Unknown(_) => {
            let value_str = if value_series.dtype() == &DataType::String {
                value_series.clone()
            } else {
                value_series
                    .cast(&DataType::String)
                    .unwrap_or_else(|_| value_series.clone())
            };
            let name_val = value_series.name().to_string();
            let mut opts: Vec<Option<String>> = Vec::with_capacity(len);
            if let Ok(ca) = value_str.str() {
                for i in 0..len {
                    let set_null = struct_null.get(i).unwrap_or(false);
                    let val = ca
                        .get(if i < ca.len() { i } else { 0 })
                        .map(|s| s.to_string());
                    opts.push(if set_null { None } else { val });
                }
                StringChunked::from_iter_options(name_val.as_str().into(), opts.into_iter())
                    .into_series()
            } else {
                value_series.clone()
            }
        }
        _ => series_set_nulls_where(&value_series, &struct_null)
            .unwrap_or_else(|_| value_series.clone()),
    };
    // Replace the matching field in its original position, or append it at the end.
    let mut new_fields: Vec<Series> = Vec::with_capacity(fields_series.len() + 1);
    let mut replaced = false;
    for s in &fields_series {
        let fname = s.name().as_str();
        let new_s = if fname == field_name {
            replaced = true;
            let mut v = value_masked.clone();
            v.rename(PlSmallStr::from(field_name));
            v
        } else {
            s.clone()
        };
        new_fields.push(new_s);
    }
    if !replaced {
        let mut v = value_masked.clone();
        v.rename(PlSmallStr::from(field_name));
        new_fields.push(v);
    }
    // Mask every field (including the new one again) so null-struct rows stay fully null;
    // a field that fails to mask is kept as-is.
    let new_fields: Vec<Series> = new_fields
        .into_iter()
        .map(|s| series_set_nulls_where(&s, &struct_null).unwrap_or(s))
        .collect();
    let out = StructChunked::from_series(name.as_str().into(), len, new_fields.iter())
        .map_err(|e| compute_err("with_field: build struct", e))?;
    Ok(Some(Column::new(name, out.into_series())))
}
/// Randomly permutes the elements inside each list row.
///
/// Uses the thread-local RNG (`rand::thread_rng`), so the output is non-deterministic.
/// Null lists stay null. The inner dtype is preserved.
///
/// # Errors
/// Fails when the input is not a list column, or when gathering the permuted elements
/// or appending them to the builder fails.
pub fn apply_shuffle(column: Column) -> PolarsResult<Option<Column>> {
    use polars::chunked_array::builder::get_list_builder;
    use rand::seq::SliceRandom;
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let list_ca = series.list().map_err(|e| compute_err("shuffle", e))?;
    let element_dtype = list_ca.inner_dtype().clone();
    let mut builder = get_list_builder(&element_dtype, 64, list_ca.len(), name.as_str().into());
    for row in list_ca.amortized_iter() {
        let Some(amort) = row else {
            builder.append_null();
            continue;
        };
        let elements = amort.as_ref();
        let mut order: Vec<u32> = (0..elements.len() as u32).collect();
        order.shuffle(&mut rand::thread_rng());
        let permuted = elements
            .take(&UInt32Chunked::from_vec("".into(), order))
            .map_err(|e| compute_err("shuffle take", e))?;
        builder.append_series(&permuted)?;
    }
    Ok(Some(Column::new(name, builder.finish().into_series())))
}
// NOTE(review): not referenced in this section of the file; presumably the fixed byte
// size of a bitmap buffer used elsewhere (4096 bytes = 32768 bits) — confirm against its users.
const BITMAP_BYTES: usize = 4096;
/// Parses a timestamp string into microseconds since the Unix epoch, treating it as UTC.
///
/// The input is trimmed, then tried against `YYYY-MM-DDTHH:MM:SS[.fff…]` and
/// `YYYY-MM-DD HH:MM:SS[.fff…]`. The space-separated form now accepts fractional seconds
/// too; previously those strings silently fell back to midnight. As a last resort, the
/// first 10 characters are parsed as `YYYY-MM-DD` and midnight is used. That prefix is
/// taken with `str::get`, so a multi-byte character straddling byte 10 yields `None`
/// instead of panicking. Returns `None` when nothing matches.
fn parse_str_to_datetime_micros(s: &str) -> Option<i64> {
    use chrono::{NaiveDate, NaiveDateTime};
    const DATETIME_FORMATS: [&str; 4] = [
        "%Y-%m-%dT%H:%M:%S%.f",
        "%Y-%m-%dT%H:%M:%S",
        "%Y-%m-%d %H:%M:%S%.f",
        "%Y-%m-%d %H:%M:%S",
    ];
    let s = s.trim();
    DATETIME_FORMATS
        .iter()
        .find_map(|fmt| NaiveDateTime::parse_from_str(s, fmt).ok())
        .or_else(|| {
            s.get(..10)
                .and_then(|prefix| NaiveDate::parse_from_str(prefix, "%Y-%m-%d").ok())
                .and_then(|d| d.and_hms_opt(0, 0, 0))
        })
        .map(|dt| dt.and_utc().timestamp_micros())
}
/// Parses a date string (trimmed) into a `NaiveDate`.
///
/// Tries, in order:
/// 1. the whole string as `YYYY-MM-DD`;
/// 2. its first 10 bytes as `YYYY-MM-DD`, which accepts e.g. `2024-01-31T12:00:00`. The
///    prefix is taken with `str::get`, so a multi-byte character straddling byte 10
///    skips this step instead of panicking;
/// 3. the whole string as `YYYY-MM-DD HH:MM:SS`, keeping only the date.
///
/// Returns `None` when nothing matches.
fn parse_str_to_date(s: &str) -> Option<chrono::NaiveDate> {
    use chrono::{NaiveDate, NaiveDateTime};
    let s = s.trim();
    NaiveDate::parse_from_str(s, "%Y-%m-%d")
        .ok()
        .or_else(|| {
            s.get(..10)
                .and_then(|prefix| NaiveDate::parse_from_str(prefix, "%Y-%m-%d").ok())
        })
        .or_else(|| {
            NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S")
                .ok()
                .map(|dt| dt.date())
        })
}
/// Parses a trimmed string as an integer, falling back to a truncated float.
///
/// An empty (or all-whitespace) string is `None`. Otherwise an exact `i64` parse is
/// tried first. If that fails, the string is parsed as `f64` and truncated towards zero
/// via `f64_to_int_trunc`, so `"1.9"` gives `1`.
fn parse_str_to_int(s: &str) -> Option<i64> {
    match s.trim() {
        "" => None,
        text => text
            .parse::<i64>()
            .ok()
            .or_else(|| text.parse::<f64>().ok().and_then(f64_to_int_trunc)),
    }
}
/// Truncates `f` towards zero and returns it as `i64` when it fits, else `None`.
///
/// NaN and ±infinity yield `None`. The valid range is `[-2^63, 2^63)`. `i64::MIN as f64`
/// is exactly `-2^63`, but `i64::MAX as f64` rounds *up* to `2^63`, which is out of range.
/// The upper bound must therefore be exclusive; otherwise `2^63` would slip through and
/// saturate to `i64::MAX`.
fn f64_to_int_trunc(f: f64) -> Option<i64> {
    if !f.is_finite() {
        return None;
    }
    let truncated = f.trunc();
    // `i64::MAX as f64` == 2^63, hence `<` rather than `<=`.
    if truncated >= i64::MIN as f64 && truncated < i64::MAX as f64 {
        Some(truncated as i64)
    } else {
        None
    }
}
/// Casts a column to an integer `target` (`Int32` or `Int64`), Spark-cast style.
///
/// * `Null` input yields an all-null column of `target`.
/// * `String` values are parsed with `parse_str_to_int`, so `"1.9"` becomes `1`. In
///   `strict` mode, an unparseable non-null value, or one outside the `Int32` range for
///   an `Int32` target, is an error. Out-of-range values previously became null silently
///   even when strict. In non-strict mode both become null.
/// * Any integer dtype, and `Boolean`, is cast directly with polars. Previously only
///   Int32/Int64 were accepted in strict mode.
/// * `Float32`/`Float64` values are truncated towards zero. NaN/inf, and `Int32`
///   overflow, error in `strict` mode and become null otherwise.
/// * Any other dtype errors in `strict` mode and is cast with polars otherwise.
///
/// # Errors
/// Besides the strict-mode errors above, the `String` and float paths return a
/// `ComputeError` instead of panicking when `target` is neither `Int32` nor `Int64`.
pub fn apply_string_to_int(
    column: Column,
    strict: bool,
    target: DataType,
) -> PolarsResult<Option<Column>> {
    /// Whether `n` is representable in `target`; only an `Int32` target narrows.
    fn fits_target(n: i64, target: &DataType) -> bool {
        *target != DataType::Int32 || i32::try_from(n).is_ok()
    }

    /// Materializes parsed `i64` values as `target`; values outside `Int32` become null.
    fn build_int_series(
        name: &str,
        results: Vec<Option<i64>>,
        target: &DataType,
    ) -> PolarsResult<Series> {
        match target {
            DataType::Int32 => Ok(Int32Chunked::from_iter_options(
                name.into(),
                results
                    .into_iter()
                    .map(|o| o.and_then(|n| i32::try_from(n).ok())),
            )
            .into_series()),
            DataType::Int64 => {
                Ok(Int64Chunked::from_iter_options(name.into(), results.into_iter()).into_series())
            }
            other => Err(PolarsError::ComputeError(
                format!("string to int: unsupported target type {other}").into(),
            )),
        }
    }

    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let out: Series = match series.dtype() {
        DataType::Null => Series::new_null(name.clone(), series.len()).cast(&target)?,
        DataType::String => {
            let ca = series.str().map_err(|e| compute_err("string to int", e))?;
            let mut results: Vec<Option<i64>> = Vec::with_capacity(ca.len());
            for opt_s in ca.into_iter() {
                let v = opt_s.and_then(parse_str_to_int);
                if strict {
                    if let Some(s) = opt_s {
                        // Unparseable input and values that overflow the target both fail.
                        if !matches!(v, Some(n) if fits_target(n, &target)) {
                            return Err(PolarsError::ComputeError(
                                format!(
                                    "conversion from `str` to `{}` failed in column '{}' for value \"{s}\"",
                                    target,
                                    name.as_str()
                                )
                                .into(),
                            ));
                        }
                    }
                }
                results.push(v);
            }
            build_int_series(name.as_str(), results, &target)?
        }
        dt if dt.is_integer() || matches!(dt, DataType::Boolean) => series.cast(&target)?,
        DataType::Float32 | DataType::Float64 => {
            let casted = series.cast(&DataType::Float64)?;
            let ca = casted.f64()?;
            let mut results: Vec<Option<i64>> = Vec::with_capacity(ca.len());
            for opt_v in ca.into_iter() {
                let v: Option<i64> = match opt_v {
                    None => None,
                    Some(f) if f.is_nan() || f.is_infinite() => {
                        if strict {
                            return Err(PolarsError::ComputeError(
                                format!(
                                    "casting from {} to {} failed for value {} (NaN/inf)",
                                    series.dtype(),
                                    target,
                                    f
                                )
                                .into(),
                            ));
                        }
                        None
                    }
                    Some(f) => {
                        // Saturating truncation, as before; only Int32 overflow is checked.
                        let n = f.trunc() as i64;
                        if fits_target(n, &target) {
                            Some(n)
                        } else {
                            if strict {
                                return Err(PolarsError::ComputeError(
                                    format!(
                                        "casting from {} to {} overflow for value {}",
                                        series.dtype(),
                                        target,
                                        f
                                    )
                                    .into(),
                                ));
                            }
                            None
                        }
                    }
                };
                results.push(v);
            }
            build_int_series(name.as_str(), results, &target)?
        }
        _ => {
            if strict {
                return Err(PolarsError::ComputeError(
                    format!(
                        "casting from {} to {} not supported",
                        series.dtype(),
                        target
                    )
                    .into(),
                ));
            }
            series.cast(&target)?
        }
    };
    Ok(Some(Column::new(name, out)))
}
/// Parses a string as a boolean using Spark's accepted spellings.
///
/// The input is trimmed and compared case-insensitively. `t`, `true`, `y`, `yes` and `1`
/// are `true`; `f`, `false`, `n`, `no` and `0` are `false`. Anything else, including the
/// empty string, is `None`. Spark casts `''` to null, whereas it previously became
/// `false` here. `_strict` does not affect parsing; callers decide how to treat `None`.
fn parse_str_to_bool(s: &str, _strict: bool) -> Option<bool> {
    match s.trim().to_lowercase().as_str() {
        "t" | "true" | "y" | "yes" | "1" => Some(true),
        "f" | "false" | "n" | "no" | "0" => Some(false),
        _ => None,
    }
}
/// Logical NOT of a `Boolean` column; nulls stay null.
///
/// # Errors
/// Returns a `ComputeError` for any non-`Boolean` input, pointing users at `F.expr("~x")`
/// for bitwise NOT.
pub fn apply_logical_not_boolean_only(column: Column) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let negated = match series.dtype() {
        DataType::Boolean => series
            .bool()
            .map_err(|e| compute_err("logical_not", e))?
            .not(),
        _ => {
            return Err(PolarsError::ComputeError(
                "PySpark: '~' on Column is boolean NOT; cannot apply to numeric/integer columns. Use F.expr(\"~x\") for bitwise NOT.".into(),
            ));
        }
    };
    Ok(Some(Column::new(name, negated.into_series())))
}
pub fn apply_string_to_boolean(column: Column, strict: bool) -> PolarsResult<Option<Column>> {
let name = column.field().into_owned().name;
let series = column.take_materialized_series();
let out: BooleanChunked = match series.dtype() {
DataType::String => {
let ca = series
.str()
.map_err(|e| compute_err("string to boolean", e))?;
let mut results = Vec::with_capacity(ca.len());
for opt_s in ca.into_iter() {
let v = match opt_s {
None => None,
Some(s) => {
let parsed = parse_str_to_bool(s, strict);
if strict && parsed.is_none() {
return Err(PolarsError::ComputeError(
format!("casting from string to boolean failed for value '{s}'")
.into(),
));
}
parsed
}
};
results.push(v);
}
BooleanChunked::from_iter_options(name.as_str().into(), results.into_iter())
}
DataType::Boolean => {
let ca = series.bool().map_err(|e| compute_err("boolean", e))?;
BooleanChunked::from_iter_options(name.as_str().into(), ca.into_iter())
}
DataType::Int8 => {
let ca = series.i8().map_err(|e| compute_err("i8", e))?;
BooleanChunked::from_iter_options(
name.as_str().into(),
ca.into_iter().map(|o| o.map(|v| v != 0)),
)
}
DataType::Int16 => {
let ca = series.i16().map_err(|e| compute_err("i16", e))?;
BooleanChunked::from_iter_options(
name.as_str().into(),
ca.into_iter().map(|o| o.map(|v| v != 0)),
)
}
DataType::Int32 => {
let ca = series.i32().map_err(|e| compute_err("i32", e))?;
BooleanChunked::from_iter_options(
name.as_str().into(),
ca.into_iter().map(|o| o.map(|v| v != 0)),
)
}
DataType::Int64 => {
let ca = series.i64().map_err(|e| compute_err("i64", e))?;
BooleanChunked::from_iter_options(
name.as_str().into(),
ca.into_iter().map(|o| o.map(|v| v != 0)),
)
}
DataType::UInt8 => {
let ca = series.u8().map_err(|e| compute_err("u8", e))?;
BooleanChunked::from_iter_options(
name.as_str().into(),
ca.into_iter().map(|o| o.map(|v| v != 0)),
)
}
DataType::UInt16 => {
let ca = series.u16().map_err(|e| compute_err("u16", e))?;
BooleanChunked::from_iter_options(
name.as_str().into(),
ca.into_iter().map(|o| o.map(|v| v != 0)),
)
}
DataType::UInt32 => {
let ca = series.u32().map_err(|e| compute_err("u32", e))?;
BooleanChunked::from_iter_options(
name.as_str().into(),
ca.into_iter().map(|o| o.map(|v| v != 0)),
)
}
DataType::UInt64 => {
let ca = series.u64().map_err(|e| compute_err("u64", e))?;
BooleanChunked::from_iter_options(
name.as_str().into(),
ca.into_iter().map(|o| o.map(|v| v != 0)),
)
}
DataType::Float32 => {
let ca = series.f32().map_err(|e| compute_err("f32", e))?;
BooleanChunked::from_iter_options(
name.as_str().into(),
ca.into_iter().map(|o| o.map(|v| v != 0.0)),
)
}
DataType::Float64 => {
let ca = series.f64().map_err(|e| compute_err("f64", e))?;
BooleanChunked::from_iter_options(
name.as_str().into(),
ca.into_iter().map(|o| o.map(|v| v != 0.0)),
)
}
DataType::Null => {
BooleanChunked::from_iter_options(name.as_str().into(), (0..series.len()).map(|_| None))
}
_ => {
if strict {
return Err(PolarsError::ComputeError(
format!("casting from {} to boolean not supported", series.dtype()).into(),
));
}
BooleanChunked::from_iter_options(name.as_str().into(), (0..series.len()).map(|_| None))
}
};
Ok(Some(Column::new(name, out.into_series())))
}
/// Parses a trimmed string as `f64`; an empty or unparseable string yields `None`.
///
/// Uses Rust's `f64::from_str`, so forms such as `inf` and `NaN` are accepted.
fn parse_str_to_double(s: &str) -> Option<f64> {
    Some(s.trim())
        .filter(|text| !text.is_empty())
        .and_then(|text| text.parse::<f64>().ok())
}
/// Casts a column to `Float64`.
///
/// * `String` values are parsed with `parse_str_to_double`. In `strict` mode an
///   unparseable non-null value is an error; otherwise it becomes null.
/// * Every numeric dtype (all integer widths, floats, decimals) and `Boolean` is cast
///   with polars. Previously only Int32/Int64/UInt32/UInt64/Float32/Float64 were
///   accepted, so e.g. an `Int16` column became all nulls (or errored when strict).
/// * `Null` input yields all nulls.
/// * Any other dtype errors in `strict` mode and yields all nulls otherwise.
pub fn apply_string_to_double(column: Column, strict: bool) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let out: Series = match series.dtype() {
        DataType::String => {
            let ca = series
                .str()
                .map_err(|e| compute_err("string to double", e))?;
            let mut results: Vec<Option<f64>> = Vec::with_capacity(ca.len());
            for opt_s in ca.into_iter() {
                let v = opt_s.and_then(parse_str_to_double);
                if strict {
                    if let Some(s) = opt_s {
                        if v.is_none() {
                            return Err(PolarsError::ComputeError(
                                format!(
                                    "conversion from `str` to `double` failed in column '{}' for value \"{s}\"",
                                    name.as_str()
                                )
                                .into(),
                            ));
                        }
                    }
                }
                results.push(v);
            }
            Float64Chunked::from_iter_options(name.as_str().into(), results.into_iter())
                .into_series()
        }
        DataType::Null => Float64Chunked::from_iter_options(
            name.as_str().into(),
            (0..series.len()).map(|_| None::<f64>),
        )
        .into_series(),
        dt if dt.is_numeric() || matches!(dt, DataType::Boolean) => {
            series.cast(&DataType::Float64)?
        }
        _ => {
            if strict {
                return Err(PolarsError::ComputeError(
                    format!("casting from {} to double not supported", series.dtype()).into(),
                ));
            }
            Float64Chunked::from_iter_options(
                name.as_str().into(),
                (0..series.len()).map(|_| None::<f64>),
            )
            .into_series()
        }
    };
    Ok(Some(Column::new(name, out)))
}
pub fn apply_string_to_date(column: Column, strict: bool) -> PolarsResult<Option<Column>> {
let name = column.field().into_owned().name;
let series = column.take_materialized_series();
let epoch = robin_sparkless_core::date_utils::epoch_naive_date();
let out: Series = match series.dtype() {
DataType::String => {
let ca = series.str().map_err(|e| compute_err("string to date", e))?;
let mut results = Vec::with_capacity(ca.len());
for opt_s in ca.into_iter() {
let v =
opt_s.and_then(|s| parse_str_to_date(s).map(|d| (d - epoch).num_days() as i32));
if strict {
if let Some(s) = opt_s {
if v.is_none() {
return Err(PolarsError::ComputeError(
format!(
"conversion from `str` to `date` failed in column '{}' for value \"{s}\"",
name.as_str()
)
.into(),
));
}
}
}
results.push(v);
}
let chunked =
Int32Chunked::from_iter_options(name.as_str().into(), results.into_iter());
chunked.into_series().cast(&DataType::Date)?
}
DataType::Date => series,
DataType::Datetime(_, _) => series.cast(&DataType::Date)?,
DataType::Null => {
let days = Int32Chunked::from_iter_options(
name.as_str().into(),
(0..series.len()).map(|_| None::<i32>),
);
days.into_series().cast(&DataType::Date)?
}
_ => {
if strict {
return Err(PolarsError::ComputeError(
format!("casting from {} to date not supported", series.dtype()).into(),
));
}
let days = Int32Chunked::from_iter_options(
name.as_str().into(),
(0..series.len()).map(|_| None::<i32>),
);
days.into_series().cast(&DataType::Date)?
}
};
Ok(Some(Column::new(name, out)))
}
/// Casts a column to `Datetime(Microseconds, None)`.
///
/// * `String` values are parsed with `parse_str_to_datetime_micros`. In `strict` mode an
///   unparseable non-null value is an error; otherwise it becomes null.
/// * `Datetime` and `Date` inputs are cast directly.
/// * `Null` input yields all nulls.
/// * Any other dtype errors in `strict` mode and yields all nulls otherwise.
pub fn apply_string_to_datetime(column: Column, strict: bool) -> PolarsResult<Option<Column>> {
    use polars::datatypes::TimeUnit;
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let dtype_out = DataType::Datetime(TimeUnit::Microseconds, None);
    let null_micros = |len: usize| {
        Int64Chunked::from_iter_options(name.as_str().into(), (0..len).map(|_| None::<i64>))
            .into_series()
            .cast(&dtype_out)
    };
    let out: Series = match series.dtype() {
        DataType::String => {
            let ca = series
                .str()
                .map_err(|e| compute_err("string to datetime", e))?;
            let mut micros: Vec<Option<i64>> = Vec::with_capacity(ca.len());
            for opt_s in ca.into_iter() {
                let parsed = opt_s.and_then(parse_str_to_datetime_micros);
                match (opt_s, parsed) {
                    (Some(s), None) if strict => {
                        return Err(PolarsError::ComputeError(
                            format!(
                                "conversion from `str` to `datetime` failed in column '{}' for value \"{s}\"",
                                name.as_str()
                            )
                            .into(),
                        ));
                    }
                    _ => micros.push(parsed),
                }
            }
            Int64Chunked::from_iter_options(name.as_str().into(), micros.into_iter())
                .into_series()
                .cast(&dtype_out)?
        }
        DataType::Datetime(_, _) | DataType::Date => series.cast(&dtype_out)?,
        DataType::Null => null_micros(series.len())?,
        other => {
            if strict {
                return Err(PolarsError::ComputeError(
                    format!("casting from {} to datetime not supported", other).into(),
                ));
            }
            null_micros(series.len())?
        }
    };
    Ok(Some(Column::new(name, out)))
}
/// Converts `column` to `Date`, optionally parsing strings with a PySpark-style `format`.
///
/// * `String`: with `format`, each value is trimmed and parsed as a `chrono::NaiveDate` using the
///   pattern produced by `pyspark_format_to_chrono`. Without `format`, `parse_str_to_date` is
///   used. Values that fail to parse become null; `strict` is not consulted for them.
/// * `Date`: returned unchanged. `Datetime`: cast to `Date`.
/// * `Null`: produces an all-null `Date` column.
/// * Other dtypes: an error when `strict`, otherwise an all-null `Date` column.
pub fn apply_string_to_date_format(
    column: Column,
    format: Option<&str>,
    strict: bool,
) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let epoch = robin_sparkless_core::date_utils::epoch_naive_date();
    let null_dates = |len: usize| -> PolarsResult<Series> {
        Int32Chunked::from_iter_options(name.as_str().into(), (0..len).map(|_| None::<i32>))
            .into_series()
            .cast(&DataType::Date)
    };
    let out: Series = match series.dtype() {
        DataType::String => {
            let strings = series.str().map_err(|e| compute_err("string to date", e))?;
            let chrono_fmt = format.map(pyspark_format_to_chrono);
            // Days since the epoch, which is the physical representation of polars `Date`.
            let days: Vec<Option<i32>> = strings
                .into_iter()
                .map(|input| {
                    let date = input.and_then(|raw| match chrono_fmt.as_deref() {
                        Some(pattern) => chrono::NaiveDate::parse_from_str(raw.trim(), pattern).ok(),
                        None => parse_str_to_date(raw),
                    });
                    date.map(|d| d.signed_duration_since(epoch).num_days() as i32)
                })
                .collect();
            Int32Chunked::from_iter_options(name.as_str().into(), days.into_iter())
                .into_series()
                .cast(&DataType::Date)?
        }
        DataType::Date => series,
        DataType::Datetime(_, _) => series.cast(&DataType::Date)?,
        DataType::Null => null_dates(series.len())?,
        _ => {
            if strict {
                return Err(PolarsError::ComputeError(
                    "to_date requires StringType, TimestampType, or DateType input".into(),
                ));
            }
            null_dates(series.len())?
        }
    };
    Ok(Some(Column::new(name, out)))
}
/// Counts the set bits in each binary value and returns them as an `Int64` column.
///
/// Null inputs stay null. Errors if the column is not of binary dtype.
pub fn apply_bitmap_count(column: Column) -> PolarsResult<Option<Column>> {
    /// Population count over every byte of `bytes`.
    fn popcount(bytes: &[u8]) -> i64 {
        bytes
            .iter()
            .fold(0i64, |total, byte| total + i64::from(byte.count_ones()))
    }

    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let bitmaps = series
        .binary()
        .map_err(|e| compute_err("bitmap_count", e))?;
    let counts = Int64Chunked::from_iter_options(
        name.as_str().into(),
        bitmaps.into_iter().map(|maybe_bytes| maybe_bytes.map(popcount)),
    );
    Ok(Some(Column::new(name, counts.into_series())))
}
/// Builds one bitmap per list row from the `Int64` bit positions that row contains.
///
/// Each non-null row produces a `BITMAP_BYTES`-byte buffer. For every non-null position `pos`
/// in `0..BITMAP_BYTES * 8`, bit `pos % 8` of byte `pos / 8` is set. Positions outside that
/// range, including negative ones, are ignored. A null row, or a row whose inner values are not
/// `Int64`, produces null.
///
/// # Errors
/// Returns a compute error if the column is not a list column.
pub fn apply_bitmap_construct_agg(column: Column) -> PolarsResult<Option<Column>> {
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let list_ca = series
        .list()
        .map_err(|e| compute_err("bitmap_construct_agg", e))?;
    let out: BinaryChunked = list_ca
        .amortized_iter()
        .map(|opt_list| {
            opt_list.and_then(|list_series| {
                let positions = list_series.as_ref().i64().ok()?;
                let mut buf = vec![0u8; BITMAP_BYTES];
                // `usize::try_from` drops negative positions explicitly. On 32-bit targets it
                // also drops positions above `usize::MAX`. An `as` cast would truncate those
                // into the valid range and set the wrong bit.
                let in_range = positions
                    .into_iter()
                    .flatten()
                    .filter_map(|p| usize::try_from(p).ok());
                for pos in in_range {
                    // `get_mut` enforces `pos < BITMAP_BYTES * 8` without a panicking index.
                    if let Some(byte) = buf.get_mut(pos / 8) {
                        *byte |= 1 << (pos % 8);
                    }
                }
                Some(bytes::Bytes::from(buf))
            })
        })
        .collect();
    Ok(Some(Column::new(name, out.into_series())))
}
pub fn apply_bitmap_or_agg(column: Column) -> PolarsResult<Option<Column>> {
let name = column.field().into_owned().name;
let series = column.take_materialized_series();
let list_ca = series.list().map_err(|e| compute_err("bitmap_or_agg", e))?;
let out: BinaryChunked = list_ca
.amortized_iter()
.map(|opt_list| {
opt_list.and_then(|list_series| {
let list_c = list_series.as_ref().as_list();
let mut buf = vec![0u8; BITMAP_BYTES];
for opt_bin in list_c.amortized_iter().flatten() {
let bin_ca: &BinaryChunked = opt_bin.as_ref().binary().ok()?;
for b in bin_ca.into_iter().flatten() {
let b: &[u8] = b;
for (i, &byte) in b.iter().take(BITMAP_BYTES).enumerate() {
buf[i] |= byte;
}
}
}
Some(bytes::Bytes::from(buf))
})
})
.collect();
Ok(Some(Column::new(name, out.into_series())))
}
/// Parses strings as local wall-clock timestamps and stores them as UTC microseconds.
///
/// `format` is a PySpark pattern translated by `pyspark_format_to_chrono`. When it is absent,
/// `%Y-%m-%d %H:%M:%S` is used. Each value is parsed as a `NaiveDateTime`, interpreted in the
/// process-local time zone (`chrono::Local`), converted to UTC, and returned as
/// `Datetime(Microseconds, None)`.
///
/// The following values become null:
/// * values that fail to parse;
/// * local times that do not exist, e.g. those skipped by a DST spring-forward gap.
///
/// A local time that occurs twice, during a DST fall-back, resolves to the earlier instant.
/// `strict` is accepted for interface compatibility but is not consulted yet.
///
/// # Errors
/// Returns a compute error if the column is not a string column.
pub fn apply_to_timestamp_ltz_format(
    column: Column,
    format: Option<&str>,
    strict: bool,
) -> PolarsResult<Option<Column>> {
    use chrono::offset::TimeZone;
    use chrono::{Local, NaiveDateTime, Utc};
    use polars::datatypes::TimeUnit;
    let chrono_fmt = format
        .map(pyspark_format_to_chrono)
        .unwrap_or_else(|| "%Y-%m-%d %H:%M:%S".to_string());
    let name = column.field().into_owned().name;
    let series = column.take_materialized_series();
    let ca = series
        .str()
        .map_err(|e| compute_err("to_timestamp_ltz", e))?;
    let out = Int64Chunked::from_iter_options(
        name.as_str().into(),
        ca.into_iter().map(|opt_s| {
            opt_s.and_then(|s| {
                let ndt = NaiveDateTime::parse_from_str(s, &chrono_fmt).ok()?;
                // `single()` would discard a wall-clock time repeated by a DST fall-back.
                // `earliest()` keeps it and picks the earlier instant, matching java.time's
                // default overlap resolution. A time inside a DST gap still has no mapping and
                // yields null.
                Local
                    .from_local_datetime(&ndt)
                    .earliest()
                    .map(|dt| dt.with_timezone(&Utc).timestamp_micros())
            })
        }),
    );
    let out_series = out
        .into_series()
        .cast(&DataType::Datetime(TimeUnit::Microseconds, None))?;
    // `strict` is not consulted yet: unparseable values always become null.
    let _ = strict;
    Ok(Some(Column::new(name, out_series)))
}
pub fn apply_to_timestamp_ntz_format(
column: Column,
format: Option<&str>,
strict: bool,
) -> PolarsResult<Option<Column>> {
use chrono::NaiveDateTime;
use polars::datatypes::TimeUnit;
let chrono_fmt = format
.map(pyspark_format_to_chrono)
.unwrap_or_else(|| "%Y-%m-%d %H:%M:%S".to_string());
let name = column.field().into_owned().name;
let series = column.take_materialized_series();
let ca = series
.str()
.map_err(|e| compute_err("to_timestamp_ntz", e))?;
let out = Int64Chunked::from_iter_options(
name.as_str().into(),
ca.into_iter().map(|opt_s| {
opt_s.and_then(|s| {
NaiveDateTime::parse_from_str(s, &chrono_fmt)
.ok()
.map(|ndt| ndt.and_utc().timestamp_micros())
})
}),
);
let out_series = out
.into_series()
.cast(&DataType::Datetime(TimeUnit::Microseconds, None))?;
let _ = strict;
Ok(Some(Column::new(name, out_series)))
}
#[cfg(test)]
mod tests_parse_timestamp {
    use super::parse_timestamp_string_flexible;

    /// An ISO-8601 string with milliseconds and a `+0000` offset must parse, and the resulting
    /// microsecond timestamp must fall in hour 04 (UTC).
    #[test]
    fn test_parse_iso_with_tz() {
        let input = "2023-02-07T04:00:01.730+0000";
        let Some(micros) = parse_timestamp_string_flexible(input) else {
            panic!("expected Some for {input:?}");
        };
        let seconds = micros / 1_000_000;
        let hour_of_day = (seconds / 3600) % 24;
        assert_eq!(hour_of_day, 4, "hour should be 4 for {input:?}");
    }
}