vortex_layout/layouts/zoned/
writer.rs1use std::future;
5use std::sync::Arc;
6
7use arcref::ArcRef;
8use futures::stream::once;
9use futures::{FutureExt, StreamExt as _};
10use parking_lot::Mutex;
11use vortex_array::stats::{PRUNING_STATS, Stat};
12use vortex_array::stream::{ArrayStreamAdapter, ArrayStreamExt};
13use vortex_array::{ArrayContext, ArrayRef};
14use vortex_error::VortexResult;
15
16use crate::layouts::zoned::ZonedLayout;
17use crate::layouts::zoned::zone_map::StatsAccumulator;
18use crate::segments::SequenceWriter;
19use crate::sequence::SequenceId;
20use crate::{
21 IntoLayout, LayoutStrategy, SendableLayoutFuture, SendableSequentialStream,
22 SequentialStreamAdapter, SequentialStreamExt, TaskExecutor, TaskExecutorExt,
23};
24
25pub struct ZonedLayoutOptions {
26 pub block_size: usize,
28 pub stats: Arc<[Stat]>,
30 pub max_variable_length_statistics_size: usize,
32 pub parallelism: usize,
34}
35
36impl Default for ZonedLayoutOptions {
37 fn default() -> Self {
38 Self {
39 block_size: 8192,
40 stats: PRUNING_STATS.into(),
41 max_variable_length_statistics_size: 64,
42 parallelism: 16,
43 }
44 }
45}
46
47pub struct ZonedStrategy {
48 child: ArcRef<dyn LayoutStrategy>,
49 stats: ArcRef<dyn LayoutStrategy>,
50 options: ZonedLayoutOptions,
51 executor: Arc<dyn TaskExecutor>,
52}
53
54impl ZonedStrategy {
55 pub fn new(
56 child: ArcRef<dyn LayoutStrategy>,
57 stats: ArcRef<dyn LayoutStrategy>,
58 options: ZonedLayoutOptions,
59 executor: Arc<dyn TaskExecutor>,
60 ) -> Self {
61 Self {
62 child,
63 stats,
64 options,
65 executor,
66 }
67 }
68}
69
70impl LayoutStrategy for ZonedStrategy {
71 fn write_stream(
72 &self,
73 ctx: &ArrayContext,
74 sequence_writer: SequenceWriter,
75 stream: SendableSequentialStream,
76 ) -> SendableLayoutFuture {
77 let executor = self.executor.clone();
78 let stats = self.options.stats.clone();
79 let precomputed_stream = SequentialStreamAdapter::new(
80 stream.dtype().clone(),
81 stream
82 .map(move |chunk| {
83 let stats = stats.clone();
84 async move {
85 let (sequence_id, chunk) = chunk?;
86 chunk.statistics().compute_all(&stats)?;
87 VortexResult::Ok((sequence_id, chunk))
88 }
89 .boxed()
90 })
91 .map(move |stats_future| executor.spawn(stats_future))
92 .buffered(self.options.parallelism),
93 )
94 .sendable();
95
96 let stats_accumulator = Arc::new(Mutex::new(StatsAccumulator::new(
97 precomputed_stream.dtype(),
98 &self.options.stats,
99 self.options.max_variable_length_statistics_size,
100 )));
101 let stream = SequentialStreamAdapter::new(
102 precomputed_stream.dtype().clone(),
103 precomputed_stream.scan(stats_accumulator.clone(), |acc, item| {
104 future::ready(Some(accumulate_stats(acc, item)))
105 }),
106 )
107 .sendable();
108
109 let ctx = ctx.clone();
110 let child = self.child.clone();
111 let stats_strategy = self.stats.clone();
112 let block_size = self.options.block_size;
113 Box::pin(async move {
114 let data_layout = child
115 .write_stream(&ctx, sequence_writer.clone(), stream)
116 .await?;
117
118 let Some(stats_table) = stats_accumulator.lock().as_stats_table() else {
119 return Ok(data_layout);
122 };
123 let stats_array = stats_table.array().to_array().clone();
126
127 let stats_stream =
128 sequence_writer.new_sequential(ArrayStreamExt::boxed(ArrayStreamAdapter::new(
129 stats_array.dtype().clone(),
130 once(async { Ok(stats_array) }),
131 )));
132
133 let zones_layout = stats_strategy
134 .write_stream(&ctx, sequence_writer, stats_stream)
135 .await?;
136
137 Ok(ZonedLayout::new(
138 data_layout,
139 zones_layout,
140 block_size,
141 stats_table.present_stats().clone(),
142 )
143 .into_layout())
144 })
145 }
146}
147
148fn accumulate_stats(
149 stats_accumulator: &mut Arc<Mutex<StatsAccumulator>>,
150 item: VortexResult<(SequenceId, ArrayRef)>,
151) -> VortexResult<(SequenceId, ArrayRef)> {
152 let (sequence_id, chunk) = item?;
153 stats_accumulator.lock().push_chunk(&chunk)?;
154 Ok((sequence_id, chunk))
155}