vortex_layout/layouts/
buffered.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::collections::VecDeque;
5use std::sync::Arc;
6use std::sync::atomic::{AtomicU64, Ordering};
7
8use async_stream::try_stream;
9use async_trait::async_trait;
10use futures::StreamExt as _;
11use vortex_array::ArrayContext;
12use vortex_error::VortexResult;
13use vortex_io::runtime::Handle;
14
15use crate::segments::SegmentSinkRef;
16use crate::sequence::{
17    SendableSequentialStream, SequencePointer, SequentialStreamAdapter, SequentialStreamExt as _,
18};
19use crate::{LayoutRef, LayoutStrategy};
20
/// A [`LayoutStrategy`] decorator that buffers written chunks in memory and
/// flushes them through to a child strategy in bursts.
#[derive(Clone)]
pub struct BufferedStrategy {
    // The wrapped strategy that ultimately writes the buffered chunks.
    child: Arc<dyn LayoutStrategy>,
    // Target buffered byte count; `write_stream` holds chunks until 2x this
    // amount has accumulated, then drains back down to 1x.
    buffer_size: u64,
    // Running count of bytes currently held in the buffer. Shared via `Arc`
    // so clones of this strategy (and the stream built in `write_stream`)
    // all update the same counter reported by `buffered_bytes()`.
    buffered_bytes: Arc<AtomicU64>,
}
27
28impl BufferedStrategy {
29    pub fn new<S: LayoutStrategy>(child: S, buffer_size: u64) -> Self {
30        Self {
31            child: Arc::new(child),
32            buffer_size,
33            buffered_bytes: Arc::new(AtomicU64::new(0)),
34        }
35    }
36}
37
#[async_trait]
impl LayoutStrategy for BufferedStrategy {
    /// Buffer the incoming chunk stream and forward a re-sequenced stream to
    /// the child strategy, which performs the actual layout writing.
    ///
    /// Chunks are held back until at least `2 * buffer_size` bytes have
    /// accumulated, then drained down to `buffer_size` bytes in one burst —
    /// this avoids emitting runs of tiny "straggler" chunks near the end of
    /// the file. Whatever is still held when the input ends is flushed under
    /// the sequence pointer split off from `eof`.
    async fn write_stream(
        &self,
        ctx: ArrayContext,
        segment_sink: SegmentSinkRef,
        mut stream: SendableSequentialStream,
        mut eof: SequencePointer,
        handle: Handle,
    ) -> VortexResult<LayoutRef> {
        let dtype = stream.dtype().clone();
        let buffer_size = self.buffer_size;

        // We have no choice but to put our final buffers here!
        // We cannot hold on to sequence ids across iterations of the stream, otherwise we can
        // cause deadlocks with other columns that are waiting for us to flush.
        let mut final_flush = eof.split_off();

        let buffered_bytes_counter = self.buffered_bytes.clone();
        let buffered_stream = try_stream! {
            // Bytes currently held in `chunks`; mirrored into the shared
            // atomic counter so `buffered_bytes()` can report it externally.
            let mut nbytes = 0u64;
            let mut chunks = VecDeque::new();

            while let Some(chunk) = stream.as_mut().next().await {
                // Each stream item carries the chunk plus its sequence id;
                // the `?` inside try_stream! propagates stream errors.
                let (sequence_id, chunk) = chunk?;
                let chunk_size = chunk.nbytes();
                nbytes += chunk_size;
                buffered_bytes_counter.fetch_add(chunk_size, Ordering::Relaxed);
                chunks.push_back(chunk);

                if nbytes < 2 * buffer_size {
                    continue;
                };

                // Wait until we're at 2x the buffer size before flushing 1x the buffer size
                // This avoids small tail stragglers being flushed at the end of the file.
                // NOTE(review): flushed chunks are re-numbered under a pointer descended
                // from the *current* chunk's sequence id — presumably this orders them
                // before ids issued on later iterations; confirm against the sequence module.
                let mut sequence_ptr = sequence_id.descend();
                while nbytes > buffer_size {
                    let Some(chunk) = chunks.pop_front() else {
                        break;
                    };
                    let chunk_size = chunk.nbytes();
                    nbytes -= chunk_size;
                    buffered_bytes_counter.fetch_sub(chunk_size, Ordering::Relaxed);
                    yield (sequence_ptr.advance(), chunk)
                }
            }

            // Now the input stream has ended, flush everything
            while let Some(chunk) = chunks.pop_front() {
                let chunk_size = chunk.nbytes();
                buffered_bytes_counter.fetch_sub(chunk_size, Ordering::Relaxed);
                yield (final_flush.advance(), chunk)
            }
        };

        // Delegate to the child strategy with the buffered stream wrapped
        // back into a sendable sequential stream of the original dtype.
        self.child
            .write_stream(
                ctx,
                segment_sink,
                SequentialStreamAdapter::new(dtype, buffered_stream).sendable(),
                eof,
                handle,
            )
            .await
    }

    /// Bytes currently held by this strategy's buffer plus whatever the
    /// child strategy reports as buffered.
    fn buffered_bytes(&self) -> u64 {
        self.buffered_bytes.load(Ordering::Relaxed) + self.child.buffered_bytes()
    }
}