vortex_layout/layouts/
compressed.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::StreamExt as _;
8use vortex_array::Array;
9use vortex_array::ArrayContext;
10use vortex_array::ArrayRef;
11use vortex_array::expr::stats::Stat;
12use vortex_btrblocks::BtrBlocksCompressor;
13use vortex_error::VortexResult;
14use vortex_io::runtime::Handle;
15
16use crate::LayoutRef;
17use crate::LayoutStrategy;
18use crate::segments::SegmentSinkRef;
19use crate::sequence::SendableSequentialStream;
20use crate::sequence::SequencePointer;
21use crate::sequence::SequentialStreamAdapter;
22use crate::sequence::SequentialStreamExt;
23
/// A pluggable compressor that turns arrays into compressed arrays.
///
/// Both the balanced `BtrBlocksCompressor` and the size-optimized `CompactCompressor`
/// meet this interface, as does any
/// `Fn(&dyn Array) -> VortexResult<ArrayRef> + Send + Sync + 'static` closure.
///
/// API consumers are also free to implement this trait to provide new plugin compressors.
pub trait CompressorPlugin: Send + Sync + 'static {
    /// Compresses a single `chunk`.
    ///
    /// Returns the compressed array, or the error reported by the compressor.
    fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef>;
}
33
34impl CompressorPlugin for Arc<dyn CompressorPlugin> {
35    fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef> {
36        self.as_ref().compress_chunk(chunk)
37    }
38}
39
40impl<F> CompressorPlugin for F
41where
42    F: Fn(&dyn Array) -> VortexResult<ArrayRef> + Send + Sync + 'static,
43{
44    fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef> {
45        self(chunk)
46    }
47}
48
49impl CompressorPlugin for BtrBlocksCompressor {
50    fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef> {
51        self.compress(chunk)
52    }
53}
54
/// Exposes the compact compressor through the plugin interface (requires the `zstd` feature).
#[cfg(feature = "zstd")]
impl CompressorPlugin for crate::layouts::compact::CompactCompressor {
    /// Delegates to the compressor's own `compress`.
    fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef> {
        Self::compress(self, chunk)
    }
}
61
/// A layout writer that compresses chunks.
///
/// Every chunk of the incoming stream is run through `compressor`, and the resulting
/// stream of compressed chunks is handed to the `child` strategy.
#[derive(Clone)]
pub struct CompressingStrategy {
    /// Strategy that receives the stream of compressed chunks.
    child: Arc<dyn LayoutStrategy>,
    /// Compressor applied to each chunk.
    compressor: Arc<dyn CompressorPlugin>,
    /// Limit given to `buffered` on the stream of per-chunk compression tasks.
    concurrency: usize,
}
69
70impl CompressingStrategy {
71    /// Create a new writer that uses the BtrBlocks-style cascading compressor to compress chunks.
72    ///
73    /// This provides a good balance between decoding speed and small file size.
74    ///
75    /// Set `exclude_int_dict_encoding` to true to prevent dictionary encoding of integer arrays,
76    /// which is useful when compressing dictionary codes to avoid recursive dictionary encoding.
77    pub fn new_btrblocks<S: LayoutStrategy>(child: S, exclude_int_dict_encoding: bool) -> Self {
78        Self::new(
79            child,
80            Arc::new(BtrBlocksCompressor {
81                exclude_int_dict_encoding,
82            }),
83        )
84    }
85
86    /// Create a new writer that compresses using a `CompactCompressor` to compress chunks.
87    ///
88    /// This may create smaller files than the BtrBlocks writer, in exchange for some penalty
89    /// to decoding performance. This is only recommended for datasets that make heavy use of
90    /// floating point numbers.
91    ///
92    /// [`CompactCompressor`]: crate::layouts::compact::CompactCompressor
93    #[cfg(feature = "zstd")]
94    pub fn new_compact<S: LayoutStrategy>(
95        child: S,
96        compressor: crate::layouts::compact::CompactCompressor,
97    ) -> Self {
98        Self::new(child, Arc::new(compressor))
99    }
100
101    /// Create a new compressor from a plugin interface.
102    pub fn new_opaque<S: LayoutStrategy, C: CompressorPlugin>(child: S, compressor: C) -> Self {
103        Self::new(child, Arc::new(compressor))
104    }
105
106    fn new<S: LayoutStrategy>(child: S, compressor: Arc<dyn CompressorPlugin>) -> Self {
107        Self {
108            child: Arc::new(child),
109            compressor,
110            concurrency: std::thread::available_parallelism()
111                .map(|v| v.get())
112                .unwrap_or(1),
113        }
114    }
115
116    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
117        self.concurrency = concurrency;
118        self
119    }
120}
121
122#[async_trait]
123impl LayoutStrategy for CompressingStrategy {
124    async fn write_stream(
125        &self,
126        ctx: ArrayContext,
127        segment_sink: SegmentSinkRef,
128        stream: SendableSequentialStream,
129        eof: SequencePointer,
130        handle: Handle,
131    ) -> VortexResult<LayoutRef> {
132        let dtype = stream.dtype().clone();
133        let compressor = self.compressor.clone();
134
135        let handle2 = handle.clone();
136        let stream = stream
137            .map(move |chunk| {
138                let compressor = compressor.clone();
139                handle2.spawn_cpu(move || {
140                    let (sequence_id, chunk) = chunk?;
141                    // Compute the stats for the chunk prior to compression
142                    chunk
143                        .statistics()
144                        .compute_all(&Stat::all().collect::<Vec<_>>())?;
145                    Ok((sequence_id, compressor.compress_chunk(&chunk)?))
146                })
147            })
148            .buffered(self.concurrency);
149
150        self.child
151            .write_stream(
152                ctx,
153                segment_sink,
154                SequentialStreamAdapter::new(dtype, stream).sendable(),
155                eof,
156                handle,
157            )
158            .await
159    }
160
161    fn buffered_bytes(&self) -> u64 {
162        self.child.buffered_bytes()
163    }
164}