use crate::builder::buffer_builder::{Int32BufferBuilder, Int8BufferBuilder};
use crate::builder::BufferBuilder;
use crate::{make_array, ArrowPrimitiveType, UnionArray};
use arrow_buffer::NullBufferBuilder;
use arrow_buffer::{ArrowNativeType, Buffer};
use arrow_data::ArrayDataBuilder;
use arrow_schema::{ArrowError, DataType, Field};
use std::any::Any;
use std::collections::BTreeMap;
use std::sync::Arc;
/// Accumulated state for a single child column of the union being built.
///
/// The values buffer and the null bitmap are appended to together, one entry
/// per slot, and `slots` counts how many entries have been written.
#[derive(Debug)]
struct FieldData {
/// Type id written to the union's type-id buffer for slots holding this field.
type_id: i8,
/// Arrow data type of the child column.
data_type: DataType,
/// Type-erased builder holding the child's values.
values_buffer: Box<dyn FieldDataValues>,
/// Number of entries (values or nulls) appended to this child so far.
slots: usize,
/// Validity bitmap builder for the child column.
null_buffer_builder: NullBufferBuilder,
}
/// Type-erased interface over a typed values builder, so that children of
/// different native types can be stored side by side in `FieldData`.
trait FieldDataValues: std::fmt::Debug {
/// Exposes the builder as `Any` so callers can downcast to the concrete type.
fn as_mut_any(&mut self) -> &mut dyn Any;
/// Appends one placeholder entry for a null slot.
fn append_null(&mut self);
/// Finishes the builder and returns the accumulated values as a `Buffer`.
fn finish(&mut self) -> Buffer;
}
impl<T: ArrowNativeType> FieldDataValues for BufferBuilder<T> {
fn as_mut_any(&mut self) -> &mut dyn Any {
self
}
fn append_null(&mut self) {
self.advance(1)
}
fn finish(&mut self) -> Buffer {
self.finish()
}
}
impl FieldData {
fn new<T: ArrowPrimitiveType>(type_id: i8, data_type: DataType, capacity: usize) -> Self {
Self {
type_id,
data_type,
slots: 0,
values_buffer: Box::new(BufferBuilder::<T::Native>::new(capacity)),
null_buffer_builder: NullBufferBuilder::new(capacity),
}
}
fn append_value<T: ArrowPrimitiveType>(&mut self, v: T::Native) {
self.values_buffer
.as_mut_any()
.downcast_mut::<BufferBuilder<T::Native>>()
.expect("Tried to append unexpected type")
.append(v);
self.null_buffer_builder.append(true);
self.slots += 1;
}
fn append_null(&mut self) {
self.values_buffer.append_null();
self.null_buffer_builder.append(false);
self.slots += 1;
}
}
/// Incremental builder for a `UnionArray` whose children are primitive columns.
///
/// Values are appended one slot at a time, each tagged with the name of the
/// child field it belongs to; child fields are created on first use.
#[derive(Debug)]
pub struct UnionBuilder {
/// Number of slots appended to the union so far.
len: usize,
/// Per-child state, keyed by field name.
fields: BTreeMap<String, FieldData>,
/// Builder for the union's type-id buffer (one `i8` per slot).
type_id_builder: Int8BufferBuilder,
/// Builder for the value-offsets buffer: `Some` for a dense union, `None` for a sparse one.
value_offset_builder: Option<Int32BufferBuilder>,
/// Capacity requested at construction, reused when allocating new child builders.
initial_capacity: usize,
}
impl UnionBuilder {
    /// Creates a dense union builder with a default capacity of 1024 slots.
    pub fn new_dense() -> Self {
        Self::with_capacity_dense(1024)
    }

    /// Creates a sparse union builder with a default capacity of 1024 slots.
    pub fn new_sparse() -> Self {
        Self::with_capacity_sparse(1024)
    }

    /// Creates a dense union builder.
    ///
    /// `capacity` sizes the type-id and value-offset builders and is reused as
    /// the initial capacity of every child builder created later.
    pub fn with_capacity_dense(capacity: usize) -> Self {
        Self {
            len: 0,
            fields: Default::default(),
            type_id_builder: Int8BufferBuilder::new(capacity),
            value_offset_builder: Some(Int32BufferBuilder::new(capacity)),
            initial_capacity: capacity,
        }
    }

    /// Creates a sparse union builder.
    ///
    /// `capacity` sizes the type-id builder and is reused as the minimum
    /// initial capacity of every child builder created later. A sparse union
    /// has no value-offset buffer.
    pub fn with_capacity_sparse(capacity: usize) -> Self {
        Self {
            len: 0,
            fields: Default::default(),
            type_id_builder: Int8BufferBuilder::new(capacity),
            value_offset_builder: None,
            initial_capacity: capacity,
        }
    }

    /// Appends a null slot belonging to the child field `type_name`.
    ///
    /// The type parameter `T` determines the child's data type; the child is
    /// created if `type_name` has not been used before.
    ///
    /// # Errors
    ///
    /// Returns [`ArrowError::InvalidArgumentError`] under the same conditions
    /// as [`Self::append`].
    #[inline]
    pub fn append_null<T: ArrowPrimitiveType>(
        &mut self,
        type_name: &str,
    ) -> Result<(), ArrowError> {
        self.append_option::<T>(type_name, None)
    }

    /// Appends the value `v` as a slot belonging to the child field `type_name`.
    ///
    /// The child is created if `type_name` has not been used before.
    ///
    /// # Errors
    ///
    /// Returns [`ArrowError::InvalidArgumentError`] if `type_name` already
    /// exists with a data type other than `T::DATA_TYPE`, or if a new child
    /// would need a type id that does not fit in an `i8`. In both cases the
    /// builder is left unchanged.
    #[inline]
    pub fn append<T: ArrowPrimitiveType>(
        &mut self,
        type_name: &str,
        v: T::Native,
    ) -> Result<(), ArrowError> {
        self.append_option::<T>(type_name, Some(v))
    }

    /// Shared implementation of [`Self::append`] and [`Self::append_null`].
    ///
    /// All validation happens before any state is modified, so a returned
    /// error never leaves the builder partially updated.
    fn append_option<T: ArrowPrimitiveType>(
        &mut self,
        type_name: &str,
        v: Option<T::Native>,
    ) -> Result<(), ArrowError> {
        // Check the type while the child is still in the map: removing it first
        // and then bailing out would drop the child's accumulated data.
        if let Some(existing) = self.fields.get(type_name) {
            if existing.data_type != T::DATA_TYPE {
                return Err(ArrowError::InvalidArgumentError(format!(
                    "Attempt to write col \"{}\" with type {} doesn't match existing type {}",
                    type_name,
                    T::DATA_TYPE,
                    existing.data_type
                )));
            }
        }

        let type_name = type_name.to_string();
        let is_dense = self.value_offset_builder.is_some();

        // The child is taken out of the map temporarily so that, for sparse
        // unions, every *other* child can be mutated while this one is held.
        let mut field_data = match self.fields.remove(&type_name) {
            Some(data) => data,
            None => {
                // Type ids are assigned in order of first use; refuse ids that
                // would wrap around instead of silently truncating.
                let type_id = i8::try_from(self.fields.len()).map_err(|_| {
                    ArrowError::InvalidArgumentError(format!(
                        "Cannot add col \"{}\": type id {} does not fit in an i8",
                        type_name,
                        self.fields.len()
                    ))
                })?;
                // A new sparse child must be as long as the union already is,
                // so it needs room for (and is back-filled with) `len` nulls.
                let capacity = if is_dense {
                    self.initial_capacity
                } else {
                    self.len.max(self.initial_capacity)
                };
                let mut fd = FieldData::new::<T>(type_id, T::DATA_TYPE, capacity);
                if !is_dense {
                    for _ in 0..self.len {
                        fd.append_null();
                    }
                }
                fd
            }
        };

        self.type_id_builder.append(field_data.type_id);

        match &mut self.value_offset_builder {
            // Dense: record where in the child this slot's value will live.
            Some(offset_builder) => {
                offset_builder.append(field_data.slots as i32);
            }
            // Sparse: every child other than the target gets a null for this slot.
            None => {
                for fd in self.fields.values_mut() {
                    fd.append_null();
                }
            }
        }

        match v {
            Some(v) => field_data.append_value::<T>(v),
            None => field_data.append_null(),
        }

        self.fields.insert(type_name, field_data);
        self.len += 1;
        Ok(())
    }

    /// Consumes the builder and assembles the [`UnionArray`].
    ///
    /// Children are emitted in the iteration order of the field map (sorted by
    /// field name), each paired with the type id it was assigned on first use.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `UnionArray::try_new`.
    pub fn build(self) -> Result<UnionArray, ArrowError> {
        let mut children = Vec::with_capacity(self.fields.len());
        let union_fields = self
            .fields
            .into_iter()
            .map(
                |(
                    name,
                    FieldData {
                        type_id,
                        data_type,
                        mut values_buffer,
                        slots,
                        mut null_buffer_builder,
                    },
                )| {
                    // SAFETY: every append to a `FieldData` writes exactly one
                    // entry to both the values buffer and the null bitmap and
                    // bumps `slots`, so both hold `slots` entries. The values
                    // builder was created for `T::Native` alongside
                    // `T::DATA_TYPE`, so the buffer layout is assumed to match
                    // `data_type`.
                    let array_ref = make_array(unsafe {
                        ArrayDataBuilder::new(data_type.clone())
                            .add_buffer(values_buffer.finish())
                            .len(slots)
                            .nulls(null_buffer_builder.finish())
                            .build_unchecked()
                    });
                    // Pushed in the same order the fields are yielded, so
                    // `children[i]` corresponds to the i-th union field.
                    children.push(array_ref);
                    (type_id, Arc::new(Field::new(name, data_type, false)))
                },
            )
            .collect();
        UnionArray::try_new(
            union_fields,
            self.type_id_builder.into(),
            self.value_offset_builder.map(Into::into),
            children,
        )
    }
}