use std::borrow::{Borrow, Cow};
use std::collections::LinkedList;
use std::iter::FromIterator;
use std::marker::PhantomData;
use std::sync::Arc;
use arrow::array::{BooleanArray, PrimitiveArray, Utf8Array};
use arrow::bitmap::{Bitmap, MutableBitmap};
use polars_utils::sync::SyncPtr;
use rayon::iter::{FromParallelIterator, IntoParallelIterator};
use rayon::prelude::*;
use crate::chunked_array::builder::{
get_list_builder, AnonymousListBuilder, AnonymousOwnedListBuilder,
};
#[cfg(feature = "dtype-array")]
use crate::chunked_array::builder::{AnonymousOwnedFixedSizeListBuilder, FixedSizeListBuilder};
#[cfg(feature = "object")]
use crate::chunked_array::object::ObjectArray;
use crate::prelude::*;
use crate::utils::flatten::flatten_par;
use crate::utils::{get_iter_capacity, CustomIterTools, NoNull};
impl<T: PolarsDataType> Default for ChunkedArray<T> {
    /// Returns an empty array: no chunks, length 0, named `"default"` with dtype `Null`.
    fn default() -> Self {
        let field = Field::new("default", DataType::Null);
        ChunkedArray {
            field: Arc::new(field),
            chunks: Vec::new(),
            phantom: PhantomData,
            bit_settings: Default::default(),
            length: 0,
        }
    }
}
impl<T> FromIterator<Option<T::Native>> for ChunkedArray<T>
where
T: PolarsNumericType,
{
/// Collects optional native values into a single-chunk `ChunkedArray` named `""`.
///
/// If the iterator reports an exact size hint (`lower == upper`) and the `performant` feature
/// is enabled, the array is built via `from_trusted_len_iter_unchecked`; in every other case
/// it is built with a plain `collect`. The resulting arrow array is cast to the arrow type of
/// `T` before being wrapped.
fn from_iter<I: IntoIterator<Item = Option<T::Native>>>(iter: I) -> Self {
let iter = iter.into_iter();
let arr: PrimitiveArray<T::Native> = match iter.size_hint() {
(a, Some(b)) if a == b => {
// NOTE(review): `size_hint` is not a safety contract; a safe `Iterator` impl may report an
// exact hint and then yield a different number of items. The `assert_eq!` below only runs
// after the `_unchecked` constructor has consumed the iterator, so it cannot guard that
// constructor itself — confirm this trade-off is intended for the `performant` feature.
#[cfg(feature = "performant")]
unsafe {
let arr = PrimitiveArray::from_trusted_len_iter_unchecked(iter)
.to(T::get_dtype().to_arrow());
assert_eq!(arr.len(), a);
arr
}
// Without `performant`, the exact-size case takes the same safe path as the fallback arm.
#[cfg(not(feature = "performant"))]
iter.collect::<PrimitiveArray<T::Native>>()
.to(T::get_dtype().to_arrow())
}
_ => iter
.collect::<PrimitiveArray<T::Native>>()
.to(T::get_dtype().to_arrow()),
};
unsafe { ChunkedArray::from_chunks("", vec![Box::new(arr)]) }
}
}
impl<T> FromIterator<T::Native> for NoNull<ChunkedArray<T>>
where
    T: PolarsNumericType,
{
    /// Collects non-null native values into a `NoNull`-wrapped array named `""`.
    fn from_iter<I: IntoIterator<Item = T::Native>>(iter: I) -> Self {
        let values: Vec<T::Native> = iter.into_iter().collect();
        let ca = ChunkedArray::from_vec("", values);
        NoNull::new(ca)
    }
}
impl FromIterator<Option<bool>> for ChunkedArray<BooleanType> {
fn from_iter<I: IntoIterator<Item = Option<bool>>>(iter: I) -> Self {
let arr = BooleanArray::from_iter(iter);
unsafe { Self::from_chunks("", vec![Box::new(arr)]) }
}
}
impl FromIterator<bool> for BooleanChunked {
fn from_iter<I: IntoIterator<Item = bool>>(iter: I) -> Self {
let arr = BooleanArray::from_iter(iter.into_iter().map(Some));
unsafe { Self::from_chunks("", vec![Box::new(arr)]) }
}
}
impl FromIterator<bool> for NoNull<BooleanChunked> {
    /// Collects booleans via the `BooleanChunked` impl and wraps the result in `NoNull`.
    fn from_iter<I: IntoIterator<Item = bool>>(iter: I) -> Self {
        let collected: BooleanChunked = iter.into_iter().collect();
        NoNull::new(collected)
    }
}
impl<Ptr> FromIterator<Option<Ptr>> for Utf8Chunked
where
Ptr: AsRef<str>,
{
fn from_iter<I: IntoIterator<Item = Option<Ptr>>>(iter: I) -> Self {
let arr = Utf8Array::<i64>::from_iter(iter);
unsafe { Self::from_chunks("", vec![Box::new(arr)]) }
}
}
/// Marker sub-trait of `AsRef<T>` used to select the non-optional `FromIterator` impls.
///
/// It is implemented only for an explicit list of pointer-like types, so those impls do not
/// overlap with the `Option<Ptr>` impls.
pub trait PolarsAsRef<T: ?Sized>: AsRef<T> {}

// Owned, borrowed, doubly borrowed and copy-on-write string types.
impl PolarsAsRef<str> for String {}
impl PolarsAsRef<str> for &str {}
impl PolarsAsRef<str> for &&str {}
impl<'s> PolarsAsRef<str> for Cow<'s, str> {}
impl<Ptr> FromIterator<Ptr> for Utf8Chunked
where
    Ptr: PolarsAsRef<str>,
{
    /// Collects non-null string-likes into a single Utf8 chunk named `""` (no validity bitmap).
    fn from_iter<I: IntoIterator<Item = Ptr>>(iter: I) -> Self {
        let values = iter.into_iter();
        let strings = Utf8Array::<i64>::from_iter_values(values);
        unsafe { Self::from_chunks("", vec![Box::new(strings)]) }
    }
}
impl<Ptr> FromIterator<Option<Ptr>> for BinaryChunked
where
Ptr: AsRef<[u8]>,
{
fn from_iter<I: IntoIterator<Item = Option<Ptr>>>(iter: I) -> Self {
let arr = BinaryArray::<i64>::from_iter(iter);
unsafe { Self::from_chunks("", vec![Box::new(arr)]) }
}
}
// Byte-slice counterparts of the `str` impls: owned, borrowed, doubly borrowed and `Cow`.
impl PolarsAsRef<[u8]> for Vec<u8> {}
impl PolarsAsRef<[u8]> for &[u8] {}
impl PolarsAsRef<[u8]> for &&[u8] {}
impl<'b> PolarsAsRef<[u8]> for Cow<'b, [u8]> {}
impl<Ptr> FromIterator<Ptr> for BinaryChunked
where
    Ptr: PolarsAsRef<[u8]>,
{
    /// Collects non-null byte-slice-likes into a single binary chunk named `""`.
    fn from_iter<I: IntoIterator<Item = Ptr>>(iter: I) -> Self {
        let values = iter.into_iter();
        let bytes = BinaryArray::<i64>::from_iter_values(values);
        unsafe { Self::from_chunks("", vec![Box::new(bytes)]) }
    }
}
impl<Ptr> FromIterator<Ptr> for ListChunked
where
    Ptr: Borrow<Series>,
{
    /// Collects (borrowed or owned) series into a list column named `"collected"`.
    ///
    /// The dtype of the first series selects the list builder. An empty iterator yields an
    /// empty all-null list named `""`.
    fn from_iter<I: IntoIterator<Item = Ptr>>(iter: I) -> Self {
        let mut series_iter = iter.into_iter();
        let capacity = get_iter_capacity(&series_iter);
        let first = match series_iter.next() {
            None => return ListChunked::full_null("", 0),
            Some(first) => first,
        };
        // Value capacity is a heuristic guess of ~5 values per list.
        let mut builder =
            get_list_builder(first.borrow().dtype(), capacity * 5, capacity, "collected").unwrap();
        for s in std::iter::once(first).chain(series_iter) {
            builder.append_series(s.borrow());
        }
        builder.finish()
    }
}
impl FromIterator<Option<Series>> for ListChunked {
fn from_iter<I: IntoIterator<Item = Option<Series>>>(iter: I) -> Self {
let mut it = iter.into_iter();
let capacity = get_iter_capacity(&it);
let first_value;
let mut init_null_count = 0;
loop {
match it.next() {
Some(Some(s)) => {
first_value = Some(s);
break;
}
Some(None) => {
init_null_count += 1;
}
None => return ListChunked::full_null("", init_null_count),
}
}
match first_value {
None => {
unreachable!()
}
Some(ref first_s) => {
if matches!(first_s.dtype(), DataType::Null) && first_s.is_empty() {
let mut builder = AnonymousOwnedListBuilder::new("collected", capacity, None);
for _ in 0..init_null_count {
builder.append_null();
}
builder.append_empty();
for opt_s in it {
builder.append_opt_series(opt_s.as_ref());
}
builder.finish()
} else {
match first_s.dtype() {
#[cfg(feature = "object")]
DataType::Object(_) => {
let mut builder =
first_s.get_list_builder("collected", capacity * 5, capacity);
for _ in 0..init_null_count {
builder.append_null();
}
builder.append_series(first_s);
for opt_s in it {
builder.append_opt_series(opt_s.as_ref());
}
builder.finish()
}
_ => {
let mut builder = get_list_builder(
first_s.dtype(),
capacity * 5,
capacity,
"collected",
)
.unwrap();
for _ in 0..init_null_count {
builder.append_null();
}
builder.append_series(first_s);
for opt_s in it {
builder.append_opt_series(opt_s.as_ref());
}
builder.finish()
}
}
}
}
}
}
}
impl FromIterator<Option<Box<dyn Array>>> for ListChunked {
    /// Collects optional arrow arrays into a list column named `"collected"`, one row per item.
    ///
    /// `None` items become null rows. No inner dtype is passed to the builder (`None`), so the
    /// anonymous builder works from the arrays themselves.
    fn from_iter<I: IntoIterator<Item = Option<Box<dyn Array>>>>(iter: I) -> Self {
        // Materialize first so the builder can be sized up front; `cap` is the total number of
        // inner values across all non-null arrays. (The previous version also inferred a dtype
        // here, but never used it.)
        let mut cap = 0;
        let vals = iter
            .into_iter()
            .inspect(|opt_arr| {
                if let Some(arr) = opt_arr {
                    cap += arr.len();
                }
            })
            .collect::<Vec<_>>();

        // The builder borrows from `vals`, so `vals` must outlive it.
        let mut builder = AnonymousListBuilder::new("collected", cap, None);
        for val in &vals {
            builder.append_opt_array(val.as_deref());
        }
        builder.finish()
    }
}
#[cfg(feature = "dtype-array")]
impl ArrayChunked {
    /// Builds a fixed-size-list (`Array`) column named `name` from optional arrow arrays.
    ///
    /// `None` items become null rows. Each `Some` array is appended as one row via the
    /// builder's `push_unchecked`, starting at row 0 of that array.
    ///
    /// # Safety
    /// `push_unchecked` skips validation. Presumably every array must hold at least `width`
    /// values of a type matching `inner_dtype` — TODO confirm against
    /// `AnonymousOwnedFixedSizeListBuilder::push_unchecked`.
    pub(crate) unsafe fn from_iter_and_args<I: IntoIterator<Item = Option<Box<dyn Array>>>>(
        iter: I,
        width: usize,
        capacity: usize,
        inner_dtype: Option<DataType>,
        name: &str,
    ) -> Self {
        let mut builder =
            AnonymousOwnedFixedSizeListBuilder::new(name, width, capacity, inner_dtype);
        for opt_arr in iter {
            if let Some(arr) = opt_arr {
                builder.push_unchecked(arr.as_ref(), 0);
            } else {
                builder.push_null();
            }
        }
        builder.finish()
    }
}
#[cfg(feature = "object")]
impl<T: PolarsObject> FromIterator<Option<T>> for ObjectChunked<T> {
    /// Collects optional objects into a single-chunk object array named `""`.
    ///
    /// `None` slots are filled with `T::default()` and marked invalid in the validity bitmap.
    fn from_iter<I: IntoIterator<Item = Option<T>>>(iter: I) -> Self {
        let iter = iter.into_iter();
        let mut validity = MutableBitmap::with_capacity(iter.size_hint().0);
        let values: Vec<T> = iter
            .map(|opt| {
                validity.push(opt.is_some());
                opt.unwrap_or_default()
            })
            .collect();

        let null_bitmap: Option<Bitmap> = validity.into();
        let len = values.len();
        let arr = Box::new(ObjectArray {
            values: Arc::new(values),
            null_bitmap,
            offset: 0,
            len,
        });

        // Length starts at 0 and is filled in by `compute_len` from the chunks.
        let mut out = ChunkedArray {
            field: Arc::new(Field::new("", DataType::Object(T::type_name()))),
            chunks: vec![arr],
            phantom: PhantomData,
            bit_settings: Default::default(),
            length: 0,
        };
        out.compute_len();
        out
    }
}
/// Appends `elem` to `vec` and hands the vector back (fold step for parallel collection).
fn vec_push<T>(vec: Vec<T>, elem: T) -> Vec<T> {
    let mut grown = vec;
    grown.push(elem);
    grown
}
/// Wraps a single item in a one-element `LinkedList`.
fn as_list<T>(item: T) -> LinkedList<T> {
    std::iter::once(item).collect()
}
/// Concatenates `list2` onto the end of `list1` in O(1) and returns the combined list.
fn list_append<T>(list1: LinkedList<T>, list2: LinkedList<T>) -> LinkedList<T> {
    let mut head = list1;
    let mut tail = list2;
    head.append(&mut tail);
    head
}
/// Drains a parallel iterator into per-split `Vec`s chained together in a `LinkedList`.
///
/// Each rayon split folds its items into a local `Vec`. The `Vec`s are then joined in order
/// by cheap list concatenation.
fn collect_into_linked_list<I>(par_iter: I) -> LinkedList<Vec<I::Item>>
where
    I: IntoParallelIterator,
{
    par_iter
        .into_par_iter()
        .fold(Vec::new, vec_push)
        .map(as_list)
        .reduce(LinkedList::new, list_append)
}
/// Total number of items across all per-thread buffers in `ll`.
fn get_capacity_from_par_results<T>(ll: &LinkedList<Vec<T>>) -> usize {
    ll.iter().map(Vec::len).sum()
}
/// Total number of items across all buffers in `bufs`.
fn get_capacity_from_par_results_slice<T>(bufs: &[Vec<T>]) -> usize {
    bufs.iter().fold(0, |total, buf| total + buf.len())
}
/// Exclusive prefix sum of buffer lengths: the start index of each buffer in the flattened
/// output.
fn get_offsets<T>(bufs: &[Vec<T>]) -> Vec<usize> {
    let mut starts = Vec::with_capacity(bufs.len());
    let mut running = 0usize;
    for buf in bufs {
        starts.push(running);
        running += buf.len();
    }
    starts
}
impl<T> FromParallelIterator<T::Native> for NoNull<ChunkedArray<T>>
where
    T: PolarsNumericType,
{
    /// Collects non-null native values in parallel into one contiguous chunk named `""`.
    fn from_par_iter<I: IntoParallelIterator<Item = T::Native>>(iter: I) -> Self {
        let per_thread = collect_into_linked_list(iter);
        let total = get_capacity_from_par_results(&per_thread);
        let mut values = Vec::<T::Native>::with_capacity(total);
        per_thread
            .into_iter()
            .for_each(|part| values.extend_from_slice(&part));
        let arr = to_array::<T>(values, None);
        unsafe { NoNull::new(ChunkedArray::from_chunks("", vec![arr])) }
    }
}
/// Merges per-thread validities into one bitmap of `capacity` bits.
///
/// Each entry is `(validity, len)`; a `None` validity means "all `len` values valid". Returns
/// `None` when no entry carries a bitmap, i.e. there are no nulls at all.
fn finish_validities(validities: Vec<(Option<Bitmap>, usize)>, capacity: usize) -> Option<Bitmap> {
    let has_nulls = validities.iter().any(|(validity, _)| validity.is_some());
    if !has_nulls {
        return None;
    }
    let mut merged = MutableBitmap::with_capacity(capacity);
    for (validity, len) in validities {
        match validity {
            Some(bitmap) => merged.extend_from_bitmap(&bitmap),
            None => merged.extend_constant(len, true),
        }
    }
    Some(merged.into())
}
impl<T> FromParallelIterator<Option<T::Native>> for ChunkedArray<T>
where
T: PolarsNumericType,
{
/// Collects optional native values in parallel into one contiguous chunk named `""`.
///
/// Values are gathered into per-thread `Vec`s. A single output buffer of the total length is
/// allocated, and each thread then writes its values directly into its own disjoint slice of
/// that buffer (starting at its prefix-sum offset). Nulls are written as `T::Native::default()`
/// and tracked in a per-thread validity bitmap that is only allocated once a null is seen.
/// The per-thread validities are merged at the end.
fn from_par_iter<I: IntoParallelIterator<Item = Option<T::Native>>>(iter: I) -> Self {
let vectors = collect_into_linked_list(iter);
let vectors = vectors.into_iter().collect::<Vec<_>>();
let capacity: usize = get_capacity_from_par_results_slice(&vectors);
// Start index of each thread-local vector in the output buffer.
let offsets = get_offsets(&vectors);
let mut values_buf: Vec<T::Native> = Vec::with_capacity(capacity);
// Raw pointer shared across rayon workers; each worker only touches [offset, offset + len).
let values_buf_ptr = unsafe { SyncPtr::new(values_buf.as_mut_ptr()) };
let validities = offsets
.into_par_iter()
.zip(vectors)
.map(|(offset, vector)| {
// Lazily allocated: stays `None` (i.e. "all valid") if this vector has no nulls.
let mut local_validity = None;
let local_len = vector.len();
// Number of leading positions already recorded in `local_validity`.
let mut latest_validy_written = 0;
unsafe {
// SAFETY: offsets are a prefix sum of the vector lengths, so every
// `offset + i` is < `capacity` and slices of different workers do not overlap.
let values_buf_ptr = values_buf_ptr.get().add(offset);
for (i, opt_v) in vector.into_iter().enumerate() {
match opt_v {
Some(v) => {
std::ptr::write(values_buf_ptr.add(i), v);
}
None => {
let validity = match &mut local_validity {
None => {
let validity = MutableBitmap::with_capacity(local_len);
local_validity = Some(validity);
local_validity.as_mut().unwrap_unchecked()
}
Some(validity) => validity,
};
// Back-fill the valid run since the last null, then mark this slot invalid.
validity.extend_constant(i - latest_validy_written, true);
latest_validy_written = i + 1;
// SAFETY: at most `local_len` bits are pushed into a bitmap created with
// that capacity.
validity.push_unchecked(false);
std::ptr::write(values_buf_ptr.add(i), T::Native::default());
}
}
}
}
// Trailing valid run after the last null.
if let Some(validity) = &mut local_validity {
validity.extend_constant(local_len - latest_validy_written, true);
}
(local_validity.map(|b| b.into()), local_len)
})
.collect::<Vec<_>>();
// SAFETY: every one of the `capacity` slots was written by exactly one worker above.
unsafe { values_buf.set_len(capacity) };
let validity = finish_validities(validities, capacity);
let arr = PrimitiveArray::from_data_default(values_buf.into(), validity);
unsafe { Self::from_chunks("", vec![Box::new(arr)]) }
}
}
impl FromParallelIterator<bool> for BooleanChunked {
    /// Collects non-null booleans in parallel into one chunk named `""`.
    fn from_par_iter<I: IntoParallelIterator<Item = bool>>(iter: I) -> Self {
        let per_thread = collect_into_linked_list(iter);
        let len: usize = get_capacity_from_par_results(&per_thread);
        // SAFETY: `len` is exactly the number of values in all per-thread buffers.
        let arr = unsafe {
            let values = per_thread.into_iter().flatten().trust_my_length(len);
            BooleanArray::from_trusted_len_values_iter(values)
        };
        unsafe { Self::from_chunks("", vec![Box::new(arr)]) }
    }
}
impl FromParallelIterator<Option<bool>> for BooleanChunked {
    /// Collects optional booleans in parallel.
    ///
    /// Each thread-local buffer becomes its own `BooleanArray` (built in parallel); the chunks
    /// are then merged into a single one with `rechunk`.
    fn from_par_iter<I: IntoParallelIterator<Item = Option<bool>>>(iter: I) -> Self {
        let buffers: Vec<_> = collect_into_linked_list(iter).into_iter().collect();
        let chunks: Vec<ArrayRef> = buffers
            .into_par_iter()
            .map(|buffer| {
                // SAFETY: a `Vec`'s `IntoIter` reports its exact length.
                let arr =
                    unsafe { BooleanArray::from_trusted_len_iter_unchecked(buffer.into_iter()) };
                Box::new(arr) as ArrayRef
            })
            .collect();
        unsafe { BooleanChunked::from_chunks("", chunks).rechunk() }
    }
}
impl<Ptr> FromParallelIterator<Ptr> for Utf8Chunked
where
    Ptr: PolarsAsRef<str> + Send + Sync,
{
    /// Collects non-null strings in parallel, then appends them sequentially into one Utf8
    /// chunk named `""`.
    fn from_par_iter<I: IntoParallelIterator<Item = Ptr>>(iter: I) -> Self {
        let per_thread = collect_into_linked_list(iter);
        let n_values = get_capacity_from_par_results(&per_thread);
        // Byte capacity is a heuristic guess of ~10 bytes per string.
        let mut builder = MutableUtf8ValuesArray::with_capacities(n_values, n_values * 10);
        per_thread
            .into_iter()
            .flatten()
            .for_each(|s| builder.push(s.as_ref()));
        let arr: LargeStringArray = builder.into();
        unsafe { Self::from_chunks("", vec![Box::new(arr)]) }
    }
}
impl<Ptr> FromParallelIterator<Option<Ptr>> for Utf8Chunked
where
    Ptr: AsRef<str> + Send + Sync,
{
    /// Collects optional strings in parallel into one Utf8 chunk named `""`.
    ///
    /// Each thread-local buffer is first turned into its own `LargeStringArray` in parallel.
    /// The value bytes, validities and (rebased) offsets of those partial arrays are then
    /// concatenated into a single array.
    fn from_par_iter<I: IntoParallelIterator<Item = Option<Ptr>>>(iter: I) -> Self {
        let vectors = collect_into_linked_list(iter);
        let vectors = vectors.into_iter().collect::<Vec<_>>();
        let arrays = vectors
            .into_par_iter()
            .map(|vector| {
                let cap = vector.len();
                // Byte capacity is a heuristic guess of ~10 bytes per string.
                let mut builder = MutableUtf8Array::with_capacities(cap, cap * 10);
                for opt_val in vector {
                    builder.push(opt_val)
                }
                let arr: LargeStringArray = builder.into();
                arr
            })
            .collect::<Vec<_>>();

        // Total row count and the value bytes of every partial array.
        let mut len = 0;
        let values = arrays
            .iter()
            .map(|arr| {
                len += arr.len();
                arr.values().as_slice()
            })
            .collect::<Vec<_>>();
        let values = flatten_par(&values);

        let validity = finish_validities(
            arrays
                .iter()
                .map(|arr| {
                    let local_len = arr.len();
                    (arr.validity().cloned(), local_len)
                })
                .collect(),
            len,
        );

        // Concatenate offsets: the first array's offsets are taken as-is; every later array
        // drops its leading 0 and is shifted by the byte length accumulated so far.
        let mut offsets: Vec<i64> = Vec::with_capacity(len + 1);
        let mut offsets_so_far = 0;
        let mut first = true;
        for array in &arrays {
            let local_offsets = array.offsets().as_slice();
            if first {
                offsets.extend_from_slice(local_offsets);
                first = false;
            } else {
                // SAFETY: arrow offsets always contain at least one element.
                unsafe {
                    offsets.extend(
                        local_offsets
                            .get_unchecked(1..)
                            .iter()
                            .map(|v| *v + offsets_so_far),
                    )
                }
            }
            // SAFETY: `offsets` is non-empty after the first array has been appended.
            offsets_so_far = unsafe { *offsets.last().unwrap_unchecked() };
        }
        // Arrow offsets must start with a leading 0. Every partial array provides it; if none
        // were produced, add it explicitly instead of exposing uninitialized memory.
        if offsets.is_empty() {
            offsets.push(0);
        }
        debug_assert_eq!(offsets.len(), len + 1);

        unsafe {
            let arr = Utf8Array::<i64>::from_data_unchecked_default(
                offsets.into(),
                values.into(),
                validity,
            );
            Self::from_chunks("", vec![Box::new(arr)])
        }
    }
}
impl<'a> From<&'a Utf8Chunked> for Vec<Option<&'a str>> {
fn from(ca: &'a Utf8Chunked) -> Self {
ca.into_iter().collect()
}
}
impl From<Utf8Chunked> for Vec<Option<String>> {
fn from(ca: Utf8Chunked) -> Self {
ca.into_iter()
.map(|opt| opt.map(|s| s.to_string()))
.collect()
}
}
impl<'a> From<&'a BooleanChunked> for Vec<Option<bool>> {
fn from(ca: &'a BooleanChunked) -> Self {
ca.into_iter().collect()
}
}
impl From<BooleanChunked> for Vec<Option<bool>> {
fn from(ca: BooleanChunked) -> Self {
ca.into_iter().collect()
}
}
impl<'a, T> From<&'a ChunkedArray<T>> for Vec<Option<T::Native>>
where
T: PolarsNumericType,
{
fn from(ca: &'a ChunkedArray<T>) -> Self {
ca.into_iter().collect()
}
}
impl FromParallelIterator<Option<Series>> for ListChunked {
fn from_par_iter<I>(iter: I) -> Self
where
I: IntoParallelIterator<Item = Option<Series>>,
{
let mut dtype = None;
let vectors = collect_into_linked_list(iter);
let list_capacity: usize = get_capacity_from_par_results(&vectors);
let value_capacity = vectors
.iter()
.map(|list| {
list.iter()
.map(|opt_s| {
opt_s
.as_ref()
.map(|s| {
if dtype.is_none() && !matches!(s.dtype(), DataType::Null) {
dtype = Some(s.dtype().clone())
}
s.len()
})
.unwrap_or(0)
})
.sum::<usize>()
})
.sum::<usize>();
match &dtype {
#[cfg(feature = "object")]
Some(DataType::Object(_)) => {
let s = vectors
.iter()
.flatten()
.find_map(|opt_s| opt_s.as_ref())
.unwrap();
let mut builder = s.get_list_builder("collected", value_capacity, list_capacity);
for v in vectors {
for val in v {
builder.append_opt_series(val.as_ref());
}
}
builder.finish()
}
Some(dtype) => {
let mut builder =
get_list_builder(dtype, value_capacity, list_capacity, "collected").unwrap();
for v in &vectors {
for val in v {
builder.append_opt_series(val.as_ref());
}
}
builder.finish()
}
None => ListChunked::full_null_with_dtype("collected", list_capacity, &DataType::Null),
}
}
}
#[cfg(test)]
mod test {
    use crate::prelude::*;

    /// Collecting series (borrowed or optional) into a `ListChunked` yields one row per item,
    /// with `None` mapped to a null row.
    #[test]
    fn test_collect_into_list() {
        let bools = [true, false, true];
        let s1 = Series::new("", &bools);
        let s2 = Series::new("", &bools);

        let from_refs: ListChunked = [&s1, &s2].iter().copied().collect();
        assert_eq!(from_refs.len(), 2);
        assert_eq!(from_refs.null_count(), 0);

        let from_opts: ListChunked = [None, Some(s2)].into_iter().collect();
        assert_eq!(from_opts.len(), 2);
        assert_eq!(from_opts.null_count(), 1);
    }
}