vortex_layout/layouts/buffered.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::collections::VecDeque;
5
6use async_stream::try_stream;
7use async_trait::async_trait;
8use futures::{StreamExt as _, pin_mut};
9use vortex_array::ArrayContext;
10use vortex_error::VortexResult;
11
12use crate::segments::SequenceWriter;
13use crate::{
14    LayoutRef, LayoutStrategy, SendableSequentialStream, SequentialStreamAdapter,
15    SequentialStreamExt as _,
16};
17
18#[derive(Clone)]
19pub struct BufferedStrategy<S> {
20    child: S,
21    buffer_size: u64,
22}
23
24impl<S: LayoutStrategy> BufferedStrategy<S> {
25    pub fn new(child: S, buffer_size: u64) -> Self {
26        Self { child, buffer_size }
27    }
28}
29
30#[async_trait]
31impl<S> LayoutStrategy for BufferedStrategy<S>
32where
33    S: LayoutStrategy,
34{
35    async fn write_stream(
36        &self,
37        ctx: &ArrayContext,
38        sequence_writer: SequenceWriter,
39        stream: SendableSequentialStream,
40    ) -> VortexResult<LayoutRef> {
41        let dtype = stream.dtype().clone();
42        let buffer_size = self.buffer_size;
43        let buffered_stream = try_stream! {
44            let stream = stream.peekable();
45            pin_mut!(stream);
46
47            let mut nbytes = 0u64;
48            let mut chunks = VecDeque::new();
49
50            while let Some(chunk) = stream.as_mut().next().await {
51                let (sequence_id, chunk) = chunk?;
52                nbytes += chunk.nbytes();
53                chunks.push_back(chunk);
54
55                // if this is the last element, flush everything
56                if stream.as_mut().peek().await.is_none() {
57                    let mut sequence_pointer = sequence_id.descend();
58                    while let Some(chunk) = chunks.pop_front() {
59                        yield (sequence_pointer.advance(), chunk)
60                    }
61                    break;
62                }
63
64                if nbytes < 2 * buffer_size {
65                    continue;
66                };
67                // Wait until we're at 2x the buffer size before flushing 1x the buffer size
68                // This avoids small tail stragglers being flushed at the end of the file.
69                let mut sequence_pointer = sequence_id.descend();
70                while nbytes > buffer_size {
71                    let Some(chunk) = chunks.pop_front() else {
72                        break;
73                    };
74                    nbytes -= chunk.nbytes();
75                    yield (sequence_pointer.advance(), chunk)
76                }
77            }
78        };
79        self.child
80            .write_stream(
81                ctx,
82                sequence_writer,
83                SequentialStreamAdapter::new(dtype, buffered_stream).sendable(),
84            )
85            .await
86    }
87}