vortex_layout/layouts/chunked/
writer.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5
6use arcref::ArcRef;
7use futures::StreamExt;
8use futures::stream::once;
9use vortex_array::ArrayContext;
10use vortex_error::VortexExpect;
11
12use crate::children::OwnedLayoutChildren;
13use crate::layouts::chunked::ChunkedLayout;
14use crate::layouts::flat::writer::FlatLayoutStrategy;
15use crate::segments::SequenceWriter;
16use crate::{
17    IntoLayout, LayoutStrategy, SendableLayoutFuture, SendableSequentialStream,
18    SequentialStreamAdapter, SequentialStreamExt as _,
19};
20
/// Layout strategy that writes each chunk of an incoming stream as its own child layout,
/// delegating the per-chunk write to `chunk_strategy`.
pub struct ChunkedLayoutStrategy {
    /// Strategy invoked once per chunk to produce that chunk's layout.
    pub chunk_strategy: ArcRef<dyn LayoutStrategy>,
}
25
26impl Default for ChunkedLayoutStrategy {
27    fn default() -> Self {
28        Self {
29            chunk_strategy: ArcRef::new_arc(Arc::new(FlatLayoutStrategy::default())),
30        }
31    }
32}
33
34impl LayoutStrategy for ChunkedLayoutStrategy {
35    fn write_stream(
36        &self,
37        ctx: &ArrayContext,
38        sequence_writer: SequenceWriter,
39        mut stream: SendableSequentialStream,
40    ) -> SendableLayoutFuture {
41        let chunk_strategy = self.chunk_strategy.clone();
42        let ctx = ctx.clone();
43        Box::pin(async move {
44            let mut child_layouts = Vec::new();
45            let mut row_count = 0;
46            let dtype = stream.dtype().clone();
47            while let Some(chunk) = stream.next().await {
48                let (sequence_id, chunk) = chunk?;
49                row_count += chunk.len() as u64;
50                let layout = chunk_strategy
51                    .write_stream(
52                        &ctx,
53                        sequence_writer.clone(),
54                        SequentialStreamAdapter::new(
55                            dtype.clone(),
56                            once(async { Ok((sequence_id, chunk)) }),
57                        )
58                        .sendable(),
59                    )
60                    .await?;
61                child_layouts.push(layout);
62            }
63
64            if child_layouts.len() == 1 {
65                Ok(child_layouts.pop().vortex_expect("must have one child"))
66            } else {
67                Ok(ChunkedLayout::new(
68                    row_count,
69                    dtype,
70                    OwnedLayoutChildren::layout_children(child_layouts),
71                )
72                .into_layout())
73            }
74        })
75    }
76}