// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors
4use std::collections::VecDeque;
5use std::sync::Arc;
6
7use async_stream::try_stream;
8use async_trait::async_trait;
9use futures::StreamExt as _;
10use futures::pin_mut;
11use vortex_array::ArrayContext;
12use vortex_array::ArrayRef;
13use vortex_array::IntoArray;
14use vortex_array::arrays::ChunkedArray;
15use vortex_array::dtype::DType;
16use vortex_error::VortexExpect;
17use vortex_error::VortexResult;
18use vortex_session::VortexSession;
19
20use crate::LayoutRef;
21use crate::LayoutStrategy;
22use crate::segments::SegmentSinkRef;
23use crate::sequence::SendableSequentialStream;
24use crate::sequence::SequencePointer;
25use crate::sequence::SequentialStreamAdapter;
26use crate::sequence::SequentialStreamExt;
27
/// Options controlling how [`RepartitionStrategy`] re-slices an incoming stream of arrays
/// into blocks before handing them to the child layout strategy.
#[derive(Clone)]
pub struct RepartitionWriterOptions {
    /// The minimum uncompressed size in bytes for a block.
    pub block_size_minimum: u64,
    /// The multiple of the number of rows in each block.
    pub block_len_multiple: usize,
    /// Optional target uncompressed size in bytes for a block.
    ///
    /// The repartition writer attempts to produce partitions with this uncompressed size. This is
    /// only a best effort attempt: the partitions may be arbitrarily larger or smaller. Reasons for
    /// this include:
    ///
    /// 1. The size of one element may not perfectly divide the target size, resulting in blocks
    ///    that are either too large or too small.
    ///
    /// 2. Variable length types are expensive to pack due to the need to read each element length.
    ///
    /// 3. View types are expensive to pack due to each view sharing an arbitrary slice of data.
    pub block_size_target: Option<u64>,
    /// When true, each incoming chunk is canonicalized before being repartitioned
    /// (see the `canonicalize` branch in `write_stream`).
    pub canonicalize: bool,
}
49
50impl RepartitionWriterOptions {
51    /// Compute the effective block length for a given [`DType`].
52    ///
53    /// For fixed-width types where [`DType::element_size`] is known and large enough that
54    /// `element_size * block_len_multiple` would exceed `block_size_target`, this reduces the
55    /// block length so each block stays close to the target byte size.
56    fn effective_block_len(&self, dtype: &DType) -> usize {
57        let Some(block_size_target) = self.block_size_target else {
58            return self.block_len_multiple;
59        };
60        match dtype.element_size() {
61            Some(elem_size) if elem_size > 0 => {
62                // `div_ceil` ensures we overshoot the block_size_target; therefore preventing
63                // `write_stream` from combining adjacent 0.9 MiB chunks into one 1.8 MiB chunk.
64                let max_rows = usize::try_from(block_size_target.div_ceil(elem_size as u64))
65                    .unwrap_or(usize::MAX);
66                self.block_len_multiple.min(max_rows).max(1)
67            }
68            _ => self.block_len_multiple,
69        }
70    }
71}
72
73/// Repartition a stream of arrays into blocks.
74///
75/// Each emitted block (except the last) is at least `block_size_minimum` bytes and contains a
76/// multiple of `block_len_multiple` rows.
77#[derive(Clone)]
78pub struct RepartitionStrategy {
79    child: Arc<dyn LayoutStrategy>,
80    options: RepartitionWriterOptions,
81}
82
83impl RepartitionStrategy {
84    pub fn new<S: LayoutStrategy>(child: S, options: RepartitionWriterOptions) -> Self {
85        Self {
86            child: Arc::new(child),
87            options,
88        }
89    }
90}
91
#[async_trait]
impl LayoutStrategy for RepartitionStrategy {
    /// Repartition `stream` into blocks of `block_len` rows (a multiple chosen via
    /// [`RepartitionWriterOptions::effective_block_len`]) and at least `block_size_minimum`
    /// bytes, then delegate the repartitioned stream to the child strategy.
    async fn write_stream(
        &self,
        ctx: ArrayContext,
        segment_sink: SegmentSinkRef,
        stream: SendableSequentialStream,
        eof: SequencePointer,
        session: &VortexSession,
    ) -> VortexResult<LayoutRef> {
        // TODO(os): spawn stream below like:
        // canon_stream = stream.map(async {to_canonical}).map(spawn).buffered(parallelism)
        let dtype = stream.dtype().clone();
        // Optionally canonicalize each incoming chunk up-front; sequence ids pass through
        // unchanged so downstream ordering is preserved.
        let stream = if self.options.canonicalize {
            SequentialStreamAdapter::new(
                dtype.clone(),
                stream.map(|chunk| {
                    let (sequence_id, chunk) = chunk?;
                    VortexResult::Ok((sequence_id, chunk.to_canonical()?.into_array()))
                }),
            )
            .sendable()
        } else {
            stream
        };

        let dtype_clone = dtype.clone();
        let options = self.options.clone();

        // For fixed-width types with large per-element sizes, reduce the block_len_multiple
        // so that each block targets block_size_target bytes rather than producing oversized
        // segments.
        let block_len = options.effective_block_len(&dtype);
        let block_size_minimum = options.block_size_minimum;

        let repartitioned_stream = try_stream! {
            // `peekable` lets us detect end-of-stream below and flush the remainder.
            let canonical_stream = stream.peekable();
            pin_mut!(canonical_stream);

            let mut chunks = ChunksBuffer::new(block_size_minimum, block_len);
            while let Some(chunk) = canonical_stream.as_mut().next().await {
                let (sequence_id, chunk) = chunk?;
                // Emitted blocks get child sequence ids under this chunk's id.
                let mut sequence_pointer = sequence_id.descend();
                let mut offset = 0;
                // Slice the incoming chunk into at-most-block_len pieces and buffer them.
                while offset < chunk.len() {
                    let end = (offset + block_len).min(chunk.len());
                    let sliced = chunk.slice(offset..end)?;
                    chunks.push_back(sliced);
                    offset = end;

                    // Once both the byte and row thresholds are met, emit every complete
                    // multiple of block_len rows; any remainder stays buffered.
                    if chunks.have_enough() {
                        let output_chunks = chunks.collect_exact_blocks()?;
                        assert!(!output_chunks.is_empty());
                        let chunked =
                            ChunkedArray::try_new(output_chunks, dtype_clone.clone())?;
                        if !chunked.is_empty() {
                            yield (
                                sequence_pointer.advance(),
                                chunked.into_array().to_canonical()?.into_array(),
                            )
                        }
                    }
                }
                // End of stream: flush whatever is left in the buffer as one final block.
                // Draining `data` directly bypasses the buffer's row/byte accounting, which
                // is fine because the buffer is never used again after this point.
                if canonical_stream.as_mut().peek().await.is_none() {
                    let to_flush = ChunkedArray::try_new(
                        chunks.data.drain(..).map(|(arr, _)| arr).collect(),
                        dtype_clone.clone(),
                    )?;
                    if !to_flush.is_empty() {
                        yield (
                            sequence_pointer.advance(),
                            to_flush.into_array().to_canonical()?.into_array(),
                        )
                    }
                }
            }
        };

        self.child
            .write_stream(
                ctx,
                segment_sink,
                SequentialStreamAdapter::new(dtype, repartitioned_stream).sendable(),
                eof,
                session,
            )
            .await
    }

    fn buffered_bytes(&self) -> u64 {
        // TODO(os): we should probably add the buffered bytes from this strategy on top,
        // it is currently better to not add it at all because these buffered arrays are
        // potentially sliced and uncompressed. They would overestimate the actual bytes
        // that will end up in the file when flushed.
        self.child.buffered_bytes()
    }
}
189
/// A FIFO buffer of array chunks with running row and byte totals, used by
/// `write_stream` to accumulate slices until a block can be emitted.
struct ChunksBuffer {
    /// Each entry stores the chunk and the `nbytes()` snapshot taken at push time.
    /// This avoids accounting mismatches when interior-mutable arrays (e.g. `SharedArray`)
    /// change their reported size after being pushed.
    data: VecDeque<(ArrayRef, u64)>,
    /// Total rows across all buffered chunks.
    row_count: usize,
    /// Total bytes across all buffered chunks (sum of the push-time snapshots).
    nbytes: u64,
    /// Minimum buffered bytes before `have_enough` returns true.
    block_size_minimum: u64,
    /// Minimum buffered rows before `have_enough` returns true; emitted blocks are
    /// multiples of this length.
    block_len_multiple: usize,
}
200
201impl ChunksBuffer {
202    fn new(block_size_minimum: u64, block_len_multiple: usize) -> Self {
203        Self {
204            data: Default::default(),
205            row_count: 0,
206            nbytes: 0,
207            block_size_minimum,
208            block_len_multiple,
209        }
210    }
211
212    fn have_enough(&self) -> bool {
213        self.nbytes >= self.block_size_minimum && self.row_count >= self.block_len_multiple
214    }
215
216    fn collect_exact_blocks(&mut self) -> VortexResult<Vec<ArrayRef>> {
217        let nblocks = self.row_count / self.block_len_multiple;
218        let mut res = Vec::with_capacity(self.data.len());
219        let mut remaining = nblocks * self.block_len_multiple;
220        while remaining > 0 {
221            let (chunk, _) = self
222                .pop_front()
223                .vortex_expect("must have at least one chunk");
224            let len = chunk.len();
225
226            if len > remaining {
227                let left = chunk.slice(0..remaining)?;
228                let right = chunk.slice(remaining..len)?;
229                self.push_front(right);
230                res.push(left);
231                remaining = 0;
232            } else {
233                res.push(chunk);
234                remaining -= len;
235            }
236        }
237        Ok(res)
238    }
239
240    fn push_back(&mut self, chunk: ArrayRef) {
241        let nb = chunk.nbytes();
242        self.row_count += chunk.len();
243        self.nbytes += nb;
244        self.data.push_back((chunk, nb));
245    }
246
247    fn push_front(&mut self, chunk: ArrayRef) {
248        let nb = chunk.nbytes();
249        self.row_count += chunk.len();
250        self.nbytes += nb;
251        self.data.push_front((chunk, nb));
252    }
253
254    fn pop_front(&mut self) -> Option<(ArrayRef, u64)> {
255        let res = self.data.pop_front();
256        if let Some((chunk, nb)) = res.as_ref() {
257            self.row_count -= chunk.len();
258            self.nbytes -= nb;
259        }
260        res
261    }
262}
263
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_array::ArrayContext;
    use vortex_array::IntoArray;
    use vortex_array::arrays::ConstantArray;
    use vortex_array::arrays::FixedSizeListArray;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::arrays::SharedArray;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability::NonNullable;
    use vortex_array::dtype::PType;
    use vortex_array::validity::Validity;
    use vortex_error::VortexResult;
    use vortex_io::runtime::single::block_on;
    use vortex_io::session::RuntimeSessionExt;

    use super::*;
    use crate::LayoutStrategy;
    use crate::layouts::chunked::writer::ChunkedLayoutStrategy;
    use crate::layouts::flat::writer::FlatLayoutStrategy;
    use crate::segments::TestSegments;
    use crate::sequence::SequenceId;
    use crate::sequence::SequentialArrayStreamExt;
    use crate::test::SESSION;

    const ONE_MEG: u64 = 1 << 20;

    #[test]
    fn effective_block_len_small_elements() {
        // f64 = 8 bytes/element. 8192 * 8 = 64 KiB << 1 MiB, so no reduction.
        let dtype = DType::Primitive(PType::F64, NonNullable);
        let options = RepartitionWriterOptions {
            block_size_minimum: 0,
            block_len_multiple: 8192,
            block_size_target: Some(ONE_MEG),
            canonicalize: false,
        };
        assert_eq!(options.effective_block_len(&dtype), 8192);
    }

    #[test]
    fn effective_block_len_large_elements() {
        // FixedSizeList(f64, 1000) = 8000 bytes/element.
        // div_ceil(1 MiB, 8000) = 132, so effective block len = min(8192, 132) = 132.
        let dtype = DType::FixedSizeList(
            Arc::new(DType::Primitive(PType::F64, NonNullable)),
            1000,
            NonNullable,
        );
        let options = RepartitionWriterOptions {
            block_size_minimum: 0,
            block_len_multiple: 8192,
            block_size_target: Some(ONE_MEG),
            canonicalize: false,
        };
        assert_eq!(options.effective_block_len(&dtype), 132);
    }

    #[test]
    fn effective_block_len_variable_width() {
        // Utf8 has no known element_size, so block_len_multiple is unchanged.
        let dtype = DType::Utf8(NonNullable);
        let options = RepartitionWriterOptions {
            block_size_minimum: 0,
            block_len_multiple: 8192,
            block_size_target: Some(ONE_MEG),
            canonicalize: false,
        };
        assert_eq!(options.effective_block_len(&dtype), 8192);
    }

    #[test]
    fn effective_block_len_very_large_elements() {
        // FixedSizeList(f64, 1_000_000) = 8_000_000 bytes/element.
        // 1 MiB / 8_000_000 = 0, clamped to max(1) = 1.
        let dtype = DType::FixedSizeList(
            Arc::new(DType::Primitive(PType::F64, NonNullable)),
            1_000_000,
            NonNullable,
        );
        let options = RepartitionWriterOptions {
            block_size_minimum: 0,
            block_len_multiple: 8192,
            block_size_target: Some(ONE_MEG),
            canonicalize: false,
        };
        assert_eq!(options.effective_block_len(&dtype), 1);
    }

    #[test]
    fn repartition_large_element_type_produces_small_blocks() -> VortexResult<()> {
        // Create a FixedSizeList(f64, 1000) array with 1000 lists.
        // Each list is 8000 bytes, so 1000 lists = 8 MiB total.
        // With block_size_target = 1 MiB, effective block_len = div_ceil(1 MiB, 8000) = 132.
        // We expect the repartition to produce blocks of 132 rows each.
        let list_size: u32 = 1000;
        let num_lists: usize = 1000;
        let total_elements = list_size as usize * num_lists;

        let elements = PrimitiveArray::from_iter((0..total_elements).map(|i| i as f64));
        let fsl = FixedSizeListArray::new(
            elements.into_array(),
            list_size,
            Validity::NonNullable,
            num_lists,
        );

        let ctx = ArrayContext::empty();
        let segments = Arc::new(TestSegments::default());
        let (ptr, eof) = SequenceId::root().split();

        let child = ChunkedLayoutStrategy::new(FlatLayoutStrategy::default());
        let strategy = RepartitionStrategy::new(
            child,
            RepartitionWriterOptions {
                block_size_minimum: 0,
                block_len_multiple: 8192,
                block_size_target: Some(ONE_MEG),
                canonicalize: false,
            },
        );

        let stream = fsl.into_array().to_array_stream().sequenced(ptr);
        let layout = block_on(|handle| async move {
            let session = SESSION.clone().with_handle(handle);
            strategy
                .write_stream(
                    ctx,
                    Arc::<TestSegments>::clone(&segments),
                    stream,
                    eof,
                    &session,
                )
                .await
        })?;

        // The layout should be a ChunkedLayout with multiple children.
        // With 1000 rows and effective block_len = 132:
        //   - 7 full blocks of 132 rows = 924 rows
        //   - 1 remainder block of 76 rows
        //   - Total: 8 blocks, 1000 rows
        assert_eq!(layout.row_count(), num_lists as u64);

        // All non-last children should have 132 rows.
        let nchildren = layout.nchildren();
        assert!(nchildren > 1, "expected multiple chunks, got {nchildren}");

        for i in 0..nchildren - 1 {
            let child = layout.child(i)?;
            assert_eq!(
                child.row_count(),
                132,
                "chunk {i} has {} rows, expected 132",
                child.row_count()
            );
        }

        // Last child gets the remainder.
        let last = layout.child(nchildren - 1)?;
        assert_eq!(last.row_count(), 1000 - 132 * (nchildren as u64 - 1));

        Ok(())
    }

    #[test]
    fn repartition_small_element_type_unchanged() -> VortexResult<()> {
        // For f64 (8 bytes/element), effective block_len stays at 8192.
        // With 10000 elements and block_size_minimum=0, we get one block of 8192
        // and one remainder of 1808.
        let num_elements: usize = 10000;
        let elements = PrimitiveArray::from_iter((0..num_elements).map(|i| i as f64));

        let ctx = ArrayContext::empty();
        let segments = Arc::new(TestSegments::default());
        let (ptr, eof) = SequenceId::root().split();

        let child = ChunkedLayoutStrategy::new(FlatLayoutStrategy::default());
        let strategy = RepartitionStrategy::new(
            child,
            RepartitionWriterOptions {
                block_size_minimum: 0,
                block_len_multiple: 8192,
                block_size_target: Some(ONE_MEG),
                canonicalize: false,
            },
        );

        let stream = elements.into_array().to_array_stream().sequenced(ptr);
        let layout = block_on(|handle| async move {
            let session = SESSION.clone().with_handle(handle);
            strategy
                .write_stream(
                    ctx,
                    Arc::<TestSegments>::clone(&segments),
                    stream,
                    eof,
                    &session,
                )
                .await
        })?;

        assert_eq!(layout.row_count(), num_elements as u64);
        assert_eq!(layout.nchildren(), 2);
        assert_eq!(layout.child(0)?.row_count(), 8192);
        assert_eq!(layout.child(1)?.row_count(), 1808);

        Ok(())
    }

    /// Regression test: `SharedArray` slices sharing an `Arc<Mutex<SharedState>>` can
    /// transition from Source to Cached when any one of them is canonicalized. This caused
    /// `pop_front` to panic with `attempt to subtract with overflow` because the buffer's
    /// running `nbytes` total was accumulated with the smaller Source-era values while
    /// `pop_front` subtracted the larger Cached-era values.
    #[test]
    fn chunks_buffer_pop_front_no_panic_after_shared_execution() -> VortexResult<()> {
        let n = 20_000usize;
        let block_len = 10_000usize;

        let constant = ConstantArray::new(42i64, n);
        let shared = SharedArray::new(constant.into_array());
        let shared_handle = shared.clone();
        let arr = shared.into_array();

        let s1 = arr.slice(0..block_len)?;
        let s2 = arr.slice(block_len..n)?;

        let mut buf = ChunksBuffer::new(0, block_len);
        buf.push_back(s1);
        buf.push_back(s2);

        let _output = buf.pop_front().unwrap();

        // Transition SharedState from Source to Cached for ALL slices sharing this Arc.
        use vortex_array::arrays::shared::SharedArrayExt;
        shared_handle.get_or_compute(|source| source.to_canonical())?;

        // Before the fix this panicked with "attempt to subtract with overflow".
        let _s2 = buf.pop_front().unwrap();
        assert_eq!(buf.nbytes, 0);
        assert_eq!(buf.row_count, 0);

        Ok(())
    }
}
510}