use std::collections::VecDeque;
use std::sync::Arc;
use async_stream::try_stream;
use async_trait::async_trait;
use futures::StreamExt as _;
use futures::pin_mut;
use vortex_array::ArrayContext;
use vortex_array::ArrayRef;
use vortex_array::IntoArray;
use vortex_array::arrays::ChunkedArray;
use vortex_array::dtype::DType;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_session::VortexSession;
use crate::LayoutRef;
use crate::LayoutStrategy;
use crate::segments::SegmentSinkRef;
use crate::sequence::SendableSequentialStream;
use crate::sequence::SequencePointer;
use crate::sequence::SequentialStreamAdapter;
use crate::sequence::SequentialStreamExt;
/// Tuning options for [`RepartitionStrategy`]: how incoming chunks are re-cut
/// into blocks before being handed to the child layout strategy.
#[derive(Clone)]
pub struct RepartitionWriterOptions {
    /// Minimum number of buffered bytes required before any blocks are emitted.
    pub block_size_minimum: u64,
    /// Emitted blocks contain an exact multiple of this many rows (the final
    /// flush at end-of-stream may be smaller).
    pub block_len_multiple: usize,
    /// Optional byte-size target per block. When set and the dtype has a known,
    /// non-zero fixed element size, the per-block row count is capped so a
    /// block stays near this many bytes; `None` leaves `block_len_multiple`
    /// unchanged.
    pub block_size_target: Option<u64>,
    /// If true, each incoming chunk is canonicalized before repartitioning.
    pub canonicalize: bool,
}
impl RepartitionWriterOptions {
    /// Number of rows per emitted block for arrays of `dtype`.
    ///
    /// Defaults to `block_len_multiple`. When `block_size_target` is set and
    /// the dtype reports a known, non-zero fixed element size, the row count is
    /// capped so that one block of rows stays near the byte target — but never
    /// drops below a single row.
    fn effective_block_len(&self, dtype: &DType) -> usize {
        match (self.block_size_target, dtype.element_size()) {
            (Some(target), Some(elem_size)) if elem_size > 0 => {
                // Rows that fit the byte target, rounded up so at least one row
                // of any width is allowed; saturate on overflow.
                let rows_for_target = target.div_ceil(elem_size as u64);
                let cap = usize::try_from(rows_for_target).unwrap_or(usize::MAX);
                self.block_len_multiple.min(cap).max(1)
            }
            // No target, or variable-width / zero-size elements: keep the
            // configured multiple as-is.
            _ => self.block_len_multiple,
        }
    }
}
/// A [`LayoutStrategy`] that re-chunks ("repartitions") an incoming sequential
/// stream into fixed-size blocks before delegating to a child strategy.
#[derive(Clone)]
pub struct RepartitionStrategy {
    /// The strategy that receives the repartitioned stream.
    child: Arc<dyn LayoutStrategy>,
    /// Block sizing and canonicalization options.
    options: RepartitionWriterOptions,
}
impl RepartitionStrategy {
pub fn new<S: LayoutStrategy>(child: S, options: RepartitionWriterOptions) -> Self {
Self {
child: Arc::new(child),
options,
}
}
}
#[async_trait]
impl LayoutStrategy for RepartitionStrategy {
    /// Re-chunk `stream` into blocks of `effective_block_len` rows (gated on
    /// `block_size_minimum` buffered bytes) and forward the repartitioned
    /// stream to the child strategy's `write_stream`.
    async fn write_stream(
        &self,
        ctx: ArrayContext,
        segment_sink: SegmentSinkRef,
        stream: SendableSequentialStream,
        eof: SequencePointer,
        session: &VortexSession,
    ) -> VortexResult<LayoutRef> {
        let dtype = stream.dtype().clone();
        // Optionally canonicalize every incoming chunk before it is sliced up.
        let stream = if self.options.canonicalize {
            SequentialStreamAdapter::new(
                dtype.clone(),
                stream.map(|chunk| {
                    let (sequence_id, chunk) = chunk?;
                    VortexResult::Ok((sequence_id, chunk.to_canonical()?.into_array()))
                }),
            )
            .sendable()
        } else {
            stream
        };
        let dtype_clone = dtype.clone();
        let options = self.options.clone();
        // Rows per emitted block — `block_len_multiple`, possibly reduced to
        // respect `block_size_target` for wide fixed-size element types.
        let block_len = options.effective_block_len(&dtype);
        let block_size_minimum = options.block_size_minimum;
        let repartitioned_stream = try_stream! {
            // Peekable so end-of-stream can be detected and the remainder flushed.
            let canonical_stream = stream.peekable();
            pin_mut!(canonical_stream);
            let mut chunks = ChunksBuffer::new(block_size_minimum, block_len);
            while let Some(chunk) = canonical_stream.as_mut().next().await {
                let (sequence_id, chunk) = chunk?;
                // Emitted blocks are sequenced beneath the current input chunk.
                // NOTE(review): a block whose rows span several input chunks is
                // sequenced under the *latest* contributing chunk — presumably
                // what the sequence protocol expects; confirm.
                let mut sequence_pointer = sequence_id.descend();
                let mut offset = 0;
                // Slice the incoming chunk into at-most-`block_len`-row pieces.
                while offset < chunk.len() {
                    let end = (offset + block_len).min(chunk.len());
                    let sliced = chunk.slice(offset..end)?;
                    chunks.push_back(sliced);
                    offset = end;
                    // Once both the byte and row thresholds are met, emit every
                    // complete block; trailing partial rows stay buffered.
                    if chunks.have_enough() {
                        let output_chunks = chunks.collect_exact_blocks()?;
                        assert!(!output_chunks.is_empty());
                        let chunked =
                            ChunkedArray::try_new(output_chunks, dtype_clone.clone())?;
                        if !chunked.is_empty() {
                            yield (
                                sequence_pointer.advance(),
                                chunked.into_array().to_canonical()?.into_array(),
                            )
                        }
                    }
                }
                // After the final input chunk, flush whatever is still buffered
                // (may be shorter than a full block).
                if canonical_stream.as_mut().peek().await.is_none() {
                    let to_flush = ChunkedArray::try_new(
                        // Drain bypasses `pop_front` bookkeeping; the buffer is
                        // discarded right after, so the counters don't matter.
                        chunks.data.drain(..).map(|(arr, _)| arr).collect(),
                        dtype_clone.clone(),
                    )?;
                    if !to_flush.is_empty() {
                        yield (
                            sequence_pointer.advance(),
                            to_flush.into_array().to_canonical()?.into_array(),
                        )
                    }
                }
            }
        };
        // Hand the repartitioned stream to the child layout strategy.
        self.child
            .write_stream(
                ctx,
                segment_sink,
                SequentialStreamAdapter::new(dtype, repartitioned_stream).sendable(),
                eof,
                session,
            )
            .await
    }

    /// Bytes currently buffered — entirely delegated to the child strategy.
    fn buffered_bytes(&self) -> u64 {
        self.child.buffered_bytes()
    }
}
/// FIFO buffer of array slices with running row and byte totals.
struct ChunksBuffer {
    /// Buffered chunks, each paired with its byte size as measured at insert
    /// time. The size is cached because an array's reported `nbytes` may
    /// change after insertion (e.g. a shared array canonicalized elsewhere),
    /// and the running totals must be decremented by exactly what was added.
    data: VecDeque<(ArrayRef, u64)>,
    /// Total rows across all buffered chunks.
    row_count: usize,
    /// Total bytes across all buffered chunks (sum of cached sizes).
    nbytes: u64,
    /// Minimum buffered bytes before blocks may be emitted.
    block_size_minimum: u64,
    /// Emitted blocks contain an exact multiple of this many rows.
    block_len_multiple: usize,
}
impl ChunksBuffer {
fn new(block_size_minimum: u64, block_len_multiple: usize) -> Self {
Self {
data: Default::default(),
row_count: 0,
nbytes: 0,
block_size_minimum,
block_len_multiple,
}
}
fn have_enough(&self) -> bool {
self.nbytes >= self.block_size_minimum && self.row_count >= self.block_len_multiple
}
fn collect_exact_blocks(&mut self) -> VortexResult<Vec<ArrayRef>> {
let nblocks = self.row_count / self.block_len_multiple;
let mut res = Vec::with_capacity(self.data.len());
let mut remaining = nblocks * self.block_len_multiple;
while remaining > 0 {
let (chunk, _) = self
.pop_front()
.vortex_expect("must have at least one chunk");
let len = chunk.len();
if len > remaining {
let left = chunk.slice(0..remaining)?;
let right = chunk.slice(remaining..len)?;
self.push_front(right);
res.push(left);
remaining = 0;
} else {
res.push(chunk);
remaining -= len;
}
}
Ok(res)
}
fn push_back(&mut self, chunk: ArrayRef) {
let nb = chunk.nbytes();
self.row_count += chunk.len();
self.nbytes += nb;
self.data.push_back((chunk, nb));
}
fn push_front(&mut self, chunk: ArrayRef) {
let nb = chunk.nbytes();
self.row_count += chunk.len();
self.nbytes += nb;
self.data.push_front((chunk, nb));
}
fn pop_front(&mut self) -> Option<(ArrayRef, u64)> {
let res = self.data.pop_front();
if let Some((chunk, nb)) = res.as_ref() {
self.row_count -= chunk.len();
self.nbytes -= nb;
}
res
}
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_array::ArrayContext;
    use vortex_array::IntoArray;
    use vortex_array::arrays::ConstantArray;
    use vortex_array::arrays::FixedSizeListArray;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::arrays::SharedArray;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability::NonNullable;
    use vortex_array::dtype::PType;
    use vortex_array::validity::Validity;
    use vortex_error::VortexResult;
    use vortex_io::runtime::single::block_on;
    use vortex_io::session::RuntimeSessionExt;

    use super::*;
    use crate::LayoutStrategy;
    use crate::layouts::chunked::writer::ChunkedLayoutStrategy;
    use crate::layouts::flat::writer::FlatLayoutStrategy;
    use crate::segments::TestSegments;
    use crate::sequence::SequenceId;
    use crate::sequence::SequentialArrayStreamExt;
    use crate::test::SESSION;

    const ONE_MEG: u64 = 1 << 20;

    /// 8-byte elements: 1 MiB target allows far more than 8192 rows, so the
    /// configured multiple is kept.
    #[test]
    fn effective_block_len_small_elements() {
        let dtype = DType::Primitive(PType::F64, NonNullable);
        let options = RepartitionWriterOptions {
            block_size_minimum: 0,
            block_len_multiple: 8192,
            block_size_target: Some(ONE_MEG),
            canonicalize: false,
        };
        assert_eq!(options.effective_block_len(&dtype), 8192);
    }

    /// 8000-byte elements: ceil(1 MiB / 8000 B) = 132 rows per block.
    #[test]
    fn effective_block_len_large_elements() {
        let dtype = DType::FixedSizeList(
            Arc::new(DType::Primitive(PType::F64, NonNullable)),
            1000,
            NonNullable,
        );
        let options = RepartitionWriterOptions {
            block_size_minimum: 0,
            block_len_multiple: 8192,
            block_size_target: Some(ONE_MEG),
            canonicalize: false,
        };
        assert_eq!(options.effective_block_len(&dtype), 132);
    }

    /// Variable-width types have no fixed element size; the target is ignored.
    #[test]
    fn effective_block_len_variable_width() {
        let dtype = DType::Utf8(NonNullable);
        let options = RepartitionWriterOptions {
            block_size_minimum: 0,
            block_len_multiple: 8192,
            block_size_target: Some(ONE_MEG),
            canonicalize: false,
        };
        assert_eq!(options.effective_block_len(&dtype), 8192);
    }

    /// Elements bigger than the target still yield at least one row per block.
    #[test]
    fn effective_block_len_very_large_elements() {
        let dtype = DType::FixedSizeList(
            Arc::new(DType::Primitive(PType::F64, NonNullable)),
            1_000_000,
            NonNullable,
        );
        let options = RepartitionWriterOptions {
            block_size_minimum: 0,
            block_len_multiple: 8192,
            block_size_target: Some(ONE_MEG),
            canonicalize: false,
        };
        assert_eq!(options.effective_block_len(&dtype), 1);
    }

    /// End-to-end: 1000 rows of 8000-byte lists should be cut into 132-row
    /// blocks (ceil(1 MiB / 8000 B)) plus a smaller final block.
    #[test]
    fn repartition_large_element_type_produces_small_blocks() -> VortexResult<()> {
        let list_size: u32 = 1000;
        let num_lists: usize = 1000;
        let total_elements = list_size as usize * num_lists;
        let elements = PrimitiveArray::from_iter((0..total_elements).map(|i| i as f64));
        let fsl = FixedSizeListArray::new(
            elements.into_array(),
            list_size,
            Validity::NonNullable,
            num_lists,
        );

        let ctx = ArrayContext::empty();
        let segments = Arc::new(TestSegments::default());
        let (ptr, eof) = SequenceId::root().split();
        let child = ChunkedLayoutStrategy::new(FlatLayoutStrategy::default());
        let strategy = RepartitionStrategy::new(
            child,
            RepartitionWriterOptions {
                block_size_minimum: 0,
                block_len_multiple: 8192,
                block_size_target: Some(ONE_MEG),
                canonicalize: false,
            },
        );

        let stream = fsl.into_array().to_array_stream().sequenced(ptr);
        let layout = block_on(|handle| async move {
            let session = SESSION.clone().with_handle(handle);
            strategy
                .write_stream(
                    ctx,
                    Arc::<TestSegments>::clone(&segments),
                    stream,
                    eof,
                    &session,
                )
                .await
        })?;

        assert_eq!(layout.row_count(), num_lists as u64);
        let nchildren = layout.nchildren();
        assert!(nchildren > 1, "expected multiple chunks, got {nchildren}");
        // All blocks except the last must be exactly one target-sized block.
        for i in 0..nchildren - 1 {
            let child = layout.child(i)?;
            assert_eq!(
                child.row_count(),
                132,
                "chunk {i} has {} rows, expected 132",
                child.row_count()
            );
        }
        // The final block holds whatever remains after the full blocks.
        let last = layout.child(nchildren - 1)?;
        assert_eq!(last.row_count(), 1000 - 132 * (nchildren as u64 - 1));
        Ok(())
    }

    /// End-to-end: small (8-byte) elements are unaffected by the byte target
    /// and are cut at the configured 8192-row multiple.
    #[test]
    fn repartition_small_element_type_unchanged() -> VortexResult<()> {
        let num_elements: usize = 10000;
        let elements = PrimitiveArray::from_iter((0..num_elements).map(|i| i as f64));

        let ctx = ArrayContext::empty();
        let segments = Arc::new(TestSegments::default());
        let (ptr, eof) = SequenceId::root().split();
        let child = ChunkedLayoutStrategy::new(FlatLayoutStrategy::default());
        let strategy = RepartitionStrategy::new(
            child,
            RepartitionWriterOptions {
                block_size_minimum: 0,
                block_len_multiple: 8192,
                block_size_target: Some(ONE_MEG),
                canonicalize: false,
            },
        );

        let stream = elements.into_array().to_array_stream().sequenced(ptr);
        let layout = block_on(|handle| async move {
            let session = SESSION.clone().with_handle(handle);
            strategy
                .write_stream(
                    ctx,
                    Arc::<TestSegments>::clone(&segments),
                    stream,
                    eof,
                    &session,
                )
                .await
        })?;

        assert_eq!(layout.row_count(), num_elements as u64);
        assert_eq!(layout.nchildren(), 2);
        assert_eq!(layout.child(0)?.row_count(), 8192);
        assert_eq!(layout.child(1)?.row_count(), 1808);
        Ok(())
    }

    /// `pop_front` must decrement `nbytes` by the size cached at insert time:
    /// canonicalizing a shared array between push and pop changes its reported
    /// `nbytes`, which would otherwise underflow the running total.
    #[test]
    fn chunks_buffer_pop_front_no_panic_after_shared_execution() -> VortexResult<()> {
        let n = 20_000usize;
        let block_len = 10_000usize;
        let constant = ConstantArray::new(42i64, n);
        let shared = SharedArray::new(constant.into_array());
        let shared_handle = shared.clone();
        let arr = shared.into_array();
        let s1 = arr.slice(0..block_len)?;
        let s2 = arr.slice(block_len..n)?;

        let mut buf = ChunksBuffer::new(0, block_len);
        buf.push_back(s1);
        buf.push_back(s2);

        let _output = buf.pop_front().unwrap();

        // Force the shared array to canonicalize, changing its byte size.
        use vortex_array::arrays::shared::SharedArrayExt;
        shared_handle.get_or_compute(|source| source.to_canonical())?;

        let _s2 = buf.pop_front().unwrap();
        assert_eq!(buf.nbytes, 0);
        assert_eq!(buf.row_count, 0);
        Ok(())
    }
}