vortex_layout/layouts/
compressed.rs1use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::StreamExt as _;
8use vortex_array::stats::Stat;
9use vortex_array::{Array, ArrayContext, ArrayRef};
10use vortex_btrblocks::BtrBlocksCompressor;
11use vortex_error::VortexResult;
12use vortex_io::runtime::Handle;
13
14use crate::segments::SegmentSinkRef;
15use crate::sequence::{
16 SendableSequentialStream, SequencePointer, SequentialStreamAdapter, SequentialStreamExt,
17};
18use crate::{LayoutRef, LayoutStrategy};
19
20pub trait CompressorPlugin: Send + Sync + 'static {
27 fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef>;
28}
29
30impl CompressorPlugin for Arc<dyn CompressorPlugin> {
31 fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef> {
32 self.as_ref().compress_chunk(chunk)
33 }
34}
35
36impl<F> CompressorPlugin for F
37where
38 F: Fn(&dyn Array) -> VortexResult<ArrayRef> + Send + Sync + 'static,
39{
40 fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef> {
41 self(chunk)
42 }
43}
44
45impl CompressorPlugin for BtrBlocksCompressor {
46 fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef> {
47 self.compress(chunk)
48 }
49}
50
/// Exposes the compact (zstd-based, per the feature gate) compressor as a [`CompressorPlugin`].
#[cfg(feature = "zstd")]
impl CompressorPlugin for crate::layouts::compact::CompactCompressor {
    fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef> {
        // Delegate to `CompactCompressor::compress`.
        Self::compress(self, chunk)
    }
}
57
58#[derive(Clone)]
60pub struct CompressingStrategy {
61 child: Arc<dyn LayoutStrategy>,
62 compressor: Arc<dyn CompressorPlugin>,
63 concurrency: usize,
64}
65
66impl CompressingStrategy {
67 pub fn new_btrblocks<S: LayoutStrategy>(child: S, exclude_int_dict_encoding: bool) -> Self {
74 Self::new(
75 child,
76 Arc::new(BtrBlocksCompressor {
77 exclude_int_dict_encoding,
78 }),
79 )
80 }
81
82 #[cfg(feature = "zstd")]
90 pub fn new_compact<S: LayoutStrategy>(
91 child: S,
92 compressor: crate::layouts::compact::CompactCompressor,
93 ) -> Self {
94 Self::new(child, Arc::new(compressor))
95 }
96
97 pub fn new_opaque<S: LayoutStrategy, C: CompressorPlugin>(child: S, compressor: C) -> Self {
99 Self::new(child, Arc::new(compressor))
100 }
101
102 fn new<S: LayoutStrategy>(child: S, compressor: Arc<dyn CompressorPlugin>) -> Self {
103 Self {
104 child: Arc::new(child),
105 compressor,
106 concurrency: std::thread::available_parallelism()
107 .map(|v| v.get())
108 .unwrap_or(1),
109 }
110 }
111
112 pub fn with_concurrency(mut self, concurrency: usize) -> Self {
113 self.concurrency = concurrency;
114 self
115 }
116}
117
118#[async_trait]
119impl LayoutStrategy for CompressingStrategy {
120 async fn write_stream(
121 &self,
122 ctx: ArrayContext,
123 segment_sink: SegmentSinkRef,
124 stream: SendableSequentialStream,
125 eof: SequencePointer,
126 handle: Handle,
127 ) -> VortexResult<LayoutRef> {
128 let dtype = stream.dtype().clone();
129 let compressor = self.compressor.clone();
130
131 let handle2 = handle.clone();
132 let stream = stream
133 .map(move |chunk| {
134 let compressor = compressor.clone();
135 handle2.spawn_cpu(move || {
136 let (sequence_id, chunk) = chunk?;
137 chunk
139 .statistics()
140 .compute_all(&Stat::all().collect::<Vec<_>>())?;
141 Ok((sequence_id, compressor.compress_chunk(&chunk)?))
142 })
143 })
144 .buffered(self.concurrency);
145
146 self.child
147 .write_stream(
148 ctx,
149 segment_sink,
150 SequentialStreamAdapter::new(dtype, stream).sendable(),
151 eof,
152 handle,
153 )
154 .await
155 }
156
157 fn buffered_bytes(&self) -> u64 {
158 self.child.buffered_bytes()
159 }
160}