vortex_layout/layouts/chunked/writer.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5
6use async_stream::stream;
7use async_trait::async_trait;
8use futures::StreamExt;
9use futures::TryStreamExt;
10use futures::stream;
11use vortex_array::ArrayContext;
12use vortex_error::VortexExpect;
13use vortex_error::VortexResult;
14use vortex_io::runtime::Handle;
15
16use crate::IntoLayout;
17use crate::LayoutRef;
18use crate::LayoutStrategy;
19use crate::children::OwnedLayoutChildren;
20use crate::layouts::chunked::ChunkedLayout;
21use crate::segments::SegmentSinkRef;
22use crate::sequence::SendableSequentialStream;
23use crate::sequence::SequencePointer;
24use crate::sequence::SequentialStreamAdapter;
25use crate::sequence::SequentialStreamExt as _;
26
27#[derive(Clone)]
28pub struct ChunkedLayoutStrategy {
29    /// The layout strategy for each chunk.
30    pub chunk_strategy: Arc<dyn LayoutStrategy>,
31}
32
33impl ChunkedLayoutStrategy {
34    pub fn new<S: LayoutStrategy>(chunk_strategy: S) -> Self {
35        Self {
36            chunk_strategy: Arc::new(chunk_strategy),
37        }
38    }
39}
40
41#[async_trait]
42impl LayoutStrategy for ChunkedLayoutStrategy {
43    async fn write_stream(
44        &self,
45        ctx: ArrayContext,
46        segment_sink: SegmentSinkRef,
47        stream: SendableSequentialStream,
48        mut eof: SequencePointer,
49        handle: Handle,
50    ) -> VortexResult<LayoutRef> {
51        let dtype = stream.dtype().clone();
52        let dtype2 = dtype.clone();
53        let chunk_strategy = self.chunk_strategy.clone();
54
55        // We spawn each child to allow parallelism when processing chunks.
56        let stream = stream! {
57            let mut stream = stream;
58            while let Some(chunk) = stream.next().await {
59                let chunk_eof = eof.split_off();
60
61                let chunk_strategy = chunk_strategy.clone();
62                let ctx = ctx.clone();
63                let segment_sink = segment_sink.clone();
64                let dtype = dtype2.clone();
65
66                yield handle.spawn_nested(move |handle| async move {
67                    chunk_strategy
68                        .write_stream(
69                            ctx,
70                            segment_sink,
71                            SequentialStreamAdapter::new(
72                                dtype,
73                                stream::iter([chunk]),
74                            )
75                            .sendable(),
76                            chunk_eof,
77                            handle,
78                        )
79                        .await
80                })
81            }
82        };
83
84        // Poll all of our children concurrently to accumulate their layouts.
85        let mut child_layouts: Vec<LayoutRef> = stream.buffered(usize::MAX).try_collect().await?;
86
87        if child_layouts.len() == 1 {
88            Ok(child_layouts.pop().vortex_expect("must have one child"))
89        } else {
90            let row_count = child_layouts.iter().map(|layout| layout.row_count()).sum();
91            Ok(ChunkedLayout::new(
92                row_count,
93                dtype,
94                OwnedLayoutChildren::layout_children(child_layouts),
95            )
96            .into_layout())
97        }
98    }
99
100    fn buffered_bytes(&self) -> u64 {
101        self.chunk_strategy.buffered_bytes()
102    }
103}