vortex_layout/layouts/repartition.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::collections::VecDeque;
use std::sync::Arc;

use async_stream::try_stream;
use async_trait::async_trait;
use futures::{StreamExt as _, pin_mut};
use vortex_array::arrays::ChunkedArray;
use vortex_array::{Array, ArrayContext, ArrayRef, IntoArray};
use vortex_error::{VortexExpect, VortexResult};
use vortex_io::runtime::Handle;

use crate::segments::SegmentSinkRef;
use crate::sequence::{
    SendableSequentialStream, SequencePointer, SequentialStreamAdapter, SequentialStreamExt,
};
use crate::{LayoutRef, LayoutStrategy};

#[derive(Clone)]
pub struct RepartitionWriterOptions {
    /// The minimum uncompressed size in bytes for a block.
    pub block_size_minimum: u64,
    /// Each block's row count is a multiple of this value.
    pub block_len_multiple: usize,
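    /// Whether to canonicalize each incoming chunk before it is repartitioned.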
    pub canonicalize: bool,
}

/// Repartition a stream of arrays into blocks.
///
/// Each emitted block (except the last) is at least `block_size_minimum` bytes and contains a
/// multiple of `block_len_multiple` rows.
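///
/// A sketch of constructing the strategy; the child strategy and the option values below are
/// illustrative, not defaults:
///
/// ```ignore
/// let strategy = RepartitionStrategy::new(
///     child_strategy, // any `LayoutStrategy` implementation
///     RepartitionWriterOptions {
///         block_size_minimum: 1 << 20, // emit a block once at least 1 MiB is buffered
///         block_len_multiple: 8192,    // each block holds a multiple of 8192 rows
///         canonicalize: true,          // canonicalize chunks before repartitioning
///     },
/// );
/// ```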
#[derive(Clone)]
pub struct RepartitionStrategy {
    child: Arc<dyn LayoutStrategy>,
    options: RepartitionWriterOptions,
}

impl RepartitionStrategy {
    pub fn new<S: LayoutStrategy>(child: S, options: RepartitionWriterOptions) -> Self {
        Self {
            child: Arc::new(child),
            options,
        }
    }
}

#[async_trait]
impl LayoutStrategy for RepartitionStrategy {
    async fn write_stream(
        &self,
        ctx: ArrayContext,
        segment_sink: SegmentSinkRef,
        stream: SendableSequentialStream,
        eof: SequencePointer,
        handle: Handle,
    ) -> VortexResult<LayoutRef> {
        // TODO(os): spawn stream below like:
        // canon_stream = stream.map(async {to_canonical}).map(spawn).buffered(parallelism)
        let dtype = stream.dtype().clone();
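        // If requested, canonicalize each incoming chunk up front so that the slicing and
        // re-chunking below operate on canonical arrays.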
        let stream = if self.options.canonicalize {
            SequentialStreamAdapter::new(
                dtype.clone(),
                stream.map(|chunk| {
                    let (sequence_id, chunk) = chunk?;
                    VortexResult::Ok((sequence_id, chunk.to_canonical().into_array()))
                }),
            )
            .sendable()
        } else {
            stream
        };

        let dtype_clone = dtype.clone();
        let options = self.options.clone();
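        // Re-chunk the stream: slice each incoming chunk into pieces of at most
        // `block_len_multiple` rows, buffer them, and emit whole blocks once enough bytes and
        // rows have accumulated; whatever remains is flushed when the stream ends.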
        let repartitioned_stream = try_stream! {
            let canonical_stream = stream.peekable();
            pin_mut!(canonical_stream);

            let mut chunks = ChunksBuffer::new(options.clone());
            while let Some(chunk) = canonical_stream.as_mut().next().await {
                let (sequence_id, chunk) = chunk?;
                let mut sequence_pointer = sequence_id.descend();
                let mut offset = 0;
                while offset < chunk.len() {
                    let end = (offset + options.block_len_multiple).min(chunk.len());
                    let sliced = chunk.slice(offset..end);
                    chunks.push_back(sliced);
                    offset = end;

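                    // Once at least `block_size_minimum` bytes and `block_len_multiple` rows are
                    // buffered, emit every complete block currently in the buffer.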
                    if chunks.have_enough() {
                        let output_chunks = chunks.collect_exact_blocks()?;
                        assert!(!output_chunks.is_empty());
                        let chunked =
                            ChunkedArray::try_new(output_chunks, dtype_clone.clone())?;
                        if !chunked.is_empty() {
                            yield (
                                sequence_pointer.advance(),
                                chunked.to_canonical().into_array(),
                            )
                        }
                    }
                }
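                // Peeking detects the end of the input: after the last chunk, flush whatever is
                // left in the buffer as a final (possibly short) block.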
                if canonical_stream.as_mut().peek().await.is_none() {
                    let to_flush = ChunkedArray::try_new(
                        chunks.data.drain(..).collect(),
                        dtype_clone.clone(),
                    )?;
                    if !to_flush.is_empty() {
                        yield (
                            sequence_pointer.advance(),
                            to_flush.to_canonical().into_array(),
                        )
                    }
                }
            }
        };

        self.child
            .write_stream(
                ctx,
                segment_sink,
                SequentialStreamAdapter::new(dtype, repartitioned_stream).sendable(),
                eof,
                handle,
            )
            .await
    }

    fn buffered_bytes(&self) -> u64 {
        // TODO(os): we should probably add this strategy's buffered bytes on top. For now it is
        // better not to count them at all, because the buffered arrays are potentially sliced
        // and uncompressed, so they would overestimate the bytes that actually end up in the
        // file when flushed.
        self.child.buffered_bytes()
    }
}

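/// A FIFO buffer of (possibly sliced) chunks, tracking the total row count and uncompressed
/// byte size of everything it currently holds.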
struct ChunksBuffer {
    data: VecDeque<ArrayRef>,
    row_count: usize,
    nbytes: u64,
    options: RepartitionWriterOptions,
}

impl ChunksBuffer {
    fn new(options: RepartitionWriterOptions) -> Self {
        Self {
            data: Default::default(),
            row_count: 0,
            nbytes: 0,
            options,
        }
    }

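    /// Whether at least one complete block can be emitted, i.e. the buffer holds at least
    /// `block_size_minimum` bytes and `block_len_multiple` rows.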
    fn have_enough(&self) -> bool {
        self.nbytes >= self.options.block_size_minimum
            && self.row_count >= self.options.block_len_multiple
    }

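    /// Drain the largest whole number of blocks from the front of the buffer. The returned
    /// chunks span an exact multiple of `block_len_multiple` rows; if the last chunk must be
    /// split, its remainder is pushed back onto the front of the buffer.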
    fn collect_exact_blocks(&mut self) -> VortexResult<Vec<ArrayRef>> {
        let nblocks = self.row_count / self.options.block_len_multiple;
        let mut res = Vec::with_capacity(self.data.len());
        let mut remaining = nblocks * self.options.block_len_multiple;
        while remaining > 0 {
            let chunk = self
                .pop_front()
                .vortex_expect("must have at least one chunk");
            let len = chunk.len();

            if len > remaining {
                let left = chunk.slice(0..remaining);
                let right = chunk.slice(remaining..len);
                self.push_front(right);
                res.push(left);
                remaining = 0;
            } else {
                res.push(chunk);
                remaining -= len;
            }
        }
        Ok(res)
    }

    fn push_back(&mut self, chunk: ArrayRef) {
        self.row_count += chunk.len();
        self.nbytes += chunk.nbytes();
        self.data.push_back(chunk);
    }

    fn push_front(&mut self, chunk: ArrayRef) {
        self.row_count += chunk.len();
        self.nbytes += chunk.nbytes();
        self.data.push_front(chunk);
    }

    fn pop_front(&mut self) -> Option<ArrayRef> {
        let res = self.data.pop_front();
        if let Some(chunk) = res.as_ref() {
            self.row_count -= chunk.len();
            self.nbytes -= chunk.nbytes();
        }
        res
    }
}