use crate::builder::buffer_builder::{Int8BufferBuilder, Int32BufferBuilder};
use crate::builder::{ArrayBuilder, BufferBuilder};
use crate::{ArrayRef, ArrowPrimitiveType, UnionArray, make_array};
use arrow_buffer::NullBufferBuilder;
use arrow_buffer::{ArrowNativeType, Buffer, ScalarBuffer};
use arrow_data::ArrayDataBuilder;
use arrow_schema::{ArrowError, DataType, Field};
use std::any::Any;
use std::collections::BTreeMap;
use std::sync::Arc;
/// Accumulated state for one child (one named field) of the union being built.
#[derive(Debug)]
struct FieldData {
/// Type id written to the union's type-id buffer for every element of this child.
type_id: i8,
/// Arrow data type of this child; compared with `T::DATA_TYPE` when an existing field is appended to.
data_type: DataType,
/// Type-erased value buffer; created as a `BufferBuilder<T::Native>` in `FieldData::new`.
values_buffer: Box<dyn FieldDataValues>,
/// Number of slots (values plus nulls) appended so far; used as the child array length.
slots: usize,
/// Validity builder; receives one entry per appended slot.
null_buffer_builder: NullBufferBuilder,
}
/// Object-safe view of a typed `BufferBuilder`, letting `FieldData` keep value
/// buffers of different native types behind a single `Box<dyn FieldDataValues>`.
trait FieldDataValues: std::fmt::Debug + Send + Sync {
/// Returns `self` as `&mut dyn Any` so the caller can downcast to the concrete builder.
fn as_mut_any(&mut self) -> &mut dyn Any;
/// Appends one placeholder slot for a null entry.
fn append_null(&mut self);
/// Produces the accumulated values as a `Buffer`; requires mutable access to the builder.
fn finish(&mut self) -> Buffer;
/// Produces the accumulated values as a `Buffer` through a shared reference only.
fn finish_cloned(&self) -> Buffer;
}
impl<T: ArrowNativeType> FieldDataValues for BufferBuilder<T> {
fn as_mut_any(&mut self) -> &mut dyn Any {
self
}
fn append_null(&mut self) {
self.advance(1)
}
fn finish(&mut self) -> Buffer {
self.finish()
}
fn finish_cloned(&self) -> Buffer {
Buffer::from_slice_ref(self.as_slice())
}
}
impl FieldData {
    /// Creates empty child state whose value buffer stores `T::Native` elements.
    ///
    /// `capacity` is passed to both the value buffer and the validity builder.
    fn new<T: ArrowPrimitiveType>(type_id: i8, data_type: DataType, capacity: usize) -> Self {
        let values: BufferBuilder<T::Native> = BufferBuilder::new(capacity);
        let validity = NullBufferBuilder::new(capacity);
        Self {
            type_id,
            data_type,
            values_buffer: Box::new(values),
            slots: 0,
            null_buffer_builder: validity,
        }
    }

    /// Appends a valid value `v` as the next slot of this child.
    ///
    /// # Panics
    ///
    /// Panics if the value buffer was not created for `T::Native`.
    fn append_value<T: ArrowPrimitiveType>(&mut self, v: T::Native) {
        let typed = self
            .values_buffer
            .as_mut_any()
            .downcast_mut::<BufferBuilder<T::Native>>()
            .expect("Tried to append unexpected type");
        typed.append(v);
        self.null_buffer_builder.append_non_null();
        self.slots += 1;
    }

    /// Appends a null slot: a placeholder value plus a cleared validity bit.
    fn append_null(&mut self) {
        self.values_buffer.append_null();
        self.null_buffer_builder.append_null();
        self.slots += 1;
    }
}
/// Builder for [`UnionArray`] whose children are primitive arrays.
///
/// Children are created on first use of a field name and are keyed by that name.
/// A builder created through `new_dense` / `with_capacity_dense` records a value
/// offset per element; one created through `new_sparse` / `with_capacity_sparse`
/// does not and instead keeps every child the same length as the union.
///
/// Note: the derived `Default` leaves `value_offset_builder` as `None` and
/// `initial_capacity` as `0`, i.e. it yields a sparse builder.
#[derive(Debug, Default)]
pub struct UnionBuilder {
/// Number of elements appended to the union so far.
len: usize,
/// Per-child state keyed by field name; a `BTreeMap`, so iteration is ordered by name.
fields: BTreeMap<String, FieldData>,
/// Type id of each appended element.
type_id_builder: Int8BufferBuilder,
/// Per-element offsets into the selected child: `Some` for dense unions, `None` for sparse.
value_offset_builder: Option<Int32BufferBuilder>,
/// Capacity requested at construction; reused when a new child is created.
initial_capacity: usize,
}
impl UnionBuilder {
pub fn new_dense() -> Self {
Self::with_capacity_dense(1024)
}
pub fn new_sparse() -> Self {
Self::with_capacity_sparse(1024)
}
pub fn with_capacity_dense(capacity: usize) -> Self {
Self {
len: 0,
fields: Default::default(),
type_id_builder: Int8BufferBuilder::new(capacity),
value_offset_builder: Some(Int32BufferBuilder::new(capacity)),
initial_capacity: capacity,
}
}
pub fn with_capacity_sparse(capacity: usize) -> Self {
Self {
len: 0,
fields: Default::default(),
type_id_builder: Int8BufferBuilder::new(capacity),
value_offset_builder: None,
initial_capacity: capacity,
}
}
#[inline]
pub fn append_null<T: ArrowPrimitiveType>(
&mut self,
type_name: &str,
) -> Result<(), ArrowError> {
self.append_option::<T>(type_name, None)
}
#[inline]
pub fn append<T: ArrowPrimitiveType>(
&mut self,
type_name: &str,
v: T::Native,
) -> Result<(), ArrowError> {
self.append_option::<T>(type_name, Some(v))
}
fn append_option<T: ArrowPrimitiveType>(
&mut self,
type_name: &str,
v: Option<T::Native>,
) -> Result<(), ArrowError> {
let type_name = type_name.to_string();
let mut field_data = match self.fields.remove(&type_name) {
Some(data) => {
if data.data_type != T::DATA_TYPE {
return Err(ArrowError::InvalidArgumentError(format!(
"Attempt to write col \"{}\" with type {} doesn't match existing type {}",
type_name,
T::DATA_TYPE,
data.data_type
)));
}
data
}
None => match self.value_offset_builder {
Some(_) => FieldData::new::<T>(
self.fields.len() as i8,
T::DATA_TYPE,
self.initial_capacity,
),
None => {
let mut fd = FieldData::new::<T>(
self.fields.len() as i8,
T::DATA_TYPE,
self.len.max(self.initial_capacity),
);
for _ in 0..self.len {
fd.append_null();
}
fd
}
},
};
self.type_id_builder.append(field_data.type_id);
match &mut self.value_offset_builder {
Some(offset_builder) => {
offset_builder.append(field_data.slots as i32);
}
None => {
for (_, fd) in self.fields.iter_mut() {
fd.append_null();
}
}
}
match v {
Some(v) => field_data.append_value::<T>(v),
None => field_data.append_null(),
}
self.fields.insert(type_name, field_data);
self.len += 1;
Ok(())
}
pub fn build(self) -> Result<UnionArray, ArrowError> {
let mut children = Vec::with_capacity(self.fields.len());
let union_fields = self
.fields
.into_iter()
.map(
|(
name,
FieldData {
type_id,
data_type,
mut values_buffer,
slots,
mut null_buffer_builder,
},
)| {
let array_ref = make_array(unsafe {
ArrayDataBuilder::new(data_type.clone())
.add_buffer(values_buffer.finish())
.len(slots)
.nulls(null_buffer_builder.finish())
.build_unchecked()
});
children.push(array_ref);
(type_id, Arc::new(Field::new(name, data_type, false)))
},
)
.collect();
UnionArray::try_new(
union_fields,
self.type_id_builder.into(),
self.value_offset_builder.map(Into::into),
children,
)
}
fn build_cloned(&self) -> Result<UnionArray, ArrowError> {
let mut children = Vec::with_capacity(self.fields.len());
let union_fields: Vec<_> = self
.fields
.iter()
.map(|(name, field_data)| {
let FieldData {
type_id,
data_type,
values_buffer,
slots,
null_buffer_builder,
} = field_data;
let array_ref = make_array(unsafe {
ArrayDataBuilder::new(data_type.clone())
.add_buffer(values_buffer.finish_cloned())
.len(*slots)
.nulls(null_buffer_builder.finish_cloned())
.build_unchecked()
});
children.push(array_ref);
(
*type_id,
Arc::new(Field::new(name.clone(), data_type.clone(), false)),
)
})
.collect();
UnionArray::try_new(
union_fields.into_iter().collect(),
ScalarBuffer::from(self.type_id_builder.as_slice().to_vec()),
self.value_offset_builder
.as_ref()
.map(|builder| ScalarBuffer::from(builder.as_slice().to_vec())),
children,
)
}
}
impl ArrayBuilder for UnionBuilder {
    /// Returns the number of elements appended so far.
    fn len(&self) -> usize {
        self.len
    }

    /// Builds the array and resets this builder to an empty one of the same kind.
    ///
    /// The replacement keeps the dense/sparse mode and the originally requested
    /// capacity. Resetting to `Default` instead would turn a dense builder into a
    /// sparse one, because the derived default has no offset builder.
    ///
    /// # Panics
    ///
    /// Panics if `UnionArray::try_new` rejects the accumulated data.
    fn finish(&mut self) -> ArrayRef {
        let fresh = match self.value_offset_builder {
            Some(_) => Self::with_capacity_dense(self.initial_capacity),
            None => Self::with_capacity_sparse(self.initial_capacity),
        };
        let finished = std::mem::replace(self, fresh);
        Arc::new(finished.build().unwrap_or_else(|err| {
            panic!("UnionBuilder::build failed unexpectedly: {}", err)
        }))
    }

    /// Builds the array from copies of the current buffers without resetting the builder.
    ///
    /// # Panics
    ///
    /// Panics if `UnionArray::try_new` rejects the accumulated data.
    fn finish_cloned(&self) -> ArrayRef {
        Arc::new(self.build_cloned().unwrap_or_else(|err| {
            panic!("UnionBuilder::build_cloned failed unexpectedly: {}", err)
        }))
    }

    /// Returns the builder as `&dyn Any` for downcasting.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Returns the builder as `&mut dyn Any` for downcasting.
    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }

    /// Converts the boxed builder into `Box<dyn Any>` for downcasting.
    fn into_box_any(self: Box<Self>) -> Box<dyn Any> {
        self
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::array::Array;
    use crate::cast::AsArray;
    use crate::types::{Float64Type, Int32Type};

    // Exercises the `ArrayBuilder` methods on a dense builder: `finish_cloned`
    // must snapshot the data without disturbing later appends, and `finish`
    // must return everything appended so far.
    #[test]
    fn test_union_builder_array_builder_trait() {
        let mut dense = UnionBuilder::new_dense();
        dense.append::<Int32Type>("a", 1).unwrap();
        dense.append::<Float64Type>("b", 3.0).unwrap();
        dense.append::<Int32Type>("a", 4).unwrap();
        assert_eq!(dense.len(), 3);

        // Snapshot after three appends.
        let snapshot = dense.finish_cloned();
        assert_eq!(snapshot.len(), 3);
        let snapshot_union = snapshot.as_any().downcast_ref::<UnionArray>().unwrap();
        assert_eq!(snapshot_union.type_ids(), &[0, 1, 0]);
        assert_eq!(snapshot_union.offsets().unwrap().as_ref(), &[0, 0, 1]);
        let snapshot_ints = snapshot_union.child(0).as_primitive::<Int32Type>();
        let snapshot_floats = snapshot_union.child(1).as_primitive::<Float64Type>();
        assert_eq!(snapshot_ints.value(0), 1);
        assert_eq!(snapshot_ints.value(1), 4);
        assert_eq!(snapshot_floats.value(0), 3.0);

        // The builder is still usable after `finish_cloned`.
        dense.append::<Float64Type>("b", 5.0).unwrap();
        assert_eq!(dense.len(), 4);

        // Final array after the fourth append.
        let finished = dense.finish();
        assert_eq!(finished.len(), 4);
        let finished_union = finished.as_any().downcast_ref::<UnionArray>().unwrap();
        assert_eq!(finished_union.type_ids(), &[0, 1, 0, 1]);
        assert_eq!(finished_union.offsets().unwrap().as_ref(), &[0, 0, 1, 1]);
        let finished_ints = finished_union.child(0).as_primitive::<Int32Type>();
        let finished_floats = finished_union.child(1).as_primitive::<Float64Type>();
        assert_eq!(finished_ints.value(0), 1);
        assert_eq!(finished_ints.value(1), 4);
        assert_eq!(finished_floats.value(0), 3.0);
        assert_eq!(finished_floats.value(1), 5.0);
    }

    // Drives a sparse builder purely through `Box<dyn ArrayBuilder>`.
    #[test]
    fn test_union_builder_type_erased() {
        let mut builders: Vec<Box<dyn ArrayBuilder>> = vec![Box::new(UnionBuilder::new_sparse())];

        let sparse = builders[0]
            .as_any_mut()
            .downcast_mut::<UnionBuilder>()
            .unwrap();
        sparse.append::<Int32Type>("x", 10).unwrap();
        sparse.append::<Float64Type>("y", 20.0).unwrap();
        assert_eq!(builders[0].len(), 2);

        let mut finished = Vec::with_capacity(builders.len());
        for mut erased in builders {
            finished.push(erased.finish());
        }
        assert_eq!(finished[0].len(), 2);

        let union = finished[0].as_any().downcast_ref::<UnionArray>().unwrap();
        assert_eq!(union.type_ids(), &[0, 1]);
        // Sparse unions carry no offsets buffer.
        assert!(union.offsets().is_none());

        let ints = union.child(0).as_primitive::<Int32Type>();
        let floats = union.child(1).as_primitive::<Float64Type>();
        assert_eq!(ints.value(0), 10);
        // Each child is padded with a null where the other child holds the value.
        assert!(ints.is_null(1));
        assert!(floats.is_null(0));
        assert_eq!(floats.value(1), 20.0);
    }
}