// scouter_observability/lib.rs

1pub mod error;
2use crate::error::ObservabilityError;
3use ndarray::Array1;
4use ndarray_stats::interpolate::Nearest;
5use ndarray_stats::Quantile1dExt;
6use noisy_float::types::n64;
7use pyo3::prelude::*;
8use rayon::iter::IntoParallelIterator;
9use rayon::iter::ParallelIterator;
10use scouter_types::{
11    LatencyMetrics, ObservabilityMetrics, RouteMetrics, ServerRecord, ServerRecords,
12};
13use std::collections::HashMap;
14use tracing::{debug, error};
15
/// Accumulated request statistics for one route since the last reset.
#[derive(Clone, Debug)]
struct RouteLatency {
    /// One latency sample per successful (`"OK"`) request.
    request_latency: Vec<f64>,
    /// Number of successful requests recorded.
    request_count: i64,
    /// Number of failed (non-`"OK"`) requests recorded.
    error_count: i64,
    /// Running sum (not an average) of the latencies of failed requests.
    error_latency: f64,
    /// Occurrences of each status code, successful and failed alike.
    status_codes: HashMap<usize, i64>,
}
24
25#[pyclass]
26#[derive(Clone, Debug)]
27pub struct Observer {
28    space: String,
29    name: String,
30    version: String,
31    request_count: i64,
32    error_count: i64,
33    request_latency: HashMap<String, RouteLatency>,
34}
35
36#[pymethods]
37impl Observer {
38    #[new]
39    pub fn new(space: String, name: String, version: String) -> Self {
40        Observer {
41            space,
42            name,
43            version,
44            request_count: 0,
45            error_count: 0,
46            request_latency: HashMap::new(),
47        }
48    }
49
50    fn increment_request_count(&mut self) {
51        self.request_count += 1;
52    }
53
54    fn increment_error_count(&mut self, status: &str) {
55        if status != "OK" {
56            self.error_count += 1;
57        }
58    }
59
60    fn update_route_latency(
61        &mut self,
62        route: &str,
63        latency: f64,
64        status: &str,
65        status_code: usize,
66    ) -> Result<(), ObservabilityError> {
67        // handling OK status
68        if status == "OK" {
69            // insert latency for route if it doesn't exist, otherwise increment
70            if let Some(route_latency) = self.request_latency.get_mut(route) {
71                route_latency.request_latency.push(latency);
72                route_latency.request_count += 1;
73            } else {
74                self.request_latency.insert(
75                    route.to_string(),
76                    RouteLatency {
77                        request_latency: vec![latency],
78                        request_count: 1,
79                        error_count: 0,
80                        error_latency: 0.0,
81                        status_codes: HashMap::new(),
82                    },
83                );
84            }
85
86        // handling errors
87        } else {
88            // insert latency for route if it doesn't exist, otherwise increment
89            if let Some(route_latency) = self.request_latency.get_mut(route) {
90                route_latency.error_latency += latency;
91                route_latency.error_count += 1;
92            } else {
93                self.request_latency.insert(
94                    route.to_string(),
95                    RouteLatency {
96                        request_latency: vec![],
97                        request_count: 0,
98                        error_count: 1,
99                        error_latency: latency,
100                        status_codes: HashMap::new(),
101                    },
102                );
103            }
104        }
105
106        // insert status code if it doesn't exist, otherwise increment
107        // route should exist at this point
108        let route_latency = self
109            .request_latency
110            .get_mut(route)
111            .ok_or(ObservabilityError::RouteNotFound(route.to_string()))?;
112        if let Some(status_code_count) = route_latency.status_codes.get_mut(&status_code) {
113            *status_code_count += 1;
114        } else {
115            route_latency.status_codes.insert(status_code, 1);
116        }
117
118        Ok(())
119    }
120
121    pub fn increment(
122        &mut self,
123        route: &str,
124        latency: f64,
125        status_code: usize,
126    ) -> Result<(), ObservabilityError> {
127        let status = if (200..400).contains(&status_code) {
128            "OK"
129        } else {
130            "ERROR"
131        };
132
133        self.increment_request_count();
134        self.update_route_latency(route, latency, status, status_code)
135            .map_err(|e| {
136                error!("Failed to update route latency: {:?}", e);
137            })
138            .ok();
139        self.increment_error_count(status);
140
141        Ok(())
142    }
143
144    pub fn collect_metrics(&self) -> Result<Option<ServerRecords>, ObservabilityError> {
145        if self.request_count == 0 {
146            return Ok(None);
147        }
148
149        debug!("Collecting metrics: {:?}", self.request_latency);
150
151        let route_metrics = self
152            .request_latency
153            .clone()
154            .into_par_iter()
155            .map(|(route, route_latency)| {
156                let mut latency_array = Array1::from_vec(
157                    route_latency
158                        .request_latency
159                        .iter()
160                        .map(|&x| n64(x))
161                        .collect::<Vec<_>>(),
162                );
163                let qs = &[n64(0.05), n64(0.25), n64(0.5), n64(0.95), n64(0.99)];
164                let quantiles =
165                    latency_array.quantiles_mut(&Array1::from_vec(qs.to_vec()), &Nearest);
166
167                match quantiles {
168                    Ok(quantiles) => Ok(RouteMetrics {
169                        metrics: LatencyMetrics {
170                            p5: quantiles[0].into(),
171                            p25: quantiles[1].into(),
172                            p50: quantiles[2].into(),
173                            p95: quantiles[3].into(),
174                            p99: quantiles[4].into(),
175                        },
176                        request_count: route_latency.request_count,
177                        error_count: route_latency.error_count,
178                        error_latency: route_latency.error_latency,
179                        status_codes: route_latency.status_codes.clone(),
180                        route_name: route,
181                    }),
182                    // its ok if route fails, but we want to know why
183                    Err(e) => Err(ObservabilityError::QuantileError(e.to_string())),
184                }
185            })
186            .collect::<Vec<Result<RouteMetrics, ObservabilityError>>>();
187
188        // check if any route failed (log it and filter it out)
189        let route_metrics = route_metrics
190            .into_iter()
191            .filter_map(|x| match x {
192                Ok(route_metrics) => Some(route_metrics),
193                Err(e) => {
194                    debug!("Failed to collect metrics for route: {:?}", e);
195                    None
196                }
197            })
198            .collect::<Vec<RouteMetrics>>();
199
200        // check if there are no metrics and exit early
201        if route_metrics.is_empty() {
202            return Ok(None);
203        }
204
205        let record = ServerRecord::Observability(ObservabilityMetrics {
206            space: self.space.clone(),
207            name: self.name.clone(),
208            version: self.version.clone(),
209            request_count: self.request_count,
210            error_count: self.error_count,
211            route_metrics,
212        });
213
214        Ok(Some(ServerRecords {
215            records: vec![record],
216        }))
217    }
218
219    pub fn reset_metrics(&mut self) {
220        self.request_count = 0;
221        self.error_count = 0;
222
223        // Clear request latency for each route
224        for (_, route_latency) in self.request_latency.iter_mut() {
225            route_latency.request_latency = vec![];
226            route_latency.request_count = 0;
227            route_latency.error_count = 0;
228            route_latency.error_latency = 0.0;
229            route_latency.status_codes.clear();
230        }
231    }
232}
233
#[cfg(test)]
mod tests {
    use super::*;

    const SPACE: &str = "test";
    const NAME: &str = "test";
    const VERSION: &str = "test";

    /// Fresh observer using the shared test identifiers.
    fn new_observer() -> Observer {
        Observer::new(SPACE.to_string(), NAME.to_string(), VERSION.to_string())
    }

    /// Accumulated state for `route`; panics if the route was never recorded.
    fn route_state<'a>(observer: &'a Observer, route: &str) -> &'a RouteLatency {
        observer
            .request_latency
            .get(route)
            .expect("route should have been recorded")
    }

    /// Sum of the successful-request latency samples stored for `route`.
    fn latency_sum(observer: &Observer, route: &str) -> f64 {
        route_state(observer, route).request_latency.iter().sum()
    }

    #[test]
    fn test_increment_request_count() {
        let mut observer = new_observer();
        observer.increment_request_count();
        assert_eq!(observer.request_count, 1);
    }

    #[test]
    fn test_increment_error_count() {
        let mut observer = new_observer();
        observer.increment_error_count("ERROR");
        assert_eq!(observer.error_count, 1);
        observer.increment_error_count("OK");
        assert_eq!(observer.error_count, 1);
    }

    #[test]
    fn test_update_route_latency() {
        let mut observer = new_observer();

        observer
            .update_route_latency("/home", 100.0, "OK", 200)
            .unwrap();
        assert_eq!(latency_sum(&observer, "/home"), 100.0);
        assert_eq!(route_state(&observer, "/home").request_count, 1);

        observer
            .update_route_latency("/home", 50.0, "OK", 200)
            .unwrap();
        assert_eq!(latency_sum(&observer, "/home"), 150.0);
        assert_eq!(route_state(&observer, "/home").request_count, 2);

        observer
            .update_route_latency("/home", 50.0, "ERROR", 500)
            .unwrap();
        // errors must not touch the successful-latency samples
        assert_eq!(latency_sum(&observer, "/home"), 150.0);

        let home = route_state(&observer, "/home");
        assert_eq!(home.error_latency, 50.0);
        assert_eq!(home.request_count, 2);
        assert_eq!(home.error_count, 1);
        assert_eq!(home.status_codes.get(&200), Some(&2));
        assert_eq!(home.status_codes.get(&500), Some(&1));
    }

    #[test]
    fn test_update_route_latency_error_first() {
        let mut observer = new_observer();
        observer
            .update_route_latency("/fail", 25.0, "ERROR", 503)
            .unwrap();

        let fail = route_state(&observer, "/fail");
        assert!(fail.request_latency.is_empty());
        assert_eq!(fail.request_count, 0);
        assert_eq!(fail.error_count, 1);
        assert_eq!(fail.error_latency, 25.0);
        assert_eq!(fail.status_codes.get(&503), Some(&1));
    }

    #[test]
    fn test_collect_metrics() {
        // populate 3 routes with 100 deterministic, well-spread latencies each
        let mut observer = new_observer();
        for i in 0..100_i32 {
            observer
                .increment("/home", f64::from((i * 37) % 100), 200)
                .unwrap();
            observer.increment("/home", 50.0 + f64::from(i), 404).unwrap();
            observer
                .increment("/about", f64::from((i * 53) % 100), 200)
                .unwrap();
            observer
                .increment("/contact", f64::from((i * 71) % 100), 200)
                .unwrap();
        }

        let records = observer.collect_metrics().unwrap().unwrap();
        records.model_dump_json();
        records.__str__();

        let record = match records.records[0].clone() {
            ServerRecord::Observability(record) => record,
            _ => panic!("Expected observability record"),
        };

        assert_eq!(record.request_count, 400);
        assert_eq!(record.error_count, 100);
        assert_eq!(record.space, SPACE);
        assert_eq!(record.name, NAME);
        assert_eq!(record.version, VERSION);
        assert_eq!(record.route_metrics.len(), 3);

        let home = record
            .route_metrics
            .iter()
            .find(|metrics| metrics.route_name == "/home")
            .expect("home route should be reported");

        assert_eq!(home.request_count, 100);
        assert_eq!(home.error_count, 100);
        // 50 + 51 + ... + 149
        assert_eq!(home.error_latency, 9950.0);
        assert_eq!(home.status_codes.get(&200), Some(&100));
        assert_eq!(home.status_codes.get(&404), Some(&100));

        let p = &home.metrics;
        assert!(p.p5 <= p.p25 && p.p25 <= p.p50 && p.p50 <= p.p95 && p.p95 <= p.p99);
    }

    #[test]
    fn test_collect_metrics_without_requests() {
        let observer = new_observer();
        assert!(observer.collect_metrics().unwrap().is_none());
    }

    #[test]
    fn test_increment() {
        let mut observer = new_observer();

        observer.increment("/home", 100.0, 200).unwrap();
        assert_eq!(observer.request_count, 1);
        assert_eq!(observer.error_count, 0);
        assert_eq!(latency_sum(&observer, "/home"), 100.0);
        assert_eq!(route_state(&observer, "/home").request_count, 1);

        observer.increment("/home", 50.0, 500).unwrap();
        assert_eq!(observer.request_count, 2);
        assert_eq!(observer.error_count, 1);
        assert_eq!(latency_sum(&observer, "/home"), 100.0);

        let home = route_state(&observer, "/home");
        assert_eq!(home.error_latency, 50.0);
        assert_eq!(home.request_count, 1);
        assert_eq!(home.error_count, 1);
        assert_eq!(home.status_codes.get(&200), Some(&1));
        assert_eq!(home.status_codes.get(&500), Some(&1));
    }

    #[test]
    fn test_reset_metrics() {
        let mut observer = new_observer();
        observer.increment("/home", 100.0, 200).unwrap();
        observer.increment("/home", 50.0, 500).unwrap();

        observer.reset_metrics();
        assert_eq!(observer.request_count, 0);
        assert_eq!(observer.error_count, 0);

        let home = route_state(&observer, "/home");
        assert!(home.request_latency.is_empty());
        assert_eq!(home.error_latency, 0.0);
        assert_eq!(home.request_count, 0);
        assert_eq!(home.error_count, 0);
        assert!(home.status_codes.is_empty());

        // nothing recorded since the reset, so nothing to report
        assert!(observer.collect_metrics().unwrap().is_none());
    }
}