vortex_layout/layouts/
buffered.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::collections::VecDeque;
5use std::sync::Arc;
6use std::sync::atomic::AtomicU64;
7use std::sync::atomic::Ordering;
8
9use async_stream::try_stream;
10use async_trait::async_trait;
11use futures::StreamExt as _;
12use vortex_array::ArrayContext;
13use vortex_error::VortexResult;
14use vortex_io::runtime::Handle;
15
16use crate::LayoutRef;
17use crate::LayoutStrategy;
18use crate::segments::SegmentSinkRef;
19use crate::sequence::SendableSequentialStream;
20use crate::sequence::SequencePointer;
21use crate::sequence::SequentialStreamAdapter;
22use crate::sequence::SequentialStreamExt as _;
23
/// A [`LayoutStrategy`] that holds chunks back in an in-memory queue before handing them to a
/// child strategy.
///
/// Cloning is shallow: `child` and `buffered_bytes` are both behind an [`Arc`], so every clone
/// forwards to the same child and updates the same byte counter.
#[derive(Clone)]
pub struct BufferedStrategy {
    /// The strategy that receives the chunks once they are released from the buffer.
    child: Arc<dyn LayoutStrategy>,
    /// Flush threshold, compared against the sum of `nbytes()` of the queued chunks. Flushing
    /// starts once twice this amount is queued and stops once no more than this amount remains.
    buffer_size: u64,
    /// Running total of the `nbytes()` of chunks currently queued by `write_stream`.
    buffered_bytes: Arc<AtomicU64>,
}
30
31impl BufferedStrategy {
32    pub fn new<S: LayoutStrategy>(child: S, buffer_size: u64) -> Self {
33        Self {
34            child: Arc::new(child),
35            buffer_size,
36            buffered_bytes: Arc::new(AtomicU64::new(0)),
37        }
38    }
39}
40
41#[async_trait]
42impl LayoutStrategy for BufferedStrategy {
43    async fn write_stream(
44        &self,
45        ctx: ArrayContext,
46        segment_sink: SegmentSinkRef,
47        mut stream: SendableSequentialStream,
48        mut eof: SequencePointer,
49        handle: Handle,
50    ) -> VortexResult<LayoutRef> {
51        let dtype = stream.dtype().clone();
52        let buffer_size = self.buffer_size;
53
54        // We have no choice but to put our final buffers here!
55        // We cannot hold on to sequence ids across iterations of the stream, otherwise we can
56        // cause deadlocks with other columns that are waiting for us to flush.
57        let mut final_flush = eof.split_off();
58
59        let buffered_bytes_counter = self.buffered_bytes.clone();
60        let buffered_stream = try_stream! {
61            let mut nbytes = 0u64;
62            let mut chunks = VecDeque::new();
63
64            while let Some(chunk) = stream.as_mut().next().await {
65                let (sequence_id, chunk) = chunk?;
66                let chunk_size = chunk.nbytes();
67                nbytes += chunk_size;
68                buffered_bytes_counter.fetch_add(chunk_size, Ordering::Relaxed);
69                chunks.push_back(chunk);
70
71                if nbytes < 2 * buffer_size {
72                    continue;
73                };
74
75                // Wait until we're at 2x the buffer size before flushing 1x the buffer size
76                // This avoids small tail stragglers being flushed at the end of the file.
77                let mut sequence_ptr = sequence_id.descend();
78                while nbytes > buffer_size {
79                    let Some(chunk) = chunks.pop_front() else {
80                        break;
81                    };
82                    let chunk_size = chunk.nbytes();
83                    nbytes -= chunk_size;
84                    buffered_bytes_counter.fetch_sub(chunk_size, Ordering::Relaxed);
85                    yield (sequence_ptr.advance(), chunk)
86                }
87            }
88
89            // Now the input stream has ended, flush everything
90            while let Some(chunk) = chunks.pop_front() {
91                let chunk_size = chunk.nbytes();
92                buffered_bytes_counter.fetch_sub(chunk_size, Ordering::Relaxed);
93                yield (final_flush.advance(), chunk)
94            }
95        };
96
97        self.child
98            .write_stream(
99                ctx,
100                segment_sink,
101                SequentialStreamAdapter::new(dtype, buffered_stream).sendable(),
102                eof,
103                handle,
104            )
105            .await
106    }
107
108    fn buffered_bytes(&self) -> u64 {
109        self.buffered_bytes.load(Ordering::Relaxed) + self.child.buffered_bytes()
110    }
111}