vortex_layout/writers/
repartition.rs

1use std::collections::VecDeque;
2
3use vortex_array::arrays::ChunkedArray;
4use vortex_array::compute::slice;
5use vortex_array::nbytes::NBytes;
6use vortex_array::{Array, ArrayRef, IntoArray};
7use vortex_dtype::DType;
8use vortex_error::{VortexExpect, VortexResult};
9
10use crate::segments::SegmentWriter;
11use crate::{Layout, LayoutWriter};
12
/// Tuning knobs for [`RepartitionWriter`].
#[derive(Debug, Clone)]
pub struct RepartitionWriterOptions {
    /// The minimum uncompressed size, in bytes, that must be buffered before a block is emitted.
    pub block_size_minimum: usize,
    /// Every emitted block (other than the one produced by a final flush) contains a multiple
    /// of this many rows. Must be greater than zero.
    pub block_len_multiple: usize,
}
19
20/// Repartition a stream of arrays into blocks.
21///
22/// Each emitted block (except the last) is at least `block_size_minimum` bytes and contains a
23/// multiple of `block_len_multiple` rows.
24pub struct RepartitionWriter {
25    dtype: DType,
26    chunks: VecDeque<ArrayRef>,
27    row_count: usize,
28    nbytes: usize,
29    writer: Box<dyn LayoutWriter>,
30    options: RepartitionWriterOptions,
31}
32
33impl RepartitionWriter {
34    pub fn new(
35        dtype: DType,
36        writer: Box<dyn LayoutWriter>,
37        options: RepartitionWriterOptions,
38    ) -> Self {
39        Self {
40            dtype,
41            chunks: VecDeque::new(),
42            row_count: 0,
43            nbytes: 0,
44            writer,
45            options,
46        }
47    }
48
49    fn maybe_flush_chunk(&mut self, segments: &mut dyn SegmentWriter) -> VortexResult<()> {
50        if self.nbytes >= self.options.block_size_minimum {
51            let nblocks = self.row_count / self.options.block_len_multiple;
52
53            // If we don't have a full block, then wait for more
54            if nblocks == 0 {
55                return Ok(());
56            }
57
58            let mut chunks = Vec::with_capacity(self.chunks.len());
59            let mut remaining = nblocks * self.options.block_len_multiple;
60
61            while remaining > 0 {
62                let chunk = self.chunks.pop_front().vortex_expect("chunk is missing");
63                self.row_count -= chunk.len();
64                self.nbytes -= chunk.nbytes();
65
66                let len = chunk.len();
67
68                if len > remaining {
69                    let left = slice(&chunk, 0, remaining)?;
70                    let right = slice(&chunk, remaining, len)?;
71                    self.row_count += right.len();
72                    self.nbytes += right.nbytes();
73                    self.chunks.push_front(right);
74
75                    chunks.push(left);
76                    remaining = 0;
77                } else {
78                    chunks.push(chunk);
79                    remaining -= len;
80                }
81            }
82
83            // Combine the chunks to and flush them to the layout.
84            assert!(!chunks.is_empty());
85            let chunk = ChunkedArray::new_unchecked(chunks, self.dtype.clone())
86                .to_canonical()?
87                .into_array();
88
89            self.writer.push_chunk(segments, chunk)?;
90        }
91
92        Ok(())
93    }
94}
95
96impl LayoutWriter for RepartitionWriter {
97    fn push_chunk(
98        &mut self,
99        segment_writer: &mut dyn SegmentWriter,
100        chunk: ArrayRef,
101    ) -> VortexResult<()> {
102        // We make sure the chunks are canonical so our nbytes measurement is accurate.
103        let chunk = chunk.to_canonical()?.into_array();
104
105        // Split chunks into 8192 blocks to make sure we don't over-size them.
106        let mut offset = 0;
107        while offset < chunk.len() {
108            let end = (offset + self.options.block_len_multiple).min(chunk.len());
109            let c = slice(&chunk, offset, end)?;
110            self.row_count += c.len();
111            self.nbytes += c.nbytes();
112            self.chunks.push_back(c);
113            offset = end;
114
115            self.maybe_flush_chunk(segment_writer)?;
116        }
117
118        Ok(())
119    }
120
121    fn flush(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<()> {
122        let chunk =
123            ChunkedArray::new_unchecked(self.chunks.drain(..).collect(), self.dtype.clone())
124                .to_canonical()?
125                .into_array();
126        self.writer.push_chunk(segment_writer, chunk)?;
127        self.writer.flush(segment_writer)
128    }
129
130    fn finish(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<Layout> {
131        self.writer.finish(segment_writer)
132    }
133}