use std::mem;
use std::ptr;
use std::sync::Arc;
use crate::enums::array::Array;
use crate::enums::collections::numeric_array::NumericArray;
#[cfg(feature = "datetime")]
use crate::enums::collections::temporal_array::TemporalArray;
use crate::enums::collections::text_array::TextArray;
#[cfg(feature = "datetime")]
use crate::enums::time_units::TimeUnit;
use crate::ffi::arrow_dtype::{ArrowType, CategoricalIndexType};
use crate::structs::shared_buffer::SharedBuffer;
use crate::utils::align64;
use crate::{Bitmask, Buffer};
use vec64::Vec64;
/// Bump-style allocator over a single contiguous byte buffer.
///
/// Regions are handed out front to back, each starting on a 64-byte
/// multiple of the buffer offset; the buffer is sized once by
/// `with_capacity` and is never grown by the methods in this file.
pub struct Arena {
// Backing storage; `with_capacity` resizes it to the requested length with zero bytes.
buffer: Vec64<u8>,
// Byte offset just past the last region handed out (rounded up to 64 before each new region).
cursor: usize,
}
impl Arena {
    /// Creates an arena whose backing buffer holds exactly `bytes` zeroed bytes.
    ///
    /// The arena never grows; every later reservation has to fit in this budget.
    #[inline]
    pub fn with_capacity(bytes: usize) -> Self {
        let mut buffer = Vec64::with_capacity(bytes);
        buffer.resize(bytes, 0);
        Self { buffer, cursor: 0 }
    }

    /// Number of bytes consumed so far, alignment padding included.
    #[inline]
    pub fn used(&self) -> usize {
        self.cursor
    }

    /// Number of bytes between the cursor and the end of the buffer.
    #[inline]
    pub fn remaining(&self) -> usize {
        self.buffer.len().saturating_sub(self.cursor)
    }

    /// Total size of the backing buffer in bytes.
    #[inline]
    pub fn capacity(&self) -> usize {
        self.buffer.len()
    }

    /// Rounds the cursor up to the next multiple of 64.
    #[inline]
    fn align_cursor(&mut self) {
        self.cursor = (self.cursor + 63) & !63;
    }

    /// Aligns the cursor, verifies that `byte_len` more bytes fit, and
    /// advances the cursor past them.
    ///
    /// # Panics
    /// Panics with an "Arena overflow" message when the request does not fit
    /// (including when `cursor + byte_len` would overflow `usize`).
    #[inline]
    fn claim(&mut self, byte_len: usize) -> ArenaRegion {
        self.align_cursor();
        // checked_add: a wrapped sum must not be allowed to pass the bound test.
        let fits = self
            .cursor
            .checked_add(byte_len)
            .map_or(false, |end| end <= self.buffer.len());
        assert!(
            fits,
            "Arena overflow: need {} bytes at offset {}, but capacity is {}",
            byte_len,
            self.cursor,
            self.buffer.len()
        );
        let region = ArenaRegion {
            byte_offset: self.cursor,
            byte_len,
        };
        self.cursor += byte_len;
        region
    }

    /// Copies `data` into the arena at the next 64-aligned offset and returns
    /// the region describing where it landed.
    ///
    /// # Panics
    /// Panics with "Arena overflow" if the bytes do not fit.
    #[inline]
    pub fn push_slice<T: Copy>(&mut self, data: &[T]) -> ArenaRegion {
        let region = self.claim(mem::size_of_val(data));
        // SAFETY: `claim` verified that `byte_offset + byte_len` lies within
        // `self.buffer`, `data` is valid for `byte_len` bytes of reads, and the
        // two ranges cannot overlap because `self` is exclusively borrowed.
        unsafe {
            ptr::copy_nonoverlapping(
                data.as_ptr() as *const u8,
                self.buffer.as_mut_ptr().add(region.byte_offset),
                region.byte_len,
            );
        }
        region
    }

    /// Copies the raw bytes of `mask` into the arena.
    #[inline]
    pub fn push_bitmask(&mut self, mask: &Bitmask) -> ArenaRegion {
        self.push_slice(mask.bits.as_slice())
    }

    /// Reserves room for `count` values of `T` without writing to it.
    ///
    /// The reserved bytes keep whatever the buffer already contains (zeros
    /// after `with_capacity`).
    ///
    /// # Panics
    /// Panics with "Arena overflow" if the byte size overflows `usize` or does
    /// not fit in the remaining capacity.
    #[inline]
    pub fn reserve_slice<T>(&mut self, count: usize) -> ArenaRegion {
        let elem_size = mem::size_of::<T>();
        let byte_len = match count.checked_mul(elem_size) {
            Some(len) => len,
            None => panic!(
                "Arena overflow: {} elements of {} bytes each do not fit in usize",
                count, elem_size
            ),
        };
        self.claim(byte_len)
    }

    /// Views a previously returned region as a mutable slice of `T`.
    ///
    /// NOTE(review): this reinterprets raw arena bytes as `T`, so it is only
    /// meaningful for plain-data element types for which any bit pattern is a
    /// valid value (integers, floats). The signature cannot enforce that.
    ///
    /// # Panics
    /// Panics if `T` is zero-sized, if the region length is not a multiple of
    /// `size_of::<T>()`, if the region lies outside the buffer, or if the
    /// region start is not aligned for `T`.
    #[inline]
    pub fn region_as_mut_slice<T>(&mut self, region: &ArenaRegion) -> &mut [T] {
        let size_of_t = mem::size_of::<T>();
        assert!(
            size_of_t != 0,
            "Cannot view an arena region as a slice of zero-sized values"
        );
        assert_eq!(
            region.byte_len % size_of_t,
            0,
            "Region byte_len {} is not a multiple of size_of::<T>() = {}",
            region.byte_len,
            size_of_t
        );
        let in_bounds = region
            .byte_offset
            .checked_add(region.byte_len)
            .map_or(false, |end| end <= self.buffer.len());
        assert!(in_bounds, "Region exceeds arena bounds");

        let count = region.byte_len / size_of_t;
        if count == 0 {
            // Never build a slice from the buffer pointer for an empty region:
            // with an unallocated buffer it may be dangling and misaligned for `T`.
            return &mut [];
        }

        // SAFETY: the bounds assertion above keeps the offset inside the buffer.
        let start = unsafe { self.buffer.as_mut_ptr().add(region.byte_offset) };
        assert_eq!(
            (start as usize) % mem::align_of::<T>(),
            0,
            "Region at byte offset {} is not aligned for the requested element type",
            region.byte_offset
        );
        // SAFETY: `start` is in bounds, aligned for `T`, and followed by
        // `count * size_of::<T>()` initialised bytes owned by `self.buffer`;
        // the returned borrow is tied to `&mut self`, so it is unique.
        unsafe { std::slice::from_raw_parts_mut(start as *mut T, count) }
    }

    /// Concatenates fixed-width chunks (and their null masks) into the arena.
    ///
    /// * `slices` - per chunk: the values and an optional null mask.
    /// * `total_count` - number of values reserved for the output.
    /// * `has_nulls` - whether a mask region is reserved and filled; chunks
    ///   without a mask contribute all-`true` bits.
    ///
    /// Returns `AAMaker::Primitive` describing the data and mask regions.
    ///
    /// # Panics
    /// Panics on arena overflow, if the chunks hold more than `total_count`
    /// values, or if the combined mask's byte length differs from the
    /// reserved `(total_count + 7) / 8` bytes.
    pub fn write_slices<T: Copy>(
        &mut self,
        slices: &[(&[T], Option<&Bitmask>)],
        total_count: usize,
        has_nulls: bool,
    ) -> AAMaker {
        let data_region = self.reserve_slice::<T>(total_count);
        let mask_region = if has_nulls {
            Some(self.reserve_slice::<u8>((total_count + 7) / 8))
        } else {
            None
        };

        {
            let dest = self.region_as_mut_slice::<T>(&data_region);
            let mut pos = 0usize;
            for (src, _) in slices {
                dest[pos..pos + src.len()].copy_from_slice(src);
                pos += src.len();
            }
        }

        if let Some(region) = &mask_region {
            // Build the combined mask off to the side, then copy its bytes in.
            let mut mask = Bitmask::default();
            for (src, null_mask) in slices {
                match null_mask {
                    Some(src_mask) => mask.extend_from_bitmask(src_mask),
                    None => mask.resize(mask.len + src.len(), true),
                }
            }
            self.region_as_mut_slice::<u8>(region)
                .copy_from_slice(mask.bits.as_slice());
        }

        AAMaker::Primitive {
            data: data_region,
            mask: mask_region,
        }
    }

    /// Concatenates string chunks, rebasing each chunk's offsets so the output
    /// offsets start at zero and run continuously.
    ///
    /// * `slices` - per chunk: offsets, the byte buffer they index, and an
    ///   optional null mask. A chunk with an empty offsets slice is treated as
    ///   having zero rows.
    /// * `total_rows` - rows reserved for the output (`total_rows + 1` offsets).
    /// * `total_data_bytes` - bytes reserved for the concatenated string data.
    /// * `has_nulls` - whether a mask region is reserved and filled.
    ///
    /// Returns `AAMaker::String` describing the offsets, data and mask regions.
    ///
    /// # Panics
    /// Panics on arena overflow, when an offset cannot be converted to or from
    /// `usize`, when a chunk's offsets point outside its data, or when the
    /// chunks hold more rows / bytes than `total_rows` / `total_data_bytes`
    /// reserved. All writes are bounds-checked.
    pub fn write_string_slices<T>(
        &mut self,
        slices: &[(&[T], &[u8], Option<&Bitmask>)],
        total_rows: usize,
        total_data_bytes: usize,
        has_nulls: bool,
    ) -> AAMaker
    where
        T: Copy
            + Default
            + std::ops::Add<Output = T>
            + std::ops::Sub<Output = T>
            + num_traits::ToPrimitive
            + num_traits::NumCast,
    {
        let offsets_region = self.reserve_slice::<T>(total_rows + 1);
        let data_region = self.reserve_slice::<u8>(total_data_bytes);
        let mask_region = if has_nulls {
            Some(self.reserve_slice::<u8>((total_rows + 7) / 8))
        } else {
            None
        };

        // The consolidated offsets always begin at zero.
        self.region_as_mut_slice::<T>(&offsets_region)[0] = T::default();

        let mut off_pos = 1usize;
        let mut data_pos = 0usize;
        for (offsets, data, _) in slices {
            // An empty offsets slice carries no rows and no leading offset.
            let (base, rest) = match offsets.split_first() {
                Some((&base, rest)) => (base, rest),
                None => continue,
            };
            let byte_start = base.to_usize().unwrap();
            let byte_end = rest
                .last()
                .map_or(byte_start, |&end| end.to_usize().unwrap());
            let batch_data = &data[byte_start..byte_end];

            // Slice indexing bounds-checks the write against the reserved region.
            self.region_as_mut_slice::<u8>(&data_region)[data_pos..data_pos + batch_data.len()]
                .copy_from_slice(batch_data);

            // Rebase: subtract the chunk's first offset, add the running byte position.
            let shift: T = num_traits::cast(data_pos).unwrap();
            let dest =
                &mut self.region_as_mut_slice::<T>(&offsets_region)[off_pos..off_pos + rest.len()];
            for (slot, &offset) in dest.iter_mut().zip(rest) {
                *slot = shift + (offset - base);
            }

            off_pos += rest.len();
            data_pos += batch_data.len();
        }

        if let Some(region) = &mask_region {
            let mut mask = Bitmask::default();
            for (offsets, _, null_mask) in slices {
                let batch_len = offsets.len().saturating_sub(1);
                match null_mask {
                    Some(src_mask) => mask.extend_from_bitmask(src_mask),
                    None => mask.resize(mask.len + batch_len, true),
                }
            }
            self.region_as_mut_slice::<u8>(region)
                .copy_from_slice(mask.bits.as_slice());
        }

        AAMaker::String {
            offsets: offsets_region,
            data: data_region,
            mask: mask_region,
        }
    }

    /// Concatenates bit-packed boolean chunks (and their null masks).
    ///
    /// * `slices` - per chunk: the value bits and an optional null mask.
    /// * `total_rows` - rows reserved for the output.
    /// * `has_nulls` - whether a mask region is reserved and filled.
    ///
    /// Returns `AAMaker::Boolean` describing the data and mask regions.
    ///
    /// # Panics
    /// Panics on arena overflow or if a combined bitmask's byte length differs
    /// from the reserved `(total_rows + 7) / 8` bytes.
    pub fn write_boolean_slices(
        &mut self,
        slices: &[(&Bitmask, Option<&Bitmask>)],
        total_rows: usize,
        has_nulls: bool,
    ) -> AAMaker {
        let packed_len = (total_rows + 7) / 8;
        let data_region = self.reserve_slice::<u8>(packed_len);
        let mask_region = if has_nulls {
            Some(self.reserve_slice::<u8>(packed_len))
        } else {
            None
        };

        let mut data_builder = Bitmask::default();
        let mut mask_builder = if has_nulls {
            Some(Bitmask::default())
        } else {
            None
        };
        for (src_data, null_mask) in slices {
            data_builder.extend_from_bitmask(src_data);
            if let Some(mask) = mask_builder.as_mut() {
                match null_mask {
                    Some(src_mask) => mask.extend_from_bitmask(src_mask),
                    None => mask.resize(mask.len + src_data.len(), true),
                }
            }
        }

        self.region_as_mut_slice::<u8>(&data_region)
            .copy_from_slice(data_builder.bits.as_slice());
        if let (Some(mask), Some(region)) = (&mask_builder, &mask_region) {
            self.region_as_mut_slice::<u8>(region)
                .copy_from_slice(mask.bits.as_slice());
        }

        AAMaker::Boolean {
            data: data_region,
            mask: mask_region,
        }
    }

    /// Sums `align64(len * elem_size)` over `(len, elem_size)` entries, giving
    /// a capacity that fits one region per entry.
    pub fn capacity_for_regions(entries: &[(usize, usize)]) -> usize {
        entries
            .iter()
            .map(|&(len, elem_size)| align64(len * elem_size))
            .sum()
    }

    /// Consumes the arena, trims the buffer to the used length, and wraps it
    /// in a `SharedBuffer`.
    #[inline]
    pub fn freeze(mut self) -> SharedBuffer {
        self.buffer.truncate(self.cursor);
        SharedBuffer::from_vec64(self.buffer)
    }
}
/// Byte range inside an arena buffer, stored as an offset and a length.
///
/// The range is plain data: it does not borrow the arena and carries no
/// element type of its own.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct ArenaRegion {
// Offset of the first byte, measured from the start of the buffer.
byte_offset: usize,
// Length of the range in bytes.
byte_len: usize,
}
impl ArenaRegion {
    /// Zero-length region at offset 0; converts to default (empty) buffers.
    pub const EMPTY: Self = Self {
        byte_offset: 0,
        byte_len: 0,
    };

    /// Offset of the region's first byte.
    #[inline]
    pub fn byte_offset(&self) -> usize {
        self.byte_offset
    }

    /// Length of the region in bytes.
    #[inline]
    pub fn byte_len(&self) -> usize {
        self.byte_len
    }

    /// Wraps this region's bytes of `shared` in a `Buffer<T>`.
    ///
    /// An empty region yields `Buffer::default()` without touching `shared`.
    #[inline]
    pub fn to_buffer<T>(&self, shared: &SharedBuffer) -> Buffer<T> {
        match self.byte_len {
            0 => Buffer::default(),
            len => {
                let start = self.byte_offset;
                Buffer::from_shared(shared.slice(start..start + len))
            }
        }
    }

    /// Wraps this region's bytes of `shared` in a `Bitmask` of `num_bits` bits.
    ///
    /// An empty region yields `Bitmask::default()` regardless of `num_bits`.
    #[inline]
    pub fn to_bitmask(&self, shared: &SharedBuffer, num_bits: usize) -> Bitmask {
        if self.byte_len != 0 {
            let end = self.byte_offset + self.byte_len;
            let bytes: Buffer<u8> = Buffer::from_shared(shared.slice(self.byte_offset..end));
            Bitmask::new(bytes, num_bits)
        } else {
            Bitmask::default()
        }
    }
}
/// Layout record for one consolidated column: which arena regions hold its
/// buffers, plus any metadata kept outside the arena.
///
/// `to_array` pairs a variant with an `ArrowType` to build the final `Array`.
pub enum AAMaker {
/// Fixed-width values with an optional null-mask region.
Primitive {
data: ArenaRegion,
mask: Option<ArenaRegion>,
},
/// String column: offsets region, byte-data region, optional null-mask region.
String {
offsets: ArenaRegion,
data: ArenaRegion,
mask: Option<ArenaRegion>,
},
/// Bit-packed boolean values with an optional null-mask region.
Boolean {
data: ArenaRegion,
mask: Option<ArenaRegion>,
},
/// Categorical column: index region, optional null-mask region, and the
/// dictionary of unique values, which is owned here rather than stored in the arena.
Categorical {
indices: ArenaRegion,
mask: Option<ArenaRegion>,
unique_values: Vec64<String>,
},
/// Temporal values with an optional null-mask region and their time unit.
#[cfg(feature = "datetime")]
Temporal {
data: ArenaRegion,
mask: Option<ArenaRegion>,
time_unit: TimeUnit,
},
}
impl AAMaker {
pub fn to_array(self, dtype: &ArrowType, shared: &SharedBuffer, n_rows: usize) -> Array {
match (dtype, self) {
(ArrowType::Int32, AAMaker::Primitive { data, mask }) => {
let m = mask.map(|r| r.to_bitmask(shared, n_rows));
Array::NumericArray(NumericArray::Int32(Arc::new(crate::IntegerArray::new(
data.to_buffer::<i32>(shared),
m,
))))
}
(ArrowType::Int64, AAMaker::Primitive { data, mask }) => {
let m = mask.map(|r| r.to_bitmask(shared, n_rows));
Array::NumericArray(NumericArray::Int64(Arc::new(crate::IntegerArray::new(
data.to_buffer::<i64>(shared),
m,
))))
}
(ArrowType::UInt32, AAMaker::Primitive { data, mask }) => {
let m = mask.map(|r| r.to_bitmask(shared, n_rows));
Array::NumericArray(NumericArray::UInt32(Arc::new(crate::IntegerArray::new(
data.to_buffer::<u32>(shared),
m,
))))
}
(ArrowType::UInt64, AAMaker::Primitive { data, mask }) => {
let m = mask.map(|r| r.to_bitmask(shared, n_rows));
Array::NumericArray(NumericArray::UInt64(Arc::new(crate::IntegerArray::new(
data.to_buffer::<u64>(shared),
m,
))))
}
#[cfg(feature = "extended_numeric_types")]
(ArrowType::Int8, AAMaker::Primitive { data, mask }) => {
let m = mask.map(|r| r.to_bitmask(shared, n_rows));
Array::NumericArray(NumericArray::Int8(Arc::new(crate::IntegerArray::new(
data.to_buffer::<i8>(shared),
m,
))))
}
#[cfg(feature = "extended_numeric_types")]
(ArrowType::Int16, AAMaker::Primitive { data, mask }) => {
let m = mask.map(|r| r.to_bitmask(shared, n_rows));
Array::NumericArray(NumericArray::Int16(Arc::new(crate::IntegerArray::new(
data.to_buffer::<i16>(shared),
m,
))))
}
#[cfg(feature = "extended_numeric_types")]
(ArrowType::UInt8, AAMaker::Primitive { data, mask }) => {
let m = mask.map(|r| r.to_bitmask(shared, n_rows));
Array::NumericArray(NumericArray::UInt8(Arc::new(crate::IntegerArray::new(
data.to_buffer::<u8>(shared),
m,
))))
}
#[cfg(feature = "extended_numeric_types")]
(ArrowType::UInt16, AAMaker::Primitive { data, mask }) => {
let m = mask.map(|r| r.to_bitmask(shared, n_rows));
Array::NumericArray(NumericArray::UInt16(Arc::new(crate::IntegerArray::new(
data.to_buffer::<u16>(shared),
m,
))))
}
(ArrowType::Float32, AAMaker::Primitive { data, mask }) => {
let m = mask.map(|r| r.to_bitmask(shared, n_rows));
Array::NumericArray(NumericArray::Float32(Arc::new(crate::FloatArray::new(
data.to_buffer::<f32>(shared),
m,
))))
}
(ArrowType::Float64, AAMaker::Primitive { data, mask }) => {
let m = mask.map(|r| r.to_bitmask(shared, n_rows));
Array::NumericArray(NumericArray::Float64(Arc::new(crate::FloatArray::new(
data.to_buffer::<f64>(shared),
m,
))))
}
(
ArrowType::String,
AAMaker::String {
offsets,
data,
mask,
},
) => {
let m = mask.map(|r| r.to_bitmask(shared, n_rows));
Array::TextArray(TextArray::String32(Arc::new(crate::StringArray::new(
data.to_buffer::<u8>(shared),
m,
offsets.to_buffer::<u32>(shared),
))))
}
#[cfg(feature = "large_string")]
(
ArrowType::LargeString,
AAMaker::String {
offsets,
data,
mask,
},
) => {
let m = mask.map(|r| r.to_bitmask(shared, n_rows));
Array::TextArray(TextArray::String64(Arc::new(crate::StringArray::new(
data.to_buffer::<u8>(shared),
m,
offsets.to_buffer::<u64>(shared),
))))
}
#[cfg(any(not(feature = "default_categorical_8"), feature = "extended_categorical"))]
(
ArrowType::Dictionary(CategoricalIndexType::UInt32),
AAMaker::Categorical {
indices,
mask,
unique_values,
},
) => {
let m = mask.map(|r| r.to_bitmask(shared, n_rows));
Array::TextArray(TextArray::Categorical32(Arc::new(
crate::CategoricalArray::new(
indices.to_buffer::<u32>(shared),
unique_values,
m,
),
)))
}
#[cfg(feature = "default_categorical_8")]
(
ArrowType::Dictionary(CategoricalIndexType::UInt8),
AAMaker::Categorical {
indices,
mask,
unique_values,
},
) => {
let m = mask.map(|r| r.to_bitmask(shared, n_rows));
Array::TextArray(TextArray::Categorical8(Arc::new(
crate::CategoricalArray::new(indices.to_buffer::<u8>(shared), unique_values, m),
)))
}
#[cfg(feature = "extended_categorical")]
(
ArrowType::Dictionary(CategoricalIndexType::UInt16),
AAMaker::Categorical {
indices,
mask,
unique_values,
},
) => {
let m = mask.map(|r| r.to_bitmask(shared, n_rows));
Array::TextArray(TextArray::Categorical16(Arc::new(
crate::CategoricalArray::new(
indices.to_buffer::<u16>(shared),
unique_values,
m,
),
)))
}
#[cfg(feature = "extended_categorical")]
(
ArrowType::Dictionary(CategoricalIndexType::UInt64),
AAMaker::Categorical {
indices,
mask,
unique_values,
},
) => {
let m = mask.map(|r| r.to_bitmask(shared, n_rows));
Array::TextArray(TextArray::Categorical64(Arc::new(
crate::CategoricalArray::new(
indices.to_buffer::<u64>(shared),
unique_values,
m,
),
)))
}
(ArrowType::Boolean, AAMaker::Boolean { data, mask }) => {
let m = mask.map(|r| r.to_bitmask(shared, n_rows));
let data_bitmask = data.to_bitmask(shared, n_rows);
Array::BooleanArray(Arc::new(crate::BooleanArray::new(data_bitmask, m)))
}
#[cfg(feature = "datetime")]
(
ArrowType::Date32 | ArrowType::Time32(_) | ArrowType::Duration32(_),
AAMaker::Temporal {
data,
mask,
time_unit,
},
) => {
let m = mask.map(|r| r.to_bitmask(shared, n_rows));
Array::TemporalArray(TemporalArray::Datetime32(Arc::new(
crate::DatetimeArray::new(data.to_buffer::<i32>(shared), m, Some(time_unit)),
)))
}
#[cfg(feature = "datetime")]
(
ArrowType::Date64
| ArrowType::Time64(_)
| ArrowType::Duration64(_)
| ArrowType::Timestamp(_, _),
AAMaker::Temporal {
data,
mask,
time_unit,
},
) => {
let m = mask.map(|r| r.to_bitmask(shared, n_rows));
Array::TemporalArray(TemporalArray::Datetime64(Arc::new(
crate::DatetimeArray::new(data.to_buffer::<i64>(shared), m, Some(time_unit)),
)))
}
(ArrowType::Null, _) => Array::Null,
_ => unreachable!("Mismatched ArrowType and AAMaker variant"),
}
}
}
/// Concatenates `chunks` into one `Array` backed by a single arena buffer.
///
/// The variant of the first chunk decides the layout; every other chunk must
/// be the same variant. The arena is sized exactly up front, filled, frozen,
/// and `dtype` then selects the concrete array type via `AAMaker::to_array`.
///
/// For categorical chunks the dictionary is cloned from the first chunk only.
/// NOTE(review): presumably all chunks share that dictionary - confirm with callers.
///
/// # Panics
/// Panics if `chunks` is empty, if a chunk's variant differs from the first
/// chunk's, or if `dtype` does not match the resulting layout.
#[cfg(feature = "chunked")]
pub(crate) fn consolidate_array_arena(chunks: &[&Array], dtype: &ArrowType) -> Array {
    assert!(!chunks.is_empty(), "consolidate called on empty chunk set");

    let n_rows: usize = chunks.iter().map(|chunk| chunk.len()).sum();
    let mask_bytes = (n_rows + 7) / 8;
    let has_nulls = chunks.iter().any(|chunk| chunk.null_mask().is_some());
    let first = chunks[0];

    // Total string payload length over all chunks of the given variant.
    macro_rules! string_payload_bytes {
        ($variant:ident) => {
            chunks
                .iter()
                .map(|chunk| match chunk {
                    Array::TextArray(TextArray::$variant(s)) => s.data.len(),
                    _ => 0,
                })
                .sum::<usize>()
        };
    }

    // ---- Pass 1: size the arena (each region padded to 64 bytes) ----
    let payload_bytes = match first {
        Array::NumericArray(num) => {
            let width = match num {
                NumericArray::Int32(_) | NumericArray::UInt32(_) | NumericArray::Float32(_) => 4,
                NumericArray::Int64(_) | NumericArray::UInt64(_) | NumericArray::Float64(_) => 8,
                #[cfg(feature = "extended_numeric_types")]
                NumericArray::Int8(_) | NumericArray::UInt8(_) => 1,
                #[cfg(feature = "extended_numeric_types")]
                NumericArray::Int16(_) | NumericArray::UInt16(_) => 2,
                NumericArray::Null => 0,
            };
            align64(n_rows * width)
        }
        Array::TextArray(text) => match text {
            TextArray::String32(_) => {
                align64((n_rows + 1) * 4) + align64(string_payload_bytes!(String32))
            }
            #[cfg(feature = "large_string")]
            TextArray::String64(_) => {
                align64((n_rows + 1) * 8) + align64(string_payload_bytes!(String64))
            }
            #[cfg(any(not(feature = "default_categorical_8"), feature = "extended_categorical"))]
            TextArray::Categorical32(_) => align64(n_rows * 4),
            #[cfg(feature = "default_categorical_8")]
            TextArray::Categorical8(_) => align64(n_rows),
            #[cfg(feature = "extended_categorical")]
            TextArray::Categorical16(_) => align64(n_rows * 2),
            #[cfg(feature = "extended_categorical")]
            TextArray::Categorical64(_) => align64(n_rows * 8),
            TextArray::Null => 0,
        },
        Array::BooleanArray(_) => align64(mask_bytes),
        #[cfg(feature = "datetime")]
        Array::TemporalArray(temp) => {
            let width = match temp {
                TemporalArray::Datetime32(_) => 4,
                TemporalArray::Datetime64(_) => 8,
                TemporalArray::Null => 0,
            };
            align64(n_rows * width)
        }
        Array::Null => 0,
    };
    let total_bytes = if has_nulls {
        payload_bytes + align64(mask_bytes)
    } else {
        payload_bytes
    };

    // ---- Pass 2: copy every chunk into the arena ----
    let mut arena = Arena::with_capacity(total_bytes);

    // Layout used for the typeless `Null` variants: nothing stored.
    let empty_layout = || AAMaker::Primitive {
        data: ArenaRegion::EMPTY,
        mask: None,
    };

    macro_rules! write_numeric {
        ($variant:ident, $ty:ty) => {{
            let parts: Vec<_> = chunks
                .iter()
                .map(|chunk| match chunk {
                    Array::NumericArray(NumericArray::$variant(inner)) => {
                        (inner.data.as_slice() as &[$ty], inner.null_mask.as_ref())
                    }
                    _ => unreachable!(),
                })
                .collect();
            arena.write_slices::<$ty>(&parts, n_rows, has_nulls)
        }};
    }

    macro_rules! write_strings {
        ($variant:ident, $off:ty) => {{
            let parts: Vec<_> = chunks
                .iter()
                .map(|chunk| match chunk {
                    Array::TextArray(TextArray::$variant(s)) => (
                        s.offsets.as_slice() as &[$off],
                        s.data.as_slice() as &[u8],
                        s.null_mask.as_ref(),
                    ),
                    _ => unreachable!(),
                })
                .collect();
            let total_data: usize = parts.iter().map(|(_, bytes, _)| bytes.len()).sum();
            arena.write_string_slices(&parts, n_rows, total_data, has_nulls)
        }};
    }

    // Indices are written like primitives; the dictionary comes from the first chunk.
    macro_rules! write_categorical {
        ($variant:ident, $ty:ty) => {{
            let parts: Vec<_> = chunks
                .iter()
                .map(|chunk| match chunk {
                    Array::TextArray(TextArray::$variant(cat)) => {
                        (cat.data.as_slice() as &[$ty], cat.null_mask.as_ref())
                    }
                    _ => unreachable!(),
                })
                .collect();
            let written = arena.write_slices::<$ty>(&parts, n_rows, has_nulls);
            let unique_values = match first {
                Array::TextArray(TextArray::$variant(cat)) => cat.unique_values.clone(),
                _ => unreachable!(),
            };
            match written {
                AAMaker::Primitive { data, mask } => AAMaker::Categorical {
                    indices: data,
                    mask,
                    unique_values,
                },
                _ => unreachable!(),
            }
        }};
    }

    // Values are written like primitives; the time unit comes from the first chunk.
    #[cfg(feature = "datetime")]
    macro_rules! write_temporal {
        ($variant:ident, $ty:ty) => {{
            let parts: Vec<_> = chunks
                .iter()
                .map(|chunk| match chunk {
                    Array::TemporalArray(TemporalArray::$variant(inner)) => {
                        (inner.data.as_slice() as &[$ty], inner.null_mask.as_ref())
                    }
                    _ => unreachable!(),
                })
                .collect();
            let written = arena.write_slices::<$ty>(&parts, n_rows, has_nulls);
            let time_unit = match first {
                Array::TemporalArray(TemporalArray::$variant(inner)) => inner.time_unit.clone(),
                _ => unreachable!(),
            };
            match written {
                AAMaker::Primitive { data, mask } => AAMaker::Temporal {
                    data,
                    mask,
                    time_unit,
                },
                _ => unreachable!(),
            }
        }};
    }

    let layout = match first {
        Array::NumericArray(num) => match num {
            NumericArray::Int32(_) => write_numeric!(Int32, i32),
            NumericArray::Int64(_) => write_numeric!(Int64, i64),
            NumericArray::UInt32(_) => write_numeric!(UInt32, u32),
            NumericArray::UInt64(_) => write_numeric!(UInt64, u64),
            NumericArray::Float32(_) => write_numeric!(Float32, f32),
            NumericArray::Float64(_) => write_numeric!(Float64, f64),
            #[cfg(feature = "extended_numeric_types")]
            NumericArray::Int8(_) => write_numeric!(Int8, i8),
            #[cfg(feature = "extended_numeric_types")]
            NumericArray::Int16(_) => write_numeric!(Int16, i16),
            #[cfg(feature = "extended_numeric_types")]
            NumericArray::UInt8(_) => write_numeric!(UInt8, u8),
            #[cfg(feature = "extended_numeric_types")]
            NumericArray::UInt16(_) => write_numeric!(UInt16, u16),
            NumericArray::Null => empty_layout(),
        },
        Array::TextArray(text) => match text {
            TextArray::String32(_) => write_strings!(String32, u32),
            #[cfg(feature = "large_string")]
            TextArray::String64(_) => write_strings!(String64, u64),
            #[cfg(any(not(feature = "default_categorical_8"), feature = "extended_categorical"))]
            TextArray::Categorical32(_) => write_categorical!(Categorical32, u32),
            #[cfg(feature = "default_categorical_8")]
            TextArray::Categorical8(_) => write_categorical!(Categorical8, u8),
            #[cfg(feature = "extended_categorical")]
            TextArray::Categorical16(_) => write_categorical!(Categorical16, u16),
            #[cfg(feature = "extended_categorical")]
            TextArray::Categorical64(_) => write_categorical!(Categorical64, u64),
            TextArray::Null => empty_layout(),
        },
        Array::BooleanArray(_) => {
            let parts: Vec<_> = chunks
                .iter()
                .map(|chunk| match chunk {
                    Array::BooleanArray(b) => (&b.data as &Bitmask, b.null_mask.as_ref()),
                    _ => unreachable!(),
                })
                .collect();
            arena.write_boolean_slices(&parts, n_rows, has_nulls)
        }
        #[cfg(feature = "datetime")]
        Array::TemporalArray(temp) => match temp {
            TemporalArray::Datetime32(_) => write_temporal!(Datetime32, i32),
            TemporalArray::Datetime64(_) => write_temporal!(Datetime64, i64),
            TemporalArray::Null => empty_layout(),
        },
        Array::Null => empty_layout(),
    };

    let shared = arena.freeze();
    layout.to_array(dtype, &shared, n_rows)
}
/// Concatenates `tables` row-wise into one `Table` whose columns all live in
/// a single arena buffer.
///
/// The first table supplies the column count, the schema (its fields are
/// cloned), and the array variant expected for each column. The arena is
/// sized exactly in a first pass over the columns and filled in a second
/// pass; the arena and the per-column layouts are then handed to
/// `Table::from_arena` together with `name`.
///
/// For categorical columns the dictionary is cloned from the first table only.
/// NOTE(review): presumably all tables share that dictionary - confirm with callers.
///
/// # Panics
/// Panics if `tables` is empty, if a table has fewer columns than the first,
/// or if a column's variant differs from the first table's.
#[cfg(feature = "chunked")]
pub(crate) fn consolidate_tables_arena(
    tables: &[&crate::structs::table::Table],
    name: String,
) -> crate::structs::table::Table {
    use crate::structs::table::Table;

    assert!(!tables.is_empty(), "consolidate called on empty table set");

    let template = tables[0];
    let n_cols = template.cols.len();
    let n_rows: usize = tables.iter().map(|t| t.n_rows).sum();
    let mask_bytes = (n_rows + 7) / 8;
    let schema: Vec<Arc<crate::Field>> =
        template.cols.iter().map(|col| col.field.clone()).collect();

    // A column needs a mask region as soon as any table carries a null mask for it.
    let column_has_nulls = |col_idx: usize| {
        tables
            .iter()
            .any(|t| t.cols[col_idx].array.null_mask().is_some())
    };

    // ---- Pass 1: size the arena (each region padded to 64 bytes) ----
    let mut total_bytes = 0usize;
    for col_idx in 0..n_cols {
        // Total string payload length of this column over all tables.
        macro_rules! string_payload_bytes {
            ($variant:ident) => {
                tables
                    .iter()
                    .map(|t| match &t.cols[col_idx].array {
                        Array::TextArray(TextArray::$variant(a)) => a.data.len(),
                        _ => 0,
                    })
                    .sum::<usize>()
            };
        }

        let payload_bytes = match &template.cols[col_idx].array {
            Array::NumericArray(num) => {
                let width = match num {
                    NumericArray::Int32(_)
                    | NumericArray::UInt32(_)
                    | NumericArray::Float32(_) => 4,
                    NumericArray::Int64(_)
                    | NumericArray::UInt64(_)
                    | NumericArray::Float64(_) => 8,
                    #[cfg(feature = "extended_numeric_types")]
                    NumericArray::Int8(_) | NumericArray::UInt8(_) => 1,
                    #[cfg(feature = "extended_numeric_types")]
                    NumericArray::Int16(_) | NumericArray::UInt16(_) => 2,
                    NumericArray::Null => 0,
                };
                align64(n_rows * width)
            }
            Array::TextArray(text) => match text {
                TextArray::String32(_) => {
                    align64((n_rows + 1) * 4) + align64(string_payload_bytes!(String32))
                }
                #[cfg(feature = "large_string")]
                TextArray::String64(_) => {
                    align64((n_rows + 1) * 8) + align64(string_payload_bytes!(String64))
                }
                #[cfg(any(
                    not(feature = "default_categorical_8"),
                    feature = "extended_categorical"
                ))]
                TextArray::Categorical32(_) => align64(n_rows * 4),
                #[cfg(feature = "default_categorical_8")]
                TextArray::Categorical8(_) => align64(n_rows),
                #[cfg(feature = "extended_categorical")]
                TextArray::Categorical16(_) => align64(n_rows * 2),
                #[cfg(feature = "extended_categorical")]
                TextArray::Categorical64(_) => align64(n_rows * 8),
                TextArray::Null => 0,
            },
            Array::BooleanArray(_) => align64(mask_bytes),
            #[cfg(feature = "datetime")]
            Array::TemporalArray(temp) => {
                let width = match temp {
                    TemporalArray::Datetime32(_) => 4,
                    TemporalArray::Datetime64(_) => 8,
                    TemporalArray::Null => 0,
                };
                align64(n_rows * width)
            }
            Array::Null => 0,
        };

        total_bytes += payload_bytes;
        if column_has_nulls(col_idx) {
            total_bytes += align64(mask_bytes);
        }
    }

    // ---- Pass 2: copy every column of every table into the arena ----
    let mut arena = Arena::with_capacity(total_bytes);
    let mut regions: Vec<AAMaker> = Vec::with_capacity(n_cols);

    for col_idx in 0..n_cols {
        let first = &template.cols[col_idx].array;
        let has_nulls = column_has_nulls(col_idx);

        // Layout used for the typeless `Null` variants: nothing stored.
        let empty_layout = || AAMaker::Primitive {
            data: ArenaRegion::EMPTY,
            mask: None,
        };

        macro_rules! write_numeric {
            ($variant:ident, $ty:ty) => {{
                let parts: Vec<_> = tables
                    .iter()
                    .map(|t| match &t.cols[col_idx].array {
                        Array::NumericArray(NumericArray::$variant(a)) => {
                            (a.data.as_slice() as &[$ty], a.null_mask.as_ref())
                        }
                        _ => unreachable!(),
                    })
                    .collect();
                arena.write_slices::<$ty>(&parts, n_rows, has_nulls)
            }};
        }

        macro_rules! write_strings {
            ($variant:ident, $off:ty) => {{
                let parts: Vec<_> = tables
                    .iter()
                    .map(|t| match &t.cols[col_idx].array {
                        Array::TextArray(TextArray::$variant(a)) => (
                            a.offsets.as_slice() as &[$off],
                            a.data.as_slice() as &[u8],
                            a.null_mask.as_ref(),
                        ),
                        _ => unreachable!(),
                    })
                    .collect();
                let total_data: usize = parts.iter().map(|(_, bytes, _)| bytes.len()).sum();
                arena.write_string_slices(&parts, n_rows, total_data, has_nulls)
            }};
        }

        // Indices are written like primitives; the dictionary comes from the first table.
        macro_rules! write_categorical {
            ($variant:ident, $ty:ty) => {{
                let parts: Vec<_> = tables
                    .iter()
                    .map(|t| match &t.cols[col_idx].array {
                        Array::TextArray(TextArray::$variant(a)) => {
                            (a.data.as_slice() as &[$ty], a.null_mask.as_ref())
                        }
                        _ => unreachable!(),
                    })
                    .collect();
                let written = arena.write_slices::<$ty>(&parts, n_rows, has_nulls);
                let unique_values = match first {
                    Array::TextArray(TextArray::$variant(a)) => a.unique_values.clone(),
                    _ => unreachable!(),
                };
                match written {
                    AAMaker::Primitive { data, mask } => AAMaker::Categorical {
                        indices: data,
                        mask,
                        unique_values,
                    },
                    _ => unreachable!(),
                }
            }};
        }

        // Values are written like primitives; the time unit comes from the first table.
        #[cfg(feature = "datetime")]
        macro_rules! write_temporal {
            ($variant:ident, $ty:ty) => {{
                let parts: Vec<_> = tables
                    .iter()
                    .map(|t| match &t.cols[col_idx].array {
                        Array::TemporalArray(TemporalArray::$variant(a)) => {
                            (a.data.as_slice() as &[$ty], a.null_mask.as_ref())
                        }
                        _ => unreachable!(),
                    })
                    .collect();
                let written = arena.write_slices::<$ty>(&parts, n_rows, has_nulls);
                let time_unit = match first {
                    Array::TemporalArray(TemporalArray::$variant(a)) => a.time_unit.clone(),
                    _ => unreachable!(),
                };
                match written {
                    AAMaker::Primitive { data, mask } => AAMaker::Temporal {
                        data,
                        mask,
                        time_unit,
                    },
                    _ => unreachable!(),
                }
            }};
        }

        let layout = match first {
            Array::NumericArray(num) => match num {
                NumericArray::Int32(_) => write_numeric!(Int32, i32),
                NumericArray::Int64(_) => write_numeric!(Int64, i64),
                NumericArray::UInt32(_) => write_numeric!(UInt32, u32),
                NumericArray::UInt64(_) => write_numeric!(UInt64, u64),
                NumericArray::Float32(_) => write_numeric!(Float32, f32),
                NumericArray::Float64(_) => write_numeric!(Float64, f64),
                #[cfg(feature = "extended_numeric_types")]
                NumericArray::Int8(_) => write_numeric!(Int8, i8),
                #[cfg(feature = "extended_numeric_types")]
                NumericArray::Int16(_) => write_numeric!(Int16, i16),
                #[cfg(feature = "extended_numeric_types")]
                NumericArray::UInt8(_) => write_numeric!(UInt8, u8),
                #[cfg(feature = "extended_numeric_types")]
                NumericArray::UInt16(_) => write_numeric!(UInt16, u16),
                NumericArray::Null => empty_layout(),
            },
            Array::TextArray(text) => match text {
                TextArray::String32(_) => write_strings!(String32, u32),
                #[cfg(feature = "large_string")]
                TextArray::String64(_) => write_strings!(String64, u64),
                #[cfg(any(
                    not(feature = "default_categorical_8"),
                    feature = "extended_categorical"
                ))]
                TextArray::Categorical32(_) => write_categorical!(Categorical32, u32),
                #[cfg(feature = "default_categorical_8")]
                TextArray::Categorical8(_) => write_categorical!(Categorical8, u8),
                #[cfg(feature = "extended_categorical")]
                TextArray::Categorical16(_) => write_categorical!(Categorical16, u16),
                #[cfg(feature = "extended_categorical")]
                TextArray::Categorical64(_) => write_categorical!(Categorical64, u64),
                TextArray::Null => empty_layout(),
            },
            Array::BooleanArray(_) => {
                let parts: Vec<_> = tables
                    .iter()
                    .map(|t| match &t.cols[col_idx].array {
                        Array::BooleanArray(a) => (&a.data as &Bitmask, a.null_mask.as_ref()),
                        _ => unreachable!(),
                    })
                    .collect();
                arena.write_boolean_slices(&parts, n_rows, has_nulls)
            }
            #[cfg(feature = "datetime")]
            Array::TemporalArray(temp) => match temp {
                TemporalArray::Datetime32(_) => write_temporal!(Datetime32, i32),
                TemporalArray::Datetime64(_) => write_temporal!(Datetime64, i64),
                TemporalArray::Null => empty_layout(),
            },
            Array::Null => empty_layout(),
        };
        regions.push(layout);
    }

    Table::from_arena(name, &schema, arena, regions, n_rows)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{FloatArray, IntegerArray, MaskedArray, StringArray};
#[test]
fn test_basic_i64_roundtrip() {
    // Push one i64 slice, freeze the arena, and read the values back.
    let input: Vec<i64> = vec![10, 20, 30, 40, 50];

    let mut arena = Arena::with_capacity(1024);
    let region = arena.push_slice(&input);
    let frozen = arena.freeze();

    let recovered: Buffer<i64> = region.to_buffer(&frozen);
    assert_eq!(recovered.as_slice(), &[10, 20, 30, 40, 50]);
}
#[test]
fn test_multiple_types_in_one_arena() {
    // Three differently typed slices share one arena and stay independent.
    let mut arena = Arena::with_capacity(4096);
    let int_region = arena.push_slice(&vec![1i64, 2, 3]);
    let float_region = arena.push_slice(&vec![1.5f64, 2.5, 3.5]);
    let byte_region = arena.push_slice(&vec![0xFFu8, 0x00, 0xAB]);
    let frozen = arena.freeze();

    let ints: Buffer<i64> = int_region.to_buffer(&frozen);
    assert_eq!(ints.as_slice(), &[1i64, 2, 3]);

    let floats: Buffer<f64> = float_region.to_buffer(&frozen);
    assert_eq!(floats.as_slice(), &[1.5f64, 2.5, 3.5]);

    let bytes: Buffer<u8> = byte_region.to_buffer(&frozen);
    assert_eq!(bytes.as_slice(), &[0xFFu8, 0x00, 0xAB]);
}
#[test]
fn test_bitmask_roundtrip() {
let mask = Bitmask::new_set_all(10, true);
let mut arena = Arena::with_capacity(1024);
let region = arena.push_bitmask(&mask);
let shared = arena.freeze();
let recovered = region.to_bitmask(&shared, 10);
assert_eq!(recovered.len, 10);
for i in 0..10 {
assert!(recovered.get(i), "Bit {} should be set", i);
}
}
#[test]
fn test_bitmask_with_nulls() {
let mut mask = Bitmask::new_set_all(8, true);
mask.set(2, false);
mask.set(5, false);
let mut arena = Arena::with_capacity(1024);
let region = arena.push_bitmask(&mask);
let shared = arena.freeze();
let recovered = region.to_bitmask(&shared, 8);
assert!(recovered.get(0));
assert!(recovered.get(1));
assert!(!recovered.get(2));
assert!(recovered.get(3));
assert!(recovered.get(4));
assert!(!recovered.get(5));
assert!(recovered.get(6));
assert!(recovered.get(7));
}
#[test]
fn test_alignment() {
let a: Vec<u8> = vec![1, 2, 3]; let b: Vec<i64> = vec![100, 200];
let mut arena = Arena::with_capacity(4096);
let r_a = arena.push_slice(&a);
let r_b = arena.push_slice(&b);
assert_eq!(r_a.byte_offset() % 64, 0);
assert_eq!(r_b.byte_offset() % 64, 0);
assert_eq!(r_b.byte_offset(), 64);
let shared = arena.freeze();
let buf_a: Buffer<u8> = r_a.to_buffer(&shared);
let buf_b: Buffer<i64> = r_b.to_buffer(&shared);
assert_eq!(buf_a.as_slice(), &[1u8, 2, 3]);
assert_eq!(buf_b.as_slice(), &[100i64, 200]);
}
#[test]
fn test_reserve_and_write() {
let mut arena = Arena::with_capacity(1024);
let region = arena.reserve_slice::<i32>(4);
let slice = arena.region_as_mut_slice::<i32>(®ion);
slice[0] = 10;
slice[1] = 20;
slice[2] = 30;
slice[3] = 40;
let shared = arena.freeze();
let buffer: Buffer<i32> = region.to_buffer(&shared);
assert_eq!(buffer.as_slice(), &[10i32, 20, 30, 40]);
}
#[test]
#[should_panic(expected = "Arena overflow")]
fn test_capacity_overflow() {
let mut arena = Arena::with_capacity(16);
let data: Vec<i64> = vec![1, 2, 3]; arena.push_slice(&data);
}
#[test]
fn test_empty_arena_freeze() {
let arena = Arena::with_capacity(1024);
let shared = arena.freeze();
assert!(shared.is_empty());
}
#[test]
fn test_used_and_remaining() {
let mut arena = Arena::with_capacity(1024);
assert_eq!(arena.used(), 0);
assert_eq!(arena.remaining(), 1024);
arena.push_slice(&[1u8, 2, 3]);
assert_eq!(arena.used(), 3);
assert_eq!(arena.remaining(), 1024 - 3);
}
#[test]
fn test_full_table_construction() {
let ids: Vec<i64> = vec![1, 2, 3, 4, 5];
let prices: Vec<f64> = vec![10.5, 20.0, 15.75, 8.25, 99.99];
let mut null_mask = Bitmask::new_set_all(5, true);
null_mask.set(2, false);
let mut arena = Arena::with_capacity(4096);
let r_ids = arena.push_slice(&ids);
let r_prices = arena.push_slice(&prices);
let r_mask = arena.push_bitmask(&null_mask);
let shared = arena.freeze();
let id_buf: Buffer<i64> = r_ids.to_buffer(&shared);
let price_buf: Buffer<f64> = r_prices.to_buffer(&shared);
let mask = r_mask.to_bitmask(&shared, 5);
let id_arr = IntegerArray::new(id_buf, None);
let price_arr = FloatArray::new(price_buf, Some(mask));
assert_eq!(id_arr.len(), 5);
assert_eq!(id_arr.get(0), Some(1));
assert_eq!(id_arr.get(4), Some(5));
assert_eq!(price_arr.len(), 5);
assert_eq!(price_arr.get(0), Some(10.5));
assert_eq!(price_arr.get(2), None); assert_eq!(price_arr.get(4), Some(99.99));
}
#[test]
fn test_string_array_from_arena() {
let strings = ["hello", "world", "foo"];
let mut offsets: Vec<u32> = Vec::with_capacity(strings.len() + 1);
let mut data: Vec<u8> = Vec::new();
offsets.push(0);
for s in &strings {
data.extend_from_slice(s.as_bytes());
offsets.push(data.len() as u32);
}
let mut arena = Arena::with_capacity(4096);
let r_offsets = arena.push_slice(&offsets);
let r_data = arena.push_slice(&data);
let shared = arena.freeze();
let off_buf: Buffer<u32> = r_offsets.to_buffer(&shared);
let data_buf: Buffer<u8> = r_data.to_buffer(&shared);
let str_arr = StringArray::<u32>::new(data_buf, None, off_buf);
assert_eq!(str_arr.len(), 3);
assert_eq!(str_arr.get_str(0), Some("hello"));
assert_eq!(str_arr.get_str(1), Some("world"));
assert_eq!(str_arr.get_str(2), Some("foo"));
}
#[test]
fn test_shared_buffer_sharing() {
let a: Vec<i64> = vec![1, 2, 3];
let b: Vec<f64> = vec![4.0, 5.0];
let mut arena = Arena::with_capacity(4096);
let r_a = arena.push_slice(&a);
let r_b = arena.push_slice(&b);
let shared = arena.freeze();
let buf_a: Buffer<i64> = r_a.to_buffer(&shared);
let buf_b: Buffer<f64> = r_b.to_buffer(&shared);
assert!(buf_a.is_shared());
assert!(buf_b.is_shared());
}
#[test]
fn test_clone_is_cheap() {
let values: Vec<i64> = vec![1, 2, 3, 4, 5];
let mut arena = Arena::with_capacity(1024);
let region = arena.push_slice(&values);
let shared = arena.freeze();
let buffer: Buffer<i64> = region.to_buffer(&shared);
let cloned = buffer.clone();
assert!(cloned.is_shared());
assert_eq!(cloned.as_slice(), buffer.as_slice());
}
#[test]
fn test_cow_on_mutation() {
let values: Vec<i64> = vec![1, 2, 3];
let mut arena = Arena::with_capacity(1024);
let region = arena.push_slice(&values);
let shared = arena.freeze();
let mut buffer: Buffer<i64> = region.to_buffer(&shared);
assert!(buffer.is_shared());
buffer.push(4);
assert!(!buffer.is_shared());
assert_eq!(buffer.as_slice(), &[1, 2, 3, 4]);
}
#[test]
fn test_zero_length_slice() {
let empty: Vec<i64> = vec![];
let mut arena = Arena::with_capacity(1024);
let region = arena.push_slice(&empty);
assert_eq!(region.byte_len(), 0);
let shared = arena.freeze();
let buffer: Buffer<i64> = region.to_buffer(&shared);
assert_eq!(buffer.len(), 0);
assert!(buffer.is_empty());
}
#[test]
fn test_many_small_allocations() {
let mut arena = Arena::with_capacity(64 * 1024); let mut regions = Vec::new();
for i in 0..10 {
let data: Vec<i64> = (0..100).map(|x| x + i * 100).collect();
let mask = Bitmask::new_set_all(100, true);
regions.push((arena.push_slice(&data), arena.push_bitmask(&mask)));
}
let shared = arena.freeze();
for (i, (r_data, r_mask)) in regions.iter().enumerate() {
let buf: Buffer<i64> = r_data.to_buffer(&shared);
let mask = r_mask.to_bitmask(&shared, 100);
let arr = IntegerArray::new(buf, Some(mask));
assert_eq!(arr.len(), 100);
assert_eq!(arr.get(0), Some((i as i64) * 100));
}
}
}