vortex_layout/layouts/
compressed.rs1use std::sync::Arc;
5
6use arcref::ArcRef;
7use futures::{FutureExt as _, StreamExt as _};
8use vortex_array::ArrayContext;
9use vortex_array::stats::Stat;
10use vortex_btrblocks::BtrBlocksCompressor;
11
12use crate::segments::SequenceWriter;
13use crate::{
14 LayoutStrategy, SendableLayoutFuture, SendableSequentialStream, SequentialStreamAdapter,
15 SequentialStreamExt as _, TaskExecutor, TaskExecutorExt as _,
16};
17
18pub struct BtrBlocksCompressedStrategy {
20 child: ArcRef<dyn LayoutStrategy>,
21 executor: Arc<dyn TaskExecutor>,
22 parallelism: usize,
23}
24
25impl BtrBlocksCompressedStrategy {
26 pub fn new(
27 child: ArcRef<dyn LayoutStrategy>,
28 executor: Arc<dyn TaskExecutor>,
29 parallelism: usize,
30 ) -> Self {
31 Self {
32 child,
33 executor,
34 parallelism,
35 }
36 }
37}
38
39impl LayoutStrategy for BtrBlocksCompressedStrategy {
40 fn write_stream(
41 &self,
42 ctx: &ArrayContext,
43 sequence_writer: SequenceWriter,
44 stream: SendableSequentialStream,
45 ) -> SendableLayoutFuture {
46 let executor = self.executor.clone();
47
48 let dtype = stream.dtype().clone();
49 let stream = stream
50 .map(|chunk| {
51 async {
52 let (sequence_id, chunk) = chunk?;
53 chunk
55 .statistics()
56 .compute_all(&Stat::all().collect::<Vec<_>>())?;
57 Ok((sequence_id, BtrBlocksCompressor.compress(&chunk)?))
58 }
59 .boxed()
60 })
61 .map(move |compress_future| executor.spawn(compress_future))
62 .buffered(self.parallelism);
63
64 self.child.write_stream(
65 ctx,
66 sequence_writer,
67 SequentialStreamAdapter::new(dtype, stream).sendable(),
68 )
69 }
70}