use std::sync::Arc;
use async_trait::async_trait;
use futures::StreamExt as _;
use vortex_array::ArrayContext;
use vortex_array::ArrayRef;
use vortex_array::expr::stats::Stat;
use vortex_btrblocks::BtrBlocksCompressor;
use vortex_error::VortexResult;
use vortex_io::session::RuntimeSessionExt;
use vortex_session::VortexSession;
use crate::LayoutRef;
use crate::LayoutStrategy;
use crate::segments::SegmentSinkRef;
use crate::sequence::SendableSequentialStream;
use crate::sequence::SequencePointer;
use crate::sequence::SequentialStreamAdapter;
use crate::sequence::SequentialStreamExt;
/// A pluggable chunk compressor used by [`CompressingStrategy`].
///
/// Implementors take an uncompressed array chunk and return a (typically
/// smaller) re-encoded array with the same logical contents. The
/// `Send + Sync + 'static` bounds allow a plugin to be shared across the
/// concurrent CPU tasks spawned during stream writing.
pub trait CompressorPlugin: Send + Sync + 'static {
    /// Compress a single chunk, returning the re-encoded array.
    fn compress_chunk(&self, chunk: &ArrayRef) -> VortexResult<ArrayRef>;
}
/// Allow a shared, type-erased plugin handle to be used anywhere a concrete
/// [`CompressorPlugin`] is expected, by delegating to the inner plugin.
impl CompressorPlugin for Arc<dyn CompressorPlugin> {
    fn compress_chunk(&self, chunk: &ArrayRef) -> VortexResult<ArrayRef> {
        // Deref through the Arc to reach the underlying plugin.
        (**self).compress_chunk(chunk)
    }
}
/// Blanket impl: any thread-safe closure or function with the right signature
/// is itself a [`CompressorPlugin`], so callers can pass plain closures.
impl<F> CompressorPlugin for F
where
    F: Fn(&ArrayRef) -> VortexResult<ArrayRef> + Send + Sync + 'static,
{
    fn compress_chunk(&self, chunk: &ArrayRef) -> VortexResult<ArrayRef> {
        // `self` *is* the compression function; just invoke it.
        let compress = self;
        compress(chunk)
    }
}
/// Adapt the stock BtrBlocks compressor to the plugin interface.
impl CompressorPlugin for BtrBlocksCompressor {
    fn compress_chunk(&self, chunk: &ArrayRef) -> VortexResult<ArrayRef> {
        let compressed = self.compress(chunk)?;
        Ok(compressed)
    }
}
/// A [`LayoutStrategy`] decorator that compresses every chunk of the input
/// stream (in parallel, on the session's CPU pool) before handing the stream
/// on to an inner strategy for the actual layout writing.
#[derive(Clone)]
pub struct CompressingStrategy {
    // The wrapped strategy that performs the real layout writing.
    child: Arc<dyn LayoutStrategy>,
    // Plugin applied to each chunk before it reaches `child`.
    compressor: Arc<dyn CompressorPlugin>,
    // Maximum number of in-flight compression tasks (stream buffering depth).
    concurrency: usize,
}
impl CompressingStrategy {
    /// Wrap `child` so that every chunk is run through `compressor` before
    /// being written.
    ///
    /// The compression concurrency defaults to the number of available CPU
    /// cores, falling back to `1` when that cannot be determined.
    pub fn new<S: LayoutStrategy, C: CompressorPlugin>(child: S, compressor: C) -> Self {
        let default_concurrency =
            std::thread::available_parallelism().map_or(1, |cores| cores.get());
        Self {
            child: Arc::new(child),
            compressor: Arc::new(compressor),
            concurrency: default_concurrency,
        }
    }

    /// Override how many chunks may be compressed concurrently.
    pub fn with_concurrency(self, concurrency: usize) -> Self {
        Self { concurrency, ..self }
    }
}
#[async_trait]
impl LayoutStrategy for CompressingStrategy {
    /// Compress each incoming chunk on the session's CPU pool, then delegate
    /// the (now-compressed) stream to the child strategy for layout writing.
    async fn write_stream(
        &self,
        ctx: ArrayContext,
        segment_sink: SegmentSinkRef,
        stream: SendableSequentialStream,
        eof: SequencePointer,
        session: &VortexSession,
    ) -> VortexResult<LayoutRef> {
        // Capture the dtype up front: `map` below yields a plain futures
        // stream, so the dtype must be re-attached via the adapter afterwards.
        let dtype = stream.dtype().clone();
        let compressor = Arc::clone(&self.compressor);
        let handle = session.handle();
        let stream = stream
            .map(move |chunk| {
                // Each spawned task needs its own handle on the compressor.
                let compressor = Arc::clone(&compressor);
                // Offload the CPU-heavy work onto the session's compute pool
                // rather than blocking the async writer.
                handle.spawn_cpu(move || {
                    let (sequence_id, chunk) = chunk?;
                    // Compute all statistics on the *uncompressed* chunk first,
                    // so they are populated regardless of how the compressor
                    // re-encodes the array.
                    chunk
                        .statistics()
                        .compute_all(&Stat::all().collect::<Vec<_>>())?;
                    // The sequence id rides along unchanged so downstream
                    // ordering is preserved.
                    Ok((sequence_id, compressor.compress_chunk(&chunk)?))
                })
            })
            // Keep up to `concurrency` compression tasks in flight; `buffered`
            // (as opposed to `buffer_unordered`) yields results in input order.
            .buffered(self.concurrency);
        self.child
            .write_stream(
                ctx,
                segment_sink,
                // Re-wrap the mapped stream as a sequential stream carrying
                // the original dtype.
                SequentialStreamAdapter::new(dtype, stream).sendable(),
                eof,
                session,
            )
            .await
    }

    /// Buffering accounting is entirely delegated to the child strategy;
    /// this wrapper holds no buffered bytes of its own.
    fn buffered_bytes(&self) -> u64 {
        self.child.buffered_bytes()
    }
}