hyprstream_core/metrics/aggregation.rs
1//! Metric-specific aggregation functionality.
2//!
3//! This module provides metric-specific implementations of aggregation functions
4//! while leveraging the core aggregation framework from `crate::aggregation`.
5//! It handles the specific requirements of metric aggregation, such as:
6//! - Running window calculations (sum, avg, count)
7//! - Metric-specific SQL query generation
8//! - Direct aggregation of MetricRecord instances
9//!
10//! The implementation reuses the generic aggregation types and query building
11//! functionality from the core aggregation module while adding metric-specific
12//! logic and optimizations.
13
14use crate::metrics::MetricRecord;
15use crate::aggregation::{AggregateFunction, GroupBy, build_aggregate_query};
16use tonic::Status;
17
18/// The standard metric value columns used in aggregation queries
19const METRIC_VALUE_COLUMNS: [&str; 3] = [
20 "value_running_window_sum",
21 "value_running_window_avg",
22 "value_running_window_count"
23];
24
25/// Applies the aggregation function to a set of metrics.
26///
27/// This function implements metric-specific aggregation by operating directly
28/// on MetricRecord instances. It uses the appropriate running window value
29/// based on the aggregation function type.
30///
31/// # Arguments
32///
33/// * `function` - The aggregation function to apply
34/// * `metrics` - The set of metrics to aggregate
35///
36/// # Returns
37///
38/// The aggregated value as a float, or an error if the operation fails
39pub fn apply_function(function: AggregateFunction, metrics: &[MetricRecord]) -> Result<f64, Status> {
40 if metrics.is_empty() {
41 return Ok(0.0);
42 }
43
44 match function {
45 AggregateFunction::Sum => Ok(metrics.iter().map(|m| m.value_running_window_sum).sum()),
46 AggregateFunction::Avg => {
47 let sum: f64 = metrics.iter().map(|m| m.value_running_window_avg).sum();
48 Ok(sum / metrics.len() as f64)
49 },
50 AggregateFunction::Min => Ok(metrics
51 .iter()
52 .map(|m| m.value_running_window_sum)
53 .fold(f64::INFINITY, f64::min)),
54 AggregateFunction::Max => Ok(metrics
55 .iter()
56 .map(|m| m.value_running_window_sum)
57 .fold(f64::NEG_INFINITY, f64::max)),
58 AggregateFunction::Count => Ok(metrics.len() as f64),
59 }
60}
61
62/// Builds a SQL query for metrics aggregation.
63///
64/// This function specializes the generic aggregate query builder for metrics
65/// by providing the metric-specific value columns and table name. It reuses
66/// the core query building logic while adding metric-specific context.
67///
68/// # Arguments
69///
70/// * `function` - The aggregation function to apply
71/// * `group_by` - The grouping specification
72/// * `from_timestamp` - The start of the time range
73/// * `to_timestamp` - The optional end of the time range
74///
75/// # Returns
76///
77/// A SQL query string optimized for metric aggregation
78pub fn build_metrics_query(
79 function: AggregateFunction,
80 group_by: &GroupBy,
81 from_timestamp: i64,
82 to_timestamp: Option<i64>,
83) -> String {
84 let columns = METRIC_VALUE_COLUMNS.iter()
85 .map(|&c| c.to_string())
86 .collect::<Vec<_>>();
87
88 let column_refs: Vec<&str> = columns.iter().map(|s| s.as_str()).collect();
89 build_aggregate_query(
90 "metrics",
91 function,
92 group_by,
93 &column_refs,
94 Some(from_timestamp),
95 to_timestamp,
96 )
97}