use crate::coalesce::InProgressArray;
use arrow_array::cast::AsArray;
use arrow_array::types::ByteViewType;
use arrow_array::{Array, ArrayRef, GenericByteViewArray};
use arrow_buffer::{Buffer, NullBufferBuilder};
use arrow_data::{ByteView, MAX_INLINE_VIEW_LEN};
use arrow_schema::ArrowError;
use std::marker::PhantomData;
use std::sync::Arc;
/// Incrementally accumulates rows copied from source arrays into a new
/// [`GenericByteViewArray`].
///
/// String bytes being compacted are appended to `current`; once a buffer is
/// full (or the array is finished) it moves to `completed`.
pub(crate) struct InProgressByteViewArray<B: ByteViewType> {
/// Array currently being copied from (set via `set_source`), if any.
source: Option<Source>,
/// Expected rows per output batch; used to pre-size `views` and `nulls`.
batch_size: usize,
/// Raw u128 "view" values accumulated so far.
views: Vec<u128>,
/// Validity (null) bits accumulated so far.
nulls: NullBufferBuilder,
/// In-progress data buffer that string bytes are appended to; `None`
/// until string copying first needs one.
current: Option<Vec<u8>>,
/// Finalized data buffers referenced by the accumulated views.
completed: Vec<Buffer>,
/// Hands out progressively larger allocations for `current`.
buffer_source: BufferSource,
/// Ties this builder to the byte-view type `B` without storing a `B`.
_phantom: PhantomData<B>,
}
/// A source array plus precomputed statistics about its data buffers.
struct Source {
/// The source array rows are copied from.
array: ArrayRef,
/// True when the source's buffers are more than 2x larger than the string
/// bytes its views actually reference, so strings should be copied
/// ("garbage collected") into compact buffers instead of re-used.
need_gc: bool,
/// Total string bytes actually referenced by the source's views.
ideal_buffer_size: usize,
}
impl<B: ByteViewType> std::fmt::Debug for InProgressByteViewArray<B> {
    /// Compact debug output: bulky fields (`views`, `completed`) are shown as
    /// counts and `current` is elided, to keep logs readable.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Show only whether an in-progress buffer exists, not its contents.
        let current_summary = self.current.as_ref().map(|_| "Some(...)");
        let mut builder = f.debug_struct("InProgressByteViewArray");
        builder.field("batch_size", &self.batch_size);
        builder.field("views", &self.views.len());
        builder.field("nulls", &self.nulls);
        builder.field("current", &current_summary);
        builder.field("completed", &self.completed.len());
        builder.finish()
    }
}
impl<B: ByteViewType> InProgressByteViewArray<B> {
/// Creates a builder targeting output batches of `batch_size` rows.
pub(crate) fn new(batch_size: usize) -> Self {
let buffer_source = BufferSource::new();
Self {
batch_size,
source: None,
// `views` is left empty here; capacity is reserved lazily in
// `ensure_capacity` so an unused builder allocates nothing.
views: Vec::new(), nulls: NullBufferBuilder::new(batch_size), current: None,
completed: vec![],
buffer_source,
_phantom: PhantomData,
}
}
/// Reserves `batch_size` view slots the first time rows are copied.
fn ensure_capacity(&mut self) {
if self.views.capacity() == 0 {
self.views.reserve(self.batch_size);
}
}
/// Moves the in-progress `current` buffer (if any) into `completed`.
fn finish_current(&mut self) {
let Some(next_buffer) = self.current.take() else {
return;
};
self.completed.push(next_buffer.into());
}
/// Appends `views` while re-using the source's data `buffers` directly
/// (no string copying): the buffers are appended to `completed` and each
/// non-inline view's `buffer_index` is shifted to its new position.
#[inline(never)]
fn append_views_and_update_buffer_index(&mut self, views: &[u128], buffers: &[Buffer]) {
// Seal any in-progress buffer first so the indices below are stable.
if let Some(buffer) = self.current.take() {
self.completed.push(buffer.into());
}
let starting_buffer: u32 = self.completed.len().try_into().expect("too many buffers");
self.completed.extend_from_slice(buffers);
if starting_buffer == 0 {
// Fast path: indices are unchanged, copy the views verbatim.
self.views.extend_from_slice(views);
} else {
// Shift the buffer index of every non-inline view. Views with
// length <= MAX_INLINE_VIEW_LEN store their bytes inline and
// reference no buffer, so they are left untouched.
let updated_views = views.iter().map(|v| {
let mut byte_view = ByteView::from(*v);
if byte_view.length > MAX_INLINE_VIEW_LEN {
byte_view.buffer_index += starting_buffer;
};
byte_view.as_u128()
});
self.views.extend(updated_views);
}
}
/// Appends `views`, copying their string bytes out of `buffers` into the
/// in-progress buffer ("gc"), starting a fresh buffer when the current one
/// lacks capacity. `view_buffer_size` is the total number of non-inline
/// string bytes referenced by `views`.
#[inline(never)]
fn append_views_and_copy_strings(
&mut self,
views: &[u128],
view_buffer_size: usize,
buffers: &[Buffer],
) {
// No buffer in progress: allocate one big enough for everything.
let Some(current) = self.current.take() else {
let new_buffer = self.buffer_source.next_buffer(view_buffer_size);
self.append_views_and_copy_strings_inner(views, new_buffer, buffers);
return;
};
let mut remaining_capacity = current.capacity() - current.len();
// Everything fits in the current buffer: single pass, no split needed.
if view_buffer_size <= remaining_capacity {
self.append_views_and_copy_strings_inner(views, current, buffers);
return;
}
// Otherwise count how many leading views fit in the remaining
// capacity. Only non-inline views consume buffer space; note the
// break also triggers for an inline view whose length exceeds the
// remaining capacity, which is conservative but harmless (inline
// views copy no bytes).
let mut num_view_to_current = 0;
for view in views {
let b = ByteView::from(*view);
let str_len = b.length;
if remaining_capacity < str_len as usize {
break;
}
if str_len > MAX_INLINE_VIEW_LEN {
remaining_capacity -= str_len as usize;
}
num_view_to_current += 1;
}
// Copy the views that fit into `current`, then seal it ...
let first_views = &views[0..num_view_to_current];
let string_bytes_to_copy = current.capacity() - current.len() - remaining_capacity;
let remaining_view_buffer_size = view_buffer_size - string_bytes_to_copy;
self.append_views_and_copy_strings_inner(first_views, current, buffers);
let completed = self.current.take().expect("completed");
self.completed.push(completed.into());
// ... and copy the remainder into a freshly allocated buffer sized
// for the bytes that are left.
let remaining_views = &views[num_view_to_current..];
let new_buffer = self.buffer_source.next_buffer(remaining_view_buffer_size);
self.append_views_and_copy_strings_inner(remaining_views, new_buffer, buffers);
}
/// Appends `views` to `self.views`, copying each non-inline view's bytes
/// from `buffers` into `dst_buffer`, then installs `dst_buffer` as the new
/// `current` buffer.
///
/// `dst_buffer` must have capacity for all non-inline bytes in `views`
/// (checked by the debug assertion below — presumably so the appends
/// never reallocate; confirm intent with the original author).
#[inline(never)]
fn append_views_and_copy_strings_inner(
&mut self,
views: &[u128],
mut dst_buffer: Vec<u8>,
buffers: &[Buffer],
) {
assert!(self.current.is_none(), "current buffer should be None");
if views.is_empty() {
self.current = Some(dst_buffer);
return;
}
// All copied strings will live in the buffer that becomes the next
// entry of `completed` (i.e. when `dst_buffer` is eventually sealed).
let new_buffer_index: u32 = self.completed.len().try_into().expect("too many buffers");
#[cfg(debug_assertions)]
{
// Sum the bytes referenced by non-inline views and verify the
// capacity contract stated in the doc comment.
let total_length: usize = views
.iter()
.filter_map(|v| {
let b = ByteView::from(*v);
if b.length > MAX_INLINE_VIEW_LEN {
Some(b.length as usize)
} else {
None
}
})
.sum();
debug_assert!(
dst_buffer.capacity() >= total_length,
"dst_buffer capacity {} is less than total length {}",
dst_buffer.capacity(),
total_length
);
}
let new_views = views.iter().map(|v| {
let mut b: ByteView = ByteView::from(*v);
if b.length > MAX_INLINE_VIEW_LEN {
let buffer_index = b.buffer_index as usize;
let buffer_offset = b.offset as usize;
let str_len = b.length as usize;
// Rewrite the view to point into `dst_buffer`.
b.offset = dst_buffer.len() as u32;
b.buffer_index = new_buffer_index;
// SAFETY: `buffer_index`, `offset` and `length` come from a
// view of a valid `GenericByteViewArray` (see `copy_rows`),
// so the index and byte range are in bounds for `buffers`.
let src = unsafe {
buffers
.get_unchecked(buffer_index)
.get_unchecked(buffer_offset..buffer_offset + str_len)
};
dst_buffer.extend_from_slice(src);
}
b.as_u128()
});
self.views.extend(new_views);
self.current = Some(dst_buffer);
}
}
impl<B: ByteViewType> InProgressArray for InProgressByteViewArray<B> {
/// Records the array that subsequent `copy_rows` calls copy from, and
/// precomputes whether its data buffers are worth "garbage collecting"
/// (copying strings out) versus re-using wholesale.
fn set_source(&mut self, source: Option<ArrayRef>) {
self.source = source.map(|array| {
let s = array.as_byte_view::<B>();
let (need_gc, ideal_buffer_size) = if s.data_buffers().is_empty() {
// No data buffers at all: nothing to compact or share.
(false, 0)
} else {
// Bytes actually referenced by views vs. bytes allocated.
let ideal_buffer_size = s.total_buffer_bytes_used();
let actual_buffer_size =
s.data_buffers().iter().map(|b| b.capacity()).sum::<usize>();
// Copy strings only when buffers are more than 2x oversized.
let need_gc =
ideal_buffer_size != 0 && actual_buffer_size > (ideal_buffer_size * 2);
(need_gc, ideal_buffer_size)
};
Source {
array,
need_gc,
ideal_buffer_size,
}
})
}
/// Copies `len` rows starting at `offset` from the current source into
/// this in-progress array.
///
/// # Errors
/// Returns `InvalidArgumentError` if no source has been set.
fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError> {
self.ensure_capacity();
// Take the source out of `self` so it can be borrowed while `self` is
// also mutated below; it is put back before every return.
let source = self.source.take().ok_or_else(|| {
ArrowError::InvalidArgumentError(
"Internal Error: InProgressByteViewArray: source not set".to_string(),
)
})?;
let s = source.array.as_byte_view::<B>();
if let Some(nulls) = s.nulls().as_ref() {
let nulls = nulls.slice(offset, len);
self.nulls.append_buffer(&nulls);
} else {
self.nulls.append_n_non_nulls(len);
};
let buffers = s.data_buffers();
let views = &s.views().as_ref()[offset..offset + len];
// Fast path: no view references any buffer bytes, so views can be
// copied verbatim without touching data buffers.
if source.ideal_buffer_size == 0 {
self.views.extend_from_slice(views);
self.source = Some(source);
return Ok(());
}
if source.need_gc {
// Compact: copy referenced string bytes into our own buffers.
self.append_views_and_copy_strings(views, source.ideal_buffer_size, buffers);
} else {
// Cheap: share the source's buffers and fix up buffer indices.
self.append_views_and_update_buffer_index(views, buffers);
}
self.source = Some(source);
Ok(())
}
/// Finalizes the accumulated rows into a new array, leaving this builder
/// reset and ready for the next batch.
fn finish(&mut self) -> Result<ArrayRef, ArrowError> {
self.finish_current();
assert!(self.current.is_none());
let buffers = std::mem::take(&mut self.completed);
let views = std::mem::take(&mut self.views);
let nulls = self.nulls.finish();
// `finish()` above consumed the null builder's state; install a
// fresh builder sized for the next batch.
self.nulls = NullBufferBuilder::new(self.batch_size);
// SAFETY: views, buffer indices and null bits were copied / adjusted
// consistently from valid source arrays by the append methods above,
// so the invariants of `GenericByteViewArray` hold and revalidation
// can be skipped.
let new_array =
unsafe { GenericByteViewArray::<B>::new_unchecked(views.into(), buffers, nulls) };
Ok(Arc::new(new_array))
}
}
/// First allocation handed out by [`BufferSource`].
const STARTING_BLOCK_SIZE: usize = 4 * 1024;
/// Size at which geometric growth stops (larger requests are still honored).
const MAX_BLOCK_SIZE: usize = 1024 * 1024;

/// Doles out buffers whose capacity doubles on each request, from
/// `STARTING_BLOCK_SIZE` up to `MAX_BLOCK_SIZE`, so repeated small
/// allocations amortize while huge ones aren't over-allocated.
#[derive(Debug)]
struct BufferSource {
    /// Capacity most recently handed out (doubled before each use).
    current_size: usize,
}

impl BufferSource {
    /// Starts the doubling sequence at `STARTING_BLOCK_SIZE`.
    fn new() -> Self {
        Self {
            current_size: STARTING_BLOCK_SIZE,
        }
    }

    /// Returns an empty `Vec` with capacity at least `min_size`.
    fn next_buffer(&mut self, min_size: usize) -> Vec<u8> {
        Vec::with_capacity(self.next_size(min_size))
    }

    /// Computes the next capacity: double (capped at `MAX_BLOCK_SIZE`), then
    /// keep doubling while still short of `min_size`; a request larger than
    /// the cap is granted exactly.
    fn next_size(&mut self, min_size: usize) -> usize {
        if self.current_size < MAX_BLOCK_SIZE {
            self.current_size = self.current_size.saturating_mul(2);
        }
        if self.current_size >= min_size {
            // The regular doubled size already satisfies the request.
            return self.current_size;
        }
        // Catch up towards `min_size` without exceeding the cap...
        while self.current_size <= min_size && self.current_size < MAX_BLOCK_SIZE {
            self.current_size = self.current_size.saturating_mul(2);
        }
        // ...then honor over-cap requests exactly.
        self.current_size.max(min_size)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Requests `min` from `source` repeatedly, asserting each returned
    /// buffer has exactly the expected capacity, in order.
    fn assert_capacities(source: &mut BufferSource, min: usize, expected: &[usize]) {
        for &want in expected {
            assert_eq!(source.next_buffer(min).capacity(), want);
        }
    }

    #[test]
    fn test_buffer_source() {
        let mut source = BufferSource::new();
        // Doubles per request until capped at 1 MiB.
        assert_capacities(
            &mut source,
            1000,
            &[
                8192,
                16384,
                32768,
                65536,
                131072,
                262144,
                524288,
                1024 * 1024,
                1024 * 1024,
            ],
        );
        // Requests above the cap are granted exactly.
        assert_capacities(&mut source, 10_000_000, &[10_000_000]);
    }

    #[test]
    fn test_buffer_source_with_min_small() {
        let mut source = BufferSource::new();
        assert_capacities(&mut source, 5_600, &[8 * 1024, 16 * 1024, 32 * 1024]);
    }

    #[test]
    fn test_buffer_source_with_min_large() {
        let mut source = BufferSource::new();
        assert_capacities(&mut source, 500_000, &[512 * 1024, 1024 * 1024, 1024 * 1024]);
        assert_capacities(&mut source, 2_000_000, &[2_000_000]);
    }
}