vortex_layout/layouts/zoned/
writer.rs1use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::StreamExt as _;
8use parking_lot::Mutex;
9use vortex_array::ArrayContext;
10use vortex_array::expr::stats::Stat;
11use vortex_array::stats::PRUNING_STATS;
12use vortex_error::VortexResult;
13use vortex_io::runtime::Handle;
14
15use crate::IntoLayout;
16use crate::LayoutRef;
17use crate::LayoutStrategy;
18use crate::layouts::zoned::ZonedLayout;
19use crate::layouts::zoned::zone_map::StatsAccumulator;
20use crate::segments::SegmentSinkRef;
21use crate::sequence::SendableSequentialStream;
22use crate::sequence::SequencePointer;
23use crate::sequence::SequentialArrayStreamExt;
24use crate::sequence::SequentialStreamAdapter;
25use crate::sequence::SequentialStreamExt;
26
27pub struct ZonedLayoutOptions {
28 pub block_size: usize,
30 pub stats: Arc<[Stat]>,
32 pub max_variable_length_statistics_size: usize,
34 pub concurrency: usize,
36}
37
38impl Default for ZonedLayoutOptions {
39 fn default() -> Self {
40 Self {
41 block_size: 8192,
42 stats: PRUNING_STATS.into(),
43 max_variable_length_statistics_size: 64,
44 concurrency: std::thread::available_parallelism()
45 .map(|v| v.get())
46 .unwrap_or(1),
47 }
48 }
49}
50
51pub struct ZonedStrategy {
52 child: Arc<dyn LayoutStrategy>,
53 stats: Arc<dyn LayoutStrategy>,
54 options: ZonedLayoutOptions,
55}
56
57impl ZonedStrategy {
58 pub fn new<Child: LayoutStrategy, Stats: LayoutStrategy>(
59 child: Child,
60 stats: Stats,
61 options: ZonedLayoutOptions,
62 ) -> Self {
63 Self {
64 child: Arc::new(child),
65 stats: Arc::new(stats),
66 options,
67 }
68 }
69}
70
71#[async_trait]
72impl LayoutStrategy for ZonedStrategy {
73 async fn write_stream(
74 &self,
75 ctx: ArrayContext,
76 segment_sink: SegmentSinkRef,
77 stream: SendableSequentialStream,
78 mut eof: SequencePointer,
79 handle: Handle,
80 ) -> VortexResult<LayoutRef> {
81 let stats = self.options.stats.clone();
82 let handle2 = handle.clone();
83
84 let stats_accumulator = Arc::new(Mutex::new(StatsAccumulator::new(
85 stream.dtype(),
86 &stats,
87 self.options.max_variable_length_statistics_size,
88 )));
89
90 let stream = SequentialStreamAdapter::new(
92 stream.dtype().clone(),
93 stream
94 .map(move |chunk| {
95 let stats = stats.clone();
96 handle2.spawn_cpu(move || {
97 let (sequence_id, chunk) = chunk?;
98 chunk.statistics().compute_all(&stats)?;
99 VortexResult::Ok((sequence_id, chunk))
100 })
101 })
102 .buffered(self.options.concurrency),
103 )
104 .sendable();
105
106 let stats_accumulator2 = stats_accumulator.clone();
109 let stream = SequentialStreamAdapter::new(
110 stream.dtype().clone(),
111 stream.map(move |item| {
112 let (sequence_id, chunk) = item?;
113 stats_accumulator2
115 .lock()
116 .push_chunk_without_compute(&chunk)?;
117 Ok((sequence_id, chunk))
118 }),
119 )
120 .sendable();
121
122 let block_size = self.options.block_size;
123
124 let data_eof = eof.split_off();
126 let data_layout = self
127 .child
128 .write_stream(
129 ctx.clone(),
130 segment_sink.clone(),
131 stream,
132 data_eof,
133 handle.clone(),
134 )
135 .await?;
136
137 let Some(stats_table) = stats_accumulator.lock().as_stats_table() else {
138 return Ok(data_layout);
141 };
142
143 let stats_stream = stats_table
146 .array()
147 .to_array_stream()
148 .sequenced(eof.split_off());
149 let zones_layout = self
150 .stats
151 .write_stream(ctx, segment_sink.clone(), stats_stream, eof, handle)
152 .await?;
153
154 Ok(ZonedLayout::new(
155 data_layout,
156 zones_layout,
157 block_size,
158 stats_table.present_stats().clone(),
159 )
160 .into_layout())
161 }
162
163 fn buffered_bytes(&self) -> u64 {
164 self.child.buffered_bytes() + self.stats.buffered_bytes()
165 }
166}