Skip to main content

vortex_layout/layouts/zoned/
writer.rs

1//! Write-time assembly for zoned layouts.
2
3// SPDX-License-Identifier: Apache-2.0
4// SPDX-FileCopyrightText: Copyright the Vortex contributors
5
6use std::sync::Arc;
7
8use async_trait::async_trait;
9use futures::StreamExt as _;
10use parking_lot::Mutex;
11use vortex_array::ArrayContext;
12use vortex_array::IntoArray;
13use vortex_array::VortexSessionExecute;
14use vortex_array::expr::stats::Stat;
15use vortex_array::stats::PRUNING_STATS;
16use vortex_error::VortexResult;
17use vortex_error::vortex_ensure;
18use vortex_io::session::RuntimeSessionExt;
19use vortex_session::VortexSession;
20use vortex_utils::parallelism::get_available_parallelism;
21
22use crate::IntoLayout;
23use crate::LayoutRef;
24use crate::LayoutStrategy;
25use crate::layouts::zoned::StatsAccumulator;
26use crate::layouts::zoned::ZonedLayout;
27use crate::segments::SegmentSinkRef;
28use crate::sequence::SendableSequentialStream;
29use crate::sequence::SequencePointer;
30use crate::sequence::SequentialArrayStreamExt;
31use crate::sequence::SequentialStreamAdapter;
32use crate::sequence::SequentialStreamExt;
33
34/// Configuration for building zoned layouts.
35///
36/// The input stream is assumed to already be partitioned into one chunk per zone, except
37/// possibly the final partial zone.
38pub struct ZonedLayoutOptions {
39    /// The size of a statistics block
40    pub block_size: usize,
41    /// The statistics to collect for each block.
42    pub stats: Arc<[Stat]>,
43    /// Maximum length of a variable length statistics
44    pub max_variable_length_statistics_size: usize,
45    /// Number of chunks to compute in parallel.
46    pub concurrency: usize,
47}
48
49impl Default for ZonedLayoutOptions {
50    fn default() -> Self {
51        Self {
52            block_size: 8192,
53            stats: PRUNING_STATS.into(),
54            max_variable_length_statistics_size: 64,
55            concurrency: get_available_parallelism().unwrap_or(1),
56        }
57    }
58}
59
60pub struct ZonedStrategy {
61    child: Arc<dyn LayoutStrategy>,
62    stats: Arc<dyn LayoutStrategy>,
63    options: ZonedLayoutOptions,
64}
65
66impl ZonedStrategy {
67    /// Create a writer that emits a data child plus an auxiliary per-zone stats child.
68    pub fn new<Child: LayoutStrategy, Stats: LayoutStrategy>(
69        child: Child,
70        stats: Stats,
71        options: ZonedLayoutOptions,
72    ) -> Self {
73        Self {
74            child: Arc::new(child),
75            stats: Arc::new(stats),
76            options,
77        }
78    }
79}
80
81#[async_trait]
82impl LayoutStrategy for ZonedStrategy {
83    async fn write_stream(
84        &self,
85        ctx: ArrayContext,
86        segment_sink: SegmentSinkRef,
87        stream: SendableSequentialStream,
88        mut eof: SequencePointer,
89        session: &VortexSession,
90    ) -> VortexResult<LayoutRef> {
91        vortex_ensure!(
92            self.options.block_size > 0,
93            "ZonedStrategy requires block_size > 0 when writing"
94        );
95
96        let stats = Arc::clone(&self.options.stats);
97        let session = session.clone();
98        let compute_session = session.clone();
99        let handle = session.handle();
100        let handle2 = handle.clone();
101
102        let stats_accumulator = Arc::new(Mutex::new(StatsAccumulator::new(
103            stream.dtype(),
104            &stats,
105            self.options.max_variable_length_statistics_size,
106        )));
107
108        // We can compute per-chunk statistics in parallel, so we spawn tasks for each chunk
109        let stream = SequentialStreamAdapter::new(
110            stream.dtype().clone(),
111            stream
112                .map(move |chunk| {
113                    let stats = Arc::clone(&stats);
114                    let session = compute_session.clone();
115                    handle2.spawn_cpu(move || {
116                        let (sequence_id, chunk) = chunk?;
117                        chunk
118                            .statistics()
119                            .compute_all(&stats, &mut session.create_execution_ctx())?;
120                        Ok((sequence_id, chunk))
121                    })
122                })
123                .buffered(self.options.concurrency),
124        )
125        .sendable();
126
127        // Now we accumulate the stats we computed above, this time we cannot spawn because we
128        // need to feed the accumulator an ordered stream.
129        let stats_accumulator2 = Arc::clone(&stats_accumulator);
130        let stream = SequentialStreamAdapter::new(
131            stream.dtype().clone(),
132            stream.map(move |item| {
133                let (sequence_id, chunk) = item?;
134                // We have already computed per-chunk statistics, so avoid trying again for any that failed.
135                stats_accumulator2
136                    .lock()
137                    .push_chunk_without_compute(&chunk)?;
138                Ok((sequence_id, chunk))
139            }),
140        )
141        .sendable();
142
143        let block_size = self.options.block_size;
144
145        // The eof used for the data child should appear _before_ our own stats tables.
146        let data_eof = eof.split_off();
147        let data_layout = self
148            .child
149            .write_stream(
150                ctx.clone(),
151                Arc::clone(&segment_sink),
152                stream,
153                data_eof,
154                &session,
155            )
156            .await?;
157
158        let Some((stats_array, stats)) = stats_accumulator.lock().as_array()? else {
159            // If we have no stats (e.g. the DType doesn't support them), then we just return the
160            // child layout.
161            return Ok(data_layout);
162        };
163
164        // We must defer creating the stats table LayoutWriter until now, because the DType of
165        // the table depends on which stats were successfully computed.
166        let stats_stream = stats_array
167            .into_array()
168            .to_array_stream()
169            .sequenced(eof.split_off());
170        let zones_layout = self
171            .stats
172            .write_stream(ctx, Arc::clone(&segment_sink), stats_stream, eof, &session)
173            .await?;
174
175        Ok(ZonedLayout::new(data_layout, zones_layout, block_size, stats).into_layout())
176    }
177
178    fn buffered_bytes(&self) -> u64 {
179        self.child.buffered_bytes() + self.stats.buffered_bytes()
180    }
181}