use std::borrow::Cow;
use arrow::compute::utils::combine_validities_and_many;
use polars_row::{RowEncodingContext, RowEncodingOptions, RowsEncoded, convert_columns};
use polars_utils::itertools::Itertools;
use rayon::prelude::*;
use crate::POOL;
use crate::prelude::*;
use crate::utils::_split_offsets;
/// Row-encodes `by` with the unordered encoding, in parallel over row ranges.
///
/// The row count is split into ranges by `_split_offsets`, using the number of
/// threads in `POOL`. Each range is sliced out of every column and encoded
/// with [`_get_rows_encoded_unordered`]; every encoded range becomes one chunk
/// of the returned array, which carries an empty name.
///
/// The row count is read from the first column. An empty `by` is treated as
/// zero rows rather than panicking on an out-of-bounds index.
///
/// # Errors
/// Propagates any error reported while encoding one of the ranges.
pub fn encode_rows_vertical_par_unordered(by: &[Column]) -> PolarsResult<BinaryOffsetChunked> {
    let n_threads = POOL.current_num_threads();
    // `first()` instead of `by[0]`: same convention as `_get_rows_encoded_unordered`,
    // and it keeps an empty column list from panicking.
    let len = by.first().map_or(0, |c| c.len());
    let splits = _split_offsets(len, n_threads);

    // The parallel iterator is only described here; it is driven inside `POOL.install` below.
    let chunks = splits.into_par_iter().map(|(offset, len)| {
        let sliced = by
            .iter()
            .map(|s| s.slice(offset as i64, len))
            .collect::<Vec<_>>();
        let rows = _get_rows_encoded_unordered(&sliced)?;
        Ok(rows.into_array())
    });
    let chunks = POOL.install(|| chunks.collect::<PolarsResult<Vec<_>>>());

    Ok(BinaryOffsetChunked::from_chunk_iter(
        PlSmallStr::EMPTY,
        chunks?,
    ))
}
/// Like [`encode_rows_vertical_par_unordered`], but also attaches a validity
/// mask derived from the input columns to every encoded chunk.
///
/// The row count is split into ranges by `_split_offsets`, using the number of
/// threads in `POOL`. For each range the sliced columns are encoded with
/// [`_get_rows_encoded_unordered`], and the validity bitmaps of those slices
/// are merged with `combine_validities_and_many` and set on the encoded array.
/// NOTE(review): going by its name the merge is a logical AND, i.e. a row is
/// null if it is null in any column — confirm against the arrow helper.
///
/// The row count is read from the first column. An empty `by` is treated as
/// zero rows rather than panicking on an out-of-bounds index.
///
/// # Errors
/// Propagates any error reported while encoding one of the ranges.
pub fn encode_rows_vertical_par_unordered_broadcast_nulls(
    by: &[Column],
) -> PolarsResult<BinaryOffsetChunked> {
    let n_threads = POOL.current_num_threads();
    // `first()` instead of `by[0]`: same convention as `_get_rows_encoded_unordered`,
    // and it keeps an empty column list from panicking.
    let len = by.first().map_or(0, |c| c.len());
    let splits = _split_offsets(len, n_threads);

    // The parallel iterator is only described here; it is driven inside `POOL.install` below.
    let chunks = splits.into_par_iter().map(|(offset, len)| {
        let sliced = by
            .iter()
            .map(|s| s.slice(offset as i64, len))
            .collect::<Vec<_>>();
        let rows = _get_rows_encoded_unordered(&sliced)?;

        // Only the validity bitmaps are needed, so ask for them directly (as
        // `_get_rows_encoded_ca` does) instead of rechunking the whole column
        // and cloning its chunk list.
        let validities = sliced
            .iter()
            .map(|s| s.as_materialized_series().rechunk_validity())
            .collect::<Vec<_>>();
        let validity = combine_validities_and_many(&validities);

        Ok(rows.into_array().with_validity_typed(validity))
    });
    let chunks = POOL.install(|| chunks.collect::<PolarsResult<Vec<_>>>());

    Ok(BinaryOffsetChunked::from_chunk_iter(
        PlSmallStr::EMPTY,
        chunks?,
    ))
}
/// Builds the [`RowEncodingContext`] that accompanies `dtype` in row encoding.
///
/// Returns `None` for dtypes that need no extra context. `List`, `Array` and
/// `Extension` defer to their inner/storage dtype, and a `Struct` yields one
/// optional context per field (or `None` when no field needs one).
///
/// # Panics
/// Panics for `DataType::Unknown` and `DataType::Object`.
pub fn get_row_encoding_context(dtype: &DataType) -> Option<RowEncodingContext> {
    match dtype {
        // Dtypes that carry no additional context.
        DataType::Boolean
        | DataType::UInt8
        | DataType::UInt16
        | DataType::UInt32
        | DataType::UInt64
        | DataType::UInt128
        | DataType::Int8
        | DataType::Int16
        | DataType::Int32
        | DataType::Int64
        | DataType::Int128
        | DataType::Float16
        | DataType::Float32
        | DataType::Float64
        | DataType::String
        | DataType::Binary
        | DataType::BinaryOffset
        | DataType::Null
        | DataType::Time
        | DataType::Date
        | DataType::Datetime(_, _)
        | DataType::Duration(_) => None,

        // Leaf dtypes with their own context.
        #[cfg(feature = "dtype-categorical")]
        DataType::Categorical(_, mapping) | DataType::Enum(_, mapping) => {
            use polars_row::RowEncodingCategoricalContext;

            let is_enum = matches!(dtype, DataType::Enum(_, _));
            let categorical = RowEncodingCategoricalContext {
                is_enum,
                mapping: mapping.clone(),
            };
            Some(RowEncodingContext::Categorical(categorical))
        },
        #[cfg(feature = "dtype-decimal")]
        DataType::Decimal(precision, _) => Some(RowEncodingContext::Decimal(*precision)),

        // Wrappers: the context is whatever the wrapped dtype requires.
        DataType::List(inner) => get_row_encoding_context(inner),
        #[cfg(feature = "dtype-array")]
        DataType::Array(inner, _) => get_row_encoding_context(inner),
        #[cfg(feature = "dtype-extension")]
        DataType::Extension(_, storage) => get_row_encoding_context(storage),

        #[cfg(feature = "dtype-struct")]
        DataType::Struct(fs) => {
            let mut remaining = fs.iter().enumerate();

            // Scan up to the first field that needs a context. If no field
            // does, the struct needs none and nothing gets allocated.
            let (first_idx, first_ctxt) = remaining.find_map(|(idx, field)| {
                get_row_encoding_context(field.dtype()).map(|ctxt| (idx, ctxt))
            })?;

            let mut ctxts = Vec::with_capacity(fs.len());
            // Fields before the first hit are already known to need no context.
            ctxts.extend(std::iter::repeat_n(None, first_idx));
            ctxts.push(Some(first_ctxt));
            // `remaining` resumes right after the first hit.
            ctxts.extend(remaining.map(|(_, field)| get_row_encoding_context(field.dtype())));

            Some(RowEncodingContext::Struct(ctxts))
        },

        // Dtypes that cannot be row encoded.
        DataType::Unknown(_) => panic!("Unsupported in row encoding"),
        #[cfg(feature = "object")]
        DataType::Object(_) => panic!("Unsupported in row encoding"),
    }
}
/// Row-encodes `by` with the unordered encoding into a single-chunk,
/// unnamed [`BinaryOffsetChunked`].
///
/// # Errors
/// Propagates any error from [`_get_rows_encoded_unordered`].
pub fn encode_rows_unordered(by: &[Column]) -> PolarsResult<BinaryOffsetChunked> {
    _get_rows_encoded_unordered(by)
        .map(|rows| BinaryOffsetChunked::with_chunk(PlSmallStr::EMPTY, rows.into_array()))
}
pub fn _get_rows_encoded_unordered(by: &[Column]) -> PolarsResult<RowsEncoded> {
let mut cols = Vec::with_capacity(by.len());
let mut opts = Vec::with_capacity(by.len());
let mut ctxts = Vec::with_capacity(by.len());
let num_rows = by.first().map_or(0, |c| c.len());
for by in by {
debug_assert_eq!(by.len(), num_rows);
let by = by
.trim_lists_to_normalized_offsets()
.map_or(Cow::Borrowed(by), Cow::Owned);
let by = by.propagate_nulls().map_or(by, Cow::Owned);
let by = by.as_materialized_series();
let arr = by.to_physical_repr().rechunk().chunks()[0].to_boxed();
let opt = RowEncodingOptions::new_unsorted();
let ctxt = get_row_encoding_context(by.dtype());
cols.push(arr);
opts.push(opt);
ctxts.push(ctxt);
}
Ok(convert_columns(num_rows, &cols, &opts, &ctxts))
}
pub fn _get_rows_encoded(
by: &[Column],
descending: &[bool],
nulls_last: &[bool],
) -> PolarsResult<RowsEncoded> {
debug_assert_eq!(by.len(), descending.len());
debug_assert_eq!(by.len(), nulls_last.len());
let mut cols = Vec::with_capacity(by.len());
let mut opts = Vec::with_capacity(by.len());
let mut ctxts = Vec::with_capacity(by.len());
let num_rows = by.first().map_or(0, |c| c.len());
for ((by, desc), null_last) in by.iter().zip(descending).zip(nulls_last) {
debug_assert_eq!(by.len(), num_rows);
let by = by
.trim_lists_to_normalized_offsets()
.map_or(Cow::Borrowed(by), Cow::Owned);
let by = by.propagate_nulls().map_or(by, Cow::Owned);
let by = by.as_materialized_series();
let arr = by.to_physical_repr().rechunk().chunks()[0].to_boxed();
let opt = RowEncodingOptions::new_sorted(*desc, *null_last);
let ctxt = get_row_encoding_context(by.dtype());
cols.push(arr);
opts.push(opt);
ctxts.push(ctxt);
}
Ok(convert_columns(num_rows, &cols, &opts, &ctxts))
}
pub fn _get_rows_encoded_ca(
name: PlSmallStr,
by: &[Column],
descending: &[bool],
nulls_last: &[bool],
broadcast_nulls: bool,
) -> PolarsResult<BinaryOffsetChunked> {
let mut rows_arr = _get_rows_encoded(by, descending, nulls_last)?.into_array();
if broadcast_nulls {
let validities = by
.iter()
.map(|c| c.as_materialized_series().rechunk_validity())
.collect_vec();
let combined = combine_validities_and_many(&validities);
rows_arr.set_validity(combined);
}
Ok(BinaryOffsetChunked::with_chunk(name, rows_arr))
}
/// Row-encodes `by` with the given sort options and returns the raw binary array.
///
/// When `broadcast_nulls` is set, the validity bitmaps of all input columns
/// are merged with `combine_validities_and_many` and installed as the
/// validity of the encoded array; otherwise the array is returned exactly as
/// the encoder produced it.
///
/// # Errors
/// Propagates any error from [`_get_rows_encoded`].
pub fn _get_rows_encoded_arr(
    by: &[Column],
    descending: &[bool],
    nulls_last: &[bool],
    broadcast_nulls: bool,
) -> PolarsResult<BinaryArray<i64>> {
    let encoded = _get_rows_encoded(by, descending, nulls_last)?;
    let mut rows_arr = encoded.into_array();

    if !broadcast_nulls {
        return Ok(rows_arr);
    }

    // One (optional) bitmap per input column, each spanning the whole column.
    let per_column = by
        .iter()
        .map(|column| column.as_materialized_series().rechunk_validity())
        .collect_vec();
    rows_arr.set_validity(combine_validities_and_many(&per_column));

    Ok(rows_arr)
}
/// Row-encodes `by` with the unordered encoding into a single-chunk
/// [`BinaryOffsetChunked`] called `name`.
///
/// # Errors
/// Propagates any error from [`_get_rows_encoded_unordered`].
pub fn _get_rows_encoded_ca_unordered(
    name: PlSmallStr,
    by: &[Column],
) -> PolarsResult<BinaryOffsetChunked> {
    let rows = _get_rows_encoded_unordered(by)?;
    Ok(BinaryOffsetChunked::with_chunk(name, rows.into_array()))
}
/// Decodes row-encoded binary data back into a struct column.
///
/// Every chunk of `ca` is decoded with `polars_row::decode::decode_rows_from_binary`
/// into one array per entry of `fields`, using the matching entry of `opts`
/// and the context from [`get_row_encoding_context`]. The decoded arrays are
/// assembled into a `StructArray` without a validity mask, and the chunks are
/// returned as a [`StructChunked`] named after `ca` with dtype
/// `DataType::Struct(fields)`.
///
/// # Panics
/// Panics if the decoder returns a different number of arrays than `fields`
/// has entries.
#[cfg(feature = "dtype-struct")]
pub fn row_encoding_decode(
    ca: &BinaryOffsetChunked,
    fields: &[Field],
    opts: &[RowEncodingOptions],
) -> PolarsResult<StructChunked> {
    // Per field: the encoding context and the physical arrow dtype to decode into.
    let (ctxts, dtypes): (Vec<_>, Vec<_>) = fields
        .iter()
        .map(|field| {
            let dtype = field.dtype();
            (
                get_row_encoding_context(dtype),
                dtype.to_physical().to_arrow(CompatLevel::newest()),
            )
        })
        .unzip();

    let struct_arrow_dtype = ArrowDataType::Struct(
        fields
            .iter()
            .map(|field| field.to_physical().to_arrow(CompatLevel::newest()))
            .collect(),
    );

    // Buffer handed to the decoder on every call, so it is shared across chunks.
    let mut row_buffer = Vec::new();

    let chunks = ca
        .downcast_iter()
        .map(|encoded| {
            // NOTE(review): no safety contract is visible here — presumably `encoded`
            // must hold rows produced by this encoding with the same `opts`, contexts
            // and dtypes; confirm against `decode_rows_from_binary`.
            let columns = unsafe {
                polars_row::decode::decode_rows_from_binary(
                    encoded,
                    opts,
                    &ctxts,
                    &dtypes,
                    &mut row_buffer,
                )
            };
            assert_eq!(columns.len(), fields.len());

            let struct_arr =
                StructArray::new(struct_arrow_dtype.clone(), encoded.len(), columns, None);
            struct_arr.to_boxed()
        })
        .collect::<Vec<_>>();

    // NOTE(review): presumably requires `chunks` to match the dtype passed here;
    // they were built from the physical arrow dtypes of `fields` above — confirm.
    let out = unsafe {
        StructChunked::from_chunks_and_dtype(
            ca.name().clone(),
            chunks,
            DataType::Struct(fields.to_vec()),
        )
    };
    Ok(out)
}