// velesdb-core 3.0.0
//
// High-performance vector database engine written in Rust.
// Documentation
//! HAVING clause evaluation and aggregation result sorting.
//!
//! Extracted from `grouped.rs` for single-responsibility:
//! - HAVING filter evaluation against aggregation results
//! - Sorting grouped results by ORDER BY clause
//! - JSON value comparison utilities for aggregation ordering
//! - Parameter resolution for condition placeholders

// Reason: Numeric casts in aggregation are intentional:
// - All casts are for computing aggregate statistics (sum, avg, count)
// - i64->usize for group limits: limits are clamped to SERVER_MAX_GROUPS_CEILING (1M)
// - Values bounded by result set size and field cardinality
#![allow(clippy::cast_precision_loss)]
#![allow(clippy::cast_possible_truncation)]
#![allow(clippy::cast_sign_loss)]

use crate::collection::types::Collection;
use crate::velesql::{
    AggregateArg, AggregateFunction, AggregateResult, AggregateType, CompareOp, HavingClause, Value,
};
use std::collections::HashMap;

/// Default maximum number of groups allowed when the query does not specify one
/// (memory protection).
///
/// Returned by `extract_max_groups_limit` when the query has no WITH clause,
/// or when none of its `max_groups` / `group_limit` options carries an
/// integer value.
pub(super) const DEFAULT_MAX_GROUPS: usize = 10_000;

/// Server-side hard ceiling on the number of GROUP BY groups (#903).
///
/// This is a SERVER-controlled constant. A query may use `WITH (max_groups=N)`
/// (or `group_limit=N`) to pick its own group budget — lower or higher than
/// `DEFAULT_MAX_GROUPS` — but `N` is always clamped down to this ceiling so an
/// untrusted query cannot inflate the per-query memory bound. Bounding
/// distinct groups is the DoS guard for GROUP BY aggregation.
pub(super) const SERVER_MAX_GROUPS_CEILING: usize = 1_000_000;

impl Collection {
    /// BUG-3 FIX: Orders grouped aggregation rows according to the ORDER BY clause.
    ///
    /// Every ORDER BY entry is first turned into a `(result key, descending)`
    /// pair: a plain field sorts by its own name, an aggregate sorts by the key
    /// produced by `Self::aggregation_result_key`. Similarity and arithmetic
    /// expressions are skipped because they do not apply to grouped aggregate
    /// rows.
    ///
    /// Rows are then compared key by key; the first key on which two rows
    /// differ decides their order. The sort is unstable, so rows that tie on
    /// every key may end up in any relative order.
    pub(crate) fn sort_aggregation_results(
        results: &mut [serde_json::Value],
        order_by: &[crate::velesql::SelectOrderBy],
    ) {
        use crate::collection::search::query::ordering::compare_json_values;
        use crate::velesql::OrderByExpr;
        use std::cmp::Ordering;

        let mut sort_keys: Vec<(String, bool)> = Vec::with_capacity(order_by.len());
        for clause in order_by {
            let key = match &clause.expr {
                OrderByExpr::Field(name) => name.clone(),
                OrderByExpr::Aggregate(agg) => Self::aggregation_result_key(agg),
                // Not meaningful for grouped aggregate rows: ignore the entry.
                OrderByExpr::Similarity(_)
                | OrderByExpr::SimilarityBare
                | OrderByExpr::Arithmetic(_) => continue,
            };
            sort_keys.push((key, clause.descending));
        }

        results.sort_unstable_by(|lhs, rhs| {
            sort_keys
                .iter()
                .map(|(key, descending)| {
                    let natural = compare_json_values(lhs.get(key), rhs.get(key));
                    if *descending {
                        natural.reverse()
                    } else {
                        natural
                    }
                })
                // Lazy: later keys are only compared while earlier ones tie.
                .find(|ordering| ordering.is_ne())
                .unwrap_or(Ordering::Equal)
        });
    }

    /// Builds the group key for one row directly from its payload.
    ///
    /// For every GROUP BY column, in order, the value is looked up with
    /// `Self::get_nested_value` and cloned. A missing payload or a missing
    /// column contributes JSON `null`. The collected values are handed to
    /// `GroupKey::new` as-is, so the key is not serialized to a JSON string
    /// here.
    pub(super) fn extract_group_key_fast(
        payload: Option<&serde_json::Value>,
        group_by_columns: &[String],
    ) -> super::GroupKey {
        let mut key_parts = Vec::with_capacity(group_by_columns.len());
        for column in group_by_columns {
            let part = match payload {
                Some(doc) => Self::get_nested_value(doc, column)
                    .cloned()
                    .unwrap_or(serde_json::Value::Null),
                None => serde_json::Value::Null,
            };
            key_parts.push(part);
        }
        super::GroupKey::new(key_parts)
    }

    /// Evaluate HAVING clause against aggregation result.
    /// Supports both AND and OR logical operators between conditions.
    pub(super) fn evaluate_having(having: &HavingClause, agg_result: &AggregateResult) -> bool {
        if having.conditions.is_empty() {
            return true;
        }

        // Evaluate first condition
        let mut result = {
            let cond = &having.conditions[0];
            let agg_value = Self::get_aggregate_value(&cond.aggregate, agg_result);
            Self::compare_values(agg_value, cond.operator, &cond.value)
        };

        // Apply remaining conditions with their operators
        for (i, cond) in having.conditions.iter().enumerate().skip(1) {
            let cond_result = {
                let agg_value = Self::get_aggregate_value(&cond.aggregate, agg_result);
                Self::compare_values(agg_value, cond.operator, &cond.value)
            };

            // Get operator (default to AND if not specified - backward compatible)
            let op = having
                .operators
                .get(i - 1)
                .copied()
                .unwrap_or(crate::velesql::LogicalOp::And);

            match op {
                crate::velesql::LogicalOp::And => result = result && cond_result,
                crate::velesql::LogicalOp::Or => result = result || cond_result,
            }
        }

        result
    }

    /// Looks up the numeric value of an aggregate function in a group's result.
    ///
    /// - `COUNT(*)` reads the group's total row count.
    /// - `COUNT(col)` reads the per-column count (number of non-null values).
    /// - `SUM` / `AVG` / `MIN` / `MAX` on a column read the matching
    ///   per-column table.
    ///
    /// Returns `None` for any other function/argument combination, or when the
    /// column has no entry in the relevant table.
    fn get_aggregate_value(agg: &AggregateFunction, result: &AggregateResult) -> Option<f64> {
        let kind = &agg.function_type;

        match &agg.argument {
            // The wildcard form is only meaningful for COUNT.
            AggregateArg::Wildcard if matches!(kind, AggregateType::Count) => {
                Some(result.count as f64)
            }
            AggregateArg::Column(col) => {
                let key = col.as_str();
                if matches!(kind, AggregateType::Count) {
                    result.counts.get(key).map(|&n| n as f64)
                } else if matches!(kind, AggregateType::Sum) {
                    result.sums.get(key).copied()
                } else if matches!(kind, AggregateType::Avg) {
                    result.avgs.get(key).copied()
                } else if matches!(kind, AggregateType::Min) {
                    result.mins.get(key).copied()
                } else if matches!(kind, AggregateType::Max) {
                    result.maxs.get(key).copied()
                } else {
                    None
                }
            }
            _ => None,
        }
    }

    /// Tests an aggregate value against a HAVING threshold.
    ///
    /// Returns `false` when the aggregate value is absent or when the threshold
    /// is not numeric (integer, unsigned integer or float). Both sides are
    /// compared as `f64`.
    ///
    /// Equality and inequality use a tolerance of `f64::EPSILON` scaled by the
    /// larger magnitude of the two operands (never scaled below 1.0), to absorb
    /// the precision loss that accumulates in large sums. The ordering
    /// operators compare the raw values.
    fn compare_values(agg_value: Option<f64>, op: CompareOp, threshold: &Value) -> bool {
        let limit = match threshold {
            Value::Float(f) => *f,
            Value::Integer(i) => *i as f64,
            #[allow(clippy::cast_precision_loss)]
            // Reason: aggregate comparison converts to f64; precision loss is
            // acceptable for large u64 values in HAVING threshold context.
            Value::UnsignedInteger(u) => *u as f64,
            _ => return false,
        };

        let Some(observed) = agg_value else {
            return false;
        };

        let gap = (observed - limit).abs();
        // Relative tolerance with a floor of 1.0 on the scale factor.
        let tolerance = f64::EPSILON * observed.abs().max(limit.abs()).max(1.0);

        match op {
            CompareOp::Eq => gap < tolerance,
            CompareOp::NotEq => gap >= tolerance,
            CompareOp::Gt => observed > limit,
            CompareOp::Gte => observed >= limit,
            CompareOp::Lt => observed < limit,
            CompareOp::Lte => observed <= limit,
        }
    }

    /// Extract max_groups limit from WITH clause (EPIC-040 US-004).
    ///
    /// Supports both `max_groups` and `group_limit` option names. The first
    /// such option that carries an integer value wins; options with one of
    /// these names but a non-integer value are skipped. Returns
    /// `DEFAULT_MAX_GROUPS` when there is no WITH clause or no usable option.
    ///
    /// #903 (DoS hardening): the value supplied by the (untrusted) query is
    /// forced into `1..=`[`SERVER_MAX_GROUPS_CEILING`]. Non-positive values
    /// become 1, and the query can never raise the server-side memory ceiling.
    ///
    /// The integer is converted to `usize` with a checked conversion rather
    /// than an `as` cast. A value that does not fit in `usize` (possible on
    /// 32-bit targets) therefore saturates at the ceiling instead of wrapping
    /// around to an arbitrary small limit before the clamp is applied.
    pub(super) fn extract_max_groups_limit(
        with_clause: Option<&crate::velesql::WithClause>,
    ) -> usize {
        let Some(with) = with_clause else {
            return DEFAULT_MAX_GROUPS;
        };

        let requested = with
            .options
            .iter()
            .filter(|opt| opt.key == "max_groups" || opt.key == "group_limit")
            .find_map(|opt| {
                if let crate::velesql::WithValue::Integer(n) = &opt.value {
                    Some(*n)
                } else {
                    None
                }
            });

        let Some(requested) = requested else {
            return DEFAULT_MAX_GROUPS;
        };

        // `max(1)` rules out zero/negative input, so the conversion can only
        // fail because the value is too large — which is above the ceiling.
        usize::try_from(requested.max(1)).map_or(SERVER_MAX_GROUPS_CEILING, |limit| {
            limit.min(SERVER_MAX_GROUPS_CEILING)
        })
    }

    /// Produces a copy of the HAVING clause with threshold placeholders resolved.
    ///
    /// HAVING thresholds live in `HavingCondition::value`, outside the WHERE
    /// condition tree, so the WHERE resolver never visits them. Without this
    /// step, `HAVING COUNT(*) > $n` would compare every group against an
    /// unresolved placeholder and silently filter out all groups — even when
    /// `$n` is bound.
    ///
    /// Each condition keeps its aggregate and comparison operator; only its
    /// threshold is passed through `Self::resolve_where_param`. The logical
    /// operators between conditions are copied unchanged.
    ///
    /// # Errors
    ///
    /// Returns an error when a referenced parameter is missing from `params`
    /// or has an unsupported type (array/object). Resolution stops at the
    /// first failing condition.
    pub(super) fn resolve_having_params(
        having: &HavingClause,
        params: &HashMap<String, serde_json::Value>,
    ) -> crate::error::Result<HavingClause> {
        let mut resolved = Vec::with_capacity(having.conditions.len());
        for condition in &having.conditions {
            let threshold = Self::resolve_where_param(&condition.value, params)?;
            resolved.push(crate::velesql::HavingCondition {
                aggregate: condition.aggregate.clone(),
                operator: condition.operator,
                value: threshold,
            });
        }

        Ok(HavingClause {
            conditions: resolved,
            operators: having.operators.clone(),
        })
    }

    /// BUG-5 FIX: Returns a copy of `cond` with parameter placeholders resolved.
    ///
    /// Walks the condition tree: `And` / `Or` / `Not` / `Group` nodes are
    /// rebuilt around their recursively resolved children, and every other
    /// variant is handed to `Self::resolve_leaf_condition_params`. Leaf values
    /// are ultimately resolved by `Self::resolve_where_param` (the single
    /// strict resolver), so a missing or unsupported parameter is an error,
    /// never a silent `NULL`.
    ///
    /// # Errors
    ///
    /// Returns an error when a referenced parameter is missing from `params`
    /// or has an unsupported type (array/object).
    pub(crate) fn resolve_condition_params(
        cond: &crate::velesql::Condition,
        params: &HashMap<String, serde_json::Value>,
    ) -> crate::error::Result<crate::velesql::Condition> {
        use crate::velesql::Condition;

        match cond {
            // Binary nodes: resolve the left side first, then the right.
            Condition::And(lhs, rhs) => {
                let lhs = Self::resolve_boxed_condition(lhs, params)?;
                let rhs = Self::resolve_boxed_condition(rhs, params)?;
                Ok(Condition::And(lhs, rhs))
            }
            Condition::Or(lhs, rhs) => {
                let lhs = Self::resolve_boxed_condition(lhs, params)?;
                let rhs = Self::resolve_boxed_condition(rhs, params)?;
                Ok(Condition::Or(lhs, rhs))
            }
            // Unary wrappers: resolve the child and re-wrap it.
            Condition::Not(negated) => {
                Self::resolve_boxed_condition(negated, params).map(Condition::Not)
            }
            Condition::Group(grouped) => {
                Self::resolve_boxed_condition(grouped, params).map(Condition::Group)
            }
            leaf => Self::resolve_leaf_condition_params(leaf, params),
        }
    }

    /// Resolves a nested condition recursively and returns the result boxed,
    /// ready to be placed back into a parent `Condition` node.
    fn resolve_boxed_condition(
        cond: &crate::velesql::Condition,
        params: &HashMap<String, serde_json::Value>,
    ) -> crate::error::Result<Box<crate::velesql::Condition>> {
        Self::resolve_condition_params(cond, params).map(Box::new)
    }

    /// Resolves parameters in leaf conditions (Comparison, IN, BETWEEN,
    /// CONTAINS / CONTAINS ANY / CONTAINS ALL).
    ///
    /// For these four variants the condition is rebuilt with the same column
    /// and flags, and with every value operand passed through
    /// `Self::resolve_where_param`.
    ///
    /// The remaining leaf variants are cloned unchanged: they carry no scalar
    /// `Value` operands (geo thresholds are `f64` literals, LIKE/MATCH
    /// patterns are strings, vector parameters are resolved by the vector
    /// pipeline), except `GraphMatch`, whose pattern properties are evaluated
    /// by the MATCH engine rather than by this resolver.
    fn resolve_leaf_condition_params(
        cond: &crate::velesql::Condition,
        params: &HashMap<String, serde_json::Value>,
    ) -> crate::error::Result<crate::velesql::Condition> {
        use crate::velesql::{
            BetweenCondition, Comparison, Condition, ContainsCondition, InCondition,
        };

        match cond {
            Condition::Comparison(cmp) => {
                let value = Self::resolve_where_param(&cmp.value, params)?;
                Ok(Condition::Comparison(Comparison {
                    column: cmp.column.clone(),
                    operator: cmp.operator,
                    value,
                }))
            }
            Condition::In(in_cond) => {
                let values = Self::resolve_value_list(&in_cond.values, params)?;
                Ok(Condition::In(InCondition {
                    column: in_cond.column.clone(),
                    values,
                    negated: in_cond.negated,
                }))
            }
            Condition::Between(range) => {
                // Lower bound is resolved before the upper bound.
                let low = Self::resolve_where_param(&range.low, params)?;
                let high = Self::resolve_where_param(&range.high, params)?;
                Ok(Condition::Between(BetweenCondition {
                    column: range.column.clone(),
                    low,
                    high,
                }))
            }
            Condition::Contains(contains) => {
                let values = Self::resolve_value_list(&contains.values, params)?;
                Ok(Condition::Contains(ContainsCondition {
                    column: contains.column.clone(),
                    mode: contains.mode,
                    values,
                }))
            }
            // These conditions don't have Value parameters to resolve
            passthrough => Ok(passthrough.clone()),
        }
    }

    /// Resolves each value of a list, in order, through
    /// `Self::resolve_where_param`.
    ///
    /// Stops at, and returns, the first resolution error.
    fn resolve_value_list(
        values: &[Value],
        params: &HashMap<String, serde_json::Value>,
    ) -> crate::error::Result<Vec<Value>> {
        let mut resolved = Vec::with_capacity(values.len());
        for value in values {
            resolved.push(Self::resolve_where_param(value, params)?);
        }
        Ok(resolved)
    }
}