Skip to main content

scouter_observability/
lib.rs

1pub mod error;
2use crate::error::ObservabilityError;
3use ndarray::Array1;
4use ndarray_stats::interpolate::Nearest;
5use ndarray_stats::Quantile1dExt;
6use noisy_float::types::n64;
7use pyo3::prelude::*;
8use rayon::iter::IntoParallelIterator;
9use rayon::iter::ParallelIterator;
10use scouter_types::{
11    LatencyMetrics, ObservabilityMetrics, RouteMetrics, ServerRecord, ServerRecords,
12};
13use std::collections::HashMap;
14use tracing::{debug, error};
15
/// Per-route accumulator for the current collection window.
#[derive(Debug, Clone)]
struct RouteLatency {
    /// Latency samples of requests recorded with an "OK" status.
    request_latency: Vec<f64>,
    /// Number of "OK" requests recorded for the route.
    request_count: i64,
    /// Number of non-"OK" requests recorded for the route.
    error_count: i64,
    /// Sum of the latencies of the non-"OK" requests.
    error_latency: f64,
    /// How many times each status code was recorded.
    status_codes: HashMap<usize, i64>,
}
24
25#[pyclass]
26#[derive(Clone, Debug)]
27pub struct Observer {
28    uid: String,
29    request_count: i64,
30    error_count: i64,
31    request_latency: HashMap<String, RouteLatency>,
32}
33
34#[pymethods]
35impl Observer {
36    #[new]
37    pub fn new(uid: String) -> Self {
38        Observer {
39            uid,
40            request_count: 0,
41            error_count: 0,
42            request_latency: HashMap::new(),
43        }
44    }
45
46    fn increment_request_count(&mut self) {
47        self.request_count += 1;
48    }
49
50    fn increment_error_count(&mut self, status: &str) {
51        if status != "OK" {
52            self.error_count += 1;
53        }
54    }
55
56    fn update_route_latency(
57        &mut self,
58        route: &str,
59        latency: f64,
60        status: &str,
61        status_code: usize,
62    ) -> Result<(), ObservabilityError> {
63        // handling OK status
64        if status == "OK" {
65            // insert latency for route if it doesn't exist, otherwise increment
66            if let Some(route_latency) = self.request_latency.get_mut(route) {
67                route_latency.request_latency.push(latency);
68                route_latency.request_count += 1;
69            } else {
70                self.request_latency.insert(
71                    route.to_string(),
72                    RouteLatency {
73                        request_latency: vec![latency],
74                        request_count: 1,
75                        error_count: 0,
76                        error_latency: 0.0,
77                        status_codes: HashMap::new(),
78                    },
79                );
80            }
81
82        // handling errors
83        } else {
84            // insert latency for route if it doesn't exist, otherwise increment
85            if let Some(route_latency) = self.request_latency.get_mut(route) {
86                route_latency.error_latency += latency;
87                route_latency.error_count += 1;
88            } else {
89                self.request_latency.insert(
90                    route.to_string(),
91                    RouteLatency {
92                        request_latency: vec![],
93                        request_count: 0,
94                        error_count: 1,
95                        error_latency: latency,
96                        status_codes: HashMap::new(),
97                    },
98                );
99            }
100        }
101
102        // insert status code if it doesn't exist, otherwise increment
103        // route should exist at this point
104        let route_latency = self
105            .request_latency
106            .get_mut(route)
107            .ok_or(ObservabilityError::RouteNotFound(route.to_string()))?;
108        if let Some(status_code_count) = route_latency.status_codes.get_mut(&status_code) {
109            *status_code_count += 1;
110        } else {
111            route_latency.status_codes.insert(status_code, 1);
112        }
113
114        Ok(())
115    }
116
117    pub fn increment(
118        &mut self,
119        route: &str,
120        latency: f64,
121        status_code: usize,
122    ) -> Result<(), ObservabilityError> {
123        let status = if (200..400).contains(&status_code) {
124            "OK"
125        } else {
126            "ERROR"
127        };
128
129        self.increment_request_count();
130        self.update_route_latency(route, latency, status, status_code)
131            .map_err(|e| {
132                error!("Failed to update route latency: {:?}", e);
133            })
134            .ok();
135        self.increment_error_count(status);
136
137        Ok(())
138    }
139
140    pub fn collect_metrics(&self) -> Result<Option<ServerRecords>, ObservabilityError> {
141        if self.request_count == 0 {
142            return Ok(None);
143        }
144
145        debug!("Collecting metrics: {:?}", self.request_latency);
146
147        let route_metrics = self
148            .request_latency
149            .clone()
150            .into_par_iter()
151            .map(|(route, route_latency)| {
152                let mut latency_array = Array1::from_vec(
153                    route_latency
154                        .request_latency
155                        .iter()
156                        .map(|&x| n64(x))
157                        .collect::<Vec<_>>(),
158                );
159                let qs = &[n64(0.05), n64(0.25), n64(0.5), n64(0.95), n64(0.99)];
160                let quantiles =
161                    latency_array.quantiles_mut(&Array1::from_vec(qs.to_vec()), &Nearest);
162
163                match quantiles {
164                    Ok(quantiles) => Ok(RouteMetrics {
165                        metrics: LatencyMetrics {
166                            p5: quantiles[0].into(),
167                            p25: quantiles[1].into(),
168                            p50: quantiles[2].into(),
169                            p95: quantiles[3].into(),
170                            p99: quantiles[4].into(),
171                        },
172                        request_count: route_latency.request_count,
173                        error_count: route_latency.error_count,
174                        error_latency: route_latency.error_latency,
175                        status_codes: route_latency.status_codes.clone(),
176                        route_name: route,
177                    }),
178                    // its ok if route fails, but we want to know why
179                    Err(e) => Err(ObservabilityError::QuantileError(e.to_string())),
180                }
181            })
182            .collect::<Vec<Result<RouteMetrics, ObservabilityError>>>();
183
184        // check if any route failed (log it and filter it out)
185        let route_metrics = route_metrics
186            .into_iter()
187            .filter_map(|x| match x {
188                Ok(route_metrics) => Some(route_metrics),
189                Err(e) => {
190                    debug!("Failed to collect metrics for route: {:?}", e);
191                    None
192                }
193            })
194            .collect::<Vec<RouteMetrics>>();
195
196        // check if there are no metrics and exit early
197        if route_metrics.is_empty() {
198            return Ok(None);
199        }
200
201        let record = ServerRecord::Observability(ObservabilityMetrics {
202            uid: self.uid.clone(),
203            request_count: self.request_count,
204            error_count: self.error_count,
205            route_metrics,
206            entity_id: 0,
207        });
208
209        Ok(Some(ServerRecords {
210            records: vec![record],
211        }))
212    }
213
214    pub fn reset_metrics(&mut self) {
215        self.request_count = 0;
216        self.error_count = 0;
217
218        // Clear request latency for each route
219        for (_, route_latency) in self.request_latency.iter_mut() {
220            route_latency.request_latency = vec![];
221            route_latency.request_count = 0;
222            route_latency.error_count = 0;
223            route_latency.error_latency = 0.0;
224            route_latency.status_codes.clear();
225        }
226    }
227}
228
#[cfg(test)]
mod tests {
    use super::*;
    use rand::Rng;

    const UID: &str = "test-uid";

    /// Borrows the `/home` accumulator, panicking if the route was never recorded.
    fn home(observer: &Observer) -> &RouteLatency {
        observer
            .request_latency
            .get("/home")
            .expect("/home should have been recorded")
    }

    /// Sum of the successful-request latency samples stored for `/home`.
    fn home_success_latency(observer: &Observer) -> f64 {
        home(observer).request_latency.iter().sum()
    }

    #[test]
    fn test_increment_request_count() {
        let mut observer = Observer::new(UID.to_string());
        observer.increment_request_count();
        assert_eq!(observer.request_count, 1);
    }

    #[test]
    fn test_increment_error_count() {
        let mut observer = Observer::new(UID.to_string());

        // a non-"OK" status counts as an error...
        observer.increment_error_count("ERROR");
        assert_eq!(observer.error_count, 1);

        // ...while "OK" leaves the counter untouched
        observer.increment_error_count("OK");
        assert_eq!(observer.error_count, 1);
    }

    #[test]
    fn test_update_route_latency() {
        let mut observer = Observer::new(UID.to_string());

        observer
            .update_route_latency("/home", 100.0, "OK", 200)
            .unwrap();
        assert_eq!(home_success_latency(&observer), 100.0);
        assert_eq!(home(&observer).request_count, 1);

        observer
            .update_route_latency("/home", 50.0, "OK", 200)
            .unwrap();
        assert_eq!(home_success_latency(&observer), 150.0);
        assert_eq!(home(&observer).request_count, 2);

        // errors are tracked separately and must not touch the success samples
        observer
            .update_route_latency("/home", 50.0, "ERROR", 500)
            .unwrap();
        let route = home(&observer);
        assert_eq!(home_success_latency(&observer), 150.0);
        assert_eq!(route.error_latency, 50.0);
        assert_eq!(route.request_count, 2);
        assert_eq!(route.error_count, 1);
        assert_eq!(route.status_codes.get(&200), Some(&2));
        assert_eq!(route.status_codes.get(&500), Some(&1));
    }

    #[test]
    fn test_collect_metrics() {
        // three routes, 100 iterations; "/home" also receives one 404 per iteration
        let mut observer = Observer::new(UID.to_string());
        let mut rng = rand::thread_rng();
        for i in 0..100 {
            let home_latency = rng.gen_range(0..100) as f64;
            let about_latency = rng.gen_range(0..100) as f64;
            let contact_latency = rng.gen_range(0..100) as f64;
            observer.increment("/home", home_latency, 200).unwrap();
            observer.increment("/home", 50.0 + i as f64, 404).unwrap();
            observer.increment("/about", about_latency, 200).unwrap();
            observer.increment("/contact", contact_latency, 200).unwrap();
        }

        let records = observer
            .collect_metrics()
            .unwrap()
            .expect("metrics should be produced after recording traffic");
        // the serialisation helpers must not panic
        records.model_dump_json();
        records.__str__();

        let ServerRecord::Observability(record) = records.records[0].clone() else {
            panic!("Expected observability record");
        };

        assert_eq!(record.request_count, 400);
        assert_eq!(record.error_count, 100);
        assert_eq!(record.uid, UID);

        let home_metrics = record
            .route_metrics
            .iter()
            .find(|metrics| metrics.route_name == "/home")
            .expect("/home route metrics should be present");
        assert_eq!(home_metrics.request_count, 100);
        assert_eq!(home_metrics.error_count, 100);
    }

    #[test]
    fn test_increment() {
        let mut observer = Observer::new(UID.to_string());

        observer.increment("/home", 100.0, 200).unwrap();
        assert_eq!(observer.request_count, 1);
        assert_eq!(observer.error_count, 0);
        assert_eq!(home_success_latency(&observer), 100.0);
        assert_eq!(home(&observer).request_count, 1);

        // a 500 is an error: it lands in the error accumulators only
        observer.increment("/home", 50.0, 500).unwrap();
        assert_eq!(observer.request_count, 2);
        assert_eq!(observer.error_count, 1);
        assert_eq!(home_success_latency(&observer), 100.0);

        let route = home(&observer);
        assert_eq!(route.error_latency, 50.0);
        assert_eq!(route.request_count, 1);
        assert_eq!(route.error_count, 1);
    }

    #[test]
    fn test_reset_metrics() {
        let mut observer = Observer::new(UID.to_string());
        observer.increment("/home", 100.0, 200).unwrap();
        observer.increment("/home", 50.0, 500).unwrap();

        observer.reset_metrics();

        assert_eq!(observer.request_count, 0);
        assert_eq!(observer.error_count, 0);

        // the route survives the reset, but its accumulators are emptied
        let route = home(&observer);
        assert!(route.request_latency.is_empty());
        assert_eq!(route.error_latency, 0.0);
        assert_eq!(route.request_count, 0);
        assert_eq!(route.error_count, 0);
    }
}