1use minmaxlttb::{LttbBuilder, LttbMethod, Point};
12use serde::Serialize;
13use sha2::{Digest, Sha256};
14use std::collections::HashMap;
15
16#[derive(Debug, Clone, Serialize)]
18pub struct MetricPoint {
19 metric: String,
20 timestamp: i64,
21 value: f64,
22}
23
24#[derive(Serialize)]
26pub struct DownsampledMetrics {
27 pub agent_id: String,
28 pub points: Vec<MetricPoint>,
29 pub metadata: Metadata,
30}
31
32#[derive(Serialize)]
33pub struct Metadata {
34 pub point_count: usize,
35 pub time_range: TimeRange,
36 pub checksum: String,
37}
38
39#[derive(Serialize)]
40pub struct TimeRange {
41 pub start: i64,
42 pub end: i64,
43}
44
45pub fn encode_downsampled(
55 agent_id: String,
56 metrics: HashMap<String, Vec<(i64, f64)>>,
57 threshold: usize,
58) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
59 let points = downsample_metrics(metrics, threshold)?;
61
62 if points.is_empty() {
63 return Err("No metrics to encode".into());
64 }
65
66 let point_count = points.len();
68 let start = points.iter().map(|p| p.timestamp).min().unwrap_or(0);
69 let end = points.iter().map(|p| p.timestamp).max().unwrap_or(0);
70
71 let points_bytes = rmp_serde::to_vec_named(&points)?;
73 let checksum = compute_sha256(&points_bytes);
74
75 let downsampled = DownsampledMetrics {
77 agent_id,
78 points,
79 metadata: Metadata {
80 point_count,
81 time_range: TimeRange { start, end },
82 checksum,
83 },
84 };
85
86 let msgpack_bytes = rmp_serde::to_vec_named(&downsampled)?;
88
89 Ok(msgpack_bytes)
90}
91
92fn downsample_metrics(
94 metrics: HashMap<String, Vec<(i64, f64)>>,
95 threshold: usize,
96) -> Result<Vec<MetricPoint>, Box<dyn std::error::Error>> {
97 let mut all_points = Vec::new();
98
99 for (metric_name, mut points) in metrics {
100 if points.is_empty() {
101 continue;
102 }
103
104 points.sort_by_key(|(ts, _)| *ts);
106
107 if points.len() <= threshold {
109 for (timestamp, value) in points {
111 all_points.push(MetricPoint {
112 metric: metric_name.clone(),
113 timestamp,
114 value,
115 });
116 }
117 continue;
118 }
119
120 let lttb_input: Vec<Point> = points
122 .iter()
123 .map(|(ts, val)| Point::new(*ts as f64, *val))
124 .collect();
125
126 let lttb = LttbBuilder::new()
128 .threshold(threshold)
129 .method(LttbMethod::MinMax)
130 .build();
131
132 let downsampled = lttb.downsample(<tb_input)?;
134
135 for point in downsampled {
137 all_points.push(MetricPoint {
138 metric: metric_name.clone(),
139 timestamp: point.x() as i64,
140 value: point.y(),
141 });
142 }
143 }
144
145 Ok(all_points)
146}
147
148fn compute_sha256(data: &[u8]) -> String {
150 let mut hasher = Sha256::new();
151 hasher.update(data);
152 let result = hasher.finalize();
153 hex::encode(result)
154}
155
156#[derive(Debug, Clone, Serialize)]
165pub struct MetricsSnapshot {
166 pub agent_id: String,
167 pub start: i64, pub end: i64, pub data: SnapshotData,
170}
171
172#[derive(Debug, Clone, Serialize, Default)]
177pub struct SnapshotData {
178 pub cpu_percent: Vec<[f64; 2]>,
179 pub memory_mb: Vec<[f64; 2]>,
180 pub health_score: Vec<[f64; 2]>,
181 pub fd_count: Vec<[f64; 2]>,
182 pub thread_count: Vec<[f64; 2]>,
183 pub load_avg: Vec<[f64; 2]>,
184}
185
186pub fn encode_snapshot(
198 agent_id: String,
199 start: i64,
200 end: i64,
201 metrics: HashMap<String, Vec<(i64, f64)>>,
202 threshold: usize,
203) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
204 let mut data = SnapshotData::default();
205
206 for (metric_name, mut points) in metrics {
208 if points.is_empty() {
209 continue;
210 }
211
212 points.sort_by_key(|(ts, _)| *ts);
214
215 let downsampled_points = if points.len() > threshold {
217 let lttb_input: Vec<Point> = points
219 .iter()
220 .map(|(ts, val)| Point::new(*ts as f64, *val))
221 .collect();
222
223 let lttb = LttbBuilder::new()
225 .threshold(threshold)
226 .method(LttbMethod::MinMax)
227 .build();
228
229 let downsampled = lttb.downsample(<tb_input)?;
231
232 downsampled
234 .iter()
235 .map(|p| [p.x(), p.y()])
236 .collect::<Vec<[f64; 2]>>()
237 } else {
238 points
240 .iter()
241 .map(|(ts, val)| [*ts as f64, *val])
242 .collect::<Vec<[f64; 2]>>()
243 };
244
245 match metric_name.as_str() {
247 "cpu_percent" => data.cpu_percent = downsampled_points,
248 "memory_mb" => data.memory_mb = downsampled_points,
249 "health_score" => data.health_score = downsampled_points,
250 "fd_count" => data.fd_count = downsampled_points,
251 "thread_count" => data.thread_count = downsampled_points,
252 "load_avg" => data.load_avg = downsampled_points,
253 _ => {
254 tracing::warn!("Unknown metric name: {}", metric_name);
255 }
256 }
257 }
258
259 let snapshot = MetricsSnapshot {
261 agent_id,
262 start,
263 end,
264 data,
265 };
266
267 let msgpack_bytes = rmp_serde::to_vec_named(&snapshot)?;
269
270 Ok(msgpack_bytes)
271}
272
#[cfg(test)]
mod tests {
    use super::*;

    /// Generates `count` samples at timestamps 1000, 2000, ... whose values
    /// start at `base` and grow by `step` per sample.
    fn ramp(count: i64, base: f64, step: f64) -> Vec<(i64, f64)> {
        (0..count)
            .map(|i| (1000 + (i * 1000), base + (i as f64 * step)))
            .collect()
    }

    /// A series longer than the threshold is reduced to exactly `threshold`
    /// points, and the retained values are original (non-integer) samples.
    #[test]
    fn test_lttb_downsampling() {
        let mut metrics = HashMap::new();
        metrics.insert("cpu_percent".to_string(), ramp(100, 45.0, 0.1));

        let downsampled = downsample_metrics(metrics, 25).unwrap();

        assert_eq!(downsampled.len(), 25);
        assert!(downsampled.iter().any(|p| p.value != p.value.round()));
    }

    /// A series at or below the threshold passes through unchanged.
    #[test]
    fn test_no_downsampling_when_below_threshold() {
        let series = vec![(1000, 45.2), (2000, 45.8), (3000, 46.1)];

        let mut metrics = HashMap::new();
        metrics.insert("cpu_percent".to_string(), series);

        let downsampled = downsample_metrics(metrics, 25).unwrap();

        let values: Vec<f64> = downsampled.iter().map(|p| p.value).collect();
        assert_eq!(values, vec![45.2, 45.8, 46.1]);
    }

    /// Each metric is downsampled independently to the threshold.
    #[test]
    fn test_multiple_metrics() {
        let mut metrics = HashMap::new();
        metrics.insert("cpu_percent".to_string(), ramp(50, 45.0, 0.1));
        metrics.insert("memory_mb".to_string(), ramp(50, 1024.0, 2.5));

        let downsampled = downsample_metrics(metrics, 25).unwrap();

        assert_eq!(downsampled.len(), 50);

        let count_for = |name: &str| downsampled.iter().filter(|p| p.metric == name).count();
        assert_eq!(count_for("cpu_percent"), 25);
        assert_eq!(count_for("memory_mb"), 25);
    }
}