Skip to main content

vortex_layout/layouts/
compressed.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::StreamExt as _;
8use vortex_array::ArrayContext;
9use vortex_array::ArrayRef;
10use vortex_array::ExecutionCtx;
11use vortex_array::VortexSessionExecute;
12use vortex_array::expr::stats::Stat;
13use vortex_btrblocks::BtrBlocksCompressor;
14use vortex_error::VortexResult;
15use vortex_io::session::RuntimeSessionExt;
16use vortex_session::VortexSession;
17use vortex_utils::parallelism::get_available_parallelism;
18
19use crate::LayoutRef;
20use crate::LayoutStrategy;
21use crate::segments::SegmentSinkRef;
22use crate::sequence::SendableSequentialStream;
23use crate::sequence::SequencePointer;
24use crate::sequence::SequentialStreamAdapter;
25use crate::sequence::SequentialStreamExt;
26
/// A pluggable chunk compressor: turns an array into a compressed array.
///
/// API consumers are free to implement this trait to provide new plugin compressors.
/// Implementors must be `Send + Sync + 'static`.
pub trait CompressorPlugin: Send + Sync + 'static {
    /// Compress a single `chunk`, using `ctx` as the execution context.
    ///
    /// Returns the compressed array, or the error reported by the implementation.
    fn compress_chunk(&self, chunk: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<ArrayRef>;
}
33
34impl CompressorPlugin for Arc<dyn CompressorPlugin> {
35    fn compress_chunk(&self, chunk: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<ArrayRef> {
36        self.as_ref().compress_chunk(chunk, ctx)
37    }
38}
39
40impl<F> CompressorPlugin for F
41where
42    F: Fn(&ArrayRef, &mut ExecutionCtx) -> VortexResult<ArrayRef> + Send + Sync + 'static,
43{
44    fn compress_chunk(&self, chunk: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<ArrayRef> {
45        self(chunk, ctx)
46    }
47}
48
/// Allows a [`BtrBlocksCompressor`] to be used directly as a compressor plugin.
impl CompressorPlugin for BtrBlocksCompressor {
    /// Delegates to the compressor's own `compress` method.
    fn compress_chunk(&self, chunk: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<ArrayRef> {
        self.compress(chunk, ctx)
    }
}
54
/// A layout writer that compresses chunks.
///
/// Each chunk of the incoming stream has its statistics computed, is compressed by the
/// configured plugin, and is then forwarded to the child strategy.
#[derive(Clone)]
pub struct CompressingStrategy {
    /// Strategy that receives the stream of compressed chunks.
    child: Arc<dyn LayoutStrategy>,
    /// Plugin used to compress each chunk.
    compressor: Arc<dyn CompressorPlugin>,
    /// Statistics computed on each chunk before it is compressed.
    stats: Arc<[Stat]>,
    /// Buffer size passed to `buffered` when compressing the stream.
    concurrency: usize,
}
63
64impl CompressingStrategy {
65    /// Create a new compressing layout strategy with the given child strategy and compressor.
66    pub fn new<S: LayoutStrategy, C: CompressorPlugin>(child: S, compressor: C) -> Self {
67        Self {
68            child: Arc::new(child),
69            compressor: Arc::new(compressor),
70            stats: Stat::all().collect(),
71            concurrency: get_available_parallelism().unwrap_or(1),
72        }
73    }
74
75    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
76        self.concurrency = concurrency;
77        self
78    }
79
80    /// Override the set of statistics computed on each chunk before compression.
81    /// Defaults to `Stat::all()`.
82    pub fn with_stats(mut self, stats: &[Stat]) -> Self {
83        self.stats = stats.into();
84        self
85    }
86}
87
88#[async_trait]
89impl LayoutStrategy for CompressingStrategy {
90    async fn write_stream(
91        &self,
92        ctx: ArrayContext,
93        segment_sink: SegmentSinkRef,
94        stream: SendableSequentialStream,
95        eof: SequencePointer,
96        session: &VortexSession,
97    ) -> VortexResult<LayoutRef> {
98        let dtype = stream.dtype().clone();
99        let compressor = Arc::clone(&self.compressor);
100        let stats = Arc::clone(&self.stats);
101        let session = session.clone();
102        let compute_session = session.clone();
103
104        let handle = session.handle();
105        let stream = stream
106            .map(move |chunk| {
107                let compressor = Arc::clone(&compressor);
108                let stats = Arc::clone(&stats);
109                let session = compute_session.clone();
110                handle.spawn_cpu(move || {
111                    let (sequence_id, chunk) = chunk?;
112                    let mut ctx = session.create_execution_ctx();
113                    // Compute the stats for the chunk prior to compression
114                    chunk.statistics().compute_all(&stats, &mut ctx)?;
115                    Ok((sequence_id, compressor.compress_chunk(&chunk, &mut ctx)?))
116                })
117            })
118            .buffered(self.concurrency);
119
120        self.child
121            .write_stream(
122                ctx,
123                segment_sink,
124                SequentialStreamAdapter::new(dtype, stream).sendable(),
125                eof,
126                &session,
127            )
128            .await
129    }
130
131    fn buffered_bytes(&self) -> u64 {
132        self.child.buffered_bytes()
133    }
134}