cstats_core/stats/
aggregator.rs

1//! Statistics aggregation functionality
2
3use std::collections::HashMap;
4
5use chrono::{DateTime, Utc};
6
7use crate::{api::MetricValue, Error, Result};
8
9use super::{utils, MetricSummary, StatsRecord, StatsSummary};
10
11/// Statistics aggregator for processing and summarizing collected data
12#[derive(Debug, Clone)]
13pub struct StatsAggregator {
14    records: Vec<StatsRecord>,
15}
16
17impl StatsAggregator {
18    /// Create a new aggregator
19    pub fn new() -> Self {
20        Self {
21            records: Vec::new(),
22        }
23    }
24
25    /// Add a statistics record to the aggregator
26    pub fn add_record(&mut self, record: StatsRecord) {
27        self.records.push(record);
28    }
29
30    /// Add multiple statistics records to the aggregator
31    pub fn add_records(&mut self, records: Vec<StatsRecord>) {
32        self.records.extend(records);
33    }
34
35    /// Clear all records from the aggregator
36    pub fn clear(&mut self) {
37        self.records.clear();
38    }
39
40    /// Get the number of records in the aggregator
41    pub fn record_count(&self) -> usize {
42        self.records.len()
43    }
44
45    /// Generate a summary of all records
46    pub fn summarize(&self) -> Result<StatsSummary> {
47        if self.records.is_empty() {
48            return Err(Error::statistics("No records to summarize"));
49        }
50
51        let time_range = self.get_time_range();
52        let sources = self.get_unique_sources();
53        let metrics = self.aggregate_metrics()?;
54
55        Ok(StatsSummary {
56            count: self.records.len() as u64,
57            time_range,
58            metrics,
59            sources,
60        })
61    }
62
63    /// Summarize records within a specific time range
64    pub fn summarize_time_range(
65        &self,
66        start: DateTime<Utc>,
67        end: DateTime<Utc>,
68    ) -> Result<StatsSummary> {
69        let filtered_records: Vec<_> = self
70            .records
71            .iter()
72            .filter(|record| record.timestamp >= start && record.timestamp <= end)
73            .collect();
74
75        if filtered_records.is_empty() {
76            return Err(Error::statistics("No records in specified time range"));
77        }
78
79        let mut temp_aggregator = StatsAggregator::new();
80        temp_aggregator.records = filtered_records.into_iter().cloned().collect();
81        temp_aggregator.summarize()
82    }
83
84    /// Summarize records by source
85    pub fn summarize_by_source(&self) -> Result<HashMap<String, StatsSummary>> {
86        let mut summaries = HashMap::new();
87
88        // Group records by source
89        let mut records_by_source: HashMap<String, Vec<StatsRecord>> = HashMap::new();
90        for record in &self.records {
91            records_by_source
92                .entry(record.source.clone())
93                .or_default()
94                .push(record.clone());
95        }
96
97        // Create summary for each source
98        for (source, records) in records_by_source {
99            let mut aggregator = StatsAggregator::new();
100            aggregator.add_records(records);
101            summaries.insert(source, aggregator.summarize()?);
102        }
103
104        Ok(summaries)
105    }
106
107    /// Get records filtered by source
108    pub fn filter_by_source(&self, source: &str) -> Vec<&StatsRecord> {
109        self.records
110            .iter()
111            .filter(|record| record.source == source)
112            .collect()
113    }
114
115    /// Get records filtered by time range
116    pub fn filter_by_time_range(
117        &self,
118        start: DateTime<Utc>,
119        end: DateTime<Utc>,
120    ) -> Vec<&StatsRecord> {
121        self.records
122            .iter()
123            .filter(|record| record.timestamp >= start && record.timestamp <= end)
124            .collect()
125    }
126
127    /// Get the time range covered by all records
128    fn get_time_range(&self) -> (DateTime<Utc>, DateTime<Utc>) {
129        let mut min_time = self.records[0].timestamp;
130        let mut max_time = self.records[0].timestamp;
131
132        for record in &self.records {
133            if record.timestamp < min_time {
134                min_time = record.timestamp;
135            }
136            if record.timestamp > max_time {
137                max_time = record.timestamp;
138            }
139        }
140
141        (min_time, max_time)
142    }
143
144    /// Get unique sources from all records
145    fn get_unique_sources(&self) -> Vec<String> {
146        let mut sources: Vec<String> = self
147            .records
148            .iter()
149            .map(|record| record.source.clone())
150            .collect();
151        sources.sort();
152        sources.dedup();
153        sources
154    }
155
156    /// Aggregate metrics from all records
157    fn aggregate_metrics(&self) -> Result<HashMap<String, MetricSummary>> {
158        let mut metric_values: HashMap<String, Vec<f64>> = HashMap::new();
159
160        // Collect all metric values
161        for record in &self.records {
162            for (name, value) in &record.metrics {
163                let numeric_value = match value {
164                    MetricValue::Integer(i) => *i as f64,
165                    MetricValue::Float(f) => *f,
166                    MetricValue::Duration(d) => *d as f64,
167                    MetricValue::Boolean(b) => {
168                        if *b {
169                            1.0
170                        } else {
171                            0.0
172                        }
173                    }
174                    MetricValue::String(_) => continue, // Skip string values
175                    MetricValue::Histogram { sum, .. } => *sum,
176                };
177
178                metric_values
179                    .entry(name.clone())
180                    .or_default()
181                    .push(numeric_value);
182            }
183        }
184
185        // Calculate summary statistics for each metric
186        let mut summaries = HashMap::new();
187        for (name, mut values) in metric_values {
188            if values.is_empty() {
189                continue;
190            }
191
192            values.sort_by(|a, b| a.partial_cmp(b).unwrap());
193
194            let min = values[0];
195            let max = values[values.len() - 1];
196            let sum = values.iter().sum();
197            let count = values.len() as u64;
198            let avg = sum / count as f64;
199
200            let std_dev = if values.len() > 1 {
201                Some(utils::std_deviation(&values))
202            } else {
203                None
204            };
205
206            let percentiles = if values.len() >= 4 {
207                Some([
208                    utils::percentile(&values, 50.0), // 50th percentile (median)
209                    utils::percentile(&values, 90.0), // 90th percentile
210                    utils::percentile(&values, 95.0), // 95th percentile
211                    utils::percentile(&values, 99.0), // 99th percentile
212                ])
213            } else {
214                None
215            };
216
217            summaries.insert(
218                name.clone(),
219                MetricSummary {
220                    name,
221                    min,
222                    max,
223                    avg,
224                    sum,
225                    count,
226                    std_dev,
227                    percentiles,
228                },
229            );
230        }
231
232        Ok(summaries)
233    }
234}
235
236impl Default for StatsAggregator {
237    fn default() -> Self {
238        Self::new()
239    }
240}