vortex_layout/layouts/
buffered.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::collections::VecDeque;
5
6use arcref::ArcRef;
7use async_stream::try_stream;
8use futures::{StreamExt as _, pin_mut};
9use vortex_array::ArrayContext;
10
11use crate::segments::SequenceWriter;
12use crate::{
13    LayoutStrategy, SendableLayoutFuture, SendableSequentialStream, SequentialStreamAdapter,
14    SequentialStreamExt as _,
15};
16
17pub struct BufferedStrategy {
18    child: ArcRef<dyn LayoutStrategy>,
19    buffer_size: u64,
20}
21
22impl BufferedStrategy {
23    pub fn new(child: ArcRef<dyn LayoutStrategy>, buffer_size: u64) -> Self {
24        Self { child, buffer_size }
25    }
26}
27
28impl LayoutStrategy for BufferedStrategy {
29    fn write_stream(
30        &self,
31        ctx: &ArrayContext,
32        sequence_writer: SequenceWriter,
33        stream: SendableSequentialStream,
34    ) -> SendableLayoutFuture {
35        let dtype = stream.dtype().clone();
36        let buffer_size = self.buffer_size;
37        let buffered_stream = try_stream! {
38            let stream = stream.peekable();
39            pin_mut!(stream);
40
41            let mut nbytes = 0u64;
42            let mut chunks = VecDeque::new();
43
44            while let Some(chunk) = stream.as_mut().next().await {
45                let (sequence_id, chunk) = chunk?;
46                nbytes += chunk.nbytes();
47                chunks.push_back(chunk);
48
49                // if this is the last element, flush everything
50                if stream.as_mut().peek().await.is_none() {
51                    let mut sequence_pointer = sequence_id.descend();
52                    while let Some(chunk) = chunks.pop_front() {
53                        yield (sequence_pointer.advance(), chunk)
54                    }
55                    break;
56                }
57
58                if nbytes < 2 * buffer_size {
59                    continue;
60                };
61                // Wait until we're at 2x the buffer size before flushing 1x the buffer size
62                // This avoids small tail stragglers being flushed at the end of the file.
63                let mut sequence_pointer = sequence_id.descend();
64                while nbytes > buffer_size {
65                    let Some(chunk) = chunks.pop_front() else {
66                        break;
67                    };
68                    nbytes -= chunk.nbytes();
69                    yield (sequence_pointer.advance(), chunk)
70                }
71            }
72        };
73        self.child.write_stream(
74            ctx,
75            sequence_writer,
76            SequentialStreamAdapter::new(dtype, buffered_stream).sendable(),
77        )
78    }
79}