vortex_layout/layouts/repartition.rs

1use std::collections::VecDeque;
2
3use vortex_array::arcref::ArcRef;
4use vortex_array::arrays::ChunkedArray;
5use vortex_array::compute::slice;
6use vortex_array::nbytes::NBytes;
7use vortex_array::{Array, ArrayContext, ArrayRef, IntoArray};
8use vortex_dtype::DType;
9use vortex_error::{VortexExpect, VortexResult};
10
11use crate::segments::SegmentWriter;
12use crate::{Layout, LayoutStrategy, LayoutWriter, LayoutWriterExt};
13
14pub struct RepartitionStrategy {
15    pub options: RepartitionWriterOptions,
16    pub child: ArcRef<dyn LayoutStrategy>,
17}
18
19impl LayoutStrategy for RepartitionStrategy {
20    fn new_writer(&self, ctx: &ArrayContext, dtype: &DType) -> VortexResult<Box<dyn LayoutWriter>> {
21        Ok(RepartitionWriter::new(
22            dtype.clone(),
23            self.child.new_writer(ctx, dtype)?,
24            self.options.clone(),
25        )
26        .boxed())
27    }
28}
29
/// Sizing rules for the blocks emitted by [`RepartitionWriter`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RepartitionWriterOptions {
    /// The minimum uncompressed size in bytes that must be buffered before a block is emitted.
    pub block_size_minimum: usize,
    /// Every emitted block (other than the final one produced on flush) contains a multiple of
    /// this many rows.
    pub block_len_multiple: usize,
}
37
38/// Repartition a stream of arrays into blocks.
39///
40/// Each emitted block (except the last) is at least `block_size_minimum` bytes and contains a
41/// multiple of `block_len_multiple` rows.
42pub struct RepartitionWriter {
43    dtype: DType,
44    chunks: VecDeque<ArrayRef>,
45    row_count: usize,
46    nbytes: usize,
47    writer: Box<dyn LayoutWriter>,
48    options: RepartitionWriterOptions,
49}
50
51impl RepartitionWriter {
52    pub fn new(
53        dtype: DType,
54        writer: Box<dyn LayoutWriter>,
55        options: RepartitionWriterOptions,
56    ) -> Self {
57        Self {
58            dtype,
59            chunks: VecDeque::new(),
60            row_count: 0,
61            nbytes: 0,
62            writer,
63            options,
64        }
65    }
66
67    fn maybe_flush_chunk(&mut self, segments: &mut dyn SegmentWriter) -> VortexResult<()> {
68        if self.nbytes >= self.options.block_size_minimum {
69            let nblocks = self.row_count / self.options.block_len_multiple;
70
71            // If we don't have a full block, then wait for more
72            if nblocks == 0 {
73                return Ok(());
74            }
75
76            let mut chunks = Vec::with_capacity(self.chunks.len());
77            let mut remaining = nblocks * self.options.block_len_multiple;
78
79            while remaining > 0 {
80                let chunk = self.chunks.pop_front().vortex_expect("chunk is missing");
81                self.row_count -= chunk.len();
82                self.nbytes -= chunk.nbytes();
83
84                let len = chunk.len();
85
86                if len > remaining {
87                    let left = slice(&chunk, 0, remaining)?;
88                    let right = slice(&chunk, remaining, len)?;
89                    self.row_count += right.len();
90                    self.nbytes += right.nbytes();
91                    self.chunks.push_front(right);
92
93                    chunks.push(left);
94                    remaining = 0;
95                } else {
96                    chunks.push(chunk);
97                    remaining -= len;
98                }
99            }
100
101            // Combine the chunks to and flush them to the layout.
102            assert!(!chunks.is_empty());
103            let chunk = ChunkedArray::new_unchecked(chunks, self.dtype.clone())
104                .to_canonical()?
105                .into_array();
106
107            self.writer.push_chunk(segments, chunk)?;
108        }
109
110        Ok(())
111    }
112}
113
114impl LayoutWriter for RepartitionWriter {
115    fn push_chunk(
116        &mut self,
117        segment_writer: &mut dyn SegmentWriter,
118        chunk: ArrayRef,
119    ) -> VortexResult<()> {
120        // We make sure the chunks are canonical so our nbytes measurement is accurate.
121        let chunk = chunk.to_canonical()?.into_array();
122
123        // Split chunks into 8192 blocks to make sure we don't over-size them.
124        let mut offset = 0;
125        while offset < chunk.len() {
126            let end = (offset + self.options.block_len_multiple).min(chunk.len());
127            let c = slice(&chunk, offset, end)?;
128            self.row_count += c.len();
129            self.nbytes += c.nbytes();
130            self.chunks.push_back(c);
131            offset = end;
132
133            self.maybe_flush_chunk(segment_writer)?;
134        }
135
136        Ok(())
137    }
138
139    fn flush(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<()> {
140        let chunk =
141            ChunkedArray::new_unchecked(self.chunks.drain(..).collect(), self.dtype.clone())
142                .to_canonical()?
143                .into_array();
144        self.writer.push_chunk(segment_writer, chunk)?;
145        self.writer.flush(segment_writer)
146    }
147
148    fn finish(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<Layout> {
149        self.writer.finish(segment_writer)
150    }
151}