vortex_layout/layouts/chunked/
writer.rs1use std::sync::Arc;
5
6use async_stream::stream;
7use async_trait::async_trait;
8use futures::{StreamExt, TryStreamExt, stream};
9use vortex_array::ArrayContext;
10use vortex_error::{VortexExpect, VortexResult};
11use vortex_io::runtime::Handle;
12
13use crate::children::OwnedLayoutChildren;
14use crate::layouts::chunked::ChunkedLayout;
15use crate::segments::SegmentSinkRef;
16use crate::sequence::{
17 SendableSequentialStream, SequencePointer, SequentialStreamAdapter, SequentialStreamExt as _,
18};
19use crate::{IntoLayout, LayoutRef, LayoutStrategy};
20
21#[derive(Clone)]
22pub struct ChunkedLayoutStrategy {
23 pub chunk_strategy: Arc<dyn LayoutStrategy>,
25}
26
27impl ChunkedLayoutStrategy {
28 pub fn new<S: LayoutStrategy>(chunk_strategy: S) -> Self {
29 Self {
30 chunk_strategy: Arc::new(chunk_strategy),
31 }
32 }
33}
34
35#[async_trait]
36impl LayoutStrategy for ChunkedLayoutStrategy {
37 async fn write_stream(
38 &self,
39 ctx: ArrayContext,
40 segment_sink: SegmentSinkRef,
41 stream: SendableSequentialStream,
42 mut eof: SequencePointer,
43 handle: Handle,
44 ) -> VortexResult<LayoutRef> {
45 let dtype = stream.dtype().clone();
46 let dtype2 = dtype.clone();
47 let chunk_strategy = self.chunk_strategy.clone();
48
49 let stream = stream! {
51 let mut stream = stream;
52 while let Some(chunk) = stream.next().await {
53 let chunk_eof = eof.split_off();
54
55 let chunk_strategy = chunk_strategy.clone();
56 let ctx = ctx.clone();
57 let segment_sink = segment_sink.clone();
58 let dtype = dtype2.clone();
59
60 yield handle.spawn_nested(move |handle| async move {
61 chunk_strategy
62 .write_stream(
63 ctx,
64 segment_sink,
65 SequentialStreamAdapter::new(
66 dtype,
67 stream::iter([chunk]),
68 )
69 .sendable(),
70 chunk_eof,
71 handle,
72 )
73 .await
74 })
75 }
76 };
77
78 let mut child_layouts: Vec<LayoutRef> = stream.buffered(usize::MAX).try_collect().await?;
80
81 if child_layouts.len() == 1 {
82 Ok(child_layouts.pop().vortex_expect("must have one child"))
83 } else {
84 let row_count = child_layouts.iter().map(|layout| layout.row_count()).sum();
85 Ok(ChunkedLayout::new(
86 row_count,
87 dtype,
88 OwnedLayoutChildren::layout_children(child_layouts),
89 )
90 .into_layout())
91 }
92 }
93
94 fn buffered_bytes(&self) -> u64 {
95 self.chunk_strategy.buffered_bytes()
96 }
97}