Skip to main content

vortex_layout/layouts/zoned/
writer.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::StreamExt as _;
8use parking_lot::Mutex;
9use vortex_array::ArrayContext;
10use vortex_array::IntoArray;
11use vortex_array::VortexSessionExecute;
12use vortex_array::expr::stats::Stat;
13use vortex_array::stats::PRUNING_STATS;
14use vortex_error::VortexResult;
15use vortex_io::session::RuntimeSessionExt;
16use vortex_session::VortexSession;
17
18use crate::IntoLayout;
19use crate::LayoutRef;
20use crate::LayoutStrategy;
21use crate::layouts::zoned::ZonedLayout;
22use crate::layouts::zoned::zone_map::StatsAccumulator;
23use crate::segments::SegmentSinkRef;
24use crate::sequence::SendableSequentialStream;
25use crate::sequence::SequencePointer;
26use crate::sequence::SequentialArrayStreamExt;
27use crate::sequence::SequentialStreamAdapter;
28use crate::sequence::SequentialStreamExt;
29
30pub struct ZonedLayoutOptions {
31    /// The size of a statistics block
32    pub block_size: usize,
33    /// The statistics to collect for each block.
34    pub stats: Arc<[Stat]>,
35    /// Maximum length of a variable length statistics
36    pub max_variable_length_statistics_size: usize,
37    /// Number of chunks to compute in parallel.
38    pub concurrency: usize,
39}
40
41impl Default for ZonedLayoutOptions {
42    fn default() -> Self {
43        Self {
44            block_size: 8192,
45            stats: PRUNING_STATS.into(),
46            max_variable_length_statistics_size: 64,
47            concurrency: std::thread::available_parallelism()
48                .map(|v| v.get())
49                .unwrap_or(1),
50        }
51    }
52}
53
54pub struct ZonedStrategy {
55    child: Arc<dyn LayoutStrategy>,
56    stats: Arc<dyn LayoutStrategy>,
57    options: ZonedLayoutOptions,
58}
59
60impl ZonedStrategy {
61    pub fn new<Child: LayoutStrategy, Stats: LayoutStrategy>(
62        child: Child,
63        stats: Stats,
64        options: ZonedLayoutOptions,
65    ) -> Self {
66        Self {
67            child: Arc::new(child),
68            stats: Arc::new(stats),
69            options,
70        }
71    }
72}
73
74#[async_trait]
75impl LayoutStrategy for ZonedStrategy {
76    async fn write_stream(
77        &self,
78        ctx: ArrayContext,
79        segment_sink: SegmentSinkRef,
80        stream: SendableSequentialStream,
81        mut eof: SequencePointer,
82        session: &VortexSession,
83    ) -> VortexResult<LayoutRef> {
84        let stats = Arc::clone(&self.options.stats);
85        let session = session.clone();
86        let compute_session = session.clone();
87        let handle = session.handle();
88        let handle2 = handle.clone();
89
90        let stats_accumulator = Arc::new(Mutex::new(StatsAccumulator::new(
91            stream.dtype(),
92            &stats,
93            self.options.max_variable_length_statistics_size,
94        )));
95
96        // We can compute per-chunk statistics in parallel, so we spawn tasks for each chunk
97        let stream = SequentialStreamAdapter::new(
98            stream.dtype().clone(),
99            stream
100                .map(move |chunk| {
101                    let stats = Arc::clone(&stats);
102                    let session = compute_session.clone();
103                    handle2.spawn_cpu(move || {
104                        let (sequence_id, chunk) = chunk?;
105                        chunk
106                            .statistics()
107                            .compute_all(&stats, &mut session.create_execution_ctx())?;
108                        VortexResult::Ok((sequence_id, chunk))
109                    })
110                })
111                .buffered(self.options.concurrency),
112        )
113        .sendable();
114
115        // Now we accumulate the stats we computed above, this time we cannot spawn because we
116        // need to feed the accumulator an ordered stream.
117        let stats_accumulator2 = Arc::clone(&stats_accumulator);
118        let stream = SequentialStreamAdapter::new(
119            stream.dtype().clone(),
120            stream.map(move |item| {
121                let (sequence_id, chunk) = item?;
122                // We have already computed per-chunk statistics, so avoid trying again for any that failed.
123                stats_accumulator2
124                    .lock()
125                    .push_chunk_without_compute(&chunk)?;
126                Ok((sequence_id, chunk))
127            }),
128        )
129        .sendable();
130
131        let block_size = self.options.block_size;
132
133        // The eof used for the data child should appear _before_ our own stats tables.
134        let data_eof = eof.split_off();
135        let data_layout = self
136            .child
137            .write_stream(
138                ctx.clone(),
139                Arc::clone(&segment_sink),
140                stream,
141                data_eof,
142                &session,
143            )
144            .await?;
145
146        let Some(stats_table) = stats_accumulator.lock().as_stats_table()? else {
147            // If we have no stats (e.g. the DType doesn't support them), then we just return the
148            // child layout.
149            return Ok(data_layout);
150        };
151
152        // We must defer creating the stats table LayoutWriter until now, because the DType of
153        // the table depends on which stats were successfully computed.
154        let stats_stream = stats_table
155            .array()
156            .clone()
157            .into_array()
158            .to_array_stream()
159            .sequenced(eof.split_off());
160        let zones_layout = self
161            .stats
162            .write_stream(ctx, Arc::clone(&segment_sink), stats_stream, eof, &session)
163            .await?;
164
165        Ok(ZonedLayout::new(
166            data_layout,
167            zones_layout,
168            block_size,
169            Arc::clone(stats_table.present_stats()),
170        )
171        .into_layout())
172    }
173
174    fn buffered_bytes(&self) -> u64 {
175        self.child.buffered_bytes() + self.stats.buffered_bytes()
176    }
177}