vortex_layout/layouts/chunked/
writer.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5
6use async_stream::stream;
7use async_trait::async_trait;
8use futures::{StreamExt, TryStreamExt, stream};
9use vortex_array::ArrayContext;
10use vortex_error::{VortexExpect, VortexResult};
11use vortex_io::runtime::Handle;
12
13use crate::children::OwnedLayoutChildren;
14use crate::layouts::chunked::ChunkedLayout;
15use crate::segments::SegmentSinkRef;
16use crate::sequence::{
17    SendableSequentialStream, SequencePointer, SequentialStreamAdapter, SequentialStreamExt as _,
18};
19use crate::{IntoLayout, LayoutRef, LayoutStrategy};
20
21#[derive(Clone)]
22pub struct ChunkedLayoutStrategy {
23    /// The layout strategy for each chunk.
24    pub chunk_strategy: Arc<dyn LayoutStrategy>,
25}
26
27impl ChunkedLayoutStrategy {
28    pub fn new<S: LayoutStrategy>(chunk_strategy: S) -> Self {
29        Self {
30            chunk_strategy: Arc::new(chunk_strategy),
31        }
32    }
33}
34
35#[async_trait]
36impl LayoutStrategy for ChunkedLayoutStrategy {
37    async fn write_stream(
38        &self,
39        ctx: ArrayContext,
40        segment_sink: SegmentSinkRef,
41        stream: SendableSequentialStream,
42        mut eof: SequencePointer,
43        handle: Handle,
44    ) -> VortexResult<LayoutRef> {
45        let dtype = stream.dtype().clone();
46        let dtype2 = dtype.clone();
47        let chunk_strategy = self.chunk_strategy.clone();
48
49        // We spawn each child to allow parallelism when processing chunks.
50        let stream = stream! {
51            let mut stream = stream;
52            while let Some(chunk) = stream.next().await {
53                let chunk_eof = eof.split_off();
54
55                let chunk_strategy = chunk_strategy.clone();
56                let ctx = ctx.clone();
57                let segment_sink = segment_sink.clone();
58                let dtype = dtype2.clone();
59
60                yield handle.spawn_nested(move |handle| async move {
61                    chunk_strategy
62                        .write_stream(
63                            ctx,
64                            segment_sink,
65                            SequentialStreamAdapter::new(
66                                dtype,
67                                stream::iter([chunk]),
68                            )
69                            .sendable(),
70                            chunk_eof,
71                            handle,
72                        )
73                        .await
74                })
75            }
76        };
77
78        // Poll all of our children concurrently to accumulate their layouts.
79        let mut child_layouts: Vec<LayoutRef> = stream.buffered(usize::MAX).try_collect().await?;
80
81        if child_layouts.len() == 1 {
82            Ok(child_layouts.pop().vortex_expect("must have one child"))
83        } else {
84            let row_count = child_layouts.iter().map(|layout| layout.row_count()).sum();
85            Ok(ChunkedLayout::new(
86                row_count,
87                dtype,
88                OwnedLayoutChildren::layout_children(child_layouts),
89            )
90            .into_layout())
91        }
92    }
93
94    fn buffered_bytes(&self) -> u64 {
95        self.chunk_strategy.buffered_bytes()
96    }
97}