vortex_layout/layouts/zoned/
writer.rs1use std::sync::Arc;
7
8use async_trait::async_trait;
9use futures::StreamExt as _;
10use parking_lot::Mutex;
11use vortex_array::ArrayContext;
12use vortex_array::IntoArray;
13use vortex_array::VortexSessionExecute;
14use vortex_array::expr::stats::Stat;
15use vortex_array::stats::PRUNING_STATS;
16use vortex_error::VortexResult;
17use vortex_error::vortex_ensure;
18use vortex_io::session::RuntimeSessionExt;
19use vortex_session::VortexSession;
20use vortex_utils::parallelism::get_available_parallelism;
21
22use crate::IntoLayout;
23use crate::LayoutRef;
24use crate::LayoutStrategy;
25use crate::layouts::zoned::StatsAccumulator;
26use crate::layouts::zoned::ZonedLayout;
27use crate::segments::SegmentSinkRef;
28use crate::sequence::SendableSequentialStream;
29use crate::sequence::SequencePointer;
30use crate::sequence::SequentialArrayStreamExt;
31use crate::sequence::SequentialStreamAdapter;
32use crate::sequence::SequentialStreamExt;
33
34pub struct ZonedLayoutOptions {
39 pub block_size: usize,
41 pub stats: Arc<[Stat]>,
43 pub max_variable_length_statistics_size: usize,
45 pub concurrency: usize,
47}
48
49impl Default for ZonedLayoutOptions {
50 fn default() -> Self {
51 Self {
52 block_size: 8192,
53 stats: PRUNING_STATS.into(),
54 max_variable_length_statistics_size: 64,
55 concurrency: get_available_parallelism().unwrap_or(1),
56 }
57 }
58}
59
60pub struct ZonedStrategy {
61 child: Arc<dyn LayoutStrategy>,
62 stats: Arc<dyn LayoutStrategy>,
63 options: ZonedLayoutOptions,
64}
65
66impl ZonedStrategy {
67 pub fn new<Child: LayoutStrategy, Stats: LayoutStrategy>(
69 child: Child,
70 stats: Stats,
71 options: ZonedLayoutOptions,
72 ) -> Self {
73 Self {
74 child: Arc::new(child),
75 stats: Arc::new(stats),
76 options,
77 }
78 }
79}
80
81#[async_trait]
82impl LayoutStrategy for ZonedStrategy {
83 async fn write_stream(
84 &self,
85 ctx: ArrayContext,
86 segment_sink: SegmentSinkRef,
87 stream: SendableSequentialStream,
88 mut eof: SequencePointer,
89 session: &VortexSession,
90 ) -> VortexResult<LayoutRef> {
91 vortex_ensure!(
92 self.options.block_size > 0,
93 "ZonedStrategy requires block_size > 0 when writing"
94 );
95
96 let stats = Arc::clone(&self.options.stats);
97 let session = session.clone();
98 let compute_session = session.clone();
99 let handle = session.handle();
100 let handle2 = handle.clone();
101
102 let stats_accumulator = Arc::new(Mutex::new(StatsAccumulator::new(
103 stream.dtype(),
104 &stats,
105 self.options.max_variable_length_statistics_size,
106 )));
107
108 let stream = SequentialStreamAdapter::new(
110 stream.dtype().clone(),
111 stream
112 .map(move |chunk| {
113 let stats = Arc::clone(&stats);
114 let session = compute_session.clone();
115 handle2.spawn_cpu(move || {
116 let (sequence_id, chunk) = chunk?;
117 chunk
118 .statistics()
119 .compute_all(&stats, &mut session.create_execution_ctx())?;
120 Ok((sequence_id, chunk))
121 })
122 })
123 .buffered(self.options.concurrency),
124 )
125 .sendable();
126
127 let stats_accumulator2 = Arc::clone(&stats_accumulator);
130 let stream = SequentialStreamAdapter::new(
131 stream.dtype().clone(),
132 stream.map(move |item| {
133 let (sequence_id, chunk) = item?;
134 stats_accumulator2
136 .lock()
137 .push_chunk_without_compute(&chunk)?;
138 Ok((sequence_id, chunk))
139 }),
140 )
141 .sendable();
142
143 let block_size = self.options.block_size;
144
145 let data_eof = eof.split_off();
147 let data_layout = self
148 .child
149 .write_stream(
150 ctx.clone(),
151 Arc::clone(&segment_sink),
152 stream,
153 data_eof,
154 &session,
155 )
156 .await?;
157
158 let Some((stats_array, stats)) = stats_accumulator.lock().as_array()? else {
159 return Ok(data_layout);
162 };
163
164 let stats_stream = stats_array
167 .into_array()
168 .to_array_stream()
169 .sequenced(eof.split_off());
170 let zones_layout = self
171 .stats
172 .write_stream(ctx, Arc::clone(&segment_sink), stats_stream, eof, &session)
173 .await?;
174
175 Ok(ZonedLayout::new(data_layout, zones_layout, block_size, stats).into_layout())
176 }
177
178 fn buffered_bytes(&self) -> u64 {
179 self.child.buffered_bytes() + self.stats.buffered_bytes()
180 }
181}