cstats_core/stats/
aggregator.rs1use std::collections::HashMap;
4
5use chrono::{DateTime, Utc};
6
7use crate::{api::MetricValue, Error, Result};
8
9use super::{utils, MetricSummary, StatsRecord, StatsSummary};
10
11#[derive(Debug, Clone)]
13pub struct StatsAggregator {
14 records: Vec<StatsRecord>,
15}
16
17impl StatsAggregator {
18 pub fn new() -> Self {
20 Self {
21 records: Vec::new(),
22 }
23 }
24
25 pub fn add_record(&mut self, record: StatsRecord) {
27 self.records.push(record);
28 }
29
30 pub fn add_records(&mut self, records: Vec<StatsRecord>) {
32 self.records.extend(records);
33 }
34
35 pub fn clear(&mut self) {
37 self.records.clear();
38 }
39
40 pub fn record_count(&self) -> usize {
42 self.records.len()
43 }
44
45 pub fn summarize(&self) -> Result<StatsSummary> {
47 if self.records.is_empty() {
48 return Err(Error::statistics("No records to summarize"));
49 }
50
51 let time_range = self.get_time_range();
52 let sources = self.get_unique_sources();
53 let metrics = self.aggregate_metrics()?;
54
55 Ok(StatsSummary {
56 count: self.records.len() as u64,
57 time_range,
58 metrics,
59 sources,
60 })
61 }
62
63 pub fn summarize_time_range(
65 &self,
66 start: DateTime<Utc>,
67 end: DateTime<Utc>,
68 ) -> Result<StatsSummary> {
69 let filtered_records: Vec<_> = self
70 .records
71 .iter()
72 .filter(|record| record.timestamp >= start && record.timestamp <= end)
73 .collect();
74
75 if filtered_records.is_empty() {
76 return Err(Error::statistics("No records in specified time range"));
77 }
78
79 let mut temp_aggregator = StatsAggregator::new();
80 temp_aggregator.records = filtered_records.into_iter().cloned().collect();
81 temp_aggregator.summarize()
82 }
83
84 pub fn summarize_by_source(&self) -> Result<HashMap<String, StatsSummary>> {
86 let mut summaries = HashMap::new();
87
88 let mut records_by_source: HashMap<String, Vec<StatsRecord>> = HashMap::new();
90 for record in &self.records {
91 records_by_source
92 .entry(record.source.clone())
93 .or_default()
94 .push(record.clone());
95 }
96
97 for (source, records) in records_by_source {
99 let mut aggregator = StatsAggregator::new();
100 aggregator.add_records(records);
101 summaries.insert(source, aggregator.summarize()?);
102 }
103
104 Ok(summaries)
105 }
106
107 pub fn filter_by_source(&self, source: &str) -> Vec<&StatsRecord> {
109 self.records
110 .iter()
111 .filter(|record| record.source == source)
112 .collect()
113 }
114
115 pub fn filter_by_time_range(
117 &self,
118 start: DateTime<Utc>,
119 end: DateTime<Utc>,
120 ) -> Vec<&StatsRecord> {
121 self.records
122 .iter()
123 .filter(|record| record.timestamp >= start && record.timestamp <= end)
124 .collect()
125 }
126
127 fn get_time_range(&self) -> (DateTime<Utc>, DateTime<Utc>) {
129 let mut min_time = self.records[0].timestamp;
130 let mut max_time = self.records[0].timestamp;
131
132 for record in &self.records {
133 if record.timestamp < min_time {
134 min_time = record.timestamp;
135 }
136 if record.timestamp > max_time {
137 max_time = record.timestamp;
138 }
139 }
140
141 (min_time, max_time)
142 }
143
144 fn get_unique_sources(&self) -> Vec<String> {
146 let mut sources: Vec<String> = self
147 .records
148 .iter()
149 .map(|record| record.source.clone())
150 .collect();
151 sources.sort();
152 sources.dedup();
153 sources
154 }
155
156 fn aggregate_metrics(&self) -> Result<HashMap<String, MetricSummary>> {
158 let mut metric_values: HashMap<String, Vec<f64>> = HashMap::new();
159
160 for record in &self.records {
162 for (name, value) in &record.metrics {
163 let numeric_value = match value {
164 MetricValue::Integer(i) => *i as f64,
165 MetricValue::Float(f) => *f,
166 MetricValue::Duration(d) => *d as f64,
167 MetricValue::Boolean(b) => {
168 if *b {
169 1.0
170 } else {
171 0.0
172 }
173 }
174 MetricValue::String(_) => continue, MetricValue::Histogram { sum, .. } => *sum,
176 };
177
178 metric_values
179 .entry(name.clone())
180 .or_default()
181 .push(numeric_value);
182 }
183 }
184
185 let mut summaries = HashMap::new();
187 for (name, mut values) in metric_values {
188 if values.is_empty() {
189 continue;
190 }
191
192 values.sort_by(|a, b| a.partial_cmp(b).unwrap());
193
194 let min = values[0];
195 let max = values[values.len() - 1];
196 let sum = values.iter().sum();
197 let count = values.len() as u64;
198 let avg = sum / count as f64;
199
200 let std_dev = if values.len() > 1 {
201 Some(utils::std_deviation(&values))
202 } else {
203 None
204 };
205
206 let percentiles = if values.len() >= 4 {
207 Some([
208 utils::percentile(&values, 50.0), utils::percentile(&values, 90.0), utils::percentile(&values, 95.0), utils::percentile(&values, 99.0), ])
213 } else {
214 None
215 };
216
217 summaries.insert(
218 name.clone(),
219 MetricSummary {
220 name,
221 min,
222 max,
223 avg,
224 sum,
225 count,
226 std_dev,
227 percentiles,
228 },
229 );
230 }
231
232 Ok(summaries)
233 }
234}
235
236impl Default for StatsAggregator {
237 fn default() -> Self {
238 Self::new()
239 }
240}