vortex_layout/layouts/zoned/
writer.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::future;
5use std::sync::Arc;
6
7use async_trait::async_trait;
8use futures::stream::once;
9use futures::{FutureExt, StreamExt as _};
10use parking_lot::Mutex;
11use vortex_array::stats::{PRUNING_STATS, Stat};
12use vortex_array::stream::{ArrayStreamAdapter, ArrayStreamExt};
13use vortex_array::{ArrayContext, ArrayRef};
14use vortex_error::VortexResult;
15
16use crate::layouts::zoned::ZonedLayout;
17use crate::layouts::zoned::zone_map::StatsAccumulator;
18use crate::segments::SequenceWriter;
19use crate::sequence::SequenceId;
20use crate::{
21    IntoLayout, LayoutRef, LayoutStrategy, SendableSequentialStream, SequentialStreamAdapter,
22    SequentialStreamExt, TaskExecutor, TaskExecutorExt,
23};
24
25pub struct ZonedLayoutOptions {
26    /// The size of a statistics block
27    pub block_size: usize,
28    /// The statistics to collect for each block.
29    pub stats: Arc<[Stat]>,
30    /// Maximum length of a variable length statistics
31    pub max_variable_length_statistics_size: usize,
32    /// Number of chunks to compute in parallel.
33    pub parallelism: usize,
34}
35
36impl Default for ZonedLayoutOptions {
37    fn default() -> Self {
38        Self {
39            block_size: 8192,
40            stats: PRUNING_STATS.into(),
41            max_variable_length_statistics_size: 64,
42            parallelism: 16,
43        }
44    }
45}
46
/// A [`LayoutStrategy`] that writes data through a child strategy while accumulating
/// per-chunk statistics, then writes those statistics with a second strategy and combines
/// both into a [`ZonedLayout`].
pub struct ZonedStrategy<Child, Stats> {
    /// Strategy used to write the data itself.
    child: Child,
    /// Strategy used to write the accumulated statistics table.
    stats: Stats,
    /// Block size, statistics selection and parallelism settings.
    options: ZonedLayoutOptions,
    /// Executor on which per-chunk statistics computation is spawned.
    executor: Arc<dyn TaskExecutor>,
}
53
54impl<Child, Stats> ZonedStrategy<Child, Stats>
55where
56    Child: LayoutStrategy,
57    Stats: LayoutStrategy,
58{
59    pub fn new(
60        child: Child,
61        stats: Stats,
62        options: ZonedLayoutOptions,
63        executor: Arc<dyn TaskExecutor>,
64    ) -> Self {
65        Self {
66            child,
67            stats,
68            options,
69            executor,
70        }
71    }
72}
73
74#[async_trait]
75impl<Child, Stats> LayoutStrategy for ZonedStrategy<Child, Stats>
76where
77    Child: LayoutStrategy,
78    Stats: LayoutStrategy,
79{
80    async fn write_stream(
81        &self,
82        ctx: &ArrayContext,
83        sequence_writer: SequenceWriter,
84        stream: SendableSequentialStream,
85    ) -> VortexResult<LayoutRef> {
86        let executor = self.executor.clone();
87        let stats = self.options.stats.clone();
88        let precomputed_stream = SequentialStreamAdapter::new(
89            stream.dtype().clone(),
90            stream
91                .map(move |chunk| {
92                    let stats = stats.clone();
93                    async move {
94                        let (sequence_id, chunk) = chunk?;
95                        chunk.statistics().compute_all(&stats)?;
96                        VortexResult::Ok((sequence_id, chunk))
97                    }
98                    .boxed()
99                })
100                .map(move |stats_future| executor.spawn(stats_future))
101                .buffered(self.options.parallelism),
102        )
103        .sendable();
104
105        let stats_accumulator = Arc::new(Mutex::new(StatsAccumulator::new(
106            precomputed_stream.dtype(),
107            &self.options.stats,
108            self.options.max_variable_length_statistics_size,
109        )));
110        let stream = SequentialStreamAdapter::new(
111            precomputed_stream.dtype().clone(),
112            precomputed_stream.scan(stats_accumulator.clone(), |acc, item| {
113                future::ready(Some(accumulate_stats(acc, item)))
114            }),
115        )
116        .sendable();
117
118        let ctx = ctx.clone();
119        let block_size = self.options.block_size;
120        let data_layout = self
121            .child
122            .write_stream(&ctx, sequence_writer.clone(), stream)
123            .await?;
124
125        let Some(stats_table) = stats_accumulator.lock().as_stats_table() else {
126            // If we have no stats (e.g. the DType doesn't support them), then we just return the
127            // child layout.
128            return Ok(data_layout);
129        };
130        // We must defer creating the stats table LayoutWriter until now, because the DType of
131        // the table depends on which stats were successfully computed.
132        let stats_array = stats_table.array().to_array().clone();
133
134        let stats_stream = sequence_writer.new_sequential(ArrayStreamExt::boxed(
135            ArrayStreamAdapter::new(stats_array.dtype().clone(), once(async { Ok(stats_array) })),
136        ));
137
138        let zones_layout = self
139            .stats
140            .write_stream(&ctx, sequence_writer, stats_stream)
141            .await?;
142
143        Ok(ZonedLayout::new(
144            data_layout,
145            zones_layout,
146            block_size,
147            stats_table.present_stats().clone(),
148        )
149        .into_layout())
150    }
151}
152
153fn accumulate_stats(
154    stats_accumulator: &mut Arc<Mutex<StatsAccumulator>>,
155    item: VortexResult<(SequenceId, ArrayRef)>,
156) -> VortexResult<(SequenceId, ArrayRef)> {
157    let (sequence_id, chunk) = item?;
158    stats_accumulator.lock().push_chunk(&chunk)?;
159    Ok((sequence_id, chunk))
160}