vortex_layout/layouts/
collect.rs1use std::sync::Arc;
5
6use async_stream::try_stream;
7use async_trait::async_trait;
8use futures::StreamExt;
9use futures::pin_mut;
10use vortex_array::ArrayContext;
11use vortex_array::arrays::ChunkedArray;
12use vortex_error::VortexExpect;
13use vortex_error::VortexResult;
14use vortex_io::runtime::Handle;
15
16use crate::LayoutRef;
17use crate::LayoutStrategy;
18use crate::segments::SegmentSinkRef;
19use crate::sequence::SendableSequentialStream;
20use crate::sequence::SequencePointer;
21use crate::sequence::SequentialStream;
22use crate::sequence::SequentialStreamAdapter;
23
24pub struct CollectStrategy {
27 child: Arc<dyn LayoutStrategy>,
28}
29
30impl CollectStrategy {
31 pub fn new<S: LayoutStrategy>(child: S) -> CollectStrategy {
32 CollectStrategy {
33 child: Arc::new(child),
34 }
35 }
36}
37
38#[async_trait]
39impl LayoutStrategy for CollectStrategy {
40 async fn write_stream(
41 &self,
42 ctx: ArrayContext,
43 segment_sink: SegmentSinkRef,
44 stream: SendableSequentialStream,
45 eof: SequencePointer,
46 handle: Handle,
47 ) -> VortexResult<LayoutRef> {
48 let dtype = stream.dtype().clone();
50
51 let _dtype = dtype.clone();
52 let collected_stream = try_stream! {
53 pin_mut!(stream);
54
55 let mut chunks = Vec::new();
56 let mut latest_sequence_id = None;
57 while let Some(chunk) = stream.next().await {
58 let (sequence_id, chunk) = chunk?;
59 latest_sequence_id = Some(sequence_id);
60 chunks.push(chunk);
61 }
62
63 let collected = ChunkedArray::try_new(chunks, _dtype)?.to_array();
64 yield (latest_sequence_id.vortex_expect("must have visited at least one chunk"), collected);
65 };
66
67 let adapted = Box::pin(SequentialStreamAdapter::new(dtype, collected_stream));
68
69 self.child
70 .write_stream(ctx, segment_sink, adapted, eof, handle)
71 .await
72 }
73
74 fn buffered_bytes(&self) -> u64 {
75 todo!()
76 }
77}