vortex_layout/layouts/zoned/
writer.rs1use std::future;
5use std::sync::Arc;
6
7use async_trait::async_trait;
8use futures::stream::once;
9use futures::{FutureExt, StreamExt as _};
10use parking_lot::Mutex;
11use vortex_array::stats::{PRUNING_STATS, Stat};
12use vortex_array::stream::{ArrayStreamAdapter, ArrayStreamExt};
13use vortex_array::{ArrayContext, ArrayRef};
14use vortex_error::VortexResult;
15
16use crate::layouts::zoned::ZonedLayout;
17use crate::layouts::zoned::zone_map::StatsAccumulator;
18use crate::segments::SequenceWriter;
19use crate::sequence::SequenceId;
20use crate::{
21 IntoLayout, LayoutRef, LayoutStrategy, SendableSequentialStream, SequentialStreamAdapter,
22 SequentialStreamExt, TaskExecutor, TaskExecutorExt,
23};
24
25pub struct ZonedLayoutOptions {
26 pub block_size: usize,
28 pub stats: Arc<[Stat]>,
30 pub max_variable_length_statistics_size: usize,
32 pub parallelism: usize,
34}
35
36impl Default for ZonedLayoutOptions {
37 fn default() -> Self {
38 Self {
39 block_size: 8192,
40 stats: PRUNING_STATS.into(),
41 max_variable_length_statistics_size: 64,
42 parallelism: 16,
43 }
44 }
45}
46
47pub struct ZonedStrategy<Child, Stats> {
48 child: Child,
49 stats: Stats,
50 options: ZonedLayoutOptions,
51 executor: Arc<dyn TaskExecutor>,
52}
53
54impl<Child, Stats> ZonedStrategy<Child, Stats>
55where
56 Child: LayoutStrategy,
57 Stats: LayoutStrategy,
58{
59 pub fn new(
60 child: Child,
61 stats: Stats,
62 options: ZonedLayoutOptions,
63 executor: Arc<dyn TaskExecutor>,
64 ) -> Self {
65 Self {
66 child,
67 stats,
68 options,
69 executor,
70 }
71 }
72}
73
74#[async_trait]
75impl<Child, Stats> LayoutStrategy for ZonedStrategy<Child, Stats>
76where
77 Child: LayoutStrategy,
78 Stats: LayoutStrategy,
79{
80 async fn write_stream(
81 &self,
82 ctx: &ArrayContext,
83 sequence_writer: SequenceWriter,
84 stream: SendableSequentialStream,
85 ) -> VortexResult<LayoutRef> {
86 let executor = self.executor.clone();
87 let stats = self.options.stats.clone();
88 let precomputed_stream = SequentialStreamAdapter::new(
89 stream.dtype().clone(),
90 stream
91 .map(move |chunk| {
92 let stats = stats.clone();
93 async move {
94 let (sequence_id, chunk) = chunk?;
95 chunk.statistics().compute_all(&stats)?;
96 VortexResult::Ok((sequence_id, chunk))
97 }
98 .boxed()
99 })
100 .map(move |stats_future| executor.spawn(stats_future))
101 .buffered(self.options.parallelism),
102 )
103 .sendable();
104
105 let stats_accumulator = Arc::new(Mutex::new(StatsAccumulator::new(
106 precomputed_stream.dtype(),
107 &self.options.stats,
108 self.options.max_variable_length_statistics_size,
109 )));
110 let stream = SequentialStreamAdapter::new(
111 precomputed_stream.dtype().clone(),
112 precomputed_stream.scan(stats_accumulator.clone(), |acc, item| {
113 future::ready(Some(accumulate_stats(acc, item)))
114 }),
115 )
116 .sendable();
117
118 let ctx = ctx.clone();
119 let block_size = self.options.block_size;
120 let data_layout = self
121 .child
122 .write_stream(&ctx, sequence_writer.clone(), stream)
123 .await?;
124
125 let Some(stats_table) = stats_accumulator.lock().as_stats_table() else {
126 return Ok(data_layout);
129 };
130 let stats_array = stats_table.array().to_array().clone();
133
134 let stats_stream = sequence_writer.new_sequential(ArrayStreamExt::boxed(
135 ArrayStreamAdapter::new(stats_array.dtype().clone(), once(async { Ok(stats_array) })),
136 ));
137
138 let zones_layout = self
139 .stats
140 .write_stream(&ctx, sequence_writer, stats_stream)
141 .await?;
142
143 Ok(ZonedLayout::new(
144 data_layout,
145 zones_layout,
146 block_size,
147 stats_table.present_stats().clone(),
148 )
149 .into_layout())
150 }
151}
152
153fn accumulate_stats(
154 stats_accumulator: &mut Arc<Mutex<StatsAccumulator>>,
155 item: VortexResult<(SequenceId, ArrayRef)>,
156) -> VortexResult<(SequenceId, ArrayRef)> {
157 let (sequence_id, chunk) = item?;
158 stats_accumulator.lock().push_chunk(&chunk)?;
159 Ok((sequence_id, chunk))
160}