Skip to main content

vortex_layout/layouts/
buffered.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::collections::VecDeque;
5use std::sync::Arc;
6use std::sync::atomic::AtomicU64;
7use std::sync::atomic::Ordering;
8
9use async_stream::try_stream;
10use async_trait::async_trait;
11use futures::StreamExt as _;
12use futures::pin_mut;
13use vortex_array::ArrayContext;
14use vortex_error::VortexResult;
15use vortex_session::VortexSession;
16
17use crate::LayoutRef;
18use crate::LayoutStrategy;
19use crate::segments::SegmentSinkRef;
20use crate::sequence::SendableSequentialStream;
21use crate::sequence::SequencePointer;
22use crate::sequence::SequentialStreamAdapter;
23use crate::sequence::SequentialStreamExt as _;
24
25#[derive(Clone)]
26pub struct BufferedStrategy {
27    child: Arc<dyn LayoutStrategy>,
28    buffer_size: u64,
29    buffered_bytes: Arc<AtomicU64>,
30}
31
32impl BufferedStrategy {
33    pub fn new<S: LayoutStrategy>(child: S, buffer_size: u64) -> Self {
34        Self {
35            child: Arc::new(child),
36            buffer_size,
37            buffered_bytes: Arc::new(AtomicU64::new(0)),
38        }
39    }
40}
41
42#[async_trait]
43impl LayoutStrategy for BufferedStrategy {
44    async fn write_stream(
45        &self,
46        ctx: ArrayContext,
47        segment_sink: SegmentSinkRef,
48        stream: SendableSequentialStream,
49        eof: SequencePointer,
50        session: &VortexSession,
51    ) -> VortexResult<LayoutRef> {
52        let dtype = stream.dtype().clone();
53        let buffer_size = self.buffer_size;
54
55        let buffered_bytes_counter = Arc::clone(&self.buffered_bytes);
56        let buffered_stream = try_stream! {
57            let stream = stream.peekable();
58            pin_mut!(stream);
59
60            let mut nbytes = 0u64;
61            let mut chunks = VecDeque::new();
62
63            while let Some(chunk) = stream.as_mut().next().await {
64                let (sequence_id, chunk) = chunk?;
65                let chunk_size = chunk.nbytes();
66                nbytes += chunk_size;
67                buffered_bytes_counter.fetch_add(chunk_size, Ordering::Relaxed);
68                chunks.push_back(chunk);
69
70                // If this is the last element, flush everything.
71                if stream.as_mut().peek().await.is_none() {
72                    let mut sequence_ptr = sequence_id.descend();
73                    while let Some(chunk) = chunks.pop_front() {
74                        buffered_bytes_counter.fetch_sub(chunk.nbytes(), Ordering::Relaxed);
75                        yield (sequence_ptr.advance(), chunk)
76                    }
77                    break;
78                }
79
80                if nbytes < 2 * buffer_size {
81                    continue;
82                };
83
84                // Wait until we're at 2x the buffer size before flushing 1x the buffer size.
85                // This avoids small tail stragglers being flushed at the end of the file.
86                let mut sequence_ptr = sequence_id.descend();
87                while nbytes > buffer_size {
88                    let Some(chunk) = chunks.pop_front() else {
89                        break;
90                    };
91                    let chunk_size = chunk.nbytes();
92                    nbytes -= chunk_size;
93                    buffered_bytes_counter.fetch_sub(chunk_size, Ordering::Relaxed);
94                    yield (sequence_ptr.advance(), chunk)
95                }
96            }
97        };
98
99        self.child
100            .write_stream(
101                ctx,
102                segment_sink,
103                SequentialStreamAdapter::new(dtype, buffered_stream).sendable(),
104                eof,
105                session,
106            )
107            .await
108    }
109
110    fn buffered_bytes(&self) -> u64 {
111        self.buffered_bytes.load(Ordering::Relaxed) + self.child.buffered_bytes()
112    }
113}