Skip to main content

vortex_layout/layouts/chunked/
writer.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5
6use async_stream::stream;
7use async_trait::async_trait;
8use futures::StreamExt;
9use futures::TryStreamExt;
10use futures::stream;
11use vortex_array::ArrayContext;
12use vortex_error::VortexExpect;
13use vortex_error::VortexResult;
14use vortex_io::session::RuntimeSessionExt;
15use vortex_session::VortexSession;
16
17use crate::IntoLayout;
18use crate::LayoutRef;
19use crate::LayoutStrategy;
20use crate::children::OwnedLayoutChildren;
21use crate::layouts::chunked::ChunkedLayout;
22use crate::segments::SegmentSinkRef;
23use crate::sequence::SendableSequentialStream;
24use crate::sequence::SequencePointer;
25use crate::sequence::SequentialStreamAdapter;
26use crate::sequence::SequentialStreamExt as _;
27
28#[derive(Clone)]
29pub struct ChunkedLayoutStrategy {
30    /// The layout strategy for each chunk.
31    pub chunk_strategy: Arc<dyn LayoutStrategy>,
32}
33
34impl ChunkedLayoutStrategy {
35    pub fn new<S: LayoutStrategy>(chunk_strategy: S) -> Self {
36        Self {
37            chunk_strategy: Arc::new(chunk_strategy),
38        }
39    }
40}
41
42#[async_trait]
43impl LayoutStrategy for ChunkedLayoutStrategy {
44    async fn write_stream(
45        &self,
46        ctx: ArrayContext,
47        segment_sink: SegmentSinkRef,
48        stream: SendableSequentialStream,
49        mut eof: SequencePointer,
50        session: &VortexSession,
51    ) -> VortexResult<LayoutRef> {
52        let dtype = stream.dtype().clone();
53        let dtype2 = dtype.clone();
54        let chunk_strategy = Arc::clone(&self.chunk_strategy);
55        let handle = session.handle();
56
57        // We spawn each child to allow parallelism when processing chunks.
58        let stream = stream! {
59            let mut stream = stream;
60            while let Some(chunk) = stream.next().await {
61                let chunk_eof = eof.split_off();
62
63                let chunk_strategy = Arc::clone(&chunk_strategy);
64                let ctx = ctx.clone();
65                let segment_sink = Arc::clone(&segment_sink);
66                let dtype = dtype2.clone();
67                let session = session.clone();
68
69                yield handle.spawn_nested(move |handle| async move {
70                    let session = session.with_handle(handle);
71                    chunk_strategy
72                        .write_stream(
73                            ctx,
74                            segment_sink,
75                            SequentialStreamAdapter::new(
76                                dtype,
77                                stream::iter([chunk]),
78                            )
79                            .sendable(),
80                            chunk_eof,
81                            &session,
82                        )
83                        .await
84                })
85            }
86        };
87
88        // Poll all of our children concurrently to accumulate their layouts.
89        let mut child_layouts: Vec<LayoutRef> = stream.buffered(usize::MAX).try_collect().await?;
90
91        if child_layouts.len() == 1 {
92            Ok(child_layouts.pop().vortex_expect("must have one child"))
93        } else {
94            let row_count = child_layouts.iter().map(|layout| layout.row_count()).sum();
95            Ok(ChunkedLayout::new(
96                row_count,
97                dtype,
98                OwnedLayoutChildren::layout_children(child_layouts),
99            )
100            .into_layout())
101        }
102    }
103
104    fn buffered_bytes(&self) -> u64 {
105        self.chunk_strategy.buffered_bytes()
106    }
107}