vortex_layout/layouts/
compressed.rs1use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::StreamExt as _;
8use vortex_array::ArrayContext;
9use vortex_array::ArrayRef;
10use vortex_array::expr::stats::Stat;
11use vortex_btrblocks::BtrBlocksCompressor;
12use vortex_error::VortexResult;
13use vortex_io::session::RuntimeSessionExt;
14use vortex_session::VortexSession;
15
16use crate::LayoutRef;
17use crate::LayoutStrategy;
18use crate::segments::SegmentSinkRef;
19use crate::sequence::SendableSequentialStream;
20use crate::sequence::SequencePointer;
21use crate::sequence::SequentialStreamAdapter;
22use crate::sequence::SequentialStreamExt;
23
24pub trait CompressorPlugin: Send + Sync + 'static {
28 fn compress_chunk(&self, chunk: &ArrayRef) -> VortexResult<ArrayRef>;
29}
30
31impl CompressorPlugin for Arc<dyn CompressorPlugin> {
32 fn compress_chunk(&self, chunk: &ArrayRef) -> VortexResult<ArrayRef> {
33 self.as_ref().compress_chunk(chunk)
34 }
35}
36
37impl<F> CompressorPlugin for F
38where
39 F: Fn(&ArrayRef) -> VortexResult<ArrayRef> + Send + Sync + 'static,
40{
41 fn compress_chunk(&self, chunk: &ArrayRef) -> VortexResult<ArrayRef> {
42 self(chunk)
43 }
44}
45
46impl CompressorPlugin for BtrBlocksCompressor {
47 fn compress_chunk(&self, chunk: &ArrayRef) -> VortexResult<ArrayRef> {
48 self.compress(chunk)
49 }
50}
51
52#[derive(Clone)]
54pub struct CompressingStrategy {
55 child: Arc<dyn LayoutStrategy>,
56 compressor: Arc<dyn CompressorPlugin>,
57 concurrency: usize,
58}
59
60impl CompressingStrategy {
61 pub fn new<S: LayoutStrategy, C: CompressorPlugin>(child: S, compressor: C) -> Self {
63 Self {
64 child: Arc::new(child),
65 compressor: Arc::new(compressor),
66 concurrency: std::thread::available_parallelism()
67 .map(|v| v.get())
68 .unwrap_or(1),
69 }
70 }
71
72 pub fn with_concurrency(mut self, concurrency: usize) -> Self {
73 self.concurrency = concurrency;
74 self
75 }
76}
77
78#[async_trait]
79impl LayoutStrategy for CompressingStrategy {
80 async fn write_stream(
81 &self,
82 ctx: ArrayContext,
83 segment_sink: SegmentSinkRef,
84 stream: SendableSequentialStream,
85 eof: SequencePointer,
86 session: &VortexSession,
87 ) -> VortexResult<LayoutRef> {
88 let dtype = stream.dtype().clone();
89 let compressor = Arc::clone(&self.compressor);
90
91 let handle = session.handle();
92 let stream = stream
93 .map(move |chunk| {
94 let compressor = Arc::clone(&compressor);
95 handle.spawn_cpu(move || {
96 let (sequence_id, chunk) = chunk?;
97 chunk
99 .statistics()
100 .compute_all(&Stat::all().collect::<Vec<_>>())?;
101 Ok((sequence_id, compressor.compress_chunk(&chunk)?))
102 })
103 })
104 .buffered(self.concurrency);
105
106 self.child
107 .write_stream(
108 ctx,
109 segment_sink,
110 SequentialStreamAdapter::new(dtype, stream).sendable(),
111 eof,
112 session,
113 )
114 .await
115 }
116
117 fn buffered_bytes(&self) -> u64 {
118 self.child.buffered_bytes()
119 }
120}