Skip to main content

vortex_layout/layouts/compressed.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::StreamExt as _;
8use vortex_array::ArrayContext;
9use vortex_array::ArrayRef;
10use vortex_array::VortexSessionExecute;
11use vortex_array::expr::stats::Stat;
12use vortex_btrblocks::BtrBlocksCompressor;
13use vortex_error::VortexResult;
14use vortex_io::session::RuntimeSessionExt;
15use vortex_session::VortexSession;
16
17use crate::LayoutRef;
18use crate::LayoutStrategy;
19use crate::segments::SegmentSinkRef;
20use crate::sequence::SendableSequentialStream;
21use crate::sequence::SequencePointer;
22use crate::sequence::SequentialStreamAdapter;
23use crate::sequence::SequentialStreamExt;
24
/// A pluggable compressor that maps an input array chunk to its compressed form.
///
/// API consumers are free to implement this trait to provide new plugin compressors.
/// Implementations are provided in this module for closures of the form
/// `Fn(&ArrayRef) -> VortexResult<ArrayRef>`, for `Arc<dyn CompressorPlugin>`, and for
/// [`BtrBlocksCompressor`].
///
/// The `Send + Sync + 'static` bounds are required because [`CompressingStrategy`] stores the
/// plugin behind an `Arc` and invokes it from spawned CPU tasks.
pub trait CompressorPlugin: Send + Sync + 'static {
    /// Compress a single chunk, returning the compressed array or the error that prevented it.
    fn compress_chunk(&self, chunk: &ArrayRef) -> VortexResult<ArrayRef>;
}
31
32impl CompressorPlugin for Arc<dyn CompressorPlugin> {
33    fn compress_chunk(&self, chunk: &ArrayRef) -> VortexResult<ArrayRef> {
34        self.as_ref().compress_chunk(chunk)
35    }
36}
37
38impl<F> CompressorPlugin for F
39where
40    F: Fn(&ArrayRef) -> VortexResult<ArrayRef> + Send + Sync + 'static,
41{
42    fn compress_chunk(&self, chunk: &ArrayRef) -> VortexResult<ArrayRef> {
43        self(chunk)
44    }
45}
46
47impl CompressorPlugin for BtrBlocksCompressor {
48    fn compress_chunk(&self, chunk: &ArrayRef) -> VortexResult<ArrayRef> {
49        self.compress(chunk)
50    }
51}
52
/// A layout writer that compresses chunks.
///
/// Each incoming chunk first has its statistics computed. It is then passed through the
/// configured [`CompressorPlugin`] on a spawned CPU task. Finally the results are forwarded, in
/// input order, to the child [`LayoutStrategy`]. Cloning is cheap: every field is either an
/// `Arc` or a `usize`.
#[derive(Clone)]
pub struct CompressingStrategy {
    /// Strategy that receives the compressed chunk stream and writes the resulting layout.
    child: Arc<dyn LayoutStrategy>,
    /// Compressor applied to every chunk.
    compressor: Arc<dyn CompressorPlugin>,
    /// Statistics computed on each chunk before it is compressed.
    stats: Arc<[Stat]>,
    /// Maximum number of chunks processed concurrently (the limit given to `buffered`).
    concurrency: usize,
}
61
62impl CompressingStrategy {
63    /// Create a new compressing layout strategy with the given child strategy and compressor.
64    pub fn new<S: LayoutStrategy, C: CompressorPlugin>(child: S, compressor: C) -> Self {
65        Self {
66            child: Arc::new(child),
67            compressor: Arc::new(compressor),
68            stats: Stat::all().collect(),
69            concurrency: std::thread::available_parallelism()
70                .map(|v| v.get())
71                .unwrap_or(1),
72        }
73    }
74
75    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
76        self.concurrency = concurrency;
77        self
78    }
79
80    /// Override the set of statistics computed on each chunk before compression.
81    /// Defaults to `Stat::all()`.
82    pub fn with_stats(mut self, stats: &[Stat]) -> Self {
83        self.stats = stats.into();
84        self
85    }
86}
87
88#[async_trait]
89impl LayoutStrategy for CompressingStrategy {
90    async fn write_stream(
91        &self,
92        ctx: ArrayContext,
93        segment_sink: SegmentSinkRef,
94        stream: SendableSequentialStream,
95        eof: SequencePointer,
96        session: &VortexSession,
97    ) -> VortexResult<LayoutRef> {
98        let dtype = stream.dtype().clone();
99        let compressor = Arc::clone(&self.compressor);
100        let stats = Arc::clone(&self.stats);
101        let session = session.clone();
102        let compute_session = session.clone();
103
104        let handle = session.handle();
105        let stream = stream
106            .map(move |chunk| {
107                let compressor = Arc::clone(&compressor);
108                let stats = Arc::clone(&stats);
109                let session = compute_session.clone();
110                handle.spawn_cpu(move || {
111                    let (sequence_id, chunk) = chunk?;
112                    // Compute the stats for the chunk prior to compression
113                    chunk
114                        .statistics()
115                        .compute_all(&stats, &mut session.create_execution_ctx())?;
116                    Ok((sequence_id, compressor.compress_chunk(&chunk)?))
117                })
118            })
119            .buffered(self.concurrency);
120
121        self.child
122            .write_stream(
123                ctx,
124                segment_sink,
125                SequentialStreamAdapter::new(dtype, stream).sendable(),
126                eof,
127                &session,
128            )
129            .await
130    }
131
132    fn buffered_bytes(&self) -> u64 {
133        self.child.buffered_bytes()
134    }
135}