vortex_layout/layouts/repartition.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::collections::VecDeque;

use async_stream::try_stream;
use async_trait::async_trait;
use futures::{StreamExt as _, pin_mut};
use vortex_array::arrays::ChunkedArray;
use vortex_array::{Array, ArrayContext, ArrayRef, IntoArray};
use vortex_error::{VortexExpect, VortexResult};

use crate::segments::SequenceWriter;
use crate::{
    LayoutRef, LayoutStrategy, SendableSequentialStream, SequentialStreamAdapter,
    SequentialStreamExt,
};

#[derive(Clone)]
pub struct RepartitionWriterOptions {
    /// The minimum uncompressed size in bytes for a block.
    pub block_size_minimum: u64,
    /// Each emitted block's row count is a multiple of this value (the final block may be shorter).
    pub block_len_multiple: usize,
}

/// Repartition a stream of arrays into blocks.
///
/// Each emitted block (except the last) is at least `block_size_minimum` bytes and contains a
/// multiple of `block_len_multiple` rows.
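///
/// A minimal construction sketch (the child strategy and the option values below are
/// illustrative assumptions, not defaults):
///
/// ```ignore
/// // `child` is any existing `LayoutStrategy` implementation supplied by the caller.
/// let strategy = RepartitionStrategy::new(
///     child,
///     RepartitionWriterOptions {
///         block_size_minimum: 1 << 20, // illustrative: ~1 MiB of uncompressed data per block
///         block_len_multiple: 8192,    // illustrative: emit blocks in multiples of 8192 rows
///     },
/// );
/// ```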
#[derive(Clone)]
pub struct RepartitionStrategy<S> {
    child: S,
    options: RepartitionWriterOptions,
}

impl<S> RepartitionStrategy<S>
where
    S: LayoutStrategy,
{
    pub fn new(child: S, options: RepartitionWriterOptions) -> Self {
        Self { child, options }
    }
}

#[async_trait]
impl<S> LayoutStrategy for RepartitionStrategy<S>
where
    S: LayoutStrategy,
{
    async fn write_stream(
        &self,
        ctx: &ArrayContext,
        sequence_writer: SequenceWriter,
        stream: SendableSequentialStream,
    ) -> VortexResult<LayoutRef> {
        // TODO(os): spawn stream below like:
        // canon_stream = stream.map(async {to_canonical}).map(spawn).buffered(parallelism)
        let dtype = stream.dtype().clone();
        // Canonicalize each incoming chunk so that the slicing and re-chunking below operate on
        // canonical arrays.
        let canonical_stream = SequentialStreamAdapter::new(
            dtype.clone(),
            stream.map(|chunk| {
                let (sequence_id, chunk) = chunk?;
                VortexResult::Ok((sequence_id, chunk.to_canonical()?.into_array()))
            }),
        )
        .sendable();

        let dtype_clone = dtype.clone();
        let options = self.options.clone();
        let repartitioned_stream = try_stream! {
            let canonical_stream = canonical_stream.peekable();
            pin_mut!(canonical_stream);
            let mut chunks = ChunksBuffer::new(options.clone());
            while let Some(chunk) = canonical_stream.as_mut().next().await {
                let (sequence_id, chunk) = chunk?;
                let mut sequence_pointer = sequence_id.descend();
                let mut offset = 0;
                // Slice the incoming chunk into pieces of at most `block_len_multiple` rows.
                while offset < chunk.len() {
                    let end = (offset + options.block_len_multiple).min(chunk.len());
                    let sliced = chunk.slice(offset, end)?;
                    chunks.push_back(sliced);
                    offset = end;

                    // Once enough rows and bytes are buffered, emit a block whose row count is
                    // an exact multiple of `block_len_multiple`.
                    if chunks.have_enough() {
                        let output_chunks = chunks.collect_exact_blocks()?;
                        assert!(!output_chunks.is_empty());
                        let chunked =
                            ChunkedArray::new_unchecked(output_chunks, dtype_clone.clone());
                        if !chunked.is_empty() {
                            yield (
                                sequence_pointer.advance(),
                                chunked.to_canonical()?.into_array(),
                            )
                        }
                    }
                }
                // If this was the last incoming chunk, flush whatever remains as a final block,
                // which may fall short of both the size minimum and a full row multiple.
                if canonical_stream.as_mut().peek().await.is_none() {
                    let to_flush = ChunkedArray::new_unchecked(
                        chunks.data.drain(..).collect(),
                        dtype_clone.clone(),
                    );
                    if !to_flush.is_empty() {
                        yield (
                            sequence_pointer.advance(),
                            to_flush.to_canonical()?.into_array(),
                        )
                    }
                }
            }
        };

        self.child
            .write_stream(
                ctx,
                sequence_writer,
                SequentialStreamAdapter::new(dtype, repartitioned_stream).sendable(),
            )
            .await
    }
}

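/// FIFO buffer of pending chunks, tracking the total row count and uncompressed byte size of
/// everything currently buffered so the writer can tell when a full block can be emitted.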
struct ChunksBuffer {
    data: VecDeque<ArrayRef>,
    row_count: usize,
    nbytes: u64,
    options: RepartitionWriterOptions,
}

impl ChunksBuffer {
    fn new(options: RepartitionWriterOptions) -> Self {
        Self {
            data: Default::default(),
            row_count: 0,
            nbytes: 0,
            options,
        }
    }

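    /// True once the buffered data meets both thresholds: at least `block_size_minimum`
    /// uncompressed bytes and at least `block_len_multiple` rows.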
    fn have_enough(&self) -> bool {
        self.nbytes >= self.options.block_size_minimum
            && self.row_count >= self.options.block_len_multiple
    }

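    /// Drains as many whole blocks as possible from the front of the buffer, splitting the last
    /// chunk if necessary so that the returned chunks hold an exact multiple of
    /// `block_len_multiple` rows. Any remainder stays buffered for the next call or final flush.
    ///
    /// Worked example (illustrative numbers): with `block_len_multiple = 4` and buffered chunk
    /// lengths `[3, 3, 3]` (9 rows), this returns chunks of `[3, 3, 2]` (8 rows) and keeps the
    /// remaining 1-row slice at the front of the buffer.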
    fn collect_exact_blocks(&mut self) -> VortexResult<Vec<ArrayRef>> {
        let nblocks = self.row_count / self.options.block_len_multiple;
        let mut res = Vec::with_capacity(self.data.len());
        let mut remaining = nblocks * self.options.block_len_multiple;
        while remaining > 0 {
            let chunk = self
                .pop_front()
                .vortex_expect("must have at least one chunk");
            let len = chunk.len();

            if len > remaining {
                // The chunk straddles the block boundary: emit the head and push the tail back
                // onto the front of the buffer for a later block.
                let left = chunk.slice(0, remaining)?;
                let right = chunk.slice(remaining, len)?;
                self.push_front(right);
                res.push(left);
                remaining = 0;
            } else {
                res.push(chunk);
                remaining -= len;
            }
        }
        Ok(res)
    }

    fn push_back(&mut self, chunk: ArrayRef) {
        self.row_count += chunk.len();
        self.nbytes += chunk.nbytes();
        self.data.push_back(chunk);
    }

    fn push_front(&mut self, chunk: ArrayRef) {
        self.row_count += chunk.len();
        self.nbytes += chunk.nbytes();
        self.data.push_front(chunk);
    }

    fn pop_front(&mut self) -> Option<ArrayRef> {
        let res = self.data.pop_front();
        if let Some(chunk) = res.as_ref() {
            self.row_count -= chunk.len();
            self.nbytes -= chunk.nbytes();
        }
        res
    }
}