use crate::builder::{ArrayBuilder, BufferBuilder, UInt8BufferBuilder};
use crate::types::{ByteArrayType, GenericBinaryType, GenericStringType};
use crate::{Array, ArrayRef, GenericByteArray, OffsetSizeTrait};
use arrow_buffer::NullBufferBuilder;
use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer};
use arrow_data::ArrayDataBuilder;
use std::any::Any;
use std::sync::Arc;
/// Builder for variable-length byte arrays ([`GenericByteArray`]), such as strings and binary.
///
/// Values are accumulated into one contiguous byte buffer, with a parallel offsets
/// buffer marking where each item ends and a validity buffer recording nulls.
pub struct GenericByteBuilder<T: ByteArrayType> {
/// Concatenated bytes of every value appended so far.
value_builder: UInt8BufferBuilder,
/// End offset of each appended item; `with_capacity` seeds it with a leading zero,
/// so it holds one more entry than there are items.
offsets_builder: BufferBuilder<T::Offset>,
/// Validity (null / non-null) of each appended item; its length is the builder's length.
null_buffer_builder: NullBufferBuilder,
}
impl<T: ByteArrayType> GenericByteBuilder<T> {
    /// Creates a new builder with capacity for 1024 items and 1024 bytes of value data.
    pub fn new() -> Self {
        Self::with_capacity(1024, 1024)
    }

    /// Creates a new builder with the requested capacities.
    ///
    /// - `item_capacity` is the number of items to pre-allocate; the offsets buffer is
    ///   requested with one extra slot because it always carries a leading offset.
    /// - `data_capacity` is the number of bytes to pre-allocate for the value data.
    pub fn with_capacity(item_capacity: usize, data_capacity: usize) -> Self {
        let mut offsets_builder = BufferBuilder::<T::Offset>::new(item_capacity + 1);
        // The first offset is always zero, which every offset type can represent.
        offsets_builder.append(T::Offset::from_usize(0).unwrap());
        Self {
            value_builder: UInt8BufferBuilder::new(data_capacity),
            offsets_builder,
            null_buffer_builder: NullBufferBuilder::new(item_capacity),
        }
    }

    /// Creates a builder that continues from existing offset, value and (optional)
    /// validity buffers.
    ///
    /// The number of items is taken to be `offsets.len() - 1`. When `null_buffer` is
    /// `None`, a validity builder of that length is created via
    /// `NullBufferBuilder::new_with_len`.
    ///
    /// # Safety
    ///
    /// No validation of the buffer contents is performed here, and [`Self::finish`] /
    /// [`Self::finish_cloned`] build the array without validation as well. The caller
    /// must therefore supply buffers that already form a valid byte array for `T`:
    ///
    /// - `offsets_buffer` must contain at least one offset (an empty buffer makes
    ///   `len() - 1` underflow), and offsets must be non-decreasing and within
    ///   `value_buffer`;
    /// - `value_buffer` must satisfy whatever content requirements `T` imposes
    ///   (presumably valid UTF-8 for string types);
    /// - `null_buffer`, if present, must cover `offsets.len() - 1` items.
    pub unsafe fn new_from_buffer(
        offsets_buffer: MutableBuffer,
        value_buffer: MutableBuffer,
        null_buffer: Option<MutableBuffer>,
    ) -> Self {
        let offsets_builder = BufferBuilder::<T::Offset>::new_from_buffer(offsets_buffer);
        let value_builder = BufferBuilder::<u8>::new_from_buffer(value_buffer);

        let null_buffer_builder = null_buffer
            .map(|buffer| NullBufferBuilder::new_from_buffer(buffer, offsets_builder.len() - 1))
            .unwrap_or_else(|| NullBufferBuilder::new_with_len(offsets_builder.len() - 1));

        Self {
            offsets_builder,
            value_builder,
            null_buffer_builder,
        }
    }

    /// Returns the offset at which the next appended value would start, i.e. the
    /// current length of the value buffer.
    ///
    /// # Panics
    ///
    /// Panics if the value buffer length cannot be represented by `T::Offset`.
    #[inline]
    fn next_offset(&self) -> T::Offset {
        T::Offset::from_usize(self.value_builder.len()).expect("byte array offset overflow")
    }

    /// Appends a non-null value to the builder.
    ///
    /// # Panics
    ///
    /// Panics if the resulting length of the value buffer exceeds the range of
    /// `T::Offset`.
    #[inline]
    pub fn append_value(&mut self, value: impl AsRef<T::Native>) {
        self.value_builder.append_slice(value.as_ref().as_ref());
        self.null_buffer_builder.append(true);
        // Recorded after the bytes were written, so this is the END of the new value.
        self.offsets_builder.append(self.next_offset());
    }

    /// Appends `value` if it is `Some`, otherwise appends a null.
    #[inline]
    pub fn append_option(&mut self, value: Option<impl AsRef<T::Native>>) {
        match value {
            None => self.append_null(),
            Some(v) => self.append_value(v),
        };
    }

    /// Appends a single null item. No value bytes are written; the item's end offset
    /// repeats the previous one.
    #[inline]
    pub fn append_null(&mut self) {
        self.null_buffer_builder.append(false);
        self.offsets_builder.append(self.next_offset());
    }

    /// Appends `n` null items in one go.
    #[inline]
    pub fn append_nulls(&mut self, n: usize) {
        self.null_buffer_builder.append_n_nulls(n);
        let next_offset = self.next_offset();
        self.offsets_builder.append_n(n, next_offset);
    }

    /// Appends every item of `array` (values, offsets and validity) to the builder.
    ///
    /// The bytes between the array's first and last offset are copied as-is, including
    /// bytes that sit underneath null slots. Appending an empty array is a no-op.
    ///
    /// # Panics
    ///
    /// Panics with "byte array offset overflow" if the value buffer would grow beyond
    /// the range of `T::Offset`. The check happens before anything is written, so the
    /// builder is left untouched in that case.
    #[inline]
    pub fn append_array(&mut self, array: &GenericByteArray<T>) {
        if array.len() == 0 {
            return;
        }

        let offsets = array.offsets();
        let first = offsets[0];

        // Only the byte range actually referenced by this (possibly sliced) array.
        let values = &array.values().as_slice()[first.as_usize()..offsets[array.len()].as_usize()];

        // Shifted offsets are computed with plain `+`, which wraps silently in release
        // builds. The largest shifted offset equals the value buffer length after the
        // append, so validating that single number up front rules out wraparound and
        // keeps the failure mode identical to `append_value`.
        assert!(
            self.value_builder
                .len()
                .checked_add(values.len())
                .and_then(T::Offset::from_usize)
                .is_some(),
            "byte array offset overflow"
        );

        let next_offset = self.next_offset();
        if next_offset == first {
            // Offsets already line up with the builder (e.g. first append of an
            // unsliced array), so they can be copied verbatim.
            self.offsets_builder.append_slice(&offsets[1..]);
        } else {
            // Re-base the incoming offsets onto the end of the value buffer. The shift
            // may be negative when the array is a slice starting past our current end.
            let shift: T::Offset = next_offset - first;
            // Building the shifted offsets first and appending them in one call avoids
            // a per-offset append on the buffer builder.
            let shifted: Vec<T::Offset> = offsets[1..]
                .iter()
                .map(|&offset| offset + shift)
                .collect();
            self.offsets_builder.append_slice(&shifted);
        }

        self.value_builder.append_slice(values);

        if let Some(null_buffer) = array.nulls() {
            self.null_buffer_builder.append_buffer(null_buffer);
        } else {
            self.null_buffer_builder.append_n_non_nulls(array.len());
        }
    }

    /// Builds the [`GenericByteArray`] and resets this builder so it can be reused.
    pub fn finish(&mut self) -> GenericByteArray<T> {
        let array_type = T::DATA_TYPE;
        let array_builder = ArrayDataBuilder::new(array_type)
            .len(self.len())
            .add_buffer(self.offsets_builder.finish())
            .add_buffer(self.value_builder.finish())
            .nulls(self.null_buffer_builder.finish());

        // The builders were just drained; re-seed the offsets with the leading offset
        // (the now-empty value buffer's length) so the builder is ready for new items.
        self.offsets_builder.append(self.next_offset());
        // SAFETY: offsets are only ever recorded from the value buffer's length or
        // re-based from an existing array, so they are non-decreasing and in bounds;
        // buffers injected through `new_from_buffer` are vouched for by its caller.
        let array_data = unsafe { array_builder.build_unchecked() };
        GenericByteArray::from(array_data)
    }

    /// Builds the [`GenericByteArray`] from a copy of the current contents, leaving
    /// this builder unchanged.
    pub fn finish_cloned(&self) -> GenericByteArray<T> {
        let array_type = T::DATA_TYPE;
        let offset_buffer = Buffer::from_slice_ref(self.offsets_builder.as_slice());
        let value_buffer = Buffer::from_slice_ref(self.value_builder.as_slice());
        let array_builder = ArrayDataBuilder::new(array_type)
            .len(self.len())
            .add_buffer(offset_buffer)
            .add_buffer(value_buffer)
            .nulls(self.null_buffer_builder.finish_cloned());

        // SAFETY: same invariants as in `finish`; the buffers are copies of the
        // builder's current, consistently maintained state.
        let array_data = unsafe { array_builder.build_unchecked() };
        GenericByteArray::from(array_data)
    }

    /// Returns the current value bytes as a slice.
    pub fn values_slice(&self) -> &[u8] {
        self.value_builder.as_slice()
    }

    /// Returns the current offsets as a slice.
    pub fn offsets_slice(&self) -> &[T::Offset] {
        self.offsets_builder.as_slice()
    }

    /// Returns the current validity buffer as a slice, if the null buffer builder
    /// exposes one.
    pub fn validity_slice(&self) -> Option<&[u8]> {
        self.null_buffer_builder.as_slice()
    }

    /// Returns the current validity buffer as a mutable slice, if the null buffer
    /// builder exposes one.
    pub fn validity_slice_mut(&mut self) -> Option<&mut [u8]> {
        self.null_buffer_builder.as_slice_mut()
    }
}
impl<T: ByteArrayType> std::fmt::Debug for GenericByteBuilder<T> {
    /// Formats the builder as `<offset prefix><type prefix>Builder { .. }`, listing
    /// the three internal buffer builders.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            value_builder,
            offsets_builder,
            null_buffer_builder,
        } = self;

        // The type name depends on `T`, so it is written by hand and the struct body
        // is emitted with an empty name right after it.
        write!(f, "{}{}Builder", T::Offset::PREFIX, T::PREFIX)?;

        let mut body = f.debug_struct("");
        body.field("value_builder", value_builder);
        body.field("offsets_builder", offsets_builder);
        body.field("null_buffer_builder", null_buffer_builder);
        body.finish()
    }
}
/// The default builder is the one returned by `GenericByteBuilder::new`.
impl<T: ByteArrayType> Default for GenericByteBuilder<T> {
/// Delegates to `Self::new()`.
fn default() -> Self {
Self::new()
}
}
impl<T: ByteArrayType> ArrayBuilder for GenericByteBuilder<T> {
fn len(&self) -> usize {
self.null_buffer_builder.len()
}
fn finish(&mut self) -> ArrayRef {
Arc::new(self.finish())
}
fn finish_cloned(&self) -> ArrayRef {
Arc::new(self.finish_cloned())
}
fn as_any(&self) -> &dyn Any {
self
}
fn as_any_mut(&mut self) -> &mut dyn Any {
self
}
fn into_box_any(self: Box<Self>) -> Box<dyn Any> {
self
}
}
impl<T: ByteArrayType, V: AsRef<T::Native>> Extend<Option<V>> for GenericByteBuilder<T> {
    /// Appends every element of `iter` in order: `Some` as a value, `None` as a null.
    #[inline]
    fn extend<I: IntoIterator<Item = Option<V>>>(&mut self, iter: I) {
        iter.into_iter().for_each(|item| self.append_option(item));
    }
}
/// Builder for UTF-8 string arrays, generic over the offset width `O`.
pub type GenericStringBuilder<O> = GenericByteBuilder<GenericStringType<O>>;

/// Lets `write!` / `writeln!` stream text into the value that is currently being built.
///
/// Writing only extends the value bytes; no offset or validity entry is recorded, so
/// the written text stays "open" until an `append_value` call closes the item (an
/// empty `append_value("")` ends it without adding more bytes).
impl<O: OffsetSizeTrait> std::fmt::Write for GenericStringBuilder<O> {
    fn write_str(&mut self, s: &str) -> std::fmt::Result {
        let utf8 = s.as_bytes();
        self.value_builder.append_slice(utf8);
        Ok(())
    }
}
/// Builder for binary arrays, generic over the offset width `O`.
pub type GenericBinaryBuilder<O> = GenericByteBuilder<GenericBinaryType<O>>;

/// Lets byte-oriented writers stream data into the value that is currently being built.
///
/// Writing only extends the value bytes; no offset or validity entry is recorded, so
/// the written bytes stay "open" until an `append_value` call closes the item.
impl<O: OffsetSizeTrait> std::io::Write for GenericBinaryBuilder<O> {
    /// Appends all of `bs` and reports the whole slice as written.
    fn write(&mut self, bs: &[u8]) -> std::io::Result<usize> {
        let written = bs.len();
        self.value_builder.append_slice(bs);
        Ok(written)
    }

    /// Nothing is buffered outside the builder, so flushing is a no-op.
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::array::Array;
use crate::GenericStringArray;
use arrow_buffer::NullBuffer;
use std::fmt::Write as _;
use std::io::Write as _;
/// Shared body for the binary builder tests: appends a mix of values, an empty value
/// and a null, then checks the finished array for offset width `O`.
fn _test_generic_binary_builder<O: OffsetSizeTrait>() {
let mut builder = GenericBinaryBuilder::<O>::new();
builder.append_value(b"hello");
builder.append_value(b"");
builder.append_null();
builder.append_value(b"rust");
let array = builder.finish();
assert_eq!(4, array.len());
assert_eq!(1, array.null_count());
assert_eq!(b"hello", array.value(0));
assert_eq!([] as [u8; 0], array.value(1));
assert!(array.is_null(2));
assert_eq!(b"rust", array.value(3));
// Offsets are [0, 5, 5, 5, 9]: the empty value and the null add no bytes.
assert_eq!(O::from_usize(5).unwrap(), array.value_offsets()[2]);
assert_eq!(O::from_usize(4).unwrap(), array.value_length(3));
}
/// Runs the shared body with 32-bit offsets.
#[test]
fn test_binary_builder() {
_test_generic_binary_builder::<i32>()
}
/// Runs the shared body with 64-bit offsets.
#[test]
fn test_large_binary_builder() {
_test_generic_binary_builder::<i64>()
}
/// Shared body for the all-null tests: mixes single and bulk null appends and checks
/// that every slot of the finished array is null.
fn _test_generic_binary_builder_all_nulls<O: OffsetSizeTrait>() {
    const SINGLE_NULLS: usize = 3;
    const BULK_NULLS: usize = 2;
    const TOTAL: usize = SINGLE_NULLS + BULK_NULLS;

    let mut builder = GenericBinaryBuilder::<O>::new();
    for _ in 0..SINGLE_NULLS {
        builder.append_null();
    }
    builder.append_nulls(BULK_NULLS);
    assert_eq!(TOTAL, builder.len());
    assert!(!builder.is_empty());

    let array = builder.finish();
    assert_eq!(TOTAL, array.null_count());
    assert_eq!(TOTAL, array.len());
    for index in 0..TOTAL {
        assert!(array.is_null(index));
    }
}

/// Runs the all-null scenario with 32-bit offsets.
#[test]
fn test_binary_builder_all_nulls() {
    _test_generic_binary_builder_all_nulls::<i32>()
}

/// Runs the all-null scenario with 64-bit offsets.
#[test]
fn test_large_binary_builder_all_nulls() {
    _test_generic_binary_builder_all_nulls::<i64>()
}
/// Shared body for the reset tests: checks that `finish` leaves the builder empty and
/// that a second batch starts again from offset zero.
fn _test_generic_binary_builder_reset<O: OffsetSizeTrait>() {
let mut builder = GenericBinaryBuilder::<O>::new();
builder.append_value(b"hello");
builder.append_value(b"");
builder.append_null();
builder.append_value(b"rust");
// The first array is discarded; only the reset side effect matters here.
builder.finish();
assert!(builder.is_empty());
builder.append_value(b"parquet");
builder.append_null();
builder.append_value(b"arrow");
builder.append_value(b"");
builder.append_nulls(2);
builder.append_value(b"hi");
let array = builder.finish();
assert_eq!(7, array.len());
assert_eq!(3, array.null_count());
assert_eq!(b"parquet", array.value(0));
assert!(array.is_null(1));
assert!(array.is_null(4));
assert!(array.is_null(5));
assert_eq!(b"arrow", array.value(2));
// A null slot spans no bytes, so reading its value yields an empty slice.
assert_eq!(b"", array.value(1));
assert_eq!(b"hi", array.value(6));
// Offsets of the second batch are [0, 7, 7, 12, 12, 12, 12, 14].
assert_eq!(O::zero(), array.value_offsets()[0]);
assert_eq!(O::from_usize(7).unwrap(), array.value_offsets()[2]);
assert_eq!(O::from_usize(14).unwrap(), array.value_offsets()[7]);
assert_eq!(O::from_usize(5).unwrap(), array.value_length(2));
}
/// Runs the reset scenario with 32-bit offsets.
#[test]
fn test_binary_builder_reset() {
_test_generic_binary_builder_reset::<i32>()
}
/// Runs the reset scenario with 64-bit offsets.
#[test]
fn test_large_binary_builder_reset() {
_test_generic_binary_builder_reset::<i64>()
}
/// Shared body for the string builder tests: exercises every append flavour
/// (`append_value` with borrowed and owned strings, `append_null`, `append_option`,
/// `append_nulls`) and compares against an array built from a `Vec<Option<&str>>`.
fn _test_generic_string_array_builder<O: OffsetSizeTrait>() {
let mut builder = GenericStringBuilder::<O>::new();
let owned = "arrow".to_owned();
builder.append_value("hello");
builder.append_value("");
// A `&String` is accepted as well as a `&str`.
builder.append_value(&owned);
builder.append_null();
builder.append_option(Some("rust"));
// `None` needs a type annotation; both borrowed and owned item types are tried.
builder.append_option(None::<&str>);
builder.append_option(None::<String>);
builder.append_nulls(2);
builder.append_value("parquet");
assert_eq!(10, builder.len());
assert_eq!(
GenericStringArray::<O>::from(vec![
Some("hello"),
Some(""),
Some("arrow"),
None,
Some("rust"),
None,
None,
None,
None,
Some("parquet")
]),
builder.finish()
);
}
/// Runs the string builder scenario with 32-bit offsets.
#[test]
fn test_string_array_builder() {
_test_generic_string_array_builder::<i32>()
}
/// Runs the string builder scenario with 64-bit offsets.
#[test]
fn test_large_string_array_builder() {
_test_generic_string_array_builder::<i64>()
}
/// Shared body for the `finish` tests: after finishing, the builder must be empty with
/// only the leading zero offset left, and nulls from the first batch must not leak
/// into the second one.
fn _test_generic_string_array_builder_finish<O: OffsetSizeTrait>() {
let mut builder = GenericStringBuilder::<O>::with_capacity(3, 11);
builder.append_value("hello");
builder.append_value("rust");
builder.append_null();
builder.finish();
assert!(builder.is_empty());
assert_eq!(&[O::zero()], builder.offsets_slice());
builder.append_value("arrow");
builder.append_value("parquet");
let arr = builder.finish();
// The second batch contains no nulls, so no null buffer is expected at all.
assert!(arr.nulls().is_none());
assert_eq!(GenericStringArray::<O>::from(vec!["arrow", "parquet"]), arr,)
}
/// Runs the `finish` scenario with 32-bit offsets.
#[test]
fn test_string_array_builder_finish() {
_test_generic_string_array_builder_finish::<i32>()
}
/// Runs the `finish` scenario with 64-bit offsets.
#[test]
fn test_large_string_array_builder_finish() {
_test_generic_string_array_builder_finish::<i64>()
}
/// Shared body for the `finish_cloned` tests: a cloned finish must leave the builder's
/// contents in place so later appends extend the same batch.
fn _test_generic_string_array_builder_finish_cloned<O: OffsetSizeTrait>() {
let mut builder = GenericStringBuilder::<O>::with_capacity(3, 11);
builder.append_value("hello");
builder.append_value("rust");
builder.append_null();
let mut arr = builder.finish_cloned();
// Unlike `finish`, the builder still holds its three items.
assert!(!builder.is_empty());
assert_eq!(3, arr.len());
builder.append_value("arrow");
builder.append_value("parquet");
arr = builder.finish();
// The null appended before the snapshot is still part of the final array.
assert!(arr.nulls().is_some());
assert_eq!(&[O::zero()], builder.offsets_slice());
assert_eq!(5, arr.len());
}
/// Runs the `finish_cloned` scenario with 32-bit offsets.
#[test]
fn test_string_array_builder_finish_cloned() {
_test_generic_string_array_builder_finish_cloned::<i32>()
}
/// Runs the `finish_cloned` scenario with 64-bit offsets.
#[test]
fn test_large_string_array_builder_finish_cloned() {
_test_generic_string_array_builder_finish_cloned::<i64>()
}
/// `Extend` appends items in iteration order and can be called repeatedly.
#[test]
fn test_extend() {
    let first_batch = ["a", "b", "c", "", "a", "b", "c"];
    let second_batch = ["d", "cupcakes", "hello"];

    let mut builder = GenericStringBuilder::<i32>::new();
    builder.extend(first_batch.into_iter().map(Some));
    builder.extend(second_batch.into_iter().map(Some));

    let array = builder.finish();
    // The empty string repeats offset 3; everything else advances by the value length.
    assert_eq!(array.value_offsets(), &[0, 1, 2, 3, 3, 4, 5, 6, 7, 15, 20]);
    assert_eq!(array.value_data(), b"abcabcdcupcakeshello");
}
/// Text written through `std::fmt::Write` accumulates in the current value until an
/// `append_value("")` call closes the item.
#[test]
fn test_write_str() {
let mut builder = GenericStringBuilder::<i32>::new();
write!(builder, "foo").unwrap();
// Closes the first item without adding bytes.
builder.append_value("");
writeln!(builder, "bar").unwrap();
builder.append_value("");
// Two consecutive writes end up in the same item.
write!(builder, "fiz").unwrap();
write!(builder, "buz").unwrap();
builder.append_value("");
let a = builder.finish();
let r: Vec<_> = a.iter().flatten().collect();
assert_eq!(r, &["foo", "bar\n", "fizbuz"])
}
/// Bytes written through `std::io::Write` accumulate in the current value until an
/// `append_value("")` call closes the item.
#[test]
fn test_write_bytes() {
let mut builder = GenericBinaryBuilder::<i32>::new();
write!(builder, "foo").unwrap();
// Closes the first item without adding bytes.
builder.append_value("");
writeln!(builder, "bar").unwrap();
builder.append_value("");
// Two consecutive writes end up in the same item.
write!(builder, "fiz").unwrap();
write!(builder, "buz").unwrap();
builder.append_value("");
let a = builder.finish();
let r: Vec<_> = a.iter().flatten().collect();
assert_eq!(
r,
&["foo".as_bytes(), "bar\n".as_bytes(), "fizbuz".as_bytes()]
)
}
/// Appending three null-free arrays back to back must equal one array built from the
/// concatenated input.
#[test]
fn test_append_array_without_nulls() {
    let input = vec![
        "hello", "world", "how", "are", "you", "doing", "today", "I", "am", "doing", "well",
        "thank", "you", "for", "asking",
    ];
    let chunks = [
        GenericStringArray::<i32>::from(input[..3].to_vec()),
        GenericStringArray::<i32>::from(input[3..7].to_vec()),
        GenericStringArray::<i32>::from(input[7..].to_vec()),
    ];

    let mut builder = GenericStringBuilder::<i32>::new();
    for chunk in &chunks {
        builder.append_array(chunk);
    }

    let actual = builder.finish();
    let expected = GenericStringArray::<i32>::from(input);
    assert_eq!(actual, expected);
}
/// Appending three arrays that contain nulls must equal one array built from the
/// concatenated input, nulls included.
#[test]
fn test_append_array_with_nulls() {
    let input = vec![
        Some("hello"),
        None,
        Some("how"),
        None,
        None,
        None,
        None,
        Some("I"),
        Some("am"),
        Some("doing"),
        Some("well"),
    ];
    // The middle chunk (items 3..7) consists of nulls only.
    let chunks = [
        GenericStringArray::<i32>::from(input[..3].to_vec()),
        GenericStringArray::<i32>::from(input[3..7].to_vec()),
        GenericStringArray::<i32>::from(input[7..].to_vec()),
    ];

    let mut builder = GenericStringBuilder::<i32>::new();
    for chunk in &chunks {
        builder.append_array(chunk);
    }

    let actual = builder.finish();
    let expected = GenericStringArray::<i32>::from(input);
    assert_eq!(actual, expected);
}
/// Appending an empty array leaves the builder empty.
#[test]
fn test_append_empty_array() {
    let empty = GenericStringArray::<i32>::from(Vec::<&str>::new());

    let mut builder = GenericStringBuilder::<i32>::new();
    builder.append_array(&empty);

    assert_eq!(builder.finish().len(), 0);
}
/// Appending a slice whose offsets neither start at zero nor end at the parent's last
/// offset must re-base the offsets and copy only the referenced bytes.
#[test]
fn test_append_array_with_offset_not_starting_at_0() {
    let full_array = GenericStringArray::<i32>::from(vec![
        Some("hello"),
        None,
        Some("how"),
        None,
        None,
        None,
        None,
        Some("I"),
        Some("am"),
        Some("doing"),
        Some("well"),
    ]);
    let sliced = full_array.slice(1, 4);

    // Guard the premise of the test: the slice really is offset on both ends.
    let first_offset = sliced.offsets()[0].as_usize();
    assert_ne!(first_offset, 0);
    assert_ne!(sliced.offsets().last(), full_array.offsets().last());

    let mut builder = GenericStringBuilder::<i32>::new();
    builder.append_array(&sliced);
    let actual = builder.finish();

    let expected = GenericStringArray::<i32>::from(vec![None, Some("how"), None, None]);
    assert_eq!(actual, expected);
}
/// Bytes that sit underneath null slots of an appended array are copied verbatim into
/// the builder's value buffer, even though they are not visible through the array.
#[test]
fn test_append_underlying_null_values_added_as_is() {
// Build the first array from real strings, then overlay a null mask so that some
// slots are null while still having bytes in the value buffer.
let input_1_array_with_nulls = {
let input = vec![
"hello", "world", "how", "are", "you", "doing", "today", "I", "am",
];
let (offsets, buffer, _) = GenericStringArray::<i32>::from(input).into_parts();
GenericStringArray::<i32>::new(
offsets,
buffer,
Some(NullBuffer::from(&[
true, false, true, false, false, true, true, true, false,
])),
)
};
// Same construction for the second array.
let input_2_array_with_nulls = {
let input = vec!["doing", "well", "thank", "you", "for", "asking"];
let (offsets, buffer, _) = GenericStringArray::<i32>::from(input).into_parts();
GenericStringArray::<i32>::new(
offsets,
buffer,
Some(NullBuffer::from(&[false, false, true, false, true, true])),
)
};
let mut builder = GenericStringBuilder::<i32>::new();
builder.append_array(&input_1_array_with_nulls);
builder.append_array(&input_2_array_with_nulls);
let actual = builder.finish();
// Logical view: masked slots read as `None`.
let expected = GenericStringArray::<i32>::from(vec![
Some("hello"),
None, Some("how"),
None, None, Some("doing"),
Some("today"),
Some("I"),
None, None, None, Some("thank"),
None, Some("for"),
Some("asking"),
]);
assert_eq!(actual, expected);
// Physical view: the value buffer still contains every original string.
let expected_underlying_buffer = Buffer::from(
[
"hello", "world", "how", "are", "you", "doing", "today", "I", "am", "doing",
"well", "thank", "you", "for", "asking",
]
.join("")
.as_bytes(),
);
assert_eq!(actual.values(), &expected_underlying_buffer);
}
/// Appending consecutive slices of one array reproduces that array; each slice's first
/// offset equals the builder's next offset, so offsets can be copied without shifting.
#[test]
fn append_array_with_continues_indices() {
    let input = vec![
        "hello", "world", "how", "are", "you", "doing", "today", "I", "am", "doing", "well",
        "thank", "you", "for", "asking",
    ];
    let full_array = GenericStringArray::<i32>::from(input);
    let slices = [
        full_array.slice(0, 3),
        full_array.slice(3, 4),
        full_array.slice(7, full_array.len() - 7),
    ];

    let mut builder = GenericStringBuilder::<i32>::new();
    for slice in &slices {
        builder.append_array(slice);
    }

    let actual = builder.finish();
    assert_eq!(actual, full_array);
}
}