vortex_layout/layouts/
buffered.rs1use std::collections::VecDeque;
5use std::sync::Arc;
6use std::sync::atomic::{AtomicU64, Ordering};
7
8use async_stream::try_stream;
9use async_trait::async_trait;
10use futures::StreamExt as _;
11use vortex_array::ArrayContext;
12use vortex_error::VortexResult;
13use vortex_io::runtime::Handle;
14
15use crate::segments::SegmentSinkRef;
16use crate::sequence::{
17 SendableSequentialStream, SequencePointer, SequentialStreamAdapter, SequentialStreamExt as _,
18};
19use crate::{LayoutRef, LayoutStrategy};
20
21#[derive(Clone)]
22pub struct BufferedStrategy {
23 child: Arc<dyn LayoutStrategy>,
24 buffer_size: u64,
25 buffered_bytes: Arc<AtomicU64>,
26}
27
28impl BufferedStrategy {
29 pub fn new<S: LayoutStrategy>(child: S, buffer_size: u64) -> Self {
30 Self {
31 child: Arc::new(child),
32 buffer_size,
33 buffered_bytes: Arc::new(AtomicU64::new(0)),
34 }
35 }
36}
37
38#[async_trait]
39impl LayoutStrategy for BufferedStrategy {
40 async fn write_stream(
41 &self,
42 ctx: ArrayContext,
43 segment_sink: SegmentSinkRef,
44 mut stream: SendableSequentialStream,
45 mut eof: SequencePointer,
46 handle: Handle,
47 ) -> VortexResult<LayoutRef> {
48 let dtype = stream.dtype().clone();
49 let buffer_size = self.buffer_size;
50
51 let mut final_flush = eof.split_off();
55
56 let buffered_bytes_counter = self.buffered_bytes.clone();
57 let buffered_stream = try_stream! {
58 let mut nbytes = 0u64;
59 let mut chunks = VecDeque::new();
60
61 while let Some(chunk) = stream.as_mut().next().await {
62 let (sequence_id, chunk) = chunk?;
63 let chunk_size = chunk.nbytes();
64 nbytes += chunk_size;
65 buffered_bytes_counter.fetch_add(chunk_size, Ordering::Relaxed);
66 chunks.push_back(chunk);
67
68 if nbytes < 2 * buffer_size {
69 continue;
70 };
71
72 let mut sequence_ptr = sequence_id.descend();
75 while nbytes > buffer_size {
76 let Some(chunk) = chunks.pop_front() else {
77 break;
78 };
79 let chunk_size = chunk.nbytes();
80 nbytes -= chunk_size;
81 buffered_bytes_counter.fetch_sub(chunk_size, Ordering::Relaxed);
82 yield (sequence_ptr.advance(), chunk)
83 }
84 }
85
86 while let Some(chunk) = chunks.pop_front() {
88 let chunk_size = chunk.nbytes();
89 buffered_bytes_counter.fetch_sub(chunk_size, Ordering::Relaxed);
90 yield (final_flush.advance(), chunk)
91 }
92 };
93
94 self.child
95 .write_stream(
96 ctx,
97 segment_sink,
98 SequentialStreamAdapter::new(dtype, buffered_stream).sendable(),
99 eof,
100 handle,
101 )
102 .await
103 }
104
105 fn buffered_bytes(&self) -> u64 {
106 self.buffered_bytes.load(Ordering::Relaxed) + self.child.buffered_bytes()
107 }
108}