vortex_layout/layouts/compressed.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::sync::Arc;

use async_trait::async_trait;
use futures::{FutureExt as _, StreamExt as _};
use vortex_array::stats::Stat;
use vortex_array::{Array, ArrayContext, ArrayRef};
use vortex_btrblocks::BtrBlocksCompressor;
use vortex_error::VortexResult;

use crate::segments::SequenceWriter;
use crate::{
    LayoutRef, LayoutStrategy, SendableSequentialStream, SequentialStreamAdapter,
    SequentialStreamExt as _, TaskExecutor, TaskExecutorExt as _,
};

/// A pluggable compressor from arrays into compressed arrays.
///
/// Both the balanced `BtrBlocksCompressor` and the size-optimized `CompactCompressor`
/// implement this interface.
///
/// API consumers are also free to implement this trait to provide new plugin compressors.
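///
/// # Example
///
/// A minimal sketch of a custom plugin: thanks to the blanket impl for closures below, any
/// suitable `Fn` can act as a `CompressorPlugin`. The pass-through body is illustrative only
/// and assumes a `to_array` method is available to clone the chunk into an [`ArrayRef`]:
///
/// ```ignore
/// use vortex_array::{Array, ArrayRef};
/// use vortex_error::VortexResult;
///
/// // A "compressor" that simply returns the chunk unchanged.
/// let identity = |chunk: &dyn Array| -> VortexResult<ArrayRef> { Ok(chunk.to_array()) };
/// ```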
pub trait CompressorPlugin: Send + Sync + 'static {
    fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef>;
}

impl CompressorPlugin for Arc<dyn CompressorPlugin> {
    fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef> {
        self.as_ref().compress_chunk(chunk)
    }
}

impl<F> CompressorPlugin for F
where
    F: Fn(&dyn Array) -> VortexResult<ArrayRef> + Send + Sync + 'static,
{
    fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef> {
        self(chunk)
    }
}

impl CompressorPlugin for BtrBlocksCompressor {
    fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef> {
        BtrBlocksCompressor::compress(self, chunk)
    }
}

#[cfg(feature = "zstd")]
impl CompressorPlugin for crate::layouts::compact::CompactCompressor {
    fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef> {
        self.compress(chunk)
    }
}

/// A [`LayoutStrategy`] that compresses each chunk before delegating to a child strategy.
#[derive(Clone)]
pub struct CompressingStrategy<S> {
    child: S,
    compressor: Arc<dyn CompressorPlugin>,
    executor: Arc<dyn TaskExecutor>,
    parallelism: usize,
}

impl<S: LayoutStrategy> CompressingStrategy<S> {
    /// Create a new writer that uses the BtrBlocks-style cascading compressor to compress chunks.
    ///
    /// This provides a good balance between decoding speed and small file size.
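    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `child` is some existing [`LayoutStrategy`] and `executor`
    /// is an already-constructed [`TaskExecutor`] handle:
    ///
    /// ```ignore
    /// let strategy = CompressingStrategy::new_btrblocks(child, executor, 4);
    /// ```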
    pub fn new_btrblocks(child: S, executor: Arc<dyn TaskExecutor>, parallelism: usize) -> Self {
        Self {
            child,
            compressor: Arc::new(BtrBlocksCompressor),
            executor,
            parallelism,
        }
    }

    /// Create a new writer that uses a [`CompactCompressor`] to compress chunks.
    ///
    /// This may create smaller files than the BtrBlocks writer, in exchange for some penalty
    /// to decoding performance. This is only recommended for datasets that make heavy use of
    /// floating point numbers.
    ///
    /// [`CompactCompressor`]: crate::layouts::compact::CompactCompressor
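    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `child` and `executor` already exist and that the compact
    /// compressor can be constructed with default settings (`CompactCompressor::default()` is
    /// assumed here for illustration):
    ///
    /// ```ignore
    /// let strategy =
    ///     CompressingStrategy::new_compact(child, CompactCompressor::default(), executor, 4);
    /// ```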
    #[cfg(feature = "zstd")]
    pub fn new_compact(
        child: S,
        compressor: crate::layouts::compact::CompactCompressor,
        executor: Arc<dyn TaskExecutor>,
        parallelism: usize,
    ) -> Self {
        Self {
            child,
            compressor: Arc::new(compressor),
            executor,
            parallelism,
        }
    }

    /// Create a new writer from a custom [`CompressorPlugin`] implementation.
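    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `child` and `executor` already exist. Any closure matching
    /// the blanket [`CompressorPlugin`] impl can be passed directly; the identity body below is
    /// illustrative only and assumes `to_array` is available on the chunk:
    ///
    /// ```ignore
    /// let strategy = CompressingStrategy::new_opaque(
    ///     child,
    ///     |chunk: &dyn Array| Ok(chunk.to_array()),
    ///     executor,
    ///     4,
    /// );
    /// ```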
    pub fn new_opaque<C: CompressorPlugin>(
        child: S,
        compressor: C,
        executor: Arc<dyn TaskExecutor>,
        parallelism: usize,
    ) -> Self {
        Self {
            child,
            compressor: Arc::new(compressor),
            executor,
            parallelism,
        }
    }
}

#[async_trait]
impl<S> LayoutStrategy for CompressingStrategy<S>
where
    S: LayoutStrategy,
{
    async fn write_stream(
        &self,
        ctx: &ArrayContext,
        sequence_writer: SequenceWriter,
        stream: SendableSequentialStream,
    ) -> VortexResult<LayoutRef> {
        let dtype = stream.dtype().clone();
        let compressor = self.compressor.clone();
        let executor = self.executor.clone();

        let stream = stream
            .map(move |chunk| {
                let compressor = compressor.clone();
                async move {
                    let (sequence_id, chunk) = chunk?;
                    // Compute the full set of statistics for the chunk prior to compression.
                    chunk
                        .statistics()
                        .compute_all(&Stat::all().collect::<Vec<_>>())?;
                    Ok((sequence_id, compressor.compress_chunk(&chunk)?))
                }
                .boxed()
            })
            // Spawn each compression future onto the task executor and keep up to
            // `parallelism` of them in flight at once, yielding results in input order.
            .map(move |compress_future| executor.spawn(compress_future))
            .buffered(self.parallelism);

        self.child
            .write_stream(
                ctx,
                sequence_writer,
                SequentialStreamAdapter::new(dtype, stream).sendable(),
            )
            .await
    }
}