vortex_layout/layouts/collect.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5
6use async_stream::try_stream;
7use async_trait::async_trait;
8use futures::{StreamExt, pin_mut};
9use vortex_array::ArrayContext;
10use vortex_array::arrays::ChunkedArray;
11use vortex_error::{VortexExpect, VortexResult};
12use vortex_io::runtime::Handle;
13
14use crate::segments::SegmentSinkRef;
15use crate::sequence::{
16    SendableSequentialStream, SequencePointer, SequentialStream, SequentialStreamAdapter,
17};
18use crate::{LayoutRef, LayoutStrategy};
19
20/// A strategy that collects all chunks and turns them into a single array chunk to pass into
21/// a child strategy.
22pub struct CollectStrategy {
23    child: Arc<dyn LayoutStrategy>,
24}
25
26impl CollectStrategy {
27    pub fn new<S: LayoutStrategy>(child: S) -> CollectStrategy {
28        CollectStrategy {
29            child: Arc::new(child),
30        }
31    }
32}
33
34#[async_trait]
35impl LayoutStrategy for CollectStrategy {
36    async fn write_stream(
37        &self,
38        ctx: ArrayContext,
39        segment_sink: SegmentSinkRef,
40        stream: SendableSequentialStream,
41        eof: SequencePointer,
42        handle: Handle,
43    ) -> VortexResult<LayoutRef> {
44        // Read the whole stream, then write one Chunked stream to the inner thing
45        let dtype = stream.dtype().clone();
46
47        let _dtype = dtype.clone();
48        let collected_stream = try_stream! {
49            pin_mut!(stream);
50
51            let mut chunks = Vec::new();
52            let mut latest_sequence_id = None;
53            while let Some(chunk) = stream.next().await {
54                let (sequence_id, chunk) = chunk?;
55                latest_sequence_id = Some(sequence_id);
56                chunks.push(chunk);
57            }
58
59            let collected = ChunkedArray::try_new(chunks, _dtype)?.to_array();
60            yield (latest_sequence_id.vortex_expect("must have visited at least one chunk"), collected);
61        };
62
63        let adapted = Box::pin(SequentialStreamAdapter::new(dtype, collected_stream));
64
65        self.child
66            .write_stream(ctx, segment_sink, adapted, eof, handle)
67            .await
68    }
69
70    fn buffered_bytes(&self) -> u64 {
71        todo!()
72    }
73}