use std::sync::Arc;
use async_trait::async_trait;
use futures::StreamExt as _;
use parking_lot::Mutex;
use vortex_array::ArrayContext;
use vortex_array::IntoArray;
use vortex_array::expr::stats::Stat;
use vortex_array::stats::PRUNING_STATS;
use vortex_error::VortexResult;
use vortex_io::session::RuntimeSessionExt;
use vortex_session::VortexSession;
use crate::IntoLayout;
use crate::LayoutRef;
use crate::LayoutStrategy;
use crate::layouts::zoned::ZonedLayout;
use crate::layouts::zoned::zone_map::StatsAccumulator;
use crate::segments::SegmentSinkRef;
use crate::sequence::SendableSequentialStream;
use crate::sequence::SequencePointer;
use crate::sequence::SequentialArrayStreamExt;
use crate::sequence::SequentialStreamAdapter;
use crate::sequence::SequentialStreamExt;
/// Tunable settings consumed by [`ZonedStrategy`] when it writes a stream.
pub struct ZonedLayoutOptions {
/// Block size handed to `ZonedLayout::new` when the final layout is built.
/// NOTE(review): presumably the number of rows covered by one zone — confirm.
pub block_size: usize,
/// Statistics computed for every chunk and given to the `StatsAccumulator`.
pub stats: Arc<[Stat]>,
/// Forwarded to `StatsAccumulator::new`.
/// NOTE(review): presumably an upper bound on the size of variable-length
/// statistic values kept in the table — confirm.
pub max_variable_length_statistics_size: usize,
/// Limit passed to `buffered` for the per-chunk statistics tasks, i.e. how many
/// of those tasks may be in flight at once.
pub concurrency: usize,
}
impl Default for ZonedLayoutOptions {
    /// Builds the default options: a block size of 8192, the `PRUNING_STATS` statistics,
    /// a variable-length statistics size of 64, and a concurrency equal to the parallelism
    /// reported by the standard library (falling back to 1 when that query fails).
    fn default() -> Self {
        // `available_parallelism` is fallible; use a single task if it cannot be determined.
        let concurrency =
            std::thread::available_parallelism().map_or(1, std::num::NonZeroUsize::get);

        Self {
            block_size: 8192,
            stats: PRUNING_STATS.into(),
            max_variable_length_statistics_size: 64,
            concurrency,
        }
    }
}
/// A [`LayoutStrategy`] that writes a data stream through one strategy and the
/// statistics table gathered while writing it through another.
pub struct ZonedStrategy {
/// Strategy that receives the data stream.
child: Arc<dyn LayoutStrategy>,
/// Strategy that receives the accumulated statistics table.
stats: Arc<dyn LayoutStrategy>,
/// Block size, statistics selection, size limit, and concurrency settings.
options: ZonedLayoutOptions,
}
impl ZonedStrategy {
    /// Creates a zoned strategy from its two inner strategies and its options.
    ///
    /// `child` is used for the data stream and `stats` for the statistics table; both are
    /// stored behind `Arc<dyn LayoutStrategy>`.
    pub fn new<Child, Stats>(child: Child, stats: Stats, options: ZonedLayoutOptions) -> Self
    where
        Child: LayoutStrategy,
        Stats: LayoutStrategy,
    {
        let data_strategy: Arc<dyn LayoutStrategy> = Arc::new(child);
        let stats_strategy: Arc<dyn LayoutStrategy> = Arc::new(stats);

        Self {
            child: data_strategy,
            stats: stats_strategy,
            options,
        }
    }
}
#[async_trait]
impl LayoutStrategy for ZonedStrategy {
    /// Writes `stream` through the child strategy while gathering per-chunk statistics, then
    /// writes the gathered statistics table through the stats strategy.
    ///
    /// The pipeline has three stages:
    /// 1. each chunk has the configured statistics computed on a spawned CPU task, with at
    ///    most `options.concurrency` (minimum 1) tasks in flight;
    /// 2. each chunk is then pushed into a shared `StatsAccumulator`;
    /// 3. the resulting stream is handed to the child strategy.
    ///
    /// # Returns
    ///
    /// A `ZonedLayout` wrapping the data layout and the statistics layout, or just the data
    /// layout when the accumulator yields no statistics table.
    ///
    /// # Errors
    ///
    /// Propagates any error from the input stream, from computing or accumulating
    /// statistics, from building the statistics table, or from either inner strategy.
    async fn write_stream(
        &self,
        ctx: ArrayContext,
        segment_sink: SegmentSinkRef,
        stream: SendableSequentialStream,
        mut eof: SequencePointer,
        session: &VortexSession,
    ) -> VortexResult<LayoutRef> {
        let stats = Arc::clone(&self.options.stats);
        // This handle is moved into the per-chunk closure below.
        let cpu_handle = session.handle().clone();

        let stats_accumulator = Arc::new(Mutex::new(StatsAccumulator::new(
            stream.dtype(),
            &stats,
            self.options.max_variable_length_statistics_size,
        )));

        // A `buffered` limit of zero never polls the upstream stream, so the write would
        // stall forever. Always allow at least one statistics task in flight.
        let concurrency = self.options.concurrency.max(1);

        // Stage 1: compute the per-chunk statistics on spawned CPU tasks. `buffered` yields
        // the results in the same order as the input chunks.
        let stream = SequentialStreamAdapter::new(
            stream.dtype().clone(),
            stream
                .map(move |chunk| {
                    let stats = Arc::clone(&stats);
                    cpu_handle.spawn_cpu(move || {
                        let (sequence_id, chunk) = chunk?;
                        chunk.statistics().compute_all(&stats)?;
                        VortexResult::Ok((sequence_id, chunk))
                    })
                })
                .buffered(concurrency),
        )
        .sendable();

        // Stage 2: feed every chunk into the accumulator. The statistics were already
        // computed in stage 1, so the accumulator is asked not to compute them again.
        let stats_accumulator2 = Arc::clone(&stats_accumulator);
        let stream = SequentialStreamAdapter::new(
            stream.dtype().clone(),
            stream.map(move |item| {
                let (sequence_id, chunk) = item?;
                stats_accumulator2
                    .lock()
                    .push_chunk_without_compute(&chunk)?;
                Ok((sequence_id, chunk))
            }),
        )
        .sendable();

        let block_size = self.options.block_size;

        // Stage 3: the data child's pointer is split off `eof` before the one used for the
        // statistics stream.
        // NOTE(review): presumably this orders the data ahead of the statistics table —
        // confirm against `SequencePointer`.
        let data_eof = eof.split_off();
        let data_layout = self
            .child
            .write_stream(
                ctx.clone(),
                Arc::clone(&segment_sink),
                stream,
                data_eof,
                session,
            )
            .await?;

        // The table can only be requested once the child has consumed the whole stream,
        // because the accumulator is filled as chunks flow through stage 2.
        let Some(stats_table) = stats_accumulator.lock().as_stats_table()? else {
            // No statistics table was produced, so there is nothing to wrap.
            return Ok(data_layout);
        };

        let stats_stream = stats_table
            .array()
            .clone()
            .into_array()
            .to_array_stream()
            .sequenced(eof.split_off());
        let zones_layout = self
            .stats
            .write_stream(ctx, Arc::clone(&segment_sink), stats_stream, eof, session)
            .await?;

        Ok(ZonedLayout::new(
            data_layout,
            zones_layout,
            block_size,
            Arc::clone(stats_table.present_stats()),
        )
        .into_layout())
    }

    /// Returns the sum of the buffered byte counts reported by the child and stats strategies.
    fn buffered_bytes(&self) -> u64 {
        self.child.buffered_bytes() + self.stats.buffered_bytes()
    }
}