vortex_layout/layouts/chunked/
writer.rs1use async_trait::async_trait;
5use futures::StreamExt;
6use futures::stream::once;
7use vortex_array::ArrayContext;
8use vortex_error::{VortexExpect, VortexResult};
9
10use crate::children::OwnedLayoutChildren;
11use crate::layouts::chunked::ChunkedLayout;
12use crate::segments::SequenceWriter;
13use crate::{
14 IntoLayout, LayoutRef, LayoutStrategy, SendableSequentialStream, SequentialStreamAdapter,
15 SequentialStreamExt as _,
16};
17
/// A layout strategy that delegates the writing of each incoming chunk to an inner,
/// per-chunk strategy.
///
/// The type is generic over the inner strategy `S`; no bound is placed on the struct
/// itself, only on the `impl` blocks that need it. `Clone` and `Debug` are available
/// whenever `S` provides them.
#[derive(Clone, Debug)]
pub struct ChunkedLayoutStrategy<S> {
    /// The strategy applied to every individual chunk.
    pub chunk_strategy: S,
}
23
24impl<S> ChunkedLayoutStrategy<S>
25where
26 S: LayoutStrategy,
27{
28 pub fn new(chunk_strategy: S) -> Self {
29 Self { chunk_strategy }
30 }
31}
32
33#[async_trait]
34impl<S> LayoutStrategy for ChunkedLayoutStrategy<S>
35where
36 S: LayoutStrategy,
37{
38 async fn write_stream(
39 &self,
40 ctx: &ArrayContext,
41 sequence_writer: SequenceWriter,
42 mut stream: SendableSequentialStream,
43 ) -> VortexResult<LayoutRef> {
44 let ctx = ctx.clone();
45 let mut child_layouts = Vec::new();
46 let mut row_count = 0;
47 let dtype = stream.dtype().clone();
48 while let Some(chunk) = stream.next().await {
49 let (sequence_id, chunk) = chunk?;
50 row_count += chunk.len() as u64;
51 let layout = self
52 .chunk_strategy
53 .write_stream(
54 &ctx,
55 sequence_writer.clone(),
56 SequentialStreamAdapter::new(
57 dtype.clone(),
58 once(async { Ok((sequence_id, chunk)) }),
59 )
60 .sendable(),
61 )
62 .await?;
63 child_layouts.push(layout);
64 }
65
66 if child_layouts.len() == 1 {
67 Ok(child_layouts.pop().vortex_expect("must have one child"))
68 } else {
69 Ok(ChunkedLayout::new(
70 row_count,
71 dtype,
72 OwnedLayoutChildren::layout_children(child_layouts),
73 )
74 .into_layout())
75 }
76 }
77}