use std::fmt::{Display, Formatter};
use std::iter::FromIterator;
use std::sync::Arc;
use crate::enums::{error::MinarrowError, shape_dim::ShapeDim};
use crate::structs::chunked::super_array::RechunkStrategy;
use crate::structs::field::Field;
use crate::structs::field_array::FieldArray;
use crate::structs::table::Table;
#[cfg(feature = "size")]
use crate::traits::byte_size::ByteSize;
use crate::traits::concatenate::Concatenate;
use crate::traits::consolidate::Consolidate;
use crate::traits::shape::Shape;
#[cfg(feature = "views")]
use crate::{SuperTableV, TableV};
/// A chunked table: an ordered sequence of record batches that share one schema.
///
/// Row order is the concatenation of `batches` in vector order.
#[derive(Clone, Debug, PartialEq)]
pub struct SuperTable {
    /// Record batches in row order; every batch's columns match `schema`
    /// (enforced by `from_batches` / `push`).
    pub batches: Vec<Arc<Table>>,
    /// Cached per-column fields, taken from the first batch.
    pub schema: Vec<Arc<Field>>,
    /// Total row count across all batches (kept in sync by mutators).
    pub n_rows: usize,
    /// Logical name of the table.
    pub name: String,
}
impl SuperTable {
/// Creates an empty `SuperTable` with the given name: no batches,
/// no schema, zero rows.
pub fn new(name: String) -> Self {
    Self {
        name,
        batches: Vec::new(),
        schema: Vec::new(),
        n_rows: 0,
    }
}
/// Builds a `SuperTable` from pre-existing batches.
///
/// The schema and (absent `name_override`) the name are taken from the
/// first batch. Panics if any batch's column count or per-column `Field`
/// disagrees with the first batch. An empty batch vector yields an empty
/// table named "Unnamed".
pub fn from_batches(batches: Vec<Arc<Table>>, name_override: Option<String>) -> Self {
    if batches.is_empty() {
        return Self::new("Unnamed".into());
    }
    let name = name_override.unwrap_or_else(|| batches[0].name.clone());
    let schema: Vec<Arc<Field>> = batches[0].cols.iter().map(|fa| fa.field.clone()).collect();
    let n_cols = schema.len();
    let mut total_rows = 0usize;
    for (b_idx, batch) in batches.iter().enumerate() {
        // Every batch must agree with the first batch, column for column.
        assert_eq!(batch.n_cols(), n_cols, "Batch {b_idx} column-count mismatch");
        for (col_idx, (fa, field)) in batch.cols.iter().zip(schema.iter()).enumerate() {
            assert_eq!(&fa.field, field, "Batch {b_idx} col {col_idx} schema mismatch");
        }
        total_rows += batch.n_rows;
    }
    Self {
        batches,
        schema,
        n_rows: total_rows,
        name,
    }
}
/// Appends a batch to the end of the table.
///
/// The first pushed batch fixes the schema; every later batch must match
/// it exactly (column count and per-column `Field`), otherwise this panics.
pub fn push(&mut self, batch: Arc<Table>) {
    if self.batches.is_empty() {
        // Adopt the schema from the first batch.
        self.schema = batch.cols.iter().map(|fa| fa.field.clone()).collect();
    }
    let n_cols = self.schema.len();
    assert_eq!(batch.n_cols(), n_cols, "Pushed batch column-count mismatch");
    for (col_idx, (fa, field)) in batch.cols.iter().zip(self.schema.iter()).enumerate() {
        assert_eq!(&fa.field, field, "Pushed batch col {col_idx} schema mismatch");
    }
    self.n_rows += batch.n_rows;
    self.batches.push(batch);
}
/// Inserts all rows of `other` at row position `index`.
///
/// `other`'s batches are spliced in whole; if `index` falls strictly inside
/// an existing batch, that batch is split at the insertion point first.
///
/// # Errors
/// - `IndexError` if `index > self.n_rows`.
/// - `IncompatibleTypeError` if the two schemas differ in column count,
///   name, dtype, or nullability.
pub fn insert_rows(
    &mut self,
    index: usize,
    other: impl Into<SuperTable>,
) -> Result<(), MinarrowError> {
    let other = other.into();
    if index > self.n_rows {
        return Err(MinarrowError::IndexError(format!(
            "Index {} out of bounds for SuperTable with {} rows",
            index, self.n_rows
        )));
    }
    // Nothing to insert.
    if other.n_rows == 0 {
        return Ok(());
    }
    // Schema compatibility is only checked when we already hold data;
    // an empty self simply adopts other's schema below.
    if !self.batches.is_empty() {
        if self.schema.len() != other.schema.len() {
            return Err(MinarrowError::IncompatibleTypeError {
                from: "SuperTable",
                to: "SuperTable",
                message: Some(format!(
                    "Column count mismatch: {} vs {}",
                    self.schema.len(),
                    other.schema.len()
                )),
            });
        }
        for (col_idx, (self_field, other_field)) in
            self.schema.iter().zip(other.schema.iter()).enumerate()
        {
            if self_field.name != other_field.name {
                return Err(MinarrowError::IncompatibleTypeError {
                    from: "SuperTable",
                    to: "SuperTable",
                    message: Some(format!(
                        "Column {} name mismatch: '{}' vs '{}'",
                        col_idx, self_field.name, other_field.name
                    )),
                });
            }
            if self_field.dtype != other_field.dtype {
                return Err(MinarrowError::IncompatibleTypeError {
                    from: "SuperTable",
                    to: "SuperTable",
                    message: Some(format!(
                        "Column '{}' type mismatch: {:?} vs {:?}",
                        self_field.name, self_field.dtype, other_field.dtype
                    )),
                });
            }
            if self_field.nullable != other_field.nullable {
                return Err(MinarrowError::IncompatibleTypeError {
                    from: "SuperTable",
                    to: "SuperTable",
                    message: Some(format!(
                        "Column '{}' nullable mismatch: {} vs {}",
                        self_field.name, self_field.nullable, other_field.nullable
                    )),
                });
            }
        }
    }
    // Empty self: take over other wholesale.
    if self.batches.is_empty() {
        self.batches = other.batches;
        self.schema = other.schema;
        self.n_rows = other.n_rows;
        return Ok(());
    }
    // Locate the batch containing `index`. Because index <= n_rows, the
    // loop always breaks; a boundary index resolves to the earlier batch
    // with local_index == that batch's row count (append-after case).
    let mut cumulative = 0;
    let mut target_idx = 0;
    let mut local_index = index;
    for (idx, batch) in self.batches.iter().enumerate() {
        let batch_rows = batch.n_rows;
        if index <= cumulative + batch_rows {
            target_idx = idx;
            local_index = index - cumulative;
            break;
        }
        cumulative += batch_rows;
    }
    let target_batch_rows = self.batches[target_idx].n_rows;
    if local_index == 0 {
        // Insertion point is the start of the target batch: splice other's
        // batches immediately before it — no split needed.
        let mut new_batches = Vec::with_capacity(self.batches.len() + other.batches.len());
        new_batches.extend(self.batches.drain(0..target_idx));
        new_batches.extend(other.batches.into_iter());
        new_batches.extend(self.batches.drain(..));
        self.batches = new_batches;
        self.n_rows += other.n_rows;
    } else if local_index == target_batch_rows {
        // Insertion point is the end of the target batch: splice after it.
        let mut new_batches = Vec::with_capacity(self.batches.len() + other.batches.len());
        new_batches.extend(self.batches.drain(0..=target_idx));
        new_batches.extend(other.batches.into_iter());
        new_batches.extend(self.batches.drain(..));
        self.batches = new_batches;
        self.n_rows += other.n_rows;
    } else {
        // Insertion point is inside the target batch: split it, then lay
        // out [prefix, other's batches, suffix]. The Arc is unwrapped
        // without cloning when this table holds the only reference.
        let target_batch = self.batches.remove(target_idx);
        let target_table = Arc::try_unwrap(target_batch).unwrap_or_else(|arc| (*arc).clone());
        let mut split_batches = target_table.split(local_index)?;
        let mut new_batches = Vec::with_capacity(self.batches.len() + other.batches.len() + 2);
        new_batches.extend(self.batches.drain(0..target_idx));
        new_batches.extend(split_batches.batches.drain(0..1));
        new_batches.extend(other.batches.into_iter());
        new_batches.extend(split_batches.batches.drain(..));
        new_batches.extend(self.batches.drain(..));
        self.batches = new_batches;
        self.n_rows += other.n_rows;
    }
    Ok(())
}
/// Number of columns (length of the cached schema).
#[inline]
pub fn n_cols(&self) -> usize {
    self.schema.len()
}
/// Returns the per-column `Field`s of this table.
///
/// Uses the cached `schema`, which `from_batches`/`push` keep identical to
/// the first batch's fields — so, unlike indexing `batches[0]`, this no
/// longer panics when the table holds no batches.
#[inline]
pub fn cols(&self) -> Vec<Arc<Field>> {
    self.schema.clone()
}
/// Total number of rows across all batches.
#[inline]
pub fn n_rows(&self) -> usize {
    self.n_rows
}
/// Number of record batches.
#[inline]
pub fn n_batches(&self) -> usize {
    self.batches.len()
}
/// Row count (alias of [`Self::n_rows`]).
#[inline]
pub fn len(&self) -> usize {
    self.n_rows
}
/// True when the table has zero rows.
#[inline]
pub fn is_empty(&self) -> bool {
    self.n_rows == 0
}
/// The cached per-column fields.
#[inline]
pub fn schema(&self) -> &[Arc<Field>] {
    &self.schema
}
/// All record batches, in row order.
#[inline]
pub fn batches(&self) -> &[Arc<Table>] {
    &self.batches
}
/// The batch at `idx`, or `None` when out of range.
#[inline]
pub fn batch(&self, idx: usize) -> Option<&Arc<Table>> {
    self.batches.get(idx)
}
/// Metadata of the first batch, or a shared empty map when there are no
/// batches.
#[cfg(feature = "table_metadata")]
pub fn metadata(&self) -> &std::collections::BTreeMap<String, String> {
    // Lazily-initialised static so an empty table can still hand out a
    // reference with a long enough lifetime.
    static EMPTY: std::sync::LazyLock<std::collections::BTreeMap<String, String>> =
        std::sync::LazyLock::new(std::collections::BTreeMap::new);
    self.batches.first().map(|b| b.metadata()).unwrap_or(&EMPTY)
}
/// Returns a zero-copy view over rows `[offset, offset + len)`.
///
/// Produces one `TableV` per batch the window touches. Panics when the
/// window exceeds the table's row count.
#[cfg(feature = "views")]
pub fn view(&self, offset: usize, len: usize) -> SuperTableV {
    assert!(offset + len <= self.n_rows, "slice out of bounds");
    let mut slices = Vec::<TableV>::new();
    // `skip` counts rows still to be skipped before the window starts;
    // `needed` counts rows still to be covered.
    let mut skip = offset;
    let mut needed = len;
    for batch in self.batches.iter() {
        if skip >= batch.n_rows {
            skip -= batch.n_rows;
            continue;
        }
        let take = (batch.n_rows - skip).min(needed);
        slices.push(TableV::from_arc_table(batch.clone(), skip, take));
        skip = 0;
        needed -= take;
        if needed == 0 {
            break;
        }
    }
    SuperTableV { slices, len }
}
/// Materialises a `SuperTable` from a sequence of table views, one batch
/// per view. Panics on an empty slice or on a column-count mismatch
/// between views.
#[cfg(feature = "views")]
pub fn from_views(slices: &[TableV], name: String) -> Self {
    assert!(!slices.is_empty(), "from_slices: no slices provided");
    let expected_cols = slices[0].n_cols();
    let schema: Vec<Arc<Field>> = slices[0].fields.iter().cloned().collect();
    let mut total_rows = 0usize;
    let batches: Vec<Arc<Table>> = slices
        .iter()
        .map(|slice| {
            // Each view is copied out into an owned batch.
            let table = slice.to_table();
            assert_eq!(table.n_cols(), expected_cols, "Batch column-count mismatch");
            total_rows += table.n_rows;
            table.into()
        })
        .collect();
    Self {
        batches,
        schema,
        n_rows: total_rows,
        name,
    }
}
/// Re-partitions all batches so each holds `chunk_size` rows (the final
/// batch may be smaller). Total rows and row order are unchanged.
///
/// The chunk size comes from `strategy`: an explicit `Count`, a fixed
/// 8192 for `Auto`, or (with the "size" feature) a row count derived
/// from a target byte budget per chunk.
///
/// # Errors
/// - `IndexError` for `Count(0)` or for a `Memory` strategy on a table
///   whose estimated byte size is 0.
/// - Propagates errors from `Table::split` / `concat`.
pub fn rechunk(&mut self, strategy: RechunkStrategy) -> Result<(), MinarrowError> {
    if self.batches.is_empty() || self.n_rows == 0 {
        return Ok(());
    }
    let chunk_size = match strategy {
        RechunkStrategy::Count(size) => {
            if size == 0 {
                return Err(MinarrowError::IndexError(
                    "Count chunk size must be greater than 0".to_string(),
                ));
            }
            size
        }
        RechunkStrategy::Auto => 8192,
        #[cfg(feature = "size")]
        RechunkStrategy::Memory(bytes_per_chunk) => {
            // rows-per-chunk ≈ budget / (bytes per row), clamped to >= 1.
            let total_bytes = self.est_bytes();
            let total_rows = self.n_rows;
            if total_bytes == 0 {
                return Err(MinarrowError::IndexError(
                    "Cannot rechunk: table has 0 estimated bytes".to_string(),
                ));
            }
            ((bytes_per_chunk * total_rows) / total_bytes).max(1)
        }
    };
    // Already a single batch of exactly the target size: nothing to do.
    if self.batches.len() == 1 && self.batches[0].n_rows == chunk_size {
        return Ok(());
    }
    let mut new_batches = Vec::new();
    // Partially-filled chunk carried across input batches.
    let mut accumulator: Option<Table> = None;
    for batch_arc in self.batches.drain(..) {
        // Unwrap without cloning when we hold the only reference.
        let batch = Arc::try_unwrap(batch_arc).unwrap_or_else(|arc| (*arc).clone());
        let mut remaining = batch;
        while remaining.n_rows > 0 {
            if let Some(ref mut acc) = accumulator {
                let acc_rows = acc.n_rows;
                let needed = chunk_size - acc_rows;
                if remaining.n_rows <= needed {
                    // Whole remainder fits into the open chunk; flush the
                    // chunk only if it is now exactly full.
                    *acc = acc.clone().concat(remaining)?;
                    if acc.n_rows == chunk_size {
                        new_batches.push(Arc::new(accumulator.take().unwrap()));
                    }
                    break;
                } else {
                    // Take just enough rows to complete the open chunk,
                    // flush it, and keep processing the rest.
                    let split_result = remaining.split(needed)?;
                    let mut parts = split_result.batches;
                    let to_add =
                        Arc::try_unwrap(parts.remove(0)).unwrap_or_else(|arc| (*arc).clone());
                    remaining =
                        Arc::try_unwrap(parts.remove(0)).unwrap_or_else(|arc| (*arc).clone());
                    *acc = acc.clone().concat(to_add)?;
                    new_batches.push(Arc::new(accumulator.take().unwrap()));
                }
            } else {
                if remaining.n_rows == chunk_size {
                    // Exact fit: emit as-is.
                    new_batches.push(Arc::new(remaining));
                    break;
                } else if remaining.n_rows > chunk_size {
                    // Peel off one full chunk and continue with the rest.
                    let split_result = remaining.split(chunk_size)?;
                    let mut parts = split_result.batches;
                    new_batches.push(parts.remove(0));
                    remaining =
                        Arc::try_unwrap(parts.remove(0)).unwrap_or_else(|arc| (*arc).clone());
                } else {
                    // Too small on its own: start a new open chunk.
                    accumulator = Some(remaining);
                    break;
                }
            }
        }
    }
    // A trailing, partially-filled chunk becomes the last batch.
    if let Some(final_batch) = accumulator {
        new_batches.push(Arc::new(final_batch));
    }
    self.batches = new_batches;
    Ok(())
}
/// Re-partitions only rows `[0, up_to_row)` using `strategy`, leaving the
/// batches containing rows at and after `up_to_row` untouched.
///
/// Fix: previously, when `up_to_row` landed exactly on a batch boundary,
/// the batch *starting* at that boundary was also drained into the
/// rechunk set (the `drain(..=split_point)` included it and the
/// `current_offset < up_to_row` guard never returned it), so rows beyond
/// `up_to_row` were rechunked too. A boundary-aligned split batch is now
/// handed back to the keep side whole.
///
/// # Errors
/// - `IndexError` if `up_to_row > self.n_rows`.
/// - Propagates `rechunk` / `split` errors. NOTE(review): like the
///   original, an error from the nested `rechunk` can leave `self`
///   partially restructured.
pub fn rechunk_to(
    &mut self,
    up_to_row: usize,
    strategy: RechunkStrategy,
) -> Result<(), MinarrowError> {
    let total_rows = self.n_rows;
    if up_to_row > total_rows {
        return Err(MinarrowError::IndexError(format!(
            "rechunk_to row {} out of bounds for table with {} rows",
            up_to_row, total_rows
        )));
    }
    if up_to_row == 0 || self.batches.is_empty() {
        return Ok(());
    }
    if up_to_row == total_rows {
        return self.rechunk(strategy);
    }
    // Find the first batch whose end lies past `up_to_row`; `current_offset`
    // is that batch's starting row. Since 0 < up_to_row < total_rows, the
    // loop always breaks.
    let mut current_offset = 0;
    let mut split_point = 0;
    for (i, batch) in self.batches.iter().enumerate() {
        let batch_end = current_offset + batch.n_rows;
        if batch_end > up_to_row {
            split_point = i;
            break;
        }
        current_offset = batch_end;
    }
    let mut to_rechunk = self.batches.drain(..=split_point).collect::<Vec<_>>();
    let keep_batches = self.batches.drain(..).collect::<Vec<_>>();
    if current_offset < up_to_row {
        // Boundary falls inside the split batch: split it, keep the prefix
        // for rechunking and the suffix untouched.
        let split_batch_arc = to_rechunk.pop().unwrap();
        let split_batch = Arc::try_unwrap(split_batch_arc).unwrap_or_else(|arc| (*arc).clone());
        let split_at = up_to_row - current_offset;
        let split_result = split_batch.split(split_at)?;
        let mut parts = split_result.batches;
        to_rechunk.push(parts.remove(0));
        self.batches.push(parts.remove(0));
    } else {
        // Boundary-aligned: the batch at `split_point` starts exactly at
        // `up_to_row`, so it must be kept whole, not rechunked.
        let untouched = to_rechunk.pop().unwrap();
        self.batches.push(untouched);
    }
    self.batches.extend(keep_batches);
    let mut temp = SuperTable::from_batches(to_rechunk, Some(self.name.clone()));
    temp.rechunk(strategy)?;
    let mut result = temp.batches;
    result.extend(self.batches.drain(..));
    self.batches = result;
    self.n_rows = self.batches.iter().map(|b| b.n_rows).sum();
    Ok(())
}
}
impl Default for SuperTable {
fn default() -> Self {
Self::new("Unnamed".into())
}
}
impl FromIterator<Table> for SuperTable {
    /// Collects owned tables into a `SuperTable`, one batch per table;
    /// schema validation and naming follow `from_batches`.
    fn from_iter<T: IntoIterator<Item = Table>>(iter: T) -> Self {
        let batches = iter
            .into_iter()
            .map(Arc::new)
            .collect::<Vec<Arc<Table>>>();
        SuperTable::from_batches(batches, None)
    }
}
impl Shape for SuperTable {
    /// Rank-2 shape: (total rows, column count).
    fn shape(&self) -> ShapeDim {
        let (rows, cols) = (self.n_rows(), self.n_cols());
        ShapeDim::Rank2 { rows, cols }
    }
}
impl Consolidate for SuperTable {
    type Output = Table;
    /// Materialises all batches into one contiguous `Table`.
    ///
    /// Compile-time dispatch: arena-based consolidation when the "arena"
    /// feature is enabled, column-wise concatenation otherwise.
    fn consolidate(self) -> Table {
        #[cfg(feature = "arena")]
        {
            self.consolidate_arena()
        }
        #[cfg(not(feature = "arena"))]
        {
            self.consolidate_concat()
        }
    }
}
impl SuperTable {
/// Consolidation by per-column concatenation: for each column, clone the
/// first batch's array and append every later batch's array in order.
///
/// Panics when called on an empty `SuperTable`. With the "table_metadata"
/// feature, the first batch's metadata is carried over.
#[cfg_attr(feature = "arena", allow(dead_code))]
fn consolidate_concat(self) -> Table {
    assert!(
        !self.batches.is_empty(),
        "consolidate() called on empty SuperTable"
    );
    let n_cols = self.schema.len();
    let mut unified_cols = Vec::with_capacity(n_cols);
    for col_idx in 0..n_cols {
        let field = self.schema[col_idx].clone();
        // Seed with the first batch's column, then append the rest.
        let mut arr = self.batches[0].cols[col_idx].array.clone();
        for batch in self.batches.iter().skip(1) {
            arr.concat_array(&batch.cols[col_idx].array);
        }
        let null_count = arr.null_count();
        unified_cols.push(FieldArray {
            field,
            // Move the array in; the previous `arr.clone()` duplicated a
            // buffer that was immediately dropped.
            array: arr,
            null_count,
        });
    }
    #[allow(unused_mut)]
    let mut table = Table::build(unified_cols, self.n_rows, self.name);
    #[cfg(feature = "table_metadata")]
    {
        table.metadata = self.batches[0].metadata.clone();
    }
    table
}
/// Arena-backed consolidation.
///
/// A single-batch table is unwrapped directly (cloning only when the
/// `Arc` is shared) and renamed; multiple batches are merged via
/// `consolidate_tables_arena`. Panics on an empty `SuperTable`.
#[cfg(feature = "arena")]
fn consolidate_arena(self) -> Table {
    assert!(
        !self.batches.is_empty(),
        "consolidate() called on empty SuperTable"
    );
    if self.batches.len() == 1 {
        let batch = self.batches.into_iter().next().unwrap();
        let mut table = Arc::try_unwrap(batch).unwrap_or_else(|arc| (*arc).clone());
        table.name = self.name;
        return table;
    }
    let refs: Vec<&Table> = self.batches.iter().map(|b| b.as_ref()).collect();
    crate::structs::arena::consolidate_tables_arena(&refs, self.name)
}
}
impl Concatenate for SuperTable {
    /// Appends `other`'s batches after `self`'s. No data is copied — the
    /// `Arc`-wrapped batches are moved. The result is named
    /// "{self.name}+{other.name}".
    ///
    /// # Errors
    /// `IncompatibleTypeError` when the schemas differ in column count,
    /// name, dtype, or nullability. If either side is empty, the other is
    /// returned (renamed) without schema checks.
    fn concat(self, other: Self) -> Result<Self, MinarrowError> {
        if self.batches.is_empty() && other.batches.is_empty() {
            return Ok(SuperTable::new(format!("{}+{}", self.name, other.name)));
        }
        if self.batches.is_empty() {
            let mut result = other;
            result.name = format!("{}+{}", self.name, result.name);
            return Ok(result);
        }
        if other.batches.is_empty() {
            let mut result = self;
            result.name = format!("{}+{}", result.name, other.name);
            return Ok(result);
        }
        if self.schema.len() != other.schema.len() {
            return Err(MinarrowError::IncompatibleTypeError {
                from: "SuperTable",
                to: "SuperTable",
                message: Some(format!(
                    "Cannot concatenate SuperTables with different column counts: {} vs {}",
                    self.schema.len(),
                    other.schema.len()
                )),
            });
        }
        // Column-by-column schema compatibility.
        for (col_idx, (self_field, other_field)) in
            self.schema.iter().zip(other.schema.iter()).enumerate()
        {
            if self_field.name != other_field.name {
                return Err(MinarrowError::IncompatibleTypeError {
                    from: "SuperTable",
                    to: "SuperTable",
                    message: Some(format!(
                        "Column {} name mismatch: '{}' vs '{}'",
                        col_idx, self_field.name, other_field.name
                    )),
                });
            }
            if self_field.dtype != other_field.dtype {
                return Err(MinarrowError::IncompatibleTypeError {
                    from: "SuperTable",
                    to: "SuperTable",
                    message: Some(format!(
                        "Column '{}' type mismatch: {:?} vs {:?}",
                        self_field.name, self_field.dtype, other_field.dtype
                    )),
                });
            }
            if self_field.nullable != other_field.nullable {
                return Err(MinarrowError::IncompatibleTypeError {
                    from: "SuperTable",
                    to: "SuperTable",
                    message: Some(format!(
                        "Column '{}' nullable mismatch: {} vs {}",
                        self_field.name, self_field.nullable, other_field.nullable
                    )),
                });
            }
        }
        // Moving `batches` out of `self` is a partial move; the remaining
        // fields (n_rows, name, schema) stay readable below.
        let mut result_batches = self.batches;
        result_batches.extend(other.batches);
        let total_rows = self.n_rows + other.n_rows;
        Ok(SuperTable {
            batches: result_batches,
            schema: self.schema,
            n_rows: total_rows,
            name: format!("{}+{}", self.name, other.name),
        })
    }
}
impl Display for SuperTable {
    /// Multi-line summary: a header with totals, then one section per
    /// batch listing each column's name, dtype, null count, and the
    /// array's own `Display` output (indented).
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        writeln!(
            f,
            "SuperTable \"{}\" [{} rows, {} columns, {} batches]",
            self.name,
            self.n_rows,
            self.schema.len(),
            self.batches.len()
        )?;
        for (batch_idx, batch) in self.batches.iter().enumerate() {
            writeln!(
                f,
                " ├─ Batch {batch_idx}: {} rows, {} columns",
                batch.n_rows,
                batch.n_cols()
            )?;
            for (col_idx, col) in batch.cols.iter().enumerate() {
                let indent = " │    ";
                writeln!(
                    f,
                    "{indent}Col {col_idx}: \"{}\" (dtype: {}, nulls: {})",
                    col.field.name, col.field.dtype, col.null_count
                )?;
                // Re-indent the array's rendering under the column line.
                for line in format!("{}", col.array).lines() {
                    writeln!(f, "{indent}  {line}")?;
                }
            }
        }
        Ok(())
    }
}
#[cfg(feature = "views")]
impl From<SuperTableV> for SuperTable {
    /// Materialises a chunked view; an empty view becomes an empty table
    /// with an empty name, otherwise the result is named "SuperTable".
    fn from(value: SuperTableV) -> Self {
        if value.is_empty() {
            SuperTable::new("".to_string())
        } else {
            SuperTable::from_views(&value.slices, "SuperTable".to_string())
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::ffi::arrow_dtype::ArrowType;
use crate::{fa_bool, fa_cat32, fa_f64, fa_i32, fa_i64, fa_str32};
use crate::{Array, Field, FieldArray, MaskedArray, NumericArray, Table};
fn table(cols: Vec<FieldArray>) -> Table {
let n_rows = cols[0].len();
for c in &cols {
assert_eq!(c.len(), n_rows, "all columns must have same len for Table");
}
Table::build(cols, n_rows, "batch".to_string())
}
#[test]
fn test_empty_and_default() {
let t = SuperTable::default();
assert!(t.is_empty());
assert_eq!(t.n_cols(), 0);
assert_eq!(t.n_batches(), 0);
assert_eq!(t.len(), 0);
}
#[test]
fn test_from_batches_basic() {
let col1 = fa_i32!("a", 1, 2, 3);
let col2 = fa_i32!("b", 10, 11, 12);
let col3 = fa_i32!("a", 4, 5);
let col4 = fa_i32!("b", 13, 14);
let batch1 = Arc::new(table(vec![col1.clone(), col2.clone()]));
let batch2 = Arc::new(table(vec![col3.clone(), col4.clone()]));
let batches = vec![batch1, batch2].into();
let t = SuperTable::from_batches(batches, None);
assert_eq!(t.n_cols(), 2);
assert_eq!(t.n_batches(), 2);
assert_eq!(t.len(), 5);
assert_eq!(t.schema()[0].name, "a");
assert_eq!(t.schema()[1].name, "b");
assert_eq!(t.batches()[0].cols[0], col1);
assert_eq!(t.batches()[1].cols[1], col4);
}
#[test]
#[should_panic(expected = "column-count mismatch")]
fn test_from_batches_col_count_mismatch() {
let batch1 = Arc::new(table(vec![fa_i32!("a", 1, 2)]));
let batch2 = Arc::new(table(vec![fa_i32!("a", 3, 4), fa_i32!("b", 5, 6)]));
SuperTable::from_batches(vec![batch1, batch2].into(), None);
}
#[test]
#[should_panic(expected = "schema mismatch")]
fn test_from_batches_schema_mismatch() {
let batch1 = Arc::new(table(vec![fa_i32!("a", 1, 2)]));
let mut wrong = fa_i32!("z", 3, 4);
let mut mismatched_field = (*wrong.field).clone();
mismatched_field.dtype = ArrowType::Int32;
wrong.field = Arc::new(mismatched_field);
let batch2 = Arc::new(table(vec![wrong]));
SuperTable::from_batches(vec![batch1, batch2].into(), None);
}
#[test]
fn test_push_and_consolidate() {
let mut t = SuperTable::default();
t.push(Arc::new(table(vec![fa_i32!("x", 1, 2), fa_i32!("y", 3, 4)])));
t.push(Arc::new(table(vec![fa_i32!("x", 5), fa_i32!("y", 6)])));
assert_eq!(t.n_cols(), 2);
assert_eq!(t.n_batches(), 2);
assert_eq!(t.len(), 3);
let tab = t.consolidate();
assert_eq!(tab.name, "Unnamed");
assert_eq!(tab.n_rows, 3);
assert_eq!(tab.cols[0].field.name, "x");
assert_eq!(tab.cols[1].field.name, "y");
}
#[test]
#[should_panic(expected = "column-count mismatch")]
fn test_push_col_count_mismatch() {
let mut t = SuperTable::default();
t.push(Arc::new(table(vec![fa_i32!("a", 1, 2)])));
t.push(Arc::new(table(vec![fa_i32!("a", 3, 4), fa_i32!("b", 5, 6)])));
}
#[cfg(feature = "views")]
#[test]
fn test_slice_and_owned_table() {
let batch1 = Arc::new(table(vec![fa_i32!("q", 1, 2, 3), fa_i32!("w", 4, 5, 6)]));
let batch2 = Arc::new(table(vec![fa_i32!("q", 7, 8), fa_i32!("w", 9, 10)]));
let t = SuperTable::from_batches(vec![batch1, batch2].into(), None);
let slice = t.view(2, 3);
assert_eq!(slice.len, 3);
assert_eq!(slice.slices.len(), 2);
let owned = slice.consolidate();
assert_eq!(owned.name, "part");
assert_eq!(owned.n_rows, 3);
assert_eq!(owned.cols[0].field.name, "q");
assert_eq!(owned.cols[1].field.name, "w");
let arr = &owned.cols[0].array;
if let Array::NumericArray(NumericArray::Int32(ints)) = arr {
assert_eq!(ints.data.as_slice(), &[3, 7, 8]);
} else {
panic!("expected Int32 array");
}
let arr = &owned.cols[1].array;
if let Array::NumericArray(NumericArray::Int32(ints)) = arr {
assert_eq!(ints.data.as_slice(), &[6, 9, 10]);
} else {
panic!("expected Int32 array");
}
}
#[test]
fn test_schema_and_batch_access() {
let t = SuperTable::from_batches(vec![Arc::new(table(vec![fa_i32!("alpha", 1, 2)]))], None);
assert_eq!(t.n_cols(), 1);
assert_eq!(t.schema()[0].name, "alpha");
assert!(t.batch(0).is_some());
assert!(t.batch(5).is_none());
assert_eq!(t.batches().len(), 1);
}
#[cfg(feature = "views")]
#[test]
fn test_from_slices() {
let batch1 = Arc::new(table(vec![fa_i32!("x", 1, 2), fa_i32!("y", 3, 4)]));
let batch2 = Arc::new(table(vec![fa_i32!("x", 5, 6), fa_i32!("y", 7, 8)]));
let t = SuperTable::from_batches(vec![batch1.clone(), batch2.clone()], None);
let mut table_slices = Vec::new();
for i in 0..t.len() {
let bts = t.view(i, 1);
for ts in bts.slices.clone() {
table_slices.push(ts);
}
}
let rebuilt = SuperTable::from_views(&table_slices, "rebuilt".to_string());
assert_eq!(rebuilt.n_cols(), t.n_cols());
assert_eq!(rebuilt.len(), t.len());
for (left, right) in rebuilt.schema().iter().zip(t.schema()) {
assert_eq!(left.name, right.name);
assert_eq!(left.dtype, right.dtype);
}
let expected_x = [1, 2, 5, 6];
let expected_y = [3, 4, 7, 8];
let consolidated = rebuilt.consolidate();
for (col_idx, expected) in [expected_x.as_slice(), expected_y.as_slice()]
.iter()
.enumerate()
{
let arr = consolidated.cols[col_idx].array.clone();
if let Array::NumericArray(NumericArray::Int32(ints)) = arr {
assert_eq!(ints.data.as_slice(), *expected);
} else {
panic!("unexpected array type at col {col_idx}");
}
}
}
#[test]
fn test_insert_rows_into_first_batch() {
let batch1 = Arc::new(table(vec![fa_i32!("a", 1, 2, 3), fa_i32!("b", 10, 20, 30)]));
let batch2 = Arc::new(table(vec![fa_i32!("a", 4, 5), fa_i32!("b", 40, 50)]));
let mut st = SuperTable::from_batches(vec![batch1, batch2], None);
let insert_batch = Arc::new(table(vec![fa_i32!("a", 99), fa_i32!("b", 88)]));
let insert_st = SuperTable::from_batches(vec![insert_batch], None);
st.insert_rows(1, insert_st).unwrap();
assert_eq!(st.n_rows(), 6);
assert_eq!(st.n_batches(), 4);
let materialised = st.consolidate();
match &materialised.cols[0].array {
Array::NumericArray(NumericArray::Int32(arr)) => {
assert_eq!(arr.data.as_slice(), &[1, 99, 2, 3, 4, 5]);
}
_ => panic!("wrong type"),
}
}
#[test]
fn test_insert_rows_into_second_batch() {
let batch1 = Arc::new(table(vec![fa_i32!("a", 1, 2)]));
let batch2 = Arc::new(table(vec![fa_i32!("a", 3, 4, 5)]));
let mut st = SuperTable::from_batches(vec![batch1, batch2], None);
let insert_batch = Arc::new(table(vec![fa_i32!("a", 99, 88)]));
let insert_st = SuperTable::from_batches(vec![insert_batch], None);
st.insert_rows(3, insert_st).unwrap();
assert_eq!(st.n_rows(), 7);
let materialised = st.consolidate();
match &materialised.cols[0].array {
Array::NumericArray(NumericArray::Int32(arr)) => {
assert_eq!(arr.data.as_slice(), &[1, 2, 3, 99, 88, 4, 5]);
}
_ => panic!("wrong type"),
}
}
#[test]
fn test_insert_rows_prepend() {
let batch1 = Arc::new(table(vec![fa_i32!("a", 1, 2, 3)]));
let mut st = SuperTable::from_batches(vec![batch1], None);
let insert_batch = Arc::new(table(vec![fa_i32!("a", 99)]));
let insert_st = SuperTable::from_batches(vec![insert_batch], None);
st.insert_rows(0, insert_st).unwrap();
assert_eq!(st.n_rows(), 4);
let materialised = st.consolidate();
match &materialised.cols[0].array {
Array::NumericArray(NumericArray::Int32(arr)) => {
assert_eq!(arr.data.as_slice(), &[99, 1, 2, 3]);
}
_ => panic!("wrong type"),
}
}
#[test]
fn test_insert_rows_append() {
let batch1 = Arc::new(table(vec![fa_i32!("a", 1, 2)]));
let batch2 = Arc::new(table(vec![fa_i32!("a", 3, 4)]));
let mut st = SuperTable::from_batches(vec![batch1, batch2], None);
let insert_batch = Arc::new(table(vec![fa_i32!("a", 99)]));
let insert_st = SuperTable::from_batches(vec![insert_batch], None);
st.insert_rows(4, insert_st).unwrap();
assert_eq!(st.n_rows(), 5);
let materialised = st.consolidate();
match &materialised.cols[0].array {
Array::NumericArray(NumericArray::Int32(arr)) => {
assert_eq!(arr.data.as_slice(), &[1, 2, 3, 4, 99]);
}
_ => panic!("wrong type"),
}
}
#[test]
fn test_insert_rows_schema_mismatch() {
let batch1 = Arc::new(table(vec![fa_i32!("a", 1, 2)]));
let mut st = SuperTable::from_batches(vec![batch1], None);
let wrong_batch = Arc::new(table(vec![fa_i32!("b", 99)]));
let wrong_st = SuperTable::from_batches(vec![wrong_batch], None);
let result = st.insert_rows(0, wrong_st);
assert!(result.is_err());
}
#[test]
fn test_insert_rows_out_of_bounds() {
let batch1 = Arc::new(table(vec![fa_i32!("a", 1, 2)]));
let mut st = SuperTable::from_batches(vec![batch1], None);
let insert_batch = Arc::new(table(vec![fa_i32!("a", 99)]));
let insert_st = SuperTable::from_batches(vec![insert_batch], None);
let result = st.insert_rows(10, insert_st);
assert!(result.is_err());
}
#[test]
fn test_rechunk_uniform() {
let batch1 = Arc::new(table(vec![fa_i32!("x", 1, 2, 3), fa_i32!("y", 10, 20, 30)]));
let batch2 = Arc::new(table(vec![
fa_i32!("x", 4, 5, 6, 7),
fa_i32!("y", 40, 50, 60, 70),
]));
let batch3 = Arc::new(table(vec![fa_i32!("x", 8, 9), fa_i32!("y", 80, 90)]));
let mut st = SuperTable::from_batches(vec![batch1, batch2, batch3], None);
assert_eq!(st.n_rows(), 9);
assert_eq!(st.n_batches(), 3);
st.rechunk(RechunkStrategy::Count(4)).unwrap();
assert_eq!(st.n_batches(), 3);
assert_eq!(st.batch(0).unwrap().n_rows, 4);
assert_eq!(st.batch(1).unwrap().n_rows, 4);
assert_eq!(st.batch(2).unwrap().n_rows, 1);
assert_eq!(st.n_rows(), 9);
}
#[test]
fn test_rechunk_auto() {
let mut batches = Vec::new();
for i in 0..100 {
let vals: Vec<i32> = vec![i * 10, i * 10 + 1];
let arr = Array::from_int32(crate::IntegerArray::<i32>::from_slice(&vals));
let field = Field::new("col", ArrowType::Int32, false, None);
batches.push(Arc::new(table(vec![FieldArray::new(field, arr)])));
}
let mut st = SuperTable::from_batches(batches.into(), None);
assert_eq!(st.n_rows(), 200);
assert_eq!(st.n_batches(), 100);
st.rechunk(RechunkStrategy::Auto).unwrap();
assert_eq!(st.n_batches(), 1);
assert_eq!(st.batch(0).unwrap().n_rows, 200);
assert_eq!(st.n_rows(), 200);
}
#[test]
#[cfg(feature = "size")]
fn test_rechunk_by_memory() {
let batch1 = Arc::new(table(vec![fa_i32!("a", 1, 2, 3, 4), fa_i32!("b", 5, 6, 7, 8)]));
let batch2 = Arc::new(table(vec![
fa_i32!("a", 9, 10, 11, 12),
fa_i32!("b", 13, 14, 15, 16),
]));
let mut st = SuperTable::from_batches(vec![batch1, batch2], None);
assert_eq!(st.n_rows(), 8);
assert_eq!(st.n_batches(), 2);
st.rechunk(RechunkStrategy::Memory(64)).unwrap();
assert!(st.n_batches() >= 1);
assert_eq!(st.n_rows(), 8);
let materialized = st.consolidate();
assert_eq!(materialized.n_rows, 8);
}
#[test]
fn test_rechunk_uniform_zero_error() {
let batch1 = Arc::new(table(vec![fa_i32!("x", 1, 2, 3)]));
let mut st = SuperTable::from_batches(vec![batch1], None);
let result = st.rechunk(RechunkStrategy::Count(0));
assert!(result.is_err());
if let Err(MinarrowError::IndexError(msg)) = result {
assert!(msg.contains("Count chunk size must be greater than 0"));
} else {
panic!("Expected IndexError for zero chunk size");
}
}
#[test]
fn test_rechunk_empty_table() {
let mut st = SuperTable::default();
assert!(st.is_empty());
st.rechunk(RechunkStrategy::Auto).unwrap();
assert!(st.is_empty());
assert_eq!(st.n_batches(), 0);
st.rechunk(RechunkStrategy::Count(10)).unwrap();
assert!(st.is_empty());
assert_eq!(st.n_batches(), 0);
}
#[test]
fn test_rechunk_preserves_data_order() {
let batch1 = Arc::new(table(vec![fa_i32!("num", 1, 2, 3)]));
let batch2 = Arc::new(table(vec![fa_i32!("num", 4, 5, 6, 7)]));
let batch3 = Arc::new(table(vec![fa_i32!("num", 8, 9)]));
let mut st = SuperTable::from_batches(vec![batch1, batch2, batch3], None);
assert_eq!(st.n_rows(), 9);
st.rechunk(RechunkStrategy::Count(5)).unwrap();
let materialized = st.consolidate();
match &materialized.cols[0].array {
Array::NumericArray(NumericArray::Int32(arr)) => {
assert_eq!(arr.data.as_slice(), &[1, 2, 3, 4, 5, 6, 7, 8, 9]);
}
_ => panic!("Expected Int32 array"),
}
}
#[test]
fn test_consolidate_arena_integer_and_float() {
let b1 = Arc::new(table(vec![
fa_i32!("id", 1, 2, 3),
fa_f64!("val", 1.5, 2.5, 3.5),
]));
let b2 = Arc::new(table(vec![
fa_i32!("id", 4, 5),
fa_f64!("val", 4.5, 5.5),
]));
let st = SuperTable::from_batches(vec![b1, b2], None);
let result = st.consolidate();
assert_eq!(result.n_rows, 5);
match &result.cols[0].array {
Array::NumericArray(NumericArray::Int32(arr)) => {
assert_eq!(arr.data.as_slice(), &[1, 2, 3, 4, 5]);
}
_ => panic!("Expected Int32"),
}
match &result.cols[1].array {
Array::NumericArray(NumericArray::Float64(arr)) => {
assert_eq!(arr.data.as_slice(), &[1.5, 2.5, 3.5, 4.5, 5.5]);
}
_ => panic!("Expected Float64"),
}
}
#[test]
fn test_consolidate_arena_string_columns() {
let b1 = Arc::new(table(vec![fa_str32!("name", "hello", "world")]));
let b2 = Arc::new(table(vec![fa_str32!("name", "foo", "bar", "baz")]));
let st = SuperTable::from_batches(vec![b1, b2], None);
let result = st.consolidate();
assert_eq!(result.n_rows, 5);
match &result.cols[0].array {
Array::TextArray(crate::TextArray::String32(arr)) => {
assert_eq!(
arr.offsets.len(),
6,
"offsets should have n_rows+1 elements"
);
assert_eq!(arr.len(), 5);
assert_eq!(arr.get_str(0), Some("hello"));
assert_eq!(arr.get_str(1), Some("world"));
assert_eq!(arr.get_str(2), Some("foo"));
assert_eq!(arr.get_str(3), Some("bar"));
assert_eq!(arr.get_str(4), Some("baz"));
}
_ => panic!("Expected String32"),
}
}
#[test]
fn test_consolidate_arena_nullable_columns() {
use crate::{Bitmask, Buffer, IntegerArray};
let fa_nullable = |name: &str, vals: &[i32], nulls: &[bool]| -> FieldArray {
let mask = Bitmask::from_bools(nulls);
let arr = Array::from_int32(IntegerArray::<i32>::new(
Buffer::from_slice(vals),
Some(mask),
));
FieldArray::new(
Field::new(name.to_string(), ArrowType::Int32, true, None),
arr,
)
};
let b1 = Arc::new(table(vec![fa_nullable(
"x",
&[10, 0, 30],
&[true, false, true],
)]));
let b2 = Arc::new(table(vec![fa_nullable("x", &[0, 50], &[false, true])]));
let st = SuperTable::from_batches(vec![b1, b2], None);
let result = st.consolidate();
assert_eq!(result.n_rows, 5);
match &result.cols[0].array {
Array::NumericArray(NumericArray::Int32(arr)) => {
assert_eq!(arr.get(0), Some(10));
assert_eq!(arr.get(1), None); assert_eq!(arr.get(2), Some(30));
assert_eq!(arr.get(3), None); assert_eq!(arr.get(4), Some(50));
}
_ => panic!("Expected Int32"),
}
}
#[test]
fn test_consolidate_arena_boolean_columns() {
let b1 = Arc::new(table(vec![fa_bool!("flag", true, false, true)]));
let b2 = Arc::new(table(vec![fa_bool!("flag", false, true)]));
let st = SuperTable::from_batches(vec![b1, b2], None);
let result = st.consolidate();
assert_eq!(result.n_rows, 5);
match &result.cols[0].array {
Array::BooleanArray(arr) => {
assert_eq!(arr.data.get(0), true);
assert_eq!(arr.data.get(1), false);
assert_eq!(arr.data.get(2), true);
assert_eq!(arr.data.get(3), false);
assert_eq!(arr.data.get(4), true);
}
_ => panic!("Expected BooleanArray"),
}
}
#[test]
fn test_consolidate_arena_mixed_types() {
let b1 = Arc::new(table(vec![
fa_i64!("id", 1, 2),
fa_f64!("score", 9.5, 8.0),
fa_str32!("name", "alice", "bob"),
fa_bool!("active", true, false),
]));
let b2 = Arc::new(table(vec![
fa_i64!("id", 3),
fa_f64!("score", 7.0),
fa_str32!("name", "charlie"),
fa_bool!("active", true),
]));
let st = SuperTable::from_batches(vec![b1, b2], None);
let result = st.consolidate();
assert_eq!(result.n_rows, 3);
assert_eq!(result.cols.len(), 4);
match &result.cols[0].array {
Array::NumericArray(NumericArray::Int64(arr)) => {
assert_eq!(arr.data.as_slice(), &[1i64, 2, 3]);
}
_ => panic!("Expected Int64"),
}
match &result.cols[1].array {
Array::NumericArray(NumericArray::Float64(arr)) => {
assert_eq!(arr.data.as_slice(), &[9.5, 8.0, 7.0]);
}
_ => panic!("Expected Float64"),
}
match &result.cols[2].array {
Array::TextArray(crate::TextArray::String32(arr)) => {
assert_eq!(arr.get_str(0), Some("alice"));
assert_eq!(arr.get_str(1), Some("bob"));
assert_eq!(arr.get_str(2), Some("charlie"));
}
_ => panic!("Expected String32"),
}
match &result.cols[3].array {
Array::BooleanArray(arr) => {
assert_eq!(arr.data.get(0), true);
assert_eq!(arr.data.get(1), false);
assert_eq!(arr.data.get(2), true);
}
_ => panic!("Expected BooleanArray"),
}
}
#[test]
#[cfg(feature = "arena")]
fn test_consolidate_arena_shared_buffers() {
let b1 = Arc::new(table(vec![fa_i32!("x", 1, 2, 3)]));
let b2 = Arc::new(table(vec![fa_i32!("x", 4, 5)]));
let st = SuperTable::from_batches(vec![b1, b2], None);
let result = st.consolidate();
match &result.cols[0].array {
Array::NumericArray(NumericArray::Int32(arr)) => {
assert!(arr.data.is_shared(), "Data buffer should be shared");
}
_ => panic!("Expected Int32"),
}
}
#[test]
fn test_consolidate_arena_single_batch() {
    // A single-batch super-table consolidates to an identical table.
    let only = Arc::new(table(vec![fa_i32!("x", 10, 20, 30)]));
    let consolidated = SuperTable::from_batches(vec![only], None).consolidate();

    assert_eq!(consolidated.n_rows, 3);
    let Array::NumericArray(NumericArray::Int32(col)) = &consolidated.cols[0].array else {
        panic!("Expected Int32");
    };
    assert_eq!(col.data.as_slice(), &[10, 20, 30]);
}
#[test]
#[cfg(feature = "arena")]
fn test_consolidate_arena_equivalence_with_consolidate() {
    // Build the same two-batch super-table twice so each consolidation
    // strategy gets an independent input.
    let build = || {
        let batch_a = Arc::new(table(vec![
            fa_i32!("id", 1, 2, 3),
            fa_f64!("val", 1.5, 2.5, 3.5),
            fa_str32!("label", "a", "bb", "ccc"),
        ]));
        let batch_b = Arc::new(table(vec![
            fa_i32!("id", 4, 5),
            fa_f64!("val", 4.5, 5.5),
            fa_str32!("label", "dddd", "e"),
        ]));
        SuperTable::from_batches(vec![batch_a, batch_b], None)
    };

    let via_concat = build().consolidate_concat();
    let via_arena = build().consolidate_arena();

    // Both strategies must agree on shape...
    assert_eq!(via_concat.n_rows, via_arena.n_rows);
    assert_eq!(via_concat.cols.len(), via_arena.cols.len());

    // ...and on every column's contents.
    let (
        Array::NumericArray(NumericArray::Int32(id_a)),
        Array::NumericArray(NumericArray::Int32(id_b)),
    ) = (&via_concat.cols[0].array, &via_arena.cols[0].array)
    else {
        panic!("Mismatched types for column 0");
    };
    assert_eq!(id_a.data.as_slice(), id_b.data.as_slice());

    let (
        Array::NumericArray(NumericArray::Float64(val_a)),
        Array::NumericArray(NumericArray::Float64(val_b)),
    ) = (&via_concat.cols[1].array, &via_arena.cols[1].array)
    else {
        panic!("Mismatched types for column 1");
    };
    assert_eq!(val_a.data.as_slice(), val_b.data.as_slice());

    let (
        Array::TextArray(crate::TextArray::String32(lbl_a)),
        Array::TextArray(crate::TextArray::String32(lbl_b)),
    ) = (&via_concat.cols[2].array, &via_arena.cols[2].array)
    else {
        panic!("Mismatched types for column 2");
    };
    // Five total rows (3 + 2) across both batches.
    for i in 0..5 {
        assert_eq!(lbl_a.get_str(i), lbl_b.get_str(i), "String mismatch at index {i}");
    }
}
#[cfg(any(not(feature = "default_categorical_8"), feature = "extended_categorical"))]
#[test]
fn test_consolidate_arena_categorical() {
    // Categorical columns must keep their dictionary lookups intact
    // after two batches are consolidated.
    let batch_a = Arc::new(table(vec![fa_cat32!("cat", "a", "b", "a")]));
    let batch_b = Arc::new(table(vec![fa_cat32!("cat", "a", "b")]));
    let consolidated = SuperTable::from_batches(vec![batch_a, batch_b], None).consolidate();

    assert_eq!(consolidated.n_rows, 5);
    let Array::TextArray(crate::TextArray::Categorical32(col)) = &consolidated.cols[0].array
    else {
        panic!("Expected Categorical32");
    };
    for (idx, expected) in ["a", "b", "a", "a", "b"].iter().enumerate() {
        assert_eq!(col.get_str(idx), Some(*expected));
    }
}
#[test]
fn test_consolidate_arena_nullable_strings() {
    use crate::{Bitmask, StringArray};

    // Helper: build a nullable String32 column where `validity[i] == true`
    // marks row i as non-null.
    let nullable_str_col = |name: &str, values: &[&str], validity: &[bool]| -> FieldArray {
        let mask = Bitmask::from_bools(validity);
        let array =
            Array::from_string32(StringArray::<u32>::from_vec(values.to_vec(), Some(mask)));
        FieldArray::new(
            Field::new(name.to_string(), ArrowType::String, true, None),
            array,
        )
    };

    let batch_a = Arc::new(table(vec![nullable_str_col(
        "s",
        &["hello", "", "world"],
        &[true, false, true],
    )]));
    let batch_b = Arc::new(table(vec![nullable_str_col(
        "s",
        &["", "bar"],
        &[false, true],
    )]));
    let consolidated = SuperTable::from_batches(vec![batch_a, batch_b], None).consolidate();

    assert_eq!(consolidated.n_rows, 5);
    let Array::TextArray(crate::TextArray::String32(col)) = &consolidated.cols[0].array else {
        panic!("Expected String32");
    };
    // Null pattern from both batches must survive the merge.
    assert!(!col.is_null(0));
    assert!(col.is_null(1));
    assert!(!col.is_null(2));
    assert!(col.is_null(3));
    assert!(!col.is_null(4));
    // Non-null values must survive unchanged.
    assert_eq!(col.get_str(0), Some("hello"));
    assert_eq!(col.get_str(2), Some("world"));
    assert_eq!(col.get_str(4), Some("bar"));
}
#[test]
fn test_consolidate_arena_three_batches() {
    // More than two batches, with uneven row counts (2 + 1 + 3).
    let batches = vec![
        Arc::new(table(vec![fa_i32!("x", 1, 2)])),
        Arc::new(table(vec![fa_i32!("x", 3)])),
        Arc::new(table(vec![fa_i32!("x", 4, 5, 6)])),
    ];
    let consolidated = SuperTable::from_batches(batches, None).consolidate();

    assert_eq!(consolidated.n_rows, 6);
    let Array::NumericArray(NumericArray::Int32(col)) = &consolidated.cols[0].array else {
        panic!("Expected Int32");
    };
    assert_eq!(col.data.as_slice(), &[1, 2, 3, 4, 5, 6]);
}
#[test]
fn test_consolidate_arena_preserves_name() {
    // The super-table's (renamed) name must carry through consolidation.
    let batch_a = Arc::new(table(vec![fa_i32!("x", 1, 2)]));
    let batch_b = Arc::new(table(vec![fa_i32!("x", 3)]));
    let mut super_table = SuperTable::from_batches(vec![batch_a, batch_b], None);
    super_table.name = "my_table".to_string();

    assert_eq!(super_table.consolidate().name, "my_table");
}
#[cfg(feature = "datetime")]
#[test]
fn test_consolidate_arena_datetime() {
    use crate::enums::collections::temporal_array::TemporalArray;
    use crate::{DatetimeArray, TimeUnit};

    // Helper: build a non-nullable millisecond-resolution Datetime64 column.
    let datetime_col = |name: &str, ticks: &[i64]| -> FieldArray {
        let array =
            Array::TemporalArray(TemporalArray::Datetime64(Arc::new(DatetimeArray::new(
                crate::Buffer::from_vec64(ticks.into()),
                None,
                Some(TimeUnit::Milliseconds),
            ))));
        FieldArray::new(
            Field::new(
                name.to_string(),
                ArrowType::Timestamp(TimeUnit::Milliseconds, None),
                false,
                None,
            ),
            array,
        )
    };

    let batch_a = Arc::new(table(vec![datetime_col("ts", &[1000, 2000, 3000])]));
    let batch_b = Arc::new(table(vec![datetime_col("ts", &[4000, 5000])]));
    let consolidated = SuperTable::from_batches(vec![batch_a, batch_b], None).consolidate();

    assert_eq!(consolidated.n_rows, 5);
    let Array::TemporalArray(TemporalArray::Datetime64(col)) = &consolidated.cols[0].array
    else {
        panic!("Expected Datetime64");
    };
    // Values concatenate in batch order and the time unit is preserved.
    assert_eq!(col.data.as_slice(), &[1000i64, 2000, 3000, 4000, 5000]);
    assert_eq!(col.time_unit, TimeUnit::Milliseconds);
}
}