vortex_layout/layouts/repartition.rs

1use std::collections::VecDeque;
2
3use arcref::ArcRef;
4use vortex_array::arrays::ChunkedArray;
5use vortex_array::{Array, ArrayContext, ArrayRef, IntoArray};
6use vortex_dtype::DType;
7use vortex_error::{VortexExpect, VortexResult};
8
9use crate::segments::SegmentWriter;
10use crate::{Layout, LayoutStrategy, LayoutWriter, LayoutWriterExt};
11
12pub struct RepartitionStrategy {
13    pub options: RepartitionWriterOptions,
14    pub child: ArcRef<dyn LayoutStrategy>,
15}
16
17impl LayoutStrategy for RepartitionStrategy {
18    fn new_writer(&self, ctx: &ArrayContext, dtype: &DType) -> VortexResult<Box<dyn LayoutWriter>> {
19        Ok(RepartitionWriter::new(
20            dtype.clone(),
21            self.child.new_writer(ctx, dtype)?,
22            self.options.clone(),
23        )
24        .boxed())
25    }
26}
27
/// Options controlling how [`RepartitionWriter`] groups rows into blocks.
#[derive(Clone, Debug)]
pub struct RepartitionWriterOptions {
    /// The minimum uncompressed (canonical) size in bytes that must be buffered before a block
    /// is emitted.
    pub block_size_minimum: usize,
    /// Every block emitted before the final flush contains a multiple of this many rows, and
    /// incoming chunks are buffered in slices of at most this many rows.
    ///
    /// Must be non-zero.
    pub block_len_multiple: usize,
}
35
36/// Repartition a stream of arrays into blocks.
37///
38/// Each emitted block (except the last) is at least `block_size_minimum` bytes and contains a
39/// multiple of `block_len_multiple` rows.
40pub struct RepartitionWriter {
41    dtype: DType,
42    chunks: VecDeque<ArrayRef>,
43    row_count: usize,
44    nbytes: usize,
45    writer: Box<dyn LayoutWriter>,
46    options: RepartitionWriterOptions,
47}
48
49impl RepartitionWriter {
50    pub fn new(
51        dtype: DType,
52        writer: Box<dyn LayoutWriter>,
53        options: RepartitionWriterOptions,
54    ) -> Self {
55        Self {
56            dtype,
57            chunks: VecDeque::new(),
58            row_count: 0,
59            nbytes: 0,
60            writer,
61            options,
62        }
63    }
64
65    fn maybe_flush_chunk(&mut self, segments: &mut dyn SegmentWriter) -> VortexResult<()> {
66        if self.nbytes >= self.options.block_size_minimum {
67            let nblocks = self.row_count / self.options.block_len_multiple;
68
69            // If we don't have a full block, then wait for more
70            if nblocks == 0 {
71                return Ok(());
72            }
73
74            let mut chunks = Vec::with_capacity(self.chunks.len());
75            let mut remaining = nblocks * self.options.block_len_multiple;
76
77            while remaining > 0 {
78                let chunk = self.chunks.pop_front().vortex_expect("chunk is missing");
79                self.row_count -= chunk.len();
80                self.nbytes -= chunk.nbytes();
81
82                let len = chunk.len();
83
84                if len > remaining {
85                    let left = chunk.slice(0, remaining)?;
86                    let right = chunk.slice(remaining, len)?;
87                    self.row_count += right.len();
88                    self.nbytes += right.nbytes();
89                    self.chunks.push_front(right);
90
91                    chunks.push(left);
92                    remaining = 0;
93                } else {
94                    chunks.push(chunk);
95                    remaining -= len;
96                }
97            }
98
99            // Combine the chunks to and flush them to the layout.
100            assert!(!chunks.is_empty());
101            let chunk = ChunkedArray::new_unchecked(chunks, self.dtype.clone())
102                .to_canonical()?
103                .into_array();
104
105            self.writer.push_chunk(segments, chunk)?;
106        }
107
108        Ok(())
109    }
110}
111
112impl LayoutWriter for RepartitionWriter {
113    fn push_chunk(
114        &mut self,
115        segment_writer: &mut dyn SegmentWriter,
116        chunk: ArrayRef,
117    ) -> VortexResult<()> {
118        assert_eq!(
119            chunk.dtype(),
120            &self.dtype,
121            "Can't push chunks of the wrong dtype into a LayoutWriter. Pushed {} but expected {}.",
122            chunk.dtype(),
123            self.dtype
124        );
125        // We make sure the chunks are canonical so our nbytes measurement is accurate.
126        let chunk = chunk.to_canonical()?.into_array();
127
128        // Split chunks into 8192 blocks to make sure we don't over-size them.
129        let mut offset = 0;
130        while offset < chunk.len() {
131            let end = (offset + self.options.block_len_multiple).min(chunk.len());
132            let c = chunk.slice(offset, end)?;
133            self.row_count += c.len();
134            self.nbytes += c.nbytes();
135            self.chunks.push_back(c);
136            offset = end;
137
138            self.maybe_flush_chunk(segment_writer)?;
139        }
140
141        Ok(())
142    }
143
144    fn flush(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<()> {
145        let chunk =
146            ChunkedArray::new_unchecked(self.chunks.drain(..).collect(), self.dtype.clone())
147                .to_canonical()?
148                .into_array();
149        self.writer.push_chunk(segment_writer, chunk)?;
150        self.writer.flush(segment_writer)
151    }
152
153    fn finish(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<Layout> {
154        self.writer.finish(segment_writer)
155    }
156}