vortex_layout/layouts/
compressed.rs1use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::StreamExt as _;
8use vortex_array::Array;
9use vortex_array::ArrayContext;
10use vortex_array::ArrayRef;
11use vortex_array::expr::stats::Stat;
12use vortex_btrblocks::BtrBlocksCompressor;
13use vortex_error::VortexResult;
14use vortex_io::runtime::Handle;
15
16use crate::LayoutRef;
17use crate::LayoutStrategy;
18use crate::segments::SegmentSinkRef;
19use crate::sequence::SendableSequentialStream;
20use crate::sequence::SequencePointer;
21use crate::sequence::SequentialStreamAdapter;
22use crate::sequence::SequentialStreamExt;
23
24pub trait CompressorPlugin: Send + Sync + 'static {
31 fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef>;
32}
33
34impl CompressorPlugin for Arc<dyn CompressorPlugin> {
35 fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef> {
36 self.as_ref().compress_chunk(chunk)
37 }
38}
39
40impl<F> CompressorPlugin for F
41where
42 F: Fn(&dyn Array) -> VortexResult<ArrayRef> + Send + Sync + 'static,
43{
44 fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef> {
45 self(chunk)
46 }
47}
48
49impl CompressorPlugin for BtrBlocksCompressor {
50 fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef> {
51 self.compress(chunk)
52 }
53}
54
/// Adapts the zstd-backed `CompactCompressor` to the plugin interface by
/// delegating to its own `compress` method. Only compiled with the `zstd`
/// feature.
#[cfg(feature = "zstd")]
impl CompressorPlugin for crate::layouts::compact::CompactCompressor {
    fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef> {
        Self::compress(self, chunk)
    }
}
61
/// A [`LayoutStrategy`] wrapper that runs every incoming chunk through a
/// [`CompressorPlugin`] and forwards the compressed chunks to a child strategy.
///
/// Cloning is cheap: both the child and the compressor are shared via `Arc`.
#[derive(Clone)]
pub struct CompressingStrategy {
    /// Strategy that receives the compressed chunks and produces the layout.
    child: Arc<dyn LayoutStrategy>,
    /// Compressor applied to each chunk; cloned into every CPU task.
    compressor: Arc<dyn CompressorPlugin>,
    /// Limit handed to `StreamExt::buffered`, i.e. how many chunks may be in
    /// flight through compression at once.
    concurrency: usize,
}
69
70impl CompressingStrategy {
71 pub fn new_btrblocks<S: LayoutStrategy>(child: S, exclude_int_dict_encoding: bool) -> Self {
78 Self::new(
79 child,
80 Arc::new(BtrBlocksCompressor {
81 exclude_int_dict_encoding,
82 }),
83 )
84 }
85
86 #[cfg(feature = "zstd")]
94 pub fn new_compact<S: LayoutStrategy>(
95 child: S,
96 compressor: crate::layouts::compact::CompactCompressor,
97 ) -> Self {
98 Self::new(child, Arc::new(compressor))
99 }
100
101 pub fn new_opaque<S: LayoutStrategy, C: CompressorPlugin>(child: S, compressor: C) -> Self {
103 Self::new(child, Arc::new(compressor))
104 }
105
106 fn new<S: LayoutStrategy>(child: S, compressor: Arc<dyn CompressorPlugin>) -> Self {
107 Self {
108 child: Arc::new(child),
109 compressor,
110 concurrency: std::thread::available_parallelism()
111 .map(|v| v.get())
112 .unwrap_or(1),
113 }
114 }
115
116 pub fn with_concurrency(mut self, concurrency: usize) -> Self {
117 self.concurrency = concurrency;
118 self
119 }
120}
121
122#[async_trait]
123impl LayoutStrategy for CompressingStrategy {
124 async fn write_stream(
125 &self,
126 ctx: ArrayContext,
127 segment_sink: SegmentSinkRef,
128 stream: SendableSequentialStream,
129 eof: SequencePointer,
130 handle: Handle,
131 ) -> VortexResult<LayoutRef> {
132 let dtype = stream.dtype().clone();
133 let compressor = self.compressor.clone();
134
135 let handle2 = handle.clone();
136 let stream = stream
137 .map(move |chunk| {
138 let compressor = compressor.clone();
139 handle2.spawn_cpu(move || {
140 let (sequence_id, chunk) = chunk?;
141 chunk
143 .statistics()
144 .compute_all(&Stat::all().collect::<Vec<_>>())?;
145 Ok((sequence_id, compressor.compress_chunk(&chunk)?))
146 })
147 })
148 .buffered(self.concurrency);
149
150 self.child
151 .write_stream(
152 ctx,
153 segment_sink,
154 SequentialStreamAdapter::new(dtype, stream).sendable(),
155 eof,
156 handle,
157 )
158 .await
159 }
160
161 fn buffered_bytes(&self) -> u64 {
162 self.child.buffered_bytes()
163 }
164}