vortex_layout/layouts/zoned/
writer.rs1use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::StreamExt as _;
8use parking_lot::Mutex;
9use vortex_array::ArrayContext;
10use vortex_array::IntoArray;
11use vortex_array::VortexSessionExecute;
12use vortex_array::expr::stats::Stat;
13use vortex_array::stats::PRUNING_STATS;
14use vortex_error::VortexResult;
15use vortex_io::session::RuntimeSessionExt;
16use vortex_session::VortexSession;
17
18use crate::IntoLayout;
19use crate::LayoutRef;
20use crate::LayoutStrategy;
21use crate::layouts::zoned::ZonedLayout;
22use crate::layouts::zoned::zone_map::StatsAccumulator;
23use crate::segments::SegmentSinkRef;
24use crate::sequence::SendableSequentialStream;
25use crate::sequence::SequencePointer;
26use crate::sequence::SequentialArrayStreamExt;
27use crate::sequence::SequentialStreamAdapter;
28use crate::sequence::SequentialStreamExt;
29
30pub struct ZonedLayoutOptions {
31 pub block_size: usize,
33 pub stats: Arc<[Stat]>,
35 pub max_variable_length_statistics_size: usize,
37 pub concurrency: usize,
39}
40
41impl Default for ZonedLayoutOptions {
42 fn default() -> Self {
43 Self {
44 block_size: 8192,
45 stats: PRUNING_STATS.into(),
46 max_variable_length_statistics_size: 64,
47 concurrency: std::thread::available_parallelism()
48 .map(|v| v.get())
49 .unwrap_or(1),
50 }
51 }
52}
53
54pub struct ZonedStrategy {
55 child: Arc<dyn LayoutStrategy>,
56 stats: Arc<dyn LayoutStrategy>,
57 options: ZonedLayoutOptions,
58}
59
60impl ZonedStrategy {
61 pub fn new<Child: LayoutStrategy, Stats: LayoutStrategy>(
62 child: Child,
63 stats: Stats,
64 options: ZonedLayoutOptions,
65 ) -> Self {
66 Self {
67 child: Arc::new(child),
68 stats: Arc::new(stats),
69 options,
70 }
71 }
72}
73
74#[async_trait]
75impl LayoutStrategy for ZonedStrategy {
76 async fn write_stream(
77 &self,
78 ctx: ArrayContext,
79 segment_sink: SegmentSinkRef,
80 stream: SendableSequentialStream,
81 mut eof: SequencePointer,
82 session: &VortexSession,
83 ) -> VortexResult<LayoutRef> {
84 let stats = Arc::clone(&self.options.stats);
85 let session = session.clone();
86 let compute_session = session.clone();
87 let handle = session.handle();
88 let handle2 = handle.clone();
89
90 let stats_accumulator = Arc::new(Mutex::new(StatsAccumulator::new(
91 stream.dtype(),
92 &stats,
93 self.options.max_variable_length_statistics_size,
94 )));
95
96 let stream = SequentialStreamAdapter::new(
98 stream.dtype().clone(),
99 stream
100 .map(move |chunk| {
101 let stats = Arc::clone(&stats);
102 let session = compute_session.clone();
103 handle2.spawn_cpu(move || {
104 let (sequence_id, chunk) = chunk?;
105 chunk
106 .statistics()
107 .compute_all(&stats, &mut session.create_execution_ctx())?;
108 VortexResult::Ok((sequence_id, chunk))
109 })
110 })
111 .buffered(self.options.concurrency),
112 )
113 .sendable();
114
115 let stats_accumulator2 = Arc::clone(&stats_accumulator);
118 let stream = SequentialStreamAdapter::new(
119 stream.dtype().clone(),
120 stream.map(move |item| {
121 let (sequence_id, chunk) = item?;
122 stats_accumulator2
124 .lock()
125 .push_chunk_without_compute(&chunk)?;
126 Ok((sequence_id, chunk))
127 }),
128 )
129 .sendable();
130
131 let block_size = self.options.block_size;
132
133 let data_eof = eof.split_off();
135 let data_layout = self
136 .child
137 .write_stream(
138 ctx.clone(),
139 Arc::clone(&segment_sink),
140 stream,
141 data_eof,
142 &session,
143 )
144 .await?;
145
146 let Some(stats_table) = stats_accumulator.lock().as_stats_table()? else {
147 return Ok(data_layout);
150 };
151
152 let stats_stream = stats_table
155 .array()
156 .clone()
157 .into_array()
158 .to_array_stream()
159 .sequenced(eof.split_off());
160 let zones_layout = self
161 .stats
162 .write_stream(ctx, Arc::clone(&segment_sink), stats_stream, eof, &session)
163 .await?;
164
165 Ok(ZonedLayout::new(
166 data_layout,
167 zones_layout,
168 block_size,
169 Arc::clone(stats_table.present_stats()),
170 )
171 .into_layout())
172 }
173
174 fn buffered_bytes(&self) -> u64 {
175 self.child.buffered_bytes() + self.stats.buffered_bytes()
176 }
177}