vortex_layout/layouts/
buffered.rs1use std::collections::VecDeque;
5use std::sync::Arc;
6use std::sync::atomic::AtomicU64;
7use std::sync::atomic::Ordering;
8
9use async_stream::try_stream;
10use async_trait::async_trait;
11use futures::StreamExt as _;
12use futures::pin_mut;
13use vortex_array::ArrayContext;
14use vortex_error::VortexResult;
15use vortex_session::VortexSession;
16
17use crate::LayoutRef;
18use crate::LayoutStrategy;
19use crate::segments::SegmentSinkRef;
20use crate::sequence::SendableSequentialStream;
21use crate::sequence::SequencePointer;
22use crate::sequence::SequentialStreamAdapter;
23use crate::sequence::SequentialStreamExt as _;
24
25#[derive(Clone)]
26pub struct BufferedStrategy {
27 child: Arc<dyn LayoutStrategy>,
28 buffer_size: u64,
29 buffered_bytes: Arc<AtomicU64>,
30}
31
32impl BufferedStrategy {
33 pub fn new<S: LayoutStrategy>(child: S, buffer_size: u64) -> Self {
34 Self {
35 child: Arc::new(child),
36 buffer_size,
37 buffered_bytes: Arc::new(AtomicU64::new(0)),
38 }
39 }
40}
41
42#[async_trait]
43impl LayoutStrategy for BufferedStrategy {
44 async fn write_stream(
45 &self,
46 ctx: ArrayContext,
47 segment_sink: SegmentSinkRef,
48 stream: SendableSequentialStream,
49 eof: SequencePointer,
50 session: &VortexSession,
51 ) -> VortexResult<LayoutRef> {
52 let dtype = stream.dtype().clone();
53 let buffer_size = self.buffer_size;
54
55 let buffered_bytes_counter = Arc::clone(&self.buffered_bytes);
56 let buffered_stream = try_stream! {
57 let stream = stream.peekable();
58 pin_mut!(stream);
59
60 let mut nbytes = 0u64;
61 let mut chunks = VecDeque::new();
62
63 while let Some(chunk) = stream.as_mut().next().await {
64 let (sequence_id, chunk) = chunk?;
65 let chunk_size = chunk.nbytes();
66 nbytes += chunk_size;
67 buffered_bytes_counter.fetch_add(chunk_size, Ordering::Relaxed);
68 chunks.push_back(chunk);
69
70 if stream.as_mut().peek().await.is_none() {
72 let mut sequence_ptr = sequence_id.descend();
73 while let Some(chunk) = chunks.pop_front() {
74 buffered_bytes_counter.fetch_sub(chunk.nbytes(), Ordering::Relaxed);
75 yield (sequence_ptr.advance(), chunk)
76 }
77 break;
78 }
79
80 if nbytes < 2 * buffer_size {
81 continue;
82 };
83
84 let mut sequence_ptr = sequence_id.descend();
87 while nbytes > buffer_size {
88 let Some(chunk) = chunks.pop_front() else {
89 break;
90 };
91 let chunk_size = chunk.nbytes();
92 nbytes -= chunk_size;
93 buffered_bytes_counter.fetch_sub(chunk_size, Ordering::Relaxed);
94 yield (sequence_ptr.advance(), chunk)
95 }
96 }
97 };
98
99 self.child
100 .write_stream(
101 ctx,
102 segment_sink,
103 SequentialStreamAdapter::new(dtype, buffered_stream).sendable(),
104 eof,
105 session,
106 )
107 .await
108 }
109
110 fn buffered_bytes(&self) -> u64 {
111 self.buffered_bytes.load(Ordering::Relaxed) + self.child.buffered_bytes()
112 }
113}