Skip to main content

loa_core/core/metrics/
downsample.rs

1//! LTTB Downsampling for Metrics - Efficient bandwidth reduction with visual fidelity
2//!
3//! This module implements MinMax LTTB (Largest Triangle Three Buckets) downsampling:
4//! 1. Collect metrics at full float precision (no rounding)
5//! 2. Apply MinMax LTTB to reduce point count while preserving visual shape
6//! 3. Encode as MessagePack with metadata and checksum
7//!
8//! MinMax LTTB preserves local minima/maxima and is computationally efficient,
9//! making it ideal for monitoring dashboards where peaks and valleys matter.
10
11use minmaxlttb::{LttbBuilder, LttbMethod, Point};
12use serde::Serialize;
13use sha2::{Digest, Sha256};
14use std::collections::HashMap;
15
16/// A downsampled metric point
17#[derive(Debug, Clone, Serialize)]
18pub struct MetricPoint {
19    metric: String,
20    timestamp: i64,
21    value: f64,
22}
23
24/// DownsampledMetrics: MessagePack container with LTTB-downsampled points
25#[derive(Serialize)]
26pub struct DownsampledMetrics {
27    pub agent_id: String,
28    pub points: Vec<MetricPoint>,
29    pub metadata: Metadata,
30}
31
32#[derive(Serialize)]
33pub struct Metadata {
34    pub point_count: usize,
35    pub time_range: TimeRange,
36    pub checksum: String,
37}
38
39#[derive(Serialize)]
40pub struct TimeRange {
41    pub start: i64,
42    pub end: i64,
43}
44
45/// Encode metrics with LTTB downsampling
46///
47/// # Arguments
48/// * `agent_id` - Agent identifier
49/// * `metrics` - Map of metric_name -> [(timestamp, value), ...]
50/// * `threshold` - LTTB downsampling threshold (points per metric)
51///
52/// # Returns
53/// MessagePack-encoded downsampled metrics ready for HTTP POST
54pub fn encode_downsampled(
55    agent_id: String,
56    metrics: HashMap<String, Vec<(i64, f64)>>,
57    threshold: usize,
58) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
59    // Step 1: Apply LTTB downsampling to each metric
60    let points = downsample_metrics(metrics, threshold)?;
61
62    if points.is_empty() {
63        return Err("No metrics to encode".into());
64    }
65
66    // Step 2: Calculate metadata
67    let point_count = points.len();
68    let start = points.iter().map(|p| p.timestamp).min().unwrap_or(0);
69    let end = points.iter().map(|p| p.timestamp).max().unwrap_or(0);
70
71    // Step 3: Serialize points to compute checksum
72    let points_bytes = rmp_serde::to_vec_named(&points)?;
73    let checksum = compute_sha256(&points_bytes);
74
75    // Step 4: Build downsampled metrics
76    let downsampled = DownsampledMetrics {
77        agent_id,
78        points,
79        metadata: Metadata {
80            point_count,
81            time_range: TimeRange { start, end },
82            checksum,
83        },
84    };
85
86    // Step 5: Encode as MessagePack
87    let msgpack_bytes = rmp_serde::to_vec_named(&downsampled)?;
88
89    Ok(msgpack_bytes)
90}
91
92/// Apply MinMax LTTB downsampling to each metric independently
93fn downsample_metrics(
94    metrics: HashMap<String, Vec<(i64, f64)>>,
95    threshold: usize,
96) -> Result<Vec<MetricPoint>, Box<dyn std::error::Error>> {
97    let mut all_points = Vec::new();
98
99    for (metric_name, mut points) in metrics {
100        if points.is_empty() {
101            continue;
102        }
103
104        // Sort by timestamp
105        points.sort_by_key(|(ts, _)| *ts);
106
107        // Skip downsampling if we have fewer points than threshold
108        if points.len() <= threshold {
109            // Just convert all points
110            for (timestamp, value) in points {
111                all_points.push(MetricPoint {
112                    metric: metric_name.clone(),
113                    timestamp,
114                    value,
115                });
116            }
117            continue;
118        }
119
120        // Convert to Point type expected by minmaxlttb
121        let lttb_input: Vec<Point> = points
122            .iter()
123            .map(|(ts, val)| Point::new(*ts as f64, *val))
124            .collect();
125
126        // Build MinMax LTTB downsampler
127        let lttb = LttbBuilder::new()
128            .threshold(threshold)
129            .method(LttbMethod::MinMax)
130            .build();
131
132        // Apply MinMax LTTB downsampling
133        let downsampled = lttb.downsample(&lttb_input)?;
134
135        // Convert back to MetricPoint
136        for point in downsampled {
137            all_points.push(MetricPoint {
138                metric: metric_name.clone(),
139                timestamp: point.x() as i64,
140                value: point.y(),
141            });
142        }
143    }
144
145    Ok(all_points)
146}
147
148/// Compute SHA-256 checksum of data
149fn compute_sha256(data: &[u8]) -> String {
150    let mut hasher = Sha256::new();
151    hasher.update(data);
152    let result = hasher.finalize();
153    hex::encode(result)
154}
155
156// ============================================================================
157// METRIC SNAPSHOTS (5-minute windows)
158// ============================================================================
159
160/// A 5-minute metrics snapshot containing all metrics for a window
161///
162/// This is the new format that groups all metrics into a single document
163/// with time-series arrays per metric, rather than individual points.
164#[derive(Debug, Clone, Serialize)]
165pub struct MetricsSnapshot {
166    pub agent_id: String,
167    pub start: i64,  // Window start timestamp (ms)
168    pub end: i64,    // Window end timestamp (ms)
169    pub data: SnapshotData,
170}
171
172/// Snapshot data containing all metric time series
173///
174/// Each metric is stored as an array of [timestamp, value] pairs.
175/// Using `[f64; 2]` for efficient msgpack encoding as fixed-size arrays.
176#[derive(Debug, Clone, Serialize, Default)]
177pub struct SnapshotData {
178    pub cpu_percent: Vec<[f64; 2]>,
179    pub memory_mb: Vec<[f64; 2]>,
180    pub health_score: Vec<[f64; 2]>,
181    pub fd_count: Vec<[f64; 2]>,
182    pub thread_count: Vec<[f64; 2]>,
183    pub load_avg: Vec<[f64; 2]>,
184}
185
186/// Encode metrics as a 5-minute snapshot with LTTB downsampling per metric
187///
188/// # Arguments
189/// * `agent_id` - Agent identifier
190/// * `start` - Window start timestamp (ms)
191/// * `end` - Window end timestamp (ms)
192/// * `metrics` - Map of metric_name -> [(timestamp, value), ...]
193/// * `threshold` - LTTB downsampling threshold (points per metric)
194///
195/// # Returns
196/// MessagePack-encoded snapshot ready for HTTP POST
197pub fn encode_snapshot(
198    agent_id: String,
199    start: i64,
200    end: i64,
201    metrics: HashMap<String, Vec<(i64, f64)>>,
202    threshold: usize,
203) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
204    let mut data = SnapshotData::default();
205
206    // Apply LTTB to each metric independently and map to struct fields
207    for (metric_name, mut points) in metrics {
208        if points.is_empty() {
209            continue;
210        }
211
212        // Sort by timestamp
213        points.sort_by_key(|(ts, _)| *ts);
214
215        // Apply LTTB if above threshold, otherwise keep all points
216        let downsampled_points = if points.len() > threshold {
217            // Convert to Point type expected by minmaxlttb
218            let lttb_input: Vec<Point> = points
219                .iter()
220                .map(|(ts, val)| Point::new(*ts as f64, *val))
221                .collect();
222
223            // Build MinMax LTTB downsampler
224            let lttb = LttbBuilder::new()
225                .threshold(threshold)
226                .method(LttbMethod::MinMax)
227                .build();
228
229            // Apply MinMax LTTB downsampling
230            let downsampled = lttb.downsample(&lttb_input)?;
231
232            // Convert to [timestamp, value] arrays
233            downsampled
234                .iter()
235                .map(|p| [p.x(), p.y()])
236                .collect::<Vec<[f64; 2]>>()
237        } else {
238            // Keep all points as [timestamp, value] arrays
239            points
240                .iter()
241                .map(|(ts, val)| [*ts as f64, *val])
242                .collect::<Vec<[f64; 2]>>()
243        };
244
245        // Map to the correct field based on metric name
246        match metric_name.as_str() {
247            "cpu_percent" => data.cpu_percent = downsampled_points,
248            "memory_mb" => data.memory_mb = downsampled_points,
249            "health_score" => data.health_score = downsampled_points,
250            "fd_count" => data.fd_count = downsampled_points,
251            "thread_count" => data.thread_count = downsampled_points,
252            "load_avg" => data.load_avg = downsampled_points,
253            _ => {
254                tracing::warn!("Unknown metric name: {}", metric_name);
255            }
256        }
257    }
258
259    // Build the snapshot
260    let snapshot = MetricsSnapshot {
261        agent_id,
262        start,
263        end,
264        data,
265    };
266
267    // Encode as MessagePack
268    let msgpack_bytes = rmp_serde::to_vec_named(&snapshot)?;
269
270    Ok(msgpack_bytes)
271}
272
273#[cfg(test)]
274mod tests {
275    use super::*;
276
277    #[test]
278    fn test_lttb_downsampling() {
279        let mut metrics = HashMap::new();
280
281        // Create 100 points with varying values (simulating CPU usage)
282        let mut points = Vec::new();
283        for i in 0..100 {
284            let timestamp = 1000 + (i * 1000); // 1 second apart
285            let value = 45.0 + (i as f64 * 0.1); // Gradually increasing
286            points.push((timestamp, value));
287        }
288
289        metrics.insert("cpu_percent".to_string(), points);
290
291        let downsampled = downsample_metrics(metrics, 25).unwrap();
292
293        // Should have exactly 25 points (our threshold)
294        assert_eq!(downsampled.len(), 25);
295
296        // Values should maintain float precision (not rounded to integers)
297        assert!(downsampled.iter().any(|p| p.value != p.value.round()));
298    }
299
300    #[test]
301    fn test_no_downsampling_when_below_threshold() {
302        let mut metrics = HashMap::new();
303        metrics.insert(
304            "cpu_percent".to_string(),
305            vec![
306                (1000, 45.2),
307                (2000, 45.8),
308                (3000, 46.1),
309            ],
310        );
311
312        let downsampled = downsample_metrics(metrics, 25).unwrap();
313
314        // Should keep all 3 points since 3 < 25
315        assert_eq!(downsampled.len(), 3);
316        assert_eq!(downsampled[0].value, 45.2);
317        assert_eq!(downsampled[1].value, 45.8);
318        assert_eq!(downsampled[2].value, 46.1);
319    }
320
321    #[test]
322    fn test_multiple_metrics() {
323        let mut metrics = HashMap::new();
324
325        // Create 50 points for each of 2 metrics
326        let mut cpu_points = Vec::new();
327        let mut mem_points = Vec::new();
328        for i in 0..50 {
329            let timestamp = 1000 + (i * 1000);
330            cpu_points.push((timestamp, 45.0 + (i as f64 * 0.1)));
331            mem_points.push((timestamp, 1024.0 + (i as f64 * 2.5)));
332        }
333
334        metrics.insert("cpu_percent".to_string(), cpu_points);
335        metrics.insert("memory_mb".to_string(), mem_points);
336
337        let downsampled = downsample_metrics(metrics, 25).unwrap();
338
339        // Should have 50 total points (25 per metric)
340        assert_eq!(downsampled.len(), 50);
341
342        // Check we have both metrics
343        let cpu_count = downsampled.iter().filter(|p| p.metric == "cpu_percent").count();
344        let mem_count = downsampled.iter().filter(|p| p.metric == "memory_mb").count();
345        assert_eq!(cpu_count, 25);
346        assert_eq!(mem_count, 25);
347    }
348}