use vortex_array::ArrayRef;
use vortex_array::Canonical;
use vortex_array::ExecutionCtx;
use vortex_array::IntoArray;
use vortex_array::aggregate_fn::fns::is_constant::is_constant;
use vortex_array::arrays::ConstantArray;
use vortex_array::arrays::ExtensionArray;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::TemporalArray;
use vortex_array::arrays::extension::ExtensionArrayExt;
use vortex_array::arrays::primitive::PrimitiveArrayExt;
use vortex_array::dtype::extension::Matcher;
use vortex_array::extension::datetime::AnyTemporal;
use vortex_array::extension::datetime::TemporalMetadata;
use vortex_compressor::estimate::CompressionEstimate;
use vortex_compressor::estimate::EstimateVerdict;
use vortex_datetime_parts::DateTimeParts;
use vortex_datetime_parts::TemporalParts;
use vortex_datetime_parts::split_temporal;
use vortex_error::VortexResult;
use crate::ArrayAndStats;
use crate::CascadingCompressor;
use crate::CompressorContext;
use crate::Scheme;
use crate::SchemeExt;
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct TemporalScheme;
impl Scheme for TemporalScheme {
fn scheme_name(&self) -> &'static str {
"vortex.ext.temporal"
}
fn matches(&self, canonical: &Canonical) -> bool {
let Canonical::Extension(ext) = canonical else {
return false;
};
let ext_dtype = ext.ext_dtype();
matches!(
AnyTemporal::try_match(ext_dtype),
Some(TemporalMetadata::Timestamp(..))
)
}
fn num_children(&self) -> usize {
3
}
fn expected_compression_ratio(
&self,
_data: &ArrayAndStats,
_compress_ctx: CompressorContext,
_exec_ctx: &mut ExecutionCtx,
) -> CompressionEstimate {
CompressionEstimate::Verdict(EstimateVerdict::AlwaysUse)
}
fn compress(
&self,
compressor: &CascadingCompressor,
data: &ArrayAndStats,
compress_ctx: CompressorContext,
exec_ctx: &mut ExecutionCtx,
) -> VortexResult<ArrayRef> {
let array = data.array().clone();
let ext_array = array.execute::<ExtensionArray>(exec_ctx)?;
let temporal_array = TemporalArray::try_from(ext_array.clone().into_array())?;
let is_constant = is_constant(&ext_array.clone().into_array(), exec_ctx)?;
if is_constant {
return Ok(
ConstantArray::new(ext_array.execute_scalar(0, exec_ctx)?, ext_array.len())
.into_array(),
);
}
let dtype = temporal_array.dtype().clone();
let TemporalParts {
days,
seconds,
subseconds,
} = split_temporal(temporal_array, exec_ctx)?;
let days_primitive = days.execute::<PrimitiveArray>(exec_ctx)?.narrow()?;
let days = compressor.compress_child(
&days_primitive.into_array(),
&compress_ctx,
self.id(),
0,
exec_ctx,
)?;
let seconds_primitive = seconds.execute::<PrimitiveArray>(exec_ctx)?.narrow()?;
let seconds = compressor.compress_child(
&seconds_primitive.into_array(),
&compress_ctx,
self.id(),
1,
exec_ctx,
)?;
let subseconds_primitive = subseconds.execute::<PrimitiveArray>(exec_ctx)?.narrow()?;
let subseconds = compressor.compress_child(
&subseconds_primitive.into_array(),
&compress_ctx,
self.id(),
2,
exec_ctx,
)?;
Ok(DateTimeParts::try_new(dtype, days, seconds, subseconds)?.into_array())
}
}