vortex-layout 0.70.0

Vortex layouts provide a way to perform lazy push-down scans over abstract storage
Documentation
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

//! Read-time pruning support for zoned layouts.

use std::sync::Arc;
use std::sync::LazyLock;
use std::sync::OnceLock;

use futures::FutureExt;
use futures::TryFutureExt;
use futures::future::BoxFuture;
use futures::future::Shared;
use parking_lot::RwLock;
use vortex_array::MaskFuture;
use vortex_array::VortexSessionExecute;
use vortex_array::arrays::StructArray;
use vortex_array::dtype::FieldPath;
use vortex_array::dtype::FieldPathSet;
use vortex_array::expr::Expression;
use vortex_array::expr::pruning::checked_pruning_expr;
use vortex_array::expr::root;
use vortex_array::expr::stats::Stat;
use vortex_array::scalar_fn::fns::dynamic::DynamicExprUpdates;
use vortex_error::SharedVortexResult;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_mask::Mask;
use vortex_session::VortexSession;
use vortex_utils::aliases::dash_map::DashMap;

use crate::LazyReaderChildren;
use crate::layouts::zoned::ZonedLayout;
use crate::layouts::zoned::zone_map::ZoneMap;

type SharedZoneMap = Shared<BoxFuture<'static, SharedVortexResult<ZoneMap>>>;
pub(super) type SharedPruningResult =
    Shared<BoxFuture<'static, SharedVortexResult<Arc<PruningResult>>>>;
type PredicateCache = Arc<OnceLock<Option<Expression>>>;

pub(super) struct PruningState {
    zone_count: usize,
    row_count: u64,
    zone_len: u64,
    present_stats: Arc<[Stat]>,
    lazy_children: Arc<LazyReaderChildren>,
    session: VortexSession,

    pruning_result: LazyLock<DashMap<Expression, Option<SharedPruningResult>>>,
    zone_map: OnceLock<SharedZoneMap>,
    pruning_predicates: LazyLock<Arc<DashMap<Expression, PredicateCache>>>,
}

impl PruningState {
    pub(super) fn new(
        layout: &ZonedLayout,
        lazy_children: Arc<LazyReaderChildren>,
        session: VortexSession,
    ) -> Self {
        Self {
            zone_count: layout.nzones(),
            row_count: layout.row_count(),
            zone_len: layout.zone_len() as u64,
            present_stats: Arc::clone(layout.present_stats()),
            lazy_children,
            session,
            pruning_result: Default::default(),
            zone_map: Default::default(),
            pruning_predicates: Default::default(),
        }
    }

    pub(super) fn pruning_mask_future(&self, expr: Expression) -> Option<SharedPruningResult> {
        if let Some(result) = self.pruning_result.get(&expr) {
            return result.value().clone();
        }

        self.pruning_result
            .entry(expr.clone())
            .or_insert_with(|| match self.pruning_predicate(expr.clone()) {
                None => {
                    tracing::debug!("No pruning predicate for expr: {expr}");
                    None
                }
                Some(predicate) => {
                    tracing::debug!(
                        "Constructed pruning predicate for expr: {expr}: {predicate:?}"
                    );
                    let zone_map = self.zone_map();
                    let dynamic_updates = DynamicExprUpdates::new(&expr);
                    let session = self.session.clone();

                    Some(
                        async move {
                            let zone_map = zone_map.await?;
                            let initial_mask =
                                zone_map.prune(&predicate, &session).map_err(|err| {
                                    err.with_context(format!(
                                        "While evaluating pruning predicate {} (derived from {})",
                                        predicate, expr
                                    ))
                                })?;
                            Ok(Arc::new(PruningResult {
                                zone_map,
                                predicate,
                                dynamic_updates,
                                latest_result: RwLock::new((0, initial_mask)),
                                session,
                            }))
                        }
                        .boxed()
                        .shared(),
                    )
                }
            })
            .clone()
    }

    fn pruning_predicate(&self, expr: Expression) -> Option<Expression> {
        self.pruning_predicates
            .entry(expr.clone())
            .or_default()
            .get_or_init(move || {
                let available_stats = FieldPathSet::from_iter(
                    self.present_stats
                        .iter()
                        .map(|stat| FieldPath::from_name(stat.name())),
                );
                checked_pruning_expr(&expr, &available_stats).map(|(expr, _)| expr)
            })
            .clone()
    }

    fn zone_map(&self) -> SharedZoneMap {
        self.zone_map
            .get_or_init(move || {
                let zone_count = self.zone_count;
                let zones_eval = self
                    .lazy_children
                    .get(1)
                    .vortex_expect("failed to get zone child")
                    .projection_evaluation(
                        &(0..zone_count as u64),
                        &root(),
                        MaskFuture::new_true(zone_count),
                    )
                    .vortex_expect("Failed construct zone map evaluation");
                let session = self.session.clone();
                let zone_len = self.zone_len;
                let row_count = self.row_count;

                async move {
                    let mut ctx = session.create_execution_ctx();
                    let zones_array = zones_eval.await?.execute::<StructArray>(&mut ctx)?;
                    // SAFETY: zoned layout validation ensures the zones child matches the expected
                    // stats-table schema for `present_stats`.
                    Ok(unsafe { ZoneMap::new_unchecked(zones_array, zone_len, row_count) })
                }
                .map_err(Arc::new)
                .boxed()
                .shared()
            })
            .clone()
    }
}

pub(super) struct PruningResult {
    zone_map: ZoneMap,
    predicate: Expression,
    dynamic_updates: Option<DynamicExprUpdates>,
    latest_result: RwLock<(u64, Mask)>,
    session: VortexSession,
}

impl PruningResult {
    pub(super) fn mask(&self) -> VortexResult<Mask> {
        let Some(dynamic_updates) = &self.dynamic_updates else {
            return Ok(self.latest_result.read().1.clone());
        };

        let version = dynamic_updates.version();

        {
            let read_guard = self.latest_result.read();
            if read_guard.0 >= version {
                return Ok(read_guard.1.clone());
            }
        }

        let mut guard = self.latest_result.write();
        if guard.0 >= version {
            return Ok(guard.1.clone());
        }

        tracing::debug!(
            "Re-computing pruning mask for version {version} on {}",
            self.predicate
        );

        let next_mask = self
            .zone_map
            .prune(&self.predicate, &self.session)
            .map_err(|err| {
                err.with_context(format!(
                    "While evaluating pruning predicate {}",
                    self.predicate
                ))
            })?;
        *guard = (version, next_mask.clone());

        Ok(next_mask)
    }
}