vortex_btrblocks/schemes/
temporal.rs1use vortex_array::ArrayRef;
7use vortex_array::Canonical;
8use vortex_array::ExecutionCtx;
9use vortex_array::IntoArray;
10use vortex_array::aggregate_fn::fns::is_constant::is_constant;
11use vortex_array::arrays::ConstantArray;
12use vortex_array::arrays::ExtensionArray;
13use vortex_array::arrays::PrimitiveArray;
14use vortex_array::arrays::TemporalArray;
15use vortex_array::arrays::extension::ExtensionArrayExt;
16use vortex_array::arrays::primitive::PrimitiveArrayExt;
17use vortex_array::dtype::extension::Matcher;
18use vortex_array::extension::datetime::AnyTemporal;
19use vortex_array::extension::datetime::TemporalMetadata;
20use vortex_compressor::estimate::CompressionEstimate;
21use vortex_compressor::estimate::EstimateVerdict;
22use vortex_datetime_parts::DateTimeParts;
23use vortex_datetime_parts::TemporalParts;
24use vortex_datetime_parts::split_temporal;
25use vortex_error::VortexResult;
26
27use crate::ArrayAndStats;
28use crate::CascadingCompressor;
29use crate::CompressorContext;
30use crate::Scheme;
31use crate::SchemeExt;
32
33#[derive(Debug, Copy, Clone, PartialEq, Eq)]
38pub struct TemporalScheme;
39
40impl Scheme for TemporalScheme {
41 fn scheme_name(&self) -> &'static str {
42 "vortex.ext.temporal"
43 }
44
45 fn matches(&self, canonical: &Canonical) -> bool {
46 let Canonical::Extension(ext) = canonical else {
47 return false;
48 };
49
50 let ext_dtype = ext.ext_dtype();
51
52 matches!(
53 AnyTemporal::try_match(ext_dtype),
54 Some(TemporalMetadata::Timestamp(..))
55 )
56 }
57
58 fn num_children(&self) -> usize {
60 3
61 }
62
63 fn expected_compression_ratio(
64 &self,
65 _data: &ArrayAndStats,
66 _compress_ctx: CompressorContext,
67 _exec_ctx: &mut ExecutionCtx,
68 ) -> CompressionEstimate {
69 CompressionEstimate::Verdict(EstimateVerdict::AlwaysUse)
71 }
72
73 fn compress(
74 &self,
75 compressor: &CascadingCompressor,
76 data: &ArrayAndStats,
77 compress_ctx: CompressorContext,
78 exec_ctx: &mut ExecutionCtx,
79 ) -> VortexResult<ArrayRef> {
80 let array = data.array().clone();
81 let ext_array = array.execute::<ExtensionArray>(exec_ctx)?;
82 let temporal_array = TemporalArray::try_from(ext_array.clone().into_array())?;
83
84 let is_constant = is_constant(&ext_array.clone().into_array(), exec_ctx)?;
86
87 if is_constant {
88 return Ok(
89 ConstantArray::new(ext_array.execute_scalar(0, exec_ctx)?, ext_array.len())
90 .into_array(),
91 );
92 }
93
94 let dtype = temporal_array.dtype().clone();
95 let TemporalParts {
96 days,
97 seconds,
98 subseconds,
99 } = split_temporal(temporal_array, exec_ctx)?;
100
101 let days_primitive = days.execute::<PrimitiveArray>(exec_ctx)?.narrow()?;
102 let days = compressor.compress_child(
103 &days_primitive.into_array(),
104 &compress_ctx,
105 self.id(),
106 0,
107 exec_ctx,
108 )?;
109 let seconds_primitive = seconds.execute::<PrimitiveArray>(exec_ctx)?.narrow()?;
110 let seconds = compressor.compress_child(
111 &seconds_primitive.into_array(),
112 &compress_ctx,
113 self.id(),
114 1,
115 exec_ctx,
116 )?;
117 let subseconds_primitive = subseconds.execute::<PrimitiveArray>(exec_ctx)?.narrow()?;
118 let subseconds = compressor.compress_child(
119 &subseconds_primitive.into_array(),
120 &compress_ctx,
121 self.id(),
122 2,
123 exec_ctx,
124 )?;
125
126 Ok(DateTimeParts::try_new(dtype, days, seconds, subseconds)?.into_array())
127 }
128}