vortex_layout/layouts/
compressed.rs1use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::StreamExt as _;
8use vortex_array::Array;
9use vortex_array::ArrayContext;
10use vortex_array::ArrayRef;
11use vortex_array::expr::stats::Stat;
12use vortex_btrblocks::BtrBlocksCompressor;
13use vortex_btrblocks::BtrBlocksCompressorBuilder;
14use vortex_btrblocks::IntCode;
15use vortex_error::VortexResult;
16use vortex_io::runtime::Handle;
17
18use crate::LayoutRef;
19use crate::LayoutStrategy;
20use crate::segments::SegmentSinkRef;
21use crate::sequence::SendableSequentialStream;
22use crate::sequence::SequencePointer;
23use crate::sequence::SequentialStreamAdapter;
24use crate::sequence::SequentialStreamExt;
25
26pub trait CompressorPlugin: Send + Sync + 'static {
30 fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef>;
31}
32
33impl CompressorPlugin for Arc<dyn CompressorPlugin> {
34 fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef> {
35 self.as_ref().compress_chunk(chunk)
36 }
37}
38
39impl<F> CompressorPlugin for F
40where
41 F: Fn(&dyn Array) -> VortexResult<ArrayRef> + Send + Sync + 'static,
42{
43 fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef> {
44 self(chunk)
45 }
46}
47
48impl CompressorPlugin for BtrBlocksCompressor {
49 fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef> {
50 self.compress(chunk)
51 }
52}
53
54#[derive(Clone)]
56pub struct CompressingStrategy {
57 child: Arc<dyn LayoutStrategy>,
58 compressor: Arc<dyn CompressorPlugin>,
59 concurrency: usize,
60}
61
62impl CompressingStrategy {
63 pub fn new_btrblocks<S: LayoutStrategy>(child: S, exclude_int_dict_encoding: bool) -> Self {
70 let compressor = if exclude_int_dict_encoding {
71 BtrBlocksCompressorBuilder::default()
72 .exclude_int([IntCode::Dict])
73 .build()
74 } else {
75 BtrBlocksCompressor::default()
76 };
77 Self::new(child, Arc::new(compressor))
78 }
79
80 pub fn new_opaque<S: LayoutStrategy, C: CompressorPlugin>(child: S, compressor: C) -> Self {
82 Self::new(child, Arc::new(compressor))
83 }
84
85 fn new<S: LayoutStrategy>(child: S, compressor: Arc<dyn CompressorPlugin>) -> Self {
86 Self {
87 child: Arc::new(child),
88 compressor,
89 concurrency: std::thread::available_parallelism()
90 .map(|v| v.get())
91 .unwrap_or(1),
92 }
93 }
94
95 pub fn with_concurrency(mut self, concurrency: usize) -> Self {
96 self.concurrency = concurrency;
97 self
98 }
99}
100
101#[async_trait]
102impl LayoutStrategy for CompressingStrategy {
103 async fn write_stream(
104 &self,
105 ctx: ArrayContext,
106 segment_sink: SegmentSinkRef,
107 stream: SendableSequentialStream,
108 eof: SequencePointer,
109 handle: Handle,
110 ) -> VortexResult<LayoutRef> {
111 let dtype = stream.dtype().clone();
112 let compressor = self.compressor.clone();
113
114 let handle2 = handle.clone();
115 let stream = stream
116 .map(move |chunk| {
117 let compressor = compressor.clone();
118 handle2.spawn_cpu(move || {
119 let (sequence_id, chunk) = chunk?;
120 chunk
122 .statistics()
123 .compute_all(&Stat::all().collect::<Vec<_>>())?;
124 Ok((sequence_id, compressor.compress_chunk(&chunk)?))
125 })
126 })
127 .buffered(self.concurrency);
128
129 self.child
130 .write_stream(
131 ctx,
132 segment_sink,
133 SequentialStreamAdapter::new(dtype, stream).sendable(),
134 eof,
135 handle,
136 )
137 .await
138 }
139
140 fn buffered_bytes(&self) -> u64 {
141 self.child.buffered_bytes()
142 }
143}