1use std::collections::VecDeque;
5use std::sync::Arc;
6
7use async_stream::try_stream;
8use async_trait::async_trait;
9use futures::StreamExt as _;
10use futures::pin_mut;
11use vortex_array::ArrayContext;
12use vortex_array::ArrayRef;
13use vortex_array::IntoArray;
14use vortex_array::arrays::ChunkedArray;
15use vortex_array::dtype::DType;
16use vortex_error::VortexExpect;
17use vortex_error::VortexResult;
18use vortex_session::VortexSession;
19
20use crate::LayoutRef;
21use crate::LayoutStrategy;
22use crate::segments::SegmentSinkRef;
23use crate::sequence::SendableSequentialStream;
24use crate::sequence::SequencePointer;
25use crate::sequence::SequentialStreamAdapter;
26use crate::sequence::SequentialStreamExt;
27
/// Options controlling how a [`RepartitionStrategy`] re-chunks an incoming stream.
#[derive(Clone)]
pub struct RepartitionWriterOptions {
    /// Minimum number of buffered bytes required before a block is emitted.
    pub block_size_minimum: u64,
    /// Row-count granularity: emitted blocks contain a multiple of this many rows
    /// (except the final end-of-stream flush, which may be shorter).
    pub block_len_multiple: usize,
    /// Optional byte-size target for a block. When set and the element size of the
    /// dtype is known, the effective block length is shrunk so a block of rows
    /// stays near this many bytes (see `effective_block_len`).
    pub block_size_target: Option<u64>,
    /// Whether to canonicalize each incoming chunk before repartitioning.
    pub canonicalize: bool,
}
49
50impl RepartitionWriterOptions {
51 fn effective_block_len(&self, dtype: &DType) -> usize {
57 let Some(block_size_target) = self.block_size_target else {
58 return self.block_len_multiple;
59 };
60 match dtype.element_size() {
61 Some(elem_size) if elem_size > 0 => {
62 let max_rows = usize::try_from(block_size_target.div_ceil(elem_size as u64))
65 .unwrap_or(usize::MAX);
66 self.block_len_multiple.min(max_rows).max(1)
67 }
68 _ => self.block_len_multiple,
69 }
70 }
71}
72
/// A [`LayoutStrategy`] decorator that re-chunks the incoming sequential stream
/// into blocks sized per [`RepartitionWriterOptions`] before delegating the
/// repartitioned stream to a child strategy.
#[derive(Clone)]
pub struct RepartitionStrategy {
    /// The strategy that receives the repartitioned stream.
    child: Arc<dyn LayoutStrategy>,
    /// Block-sizing knobs applied during repartitioning.
    options: RepartitionWriterOptions,
}
82
83impl RepartitionStrategy {
84 pub fn new<S: LayoutStrategy>(child: S, options: RepartitionWriterOptions) -> Self {
85 Self {
86 child: Arc::new(child),
87 options,
88 }
89 }
90}
91
#[async_trait]
impl LayoutStrategy for RepartitionStrategy {
    /// Repartition `stream` into blocks of the effective block length (capped
    /// by `block_size_target` for wide fixed-size element types) and forward
    /// the resulting stream to the child strategy.
    async fn write_stream(
        &self,
        ctx: ArrayContext,
        segment_sink: SegmentSinkRef,
        stream: SendableSequentialStream,
        eof: SequencePointer,
        session: &VortexSession,
    ) -> VortexResult<LayoutRef> {
        let dtype = stream.dtype().clone();
        // Optionally canonicalize each incoming chunk up-front so the slicing
        // and re-chunking below operates on canonical arrays.
        let stream = if self.options.canonicalize {
            SequentialStreamAdapter::new(
                dtype.clone(),
                stream.map(|chunk| {
                    let (sequence_id, chunk) = chunk?;
                    VortexResult::Ok((sequence_id, chunk.to_canonical()?.into_array()))
                }),
            )
            .sendable()
        } else {
            stream
        };

        let dtype_clone = dtype.clone();
        let options = self.options.clone();

        // Rows per emitted block for this dtype; byte/row thresholds below.
        let block_len = options.effective_block_len(&dtype);
        let block_size_minimum = options.block_size_minimum;

        let repartitioned_stream = try_stream! {
            // Peekable so we can detect end-of-stream and flush the remainder
            // while a sequence pointer is still in scope.
            let canonical_stream = stream.peekable();
            pin_mut!(canonical_stream);

            let mut chunks = ChunksBuffer::new(block_size_minimum, block_len);
            while let Some(chunk) = canonical_stream.as_mut().next().await {
                let (sequence_id, chunk) = chunk?;
                // Each input chunk descends into its own child sequence; every
                // block emitted while processing it advances that pointer.
                let mut sequence_pointer = sequence_id.descend();
                let mut offset = 0;
                // Slice the input chunk into at-most-`block_len`-row pieces.
                while offset < chunk.len() {
                    let end = (offset + block_len).min(chunk.len());
                    let sliced = chunk.slice(offset..end)?;
                    chunks.push_back(sliced);
                    offset = end;

                    // Once both the byte and row thresholds are met, emit all
                    // whole blocks currently buffered as one canonical chunk.
                    if chunks.have_enough() {
                        let output_chunks = chunks.collect_exact_blocks()?;
                        assert!(!output_chunks.is_empty());
                        let chunked =
                            ChunkedArray::try_new(output_chunks, dtype_clone.clone())?;
                        if !chunked.is_empty() {
                            yield (
                                sequence_pointer.advance(),
                                chunked.into_array().to_canonical()?.into_array(),
                            )
                        }
                    }
                }
                // If this was the last input chunk, flush whatever remains in
                // the buffer. NOTE: this drains `chunks.data` directly, leaving
                // the buffer's row/byte counters stale — harmless because the
                // buffer is discarded when the stream ends.
                if canonical_stream.as_mut().peek().await.is_none() {
                    let to_flush = ChunkedArray::try_new(
                        chunks.data.drain(..).map(|(arr, _)| arr).collect(),
                        dtype_clone.clone(),
                    )?;
                    if !to_flush.is_empty() {
                        yield (
                            sequence_pointer.advance(),
                            to_flush.into_array().to_canonical()?.into_array(),
                        )
                    }
                }
            }
        };

        self.child
            .write_stream(
                ctx,
                segment_sink,
                SequentialStreamAdapter::new(dtype, repartitioned_stream).sendable(),
                eof,
                session,
            )
            .await
    }

    /// Buffering happens inside the repartitioned stream itself, so the
    /// reported buffered bytes are whatever the child strategy reports.
    fn buffered_bytes(&self) -> u64 {
        self.child.buffered_bytes()
    }
}
189
/// FIFO buffer of array chunks with running row/byte totals, used to decide
/// when enough data is buffered to emit repartitioned blocks.
struct ChunksBuffer {
    /// Buffered chunks paired with the byte size recorded at push time.
    /// The size is cached because an array's reported `nbytes()` can change
    /// later (e.g. after a shared array is computed — see the
    /// `chunks_buffer_pop_front_no_panic_after_shared_execution` test), and
    /// `pop_front` must subtract exactly what was added to avoid underflow.
    data: VecDeque<(ArrayRef, u64)>,
    /// Total rows across all buffered chunks.
    row_count: usize,
    /// Total bytes across all buffered chunks, per the cached sizes.
    nbytes: u64,
    /// Byte threshold that must be reached before blocks are emitted.
    block_size_minimum: u64,
    /// Row-count granularity of emitted blocks.
    block_len_multiple: usize,
}
200
201impl ChunksBuffer {
202 fn new(block_size_minimum: u64, block_len_multiple: usize) -> Self {
203 Self {
204 data: Default::default(),
205 row_count: 0,
206 nbytes: 0,
207 block_size_minimum,
208 block_len_multiple,
209 }
210 }
211
212 fn have_enough(&self) -> bool {
213 self.nbytes >= self.block_size_minimum && self.row_count >= self.block_len_multiple
214 }
215
216 fn collect_exact_blocks(&mut self) -> VortexResult<Vec<ArrayRef>> {
217 let nblocks = self.row_count / self.block_len_multiple;
218 let mut res = Vec::with_capacity(self.data.len());
219 let mut remaining = nblocks * self.block_len_multiple;
220 while remaining > 0 {
221 let (chunk, _) = self
222 .pop_front()
223 .vortex_expect("must have at least one chunk");
224 let len = chunk.len();
225
226 if len > remaining {
227 let left = chunk.slice(0..remaining)?;
228 let right = chunk.slice(remaining..len)?;
229 self.push_front(right);
230 res.push(left);
231 remaining = 0;
232 } else {
233 res.push(chunk);
234 remaining -= len;
235 }
236 }
237 Ok(res)
238 }
239
240 fn push_back(&mut self, chunk: ArrayRef) {
241 let nb = chunk.nbytes();
242 self.row_count += chunk.len();
243 self.nbytes += nb;
244 self.data.push_back((chunk, nb));
245 }
246
247 fn push_front(&mut self, chunk: ArrayRef) {
248 let nb = chunk.nbytes();
249 self.row_count += chunk.len();
250 self.nbytes += nb;
251 self.data.push_front((chunk, nb));
252 }
253
254 fn pop_front(&mut self) -> Option<(ArrayRef, u64)> {
255 let res = self.data.pop_front();
256 if let Some((chunk, nb)) = res.as_ref() {
257 self.row_count -= chunk.len();
258 self.nbytes -= nb;
259 }
260 res
261 }
262}
263
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_array::ArrayContext;
    use vortex_array::IntoArray;
    use vortex_array::arrays::ConstantArray;
    use vortex_array::arrays::FixedSizeListArray;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::arrays::SharedArray;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability::NonNullable;
    use vortex_array::dtype::PType;
    use vortex_array::validity::Validity;
    use vortex_error::VortexResult;
    use vortex_io::runtime::single::block_on;
    use vortex_io::session::RuntimeSessionExt;

    use super::*;
    use crate::LayoutStrategy;
    use crate::layouts::chunked::writer::ChunkedLayoutStrategy;
    use crate::layouts::flat::writer::FlatLayoutStrategy;
    use crate::segments::TestSegments;
    use crate::sequence::SequenceId;
    use crate::sequence::SequentialArrayStreamExt;
    use crate::test::SESSION;

    const ONE_MEG: u64 = 1 << 20;

    #[test]
    fn effective_block_len_small_elements() {
        // 8-byte elements: the 1 MiB target allows far more than 8192 rows,
        // so the configured multiple wins.
        let dtype = DType::Primitive(PType::F64, NonNullable);
        let options = RepartitionWriterOptions {
            block_size_minimum: 0,
            block_len_multiple: 8192,
            block_size_target: Some(ONE_MEG),
            canonicalize: false,
        };
        assert_eq!(options.effective_block_len(&dtype), 8192);
    }

    #[test]
    fn effective_block_len_large_elements() {
        // 8000-byte elements: ceil(1 MiB / 8000) = 132 rows per block.
        let dtype = DType::FixedSizeList(
            Arc::new(DType::Primitive(PType::F64, NonNullable)),
            1000,
            NonNullable,
        );
        let options = RepartitionWriterOptions {
            block_size_minimum: 0,
            block_len_multiple: 8192,
            block_size_target: Some(ONE_MEG),
            canonicalize: false,
        };
        assert_eq!(options.effective_block_len(&dtype), 132);
    }

    #[test]
    fn effective_block_len_variable_width() {
        // Variable-width types have no element size, so the byte target is ignored.
        let dtype = DType::Utf8(NonNullable);
        let options = RepartitionWriterOptions {
            block_size_minimum: 0,
            block_len_multiple: 8192,
            block_size_target: Some(ONE_MEG),
            canonicalize: false,
        };
        assert_eq!(options.effective_block_len(&dtype), 8192);
    }

    #[test]
    fn effective_block_len_very_large_elements() {
        // 8 MB elements exceed the 1 MiB target entirely; the block length
        // bottoms out at 1 row.
        let dtype = DType::FixedSizeList(
            Arc::new(DType::Primitive(PType::F64, NonNullable)),
            1_000_000,
            NonNullable,
        );
        let options = RepartitionWriterOptions {
            block_size_minimum: 0,
            block_len_multiple: 8192,
            block_size_target: Some(ONE_MEG),
            canonicalize: false,
        };
        assert_eq!(options.effective_block_len(&dtype), 1);
    }

    #[test]
    fn repartition_large_element_type_produces_small_blocks() -> VortexResult<()> {
        let list_size: u32 = 1000;
        let num_lists: usize = 1000;
        let total_elements = list_size as usize * num_lists;

        let elements = PrimitiveArray::from_iter((0..total_elements).map(|i| i as f64));
        let fsl = FixedSizeListArray::new(
            elements.into_array(),
            list_size,
            Validity::NonNullable,
            num_lists,
        );

        let ctx = ArrayContext::empty();
        let segments = Arc::new(TestSegments::default());
        let (ptr, eof) = SequenceId::root().split();

        let child = ChunkedLayoutStrategy::new(FlatLayoutStrategy::default());
        let strategy = RepartitionStrategy::new(
            child,
            RepartitionWriterOptions {
                block_size_minimum: 0,
                block_len_multiple: 8192,
                block_size_target: Some(ONE_MEG),
                canonicalize: false,
            },
        );

        let stream = fsl.into_array().to_array_stream().sequenced(ptr);
        let layout = block_on(|handle| async move {
            let session = SESSION.clone().with_handle(handle);
            strategy
                .write_stream(
                    ctx,
                    Arc::<TestSegments>::clone(&segments),
                    stream,
                    eof,
                    &session,
                )
                .await
        })?;

        assert_eq!(layout.row_count(), num_lists as u64);

        let nchildren = layout.nchildren();
        assert!(nchildren > 1, "expected multiple chunks, got {nchildren}");

        // Every chunk except the last should hold exactly the effective block
        // length for this dtype: ceil(1 MiB / 8000 B) = 132 rows.
        for i in 0..nchildren - 1 {
            let child = layout.child(i)?;
            assert_eq!(
                child.row_count(),
                132,
                "chunk {i} has {} rows, expected 132",
                child.row_count()
            );
        }

        // The last chunk holds whatever rows remain.
        let last = layout.child(nchildren - 1)?;
        assert_eq!(last.row_count(), 1000 - 132 * (nchildren as u64 - 1));

        Ok(())
    }

    #[test]
    fn repartition_small_element_type_unchanged() -> VortexResult<()> {
        let num_elements: usize = 10000;
        let elements = PrimitiveArray::from_iter((0..num_elements).map(|i| i as f64));

        let ctx = ArrayContext::empty();
        let segments = Arc::new(TestSegments::default());
        let (ptr, eof) = SequenceId::root().split();

        let child = ChunkedLayoutStrategy::new(FlatLayoutStrategy::default());
        let strategy = RepartitionStrategy::new(
            child,
            RepartitionWriterOptions {
                block_size_minimum: 0,
                block_len_multiple: 8192,
                block_size_target: Some(ONE_MEG),
                canonicalize: false,
            },
        );

        let stream = elements.into_array().to_array_stream().sequenced(ptr);
        let layout = block_on(|handle| async move {
            let session = SESSION.clone().with_handle(handle);
            strategy
                .write_stream(
                    ctx,
                    Arc::<TestSegments>::clone(&segments),
                    stream,
                    eof,
                    &session,
                )
                .await
        })?;

        // Small elements keep the configured 8192-row multiple: one full block
        // plus the 1808-row remainder.
        assert_eq!(layout.row_count(), num_elements as u64);
        assert_eq!(layout.nchildren(), 2);
        assert_eq!(layout.child(0)?.row_count(), 8192);
        assert_eq!(layout.child(1)?.row_count(), 1808);

        Ok(())
    }

    // Regression test: `pop_front` must subtract the byte size cached at push
    // time, because computing a shared array can change its reported
    // `nbytes()` and a live re-read would underflow the counter.
    #[test]
    fn chunks_buffer_pop_front_no_panic_after_shared_execution() -> VortexResult<()> {
        let n = 20_000usize;
        let block_len = 10_000usize;

        let constant = ConstantArray::new(42i64, n);
        let shared = SharedArray::new(constant.into_array());
        let shared_handle = shared.clone();
        let arr = shared.into_array();

        let s1 = arr.slice(0..block_len)?;
        let s2 = arr.slice(block_len..n)?;

        let mut buf = ChunksBuffer::new(0, block_len);
        buf.push_back(s1);
        buf.push_back(s2);

        let _output = buf.pop_front().unwrap();

        // Force execution of the shared array, changing its nbytes.
        use vortex_array::arrays::shared::SharedArrayExt;
        shared_handle.get_or_compute(|source| source.to_canonical())?;

        let _s2 = buf.pop_front().unwrap();
        assert_eq!(buf.nbytes, 0);
        assert_eq!(buf.row_count, 0);

        Ok(())
    }
}