vortex_layout/layouts/repartition.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::collections::VecDeque;

use arcref::ArcRef;
use async_stream::try_stream;
use futures::{StreamExt as _, pin_mut};
use vortex_array::arrays::ChunkedArray;
use vortex_array::{Array, ArrayContext, ArrayRef, IntoArray};
use vortex_error::{VortexExpect, VortexResult};

use crate::segments::SequenceWriter;
use crate::{
    LayoutStrategy, SendableLayoutFuture, SendableSequentialStream, SequentialStreamAdapter,
    SequentialStreamExt,
};

#[derive(Clone)]
pub struct RepartitionWriterOptions {
    /// The minimum uncompressed size in bytes for a block.
    pub block_size_minimum: u64,
    /// Emitted blocks contain a multiple of this many rows (except possibly the final block).
    pub block_len_multiple: usize,
}

/// Repartition a stream of arrays into blocks.
///
/// Each emitted block (except the last) is at least `block_size_minimum` bytes and contains a
/// multiple of `block_len_multiple` rows.
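///
/// # Example
///
/// A minimal sketch (not compiled as a doctest): `child` stands in for any concrete
/// `ArcRef<dyn LayoutStrategy>`, and the sizes below are illustrative rather than crate
/// defaults.
///
/// ```ignore
/// let strategy = RepartitionStrategy::new(
///     child, // some ArcRef<dyn LayoutStrategy>
///     RepartitionWriterOptions {
///         block_size_minimum: 1 << 20, // buffer at least ~1 MiB before emitting a block
///         block_len_multiple: 8192,    // emitted blocks hold a multiple of 8192 rows
///     },
/// );
/// ```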
pub struct RepartitionStrategy {
    options: RepartitionWriterOptions,
    child: ArcRef<dyn LayoutStrategy>,
}

impl RepartitionStrategy {
    pub fn new(child: ArcRef<dyn LayoutStrategy>, options: RepartitionWriterOptions) -> Self {
        Self { options, child }
    }
}

impl LayoutStrategy for RepartitionStrategy {
    fn write_stream(
        &self,
        ctx: &ArrayContext,
        sequence_writer: SequenceWriter,
        stream: SendableSequentialStream,
    ) -> SendableLayoutFuture {
        // TODO(os): spawn stream below like:
        // canon_stream = stream.map(async {to_canonical}).map(spawn).buffered(parallelism)
        let dtype = stream.dtype().clone();
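        // Decode each incoming chunk to its canonical encoding (keeping its sequence id)
        // before repartitioning it below.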
        let canonical_stream = SequentialStreamAdapter::new(
            dtype.clone(),
            stream.map(|chunk| {
                let (sequence_id, chunk) = chunk?;
                VortexResult::Ok((sequence_id, chunk.to_canonical()?.into_array()))
            }),
        )
        .sendable();

        let dtype_clone = dtype.clone();
        let options = self.options.clone();
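        // Slice each canonical chunk into `block_len_multiple`-row pieces and buffer them,
        // emitting a block whenever the buffer satisfies both thresholds; whatever remains
        // when the input is exhausted is flushed as a final, possibly smaller, block.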
        let repartitioned_stream = try_stream! {
            let canonical_stream = canonical_stream.peekable();
            pin_mut!(canonical_stream);
            let mut chunks = ChunksBuffer::new(options.clone());
            while let Some(chunk) = canonical_stream.as_mut().next().await {
                let (sequence_id, chunk) = chunk?;
                let mut sequence_pointer = sequence_id.descend();
                let mut offset = 0;
                while offset < chunk.len() {
                    let end = (offset + options.block_len_multiple).min(chunk.len());
                    let sliced = chunk.slice(offset, end)?;
                    chunks.push_back(sliced);
                    offset = end;

                    // Once enough data is buffered, emit whole blocks and carry any
                    // remainder forward.
                    if chunks.have_enough() {
                        let output_chunks = chunks.collect_exact_blocks()?;
                        assert!(!output_chunks.is_empty());
                        let chunked =
                            ChunkedArray::new_unchecked(output_chunks, dtype_clone.clone());
                        if !chunked.is_empty() {
                            yield (
                                sequence_pointer.advance(),
                                chunked.to_canonical()?.into_array(),
                            )
                        }
                    }
                }
                // Input exhausted: flush whatever is still buffered as the final block.
                if canonical_stream.as_mut().peek().await.is_none() {
                    let to_flush = ChunkedArray::new_unchecked(
                        chunks.data.drain(..).collect(),
                        dtype_clone.clone(),
                    );
                    if !to_flush.is_empty() {
                        yield (
                            sequence_pointer.advance(),
                            to_flush.to_canonical()?.into_array(),
                        )
                    }
                }
            }
        };

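        // Delegate the repartitioned stream to the child strategy to produce the underlying layout.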
        self.child.write_stream(
            ctx,
            sequence_writer,
            SequentialStreamAdapter::new(dtype, repartitioned_stream).sendable(),
        )
    }
}

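/// A FIFO buffer of chunks plus running row and byte totals, used to decide when enough
/// data has accumulated to emit a block.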
struct ChunksBuffer {
    data: VecDeque<ArrayRef>,
    row_count: usize,
    nbytes: u64,
    options: RepartitionWriterOptions,
}

impl ChunksBuffer {
    fn new(options: RepartitionWriterOptions) -> Self {
        Self {
            data: Default::default(),
            row_count: 0,
            nbytes: 0,
            options,
        }
    }

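    /// True once the buffered data meets the byte-size minimum and holds at least
    /// `block_len_multiple` rows.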
    fn have_enough(&self) -> bool {
        self.nbytes >= self.options.block_size_minimum
            && self.row_count >= self.options.block_len_multiple
    }

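    /// Drains the largest prefix of the buffer whose total row count is a multiple of
    /// `block_len_multiple`, splitting the chunk that straddles the boundary and returning
    /// its tail to the buffer.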
    fn collect_exact_blocks(&mut self) -> VortexResult<Vec<ArrayRef>> {
        let nblocks = self.row_count / self.options.block_len_multiple;
        let mut res = Vec::with_capacity(self.data.len());
        let mut remaining = nblocks * self.options.block_len_multiple;
        while remaining > 0 {
            let chunk = self
                .pop_front()
                .vortex_expect("must have at least one chunk");
            let len = chunk.len();

            if len > remaining {
                // Split the chunk that straddles the block boundary and keep its tail buffered.
                let left = chunk.slice(0, remaining)?;
                let right = chunk.slice(remaining, len)?;
                self.push_front(right);
                res.push(left);
                remaining = 0;
            } else {
                res.push(chunk);
                remaining -= len;
            }
        }
        Ok(res)
    }

    /// Appends a chunk and updates the running row and byte totals.
    fn push_back(&mut self, chunk: ArrayRef) {
        self.row_count += chunk.len();
        self.nbytes += chunk.nbytes();
        self.data.push_back(chunk);
    }

    /// Prepends a chunk (used to return a split-off tail) and updates the running totals.
    fn push_front(&mut self, chunk: ArrayRef) {
        self.row_count += chunk.len();
        self.nbytes += chunk.nbytes();
        self.data.push_front(chunk);
    }

    /// Removes the oldest chunk, if any, subtracting it from the running totals.
    fn pop_front(&mut self) -> Option<ArrayRef> {
        let res = self.data.pop_front();
        if let Some(chunk) = res.as_ref() {
            self.row_count -= chunk.len();
            self.nbytes -= chunk.nbytes();
        }
        res
    }
}