vortex_layout/layouts/
collect.rs1use std::sync::Arc;
5
6use async_stream::try_stream;
7use async_trait::async_trait;
8use futures::StreamExt;
9use futures::pin_mut;
10use vortex_array::ArrayContext;
11use vortex_array::IntoArray;
12use vortex_array::arrays::ChunkedArray;
13use vortex_error::VortexExpect;
14use vortex_error::VortexResult;
15use vortex_io::runtime::Handle;
16
17use crate::LayoutRef;
18use crate::LayoutStrategy;
19use crate::segments::SegmentSinkRef;
20use crate::sequence::SendableSequentialStream;
21use crate::sequence::SequencePointer;
22use crate::sequence::SequentialStream;
23use crate::sequence::SequentialStreamAdapter;
24
25pub struct CollectStrategy {
28 child: Arc<dyn LayoutStrategy>,
29}
30
31impl CollectStrategy {
32 pub fn new<S: LayoutStrategy>(child: S) -> CollectStrategy {
33 CollectStrategy {
34 child: Arc::new(child),
35 }
36 }
37}
38
39#[async_trait]
40impl LayoutStrategy for CollectStrategy {
41 async fn write_stream(
42 &self,
43 ctx: ArrayContext,
44 segment_sink: SegmentSinkRef,
45 stream: SendableSequentialStream,
46 eof: SequencePointer,
47 handle: Handle,
48 ) -> VortexResult<LayoutRef> {
49 let dtype = stream.dtype().clone();
51
52 let _dtype = dtype.clone();
53 let collected_stream = try_stream! {
54 pin_mut!(stream);
55
56 let mut chunks = Vec::new();
57 let mut latest_sequence_id = None;
58 while let Some(chunk) = stream.next().await {
59 let (sequence_id, chunk) = chunk?;
60 latest_sequence_id = Some(sequence_id);
61 chunks.push(chunk);
62 }
63
64 let collected = ChunkedArray::try_new(chunks, _dtype)?.into_array();
65 yield (latest_sequence_id.vortex_expect("must have visited at least one chunk"), collected);
66 };
67
68 let adapted = Box::pin(SequentialStreamAdapter::new(dtype, collected_stream));
69
70 self.child
71 .write_stream(ctx, segment_sink, adapted, eof, handle)
72 .await
73 }
74
75 fn buffered_bytes(&self) -> u64 {
76 todo!()
77 }
78}