vortex_layout/layouts/
compressed.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::StreamExt as _;
8use vortex_array::stats::Stat;
9use vortex_array::{Array, ArrayContext, ArrayRef};
10use vortex_btrblocks::BtrBlocksCompressor;
11use vortex_error::VortexResult;
12use vortex_io::runtime::Handle;
13
14use crate::segments::SegmentSinkRef;
15use crate::sequence::{
16    SendableSequentialStream, SequencePointer, SequentialStreamAdapter, SequentialStreamExt,
17};
18use crate::{LayoutRef, LayoutStrategy};
19
/// A pluggable compressor that turns an array chunk into a compressed array.
///
/// Both the balanced `BtrBlocksCompressor` and the size-optimized `CompactCompressor`
/// meet this interface, as does any `Send + Sync + 'static` closure of the shape
/// `Fn(&dyn Array) -> VortexResult<ArrayRef>`.
///
/// API consumers are also free to implement this trait to provide new plugin compressors.
pub trait CompressorPlugin: Send + Sync + 'static {
    /// Compress a single `chunk`, returning the resulting array or an error.
    fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef>;
}
29
30impl CompressorPlugin for Arc<dyn CompressorPlugin> {
31    fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef> {
32        self.as_ref().compress_chunk(chunk)
33    }
34}
35
36impl<F> CompressorPlugin for F
37where
38    F: Fn(&dyn Array) -> VortexResult<ArrayRef> + Send + Sync + 'static,
39{
40    fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef> {
41        self(chunk)
42    }
43}
44
/// Adapts [`BtrBlocksCompressor`] to the plugin interface by delegating to its `compress` method.
impl CompressorPlugin for BtrBlocksCompressor {
    fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef> {
        self.compress(chunk)
    }
}
50
/// Adapts `CompactCompressor` to the plugin interface by delegating to its `compress` method.
///
/// Only compiled when the `zstd` feature is enabled.
#[cfg(feature = "zstd")]
impl CompressorPlugin for crate::layouts::compact::CompactCompressor {
    fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef> {
        self.compress(chunk)
    }
}
57
/// A layout writer that compresses chunks.
///
/// Each chunk of the incoming stream is run through the configured [`CompressorPlugin`]
/// and the compressed stream is then handed to the child strategy.
#[derive(Clone)]
pub struct CompressingStrategy {
    /// The strategy that receives the compressed chunks.
    child: Arc<dyn LayoutStrategy>,
    /// The compressor applied to every chunk.
    compressor: Arc<dyn CompressorPlugin>,
    /// Upper bound on in-flight compression tasks (the limit passed to `buffered`).
    concurrency: usize,
}
65
66impl CompressingStrategy {
67    /// Create a new writer that uses the BtrBlocks-style cascading compressor to compress chunks.
68    ///
69    /// This provides a good balance between decoding speed and small file size.
70    ///
71    /// Set `exclude_int_dict_encoding` to true to prevent dictionary encoding of integer arrays,
72    /// which is useful when compressing dictionary codes to avoid recursive dictionary encoding.
73    pub fn new_btrblocks<S: LayoutStrategy>(child: S, exclude_int_dict_encoding: bool) -> Self {
74        Self::new(
75            child,
76            Arc::new(BtrBlocksCompressor {
77                exclude_int_dict_encoding,
78            }),
79        )
80    }
81
82    /// Create a new writer that compresses using a `CompactCompressor` to compress chunks.
83    ///
84    /// This may create smaller files than the BtrBlocks writer, in exchange for some penalty
85    /// to decoding performance. This is only recommended for datasets that make heavy use of
86    /// floating point numbers.
87    ///
88    /// [`CompactCompressor`]: crate::layouts::compact::CompactCompressor
89    #[cfg(feature = "zstd")]
90    pub fn new_compact<S: LayoutStrategy>(
91        child: S,
92        compressor: crate::layouts::compact::CompactCompressor,
93    ) -> Self {
94        Self::new(child, Arc::new(compressor))
95    }
96
97    /// Create a new compressor from a plugin interface.
98    pub fn new_opaque<S: LayoutStrategy, C: CompressorPlugin>(child: S, compressor: C) -> Self {
99        Self::new(child, Arc::new(compressor))
100    }
101
102    fn new<S: LayoutStrategy>(child: S, compressor: Arc<dyn CompressorPlugin>) -> Self {
103        Self {
104            child: Arc::new(child),
105            compressor,
106            concurrency: std::thread::available_parallelism()
107                .map(|v| v.get())
108                .unwrap_or(1),
109        }
110    }
111
112    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
113        self.concurrency = concurrency;
114        self
115    }
116}
117
118#[async_trait]
119impl LayoutStrategy for CompressingStrategy {
120    async fn write_stream(
121        &self,
122        ctx: ArrayContext,
123        segment_sink: SegmentSinkRef,
124        stream: SendableSequentialStream,
125        eof: SequencePointer,
126        handle: Handle,
127    ) -> VortexResult<LayoutRef> {
128        let dtype = stream.dtype().clone();
129        let compressor = self.compressor.clone();
130
131        let handle2 = handle.clone();
132        let stream = stream
133            .map(move |chunk| {
134                let compressor = compressor.clone();
135                handle2.spawn_cpu(move || {
136                    let (sequence_id, chunk) = chunk?;
137                    // Compute the stats for the chunk prior to compression
138                    chunk
139                        .statistics()
140                        .compute_all(&Stat::all().collect::<Vec<_>>())?;
141                    Ok((sequence_id, compressor.compress_chunk(&chunk)?))
142                })
143            })
144            .buffered(self.concurrency);
145
146        self.child
147            .write_stream(
148                ctx,
149                segment_sink,
150                SequentialStreamAdapter::new(dtype, stream).sendable(),
151                eof,
152                handle,
153            )
154            .await
155    }
156
157    fn buffered_bytes(&self) -> u64 {
158        self.child.buffered_bytes()
159    }
160}