Skip to main content

vortex_layout/layouts/
compressed.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::StreamExt as _;
8use vortex_array::Array;
9use vortex_array::ArrayContext;
10use vortex_array::ArrayRef;
11use vortex_array::expr::stats::Stat;
12use vortex_btrblocks::BtrBlocksCompressor;
13use vortex_btrblocks::BtrBlocksCompressorBuilder;
14use vortex_btrblocks::IntCode;
15use vortex_error::VortexResult;
16use vortex_io::runtime::Handle;
17
18use crate::LayoutRef;
19use crate::LayoutStrategy;
20use crate::segments::SegmentSinkRef;
21use crate::sequence::SendableSequentialStream;
22use crate::sequence::SequencePointer;
23use crate::sequence::SequentialStreamAdapter;
24use crate::sequence::SequentialStreamExt;
25
26/// A boxed compressor function from arrays into compressed arrays.
27///
28/// API consumers are free to implement this trait to provide new plugin compressors.
pub trait CompressorPlugin: Send + Sync + 'static {
    /// Compress a single chunk, returning the compressed array.
    fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef>;
}
32
33impl CompressorPlugin for Arc<dyn CompressorPlugin> {
34    fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef> {
35        self.as_ref().compress_chunk(chunk)
36    }
37}
38
39impl<F> CompressorPlugin for F
40where
41    F: Fn(&dyn Array) -> VortexResult<ArrayRef> + Send + Sync + 'static,
42{
43    fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef> {
44        self(chunk)
45    }
46}
47
48impl CompressorPlugin for BtrBlocksCompressor {
49    fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef> {
50        self.compress(chunk)
51    }
52}
53
/// A layout writer that compresses chunks.
#[derive(Clone)]
pub struct CompressingStrategy {
    // Downstream strategy that receives the compressed stream and writes the layout.
    child: Arc<dyn LayoutStrategy>,
    // Pluggable per-chunk compressor (BtrBlocks by default, or a user plugin).
    compressor: Arc<dyn CompressorPlugin>,
    // Number of chunks compressed concurrently (passed to `buffered` in write_stream).
    concurrency: usize,
}
61
62impl CompressingStrategy {
63    /// Create a new writer that uses the BtrBlocks-style cascading compressor to compress chunks.
64    ///
65    /// This provides a good balance between decoding speed and small file size.
66    ///
67    /// Set `exclude_int_dict_encoding` to true to prevent dictionary encoding of integer arrays,
68    /// which is useful when compressing dictionary codes to avoid recursive dictionary encoding.
69    pub fn new_btrblocks<S: LayoutStrategy>(child: S, exclude_int_dict_encoding: bool) -> Self {
70        let compressor = if exclude_int_dict_encoding {
71            BtrBlocksCompressorBuilder::default()
72                .exclude_int([IntCode::Dict])
73                .build()
74        } else {
75            BtrBlocksCompressor::default()
76        };
77        Self::new(child, Arc::new(compressor))
78    }
79
80    /// Create a new compressor from a plugin interface.
81    pub fn new_opaque<S: LayoutStrategy, C: CompressorPlugin>(child: S, compressor: C) -> Self {
82        Self::new(child, Arc::new(compressor))
83    }
84
85    fn new<S: LayoutStrategy>(child: S, compressor: Arc<dyn CompressorPlugin>) -> Self {
86        Self {
87            child: Arc::new(child),
88            compressor,
89            concurrency: std::thread::available_parallelism()
90                .map(|v| v.get())
91                .unwrap_or(1),
92        }
93    }
94
95    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
96        self.concurrency = concurrency;
97        self
98    }
99}
100
#[async_trait]
impl LayoutStrategy for CompressingStrategy {
    /// Compress each chunk of `stream` on the CPU task pool — up to
    /// `self.concurrency` chunks in flight at once — then forward the
    /// compressed stream to the child strategy for layout writing.
    async fn write_stream(
        &self,
        ctx: ArrayContext,
        segment_sink: SegmentSinkRef,
        stream: SendableSequentialStream,
        eof: SequencePointer,
        handle: Handle,
    ) -> VortexResult<LayoutRef> {
        let dtype = stream.dtype().clone();
        let compressor = self.compressor.clone();

        let handle2 = handle.clone();
        let stream = stream
            .map(move |chunk| {
                // Each closure invocation needs its own handle to the compressor.
                let compressor = compressor.clone();
                // Offload compression to a CPU task so the async writer isn't blocked.
                handle2.spawn_cpu(move || {
                    let (sequence_id, chunk) = chunk?;
                    // Compute the stats for the chunk prior to compression
                    chunk
                        .statistics()
                        .compute_all(&Stat::all().collect::<Vec<_>>())?;
                    Ok((sequence_id, compressor.compress_chunk(&chunk)?))
                })
            })
            // `buffered` runs up to `concurrency` compression tasks concurrently
            // while yielding their results in the original stream order.
            // NOTE(review): assumes `concurrency >= 1`; `buffered(0)` would stall
            // the stream — confirm callers never set 0 via `with_concurrency`.
            .buffered(self.concurrency);

        self.child
            .write_stream(
                ctx,
                segment_sink,
                // Re-wrap the mapped stream with the original dtype before
                // handing it to the child strategy.
                SequentialStreamAdapter::new(dtype, stream).sendable(),
                eof,
                handle,
            )
            .await
    }

    /// Delegates buffered-bytes accounting to the child strategy.
    fn buffered_bytes(&self) -> u64 {
        self.child.buffered_bytes()
    }
}
139
140    fn buffered_bytes(&self) -> u64 {
141        self.child.buffered_bytes()
142    }
143}