#![allow(unsafe_op_in_unsafe_fn)]
use std::rc::Rc;
use polars_core::series::amortized_iter::AmortSeries;
use rayon::iter::IntoParallelIterator;
use rayon::prelude::*;
use super::*;
impl AggregationContext<'_> {
pub(super) fn iter_groups(
&mut self,
keep_names: bool,
) -> Box<dyn Iterator<Item = Option<AmortSeries>> + '_> {
match self.agg_state() {
AggState::LiteralScalar(_) => {
self.groups();
let c = self.get_values().rechunk();
let name = if keep_names {
c.name().clone()
} else {
PlSmallStr::EMPTY
};
unsafe {
Box::new(LitIter::new(
c.as_materialized_series().array_ref(0).clone(),
self.groups.len(),
c.dtype(),
name,
))
}
},
AggState::AggregatedScalar(_) => {
self.groups();
let c = self.get_values();
let name = if keep_names {
c.name().clone()
} else {
PlSmallStr::EMPTY
};
unsafe {
Box::new(FlatIter::new(
c.as_materialized_series().chunks(),
self.groups.len(),
c.dtype(),
name,
))
}
},
AggState::AggregatedList(_) => {
let c = self.get_values();
let list = c.list().unwrap();
let name = if keep_names {
c.name().clone()
} else {
PlSmallStr::EMPTY
};
Box::new(list.amortized_iter_with_name(name))
},
AggState::NotAggregated(_) => {
let _ = self.aggregated();
let c = self.get_values();
let list = c.list().unwrap();
let name = if keep_names {
c.name().clone()
} else {
PlSmallStr::EMPTY
};
Box::new(list.amortized_iter_with_name(name))
},
}
}
}
impl AggregationContext<'_> {
    /// Lazily yields one [`Series`] per group by slicing the rechunked values.
    ///
    /// For each group, the slice indicator `[a, b]` is passed to `slice` as
    /// `(a as i64, b as usize)`.
    ///
    /// # Panics
    /// Hits `unreachable!` when the context is not in the `NotAggregated` state, or when
    /// a group is index-based (`GroupsIndicator::Idx`) rather than slice-based.
    pub(super) fn iter_groups_lazy(&mut self) -> impl Iterator<Item = Option<Series>> + '_ {
        let AggState::NotAggregated(_) = self.agg_state() else {
            unreachable!()
        };
        let group_positions = self.groups();
        let n_groups = group_positions.len();
        // Owned, shared copies so the returned closure does not borrow `self`'s groups.
        let shared_groups = Arc::new(group_positions.clone());
        let shared_values = Arc::new(self.get_values().rechunk());
        (0..n_groups).map(move |group_idx| match shared_groups.get(group_idx) {
            GroupsIndicator::Slice(bounds) => Some(
                shared_values
                    .slice(bounds[0] as i64, bounds[1] as usize)
                    .into_materialized_series()
                    .clone(),
            ),
            GroupsIndicator::Idx(_) => unreachable!(),
        })
    }

    /// Parallel (rayon) counterpart of [`Self::iter_groups_lazy`].
    ///
    /// Each group `idx` in `0..groups.len()` is sliced out of the rechunked values.
    ///
    /// # Panics
    /// Same conditions as `iter_groups_lazy`: a state other than `NotAggregated`, or an
    /// index-based group, hits `unreachable!`.
    pub(super) fn par_iter_groups_lazy(
        &mut self,
    ) -> impl IndexedParallelIterator<Item = Option<Series>> + '_ {
        let AggState::NotAggregated(_) = self.agg_state() else {
            unreachable!()
        };
        let group_positions = self.groups();
        let n_groups = group_positions.len();
        // `Arc` lets every rayon worker share the groups and the values.
        let shared_groups = Arc::new(group_positions.clone());
        let shared_values = Arc::new(self.get_values().rechunk());
        (0..n_groups)
            .into_par_iter()
            .map(move |group_idx| match shared_groups.get(group_idx) {
                GroupsIndicator::Slice(bounds) => Some(
                    shared_values
                        .slice(bounds[0] as i64, bounds[1] as usize)
                        .into_materialized_series()
                        .clone(),
                ),
                GroupsIndicator::Idx(_) => unreachable!(),
            })
    }
}
/// Iterator that yields clones of one single [`AmortSeries`] exactly `len` times.
///
/// `iter_groups` uses it for literal scalars, passing the number of groups as `len`.
struct LitIter {
    /// Total number of items to yield.
    len: usize,
    /// Number of items yielded so far (never exceeds `len`).
    offset: usize,
    // Never read directly; kept alongside `item`, which is built from the same `Rc`.
    #[allow(dead_code)]
    series_container: Rc<Series>,
    /// The series handed out (cloned) on every call to `next`.
    item: AmortSeries,
}

impl LitIter {
    /// Wraps `array` in a series named `name` with dtype `logical`.
    ///
    /// # Safety
    /// `array` must be a valid physical representation of `logical`. The series is built
    /// with `Series::from_chunks_and_dtype_unchecked`, which does not verify this.
    unsafe fn new(array: ArrayRef, len: usize, logical: &DataType, name: PlSmallStr) -> Self {
        let series_container = Rc::new(Series::from_chunks_and_dtype_unchecked(
            name,
            vec![array],
            logical,
        ));
        Self {
            offset: 0,
            len,
            series_container: series_container.clone(),
            item: AmortSeries::new(series_container),
        }
    }
}

impl Iterator for LitIter {
    type Item = Option<AmortSeries>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.offset == self.len {
            return None;
        }
        self.offset += 1;
        Some(Some(self.item.clone()))
    }

    /// Reports the number of items still to be yielded, not the total.
    fn size_hint(&self) -> (usize, Option<usize>) {
        // `offset` only grows while it is below `len`, so this cannot underflow.
        let remaining = self.len - self.offset;
        (remaining, Some(remaining))
    }
}
/// Iterator that yields every element of a chunked array as its own length-1 series.
///
/// `iter_groups` uses it for aggregated scalars, passing the number of groups as `len`.
///
/// NOTE(review): every yielded item is a clone of the same `item`, whose inner array is
/// swapped on each `next` call. Presumably callers must not keep an item across calls to
/// `next` — confirm against the `AmortSeries` documentation.
struct FlatIter {
    /// Chunk currently being read.
    current_array: ArrayRef,
    /// Remaining chunks, stored in reverse so that `pop` returns them in original order.
    chunks: Vec<ArrayRef>,
    /// Number of items yielded so far.
    offset: usize,
    /// Index of the next element to read within `current_array`.
    chunk_offset: usize,
    /// Maximum number of items to yield.
    len: usize,
    // Never read directly; kept alongside `item`, which is built from the same `Rc`.
    #[allow(dead_code)]
    series_container: Rc<Series>,
    /// Series whose inner array is replaced by a 1-element slice on every `next`.
    item: AmortSeries,
}

impl FlatIter {
    /// Builds an iterator over the elements of `chunks`, yielding at most `len` items.
    ///
    /// # Safety
    /// Every chunk must be a valid physical representation of `logical`. The container is
    /// built with `Series::from_chunks_and_dtype_unchecked`, and slices of later chunks are
    /// swapped into it without any check.
    ///
    /// # Panics
    /// Panics if `chunks` is empty.
    unsafe fn new(chunks: &[ArrayRef], len: usize, logical: &DataType, name: PlSmallStr) -> Self {
        // Reverse so that `pop` hands the chunks out in their original order.
        let mut stack: Vec<ArrayRef> = chunks.iter().rev().cloned().collect();
        let current_array = stack
            .pop()
            .expect("FlatIter requires at least one chunk");
        let series_container = Rc::new(Series::from_chunks_and_dtype_unchecked(
            name,
            vec![current_array.clone()],
            logical,
        ));
        Self {
            current_array,
            chunks: stack,
            offset: 0,
            chunk_offset: 0,
            len,
            series_container: series_container.clone(),
            item: AmortSeries::new(series_container),
        }
    }
}

impl Iterator for FlatIter {
    type Item = Option<AmortSeries>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.offset == self.len {
            return None;
        }
        // Skip exhausted (or empty) chunks in a loop rather than by recursing, so a long run
        // of empty chunks cannot grow the call stack. Running out of chunks ends iteration.
        while self.chunk_offset >= self.current_array.len() {
            self.current_array = self.chunks.pop()?;
            self.chunk_offset = 0;
        }
        // SAFETY: the loop above guarantees `chunk_offset < current_array.len()`, so a
        // slice of length 1 starting there is in bounds.
        let mut arr = unsafe { self.current_array.sliced_unchecked(self.chunk_offset, 1) };
        // SAFETY: `arr` is a slice of one of the chunks the iterator was built from; this
        // relies on the `# Safety` contract of `FlatIter::new`.
        unsafe { self.item.swap(&mut arr) };
        self.offset += 1;
        self.chunk_offset += 1;
        Some(Some(self.item.clone()))
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.len - self.offset;
        (remaining, Some(remaining))
    }
}