Skip to main content

vortex_layout/layouts/
collect.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5
6use async_stream::try_stream;
7use async_trait::async_trait;
8use futures::StreamExt;
9use futures::pin_mut;
10use vortex_array::ArrayContext;
11use vortex_array::IntoArray;
12use vortex_array::arrays::ChunkedArray;
13use vortex_error::VortexExpect;
14use vortex_error::VortexResult;
15use vortex_io::runtime::Handle;
16
17use crate::LayoutRef;
18use crate::LayoutStrategy;
19use crate::segments::SegmentSinkRef;
20use crate::sequence::SendableSequentialStream;
21use crate::sequence::SequencePointer;
22use crate::sequence::SequentialStream;
23use crate::sequence::SequentialStreamAdapter;
24
25/// A strategy that collects all chunks and turns them into a single array chunk to pass into
26/// a child strategy.
27pub struct CollectStrategy {
28    child: Arc<dyn LayoutStrategy>,
29}
30
31impl CollectStrategy {
32    pub fn new<S: LayoutStrategy>(child: S) -> CollectStrategy {
33        CollectStrategy {
34            child: Arc::new(child),
35        }
36    }
37}
38
39#[async_trait]
40impl LayoutStrategy for CollectStrategy {
41    async fn write_stream(
42        &self,
43        ctx: ArrayContext,
44        segment_sink: SegmentSinkRef,
45        stream: SendableSequentialStream,
46        eof: SequencePointer,
47        handle: Handle,
48    ) -> VortexResult<LayoutRef> {
49        // Read the whole stream, then write one Chunked stream to the inner thing
50        let dtype = stream.dtype().clone();
51
52        let _dtype = dtype.clone();
53        let collected_stream = try_stream! {
54            pin_mut!(stream);
55
56            let mut chunks = Vec::new();
57            let mut latest_sequence_id = None;
58            while let Some(chunk) = stream.next().await {
59                let (sequence_id, chunk) = chunk?;
60                latest_sequence_id = Some(sequence_id);
61                chunks.push(chunk);
62            }
63
64            let collected = ChunkedArray::try_new(chunks, _dtype)?.into_array();
65            yield (latest_sequence_id.vortex_expect("must have visited at least one chunk"), collected);
66        };
67
68        let adapted = Box::pin(SequentialStreamAdapter::new(dtype, collected_stream));
69
70        self.child
71            .write_stream(ctx, segment_sink, adapted, eof, handle)
72            .await
73    }
74
75    fn buffered_bytes(&self) -> u64 {
76        todo!()
77    }
78}