vortex_layout/layouts/zoned/
writer.rs1use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::StreamExt as _;
8use parking_lot::Mutex;
9use vortex_array::ArrayContext;
10use vortex_array::IntoArray;
11use vortex_array::expr::stats::Stat;
12use vortex_array::stats::PRUNING_STATS;
13use vortex_error::VortexResult;
14use vortex_io::session::RuntimeSessionExt;
15use vortex_session::VortexSession;
16
17use crate::IntoLayout;
18use crate::LayoutRef;
19use crate::LayoutStrategy;
20use crate::layouts::zoned::ZonedLayout;
21use crate::layouts::zoned::zone_map::StatsAccumulator;
22use crate::segments::SegmentSinkRef;
23use crate::sequence::SendableSequentialStream;
24use crate::sequence::SequencePointer;
25use crate::sequence::SequentialArrayStreamExt;
26use crate::sequence::SequentialStreamAdapter;
27use crate::sequence::SequentialStreamExt;
28
29pub struct ZonedLayoutOptions {
30 pub block_size: usize,
32 pub stats: Arc<[Stat]>,
34 pub max_variable_length_statistics_size: usize,
36 pub concurrency: usize,
38}
39
40impl Default for ZonedLayoutOptions {
41 fn default() -> Self {
42 Self {
43 block_size: 8192,
44 stats: PRUNING_STATS.into(),
45 max_variable_length_statistics_size: 64,
46 concurrency: std::thread::available_parallelism()
47 .map(|v| v.get())
48 .unwrap_or(1),
49 }
50 }
51}
52
53pub struct ZonedStrategy {
54 child: Arc<dyn LayoutStrategy>,
55 stats: Arc<dyn LayoutStrategy>,
56 options: ZonedLayoutOptions,
57}
58
59impl ZonedStrategy {
60 pub fn new<Child: LayoutStrategy, Stats: LayoutStrategy>(
61 child: Child,
62 stats: Stats,
63 options: ZonedLayoutOptions,
64 ) -> Self {
65 Self {
66 child: Arc::new(child),
67 stats: Arc::new(stats),
68 options,
69 }
70 }
71}
72
73#[async_trait]
74impl LayoutStrategy for ZonedStrategy {
75 async fn write_stream(
76 &self,
77 ctx: ArrayContext,
78 segment_sink: SegmentSinkRef,
79 stream: SendableSequentialStream,
80 mut eof: SequencePointer,
81 session: &VortexSession,
82 ) -> VortexResult<LayoutRef> {
83 let stats = Arc::clone(&self.options.stats);
84 let handle = session.handle();
85 let handle2 = handle.clone();
86
87 let stats_accumulator = Arc::new(Mutex::new(StatsAccumulator::new(
88 stream.dtype(),
89 &stats,
90 self.options.max_variable_length_statistics_size,
91 )));
92
93 let stream = SequentialStreamAdapter::new(
95 stream.dtype().clone(),
96 stream
97 .map(move |chunk| {
98 let stats = Arc::clone(&stats);
99 handle2.spawn_cpu(move || {
100 let (sequence_id, chunk) = chunk?;
101 chunk.statistics().compute_all(&stats)?;
102 VortexResult::Ok((sequence_id, chunk))
103 })
104 })
105 .buffered(self.options.concurrency),
106 )
107 .sendable();
108
109 let stats_accumulator2 = Arc::clone(&stats_accumulator);
112 let stream = SequentialStreamAdapter::new(
113 stream.dtype().clone(),
114 stream.map(move |item| {
115 let (sequence_id, chunk) = item?;
116 stats_accumulator2
118 .lock()
119 .push_chunk_without_compute(&chunk)?;
120 Ok((sequence_id, chunk))
121 }),
122 )
123 .sendable();
124
125 let block_size = self.options.block_size;
126
127 let data_eof = eof.split_off();
129 let data_layout = self
130 .child
131 .write_stream(
132 ctx.clone(),
133 Arc::clone(&segment_sink),
134 stream,
135 data_eof,
136 session,
137 )
138 .await?;
139
140 let Some(stats_table) = stats_accumulator.lock().as_stats_table()? else {
141 return Ok(data_layout);
144 };
145
146 let stats_stream = stats_table
149 .array()
150 .clone()
151 .into_array()
152 .to_array_stream()
153 .sequenced(eof.split_off());
154 let zones_layout = self
155 .stats
156 .write_stream(ctx, Arc::clone(&segment_sink), stats_stream, eof, session)
157 .await?;
158
159 Ok(ZonedLayout::new(
160 data_layout,
161 zones_layout,
162 block_size,
163 Arc::clone(stats_table.present_stats()),
164 )
165 .into_layout())
166 }
167
168 fn buffered_bytes(&self) -> u64 {
169 self.child.buffered_bytes() + self.stats.buffered_bytes()
170 }
171}