vortex_layout/layouts/
buffered.rs1use std::collections::VecDeque;
5
6use async_stream::try_stream;
7use async_trait::async_trait;
8use futures::{StreamExt as _, pin_mut};
9use vortex_array::ArrayContext;
10use vortex_error::VortexResult;
11
12use crate::segments::SequenceWriter;
13use crate::{
14 LayoutRef, LayoutStrategy, SendableSequentialStream, SequentialStreamAdapter,
15 SequentialStreamExt as _,
16};
17
18#[derive(Clone)]
19pub struct BufferedStrategy<S> {
20 child: S,
21 buffer_size: u64,
22}
23
24impl<S: LayoutStrategy> BufferedStrategy<S> {
25 pub fn new(child: S, buffer_size: u64) -> Self {
26 Self { child, buffer_size }
27 }
28}
29
30#[async_trait]
31impl<S> LayoutStrategy for BufferedStrategy<S>
32where
33 S: LayoutStrategy,
34{
35 async fn write_stream(
36 &self,
37 ctx: &ArrayContext,
38 sequence_writer: SequenceWriter,
39 stream: SendableSequentialStream,
40 ) -> VortexResult<LayoutRef> {
41 let dtype = stream.dtype().clone();
42 let buffer_size = self.buffer_size;
43 let buffered_stream = try_stream! {
44 let stream = stream.peekable();
45 pin_mut!(stream);
46
47 let mut nbytes = 0u64;
48 let mut chunks = VecDeque::new();
49
50 while let Some(chunk) = stream.as_mut().next().await {
51 let (sequence_id, chunk) = chunk?;
52 nbytes += chunk.nbytes();
53 chunks.push_back(chunk);
54
55 if stream.as_mut().peek().await.is_none() {
57 let mut sequence_pointer = sequence_id.descend();
58 while let Some(chunk) = chunks.pop_front() {
59 yield (sequence_pointer.advance(), chunk)
60 }
61 break;
62 }
63
64 if nbytes < 2 * buffer_size {
65 continue;
66 };
67 let mut sequence_pointer = sequence_id.descend();
70 while nbytes > buffer_size {
71 let Some(chunk) = chunks.pop_front() else {
72 break;
73 };
74 nbytes -= chunk.nbytes();
75 yield (sequence_pointer.advance(), chunk)
76 }
77 }
78 };
79 self.child
80 .write_stream(
81 ctx,
82 sequence_writer,
83 SequentialStreamAdapter::new(dtype, buffered_stream).sendable(),
84 )
85 .await
86 }
87}