vortex_btrblocks/schemes/
temporal.rs1use vortex_array::ArrayRef;
7use vortex_array::Canonical;
8use vortex_array::IntoArray;
9use vortex_array::LEGACY_SESSION;
10use vortex_array::ToCanonical;
11use vortex_array::VortexSessionExecute;
12use vortex_array::aggregate_fn::fns::is_constant::is_constant;
13use vortex_array::arrays::ConstantArray;
14use vortex_array::arrays::TemporalArray;
15use vortex_array::arrays::primitive::PrimitiveArrayExt;
16use vortex_array::dtype::extension::Matcher;
17use vortex_array::extension::datetime::AnyTemporal;
18use vortex_array::extension::datetime::TemporalMetadata;
19use vortex_compressor::estimate::CompressionEstimate;
20use vortex_compressor::estimate::EstimateVerdict;
21use vortex_datetime_parts::DateTimeParts;
22use vortex_datetime_parts::TemporalParts;
23use vortex_datetime_parts::split_temporal;
24use vortex_error::VortexResult;
25
26use crate::ArrayAndStats;
27use crate::CascadingCompressor;
28use crate::CompressorContext;
29use crate::Scheme;
30use crate::SchemeExt;
31
/// Zero-sized marker type for the temporal compression scheme.
///
/// Its `Scheme` implementation in this file accepts timestamp extension
/// arrays and encodes them as `DateTimeParts`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct TemporalScheme;
38
39impl Scheme for TemporalScheme {
40 fn scheme_name(&self) -> &'static str {
41 "vortex.ext.temporal"
42 }
43
44 fn matches(&self, canonical: &Canonical) -> bool {
45 let Canonical::Extension(ext) = canonical else {
46 return false;
47 };
48
49 let ext_dtype = ext.ext_dtype();
50
51 matches!(
52 AnyTemporal::try_match(ext_dtype),
53 Some(TemporalMetadata::Timestamp(..))
54 )
55 }
56
57 fn num_children(&self) -> usize {
59 3
60 }
61
62 fn expected_compression_ratio(
63 &self,
64 _data: &mut ArrayAndStats,
65 _ctx: CompressorContext,
66 ) -> CompressionEstimate {
67 CompressionEstimate::Verdict(EstimateVerdict::AlwaysUse)
69 }
70
71 fn compress(
72 &self,
73 compressor: &CascadingCompressor,
74 data: &mut ArrayAndStats,
75 ctx: CompressorContext,
76 ) -> VortexResult<ArrayRef> {
77 let array = data.array().clone();
78 let ext_array = array.to_extension();
79 let temporal_array = TemporalArray::try_from(ext_array.clone().into_array())?;
80
81 let is_constant = is_constant(
83 &ext_array.clone().into_array(),
84 &mut compressor.execution_ctx(),
85 )?;
86
87 if is_constant {
88 return Ok(ConstantArray::new(
89 ext_array.execute_scalar(0, &mut LEGACY_SESSION.create_execution_ctx())?,
90 ext_array.len(),
91 )
92 .into_array());
93 }
94
95 let dtype = temporal_array.dtype().clone();
96 let TemporalParts {
97 days,
98 seconds,
99 subseconds,
100 } = split_temporal(temporal_array)?;
101
102 let days = compressor.compress_child(
103 &days.to_primitive().narrow()?.into_array(),
104 &ctx,
105 self.id(),
106 0,
107 )?;
108 let seconds = compressor.compress_child(
109 &seconds.to_primitive().narrow()?.into_array(),
110 &ctx,
111 self.id(),
112 1,
113 )?;
114 let subseconds = compressor.compress_child(
115 &subseconds.to_primitive().narrow()?.into_array(),
116 &ctx,
117 self.id(),
118 2,
119 )?;
120
121 Ok(DateTimeParts::try_new(dtype, days, seconds, subseconds)?.into_array())
122 }
123}