vortex_layout/layouts/
compressed.rs1use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::StreamExt as _;
8use vortex_array::ArrayContext;
9use vortex_array::ArrayRef;
10use vortex_array::VortexSessionExecute;
11use vortex_array::expr::stats::Stat;
12use vortex_btrblocks::BtrBlocksCompressor;
13use vortex_error::VortexResult;
14use vortex_io::session::RuntimeSessionExt;
15use vortex_session::VortexSession;
16
17use crate::LayoutRef;
18use crate::LayoutStrategy;
19use crate::segments::SegmentSinkRef;
20use crate::sequence::SendableSequentialStream;
21use crate::sequence::SequencePointer;
22use crate::sequence::SequentialStreamAdapter;
23use crate::sequence::SequentialStreamExt;
24
25pub trait CompressorPlugin: Send + Sync + 'static {
29 fn compress_chunk(&self, chunk: &ArrayRef) -> VortexResult<ArrayRef>;
30}
31
32impl CompressorPlugin for Arc<dyn CompressorPlugin> {
33 fn compress_chunk(&self, chunk: &ArrayRef) -> VortexResult<ArrayRef> {
34 self.as_ref().compress_chunk(chunk)
35 }
36}
37
38impl<F> CompressorPlugin for F
39where
40 F: Fn(&ArrayRef) -> VortexResult<ArrayRef> + Send + Sync + 'static,
41{
42 fn compress_chunk(&self, chunk: &ArrayRef) -> VortexResult<ArrayRef> {
43 self(chunk)
44 }
45}
46
47impl CompressorPlugin for BtrBlocksCompressor {
48 fn compress_chunk(&self, chunk: &ArrayRef) -> VortexResult<ArrayRef> {
49 self.compress(chunk)
50 }
51}
52
53#[derive(Clone)]
55pub struct CompressingStrategy {
56 child: Arc<dyn LayoutStrategy>,
57 compressor: Arc<dyn CompressorPlugin>,
58 stats: Arc<[Stat]>,
59 concurrency: usize,
60}
61
62impl CompressingStrategy {
63 pub fn new<S: LayoutStrategy, C: CompressorPlugin>(child: S, compressor: C) -> Self {
65 Self {
66 child: Arc::new(child),
67 compressor: Arc::new(compressor),
68 stats: Stat::all().collect(),
69 concurrency: std::thread::available_parallelism()
70 .map(|v| v.get())
71 .unwrap_or(1),
72 }
73 }
74
75 pub fn with_concurrency(mut self, concurrency: usize) -> Self {
76 self.concurrency = concurrency;
77 self
78 }
79
80 pub fn with_stats(mut self, stats: &[Stat]) -> Self {
83 self.stats = stats.into();
84 self
85 }
86}
87
88#[async_trait]
89impl LayoutStrategy for CompressingStrategy {
90 async fn write_stream(
91 &self,
92 ctx: ArrayContext,
93 segment_sink: SegmentSinkRef,
94 stream: SendableSequentialStream,
95 eof: SequencePointer,
96 session: &VortexSession,
97 ) -> VortexResult<LayoutRef> {
98 let dtype = stream.dtype().clone();
99 let compressor = Arc::clone(&self.compressor);
100 let stats = Arc::clone(&self.stats);
101 let session = session.clone();
102 let compute_session = session.clone();
103
104 let handle = session.handle();
105 let stream = stream
106 .map(move |chunk| {
107 let compressor = Arc::clone(&compressor);
108 let stats = Arc::clone(&stats);
109 let session = compute_session.clone();
110 handle.spawn_cpu(move || {
111 let (sequence_id, chunk) = chunk?;
112 chunk
114 .statistics()
115 .compute_all(&stats, &mut session.create_execution_ctx())?;
116 Ok((sequence_id, compressor.compress_chunk(&chunk)?))
117 })
118 })
119 .buffered(self.concurrency);
120
121 self.child
122 .write_stream(
123 ctx,
124 segment_sink,
125 SequentialStreamAdapter::new(dtype, stream).sendable(),
126 eof,
127 &session,
128 )
129 .await
130 }
131
132 fn buffered_bytes(&self) -> u64 {
133 self.child.buffered_bytes()
134 }
135}