Skip to main content

vortex_layout/layouts/zoned/
writer.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::StreamExt as _;
8use parking_lot::Mutex;
9use vortex_array::ArrayContext;
10use vortex_array::IntoArray;
11use vortex_array::expr::stats::Stat;
12use vortex_array::stats::PRUNING_STATS;
13use vortex_error::VortexResult;
14use vortex_io::session::RuntimeSessionExt;
15use vortex_session::VortexSession;
16
17use crate::IntoLayout;
18use crate::LayoutRef;
19use crate::LayoutStrategy;
20use crate::layouts::zoned::ZonedLayout;
21use crate::layouts::zoned::zone_map::StatsAccumulator;
22use crate::segments::SegmentSinkRef;
23use crate::sequence::SendableSequentialStream;
24use crate::sequence::SequencePointer;
25use crate::sequence::SequentialArrayStreamExt;
26use crate::sequence::SequentialStreamAdapter;
27use crate::sequence::SequentialStreamExt;
28
29pub struct ZonedLayoutOptions {
30    /// The size of a statistics block
31    pub block_size: usize,
32    /// The statistics to collect for each block.
33    pub stats: Arc<[Stat]>,
34    /// Maximum length of a variable length statistics
35    pub max_variable_length_statistics_size: usize,
36    /// Number of chunks to compute in parallel.
37    pub concurrency: usize,
38}
39
40impl Default for ZonedLayoutOptions {
41    fn default() -> Self {
42        Self {
43            block_size: 8192,
44            stats: PRUNING_STATS.into(),
45            max_variable_length_statistics_size: 64,
46            concurrency: std::thread::available_parallelism()
47                .map(|v| v.get())
48                .unwrap_or(1),
49        }
50    }
51}
52
/// A [`LayoutStrategy`] that writes a data stream through one strategy and a table of
/// statistics accumulated from that stream through another, combining both results into a
/// [`ZonedLayout`].
pub struct ZonedStrategy {
    /// Strategy used to write the data stream itself.
    child: Arc<dyn LayoutStrategy>,
    /// Strategy used to write the accumulated statistics table.
    stats: Arc<dyn LayoutStrategy>,
    /// Block size, statistics selection and parallelism settings.
    options: ZonedLayoutOptions,
}
58
59impl ZonedStrategy {
60    pub fn new<Child: LayoutStrategy, Stats: LayoutStrategy>(
61        child: Child,
62        stats: Stats,
63        options: ZonedLayoutOptions,
64    ) -> Self {
65        Self {
66            child: Arc::new(child),
67            stats: Arc::new(stats),
68            options,
69        }
70    }
71}
72
73#[async_trait]
74impl LayoutStrategy for ZonedStrategy {
75    async fn write_stream(
76        &self,
77        ctx: ArrayContext,
78        segment_sink: SegmentSinkRef,
79        stream: SendableSequentialStream,
80        mut eof: SequencePointer,
81        session: &VortexSession,
82    ) -> VortexResult<LayoutRef> {
83        let stats = Arc::clone(&self.options.stats);
84        let handle = session.handle();
85        let handle2 = handle.clone();
86
87        let stats_accumulator = Arc::new(Mutex::new(StatsAccumulator::new(
88            stream.dtype(),
89            &stats,
90            self.options.max_variable_length_statistics_size,
91        )));
92
93        // We can compute per-chunk statistics in parallel, so we spawn tasks for each chunk
94        let stream = SequentialStreamAdapter::new(
95            stream.dtype().clone(),
96            stream
97                .map(move |chunk| {
98                    let stats = Arc::clone(&stats);
99                    handle2.spawn_cpu(move || {
100                        let (sequence_id, chunk) = chunk?;
101                        chunk.statistics().compute_all(&stats)?;
102                        VortexResult::Ok((sequence_id, chunk))
103                    })
104                })
105                .buffered(self.options.concurrency),
106        )
107        .sendable();
108
109        // Now we accumulate the stats we computed above, this time we cannot spawn because we
110        // need to feed the accumulator an ordered stream.
111        let stats_accumulator2 = Arc::clone(&stats_accumulator);
112        let stream = SequentialStreamAdapter::new(
113            stream.dtype().clone(),
114            stream.map(move |item| {
115                let (sequence_id, chunk) = item?;
116                // We have already computed per-chunk statistics, so avoid trying again for any that failed.
117                stats_accumulator2
118                    .lock()
119                    .push_chunk_without_compute(&chunk)?;
120                Ok((sequence_id, chunk))
121            }),
122        )
123        .sendable();
124
125        let block_size = self.options.block_size;
126
127        // The eof used for the data child should appear _before_ our own stats tables.
128        let data_eof = eof.split_off();
129        let data_layout = self
130            .child
131            .write_stream(
132                ctx.clone(),
133                Arc::clone(&segment_sink),
134                stream,
135                data_eof,
136                session,
137            )
138            .await?;
139
140        let Some(stats_table) = stats_accumulator.lock().as_stats_table()? else {
141            // If we have no stats (e.g. the DType doesn't support them), then we just return the
142            // child layout.
143            return Ok(data_layout);
144        };
145
146        // We must defer creating the stats table LayoutWriter until now, because the DType of
147        // the table depends on which stats were successfully computed.
148        let stats_stream = stats_table
149            .array()
150            .clone()
151            .into_array()
152            .to_array_stream()
153            .sequenced(eof.split_off());
154        let zones_layout = self
155            .stats
156            .write_stream(ctx, Arc::clone(&segment_sink), stats_stream, eof, session)
157            .await?;
158
159        Ok(ZonedLayout::new(
160            data_layout,
161            zones_layout,
162            block_size,
163            Arc::clone(stats_table.present_stats()),
164        )
165        .into_layout())
166    }
167
168    fn buffered_bytes(&self) -> u64 {
169        self.child.buffered_bytes() + self.stats.buffered_bytes()
170    }
171}