vortex_layout/layouts/
compressed.rs1use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::{FutureExt as _, StreamExt as _};
8use vortex_array::stats::Stat;
9use vortex_array::{Array, ArrayContext, ArrayRef};
10use vortex_btrblocks::BtrBlocksCompressor;
11use vortex_error::VortexResult;
12
13use crate::segments::SequenceWriter;
14use crate::{
15 LayoutRef, LayoutStrategy, SendableSequentialStream, SequentialStreamAdapter,
16 SequentialStreamExt as _, TaskExecutor, TaskExecutorExt as _,
17};
18
19pub trait CompressorPlugin: Send + Sync + 'static {
26 fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef>;
27}
28
29impl CompressorPlugin for Arc<dyn CompressorPlugin> {
30 fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef> {
31 self.as_ref().compress_chunk(chunk)
32 }
33}
34
35impl<F> CompressorPlugin for F
36where
37 F: Fn(&dyn Array) -> VortexResult<ArrayRef> + Send + Sync + 'static,
38{
39 fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef> {
40 self(chunk)
41 }
42}
43
44impl CompressorPlugin for BtrBlocksCompressor {
45 fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef> {
46 self.compress(chunk)
47 }
48}
49
/// Adapts the compact compressor to the [`CompressorPlugin`] interface.
///
/// Only compiled when the `zstd` feature is enabled.
#[cfg(feature = "zstd")]
impl CompressorPlugin for crate::layouts::compact::CompactCompressor {
    /// Forwards the chunk to the compressor's own `compress` method.
    fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef> {
        self.compress(chunk)
    }
}
56
57#[derive(Clone)]
59pub struct CompressingStrategy<S> {
60 child: S,
61 compressor: Arc<dyn CompressorPlugin>,
62 executor: Arc<dyn TaskExecutor>,
63 parallelism: usize,
64}
65
66impl<S: LayoutStrategy> CompressingStrategy<S> {
67 pub fn new_btrblocks(
74 child: S,
75 executor: Arc<dyn TaskExecutor>,
76 parallelism: usize,
77 exclude_int_dict_encoding: bool,
78 ) -> Self {
79 Self {
80 child,
81 compressor: Arc::new(BtrBlocksCompressor {
82 exclude_int_dict_encoding,
83 }),
84 executor,
85 parallelism,
86 }
87 }
88
89 #[cfg(feature = "zstd")]
97 pub fn new_compact(
98 child: S,
99 compressor: crate::layouts::compact::CompactCompressor,
100 executor: Arc<dyn TaskExecutor>,
101 parallelism: usize,
102 ) -> Self {
103 Self {
104 child,
105 compressor: Arc::new(compressor),
106 executor,
107 parallelism,
108 }
109 }
110
111 pub fn new_opaque<C: CompressorPlugin>(
113 child: S,
114 compressor: C,
115 executor: Arc<dyn TaskExecutor>,
116 parallelism: usize,
117 ) -> Self {
118 Self {
119 child,
120 compressor: Arc::new(compressor),
121 executor,
122 parallelism,
123 }
124 }
125}
126
127#[async_trait]
128impl<S> LayoutStrategy for CompressingStrategy<S>
129where
130 S: LayoutStrategy,
131{
132 async fn write_stream(
133 &self,
134 ctx: &ArrayContext,
135 sequence_writer: SequenceWriter,
136 stream: SendableSequentialStream,
137 ) -> VortexResult<LayoutRef> {
138 let dtype = stream.dtype().clone();
139 let compressor = self.compressor.clone();
140 let executor = self.executor.clone();
141
142 let stream = stream
143 .map(move |chunk| {
144 let compressor = compressor.clone();
145 async move {
146 let (sequence_id, chunk) = chunk?;
147 chunk
149 .statistics()
150 .compute_all(&Stat::all().collect::<Vec<_>>())?;
151 Ok((sequence_id, compressor.compress_chunk(&chunk)?))
152 }
153 .boxed()
154 })
155 .map(move |compress_future| executor.spawn(compress_future))
156 .buffered(self.parallelism);
157
158 self.child
159 .write_stream(
160 ctx,
161 sequence_writer,
162 SequentialStreamAdapter::new(dtype, stream).sendable(),
163 )
164 .await
165 }
166}