use std::marker::PhantomData;
use std::mem::size_of;
use std::sync::Arc;
use datafusion_common::{Result, exec_datafusion_err, internal_err};
use arrow::array::{
Array, ArrayAccessor, ArrayDataBuilder, ArrayRef, BinaryArray, ByteView,
GenericStringArray, LargeStringArray, OffsetSizeTrait, StringArray, StringViewArray,
make_view,
};
use arrow::buffer::{Buffer, MutableBuffer, NullBuffer, ScalarBuffer};
use arrow::datatypes::DataType;
/// Row-at-a-time builder for the `Utf8` (`i32`-offset) result of string
/// concatenation.
///
/// For every output row, callers invoke [`Self::write`] once per input column
/// and then [`Self::append_offset`] to close the row. Inputs whose UTF-8
/// validity is not guaranteed by their type (raw scalar bytes, binary arrays)
/// mark the builder as `tainted`, which makes [`Self::finish`] run full Arrow
/// validation instead of the unchecked fast path.
pub(crate) struct ConcatStringBuilder {
    /// `i32` offsets: a leading `0`, then one entry per closed row.
    offsets_buffer: MutableBuffer,
    /// Concatenated bytes of all rows.
    value_buffer: MutableBuffer,
    /// Set once any byte whose UTF-8 validity is not guaranteed was appended.
    tainted: bool,
}

impl ConcatStringBuilder {
    /// Creates a builder with room for `item_capacity` rows and
    /// `data_capacity` value bytes.
    ///
    /// # Panics
    ///
    /// Panics with "capacity integer overflow" if the offsets buffer size in
    /// bytes, `(item_capacity + 1) * size_of::<i32>()`, does not fit in `usize`.
    pub fn with_capacity(item_capacity: usize, data_capacity: usize) -> Self {
        // `checked_mul` rather than `saturating_mul`: a saturated size would be
        // passed on as a huge allocation request and fail elsewhere with an
        // unrelated panic instead of this clear message.
        let capacity = item_capacity
            .checked_add(1)
            .and_then(|slots| slots.checked_mul(size_of::<i32>()))
            .expect("capacity integer overflow");
        let mut offsets_buffer = MutableBuffer::with_capacity(capacity);
        // SAFETY: at least one offset slot was requested, so `capacity` is at
        // least `size_of::<i32>()` bytes and the leading zero offset fits.
        unsafe { offsets_buffer.push_unchecked(0_i32) };
        Self {
            offsets_buffer,
            value_buffer: MutableBuffer::with_capacity(data_capacity),
            tainted: false,
        }
    }

    /// Appends row `i` of `column` to the row currently being built.
    ///
    /// With `CHECK_VALID = true`, null entries of nullable arrays contribute
    /// no bytes. With `CHECK_VALID = false`, the value is appended without
    /// consulting validity.
    pub fn write<const CHECK_VALID: bool>(&mut self, column: &ColumnarValueRef, i: usize) {
        match column {
            ColumnarValueRef::Scalar(s) => {
                self.value_buffer.extend_from_slice(s);
                self.tainted = true;
            }
            ColumnarValueRef::NullableArray(array) => {
                if !CHECK_VALID || array.is_valid(i) {
                    self.value_buffer
                        .extend_from_slice(array.value(i).as_bytes());
                }
            }
            ColumnarValueRef::NullableLargeStringArray(array) => {
                if !CHECK_VALID || array.is_valid(i) {
                    self.value_buffer
                        .extend_from_slice(array.value(i).as_bytes());
                }
            }
            ColumnarValueRef::NullableStringViewArray(array) => {
                if !CHECK_VALID || array.is_valid(i) {
                    self.value_buffer
                        .extend_from_slice(array.value(i).as_bytes());
                }
            }
            ColumnarValueRef::NullableBinaryArray(array) => {
                if !CHECK_VALID || array.is_valid(i) {
                    self.value_buffer.extend_from_slice(array.value(i));
                }
                self.tainted = true;
            }
            ColumnarValueRef::NonNullableArray(array) => {
                self.value_buffer
                    .extend_from_slice(array.value(i).as_bytes());
            }
            ColumnarValueRef::NonNullableLargeStringArray(array) => {
                self.value_buffer
                    .extend_from_slice(array.value(i).as_bytes());
            }
            ColumnarValueRef::NonNullableStringViewArray(array) => {
                self.value_buffer
                    .extend_from_slice(array.value(i).as_bytes());
            }
            ColumnarValueRef::NonNullableBinaryArray(array) => {
                self.value_buffer.extend_from_slice(array.value(i));
                self.tainted = true;
            }
        }
    }

    /// Closes the current row by recording the value buffer's length as the
    /// next offset.
    ///
    /// # Errors
    ///
    /// Returns an execution error if the total byte length exceeds `i32::MAX`.
    pub fn append_offset(&mut self) -> Result<()> {
        let next_offset: i32 = self
            .value_buffer
            .len()
            .try_into()
            .map_err(|_| exec_datafusion_err!("byte array offset overflow"))?;
        self.offsets_buffer.push(next_offset);
        Ok(())
    }

    /// Consumes the builder and produces the final [`StringArray`].
    ///
    /// # Errors
    ///
    /// Returns an internal error if `null_buffer` does not have exactly one
    /// entry per row. When the builder is tainted, it also returns any error
    /// from `ArrayDataBuilder::build` (e.g. invalid UTF-8).
    pub fn finish(self, null_buffer: Option<NullBuffer>) -> Result<StringArray> {
        // `with_capacity` always pushes the leading zero offset, so this
        // subtraction cannot underflow.
        let row_count = self.offsets_buffer.len() / size_of::<i32>() - 1;
        if let Some(ref null_buffer) = null_buffer
            && null_buffer.len() != row_count
        {
            return internal_err!(
                "Null buffer and offsets buffer must be the same length"
            );
        }
        let array_builder = ArrayDataBuilder::new(DataType::Utf8)
            .len(row_count)
            .add_buffer(self.offsets_buffer.into())
            .add_buffer(self.value_buffer.into())
            .nulls(null_buffer);
        if self.tainted {
            let array_data = array_builder.build()?;
            Ok(StringArray::from(array_data))
        } else {
            // SAFETY: while untainted, every appended byte came from a `&str`.
            // Each row is a concatenation of whole strings, so every row and
            // the whole buffer are valid UTF-8. Offsets are the non-decreasing
            // buffer lengths, and the null buffer length was checked above.
            let array_data = unsafe { array_builder.build_unchecked() };
            Ok(StringArray::from(array_data))
        }
    }
}
/// Row-at-a-time builder for the `Utf8View` result of string concatenation.
///
/// Each row is assembled in `block` by [`Self::write`] and committed by
/// [`Self::append_offset`]. Rows of at most 12 bytes get a view with no data
/// buffer reference; longer rows are copied into the single data buffer
/// `data` (buffer index 0).
pub(crate) struct ConcatStringViewBuilder {
    /// One view per committed row.
    views: Vec<u128>,
    /// Backing bytes for every non-inline view.
    data: Vec<u8>,
    /// Bytes of the row currently being assembled.
    block: Vec<u8>,
    /// Set when the current row contains bytes whose UTF-8 validity is not
    /// guaranteed. Reset after each committed row.
    tainted: bool,
}

impl ConcatStringViewBuilder {
    /// Creates a builder with room for `item_capacity` views and
    /// `data_capacity` out-of-line bytes.
    pub fn with_capacity(item_capacity: usize, data_capacity: usize) -> Self {
        Self {
            views: Vec::with_capacity(item_capacity),
            data: Vec::with_capacity(data_capacity),
            block: vec![],
            tainted: false,
        }
    }

    /// Appends row `i` of `column` to the row currently being assembled.
    ///
    /// With `CHECK_VALID = true`, null entries of nullable arrays contribute
    /// no bytes. With `CHECK_VALID = false`, the value is appended without
    /// consulting validity.
    pub fn write<const CHECK_VALID: bool>(&mut self, column: &ColumnarValueRef, i: usize) {
        match column {
            ColumnarValueRef::Scalar(s) => {
                self.block.extend_from_slice(s);
                self.tainted = true;
            }
            ColumnarValueRef::NullableArray(array) => {
                if !CHECK_VALID || array.is_valid(i) {
                    self.block.extend_from_slice(array.value(i).as_bytes());
                }
            }
            ColumnarValueRef::NullableLargeStringArray(array) => {
                if !CHECK_VALID || array.is_valid(i) {
                    self.block.extend_from_slice(array.value(i).as_bytes());
                }
            }
            ColumnarValueRef::NullableStringViewArray(array) => {
                if !CHECK_VALID || array.is_valid(i) {
                    self.block.extend_from_slice(array.value(i).as_bytes());
                }
            }
            ColumnarValueRef::NullableBinaryArray(array) => {
                if !CHECK_VALID || array.is_valid(i) {
                    self.block.extend_from_slice(array.value(i));
                }
                self.tainted = true;
            }
            ColumnarValueRef::NonNullableArray(array) => {
                self.block.extend_from_slice(array.value(i).as_bytes());
            }
            ColumnarValueRef::NonNullableLargeStringArray(array) => {
                self.block.extend_from_slice(array.value(i).as_bytes());
            }
            ColumnarValueRef::NonNullableStringViewArray(array) => {
                self.block.extend_from_slice(array.value(i).as_bytes());
            }
            ColumnarValueRef::NonNullableBinaryArray(array) => {
                self.block.extend_from_slice(array.value(i));
                self.tainted = true;
            }
        }
    }

    /// Commits the bytes written since the previous call as one row.
    ///
    /// # Errors
    ///
    /// Returns an execution error in three cases:
    /// - a tainted row is not valid UTF-8;
    /// - the row is longer than `u32::MAX` bytes;
    /// - the row's start offset in `data` does not fit in a `u32`.
    pub fn append_offset(&mut self) -> Result<()> {
        // `finish` publishes rows through `new_unchecked`, so bytes not known
        // to be UTF-8 must be validated here.
        if self.tainted {
            std::str::from_utf8(&self.block)
                .map_err(|_| exec_datafusion_err!("invalid UTF-8 in binary literal"))?;
        }
        let v = &self.block;
        // View lengths are 32-bit. A longer row cannot be represented, and the
        // unchecked construction in `finish` would not catch a bad view later.
        if u32::try_from(v.len()).is_err() {
            return Err(exec_datafusion_err!("string view length overflow"));
        }
        if v.len() > 12 {
            let offset: u32 = self
                .data
                .len()
                .try_into()
                .map_err(|_| exec_datafusion_err!("byte array offset overflow"))?;
            self.data.extend_from_slice(v);
            self.views.push(make_view(v, 0, offset));
        } else {
            self.views.push(make_view(v, 0, 0));
        }
        self.block.clear();
        self.tainted = false;
        Ok(())
    }

    /// Consumes the builder and produces the final [`StringViewArray`].
    ///
    /// # Errors
    ///
    /// Returns an internal error if `null_buffer` does not have exactly one
    /// entry per row.
    pub fn finish(self, null_buffer: Option<NullBuffer>) -> Result<StringViewArray> {
        if let Some(ref nulls) = null_buffer
            && nulls.len() != self.views.len()
        {
            return internal_err!(
                "Null buffer length ({}) must match row count ({})",
                nulls.len(),
                self.views.len()
            );
        }
        // With no out-of-line data every view is inline, so no buffer is needed.
        let buffers: Vec<Buffer> = if self.data.is_empty() {
            vec![]
        } else {
            vec![Buffer::from(self.data)]
        };
        // SAFETY: every long view references buffer 0 at an offset/length that
        // was appended to `data`. Row lengths and offsets were range-checked
        // and tainted rows UTF-8-validated in `append_offset`. The null buffer
        // length was checked above.
        let array = unsafe {
            StringViewArray::new_unchecked(
                ScalarBuffer::from(self.views),
                buffers,
                null_buffer,
            )
        };
        Ok(array)
    }
}
/// Row-at-a-time builder for the `LargeUtf8` (`i64`-offset) result of string
/// concatenation.
///
/// For every output row, callers invoke [`Self::write`] once per input column
/// and then [`Self::append_offset`] to close the row. Inputs whose UTF-8
/// validity is not guaranteed by their type (raw scalar bytes, binary arrays)
/// mark the builder as `tainted`, which makes [`Self::finish`] run full Arrow
/// validation instead of the unchecked fast path.
pub(crate) struct ConcatLargeStringBuilder {
    /// `i64` offsets: a leading `0`, then one entry per closed row.
    offsets_buffer: MutableBuffer,
    /// Concatenated bytes of all rows.
    value_buffer: MutableBuffer,
    /// Set once any byte whose UTF-8 validity is not guaranteed was appended.
    tainted: bool,
}

impl ConcatLargeStringBuilder {
    /// Creates a builder with room for `item_capacity` rows and
    /// `data_capacity` value bytes.
    ///
    /// # Panics
    ///
    /// Panics with "capacity integer overflow" if the offsets buffer size in
    /// bytes, `(item_capacity + 1) * size_of::<i64>()`, does not fit in `usize`.
    pub fn with_capacity(item_capacity: usize, data_capacity: usize) -> Self {
        // `checked_mul` rather than `saturating_mul`: a saturated size would be
        // passed on as a huge allocation request and fail elsewhere with an
        // unrelated panic instead of this clear message.
        let capacity = item_capacity
            .checked_add(1)
            .and_then(|slots| slots.checked_mul(size_of::<i64>()))
            .expect("capacity integer overflow");
        let mut offsets_buffer = MutableBuffer::with_capacity(capacity);
        // SAFETY: at least one offset slot was requested, so `capacity` is at
        // least `size_of::<i64>()` bytes and the leading zero offset fits.
        unsafe { offsets_buffer.push_unchecked(0_i64) };
        Self {
            offsets_buffer,
            value_buffer: MutableBuffer::with_capacity(data_capacity),
            tainted: false,
        }
    }

    /// Appends row `i` of `column` to the row currently being built.
    ///
    /// With `CHECK_VALID = true`, null entries of nullable arrays contribute
    /// no bytes. With `CHECK_VALID = false`, the value is appended without
    /// consulting validity.
    pub fn write<const CHECK_VALID: bool>(&mut self, column: &ColumnarValueRef, i: usize) {
        match column {
            ColumnarValueRef::Scalar(s) => {
                self.value_buffer.extend_from_slice(s);
                self.tainted = true;
            }
            ColumnarValueRef::NullableArray(array) => {
                if !CHECK_VALID || array.is_valid(i) {
                    self.value_buffer
                        .extend_from_slice(array.value(i).as_bytes());
                }
            }
            ColumnarValueRef::NullableLargeStringArray(array) => {
                if !CHECK_VALID || array.is_valid(i) {
                    self.value_buffer
                        .extend_from_slice(array.value(i).as_bytes());
                }
            }
            ColumnarValueRef::NullableStringViewArray(array) => {
                if !CHECK_VALID || array.is_valid(i) {
                    self.value_buffer
                        .extend_from_slice(array.value(i).as_bytes());
                }
            }
            ColumnarValueRef::NullableBinaryArray(array) => {
                if !CHECK_VALID || array.is_valid(i) {
                    self.value_buffer.extend_from_slice(array.value(i));
                }
                self.tainted = true;
            }
            ColumnarValueRef::NonNullableArray(array) => {
                self.value_buffer
                    .extend_from_slice(array.value(i).as_bytes());
            }
            ColumnarValueRef::NonNullableLargeStringArray(array) => {
                self.value_buffer
                    .extend_from_slice(array.value(i).as_bytes());
            }
            ColumnarValueRef::NonNullableStringViewArray(array) => {
                self.value_buffer
                    .extend_from_slice(array.value(i).as_bytes());
            }
            ColumnarValueRef::NonNullableBinaryArray(array) => {
                self.value_buffer.extend_from_slice(array.value(i));
                self.tainted = true;
            }
        }
    }

    /// Closes the current row by recording the value buffer's length as the
    /// next offset.
    ///
    /// # Errors
    ///
    /// Returns an execution error if the total byte length exceeds `i64::MAX`.
    pub fn append_offset(&mut self) -> Result<()> {
        let next_offset: i64 = self
            .value_buffer
            .len()
            .try_into()
            .map_err(|_| exec_datafusion_err!("byte array offset overflow"))?;
        self.offsets_buffer.push(next_offset);
        Ok(())
    }

    /// Consumes the builder and produces the final [`LargeStringArray`].
    ///
    /// # Errors
    ///
    /// Returns an internal error if `null_buffer` does not have exactly one
    /// entry per row. When the builder is tainted, it also returns any error
    /// from `ArrayDataBuilder::build` (e.g. invalid UTF-8).
    pub fn finish(self, null_buffer: Option<NullBuffer>) -> Result<LargeStringArray> {
        // `with_capacity` always pushes the leading zero offset, so this
        // subtraction cannot underflow.
        let row_count = self.offsets_buffer.len() / size_of::<i64>() - 1;
        if let Some(ref null_buffer) = null_buffer
            && null_buffer.len() != row_count
        {
            return internal_err!(
                "Null buffer and offsets buffer must be the same length"
            );
        }
        let array_builder = ArrayDataBuilder::new(DataType::LargeUtf8)
            .len(row_count)
            .add_buffer(self.offsets_buffer.into())
            .add_buffer(self.value_buffer.into())
            .nulls(null_buffer);
        if self.tainted {
            let array_data = array_builder.build()?;
            Ok(LargeStringArray::from(array_data))
        } else {
            // SAFETY: while untainted, every appended byte came from a `&str`.
            // Each row is a concatenation of whole strings, so every row and
            // the whole buffer are valid UTF-8. Offsets are the non-decreasing
            // buffer lengths, and the null buffer length was checked above.
            let array_data = unsafe { array_builder.build_unchecked() };
            Ok(LargeStringArray::from(array_data))
        }
    }
}
/// Builds a [`GenericStringArray`] (`Utf8` for `i32`, `LargeUtf8` for `i64`)
/// one row at a time.
///
/// Rows added with [`Self::append_placeholder`] are empty and must be marked
/// null by the `NullBuffer` passed to [`Self::finish`]. In debug builds,
/// `finish` asserts that there are at least as many nulls as placeholders.
pub(crate) struct GenericStringArrayBuilder<O: OffsetSizeTrait> {
    /// `O` offsets: a leading `0`, then one entry per row.
    offsets_buffer: MutableBuffer,
    /// Concatenated UTF-8 bytes of all rows.
    value_buffer: MutableBuffer,
    /// Number of rows appended via `append_placeholder`.
    placeholder_count: usize,
    _phantom: PhantomData<O>,
}

impl<O: OffsetSizeTrait> GenericStringArrayBuilder<O> {
    /// Creates a builder with room for `item_capacity` rows and
    /// `data_capacity` value bytes.
    ///
    /// # Panics
    ///
    /// Panics with "capacity integer overflow" if the offsets buffer size in
    /// bytes, `(item_capacity + 1) * size_of::<O>()`, does not fit in `usize`.
    pub fn with_capacity(item_capacity: usize, data_capacity: usize) -> Self {
        // `checked_mul` rather than `saturating_mul`, so an impossible size
        // fails here with a clear message instead of inside the allocator.
        let capacity = item_capacity
            .checked_add(1)
            .and_then(|slots| slots.checked_mul(size_of::<O>()))
            .expect("capacity integer overflow");
        let mut offsets_buffer = MutableBuffer::with_capacity(capacity);
        offsets_buffer.push(O::usize_as(0));
        Self {
            offsets_buffer,
            value_buffer: MutableBuffer::with_capacity(data_capacity),
            placeholder_count: 0,
            _phantom: PhantomData,
        }
    }

    /// Records the current end of `value_buffer` as the end offset of the row
    /// that was just written.
    ///
    /// # Panics
    ///
    /// Panics with "byte array offset overflow" if the total byte length does
    /// not fit in `O`.
    #[inline]
    fn commit_row(&mut self) {
        let next_offset =
            O::from_usize(self.value_buffer.len()).expect("byte array offset overflow");
        self.offsets_buffer.push(next_offset);
    }

    /// Appends `value` as the next row.
    #[inline]
    pub fn append_value(&mut self, value: &str) {
        self.value_buffer.extend_from_slice(value.as_bytes());
        self.commit_row();
    }

    /// Appends an empty row that the null buffer given to `finish` must mark
    /// as null.
    #[inline]
    pub fn append_placeholder(&mut self) {
        self.commit_row();
        self.placeholder_count += 1;
    }

    /// Appends `src` as the next row, with every byte passed through `map`.
    ///
    /// # Safety
    ///
    /// The mapped bytes must form valid UTF-8. `finish` builds the array with
    /// `build_unchecked`, so invalid data would not be detected.
    #[inline]
    pub unsafe fn append_byte_map<F: FnMut(u8) -> u8>(&mut self, src: &[u8], mut map: F) {
        self.value_buffer.extend(src.iter().map(|&b| map(b)));
        self.commit_row();
    }

    /// Appends one row whose contents `f` writes incrementally through a
    /// [`GenericStringWriter`].
    #[inline]
    pub fn append_with<F>(&mut self, f: F)
    where
        F: FnOnce(&mut GenericStringWriter<'_>),
    {
        let mut writer = GenericStringWriter {
            value_buffer: &mut self.value_buffer,
        };
        f(&mut writer);
        self.commit_row();
    }

    /// Consumes the builder and produces the final array with `null_buffer`
    /// as its validity.
    ///
    /// # Errors
    ///
    /// Returns an internal error if `null_buffer` does not have exactly one
    /// entry per row.
    ///
    /// # Panics
    ///
    /// In debug builds, panics if `null_buffer` has fewer nulls than there are
    /// placeholder rows.
    pub fn finish(
        self,
        null_buffer: Option<NullBuffer>,
    ) -> Result<GenericStringArray<O>> {
        // The constructor always pushes the leading zero offset.
        let row_count = self.offsets_buffer.len() / size_of::<O>() - 1;
        if let Some(ref n) = null_buffer
            && n.len() != row_count
        {
            return internal_err!(
                "Null buffer length ({}) must match row count ({row_count})",
                n.len()
            );
        }
        let null_count = null_buffer.as_ref().map_or(0, |n| n.null_count());
        debug_assert!(
            null_count >= self.placeholder_count,
            "{} placeholder rows but null buffer has {null_count} nulls",
            self.placeholder_count,
        );
        let array_data = ArrayDataBuilder::new(GenericStringArray::<O>::DATA_TYPE)
            .len(row_count)
            .add_buffer(self.offsets_buffer.into())
            .add_buffer(self.value_buffer.into())
            .nulls(null_buffer);
        // SAFETY: offsets are the non-decreasing lengths of `value_buffer`
        // recorded after each row. Every byte came from a `&str`, a
        // `StringWriter` (which only accepts `&str`/`char`), or an
        // `append_byte_map` caller that guarantees UTF-8. The null buffer
        // length was checked above.
        let array_data = unsafe { array_data.build_unchecked() };
        Ok(GenericStringArray::<O>::from(array_data))
    }
}
/// Starting block-size target, in bytes, for out-of-line data in
/// `StringViewArrayBuilder`. The target is doubled before the first block is
/// reserved, so the first block is twice this size.
pub(crate) const STRING_VIEW_INIT_BLOCK_SIZE: u32 = 1024 * 8;
/// Ceiling, in bytes, for the block-size target of `StringViewArrayBuilder`.
/// A single value longer than this still gets a block of its own length.
pub(crate) const STRING_VIEW_MAX_BLOCK_SIZE: u32 = 1024 * 1024 * 2;
/// Incremental sink for the bytes of one string value, handed to the
/// `append_with` closures of the builders in this module.
pub(crate) trait StringWriter {
    /// Appends the UTF-8 encoding of `c`.
    fn write_char(&mut self, c: char);

    /// Appends the UTF-8 bytes of `s`.
    fn write_str(&mut self, s: &str);
}
/// [`StringWriter`] handed to `GenericStringArrayBuilder::append_with`.
///
/// It writes straight into the builder's value buffer. The builder records
/// the row's end offset after the closure returns.
pub(crate) struct GenericStringWriter<'a> {
    value_buffer: &'a mut MutableBuffer,
}

impl StringWriter for GenericStringWriter<'_> {
    #[inline(always)]
    fn write_str(&mut self, s: &str) {
        let target: &mut MutableBuffer = &mut *self.value_buffer;
        push_bytes_to_mutable_buffer(target, s.as_bytes())
    }

    #[inline(always)]
    fn write_char(&mut self, c: char) {
        let target: &mut MutableBuffer = &mut *self.value_buffer;
        push_char_to_mutable_buffer(target, c)
    }
}
/// Appends `bytes` to the end of `value_buffer` using a raw copy.
///
/// The `match` hands `copy_nonoverlapping` a literal length for 1..=8 bytes,
/// presumably so short strings compile to a fixed-size move rather than a
/// `memcpy` call (NOTE(review): performance intent inferred from the shape,
/// confirm with a benchmark). Every arm copies exactly `n` bytes.
#[inline(always)]
fn push_bytes_to_mutable_buffer(value_buffer: &mut MutableBuffer, bytes: &[u8]) {
let n = bytes.len();
let old_len = value_buffer.len();
// Guarantee capacity for `old_len + n` bytes so the raw writes below stay in
// bounds.
value_buffer.reserve(n);
// SAFETY: `dst` points at `n` bytes of reserved capacity just past the
// current length. `bytes` is a shared borrow that cannot alias the
// exclusively borrowed `value_buffer`, so the ranges do not overlap. The new
// length covers only bytes written here.
unsafe {
let dst = value_buffer.as_mut_ptr().add(old_len);
let src = bytes.as_ptr();
match n {
0 => {}
1 => std::ptr::copy_nonoverlapping(src, dst, 1),
2 => std::ptr::copy_nonoverlapping(src, dst, 2),
3 => std::ptr::copy_nonoverlapping(src, dst, 3),
4 => std::ptr::copy_nonoverlapping(src, dst, 4),
5 => std::ptr::copy_nonoverlapping(src, dst, 5),
6 => std::ptr::copy_nonoverlapping(src, dst, 6),
7 => std::ptr::copy_nonoverlapping(src, dst, 7),
8 => std::ptr::copy_nonoverlapping(src, dst, 8),
_ => std::ptr::copy_nonoverlapping(src, dst, n),
}
value_buffer.set_len(old_len + n);
}
}
/// Appends the UTF-8 encoding of `c` to the end of `value_buffer`.
///
/// ASCII is written as a single byte. Other characters are encoded into a
/// 4-byte stack buffer and then copied. `char::encode_utf8` needs a
/// `&mut [u8]`, and building that slice with `slice::from_raw_parts_mut` over
/// the buffer's reserved tail (memory not guaranteed to be initialized) would
/// break that function's "properly initialized" safety requirement.
#[inline(always)]
fn push_char_to_mutable_buffer(value_buffer: &mut MutableBuffer, c: char) {
    let len = c.len_utf8();
    let old_len = value_buffer.len();
    value_buffer.reserve(len);
    // SAFETY: `reserve` provides capacity for `len` more bytes past `old_len`.
    // Exactly `len` bytes are written through raw pointers (no reference to
    // uninitialized memory is created) before `set_len` exposes them.
    unsafe {
        let dst = value_buffer.as_mut_ptr().add(old_len);
        if len == 1 {
            dst.write(c as u8);
        } else {
            let mut encoded = [0u8; 4];
            let bytes = c.encode_utf8(&mut encoded).as_bytes();
            std::ptr::copy_nonoverlapping(bytes.as_ptr(), dst, len);
        }
        value_buffer.set_len(old_len + len);
    }
}
/// Builds a [`StringViewArray`] one row at a time.
///
/// Values of at most 12 bytes live entirely inside their view. Longer values
/// are copied into the `in_progress` data block. When a value no longer fits
/// there, the block is moved to `completed` and a new block is reserved,
/// sized by the growing block-size target. Rows added with
/// `append_placeholder` carry an all-zero (empty) view and must be marked
/// null by the `NullBuffer` given to `finish`.
pub(crate) struct StringViewArrayBuilder {
    /// One view per row.
    views: Vec<u128>,
    /// Data block currently being filled. When flushed, it is pushed at index
    /// `completed.len()`, which is the buffer index long views record.
    in_progress: Vec<u8>,
    /// Data blocks that are no longer written to.
    completed: Vec<Buffer>,
    /// Block-size target in bytes. It doubles (while below
    /// `STRING_VIEW_MAX_BLOCK_SIZE`) each time a new block is reserved.
    block_size: u32,
    /// Number of rows appended via `append_placeholder`.
    placeholder_count: usize,
}

impl StringViewArrayBuilder {
    /// Creates a builder with room for `item_capacity` views. No data block is
    /// allocated until the first long value arrives.
    pub fn with_capacity(item_capacity: usize) -> Self {
        Self {
            views: Vec::with_capacity(item_capacity),
            in_progress: Vec::new(),
            completed: Vec::new(),
            block_size: STRING_VIEW_INIT_BLOCK_SIZE,
            placeholder_count: 0,
        }
    }

    /// Doubles the block-size target while it is below
    /// `STRING_VIEW_MAX_BLOCK_SIZE`, then returns it.
    fn next_block_size(&mut self) -> u32 {
        if self.block_size < STRING_VIEW_MAX_BLOCK_SIZE {
            self.block_size = self.block_size.saturating_mul(2);
        }
        self.block_size
    }

    /// Appends `value` as the next row.
    ///
    /// # Panics
    ///
    /// Panics if the value length or its offset in the current block exceeds
    /// `i32::MAX`.
    #[inline]
    pub fn append_value(&mut self, value: &str) {
        let v = value.as_bytes();
        let length: u32 =
            i32::try_from(v.len()).expect("value length exceeds i32::MAX") as u32;
        if length <= 12 {
            self.views.push(make_view(v, 0, 0));
            return;
        }
        self.ensure_long_capacity(length);
        let offset: u32 = i32::try_from(self.in_progress.len())
            .expect("offset exceeds i32::MAX") as u32;
        self.in_progress.extend_from_slice(v);
        self.views.push(self.make_long_view(length, offset, v));
    }

    /// Appends an empty row that the null buffer given to `finish` must mark
    /// as null.
    #[inline]
    pub fn append_placeholder(&mut self) {
        self.views.push(0);
        self.placeholder_count += 1;
    }

    /// Makes sure `in_progress` can take `length` more bytes without
    /// reallocating. If it cannot, the current block is flushed and a new one
    /// of at least `length` bytes (and at least the next block-size target)
    /// is reserved.
    #[inline]
    fn ensure_long_capacity(&mut self, length: u32) {
        let required_cap = self.in_progress.len() + length as usize;
        if self.in_progress.capacity() < required_cap {
            self.flush_in_progress();
            let to_reserve = (length as usize).max(self.next_block_size() as usize);
            self.in_progress.reserve(to_reserve);
        }
    }

    /// Builds a non-inline view for a value stored at `offset` in the block
    /// that will get index `completed.len()`.
    ///
    /// `prefix_bytes` must start with the value's bytes and hold at least 4
    /// of them; callers only use this for values longer than 12 bytes.
    #[inline]
    fn make_long_view(&self, length: u32, offset: u32, prefix_bytes: &[u8]) -> u128 {
        let buffer_index: u32 = i32::try_from(self.completed.len())
            .expect("buffer count exceeds i32::MAX")
            as u32;
        ByteView {
            length,
            prefix: u32::from_le_bytes(prefix_bytes[..4].try_into().unwrap()),
            buffer_index,
            offset,
        }
        .into()
    }

    /// Appends `src` as the next row, with every byte passed through `map`.
    ///
    /// # Safety
    ///
    /// The mapped bytes must form valid UTF-8. `finish` uses
    /// `StringViewArray::new_unchecked`, so invalid data would not be detected.
    #[inline]
    pub unsafe fn append_byte_map<F: FnMut(u8) -> u8>(&mut self, src: &[u8], mut map: F) {
        let length: u32 =
            i32::try_from(src.len()).expect("value length exceeds i32::MAX") as u32;
        if length <= 12 {
            let mut bytes = [0u8; 12];
            for (d, &b) in bytes[..src.len()].iter_mut().zip(src) {
                *d = map(b);
            }
            self.views.push(make_view(&bytes[..src.len()], 0, 0));
            return;
        }
        self.ensure_long_capacity(length);
        let cursor = self.in_progress.len();
        let offset: u32 = i32::try_from(cursor).expect("offset exceeds i32::MAX") as u32;
        self.in_progress.extend(src.iter().map(|&b| map(b)));
        // The prefix must come from the *mapped* bytes, so read it back.
        self.views
            .push(self.make_long_view(length, offset, &self.in_progress[cursor..]));
    }

    /// Appends one row whose contents `f` writes incrementally through a
    /// [`StringViewWriter`]. The writer keeps short output inline and spills
    /// to `in_progress` once it grows past 12 bytes.
    #[inline]
    pub fn append_with<F>(&mut self, f: F)
    where
        F: FnOnce(&mut StringViewWriter<'_>),
    {
        let mut writer = StringViewWriter {
            inline_buf: [0u8; 12],
            inline_len: 0,
            spill_cursor: None,
            builder: self,
        };
        f(&mut writer);
        let StringViewWriter {
            inline_buf,
            inline_len,
            spill_cursor,
            ..
        } = writer;
        match spill_cursor {
            None => {
                self.views
                    .push(make_view(&inline_buf[..inline_len as usize], 0, 0));
            }
            Some(start) => {
                let end = self.in_progress.len();
                let length: u32 = i32::try_from(end - start)
                    .expect("value length exceeds i32::MAX")
                    as u32;
                let offset: u32 =
                    i32::try_from(start).expect("offset exceeds i32::MAX") as u32;
                self.views.push(self.make_long_view(
                    length,
                    offset,
                    &self.in_progress[start..],
                ));
            }
        }
    }

    /// Moves a non-empty `in_progress` block into `completed`.
    fn flush_in_progress(&mut self) {
        if !self.in_progress.is_empty() {
            let block = std::mem::take(&mut self.in_progress);
            self.completed.push(Buffer::from_vec(block));
        }
    }

    /// Consumes the builder and produces the final array with `null_buffer`
    /// as its validity.
    ///
    /// # Errors
    ///
    /// Returns an internal error if `null_buffer` does not have exactly one
    /// entry per row.
    ///
    /// # Panics
    ///
    /// In debug builds, panics if `null_buffer` has fewer nulls than there are
    /// placeholder rows.
    pub fn finish(mut self, null_buffer: Option<NullBuffer>) -> Result<StringViewArray> {
        if let Some(ref n) = null_buffer
            && n.len() != self.views.len()
        {
            return internal_err!(
                "Null buffer length ({}) must match row count ({})",
                n.len(),
                self.views.len()
            );
        }
        let null_count = null_buffer.as_ref().map_or(0, |n| n.null_count());
        debug_assert!(
            null_count >= self.placeholder_count,
            "{} placeholder rows but null buffer has {null_count} nulls",
            self.placeholder_count,
        );
        self.flush_in_progress();
        // SAFETY: each long view was built for the block at index
        // `completed.len()` at that time, which is where that block ended up
        // after flushing, with an in-bounds offset and length. All bytes come
        // from `&str`, `StringWriter`, or `append_byte_map` callers that
        // guarantee UTF-8. The null buffer length was checked above.
        let array = unsafe {
            StringViewArray::new_unchecked(
                ScalarBuffer::from(self.views),
                self.completed,
                null_buffer,
            )
        };
        Ok(array)
    }
}
/// [`StringWriter`] used by `StringViewArrayBuilder::append_with`.
///
/// Output accumulates in `inline_buf` while it still fits in a 12-byte inline
/// view. The first write that would exceed 12 bytes spills: the inline prefix
/// and the new bytes are copied into the builder's `in_progress` block, and
/// every later write goes straight there. The value starts at `spill_cursor`.
pub(crate) struct StringViewWriter<'a> {
    /// Bytes written so far while the value is still inline-sized.
    inline_buf: [u8; 12],
    /// Number of valid bytes in `inline_buf`.
    inline_len: u8,
    /// Start of this value in `builder.in_progress` once it has spilled.
    spill_cursor: Option<usize>,
    /// Builder that receives spilled bytes.
    builder: &'a mut StringViewArrayBuilder,
}

impl StringViewWriter<'_> {
    /// Copies the inline prefix followed by `tail` into the builder's current
    /// data block and switches the writer to spilled mode.
    ///
    /// # Panics
    ///
    /// Panics if the combined length exceeds `i32::MAX`, the same limit that
    /// `append_with` enforces on the finished value.
    #[inline]
    fn spill(&mut self, tail: &[u8]) {
        let inline_len = self.inline_len as usize;
        // Checked conversion: a plain `as u32` could truncate and under-reserve.
        let total_len = i32::try_from(inline_len + tail.len())
            .expect("value length exceeds i32::MAX") as u32;
        self.builder.ensure_long_capacity(total_len);
        let cursor = self.builder.in_progress.len();
        self.builder
            .in_progress
            .extend_from_slice(&self.inline_buf[..inline_len]);
        self.builder.in_progress.extend_from_slice(tail);
        self.spill_cursor = Some(cursor);
    }
}

impl StringWriter for StringViewWriter<'_> {
    #[inline]
    fn write_str(&mut self, s: &str) {
        let bytes = s.as_bytes();
        if self.spill_cursor.is_some() {
            self.builder.in_progress.extend_from_slice(bytes);
            return;
        }
        let inline_len = self.inline_len as usize;
        let new_len = inline_len + bytes.len();
        if new_len <= 12 {
            self.inline_buf[inline_len..new_len].copy_from_slice(bytes);
            self.inline_len = new_len as u8;
        } else {
            self.spill(bytes);
        }
    }

    #[inline]
    fn write_char(&mut self, c: char) {
        if self.spill_cursor.is_some() {
            push_char_to_vec(&mut self.builder.in_progress, c);
            return;
        }
        let inline_len = self.inline_len as usize;
        let new_len = inline_len + c.len_utf8();
        if new_len <= 12 {
            c.encode_utf8(&mut self.inline_buf[inline_len..new_len]);
            self.inline_len = new_len as u8;
        } else {
            let mut encoded = [0u8; 4];
            self.spill(c.encode_utf8(&mut encoded).as_bytes());
        }
    }
}
/// Appends the UTF-8 encoding of `c` to `v`.
#[inline]
fn push_char_to_vec(v: &mut Vec<u8>, c: char) {
    let mut scratch = [0u8; 4];
    let encoded: &str = c.encode_utf8(&mut scratch);
    v.extend_from_slice(encoded.as_bytes());
}
/// Row-oriented string-array builder. Null rows are written as cheap
/// placeholders, and validity is supplied in bulk to
/// [`finish`](Self::finish).
pub(crate) trait BulkNullStringArrayBuilder {
    /// Writer passed to [`append_with`](Self::append_with) closures.
    type Writer<'a>: StringWriter
    where
        Self: 'a;

    /// Appends `value` as the next row.
    fn append_value(&mut self, value: &str);

    /// Appends a row that the null buffer given to `finish` is expected to
    /// mark as null.
    fn append_placeholder(&mut self);

    /// Appends one row whose contents `f` produces incrementally.
    fn append_with<F>(&mut self, f: F)
    where
        F: for<'a> FnOnce(&mut Self::Writer<'a>);

    /// Appends `src` as the next row, with every byte passed through `map`.
    ///
    /// # Safety
    ///
    /// The mapped bytes must form valid UTF-8. Implementations may build the
    /// final array without validating it.
    unsafe fn append_byte_map<F: FnMut(u8) -> u8>(&mut self, src: &[u8], map: F);

    /// Consumes the builder, using `nulls` (one entry per row) as the
    /// validity.
    fn finish(self, nulls: Option<NullBuffer>) -> Result<ArrayRef>;
}
/// Forwards to the inherent methods of `GenericStringArrayBuilder` and erases
/// the result to an [`ArrayRef`].
///
/// `Self::name(...)` paths resolve to the inherent method, which takes
/// precedence over the trait method of the same name, so these calls do not
/// recurse.
impl<O: OffsetSizeTrait> BulkNullStringArrayBuilder for GenericStringArrayBuilder<O> {
    type Writer<'a> = GenericStringWriter<'a>;

    #[inline]
    fn append_value(&mut self, value: &str) {
        Self::append_value(self, value)
    }

    #[inline]
    fn append_placeholder(&mut self) {
        Self::append_placeholder(self)
    }

    #[inline]
    fn append_with<F>(&mut self, f: F)
    where
        F: for<'a> FnOnce(&mut Self::Writer<'a>),
    {
        Self::append_with(self, f)
    }

    #[inline]
    unsafe fn append_byte_map<F: FnMut(u8) -> u8>(&mut self, src: &[u8], map: F) {
        // SAFETY: the caller upholds this method's UTF-8 contract, which is
        // the same one the inherent method requires.
        unsafe { Self::append_byte_map(self, src, map) }
    }

    fn finish(self, nulls: Option<NullBuffer>) -> Result<ArrayRef> {
        let array = Self::finish(self, nulls)?;
        Ok(Arc::new(array))
    }
}
/// Forwards to the inherent methods of `StringViewArrayBuilder` and erases
/// the result to an [`ArrayRef`].
///
/// `Self::name(...)` paths resolve to the inherent method, which takes
/// precedence over the trait method of the same name, so these calls do not
/// recurse.
impl BulkNullStringArrayBuilder for StringViewArrayBuilder {
    type Writer<'a> = StringViewWriter<'a>;

    #[inline]
    fn append_value(&mut self, value: &str) {
        Self::append_value(self, value)
    }

    #[inline]
    fn append_placeholder(&mut self) {
        Self::append_placeholder(self)
    }

    #[inline]
    fn append_with<F>(&mut self, f: F)
    where
        F: for<'a> FnOnce(&mut Self::Writer<'a>),
    {
        Self::append_with(self, f)
    }

    #[inline]
    unsafe fn append_byte_map<F: FnMut(u8) -> u8>(&mut self, src: &[u8], map: F) {
        // SAFETY: the caller upholds this method's UTF-8 contract, which is
        // the same one the inherent method requires.
        unsafe { Self::append_byte_map(self, src, map) }
    }

    fn finish(self, nulls: Option<NullBuffer>) -> Result<ArrayRef> {
        let array = Self::finish(self, nulls)?;
        Ok(Arc::new(array))
    }
}
/// Pushes onto `views_buffer` a view for `substr`, a substring of the value
/// referenced by `original_view`.
///
/// Substrings longer than 12 bytes reuse the original value's data buffer:
/// same buffer index, offset shifted by `start_offset`. `start_offset` must
/// therefore be the byte position of `substr` within the original value.
/// Shorter substrings get a view with no buffer reference.
#[inline(never)]
pub(crate) fn append_view(
    views_buffer: &mut Vec<u128>,
    original_view: &u128,
    substr: &str,
    start_offset: u32,
) {
    let bytes = substr.as_bytes();
    let (buffer_index, offset) = if bytes.len() <= 12 {
        (0, 0)
    } else {
        let parent = ByteView::from(*original_view);
        (parent.buffer_index, parent.offset + start_offset)
    };
    views_buffer.push(make_view(bytes, buffer_index, offset));
}
/// Borrowed input to string concatenation: either a scalar's raw bytes or a
/// string/binary array. Array variants are split by whether per-row validity
/// has to be consulted (`Nullable*`) or can be skipped (`NonNullable*`).
#[derive(Debug)]
pub(crate) enum ColumnarValueRef<'a> {
    Scalar(&'a [u8]),
    NullableArray(&'a StringArray),
    NonNullableArray(&'a StringArray),
    NullableLargeStringArray(&'a LargeStringArray),
    NonNullableLargeStringArray(&'a LargeStringArray),
    NullableStringViewArray(&'a StringViewArray),
    NonNullableStringViewArray(&'a StringViewArray),
    NullableBinaryArray(&'a BinaryArray),
    NonNullableBinaryArray(&'a BinaryArray),
}

impl ColumnarValueRef<'_> {
    /// Returns whether row `i` is non-null. Scalars and `NonNullable*`
    /// variants report `true` without looking at the array.
    #[inline]
    pub fn is_valid(&self, i: usize) -> bool {
        match self {
            Self::NullableArray(array) => array.is_valid(i),
            Self::NullableLargeStringArray(array) => array.is_valid(i),
            Self::NullableStringViewArray(array) => array.is_valid(i),
            Self::NullableBinaryArray(array) => array.is_valid(i),
            Self::Scalar(_)
            | Self::NonNullableArray(_)
            | Self::NonNullableLargeStringArray(_)
            | Self::NonNullableStringViewArray(_)
            | Self::NonNullableBinaryArray(_) => true,
        }
    }

    /// Returns a clone of the input's null buffer, or `None` for scalars and
    /// `NonNullable*` variants.
    #[inline]
    pub fn nulls(&self) -> Option<NullBuffer> {
        let borrowed = match self {
            Self::NullableArray(array) => array.nulls(),
            Self::NullableLargeStringArray(array) => array.nulls(),
            Self::NullableStringViewArray(array) => array.nulls(),
            Self::NullableBinaryArray(array) => array.nulls(),
            Self::Scalar(_)
            | Self::NonNullableArray(_)
            | Self::NonNullableLargeStringArray(_)
            | Self::NonNullableStringViewArray(_)
            | Self::NonNullableBinaryArray(_) => None,
        };
        borrowed.cloned()
    }
}
// Unit tests for the builders in this module.
#[cfg(test)]
mod tests {
use super::*;
/// Runs `scenario` on `builder`, then finishes it with a null buffer derived
/// from `expected`. `None` entries become nulls, and no buffer is passed when
/// every entry is `Some`. The result is compared with the array Arrow builds
/// directly from `expected`.
fn run_scenario<B, F>(mut builder: B, expected: &[Option<&str>], scenario: F)
where
B: BulkNullStringArrayBuilder,
F: FnOnce(&mut B),
{
scenario(&mut builder);
let bits: Vec<bool> = expected.iter().map(|x| x.is_some()).collect();
let nulls = if bits.iter().any(|v| !v) {
Some(NullBuffer::from(bits))
} else {
None
};
let array = builder.finish(nulls).unwrap();
let owned: Vec<Option<&str>> = expected.to_vec();
// `finish` returns a type-erased `ArrayRef`; downcast to whichever
// concrete string array the builder produced.
if let Some(a) = array.as_any().downcast_ref::<StringArray>() {
assert_eq!(a, &StringArray::from(owned));
} else if let Some(a) = array.as_any().downcast_ref::<LargeStringArray>() {
assert_eq!(a, &LargeStringArray::from(owned));
} else if let Some(a) = array.as_any().downcast_ref::<StringViewArray>() {
assert_eq!(a, &StringViewArray::from(owned));
} else {
panic!("unexpected array type");
}
}
/// Runs one scenario against the `i32` and `i64` `GenericStringArrayBuilder`
/// and the `StringViewArrayBuilder`, each starting from zero capacity.
macro_rules! check_on_all_builders {
($expected:expr, $scenario:expr $(,)?) => {{
let expected = $expected;
run_scenario(
GenericStringArrayBuilder::<i32>::with_capacity(0, 0),
expected,
$scenario,
);
run_scenario(
GenericStringArrayBuilder::<i64>::with_capacity(0, 0),
expected,
$scenario,
);
run_scenario(
StringViewArrayBuilder::with_capacity(0),
expected,
$scenario,
);
}};
}
/// Appends two rows, then passes a three-entry null buffer; `finish` must
/// reject the length mismatch with an error rather than panic.
fn assert_finish_errs_on_length_mismatch<B>(mut builder: B)
where
B: BulkNullStringArrayBuilder,
{
builder.append_value("a");
builder.append_value("b");
let nulls = NullBuffer::from(vec![true, false, true]);
assert!(builder.finish(Some(nulls)).is_err());
}
// `usize::MAX + 1` offset slots cannot be computed, so construction must
// panic with the capacity message.
#[test]
#[should_panic(expected = "capacity integer overflow")]
fn test_overflow_concat_string_builder() {
let _builder = ConcatStringBuilder::with_capacity(usize::MAX, usize::MAX);
}
#[test]
#[should_panic(expected = "capacity integer overflow")]
fn test_overflow_concat_large_string_builder() {
let _builder = ConcatLargeStringBuilder::with_capacity(usize::MAX, usize::MAX);
}
// Mixes long (> 12 bytes) and short values with placeholder rows.
#[test]
fn bulk_append_value_with_nulls() {
check_on_all_builders!(
&[
Some("a string longer than twelve bytes"),
None,
Some("short"),
None,
],
|b| {
b.append_value("a string longer than twelve bytes");
b.append_placeholder();
b.append_value("short");
b.append_placeholder();
},
);
}
#[test]
fn bulk_empty_builder() {
check_on_all_builders!(&[], |_b| {});
}
#[test]
fn bulk_all_placeholders() {
check_on_all_builders!(&[None, None, None], |b| {
b.append_placeholder();
b.append_placeholder();
b.append_placeholder();
});
}
#[test]
fn bulk_append_value_no_nulls() {
check_on_all_builders!(
&[
Some("foo"),
Some(""),
Some("a string longer than twelve bytes")
],
|b| {
b.append_value("foo");
b.append_value("");
b.append_value("a string longer than twelve bytes");
},
);
}
// Covers single writes, multiple writes per row, a long single write, and a
// row with no writes at all.
#[test]
fn bulk_append_with() {
check_on_all_builders!(
&[
Some("hello"),
None,
Some("hello world"),
Some("a long string of 25 bytes"),
Some(""),
],
|b| {
b.append_with(|w| w.write_str("hello"));
b.append_placeholder();
b.append_with(|w| {
w.write_str("hello ");
w.write_str("world");
});
b.append_with(|w| w.write_str("a long string of 25 bytes"));
b.append_with(|_w| {});
},
);
}
// 'é' is a two-byte character, exercising the multi-byte `write_char` path.
#[test]
fn bulk_append_with_chars() {
check_on_all_builders!(&[Some("hé!"), Some("x")], |b| {
b.append_with(|w| {
w.write_char('h');
w.write_char('é');
w.write_char('!');
});
b.append_with(|w| w.write_char('x'));
});
}
// SAFETY (test): every map keeps ASCII input ASCII, so the output is UTF-8.
#[test]
fn bulk_append_byte_map() {
check_on_all_builders!(&[Some("HELLO"), Some("aXcaX"), Some("")], |b| unsafe {
b.append_byte_map(b"hello", |x| x.to_ascii_uppercase());
b.append_byte_map(b"abcab", |x| if x == b'b' { b'X' } else { x });
b.append_byte_map(b"", |x| x);
},);
}
#[test]
fn bulk_finish_errors_on_null_buffer_length_mismatch() {
assert_finish_errs_on_length_mismatch(
GenericStringArrayBuilder::<i32>::with_capacity(2, 4),
);
assert_finish_errs_on_length_mismatch(
GenericStringArrayBuilder::<i64>::with_capacity(2, 4),
);
assert_finish_errs_on_length_mismatch(StringViewArrayBuilder::with_capacity(2));
}
// The placeholder-vs-null check in `finish` is a `debug_assert!`, so the
// following four tests only exist in debug builds.
#[test]
#[cfg(debug_assertions)]
#[should_panic(expected = "placeholder rows")]
fn string_array_builder_placeholder_without_null_mask() {
let mut builder = GenericStringArrayBuilder::<i32>::with_capacity(2, 4);
builder.append_value("a");
builder.append_placeholder();
let nulls = NullBuffer::from(vec![true, true]);
let _ = builder.finish(Some(nulls));
}
#[test]
#[cfg(debug_assertions)]
#[should_panic(expected = "placeholder rows")]
fn string_array_builder_placeholder_with_none_null_buffer() {
let mut builder = GenericStringArrayBuilder::<i32>::with_capacity(1, 4);
builder.append_placeholder();
let _ = builder.finish(None);
}
#[test]
#[cfg(debug_assertions)]
#[should_panic(expected = "placeholder rows")]
fn string_view_array_builder_placeholder_without_null_mask() {
let mut builder = StringViewArrayBuilder::with_capacity(2);
builder.append_value("a");
builder.append_placeholder();
let nulls = NullBuffer::from(vec![true, true]);
let _ = builder.finish(Some(nulls));
}
#[test]
#[cfg(debug_assertions)]
#[should_panic(expected = "placeholder rows")]
fn string_view_array_builder_placeholder_with_none_null_buffer() {
let mut builder = StringViewArrayBuilder::with_capacity(1);
builder.append_placeholder();
let _ = builder.finish(None);
}
// All inputs are at most 12 bytes (the last is exactly 12), so every view is
// inline and no data buffer is created.
#[test]
fn string_view_array_builder_append_with_inline() {
let mut builder = StringViewArrayBuilder::with_capacity(4);
let inputs = ["hello", "world!", "", "0123456789ab"];
for s in &inputs {
builder.append_with(|w| w.write_str(s));
}
let array = builder.finish(None).unwrap();
assert_eq!(array.len(), inputs.len());
for (i, s) in inputs.iter().enumerate() {
assert_eq!(array.value(i), *s);
}
assert_eq!(array.data_buffers().len(), 0);
}
// Only the 25-byte value is stored out of line, so a single 25-byte data
// buffer is expected.
#[test]
fn string_view_array_builder_append_byte_map() {
let mut builder = StringViewArrayBuilder::with_capacity(4);
unsafe {
builder.append_byte_map(b"hello", |b| b.to_ascii_uppercase());
builder.append_byte_map(b"a long string of 25 bytes", |b| {
if b == b' ' { b'_' } else { b }
});
builder.append_byte_map(b"abcdefghijkl", |b| b);
builder.append_byte_map(b"", |b| b);
}
let array = builder.finish(None).unwrap();
assert_eq!(array.value(0), "HELLO");
assert_eq!(array.value(1), "a_long_string_of_25_bytes");
assert_eq!(array.value(2), "abcdefghijkl");
assert_eq!(array.value(3), "");
assert_eq!(array.data_buffers().len(), 1);
assert_eq!(array.data_buffers()[0].len(), 25);
}
// Multi-write rows that end at exactly 12 bytes must stay inline.
#[test]
fn string_view_array_builder_append_with_at_inline_boundary() {
let mut builder = StringViewArrayBuilder::with_capacity(2);
builder.append_with(|w| {
w.write_str("hello");
w.write_str(" world!");
});
builder.append_with(|w| {
for _ in 0..6 {
w.write_str("ab");
}
});
let array = builder.finish(None).unwrap();
assert_eq!(array.value(0), "hello world!");
assert_eq!(array.value(1), "abababababab");
assert_eq!(array.data_buffers().len(), 0);
}
// A 13th byte forces the 12 inline bytes to be spilled with it.
#[test]
fn string_view_array_builder_append_with_spill_on_overflow() {
let mut builder = StringViewArrayBuilder::with_capacity(1);
builder.append_with(|w| {
w.write_str("hello world!");
w.write_str("X");
});
let array = builder.finish(None).unwrap();
assert_eq!(array.value(0), "hello world!X");
assert_eq!(array.data_buffers().len(), 1);
assert_eq!(array.data_buffers()[0].len(), 13);
}
#[test]
fn string_view_array_builder_append_with_long_single_write() {
let mut builder = StringViewArrayBuilder::with_capacity(1);
builder.append_with(|w| w.write_str("a long string of 25 bytes"));
let array = builder.finish(None).unwrap();
assert_eq!(array.value(0), "a long string of 25 bytes");
assert_eq!(array.data_buffers().len(), 1);
assert_eq!(array.data_buffers()[0].len(), 25);
}
// After the spill, later small writes append directly to the data block.
#[test]
fn string_view_array_builder_append_with_many_small_writes_spilling() {
let mut builder = StringViewArrayBuilder::with_capacity(1);
builder.append_with(|w| {
for _ in 0..30 {
w.write_str("ab");
}
});
let array = builder.finish(None).unwrap();
assert_eq!(array.value(0), "ab".repeat(30));
assert_eq!(array.data_buffers().len(), 1);
assert_eq!(array.data_buffers()[0].len(), 60);
}
// Four-byte characters: the fourth one crosses the 12-byte inline limit.
#[test]
fn string_view_array_builder_append_with_chars() {
let mut builder = StringViewArrayBuilder::with_capacity(2);
builder.append_with(|w| {
w.write_char('é');
w.write_char('!');
});
builder.append_with(|w| {
for _ in 0..10 {
w.write_char('🦀');
}
});
let array = builder.finish(None).unwrap();
assert_eq!(array.value(0), "é!");
assert_eq!(array.value(1), "🦀".repeat(10));
}
// 40 * 500 = 20_000 bytes is more than 2 * STRING_VIEW_INIT_BLOCK_SIZE
// (16_384), the size of the first block, so at least one block rotation
// must occur.
#[test]
fn string_view_array_builder_append_with_block_rotation() {
const STR_LEN: usize = 500;
const N: usize = 40;
let s = "x".repeat(STR_LEN);
let mut builder = StringViewArrayBuilder::with_capacity(N);
for _ in 0..N {
builder.append_with(|w| w.write_str(&s));
}
let array = builder.finish(None).unwrap();
assert_eq!(array.len(), N);
assert!(
array.data_buffers().len() >= 2,
"expected multiple data buffers, got {}",
array.data_buffers().len()
);
let total: usize = array.data_buffers().iter().map(|b| b.len()).sum();
assert_eq!(total, N * STR_LEN);
for i in 0..N {
assert_eq!(array.value(i), s);
}
}
// Same rotation check through `append_value`: 100 * 300 = 30_000 bytes.
#[test]
fn string_view_array_builder_flushes_full_blocks() {
let value = "x".repeat(300);
let mut builder = StringViewArrayBuilder::with_capacity(100);
for _ in 0..100 {
builder.append_value(&value);
}
let array = builder.finish(None).unwrap();
assert_eq!(array.len(), 100);
assert!(
array.data_buffers().len() > 1,
"expected multiple data buffers, got {}",
array.data_buffers().len()
);
for i in 0..100 {
assert_eq!(array.value(i), value);
}
}
}