1pub mod error;
2use crate::error::ObservabilityError;
3use ndarray::Array1;
4use ndarray_stats::interpolate::Nearest;
5use ndarray_stats::Quantile1dExt;
6use noisy_float::types::n64;
7use pyo3::prelude::*;
8use rayon::iter::IntoParallelIterator;
9use rayon::iter::ParallelIterator;
10use scouter_types::{
11 LatencyMetrics, ObservabilityMetrics, RouteMetrics, ServerRecord, ServerRecords,
12};
13use std::collections::HashMap;
14use tracing::{debug, error};
15
/// Statistics accumulated for a single route until `Observer::reset_metrics` clears them.
#[derive(Debug, Clone)]
struct RouteLatency {
    /// Latency of each successful (`"OK"`) request, appended in the order recorded.
    request_latency: Vec<f64>,
    /// Number of successful requests recorded for this route.
    request_count: i64,
    /// Number of failed (non-`"OK"`) requests recorded for this route.
    error_count: i64,
    /// Sum of the latencies of all failed requests (not an average).
    error_latency: f64,
    /// How many requests were seen for each response status code.
    status_codes: HashMap<usize, i64>,
}
24
25#[pyclass]
26#[derive(Clone, Debug)]
27pub struct Observer {
28 uid: String,
29 request_count: i64,
30 error_count: i64,
31 request_latency: HashMap<String, RouteLatency>,
32}
33
34#[pymethods]
35impl Observer {
36 #[new]
37 pub fn new(uid: String) -> Self {
38 Observer {
39 uid,
40 request_count: 0,
41 error_count: 0,
42 request_latency: HashMap::new(),
43 }
44 }
45
46 fn increment_request_count(&mut self) {
47 self.request_count += 1;
48 }
49
50 fn increment_error_count(&mut self, status: &str) {
51 if status != "OK" {
52 self.error_count += 1;
53 }
54 }
55
56 fn update_route_latency(
57 &mut self,
58 route: &str,
59 latency: f64,
60 status: &str,
61 status_code: usize,
62 ) -> Result<(), ObservabilityError> {
63 if status == "OK" {
65 if let Some(route_latency) = self.request_latency.get_mut(route) {
67 route_latency.request_latency.push(latency);
68 route_latency.request_count += 1;
69 } else {
70 self.request_latency.insert(
71 route.to_string(),
72 RouteLatency {
73 request_latency: vec![latency],
74 request_count: 1,
75 error_count: 0,
76 error_latency: 0.0,
77 status_codes: HashMap::new(),
78 },
79 );
80 }
81
82 } else {
84 if let Some(route_latency) = self.request_latency.get_mut(route) {
86 route_latency.error_latency += latency;
87 route_latency.error_count += 1;
88 } else {
89 self.request_latency.insert(
90 route.to_string(),
91 RouteLatency {
92 request_latency: vec![],
93 request_count: 0,
94 error_count: 1,
95 error_latency: latency,
96 status_codes: HashMap::new(),
97 },
98 );
99 }
100 }
101
102 let route_latency = self
105 .request_latency
106 .get_mut(route)
107 .ok_or(ObservabilityError::RouteNotFound(route.to_string()))?;
108 if let Some(status_code_count) = route_latency.status_codes.get_mut(&status_code) {
109 *status_code_count += 1;
110 } else {
111 route_latency.status_codes.insert(status_code, 1);
112 }
113
114 Ok(())
115 }
116
117 pub fn increment(
118 &mut self,
119 route: &str,
120 latency: f64,
121 status_code: usize,
122 ) -> Result<(), ObservabilityError> {
123 let status = if (200..400).contains(&status_code) {
124 "OK"
125 } else {
126 "ERROR"
127 };
128
129 self.increment_request_count();
130 self.update_route_latency(route, latency, status, status_code)
131 .map_err(|e| {
132 error!("Failed to update route latency: {:?}", e);
133 })
134 .ok();
135 self.increment_error_count(status);
136
137 Ok(())
138 }
139
140 pub fn collect_metrics(&self) -> Result<Option<ServerRecords>, ObservabilityError> {
141 if self.request_count == 0 {
142 return Ok(None);
143 }
144
145 debug!("Collecting metrics: {:?}", self.request_latency);
146
147 let route_metrics = self
148 .request_latency
149 .clone()
150 .into_par_iter()
151 .map(|(route, route_latency)| {
152 let mut latency_array = Array1::from_vec(
153 route_latency
154 .request_latency
155 .iter()
156 .map(|&x| n64(x))
157 .collect::<Vec<_>>(),
158 );
159 let qs = &[n64(0.05), n64(0.25), n64(0.5), n64(0.95), n64(0.99)];
160 let quantiles =
161 latency_array.quantiles_mut(&Array1::from_vec(qs.to_vec()), &Nearest);
162
163 match quantiles {
164 Ok(quantiles) => Ok(RouteMetrics {
165 metrics: LatencyMetrics {
166 p5: quantiles[0].into(),
167 p25: quantiles[1].into(),
168 p50: quantiles[2].into(),
169 p95: quantiles[3].into(),
170 p99: quantiles[4].into(),
171 },
172 request_count: route_latency.request_count,
173 error_count: route_latency.error_count,
174 error_latency: route_latency.error_latency,
175 status_codes: route_latency.status_codes.clone(),
176 route_name: route,
177 }),
178 Err(e) => Err(ObservabilityError::QuantileError(e.to_string())),
180 }
181 })
182 .collect::<Vec<Result<RouteMetrics, ObservabilityError>>>();
183
184 let route_metrics = route_metrics
186 .into_iter()
187 .filter_map(|x| match x {
188 Ok(route_metrics) => Some(route_metrics),
189 Err(e) => {
190 debug!("Failed to collect metrics for route: {:?}", e);
191 None
192 }
193 })
194 .collect::<Vec<RouteMetrics>>();
195
196 if route_metrics.is_empty() {
198 return Ok(None);
199 }
200
201 let record = ServerRecord::Observability(ObservabilityMetrics {
202 uid: self.uid.clone(),
203 request_count: self.request_count,
204 error_count: self.error_count,
205 route_metrics,
206 entity_id: 0,
207 });
208
209 Ok(Some(ServerRecords {
210 records: vec![record],
211 }))
212 }
213
214 pub fn reset_metrics(&mut self) {
215 self.request_count = 0;
216 self.error_count = 0;
217
218 for (_, route_latency) in self.request_latency.iter_mut() {
220 route_latency.request_latency = vec![];
221 route_latency.request_count = 0;
222 route_latency.error_count = 0;
223 route_latency.error_latency = 0.0;
224 route_latency.status_codes.clear();
225 }
226 }
227}
228
#[cfg(test)]
mod tests {
    use super::*;
    use rand::Rng;

    const UID: &str = "test-uid";

    /// Returns the statistics recorded for `route`, panicking if the route is unknown.
    fn route_stats<'a>(observer: &'a Observer, route: &str) -> &'a RouteLatency {
        observer.request_latency.get(route).unwrap()
    }

    /// Sums the successful-request latency samples recorded for `route`.
    fn ok_latency_total(observer: &Observer, route: &str) -> f64 {
        route_stats(observer, route).request_latency.iter().sum()
    }

    #[test]
    fn test_increment_request_count() {
        let mut observer = Observer::new(UID.to_string());
        observer.increment_request_count();
        assert_eq!(observer.request_count, 1);
    }

    #[test]
    fn test_increment_error_count() {
        let mut observer = Observer::new(UID.to_string());

        // Any status other than "OK" counts as an error...
        observer.increment_error_count("ERROR");
        assert_eq!(observer.error_count, 1);

        // ...while "OK" leaves the counter untouched.
        observer.increment_error_count("OK");
        assert_eq!(observer.error_count, 1);
    }

    #[test]
    fn test_update_route_latency() {
        let mut observer = Observer::new(UID.to_string());

        observer.update_route_latency("/home", 100.0, "OK", 200).unwrap();
        assert_eq!(ok_latency_total(&observer, "/home"), 100.0);
        assert_eq!(route_stats(&observer, "/home").request_count, 1);

        observer.update_route_latency("/home", 50.0, "OK", 200).unwrap();
        assert_eq!(ok_latency_total(&observer, "/home"), 150.0);
        assert_eq!(route_stats(&observer, "/home").request_count, 2);

        // An error feeds the error totals but not the successful samples.
        observer.update_route_latency("/home", 50.0, "ERROR", 500).unwrap();
        assert_eq!(ok_latency_total(&observer, "/home"), 150.0);

        let home = route_stats(&observer, "/home");
        assert_eq!(home.error_latency, 50.0);
        assert_eq!(home.request_count, 2);
        assert_eq!(home.error_count, 1);
        assert_eq!(home.status_codes.get(&200), Some(&2));
        assert_eq!(home.status_codes.get(&500), Some(&1));
    }

    #[test]
    fn test_collect_metrics() {
        let mut observer = Observer::new(UID.to_string());
        let mut rng = rand::thread_rng();

        for i in 0..100 {
            let home_latency = rng.gen_range(0..100);
            let about_latency = rng.gen_range(0..100);
            let contact_latency = rng.gen_range(0..100);
            observer.increment("/home", home_latency as f64, 200).unwrap();
            observer.increment("/home", 50.0 + i as f64, 404).unwrap();
            observer.increment("/about", about_latency as f64, 200).unwrap();
            observer.increment("/contact", contact_latency as f64, 200).unwrap();
        }

        let server_records = observer.collect_metrics().unwrap().unwrap();
        // Exercise the serialisation helpers on a populated record.
        server_records.model_dump_json();
        server_records.__str__();

        let record = match server_records.records[0].clone() {
            ServerRecord::Observability(record) => record,
            _ => panic!("Expected observability record"),
        };

        assert_eq!(record.request_count, 400);
        assert_eq!(record.error_count, 100);
        assert_eq!(record.uid, UID);

        let home_metrics = record
            .route_metrics
            .iter()
            .find(|metrics| metrics.route_name == "/home")
            .unwrap();
        assert_eq!(home_metrics.request_count, 100);
        assert_eq!(home_metrics.error_count, 100);
    }

    #[test]
    fn test_increment() {
        let mut observer = Observer::new(UID.to_string());

        observer.increment("/home", 100.0, 200).unwrap();
        assert_eq!(observer.request_count, 1);
        assert_eq!(observer.error_count, 0);
        assert_eq!(ok_latency_total(&observer, "/home"), 100.0);
        assert_eq!(route_stats(&observer, "/home").request_count, 1);

        // A 500 is classified as an error.
        observer.increment("/home", 50.0, 500).unwrap();
        assert_eq!(observer.request_count, 2);
        assert_eq!(observer.error_count, 1);
        assert_eq!(ok_latency_total(&observer, "/home"), 100.0);

        let home = route_stats(&observer, "/home");
        assert_eq!(home.error_latency, 50.0);
        assert_eq!(home.request_count, 1);
        assert_eq!(home.error_count, 1);
    }

    #[test]
    fn test_reset_metrics() {
        let mut observer = Observer::new(UID.to_string());
        observer.increment("/home", 100.0, 200).unwrap();
        observer.increment("/home", 50.0, 500).unwrap();

        observer.reset_metrics();

        assert_eq!(observer.request_count, 0);
        assert_eq!(observer.error_count, 0);

        // The route entry survives the reset, but all of its statistics are cleared.
        let home = route_stats(&observer, "/home");
        assert!(home.request_latency.is_empty());
        assert_eq!(home.error_latency, 0.0);
        assert_eq!(home.request_count, 0);
        assert_eq!(home.error_count, 0);
    }
}