vortex_layout/layouts/
compressed.rs1use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::StreamExt as _;
8use vortex_array::ArrayContext;
9use vortex_array::ArrayRef;
10use vortex_array::ExecutionCtx;
11use vortex_array::VortexSessionExecute;
12use vortex_array::expr::stats::Stat;
13use vortex_btrblocks::BtrBlocksCompressor;
14use vortex_error::VortexResult;
15use vortex_io::session::RuntimeSessionExt;
16use vortex_session::VortexSession;
17use vortex_utils::parallelism::get_available_parallelism;
18
19use crate::LayoutRef;
20use crate::LayoutStrategy;
21use crate::segments::SegmentSinkRef;
22use crate::sequence::SendableSequentialStream;
23use crate::sequence::SequencePointer;
24use crate::sequence::SequentialStreamAdapter;
25use crate::sequence::SequentialStreamExt;
26
27pub trait CompressorPlugin: Send + Sync + 'static {
31 fn compress_chunk(&self, chunk: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<ArrayRef>;
32}
33
34impl CompressorPlugin for Arc<dyn CompressorPlugin> {
35 fn compress_chunk(&self, chunk: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<ArrayRef> {
36 self.as_ref().compress_chunk(chunk, ctx)
37 }
38}
39
40impl<F> CompressorPlugin for F
41where
42 F: Fn(&ArrayRef, &mut ExecutionCtx) -> VortexResult<ArrayRef> + Send + Sync + 'static,
43{
44 fn compress_chunk(&self, chunk: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<ArrayRef> {
45 self(chunk, ctx)
46 }
47}
48
49impl CompressorPlugin for BtrBlocksCompressor {
50 fn compress_chunk(&self, chunk: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<ArrayRef> {
51 self.compress(chunk, ctx)
52 }
53}
54
55#[derive(Clone)]
57pub struct CompressingStrategy {
58 child: Arc<dyn LayoutStrategy>,
59 compressor: Arc<dyn CompressorPlugin>,
60 stats: Arc<[Stat]>,
61 concurrency: usize,
62}
63
64impl CompressingStrategy {
65 pub fn new<S: LayoutStrategy, C: CompressorPlugin>(child: S, compressor: C) -> Self {
67 Self {
68 child: Arc::new(child),
69 compressor: Arc::new(compressor),
70 stats: Stat::all().collect(),
71 concurrency: get_available_parallelism().unwrap_or(1),
72 }
73 }
74
75 pub fn with_concurrency(mut self, concurrency: usize) -> Self {
76 self.concurrency = concurrency;
77 self
78 }
79
80 pub fn with_stats(mut self, stats: &[Stat]) -> Self {
83 self.stats = stats.into();
84 self
85 }
86}
87
88#[async_trait]
89impl LayoutStrategy for CompressingStrategy {
90 async fn write_stream(
91 &self,
92 ctx: ArrayContext,
93 segment_sink: SegmentSinkRef,
94 stream: SendableSequentialStream,
95 eof: SequencePointer,
96 session: &VortexSession,
97 ) -> VortexResult<LayoutRef> {
98 let dtype = stream.dtype().clone();
99 let compressor = Arc::clone(&self.compressor);
100 let stats = Arc::clone(&self.stats);
101 let session = session.clone();
102 let compute_session = session.clone();
103
104 let handle = session.handle();
105 let stream = stream
106 .map(move |chunk| {
107 let compressor = Arc::clone(&compressor);
108 let stats = Arc::clone(&stats);
109 let session = compute_session.clone();
110 handle.spawn_cpu(move || {
111 let (sequence_id, chunk) = chunk?;
112 let mut ctx = session.create_execution_ctx();
113 chunk.statistics().compute_all(&stats, &mut ctx)?;
115 Ok((sequence_id, compressor.compress_chunk(&chunk, &mut ctx)?))
116 })
117 })
118 .buffered(self.concurrency);
119
120 self.child
121 .write_stream(
122 ctx,
123 segment_sink,
124 SequentialStreamAdapter::new(dtype, stream).sendable(),
125 eof,
126 &session,
127 )
128 .await
129 }
130
131 fn buffered_bytes(&self) -> u64 {
132 self.child.buffered_bytes()
133 }
134}