vortex_layout/layouts/chunked/
writer.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use async_trait::async_trait;
5use futures::StreamExt;
6use futures::stream::once;
7use vortex_array::ArrayContext;
8use vortex_error::{VortexExpect, VortexResult};
9
10use crate::children::OwnedLayoutChildren;
11use crate::layouts::chunked::ChunkedLayout;
12use crate::segments::SequenceWriter;
13use crate::{
14    IntoLayout, LayoutRef, LayoutStrategy, SendableSequentialStream, SequentialStreamAdapter,
15    SequentialStreamExt as _,
16};
17
18#[derive(Clone)]
19pub struct ChunkedLayoutStrategy<S> {
20    /// The layout strategy for each chunk.
21    pub chunk_strategy: S,
22}
23
24impl<S> ChunkedLayoutStrategy<S>
25where
26    S: LayoutStrategy,
27{
28    pub fn new(chunk_strategy: S) -> Self {
29        Self { chunk_strategy }
30    }
31}
32
33#[async_trait]
34impl<S> LayoutStrategy for ChunkedLayoutStrategy<S>
35where
36    S: LayoutStrategy,
37{
38    async fn write_stream(
39        &self,
40        ctx: &ArrayContext,
41        sequence_writer: SequenceWriter,
42        mut stream: SendableSequentialStream,
43    ) -> VortexResult<LayoutRef> {
44        let ctx = ctx.clone();
45        let mut child_layouts = Vec::new();
46        let mut row_count = 0;
47        let dtype = stream.dtype().clone();
48        while let Some(chunk) = stream.next().await {
49            let (sequence_id, chunk) = chunk?;
50            row_count += chunk.len() as u64;
51            let layout = self
52                .chunk_strategy
53                .write_stream(
54                    &ctx,
55                    sequence_writer.clone(),
56                    SequentialStreamAdapter::new(
57                        dtype.clone(),
58                        once(async { Ok((sequence_id, chunk)) }),
59                    )
60                    .sendable(),
61                )
62                .await?;
63            child_layouts.push(layout);
64        }
65
66        if child_layouts.len() == 1 {
67            Ok(child_layouts.pop().vortex_expect("must have one child"))
68        } else {
69            Ok(ChunkedLayout::new(
70                row_count,
71                dtype,
72                OwnedLayoutChildren::layout_children(child_layouts),
73            )
74            .into_layout())
75        }
76    }
77}