Skip to main content

vortex_btrblocks/schemes/
temporal.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Temporal compression scheme using datetime-part decomposition.
5
6use vortex_array::ArrayRef;
7use vortex_array::Canonical;
8use vortex_array::ExecutionCtx;
9use vortex_array::IntoArray;
10use vortex_array::aggregate_fn::fns::is_constant::is_constant;
11use vortex_array::arrays::ConstantArray;
12use vortex_array::arrays::ExtensionArray;
13use vortex_array::arrays::PrimitiveArray;
14use vortex_array::arrays::TemporalArray;
15use vortex_array::arrays::extension::ExtensionArrayExt;
16use vortex_array::arrays::primitive::PrimitiveArrayExt;
17use vortex_array::dtype::extension::Matcher;
18use vortex_array::extension::datetime::AnyTemporal;
19use vortex_array::extension::datetime::TemporalMetadata;
20use vortex_compressor::estimate::CompressionEstimate;
21use vortex_compressor::estimate::EstimateVerdict;
22use vortex_datetime_parts::DateTimeParts;
23use vortex_datetime_parts::TemporalParts;
24use vortex_datetime_parts::split_temporal;
25use vortex_error::VortexResult;
26
27use crate::ArrayAndStats;
28use crate::CascadingCompressor;
29use crate::CompressorContext;
30use crate::Scheme;
31use crate::SchemeExt;
32
/// Scheme that compresses timestamp extension arrays by decomposing them into
/// datetime parts.
///
/// Each timestamp array is broken into three children (days, seconds and
/// subseconds). Every child is compressed on its own, and the compressed
/// children are reassembled into a datetime-parts array via
/// `DateTimeParts::try_new`.
///
/// The type is a stateless unit struct, so it is trivially copyable and
/// comparable.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct TemporalScheme;
39
40impl Scheme for TemporalScheme {
41    fn scheme_name(&self) -> &'static str {
42        "vortex.ext.temporal"
43    }
44
45    fn matches(&self, canonical: &Canonical) -> bool {
46        let Canonical::Extension(ext) = canonical else {
47            return false;
48        };
49
50        let ext_dtype = ext.ext_dtype();
51
52        matches!(
53            AnyTemporal::try_match(ext_dtype),
54            Some(TemporalMetadata::Timestamp(..))
55        )
56    }
57
58    /// Children: days=0, seconds=1, subseconds=2.
59    fn num_children(&self) -> usize {
60        3
61    }
62
63    fn expected_compression_ratio(
64        &self,
65        _data: &ArrayAndStats,
66        _compress_ctx: CompressorContext,
67        _exec_ctx: &mut ExecutionCtx,
68    ) -> CompressionEstimate {
69        // Temporal compression (splitting into parts) is almost always beneficial.
70        CompressionEstimate::Verdict(EstimateVerdict::AlwaysUse)
71    }
72
73    fn compress(
74        &self,
75        compressor: &CascadingCompressor,
76        data: &ArrayAndStats,
77        compress_ctx: CompressorContext,
78        exec_ctx: &mut ExecutionCtx,
79    ) -> VortexResult<ArrayRef> {
80        let array = data.array().clone();
81        let ext_array = array.execute::<ExtensionArray>(exec_ctx)?;
82        let temporal_array = TemporalArray::try_from(ext_array.clone().into_array())?;
83
84        // Check for constant array and return early if so.
85        let is_constant = is_constant(&ext_array.clone().into_array(), exec_ctx)?;
86
87        if is_constant {
88            return Ok(
89                ConstantArray::new(ext_array.execute_scalar(0, exec_ctx)?, ext_array.len())
90                    .into_array(),
91            );
92        }
93
94        let dtype = temporal_array.dtype().clone();
95        let TemporalParts {
96            days,
97            seconds,
98            subseconds,
99        } = split_temporal(temporal_array, exec_ctx)?;
100
101        let days_primitive = days.execute::<PrimitiveArray>(exec_ctx)?.narrow()?;
102        let days = compressor.compress_child(
103            &days_primitive.into_array(),
104            &compress_ctx,
105            self.id(),
106            0,
107            exec_ctx,
108        )?;
109        let seconds_primitive = seconds.execute::<PrimitiveArray>(exec_ctx)?.narrow()?;
110        let seconds = compressor.compress_child(
111            &seconds_primitive.into_array(),
112            &compress_ctx,
113            self.id(),
114            1,
115            exec_ctx,
116        )?;
117        let subseconds_primitive = subseconds.execute::<PrimitiveArray>(exec_ctx)?.narrow()?;
118        let subseconds = compressor.compress_child(
119            &subseconds_primitive.into_array(),
120            &compress_ctx,
121            self.id(),
122            2,
123            exec_ctx,
124        )?;
125
126        Ok(DateTimeParts::try_new(dtype, days, seconds, subseconds)?.into_array())
127    }
128}