hyprstream_core/metrics/
aggregation.rs

1//! Metric-specific aggregation functionality.
2//!
3//! This module provides metric-specific implementations of aggregation functions
4//! while leveraging the core aggregation framework from `crate::aggregation`.
5//! It handles the specific requirements of metric aggregation, such as:
6//! - Running window calculations (sum, avg, count)
7//! - Metric-specific SQL query generation
8//! - Direct aggregation of MetricRecord instances
9//!
10//! The implementation reuses the generic aggregation types and query building
11//! functionality from the core aggregation module while adding metric-specific
12//! logic and optimizations.
13
14use crate::metrics::MetricRecord;
15use crate::aggregation::{AggregateFunction, GroupBy, build_aggregate_query};
16use tonic::Status;
17
/// Names of the running-window value columns handed to metric aggregation queries.
///
/// The entries are, in order, the running-window sum, average and count columns.
const METRIC_VALUE_COLUMNS: [&str; 3] = [
    "value_running_window_sum",
    "value_running_window_avg",
    "value_running_window_count",
];
24
25/// Applies the aggregation function to a set of metrics.
26///
27/// This function implements metric-specific aggregation by operating directly
28/// on MetricRecord instances. It uses the appropriate running window value
29/// based on the aggregation function type.
30///
31/// # Arguments
32///
33/// * `function` - The aggregation function to apply
34/// * `metrics` - The set of metrics to aggregate
35///
36/// # Returns
37///
38/// The aggregated value as a float, or an error if the operation fails
39pub fn apply_function(function: AggregateFunction, metrics: &[MetricRecord]) -> Result<f64, Status> {
40    if metrics.is_empty() {
41        return Ok(0.0);
42    }
43
44    match function {
45        AggregateFunction::Sum => Ok(metrics.iter().map(|m| m.value_running_window_sum).sum()),
46        AggregateFunction::Avg => {
47            let sum: f64 = metrics.iter().map(|m| m.value_running_window_avg).sum();
48            Ok(sum / metrics.len() as f64)
49        },
50        AggregateFunction::Min => Ok(metrics
51            .iter()
52            .map(|m| m.value_running_window_sum)
53            .fold(f64::INFINITY, f64::min)),
54        AggregateFunction::Max => Ok(metrics
55            .iter()
56            .map(|m| m.value_running_window_sum)
57            .fold(f64::NEG_INFINITY, f64::max)),
58        AggregateFunction::Count => Ok(metrics.len() as f64),
59    }
60}
61
62/// Builds a SQL query for metrics aggregation.
63///
64/// This function specializes the generic aggregate query builder for metrics
65/// by providing the metric-specific value columns and table name. It reuses
66/// the core query building logic while adding metric-specific context.
67///
68/// # Arguments
69///
70/// * `function` - The aggregation function to apply
71/// * `group_by` - The grouping specification
72/// * `from_timestamp` - The start of the time range
73/// * `to_timestamp` - The optional end of the time range
74///
75/// # Returns
76///
77/// A SQL query string optimized for metric aggregation
78pub fn build_metrics_query(
79    function: AggregateFunction,
80    group_by: &GroupBy,
81    from_timestamp: i64,
82    to_timestamp: Option<i64>,
83) -> String {
84    let columns = METRIC_VALUE_COLUMNS.iter()
85        .map(|&c| c.to_string())
86        .collect::<Vec<_>>();
87
88    let column_refs: Vec<&str> = columns.iter().map(|s| s.as_str()).collect();
89    build_aggregate_query(
90        "metrics",
91        function,
92        group_by,
93        &column_refs,
94        Some(from_timestamp),
95        to_timestamp,
96    )
97}