vortex_layout/layouts/
repartition.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::collections::VecDeque;
5use std::sync::Arc;
6
7use async_stream::try_stream;
8use async_trait::async_trait;
9use futures::StreamExt as _;
10use futures::pin_mut;
11use vortex_array::Array;
12use vortex_array::ArrayContext;
13use vortex_array::ArrayRef;
14use vortex_array::IntoArray;
15use vortex_array::arrays::ChunkedArray;
16use vortex_error::VortexExpect;
17use vortex_error::VortexResult;
18use vortex_io::runtime::Handle;
19
20use crate::LayoutRef;
21use crate::LayoutStrategy;
22use crate::segments::SegmentSinkRef;
23use crate::sequence::SendableSequentialStream;
24use crate::sequence::SequencePointer;
25use crate::sequence::SequentialStreamAdapter;
26use crate::sequence::SequentialStreamExt;
27
/// Tuning options for [`RepartitionStrategy`].
#[derive(Clone)]
pub struct RepartitionWriterOptions {
    /// The minimum uncompressed size in bytes for a block.
    pub block_size_minimum: u64,
    /// The multiple of the number of rows in each block.
    pub block_len_multiple: usize,
    /// Whether to canonicalize each incoming chunk before repartitioning.
    pub canonicalize: bool,
}
36
/// Repartition a stream of arrays into blocks.
///
/// Each emitted block (except the last) is at least `block_size_minimum` bytes and contains a
/// multiple of `block_len_multiple` rows.
#[derive(Clone)]
pub struct RepartitionStrategy {
    /// The wrapped strategy that receives the repartitioned stream.
    child: Arc<dyn LayoutStrategy>,
    /// Block sizing and canonicalization settings.
    options: RepartitionWriterOptions,
}
46
47impl RepartitionStrategy {
48    pub fn new<S: LayoutStrategy>(child: S, options: RepartitionWriterOptions) -> Self {
49        Self {
50            child: Arc::new(child),
51            options,
52        }
53    }
54}
55
#[async_trait]
impl LayoutStrategy for RepartitionStrategy {
    /// Consumes `stream`, re-slices it into blocks whose total row counts are
    /// multiples of `block_len_multiple` (and, except for the final flush, at
    /// least `block_size_minimum` uncompressed bytes), then forwards the
    /// repartitioned stream to the child strategy's `write_stream`.
    async fn write_stream(
        &self,
        ctx: ArrayContext,
        segment_sink: SegmentSinkRef,
        stream: SendableSequentialStream,
        eof: SequencePointer,
        handle: Handle,
    ) -> VortexResult<LayoutRef> {
        // TODO(os): spawn stream below like:
        // canon_stream = stream.map(async {to_canonical}).map(spawn).buffered(parallelism)
        let dtype = stream.dtype().clone();
        // Optionally canonicalize each chunk up front so the later slicing and
        // re-chunking below operates on canonical arrays.
        let stream = if self.options.canonicalize {
            SequentialStreamAdapter::new(
                dtype.clone(),
                stream.map(|chunk| {
                    let (sequence_id, chunk) = chunk?;
                    VortexResult::Ok((sequence_id, chunk.to_canonical().into_array()))
                }),
            )
            .sendable()
        } else {
            stream
        };

        let dtype_clone = dtype.clone();
        let options = self.options.clone();
        let repartitioned_stream = try_stream! {
            // Peekable so we can detect end-of-stream and flush the remainder.
            let canonical_stream = stream.peekable();
            pin_mut!(canonical_stream);

            let mut chunks = ChunksBuffer::new(options.clone());
            while let Some(chunk) = canonical_stream.as_mut().next().await {
                let (sequence_id, chunk) = chunk?;
                // NOTE(review): assumes descend()/advance() yield sequence ids that
                // keep the emitted blocks ordered under the incoming chunk's id —
                // confirm against the sequence module.
                let mut sequence_pointer = sequence_id.descend();
                let mut offset = 0;
                // Slice the incoming chunk into pieces of at most
                // block_len_multiple rows and accumulate them in the buffer.
                while offset < chunk.len() {
                    let end = (offset + options.block_len_multiple).min(chunk.len());
                    let sliced = chunk.slice(offset..end);
                    chunks.push_back(sliced);
                    offset = end;

                    // Once both thresholds are met, emit every complete block's
                    // worth of buffered rows as one canonicalized array.
                    if chunks.have_enough() {
                        let output_chunks = chunks.collect_exact_blocks()?;
                        assert!(!output_chunks.is_empty());
                        let chunked =
                            ChunkedArray::try_new(output_chunks, dtype_clone.clone())?;
                        if !chunked.is_empty() {
                            yield (
                                sequence_pointer.advance(),
                                chunked.to_canonical().into_array(),
                            )
                        }
                    }
                }
                // End of input: flush whatever is left, even below the thresholds.
                if canonical_stream.as_mut().peek().await.is_none() {
                    // NOTE(review): draining `data` directly leaves the buffer's
                    // row_count/nbytes counters stale; harmless today because the
                    // buffer is never touched again after this flush, but a
                    // dedicated drain method on ChunksBuffer would be safer.
                    let to_flush = ChunkedArray::try_new(
                        chunks.data.drain(..).collect(),
                        dtype_clone.clone(),
                    )?;
                    if !to_flush.is_empty() {
                        yield (
                            sequence_pointer.advance(),
                            to_flush.to_canonical().into_array(),
                        )
                    }
                }
            }
        };

        // Delegate the repartitioned stream to the child layout strategy.
        self.child
            .write_stream(
                ctx,
                segment_sink,
                SequentialStreamAdapter::new(dtype, repartitioned_stream).sendable(),
                eof,
                handle,
            )
            .await
    }

    fn buffered_bytes(&self) -> u64 {
        // TODO(os): we should probably add the buffered bytes from this strategy on top,
        // it is currently better to not add it at all because these buffered arrays are
        // potentially sliced and uncompressed. They would overestimate the actual bytes
        // that will end up in the file when flushed.
        self.child.buffered_bytes()
    }
}
146
/// FIFO buffer of array chunks with running row/byte totals, used to decide
/// when enough data has accumulated to emit complete blocks.
struct ChunksBuffer {
    /// Buffered chunks, oldest first.
    data: VecDeque<ArrayRef>,
    /// Total rows across all buffered chunks.
    row_count: usize,
    /// Total bytes (per `Array::nbytes`) across all buffered chunks.
    nbytes: u64,
    /// Thresholds controlling when blocks are emitted.
    options: RepartitionWriterOptions,
}
153
154impl ChunksBuffer {
155    fn new(options: RepartitionWriterOptions) -> Self {
156        Self {
157            data: Default::default(),
158            row_count: 0,
159            nbytes: 0,
160            options,
161        }
162    }
163
164    fn have_enough(&self) -> bool {
165        self.nbytes >= self.options.block_size_minimum
166            && self.row_count >= self.options.block_len_multiple
167    }
168
169    fn collect_exact_blocks(&mut self) -> VortexResult<Vec<ArrayRef>> {
170        let nblocks = self.row_count / self.options.block_len_multiple;
171        let mut res = Vec::with_capacity(self.data.len());
172        let mut remaining = nblocks * self.options.block_len_multiple;
173        while remaining > 0 {
174            let chunk = self
175                .pop_front()
176                .vortex_expect("must have at least one chunk");
177            let len = chunk.len();
178
179            if len > remaining {
180                let left = chunk.slice(0..remaining);
181                let right = chunk.slice(remaining..len);
182                self.push_front(right);
183                res.push(left);
184                remaining = 0;
185            } else {
186                res.push(chunk);
187                remaining -= len;
188            }
189        }
190        Ok(res)
191    }
192
193    fn push_back(&mut self, chunk: ArrayRef) {
194        self.row_count += chunk.len();
195        self.nbytes += chunk.nbytes();
196        self.data.push_back(chunk);
197    }
198
199    fn push_front(&mut self, chunk: ArrayRef) {
200        self.row_count += chunk.len();
201        self.nbytes += chunk.nbytes();
202        self.data.push_front(chunk);
203    }
204
205    fn pop_front(&mut self) -> Option<ArrayRef> {
206        let res = self.data.pop_front();
207        if let Some(chunk) = res.as_ref() {
208            self.row_count -= chunk.len();
209            self.nbytes -= chunk.nbytes();
210        }
211        res
212    }
213}