vortex_layout/layouts/compressed.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5
6use arcref::ArcRef;
7use futures::{FutureExt as _, StreamExt as _};
8use vortex_array::ArrayContext;
9use vortex_array::stats::Stat;
10use vortex_btrblocks::BtrBlocksCompressor;
11
12use crate::segments::SequenceWriter;
13use crate::{
14    LayoutStrategy, SendableLayoutFuture, SendableSequentialStream, SequentialStreamAdapter,
15    SequentialStreamExt as _, TaskExecutor, TaskExecutorExt as _,
16};
17
18/// A layout writer that compresses chunks using a sampling compressor.
19pub struct BtrBlocksCompressedStrategy {
20    child: ArcRef<dyn LayoutStrategy>,
21    executor: Arc<dyn TaskExecutor>,
22    parallelism: usize,
23}
24
25impl BtrBlocksCompressedStrategy {
26    pub fn new(
27        child: ArcRef<dyn LayoutStrategy>,
28        executor: Arc<dyn TaskExecutor>,
29        parallelism: usize,
30    ) -> Self {
31        Self {
32            child,
33            executor,
34            parallelism,
35        }
36    }
37}
38
39impl LayoutStrategy for BtrBlocksCompressedStrategy {
40    fn write_stream(
41        &self,
42        ctx: &ArrayContext,
43        sequence_writer: SequenceWriter,
44        stream: SendableSequentialStream,
45    ) -> SendableLayoutFuture {
46        let executor = self.executor.clone();
47
48        let dtype = stream.dtype().clone();
49        let stream = stream
50            .map(|chunk| {
51                async {
52                    let (sequence_id, chunk) = chunk?;
53                    // Compute the stats for the chunk prior to compression
54                    chunk
55                        .statistics()
56                        .compute_all(&Stat::all().collect::<Vec<_>>())?;
57                    Ok((sequence_id, BtrBlocksCompressor.compress(&chunk)?))
58                }
59                .boxed()
60            })
61            .map(move |compress_future| executor.spawn(compress_future))
62            .buffered(self.parallelism);
63
64        self.child.write_stream(
65            ctx,
66            sequence_writer,
67            SequentialStreamAdapter::new(dtype, stream).sendable(),
68        )
69    }
70}