vortex-btrblocks 0.70.0

BtrBlocks style compressor
Documentation
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

//! Temporal compression scheme using datetime-part decomposition.

use vortex_array::ArrayRef;
use vortex_array::Canonical;
use vortex_array::ExecutionCtx;
use vortex_array::IntoArray;
use vortex_array::aggregate_fn::fns::is_constant::is_constant;
use vortex_array::arrays::ConstantArray;
use vortex_array::arrays::ExtensionArray;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::TemporalArray;
use vortex_array::arrays::extension::ExtensionArrayExt;
use vortex_array::arrays::primitive::PrimitiveArrayExt;
use vortex_array::dtype::extension::Matcher;
use vortex_array::extension::datetime::AnyTemporal;
use vortex_array::extension::datetime::TemporalMetadata;
use vortex_compressor::estimate::CompressionEstimate;
use vortex_compressor::estimate::EstimateVerdict;
use vortex_datetime_parts::DateTimeParts;
use vortex_datetime_parts::TemporalParts;
use vortex_datetime_parts::split_temporal;
use vortex_error::VortexResult;

use crate::ArrayAndStats;
use crate::CascadingCompressor;
use crate::CompressorContext;
use crate::Scheme;
use crate::SchemeExt;

/// Timestamp compression scheme built on datetime-part decomposition.
///
/// Each timestamp is broken into a days component, a seconds component and a
/// subseconds component. The three components are compressed independently as
/// children of this scheme and then reassembled into a `DateTimePartsArray`.
///
/// Only extension arrays carrying `Timestamp` temporal metadata are handled; fully
/// constant inputs are emitted as a `ConstantArray` instead of being decomposed.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct TemporalScheme;

impl Scheme for TemporalScheme {
    fn scheme_name(&self) -> &'static str {
        "vortex.ext.temporal"
    }

    /// Returns `true` only for canonical extension arrays whose extension dtype matches
    /// `AnyTemporal` with `TemporalMetadata::Timestamp` metadata.
    ///
    /// Any other `TemporalMetadata` variant, and every non-extension canonical array,
    /// is rejected.
    fn matches(&self, canonical: &Canonical) -> bool {
        let Canonical::Extension(ext) = canonical else {
            return false;
        };

        let ext_dtype = ext.ext_dtype();

        matches!(
            AnyTemporal::try_match(ext_dtype),
            Some(TemporalMetadata::Timestamp(..))
        )
    }

    /// Children: days=0, seconds=1, subseconds=2.
    fn num_children(&self) -> usize {
        3
    }

    /// Always opts in without inspecting the data.
    fn expected_compression_ratio(
        &self,
        _data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        _exec_ctx: &mut ExecutionCtx,
    ) -> CompressionEstimate {
        // Temporal compression (splitting into parts) is almost always beneficial.
        CompressionEstimate::Verdict(EstimateVerdict::AlwaysUse)
    }

    /// Compresses a timestamp extension array.
    ///
    /// If every element is equal, the result is a `ConstantArray` built from the first
    /// element and the input length. Otherwise the work proceeds in three steps:
    ///
    /// 1. The timestamps are split into days, seconds and subseconds.
    /// 2. Each part is executed to a `PrimitiveArray`, passed through `narrow()`, and
    ///    compressed by `compressor` as child 0, 1 or 2 of this scheme.
    /// 3. The compressed parts are reassembled with `DateTimeParts::try_new` using the
    ///    input's original dtype.
    ///
    /// # Errors
    ///
    /// Propagates any error from executing the input, the constant check, converting
    /// to or splitting the `TemporalArray`, narrowing a part, compressing a child, or
    /// constructing the `DateTimeParts` array.
    fn compress(
        &self,
        compressor: &CascadingCompressor,
        data: &ArrayAndStats,
        compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> VortexResult<ArrayRef> {
        let ext_array = data.array().clone().execute::<ExtensionArray>(exec_ctx)?;
        // Build the type-erased handle once; it serves both the constant check and the
        // temporal conversion below.
        let ext_ref = ext_array.clone().into_array();

        // Check for a constant array before doing any decomposition work.
        if is_constant(&ext_ref, exec_ctx)? {
            let scalar = ext_array.execute_scalar(0, exec_ctx)?;
            return Ok(ConstantArray::new(scalar, ext_array.len()).into_array());
        }

        let temporal_array = TemporalArray::try_from(ext_ref)?;
        // `split_temporal` consumes the array, so capture the dtype first.
        let dtype = temporal_array.dtype().clone();
        let TemporalParts {
            days,
            seconds,
            subseconds,
        } = split_temporal(temporal_array, exec_ctx)?;

        // Executes one part to a primitive array, narrows it, and compresses it as child
        // `child_index` of this scheme. `exec_ctx` is a parameter rather than a capture so
        // the closure does not keep a mutable borrow alive between calls.
        let compress_part =
            |part: ArrayRef, child_index, exec_ctx: &mut ExecutionCtx| -> VortexResult<ArrayRef> {
                let narrowed = part.execute::<PrimitiveArray>(exec_ctx)?.narrow()?;
                compressor.compress_child(
                    &narrowed.into_array(),
                    &compress_ctx,
                    self.id(),
                    child_index,
                    exec_ctx,
                )
            };

        let days = compress_part(days, 0, exec_ctx)?;
        let seconds = compress_part(seconds, 1, exec_ctx)?;
        let subseconds = compress_part(subseconds, 2, exec_ctx)?;

        Ok(DateTimeParts::try_new(dtype, days, seconds, subseconds)?.into_array())
    }
}