vortex_layout/layouts/chunked/
writer.rs1use std::sync::Arc;
5
6use arcref::ArcRef;
7use futures::StreamExt;
8use futures::stream::once;
9use vortex_array::ArrayContext;
10use vortex_error::VortexExpect;
11
12use crate::children::OwnedLayoutChildren;
13use crate::layouts::chunked::ChunkedLayout;
14use crate::layouts::flat::writer::FlatLayoutStrategy;
15use crate::segments::SequenceWriter;
16use crate::{
17 IntoLayout, LayoutStrategy, SendableLayoutFuture, SendableSequentialStream,
18 SequentialStreamAdapter, SequentialStreamExt as _,
19};
20
21pub struct ChunkedLayoutStrategy {
22 pub chunk_strategy: ArcRef<dyn LayoutStrategy>,
24}
25
26impl Default for ChunkedLayoutStrategy {
27 fn default() -> Self {
28 Self {
29 chunk_strategy: ArcRef::new_arc(Arc::new(FlatLayoutStrategy::default())),
30 }
31 }
32}
33
34impl LayoutStrategy for ChunkedLayoutStrategy {
35 fn write_stream(
36 &self,
37 ctx: &ArrayContext,
38 sequence_writer: SequenceWriter,
39 mut stream: SendableSequentialStream,
40 ) -> SendableLayoutFuture {
41 let chunk_strategy = self.chunk_strategy.clone();
42 let ctx = ctx.clone();
43 Box::pin(async move {
44 let mut child_layouts = Vec::new();
45 let mut row_count = 0;
46 let dtype = stream.dtype().clone();
47 while let Some(chunk) = stream.next().await {
48 let (sequence_id, chunk) = chunk?;
49 row_count += chunk.len() as u64;
50 let layout = chunk_strategy
51 .write_stream(
52 &ctx,
53 sequence_writer.clone(),
54 SequentialStreamAdapter::new(
55 dtype.clone(),
56 once(async { Ok((sequence_id, chunk)) }),
57 )
58 .sendable(),
59 )
60 .await?;
61 child_layouts.push(layout);
62 }
63
64 if child_layouts.len() == 1 {
65 Ok(child_layouts.pop().vortex_expect("must have one child"))
66 } else {
67 Ok(ChunkedLayout::new(
68 row_count,
69 dtype,
70 OwnedLayoutChildren::layout_children(child_layouts),
71 )
72 .into_layout())
73 }
74 })
75 }
76}