vortex_layout/layouts/compressed.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::sync::Arc;

use async_trait::async_trait;
use futures::{FutureExt as _, StreamExt as _};
use vortex_array::stats::Stat;
use vortex_array::{Array, ArrayContext, ArrayRef};
use vortex_btrblocks::BtrBlocksCompressor;
use vortex_error::VortexResult;

use crate::segments::SequenceWriter;
use crate::{
    LayoutRef, LayoutStrategy, SendableSequentialStream, SequentialStreamAdapter,
    SequentialStreamExt as _, TaskExecutor, TaskExecutorExt as _,
};

/// A pluggable compressor from arrays into compressed arrays.
///
/// Both the balanced `BtrBlocksCompressor` and the size-optimized `CompactCompressor`
/// satisfy this interface.
///
/// API consumers are also free to implement this trait to provide their own compressors.
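///
/// # Example
///
/// A minimal sketch of a closure-based plugin, which satisfies this trait through the
/// blanket `Fn` implementation below. The `BtrBlocksCompressor` construction assumes
/// `exclude_int_dict_encoding` is its only field, so the snippet is illustrative rather
/// than guaranteed to compile:
///
/// ```ignore
/// let plugin = move |chunk: &dyn Array| -> VortexResult<ArrayRef> {
///     // Delegate to the balanced BtrBlocks compressor.
///     BtrBlocksCompressor {
///         exclude_int_dict_encoding: false,
///     }
///     .compress(chunk)
/// };
/// ```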
pub trait CompressorPlugin: Send + Sync + 'static {
    fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef>;
}

impl CompressorPlugin for Arc<dyn CompressorPlugin> {
    fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef> {
        self.as_ref().compress_chunk(chunk)
    }
}

impl<F> CompressorPlugin for F
where
    F: Fn(&dyn Array) -> VortexResult<ArrayRef> + Send + Sync + 'static,
{
    fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef> {
        self(chunk)
    }
}

impl CompressorPlugin for BtrBlocksCompressor {
    fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef> {
        self.compress(chunk)
    }
}

#[cfg(feature = "zstd")]
impl CompressorPlugin for crate::layouts::compact::CompactCompressor {
    fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef> {
        self.compress(chunk)
    }
}

/// A layout strategy that compresses each chunk before delegating to a child strategy.
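///
/// # Example
///
/// A rough sketch of constructing the BtrBlocks-backed variant; `child` and `executor`
/// are placeholders for an existing child [`LayoutStrategy`] and [`TaskExecutor`]:
///
/// ```ignore
/// let strategy = CompressingStrategy::new_btrblocks(
///     child,
///     executor,
///     /* parallelism */ 4,
///     /* exclude_int_dict_encoding */ false,
/// );
/// ```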
#[derive(Clone)]
pub struct CompressingStrategy<S> {
    child: S,
    compressor: Arc<dyn CompressorPlugin>,
    executor: Arc<dyn TaskExecutor>,
    parallelism: usize,
}

impl<S: LayoutStrategy> CompressingStrategy<S> {
    /// Create a new writer that uses the BtrBlocks-style cascading compressor to compress chunks.
    ///
    /// This provides a good balance between decoding speed and small file size.
    ///
    /// Set `exclude_int_dict_encoding` to true to prevent dictionary encoding of integer arrays,
    /// which is useful when compressing dictionary codes to avoid recursive dictionary encoding.
    pub fn new_btrblocks(
        child: S,
        executor: Arc<dyn TaskExecutor>,
        parallelism: usize,
        exclude_int_dict_encoding: bool,
    ) -> Self {
        Self {
            child,
            compressor: Arc::new(BtrBlocksCompressor {
                exclude_int_dict_encoding,
            }),
            executor,
            parallelism,
        }
    }

    /// Create a new writer that uses a [`CompactCompressor`] to compress chunks.
    ///
    /// This may create smaller files than the BtrBlocks writer, in exchange for some penalty
    /// to decoding performance. This is only recommended for datasets that make heavy use of
    /// floating-point numbers.
    ///
    /// [`CompactCompressor`]: crate::layouts::compact::CompactCompressor
    #[cfg(feature = "zstd")]
    pub fn new_compact(
        child: S,
        compressor: crate::layouts::compact::CompactCompressor,
        executor: Arc<dyn TaskExecutor>,
        parallelism: usize,
    ) -> Self {
        Self {
            child,
            compressor: Arc::new(compressor),
            executor,
            parallelism,
        }
    }

    /// Create a new writer that compresses chunks with a custom [`CompressorPlugin`].
    pub fn new_opaque<C: CompressorPlugin>(
        child: S,
        compressor: C,
        executor: Arc<dyn TaskExecutor>,
        parallelism: usize,
    ) -> Self {
        Self {
            child,
            compressor: Arc::new(compressor),
            executor,
            parallelism,
        }
    }
}

#[async_trait]
impl<S> LayoutStrategy for CompressingStrategy<S>
where
    S: LayoutStrategy,
{
    async fn write_stream(
        &self,
        ctx: &ArrayContext,
        sequence_writer: SequenceWriter,
        stream: SendableSequentialStream,
    ) -> VortexResult<LayoutRef> {
        let dtype = stream.dtype().clone();
        let compressor = self.compressor.clone();
        let executor = self.executor.clone();

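        // Map each chunk to a compression future, spawn it on the task executor, and keep
        // up to `parallelism` compression tasks in flight. `buffered` preserves chunk order.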
        let stream = stream
            .map(move |chunk| {
                let compressor = compressor.clone();
                async move {
                    let (sequence_id, chunk) = chunk?;
                    // Compute the stats for the chunk prior to compression
                    chunk
                        .statistics()
                        .compute_all(&Stat::all().collect::<Vec<_>>())?;
                    Ok((sequence_id, compressor.compress_chunk(&chunk)?))
                }
                .boxed()
            })
            .map(move |compress_future| executor.spawn(compress_future))
            .buffered(self.parallelism);

        self.child
            .write_stream(
                ctx,
                sequence_writer,
                SequentialStreamAdapter::new(dtype, stream).sendable(),
            )
            .await
    }
}