Skip to main content

vortex_layout/layouts/
compressed.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::StreamExt as _;
8use vortex_array::ArrayContext;
9use vortex_array::ArrayRef;
10use vortex_array::expr::stats::Stat;
11use vortex_btrblocks::BtrBlocksCompressor;
12use vortex_error::VortexResult;
13use vortex_io::session::RuntimeSessionExt;
14use vortex_session::VortexSession;
15
16use crate::LayoutRef;
17use crate::LayoutStrategy;
18use crate::segments::SegmentSinkRef;
19use crate::sequence::SendableSequentialStream;
20use crate::sequence::SequencePointer;
21use crate::sequence::SequentialStreamAdapter;
22use crate::sequence::SequentialStreamExt;
23
24/// A boxed compressor function from arrays into compressed arrays.
25///
26/// API consumers are free to implement this trait to provide new plugin compressors.
27pub trait CompressorPlugin: Send + Sync + 'static {
28    fn compress_chunk(&self, chunk: &ArrayRef) -> VortexResult<ArrayRef>;
29}
30
31impl CompressorPlugin for Arc<dyn CompressorPlugin> {
32    fn compress_chunk(&self, chunk: &ArrayRef) -> VortexResult<ArrayRef> {
33        self.as_ref().compress_chunk(chunk)
34    }
35}
36
37impl<F> CompressorPlugin for F
38where
39    F: Fn(&ArrayRef) -> VortexResult<ArrayRef> + Send + Sync + 'static,
40{
41    fn compress_chunk(&self, chunk: &ArrayRef) -> VortexResult<ArrayRef> {
42        self(chunk)
43    }
44}
45
46impl CompressorPlugin for BtrBlocksCompressor {
47    fn compress_chunk(&self, chunk: &ArrayRef) -> VortexResult<ArrayRef> {
48        self.compress(chunk)
49    }
50}
51
52/// A layout writer that compresses chunks.
53#[derive(Clone)]
54pub struct CompressingStrategy {
55    child: Arc<dyn LayoutStrategy>,
56    compressor: Arc<dyn CompressorPlugin>,
57    concurrency: usize,
58}
59
60impl CompressingStrategy {
61    /// Create a new compressing layout strategy with the given child strategy and compressor.
62    pub fn new<S: LayoutStrategy, C: CompressorPlugin>(child: S, compressor: C) -> Self {
63        Self {
64            child: Arc::new(child),
65            compressor: Arc::new(compressor),
66            concurrency: std::thread::available_parallelism()
67                .map(|v| v.get())
68                .unwrap_or(1),
69        }
70    }
71
72    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
73        self.concurrency = concurrency;
74        self
75    }
76}
77
78#[async_trait]
79impl LayoutStrategy for CompressingStrategy {
80    async fn write_stream(
81        &self,
82        ctx: ArrayContext,
83        segment_sink: SegmentSinkRef,
84        stream: SendableSequentialStream,
85        eof: SequencePointer,
86        session: &VortexSession,
87    ) -> VortexResult<LayoutRef> {
88        let dtype = stream.dtype().clone();
89        let compressor = Arc::clone(&self.compressor);
90
91        let handle = session.handle();
92        let stream = stream
93            .map(move |chunk| {
94                let compressor = Arc::clone(&compressor);
95                handle.spawn_cpu(move || {
96                    let (sequence_id, chunk) = chunk?;
97                    // Compute the stats for the chunk prior to compression
98                    chunk
99                        .statistics()
100                        .compute_all(&Stat::all().collect::<Vec<_>>())?;
101                    Ok((sequence_id, compressor.compress_chunk(&chunk)?))
102                })
103            })
104            .buffered(self.concurrency);
105
106        self.child
107            .write_stream(
108                ctx,
109                segment_sink,
110                SequentialStreamAdapter::new(dtype, stream).sendable(),
111                eof,
112                session,
113            )
114            .await
115    }
116
117    fn buffered_bytes(&self) -> u64 {
118        self.child.buffered_bytes()
119    }
120}