vortex_layout/layouts/
buffered.rs1use std::collections::VecDeque;
5
6use arcref::ArcRef;
7use async_stream::try_stream;
8use futures::{StreamExt as _, pin_mut};
9use vortex_array::ArrayContext;
10
11use crate::segments::SequenceWriter;
12use crate::{
13 LayoutStrategy, SendableLayoutFuture, SendableSequentialStream, SequentialStreamAdapter,
14 SequentialStreamExt as _,
15};
16
17pub struct BufferedStrategy {
18 child: ArcRef<dyn LayoutStrategy>,
19 buffer_size: u64,
20}
21
22impl BufferedStrategy {
23 pub fn new(child: ArcRef<dyn LayoutStrategy>, buffer_size: u64) -> Self {
24 Self { child, buffer_size }
25 }
26}
27
28impl LayoutStrategy for BufferedStrategy {
29 fn write_stream(
30 &self,
31 ctx: &ArrayContext,
32 sequence_writer: SequenceWriter,
33 stream: SendableSequentialStream,
34 ) -> SendableLayoutFuture {
35 let dtype = stream.dtype().clone();
36 let buffer_size = self.buffer_size;
37 let buffered_stream = try_stream! {
38 let stream = stream.peekable();
39 pin_mut!(stream);
40
41 let mut nbytes = 0u64;
42 let mut chunks = VecDeque::new();
43
44 while let Some(chunk) = stream.as_mut().next().await {
45 let (sequence_id, chunk) = chunk?;
46 nbytes += chunk.nbytes();
47 chunks.push_back(chunk);
48
49 if stream.as_mut().peek().await.is_none() {
51 let mut sequence_pointer = sequence_id.descend();
52 while let Some(chunk) = chunks.pop_front() {
53 yield (sequence_pointer.advance(), chunk)
54 }
55 break;
56 }
57
58 if nbytes < 2 * buffer_size {
59 continue;
60 };
61 let mut sequence_pointer = sequence_id.descend();
64 while nbytes > buffer_size {
65 let Some(chunk) = chunks.pop_front() else {
66 break;
67 };
68 nbytes -= chunk.nbytes();
69 yield (sequence_pointer.advance(), chunk)
70 }
71 }
72 };
73 self.child.write_stream(
74 ctx,
75 sequence_writer,
76 SequentialStreamAdapter::new(dtype, buffered_stream).sendable(),
77 )
78 }
79}