vortex_layout/layouts/
buffered.rs1use std::collections::VecDeque;
5use std::sync::Arc;
6use std::sync::atomic::AtomicU64;
7use std::sync::atomic::Ordering;
8
9use async_stream::try_stream;
10use async_trait::async_trait;
11use futures::StreamExt as _;
12use vortex_array::ArrayContext;
13use vortex_error::VortexResult;
14use vortex_io::runtime::Handle;
15
16use crate::LayoutRef;
17use crate::LayoutStrategy;
18use crate::segments::SegmentSinkRef;
19use crate::sequence::SendableSequentialStream;
20use crate::sequence::SequencePointer;
21use crate::sequence::SequentialStreamAdapter;
22use crate::sequence::SequentialStreamExt as _;
23
24#[derive(Clone)]
25pub struct BufferedStrategy {
26 child: Arc<dyn LayoutStrategy>,
27 buffer_size: u64,
28 buffered_bytes: Arc<AtomicU64>,
29}
30
31impl BufferedStrategy {
32 pub fn new<S: LayoutStrategy>(child: S, buffer_size: u64) -> Self {
33 Self {
34 child: Arc::new(child),
35 buffer_size,
36 buffered_bytes: Arc::new(AtomicU64::new(0)),
37 }
38 }
39}
40
41#[async_trait]
42impl LayoutStrategy for BufferedStrategy {
43 async fn write_stream(
44 &self,
45 ctx: ArrayContext,
46 segment_sink: SegmentSinkRef,
47 mut stream: SendableSequentialStream,
48 mut eof: SequencePointer,
49 handle: Handle,
50 ) -> VortexResult<LayoutRef> {
51 let dtype = stream.dtype().clone();
52 let buffer_size = self.buffer_size;
53
54 let mut final_flush = eof.split_off();
58
59 let buffered_bytes_counter = self.buffered_bytes.clone();
60 let buffered_stream = try_stream! {
61 let mut nbytes = 0u64;
62 let mut chunks = VecDeque::new();
63
64 while let Some(chunk) = stream.as_mut().next().await {
65 let (sequence_id, chunk) = chunk?;
66 let chunk_size = chunk.nbytes();
67 nbytes += chunk_size;
68 buffered_bytes_counter.fetch_add(chunk_size, Ordering::Relaxed);
69 chunks.push_back(chunk);
70
71 if nbytes < 2 * buffer_size {
72 continue;
73 };
74
75 let mut sequence_ptr = sequence_id.descend();
78 while nbytes > buffer_size {
79 let Some(chunk) = chunks.pop_front() else {
80 break;
81 };
82 let chunk_size = chunk.nbytes();
83 nbytes -= chunk_size;
84 buffered_bytes_counter.fetch_sub(chunk_size, Ordering::Relaxed);
85 yield (sequence_ptr.advance(), chunk)
86 }
87 }
88
89 while let Some(chunk) = chunks.pop_front() {
91 let chunk_size = chunk.nbytes();
92 buffered_bytes_counter.fetch_sub(chunk_size, Ordering::Relaxed);
93 yield (final_flush.advance(), chunk)
94 }
95 };
96
97 self.child
98 .write_stream(
99 ctx,
100 segment_sink,
101 SequentialStreamAdapter::new(dtype, buffered_stream).sendable(),
102 eof,
103 handle,
104 )
105 .await
106 }
107
108 fn buffered_bytes(&self) -> u64 {
109 self.buffered_bytes.load(Ordering::Relaxed) + self.child.buffered_bytes()
110 }
111}