vortex_layout/layouts/zoned/
writer.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::StreamExt as _;
8use parking_lot::Mutex;
9use vortex_array::ArrayContext;
10use vortex_array::stats::{PRUNING_STATS, Stat};
11use vortex_error::VortexResult;
12use vortex_io::runtime::Handle;
13
14use crate::layouts::zoned::ZonedLayout;
15use crate::layouts::zoned::zone_map::StatsAccumulator;
16use crate::segments::SegmentSinkRef;
17use crate::sequence::{
18    SendableSequentialStream, SequencePointer, SequentialArrayStreamExt, SequentialStreamAdapter,
19    SequentialStreamExt,
20};
21use crate::{IntoLayout, LayoutRef, LayoutStrategy};
22
/// Configuration for [`ZonedStrategy`].
pub struct ZonedLayoutOptions {
    /// The size of a statistics block.
    // NOTE(review): the unit is not visible in this file (presumably rows per zone) — confirm.
    pub block_size: usize,
    /// The statistics to collect for each block.
    pub stats: Arc<[Stat]>,
    /// Maximum size of a variable-length statistic value; forwarded to the stats accumulator.
    // NOTE(review): presumably a byte length, and what happens to longer values is decided by
    // the accumulator — confirm against `StatsAccumulator`.
    pub max_variable_length_statistics_size: usize,
    /// Number of chunks whose statistics are computed in parallel.
    pub concurrency: usize,
}
33
34impl Default for ZonedLayoutOptions {
35    fn default() -> Self {
36        Self {
37            block_size: 8192,
38            stats: PRUNING_STATS.into(),
39            max_variable_length_statistics_size: 64,
40            concurrency: std::thread::available_parallelism()
41                .map(|v| v.get())
42                .unwrap_or(1),
43        }
44    }
45}
46
/// A [`LayoutStrategy`] that writes its input through a child strategy and, alongside it, a
/// table of per-chunk statistics through a second strategy.
pub struct ZonedStrategy {
    /// Strategy used to write the data stream itself.
    child: Arc<dyn LayoutStrategy>,
    /// Strategy used to write the accumulated statistics table.
    stats: Arc<dyn LayoutStrategy>,
    /// Block size, statistics selection and concurrency settings for this strategy.
    options: ZonedLayoutOptions,
}
52
53impl ZonedStrategy {
54    pub fn new<Child: LayoutStrategy, Stats: LayoutStrategy>(
55        child: Child,
56        stats: Stats,
57        options: ZonedLayoutOptions,
58    ) -> Self {
59        Self {
60            child: Arc::new(child),
61            stats: Arc::new(stats),
62            options,
63        }
64    }
65}
66
67#[async_trait]
68impl LayoutStrategy for ZonedStrategy {
69    async fn write_stream(
70        &self,
71        ctx: ArrayContext,
72        segment_sink: SegmentSinkRef,
73        stream: SendableSequentialStream,
74        mut eof: SequencePointer,
75        handle: Handle,
76    ) -> VortexResult<LayoutRef> {
77        let stats = self.options.stats.clone();
78        let handle2 = handle.clone();
79
80        let stats_accumulator = Arc::new(Mutex::new(StatsAccumulator::new(
81            stream.dtype(),
82            &stats,
83            self.options.max_variable_length_statistics_size,
84        )));
85
86        // We can compute per-chunk statistics in parallel, so we spawn tasks for each chunk
87        let stream = SequentialStreamAdapter::new(
88            stream.dtype().clone(),
89            stream
90                .map(move |chunk| {
91                    let stats = stats.clone();
92                    handle2.spawn_cpu(move || {
93                        let (sequence_id, chunk) = chunk?;
94                        chunk.statistics().compute_all(&stats)?;
95                        VortexResult::Ok((sequence_id, chunk))
96                    })
97                })
98                .buffered(self.options.concurrency),
99        )
100        .sendable();
101
102        // Now we accumulate the stats we computed above, this time we cannot spawn because we
103        // need to feed the accumulator an ordered stream.
104        let stats_accumulator2 = stats_accumulator.clone();
105        let stream = SequentialStreamAdapter::new(
106            stream.dtype().clone(),
107            stream.map(move |item| {
108                let (sequence_id, chunk) = item?;
109                // We have already computed per-chunk statistics, so avoid trying again for any that failed.
110                stats_accumulator2
111                    .lock()
112                    .push_chunk_without_compute(&chunk)?;
113                Ok((sequence_id, chunk))
114            }),
115        )
116        .sendable();
117
118        let block_size = self.options.block_size;
119
120        // The eof used for the data child should appear _before_ our own stats tables.
121        let data_eof = eof.split_off();
122        let data_layout = self
123            .child
124            .write_stream(
125                ctx.clone(),
126                segment_sink.clone(),
127                stream,
128                data_eof,
129                handle.clone(),
130            )
131            .await?;
132
133        let Some(stats_table) = stats_accumulator.lock().as_stats_table() else {
134            // If we have no stats (e.g. the DType doesn't support them), then we just return the
135            // child layout.
136            return Ok(data_layout);
137        };
138
139        // We must defer creating the stats table LayoutWriter until now, because the DType of
140        // the table depends on which stats were successfully computed.
141        let stats_stream = stats_table
142            .array()
143            .to_array_stream()
144            .sequenced(eof.split_off());
145        let zones_layout = self
146            .stats
147            .write_stream(ctx, segment_sink.clone(), stats_stream, eof, handle)
148            .await?;
149
150        Ok(ZonedLayout::new(
151            data_layout,
152            zones_layout,
153            block_size,
154            stats_table.present_stats().clone(),
155        )
156        .into_layout())
157    }
158
159    fn buffered_bytes(&self) -> u64 {
160        self.child.buffered_bytes() + self.stats.buffered_bytes()
161    }
162}