vortex_layout/layouts/zoned/
writer.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::future;
5use std::sync::Arc;
6
7use arcref::ArcRef;
8use futures::stream::once;
9use futures::{FutureExt, StreamExt as _};
10use parking_lot::Mutex;
11use vortex_array::stats::{PRUNING_STATS, Stat};
12use vortex_array::stream::{ArrayStreamAdapter, ArrayStreamExt};
13use vortex_array::{ArrayContext, ArrayRef};
14use vortex_error::VortexResult;
15
16use crate::layouts::zoned::ZonedLayout;
17use crate::layouts::zoned::zone_map::StatsAccumulator;
18use crate::segments::SequenceWriter;
19use crate::sequence::SequenceId;
20use crate::{
21    IntoLayout, LayoutStrategy, SendableLayoutFuture, SendableSequentialStream,
22    SequentialStreamAdapter, SequentialStreamExt, TaskExecutor, TaskExecutorExt,
23};
24
25pub struct ZonedLayoutOptions {
26    /// The size of a statistics block
27    pub block_size: usize,
28    /// The statistics to collect for each block.
29    pub stats: Arc<[Stat]>,
30    /// Maximum length of a variable length statistics
31    pub max_variable_length_statistics_size: usize,
32    /// Number of chunks to compute in parallel.
33    pub parallelism: usize,
34}
35
36impl Default for ZonedLayoutOptions {
37    fn default() -> Self {
38        Self {
39            block_size: 8192,
40            stats: PRUNING_STATS.into(),
41            max_variable_length_statistics_size: 64,
42            parallelism: 16,
43        }
44    }
45}
46
47pub struct ZonedStrategy {
48    child: ArcRef<dyn LayoutStrategy>,
49    stats: ArcRef<dyn LayoutStrategy>,
50    options: ZonedLayoutOptions,
51    executor: Arc<dyn TaskExecutor>,
52}
53
54impl ZonedStrategy {
55    pub fn new(
56        child: ArcRef<dyn LayoutStrategy>,
57        stats: ArcRef<dyn LayoutStrategy>,
58        options: ZonedLayoutOptions,
59        executor: Arc<dyn TaskExecutor>,
60    ) -> Self {
61        Self {
62            child,
63            stats,
64            options,
65            executor,
66        }
67    }
68}
69
70impl LayoutStrategy for ZonedStrategy {
71    fn write_stream(
72        &self,
73        ctx: &ArrayContext,
74        sequence_writer: SequenceWriter,
75        stream: SendableSequentialStream,
76    ) -> SendableLayoutFuture {
77        let executor = self.executor.clone();
78        let stats = self.options.stats.clone();
79        let precomputed_stream = SequentialStreamAdapter::new(
80            stream.dtype().clone(),
81            stream
82                .map(move |chunk| {
83                    let stats = stats.clone();
84                    async move {
85                        let (sequence_id, chunk) = chunk?;
86                        chunk.statistics().compute_all(&stats)?;
87                        VortexResult::Ok((sequence_id, chunk))
88                    }
89                    .boxed()
90                })
91                .map(move |stats_future| executor.spawn(stats_future))
92                .buffered(self.options.parallelism),
93        )
94        .sendable();
95
96        let stats_accumulator = Arc::new(Mutex::new(StatsAccumulator::new(
97            precomputed_stream.dtype(),
98            &self.options.stats,
99            self.options.max_variable_length_statistics_size,
100        )));
101        let stream = SequentialStreamAdapter::new(
102            precomputed_stream.dtype().clone(),
103            precomputed_stream.scan(stats_accumulator.clone(), |acc, item| {
104                future::ready(Some(accumulate_stats(acc, item)))
105            }),
106        )
107        .sendable();
108
109        let ctx = ctx.clone();
110        let child = self.child.clone();
111        let stats_strategy = self.stats.clone();
112        let block_size = self.options.block_size;
113        Box::pin(async move {
114            let data_layout = child
115                .write_stream(&ctx, sequence_writer.clone(), stream)
116                .await?;
117
118            let Some(stats_table) = stats_accumulator.lock().as_stats_table() else {
119                // If we have no stats (e.g. the DType doesn't support them), then we just return the
120                // child layout.
121                return Ok(data_layout);
122            };
123            // We must defer creating the stats table LayoutWriter until now, because the DType of
124            // the table depends on which stats were successfully computed.
125            let stats_array = stats_table.array().to_array().clone();
126
127            let stats_stream =
128                sequence_writer.new_sequential(ArrayStreamExt::boxed(ArrayStreamAdapter::new(
129                    stats_array.dtype().clone(),
130                    once(async { Ok(stats_array) }),
131                )));
132
133            let zones_layout = stats_strategy
134                .write_stream(&ctx, sequence_writer, stats_stream)
135                .await?;
136
137            Ok(ZonedLayout::new(
138                data_layout,
139                zones_layout,
140                block_size,
141                stats_table.present_stats().clone(),
142            )
143            .into_layout())
144        })
145    }
146}
147
148fn accumulate_stats(
149    stats_accumulator: &mut Arc<Mutex<StatsAccumulator>>,
150    item: VortexResult<(SequenceId, ArrayRef)>,
151) -> VortexResult<(SequenceId, ArrayRef)> {
152    let (sequence_id, chunk) = item?;
153    stats_accumulator.lock().push_chunk(&chunk)?;
154    Ok((sequence_id, chunk))
155}