vortex_layout/layouts/zoned/
writer.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::StreamExt as _;
8use parking_lot::Mutex;
9use vortex_array::ArrayContext;
10use vortex_array::expr::stats::Stat;
11use vortex_array::stats::PRUNING_STATS;
12use vortex_error::VortexResult;
13use vortex_io::runtime::Handle;
14
15use crate::IntoLayout;
16use crate::LayoutRef;
17use crate::LayoutStrategy;
18use crate::layouts::zoned::ZonedLayout;
19use crate::layouts::zoned::zone_map::StatsAccumulator;
20use crate::segments::SegmentSinkRef;
21use crate::sequence::SendableSequentialStream;
22use crate::sequence::SequencePointer;
23use crate::sequence::SequentialArrayStreamExt;
24use crate::sequence::SequentialStreamAdapter;
25use crate::sequence::SequentialStreamExt;
26
/// Configuration for [`ZonedStrategy`].
pub struct ZonedLayoutOptions {
    /// The size of a statistics block.
    ///
    /// The writer records this value in the resulting `ZonedLayout`.
    /// NOTE(review): presumably measured in rows — confirm against the reader.
    pub block_size: usize,
    /// The statistics to collect for each block.
    pub stats: Arc<[Stat]>,
    /// Maximum length of a variable-length statistic.
    ///
    /// Forwarded to the `StatsAccumulator` when it is created.
    /// NOTE(review): presumably a byte length — confirm against `StatsAccumulator::new`.
    pub max_variable_length_statistics_size: usize,
    /// Number of chunks to compute in parallel.
    ///
    /// Used as the limit on how many per-chunk statistics tasks are buffered at once.
    pub concurrency: usize,
}
37
38impl Default for ZonedLayoutOptions {
39    fn default() -> Self {
40        Self {
41            block_size: 8192,
42            stats: PRUNING_STATS.into(),
43            max_variable_length_statistics_size: 64,
44            concurrency: std::thread::available_parallelism()
45                .map(|v| v.get())
46                .unwrap_or(1),
47        }
48    }
49}
50
/// A [`LayoutStrategy`] that writes a data stream through one child strategy and the table of
/// statistics accumulated from that stream's chunks through another.
pub struct ZonedStrategy {
    /// Strategy used to write the data stream itself.
    child: Arc<dyn LayoutStrategy>,
    /// Strategy used to write the accumulated statistics table.
    stats: Arc<dyn LayoutStrategy>,
    /// Settings controlling which statistics are gathered and how.
    options: ZonedLayoutOptions,
}
56
57impl ZonedStrategy {
58    pub fn new<Child: LayoutStrategy, Stats: LayoutStrategy>(
59        child: Child,
60        stats: Stats,
61        options: ZonedLayoutOptions,
62    ) -> Self {
63        Self {
64            child: Arc::new(child),
65            stats: Arc::new(stats),
66            options,
67        }
68    }
69}
70
71#[async_trait]
72impl LayoutStrategy for ZonedStrategy {
73    async fn write_stream(
74        &self,
75        ctx: ArrayContext,
76        segment_sink: SegmentSinkRef,
77        stream: SendableSequentialStream,
78        mut eof: SequencePointer,
79        handle: Handle,
80    ) -> VortexResult<LayoutRef> {
81        let stats = self.options.stats.clone();
82        let handle2 = handle.clone();
83
84        let stats_accumulator = Arc::new(Mutex::new(StatsAccumulator::new(
85            stream.dtype(),
86            &stats,
87            self.options.max_variable_length_statistics_size,
88        )));
89
90        // We can compute per-chunk statistics in parallel, so we spawn tasks for each chunk
91        let stream = SequentialStreamAdapter::new(
92            stream.dtype().clone(),
93            stream
94                .map(move |chunk| {
95                    let stats = stats.clone();
96                    handle2.spawn_cpu(move || {
97                        let (sequence_id, chunk) = chunk?;
98                        chunk.statistics().compute_all(&stats)?;
99                        VortexResult::Ok((sequence_id, chunk))
100                    })
101                })
102                .buffered(self.options.concurrency),
103        )
104        .sendable();
105
106        // Now we accumulate the stats we computed above, this time we cannot spawn because we
107        // need to feed the accumulator an ordered stream.
108        let stats_accumulator2 = stats_accumulator.clone();
109        let stream = SequentialStreamAdapter::new(
110            stream.dtype().clone(),
111            stream.map(move |item| {
112                let (sequence_id, chunk) = item?;
113                // We have already computed per-chunk statistics, so avoid trying again for any that failed.
114                stats_accumulator2
115                    .lock()
116                    .push_chunk_without_compute(&chunk)?;
117                Ok((sequence_id, chunk))
118            }),
119        )
120        .sendable();
121
122        let block_size = self.options.block_size;
123
124        // The eof used for the data child should appear _before_ our own stats tables.
125        let data_eof = eof.split_off();
126        let data_layout = self
127            .child
128            .write_stream(
129                ctx.clone(),
130                segment_sink.clone(),
131                stream,
132                data_eof,
133                handle.clone(),
134            )
135            .await?;
136
137        let Some(stats_table) = stats_accumulator.lock().as_stats_table() else {
138            // If we have no stats (e.g. the DType doesn't support them), then we just return the
139            // child layout.
140            return Ok(data_layout);
141        };
142
143        // We must defer creating the stats table LayoutWriter until now, because the DType of
144        // the table depends on which stats were successfully computed.
145        let stats_stream = stats_table
146            .array()
147            .to_array_stream()
148            .sequenced(eof.split_off());
149        let zones_layout = self
150            .stats
151            .write_stream(ctx, segment_sink.clone(), stats_stream, eof, handle)
152            .await?;
153
154        Ok(ZonedLayout::new(
155            data_layout,
156            zones_layout,
157            block_size,
158            stats_table.present_stats().clone(),
159        )
160        .into_layout())
161    }
162
163    fn buffered_bytes(&self) -> u64 {
164        self.child.buffered_bytes() + self.stats.buffered_bytes()
165    }
166}