use std::borrow::Cow;
use std::sync::Arc;
use arrow::array::{
Array, ArrayRef, BooleanBufferBuilder, FixedSizeListArray, ListArray, StructArray, UInt32Array,
UInt64Array,
};
use arrow::buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
use arrow::datatypes::{ArrowNativeType as _, Field};
use re_log::debug_assert_eq;
use crate::{Error, Transform};
/// Transform that extracts a single child column from a [`StructArray`] by
/// field name, folding the struct-level validity into the child's validity.
#[derive(Clone, Debug)]
pub struct GetField {
    /// Name of the struct field to extract.
    field_name: String,
}
impl GetField {
    /// Creates a transform that extracts the field with the given name.
    pub fn new(field_name: impl Into<String>) -> Self {
        let field_name = field_name.into();
        Self { field_name }
    }
}
impl Transform for GetField {
    type Source = StructArray;
    type Target = ArrayRef;

    /// Extracts the named child column from the struct.
    ///
    /// A result row is valid only if it is valid at both levels: the struct
    /// slot itself and the child value. Returns `Ok(None)` when the struct has
    /// no field with the configured name.
    fn transform(&self, source: &StructArray) -> Result<Option<ArrayRef>, Error> {
        let Some(field_array) = source.column_by_name(&self.field_name).cloned() else {
            return Ok(None);
        };

        // No struct-level nulls: the child column can be returned untouched.
        let Some(struct_nulls) = source.nulls() else {
            return Ok(Some(field_array));
        };

        let field_data = field_array.to_data();

        // Merge both validity layers: valid iff valid in struct AND child.
        let combined_nulls = match field_data.nulls() {
            Some(field_nulls) => {
                let validity: Vec<bool> = (0..source.len())
                    .map(|row| struct_nulls.is_valid(row) && field_nulls.is_valid(row))
                    .collect();
                NullBuffer::from(validity)
            }
            // Child has no nulls of its own; the struct's validity is the result.
            None => struct_nulls.clone(),
        };

        let rebuilt = field_data
            .into_builder()
            .nulls(Some(combined_nulls))
            .build()?;
        Ok(Some(arrow::array::make_array(rebuilt)))
    }
}
fn fixed_size_list_to_list(fixed: &FixedSizeListArray) -> Result<ListArray, Error> {
let (field, size, values, nulls) = fixed.clone().into_parts();
let len = i32::try_from(fixed.len()).map_err(|_err| Error::OffsetOverflow {
actual: fixed.len(),
expected_type: "i32",
})?;
let offsets: Vec<i32> = (0..=len).map(|i| i * size).collect();
Ok(ListArray::new(
field,
OffsetBuffer::new(ScalarBuffer::from(offsets)),
values,
nulls,
))
}
/// Transform that flattens one level of list nesting: `List<List<T>>` or
/// `List<FixedSizeList<T>>` becomes `List<T>`.
#[derive(Clone, Debug, Default)]
pub struct Flatten;
impl Flatten {
pub fn new() -> Self {
Self
}
}
impl Transform for Flatten {
    type Source = ListArray;
    type Target = ListArray;

    /// Flattens one level of nesting: output row `i` is the concatenation of
    /// all valid inner lists referenced by outer row `i`.
    ///
    /// Null outer rows stay null (with an empty range); null inner lists
    /// contribute no values.
    ///
    /// # Errors
    /// [`Error::TypeMismatch`] if the inner values are neither `List` nor
    /// `FixedSizeList`; offset-conversion errors from the `FixedSizeList`
    /// normalization are propagated.
    fn transform(&self, source: &ListArray) -> Result<Option<ListArray>, Error> {
        let values = source.values();

        // Normalize the inner layer to a `ListArray` so both supported input
        // types share one code path below.
        let inner_list: Cow<'_, ListArray> =
            if let Some(list) = values.as_any().downcast_ref::<ListArray>() {
                Cow::Borrowed(list)
            } else if let Some(fixed) = values.as_any().downcast_ref::<FixedSizeListArray>() {
                Cow::Owned(fixed_size_list_to_list(fixed)?)
            } else {
                return Err(Error::TypeMismatch {
                    expected: "List or FixedSizeList".to_owned(),
                    actual: values.data_type().clone(),
                    context: "Flatten expects List<List<T>> or List<FixedSizeList<T>>".to_owned(),
                });
            };

        let outer_offsets = source.offsets();
        let inner_offsets = inner_list.offsets();
        let inner_values = inner_list.values();

        // Fast path: if every valid outer row holds at most one inner list, the
        // flattened offsets are just the inner offsets looked up through the
        // outer offsets — no values need to be moved.
        //
        // BUGFIX: this remap is only sound when no referenced inner list is
        // null with a non-empty offset range — the remap would include those
        // values, while the general path (correctly) skips null inner lists.
        // This matters in particular for inner lists converted from
        // `FixedSizeList`, where null slots still span `size` values.
        let mut is_trivial = true;
        for outer_row_idx in 0..source.len() {
            if source.is_null(outer_row_idx) {
                continue;
            }
            let outer_start = outer_offsets[outer_row_idx] as usize;
            let outer_end = outer_offsets[outer_row_idx + 1] as usize;
            let count = outer_end - outer_start;
            if count > 1 {
                is_trivial = false;
                break;
            }
            if count == 1
                && inner_list.is_null(outer_start)
                && inner_offsets[outer_start] != inner_offsets[outer_start + 1]
            {
                is_trivial = false;
                break;
            }
        }
        if is_trivial {
            let mut new_offsets = Vec::with_capacity(source.len() + 1);
            for outer_row_idx in 0..=source.len() {
                let outer_idx = outer_offsets[outer_row_idx] as usize;
                let inner_offset = inner_offsets[outer_idx];
                new_offsets.push(inner_offset);
            }
            let field = Arc::new(Field::new_list_field(
                inner_values.data_type().clone(),
                true,
            ));
            let offsets = arrow::buffer::OffsetBuffer::new(new_offsets.into());
            return Ok(Some(ListArray::new(
                field,
                offsets,
                inner_values.clone(),
                source.nulls().cloned(),
            )));
        }

        // General path: walk every (outer row, inner list) pair, collecting the
        // value ranges to keep. Adjacent ranges are merged so we slice and
        // concatenate as few times as possible.
        let mut new_offsets = Vec::with_capacity(source.len() + 1);
        new_offsets.push(0i32);
        let mut current_offset = 0i32;
        let mut value_ranges: Vec<(i32, i32)> = Vec::new();
        for outer_row_idx in 0..source.len() {
            if source.is_null(outer_row_idx) {
                // Null outer rows become null output rows with an empty range.
                new_offsets.push(current_offset);
                continue;
            }
            let outer_start = outer_offsets[outer_row_idx];
            let outer_end = outer_offsets[outer_row_idx + 1];
            for inner_idx in outer_start..outer_end {
                let inner_idx = inner_idx as usize;
                if inner_list.is_null(inner_idx) {
                    continue; // Null inner lists contribute no values.
                }
                let inner_start = inner_offsets[inner_idx];
                let inner_end = inner_offsets[inner_idx + 1];
                let length = inner_end - inner_start;
                if length > 0 {
                    // Extend the previous range when contiguous, otherwise
                    // start a new one.
                    match value_ranges.last_mut() {
                        Some((last_start, last_len)) if *last_start + *last_len == inner_start => {
                            *last_len += length;
                        }
                        _ => value_ranges.push((inner_start, length)),
                    }
                    current_offset += length;
                }
            }
            new_offsets.push(current_offset);
        }

        let flattened_values = if value_ranges.is_empty() {
            inner_values.slice(0, 0)
        } else if value_ranges.len() == 1 {
            let (start, length) = value_ranges[0];
            inner_values.slice(start as usize, length as usize)
        } else {
            let slices: Vec<_> = value_ranges
                .iter()
                .map(|&(start, length)| inner_values.slice(start as usize, length as usize))
                .collect();
            let refs: Vec<&dyn Array> = slices.iter().map(|a| a.as_ref()).collect();
            re_arrow_util::concat_arrays(&refs)?
        };
        let field = Arc::new(Field::new_list_field(
            inner_values.data_type().clone(),
            true,
        ));
        let offsets = arrow::buffer::OffsetBuffer::new(new_offsets.into());
        Ok(Some(ListArray::new(
            field,
            offsets,
            flattened_values,
            source.nulls().cloned(),
        )))
    }
}
/// Transform that zips N same-typed struct fields into a `FixedSizeList` of
/// size N, in the configured field order.
#[derive(Clone, Debug)]
pub struct StructToFixedList {
    /// Struct fields to zip, in output order.
    field_names: Vec<String>,
}
impl StructToFixedList {
    /// Creates a transform that zips the given struct fields, in order, into a
    /// fixed-size list.
    pub fn new(field_names: impl IntoIterator<Item = impl Into<String>>) -> Self {
        let field_names = field_names.into_iter().map(Into::into).collect();
        Self { field_names }
    }
}
impl Transform for StructToFixedList {
type Source = StructArray;
type Target = FixedSizeListArray;
fn transform(&self, source: &StructArray) -> Result<Option<FixedSizeListArray>, Error> {
if self.field_names.is_empty() {
return Err(Error::NoFieldNames);
}
let first_field_name = &self.field_names[0];
let first_array = GetField::new(first_field_name)
.transform(source)?
.ok_or_else(|| Error::FieldNotFound {
field_name: first_field_name.clone(),
available_fields: source.fields().iter().map(|f| f.name().clone()).collect(),
})?;
let element_type = first_array.data_type().clone();
let mut field_arrays = Vec::new();
field_arrays.push(first_array);
for field_name in &self.field_names[1..] {
let array = GetField::new(field_name)
.transform(source)?
.ok_or_else(|| Error::FieldNotFound {
field_name: field_name.clone(),
available_fields: source.fields().iter().map(|f| f.name().clone()).collect(),
})?;
if array.data_type() != &element_type {
return Err(Error::InconsistentFieldTypes {
field_name: field_name.clone(),
actual_type: array.data_type().clone(),
reference_field: first_field_name.clone(),
expected_type: element_type.clone(),
});
}
field_arrays.push(array);
}
let mut concatenated_arrays = Vec::new();
for row_idx in 0..source.len() {
for field_array in &field_arrays {
concatenated_arrays.push(field_array.slice(row_idx, 1));
}
}
let refs: Vec<&dyn Array> = concatenated_arrays.iter().map(|a| a.as_ref()).collect();
let values = re_arrow_util::concat_arrays(&refs)?;
let field = Arc::new(Field::new_list_field(element_type, true));
let list_size = self.field_names.len();
let list_size = i32::try_from(list_size).map_err(|err| Error::InvalidNumberOfFields {
actual: list_size,
err,
})?;
Ok(Some(FixedSizeListArray::new(
field, list_size, values, None, )))
}
}
pub(crate) struct PromoteInnerNulls;
impl Transform for PromoteInnerNulls {
    type Source = ListArray;
    type Target = ListArray;

    /// Recomputes list-level validity so that a row is null when the outer
    /// entry is null, or when it is non-empty and all of its elements are
    /// null. The offsets and values are passed through unchanged.
    fn transform(&self, source: &ListArray) -> Result<Option<Self::Target>, Error> {
        let (field, offsets, values, outer_nulls) = source.clone().into_parts();
        let element_nulls = values.logical_nulls();

        let mut validity = BooleanBufferBuilder::new(source.len());
        for row in 0..source.len() {
            // An outer null wins unconditionally.
            if outer_nulls.as_ref().is_some_and(|nulls| !nulls.is_valid(row)) {
                validity.append(false);
                continue;
            }
            let start = offsets[row].as_usize();
            let end = offsets[row + 1].as_usize();
            // Empty rows stay valid; a non-empty row is valid as soon as one
            // of its elements is.
            let row_is_valid = start == end
                || element_nulls
                    .as_ref()
                    .map_or(true, |nulls| (start..end).any(|j| nulls.is_valid(j)));
            validity.append(row_is_valid);
        }

        Ok(Some(ListArray::new(
            field,
            offsets,
            values,
            Some(NullBuffer::from(validity.finish())),
        )))
    }
}
pub struct Explode;
impl Transform for Explode {
    type Source = ListArray;
    type Target = ListArray;

    /// Expands every multi-element list row into one single-element row per
    /// value, while null rows and empty rows each remain a single output row.
    ///
    /// Output validity: a row produced from a null input row is null; a row
    /// produced from an empty input row is a valid empty list; a row produced
    /// from element `j` is valid iff `values[j]` is valid.
    fn transform(&self, source: &Self::Source) -> Result<Option<Self::Target>, Error> {
        let values_array = source.values();
        let offsets = source.offsets();

        // First pass: count output rows. Each element yields one row, but
        // null/empty input rows still produce exactly one output row each.
        let mut capacity = 0;
        for i in 0..source.len() {
            let start = offsets[i];
            let end = offsets[i + 1];
            if source.is_null(i) || start == end {
                capacity += 1;
            } else {
                capacity += (end - start) as usize;
            }
        }

        // Second pass: build the take-indices into the values, the new
        // offsets, and the per-row validity in lockstep.
        let mut indices = Vec::with_capacity(capacity);
        let mut new_offsets = Vec::with_capacity(capacity + 1);
        new_offsets.push(0i32);
        let mut new_validity = Vec::with_capacity(capacity);
        let mut current_offset = 0i32;
        for i in 0..source.len() {
            let start = offsets[i] as usize;
            let end = offsets[i + 1] as usize;
            if source.is_null(i) {
                // Null input row -> single null output row (empty range).
                new_validity.push(false);
                new_offsets.push(current_offset);
            } else if start == end {
                // Empty input row -> single valid, empty output row.
                new_validity.push(true);
                new_offsets.push(current_offset);
            } else {
                // One output row per element; the row's validity mirrors the
                // element's own validity.
                for j in start..end {
                    indices.push(j as u32);
                    current_offset += 1;
                    new_offsets.push(current_offset);
                    new_validity.push(values_array.is_valid(j));
                }
            }
        }
        debug_assert_eq!(
            new_offsets.len(),
            capacity + 1,
            "new_offsets length mismatch: expected {}, got {}",
            capacity + 1,
            new_offsets.len()
        );
        debug_assert_eq!(
            new_validity.len(),
            capacity,
            "new_validity length mismatch: expected {}, got {}",
            capacity,
            new_validity.len()
        );

        // Gather the referenced values; skip `take` entirely when every input
        // row was null or empty.
        let values = if indices.is_empty() {
            values_array.slice(0, 0)
        } else {
            let indices_array = UInt32Array::from(indices);
            #[expect(clippy::disallowed_methods)]
            arrow::compute::take(values_array.as_ref(), &indices_array, None)?
        };
        let field = Arc::new(Field::new_list_field(source.value_type(), true));
        Ok(Some(ListArray::new(
            field,
            OffsetBuffer::new(new_offsets.into()),
            values,
            Some(NullBuffer::from(new_validity)),
        )))
    }
}
/// Transform that reorders the values of each fixed-size list — viewed as a
/// row-major `output_rows` x `output_columns` matrix — into column-major order.
#[derive(Clone, Debug)]
pub struct RowMajorToColumnMajor {
    /// Number of matrix rows per list.
    output_rows: usize,
    /// Number of matrix columns per list.
    output_columns: usize,
    /// `permutation_per_list[i]` is the row-major index whose value ends up at
    /// column-major position `i` within each list.
    permutation_per_list: Vec<usize>,
}
impl RowMajorToColumnMajor {
    /// Creates a transform for `output_rows` x `output_columns` matrices
    /// stored row-major inside fixed-size lists.
    pub fn new(output_rows: usize, output_columns: usize) -> Self {
        // permutation[column-major position] = row-major position to read from.
        let permutation_per_list = (0..output_columns)
            .flat_map(|column| (0..output_rows).map(move |row| row * output_columns + column))
            .collect();
        Self {
            output_rows,
            output_columns,
            permutation_per_list,
        }
    }
}
impl Transform for RowMajorToColumnMajor {
    type Source = FixedSizeListArray;
    type Target = FixedSizeListArray;

    /// Reorders each fixed-size list from row-major to column-major order,
    /// keeping list-level validity and the child field intact.
    ///
    /// # Errors
    /// [`Error::UnexpectedListValueLength`] if the list size does not equal
    /// `output_rows * output_columns`.
    fn transform(&self, source: &Self::Source) -> Result<Option<Self::Target>, Error> {
        let expected_list_size = self.output_rows * self.output_columns;
        let value_length = source.value_length() as usize;
        if value_length != expected_list_size {
            return Err(Error::UnexpectedListValueLength {
                expected: expected_list_size,
                actual: value_length,
            });
        }

        // BUGFIX: reuse the source's own child field and validity instead of
        // deriving the field's nullability from `source.is_nullable()` (the
        // array's null *count*): that could declare a non-nullable child field
        // while the reordered values still contain nulls, which
        // `FixedSizeListArray::new` rejects.
        let (field, list_size, values, nulls) = source.clone().into_parts();

        // One take-index per value: map each output slot back to the row-major
        // slot it should read from, within its own list.
        // NOTE(review): assumes `values.len()` is `len * list_size`, i.e. the
        // child is aligned with the lists — confirm for sliced arrays.
        let total_values = values.len();
        let indices_to_take: UInt64Array = (0..total_values)
            .map(|value_index| {
                let list_index = value_index / expected_list_size;
                let value_index_within_list = value_index % expected_list_size;
                let next_index_to_take = list_index * expected_list_size
                    + self.permutation_per_list[value_index_within_list];
                next_index_to_take as u64
            })
            .collect();

        #[expect(clippy::disallowed_methods)]
        let reordered_values = arrow::compute::take(values.as_ref(), &indices_to_take, None)?;

        Ok(Some(FixedSizeListArray::new(
            field,
            list_size,
            reordered_values,
            nulls,
        )))
    }
}