use arrow::array::{Array, IntoBoxedArray};
use polars_compute::find_validity_mismatch::find_validity_mismatch;
use polars_utils::IdxSize;
use super::ListChunked;
use crate::chunked_array::flags::StatisticsFlags;
use crate::prelude::{ChunkedArray, FalseT, PolarsDataType};
use crate::series::Series;
use crate::series::implementations::null::NullChunked;
use crate::utils::align_chunks_binary_ca_series;
/// Normalization and validity utilities for nested chunked arrays
/// (list / fixed-size-list / struct) and their flat counterparts.
pub trait ChunkNestingUtils: Sized {
    /// Push outer nulls down into nested child values.
    ///
    /// Returns `None` when the array is already normalized (nothing would
    /// change); otherwise returns a normalized copy of `self`.
    fn propagate_nulls(&self) -> Option<Self>;

    /// Trim list chunks so they carry no values unreferenced by their
    /// offsets (i.e. offsets are "normalized").
    ///
    /// Returns `None` when the array is already normalized; otherwise
    /// returns a trimmed copy of `self`.
    fn trim_lists_to_normalized_offsets(&self) -> Option<Self>;

    /// Append to `idxs` the row indices at which the validity of `self`
    /// disagrees with `other`.
    fn find_validity_mismatch(&self, other: &Series, idxs: &mut Vec<IdxSize>);
}
impl ChunkNestingUtils for ListChunked {
    fn propagate_nulls(&self) -> Option<Self> {
        use polars_compute::propagate_nulls::propagate_nulls_list;

        let flags = self.get_flags();
        // Already normalized on a previous call — the flag caches that fact.
        if flags.has_propagated_nulls() {
            return None;
        }

        // A non-nested inner dtype with no outer nulls has nothing to
        // propagate; record the flag so future calls short-circuit.
        // NOTE: `self.flags` is interior-mutable, hence `set` on `&self`.
        if !self.inner_dtype().is_nested() && !self.has_nulls() {
            self.flags
                .set(flags | StatisticsFlags::HAS_PROPAGATED_NULLS);
            return None;
        }

        // Copy-on-write scan: find the first chunk that actually changes.
        // Untouched leading chunks are shared (cloned Arc), not rebuilt.
        let mut chunks = Vec::new();
        for (i, chunk) in self.downcast_iter().enumerate() {
            if let Some(propagated_chunk) = propagate_nulls_list(chunk) {
                chunks.reserve(self.chunks.len());
                chunks.extend(self.chunks[..i].iter().cloned());
                chunks.push(propagated_chunk.into_boxed());
                break;
            }
        }

        // Non-empty means some chunk changed; process the remainder.
        // At this point `chunks.len() == i + 1`, so `skip(chunks.len())`
        // resumes exactly after the first changed chunk.
        if !chunks.is_empty() {
            chunks.extend(self.downcast_iter().skip(chunks.len()).map(|chunk| {
                match propagate_nulls_list(chunk) {
                    None => chunk.to_boxed(),
                    Some(chunk) => chunk.into_boxed(),
                }
            }));

            // SAFETY: propagation is assumed to leave chunk lengths and the
            // outer validity untouched, so the cached `length`/`null_count`
            // remain valid for the rebuilt chunks — TODO(review) confirm
            // against `propagate_nulls_list`'s contract.
            let mut ca = unsafe {
                Self::new_with_dims(self.field.clone(), chunks, self.length, self.null_count)
            };
            ca.set_flags(flags | StatisticsFlags::HAS_PROPAGATED_NULLS);
            return Some(ca);
        }

        // No chunk changed: cache the result in the flags and return None.
        self.flags
            .set(flags | StatisticsFlags::HAS_PROPAGATED_NULLS);
        None
    }

    fn trim_lists_to_normalized_offsets(&self) -> Option<Self> {
        use polars_compute::trim_lists_to_normalized_offsets::trim_lists_to_normalized_offsets_list;

        let flags = self.get_flags();
        // Already trimmed on a previous call — cached in the flags.
        if flags.has_trimmed_lists_to_normalized_offsets() {
            return None;
        }

        // Copy-on-write scan, same scheme as `propagate_nulls`: share
        // leading chunks until the first one that needs trimming.
        let mut chunks = Vec::new();
        for (i, chunk) in self.downcast_iter().enumerate() {
            if let Some(trimmed) = trim_lists_to_normalized_offsets_list(chunk) {
                chunks.reserve(self.chunks.len());
                chunks.extend(self.chunks[..i].iter().cloned());
                chunks.push(trimmed.into_boxed());
                break;
            }
        }

        if !chunks.is_empty() {
            // `chunks.len()` is the index right after the first trimmed chunk.
            chunks.extend(self.downcast_iter().skip(chunks.len()).map(|chunk| {
                match trim_lists_to_normalized_offsets_list(chunk) {
                    Some(chunk) => chunk.into_boxed(),
                    None => chunk.to_boxed(),
                }
            }));

            // SAFETY: trimming is assumed to only shrink the values buffer,
            // not the list lengths or validity, so `length`/`null_count`
            // stay correct — TODO(review) confirm against the kernel.
            let mut ca = unsafe {
                Self::new_with_dims(self.field.clone(), chunks, self.length, self.null_count)
            };
            ca.set_flags(flags | StatisticsFlags::HAS_TRIMMED_LISTS_TO_NORMALIZED_OFFSETS);
            return Some(ca);
        }

        // Nothing to trim: cache that in the flags.
        self.flags
            .set(flags | StatisticsFlags::HAS_TRIMMED_LISTS_TO_NORMALIZED_OFFSETS);
        None
    }

    fn find_validity_mismatch(&self, other: &Series, idxs: &mut Vec<IdxSize>) {
        // Align chunk boundaries so both sides can be compared chunk-by-chunk.
        let (slf, other) = align_chunks_binary_ca_series(self, other);

        let mut offset: IdxSize = 0;
        for (l, r) in slf.downcast_iter().zip(other.chunks()) {
            let start_length = idxs.len();
            find_validity_mismatch(l, r.as_ref(), idxs);

            // The kernel reports chunk-local indices; shift the newly added
            // ones to global row positions.
            for idx in idxs[start_length..].iter_mut() {
                *idx += offset;
            }
            offset += l.len() as IdxSize;
        }
    }
}
#[cfg(feature = "dtype-array")]
impl ChunkNestingUtils for super::ArrayChunked {
    fn propagate_nulls(&self) -> Option<Self> {
        use polars_compute::propagate_nulls::propagate_nulls_fsl;

        let flags = self.get_flags();
        // Already normalized on a previous call — cached in the flags.
        if flags.has_propagated_nulls() {
            return None;
        }

        // Non-nested inner dtype and no outer nulls: nothing to propagate.
        // `self.flags` is interior-mutable, hence `set` on `&self`.
        if !self.inner_dtype().is_nested() && !self.has_nulls() {
            self.flags
                .set(flags | StatisticsFlags::HAS_PROPAGATED_NULLS);
            return None;
        }

        // Copy-on-write scan: share leading chunks until the first one that
        // the propagation kernel actually rewrites.
        let mut chunks = Vec::new();
        for (i, chunk) in self.downcast_iter().enumerate() {
            if let Some(propagated_chunk) = propagate_nulls_fsl(chunk) {
                chunks.reserve(self.chunks.len());
                chunks.extend(self.chunks[..i].iter().cloned());
                chunks.push(propagated_chunk.into_boxed());
                break;
            }
        }

        if !chunks.is_empty() {
            // `chunks.len()` is the index right after the first changed chunk.
            chunks.extend(self.downcast_iter().skip(chunks.len()).map(|chunk| {
                match propagate_nulls_fsl(chunk) {
                    None => chunk.to_boxed(),
                    Some(chunk) => chunk.into_boxed(),
                }
            }));

            // SAFETY: propagation is assumed to leave chunk lengths and the
            // outer validity untouched, so the cached `length`/`null_count`
            // stay valid — TODO(review) confirm against `propagate_nulls_fsl`.
            let mut ca = unsafe {
                Self::new_with_dims(self.field.clone(), chunks, self.length, self.null_count)
            };
            ca.set_flags(flags | StatisticsFlags::HAS_PROPAGATED_NULLS);
            return Some(ca);
        }

        // No chunk changed: cache the result in the flags.
        self.flags
            .set(flags | StatisticsFlags::HAS_PROPAGATED_NULLS);
        None
    }

    fn trim_lists_to_normalized_offsets(&self) -> Option<Self> {
        use polars_compute::trim_lists_to_normalized_offsets::trim_lists_to_normalized_offsets_fsl;

        let flags = self.get_flags();
        // Skip when already trimmed, or when no list type occurs anywhere in
        // the (recursive) inner dtype — fixed-size arrays themselves carry no
        // offsets to trim.
        if flags.has_trimmed_lists_to_normalized_offsets()
            || !self.inner_dtype().contains_list_recursive()
        {
            return None;
        }

        // Copy-on-write scan, as in `propagate_nulls`.
        let mut chunks = Vec::new();
        for (i, chunk) in self.downcast_iter().enumerate() {
            if let Some(trimmed) = trim_lists_to_normalized_offsets_fsl(chunk) {
                chunks.reserve(self.chunks.len());
                chunks.extend(self.chunks[..i].iter().cloned());
                chunks.push(trimmed.into_boxed());
                break;
            }
        }

        if !chunks.is_empty() {
            chunks.extend(self.downcast_iter().skip(chunks.len()).map(|chunk| {
                match trim_lists_to_normalized_offsets_fsl(chunk) {
                    Some(chunk) => chunk.into_boxed(),
                    None => chunk.to_boxed(),
                }
            }));

            // SAFETY: trimming is assumed not to alter chunk lengths or
            // validity, keeping `length`/`null_count` correct — TODO(review)
            // confirm against the kernel.
            let mut ca = unsafe {
                Self::new_with_dims(self.field.clone(), chunks, self.length, self.null_count)
            };
            ca.set_flags(flags | StatisticsFlags::HAS_TRIMMED_LISTS_TO_NORMALIZED_OFFSETS);
            return Some(ca);
        }

        // Nothing to trim: cache that in the flags.
        self.flags
            .set(flags | StatisticsFlags::HAS_TRIMMED_LISTS_TO_NORMALIZED_OFFSETS);
        None
    }

    fn find_validity_mismatch(&self, other: &Series, idxs: &mut Vec<IdxSize>) {
        // Align chunk boundaries so both sides can be compared chunk-by-chunk.
        let (slf, other) = align_chunks_binary_ca_series(self, other);

        let mut offset: IdxSize = 0;
        for (l, r) in slf.downcast_iter().zip(other.chunks()) {
            let start_length = idxs.len();
            find_validity_mismatch(l, r.as_ref(), idxs);

            // Shift the newly reported chunk-local indices to global rows.
            for idx in idxs[start_length..].iter_mut() {
                *idx += offset;
            }
            offset += l.len() as IdxSize;
        }
    }
}
#[cfg(feature = "dtype-struct")]
impl ChunkNestingUtils for super::StructChunked {
    fn propagate_nulls(&self) -> Option<Self> {
        use polars_compute::propagate_nulls::propagate_nulls_struct;

        let flags = self.get_flags();
        // Already normalized on a previous call — cached in the flags.
        if flags.has_propagated_nulls() {
            return None;
        }

        // If no field is nested and there are no outer nulls, there is
        // nothing to propagate. `self.flags` is interior-mutable.
        if self.struct_fields().iter().all(|f| !f.dtype().is_nested()) && !self.has_nulls() {
            self.flags
                .set(flags | StatisticsFlags::HAS_PROPAGATED_NULLS);
            return None;
        }

        // Copy-on-write scan: share leading chunks until the first one the
        // propagation kernel actually rewrites.
        let mut chunks = Vec::new();
        for (i, chunk) in self.downcast_iter().enumerate() {
            if let Some(propagated_chunk) = propagate_nulls_struct(chunk) {
                chunks.reserve(self.chunks.len());
                chunks.extend(self.chunks[..i].iter().cloned());
                chunks.push(propagated_chunk.into_boxed());
                break;
            }
        }

        if !chunks.is_empty() {
            // `chunks.len()` is the index right after the first changed chunk.
            chunks.extend(self.downcast_iter().skip(chunks.len()).map(|chunk| {
                match propagate_nulls_struct(chunk) {
                    None => chunk.to_boxed(),
                    Some(chunk) => chunk.into_boxed(),
                }
            }));

            // SAFETY: propagation is assumed to leave chunk lengths and the
            // outer validity untouched, so the cached `length`/`null_count`
            // stay valid — TODO(review) confirm against
            // `propagate_nulls_struct`.
            let mut ca = unsafe {
                Self::new_with_dims(self.field.clone(), chunks, self.length, self.null_count)
            };
            ca.set_flags(flags | StatisticsFlags::HAS_PROPAGATED_NULLS);
            return Some(ca);
        }

        // No chunk changed: cache the result in the flags.
        self.flags
            .set(flags | StatisticsFlags::HAS_PROPAGATED_NULLS);
        None
    }

    fn trim_lists_to_normalized_offsets(&self) -> Option<Self> {
        use polars_compute::trim_lists_to_normalized_offsets::trim_lists_to_normalized_offsets_struct;

        let flags = self.get_flags();
        // Skip when already trimmed, or when no field contains a list type
        // anywhere in its (recursive) dtype — then there are no offsets.
        if flags.has_trimmed_lists_to_normalized_offsets()
            || !self
                .struct_fields()
                .iter()
                .any(|f| f.dtype().contains_list_recursive())
        {
            return None;
        }

        // Copy-on-write scan, as in `propagate_nulls`.
        let mut chunks = Vec::new();
        for (i, chunk) in self.downcast_iter().enumerate() {
            if let Some(trimmed) = trim_lists_to_normalized_offsets_struct(chunk) {
                chunks.reserve(self.chunks.len());
                chunks.extend(self.chunks[..i].iter().cloned());
                chunks.push(trimmed.into_boxed());
                break;
            }
        }

        if !chunks.is_empty() {
            chunks.extend(self.downcast_iter().skip(chunks.len()).map(|chunk| {
                match trim_lists_to_normalized_offsets_struct(chunk) {
                    Some(chunk) => chunk.into_boxed(),
                    None => chunk.to_boxed(),
                }
            }));

            // SAFETY: trimming is assumed not to alter chunk lengths or
            // validity, keeping `length`/`null_count` correct — TODO(review)
            // confirm against the kernel.
            let mut ca = unsafe {
                Self::new_with_dims(self.field.clone(), chunks, self.length, self.null_count)
            };
            ca.set_flags(flags | StatisticsFlags::HAS_TRIMMED_LISTS_TO_NORMALIZED_OFFSETS);
            return Some(ca);
        }

        // Nothing to trim: cache that in the flags.
        self.flags
            .set(flags | StatisticsFlags::HAS_TRIMMED_LISTS_TO_NORMALIZED_OFFSETS);
        None
    }

    fn find_validity_mismatch(&self, other: &Series, idxs: &mut Vec<IdxSize>) {
        // Align chunk boundaries so both sides can be compared chunk-by-chunk.
        let (slf, other) = align_chunks_binary_ca_series(self, other);

        let mut offset: IdxSize = 0;
        for (l, r) in slf.downcast_iter().zip(other.chunks()) {
            let start_length = idxs.len();
            find_validity_mismatch(l, r.as_ref(), idxs);

            // Shift the newly reported chunk-local indices to global rows.
            for idx in idxs[start_length..].iter_mut() {
                *idx += offset;
            }
            offset += l.len() as IdxSize;
        }
    }
}
// Flat (non-nested) chunked arrays: the normalization passes are no-ops;
// only the validity comparison does real work.
impl<T: PolarsDataType<IsNested = FalseT>> ChunkNestingUtils for ChunkedArray<T> {
    fn propagate_nulls(&self) -> Option<Self> {
        // No nested children to push nulls into.
        None
    }

    fn trim_lists_to_normalized_offsets(&self) -> Option<Self> {
        // No list offsets to trim.
        None
    }

    fn find_validity_mismatch(&self, other: &Series, idxs: &mut Vec<IdxSize>) {
        // Fast path: equal null counts that are either zero or cover every
        // row mean the two validity masks cannot disagree anywhere.
        let nc = self.null_count();
        if nc == other.null_count() && (nc == 0 || nc == self.len()) {
            return;
        }

        // Align chunk boundaries, then compare chunk-by-chunk.
        let (lhs, rhs) = align_chunks_binary_ca_series(self, other);
        let mut global_offset: IdxSize = 0;
        for (left, right) in lhs.downcast_iter().zip(rhs.chunks()) {
            let first_new = idxs.len();
            find_validity_mismatch(left, right.as_ref(), idxs);
            // The kernel reports chunk-local indices; translate the newly
            // appended ones to global row positions.
            idxs[first_new..]
                .iter_mut()
                .for_each(|idx| *idx += global_offset);
            global_offset += left.len() as IdxSize;
        }
    }
}
// All-null series: every row of `self` is invalid, so a validity mismatch
// occurs exactly at the positions where `other` holds a valid value.
impl ChunkNestingUtils for NullChunked {
    fn propagate_nulls(&self) -> Option<Self> {
        // Nothing nested to propagate into.
        None
    }

    fn trim_lists_to_normalized_offsets(&self) -> Option<Self> {
        // No list offsets exist on a null array.
        None
    }

    fn find_validity_mismatch(&self, other: &Series, idxs: &mut Vec<IdxSize>) {
        // If `other` is entirely null too, the masks agree everywhere.
        if self.len() == other.null_count() {
            return;
        }

        if let Some(validity) = other.rechunk_validity() {
            // Each set bit in `other`'s validity is a valid row, which
            // disagrees with this all-null array.
            idxs.extend(validity.true_idx_iter().map(|pos| pos as IdxSize));
        } else {
            // No validity mask: every row of `other` is valid, so every row
            // mismatches.
            idxs.extend(0..self.len() as IdxSize);
        }
    }
}