vortex_layout/layouts/
compressed.rs1use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::{FutureExt as _, StreamExt as _};
8use vortex_array::stats::Stat;
9use vortex_array::{Array, ArrayContext, ArrayRef};
10use vortex_btrblocks::BtrBlocksCompressor;
11use vortex_error::VortexResult;
12
13use crate::segments::SequenceWriter;
14use crate::{
15 LayoutRef, LayoutStrategy, SendableSequentialStream, SequentialStreamAdapter,
16 SequentialStreamExt as _, TaskExecutor, TaskExecutorExt as _,
17};
18
19pub trait CompressorPlugin: Send + Sync + 'static {
26 fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef>;
27}
28
29impl CompressorPlugin for Arc<dyn CompressorPlugin> {
30 fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef> {
31 self.as_ref().compress_chunk(chunk)
32 }
33}
34
35impl<F> CompressorPlugin for F
36where
37 F: Fn(&dyn Array) -> VortexResult<ArrayRef> + Send + Sync + 'static,
38{
39 fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef> {
40 self(chunk)
41 }
42}
43
44impl CompressorPlugin for BtrBlocksCompressor {
45 fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef> {
46 BtrBlocksCompressor::compress(self, chunk)
47 }
48}
49
/// Adapts `CompactCompressor` to the [`CompressorPlugin`] interface by delegating to its own
/// `compress` method. Only compiled when the `zstd` feature is enabled.
#[cfg(feature = "zstd")]
impl CompressorPlugin for crate::layouts::compact::CompactCompressor {
    fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef> {
        Self::compress(self, chunk)
    }
}
56
57#[derive(Clone)]
59pub struct CompressingStrategy<S> {
60 child: S,
61 compressor: Arc<dyn CompressorPlugin>,
62 executor: Arc<dyn TaskExecutor>,
63 parallelism: usize,
64}
65
66impl<S: LayoutStrategy> CompressingStrategy<S> {
67 pub fn new_btrblocks(child: S, executor: Arc<dyn TaskExecutor>, parallelism: usize) -> Self {
71 Self {
72 child,
73 compressor: Arc::new(BtrBlocksCompressor),
74 executor,
75 parallelism,
76 }
77 }
78
79 #[cfg(feature = "zstd")]
87 pub fn new_compact(
88 child: S,
89 compressor: crate::layouts::compact::CompactCompressor,
90 executor: Arc<dyn TaskExecutor>,
91 parallelism: usize,
92 ) -> Self {
93 Self {
94 child,
95 compressor: Arc::new(compressor),
96 executor,
97 parallelism,
98 }
99 }
100
101 pub fn new_opaque<C: CompressorPlugin>(
103 child: S,
104 compressor: C,
105 executor: Arc<dyn TaskExecutor>,
106 parallelism: usize,
107 ) -> Self {
108 Self {
109 child,
110 compressor: Arc::new(compressor),
111 executor,
112 parallelism,
113 }
114 }
115}
116
117#[async_trait]
118impl<S> LayoutStrategy for CompressingStrategy<S>
119where
120 S: LayoutStrategy,
121{
122 async fn write_stream(
123 &self,
124 ctx: &ArrayContext,
125 sequence_writer: SequenceWriter,
126 stream: SendableSequentialStream,
127 ) -> VortexResult<LayoutRef> {
128 let dtype = stream.dtype().clone();
129 let compressor = self.compressor.clone();
130 let executor = self.executor.clone();
131
132 let stream = stream
133 .map(move |chunk| {
134 let compressor = compressor.clone();
135 async move {
136 let (sequence_id, chunk) = chunk?;
137 chunk
139 .statistics()
140 .compute_all(&Stat::all().collect::<Vec<_>>())?;
141 Ok((sequence_id, compressor.compress_chunk(&chunk)?))
142 }
143 .boxed()
144 })
145 .map(move |compress_future| executor.spawn(compress_future))
146 .buffered(self.parallelism);
147
148 self.child
149 .write_stream(
150 ctx,
151 sequence_writer,
152 SequentialStreamAdapter::new(dtype, stream).sendable(),
153 )
154 .await
155 }
156}