1pub mod error;
2use crate::error::ObservabilityError;
3use ndarray::Array1;
4use ndarray_stats::interpolate::Nearest;
5use ndarray_stats::Quantile1dExt;
6use noisy_float::types::n64;
7use pyo3::prelude::*;
8use rayon::iter::IntoParallelIterator;
9use rayon::iter::ParallelIterator;
10use scouter_types::{
11 LatencyMetrics, ObservabilityMetrics, RouteMetrics, ServerRecord, ServerRecords,
12};
13use std::collections::HashMap;
14use tracing::{debug, error};
15
/// Per-route accumulator for latency samples and request outcomes.
///
/// Successful requests contribute one sample each to `request_latency`;
/// failed requests only add to the running `error_latency` total.
#[derive(Debug, Clone)]
struct RouteLatency {
    /// Latency of every successful request, one entry per request.
    request_latency: Vec<f64>,
    /// Number of successful requests recorded for the route.
    request_count: i64,
    /// Number of failed requests recorded for the route.
    error_count: i64,
    /// Sum of the latencies of all failed requests.
    error_latency: f64,
    /// How many responses were seen per status code (successes and failures).
    status_codes: HashMap<usize, i64>,
}
24
25#[pyclass]
26#[derive(Clone, Debug)]
27pub struct Observer {
28 space: String,
29 name: String,
30 version: String,
31 request_count: i64,
32 error_count: i64,
33 request_latency: HashMap<String, RouteLatency>,
34}
35
36#[pymethods]
37impl Observer {
38 #[new]
39 pub fn new(space: String, name: String, version: String) -> Self {
40 Observer {
41 space,
42 name,
43 version,
44 request_count: 0,
45 error_count: 0,
46 request_latency: HashMap::new(),
47 }
48 }
49
50 fn increment_request_count(&mut self) {
51 self.request_count += 1;
52 }
53
54 fn increment_error_count(&mut self, status: &str) {
55 if status != "OK" {
56 self.error_count += 1;
57 }
58 }
59
60 fn update_route_latency(
61 &mut self,
62 route: &str,
63 latency: f64,
64 status: &str,
65 status_code: usize,
66 ) -> Result<(), ObservabilityError> {
67 if status == "OK" {
69 if let Some(route_latency) = self.request_latency.get_mut(route) {
71 route_latency.request_latency.push(latency);
72 route_latency.request_count += 1;
73 } else {
74 self.request_latency.insert(
75 route.to_string(),
76 RouteLatency {
77 request_latency: vec![latency],
78 request_count: 1,
79 error_count: 0,
80 error_latency: 0.0,
81 status_codes: HashMap::new(),
82 },
83 );
84 }
85
86 } else {
88 if let Some(route_latency) = self.request_latency.get_mut(route) {
90 route_latency.error_latency += latency;
91 route_latency.error_count += 1;
92 } else {
93 self.request_latency.insert(
94 route.to_string(),
95 RouteLatency {
96 request_latency: vec![],
97 request_count: 0,
98 error_count: 1,
99 error_latency: latency,
100 status_codes: HashMap::new(),
101 },
102 );
103 }
104 }
105
106 let route_latency = self
109 .request_latency
110 .get_mut(route)
111 .ok_or(ObservabilityError::RouteNotFound(route.to_string()))?;
112 if let Some(status_code_count) = route_latency.status_codes.get_mut(&status_code) {
113 *status_code_count += 1;
114 } else {
115 route_latency.status_codes.insert(status_code, 1);
116 }
117
118 Ok(())
119 }
120
121 pub fn increment(
122 &mut self,
123 route: &str,
124 latency: f64,
125 status_code: usize,
126 ) -> Result<(), ObservabilityError> {
127 let status = if (200..400).contains(&status_code) {
128 "OK"
129 } else {
130 "ERROR"
131 };
132
133 self.increment_request_count();
134 self.update_route_latency(route, latency, status, status_code)
135 .map_err(|e| {
136 error!("Failed to update route latency: {:?}", e);
137 })
138 .ok();
139 self.increment_error_count(status);
140
141 Ok(())
142 }
143
144 pub fn collect_metrics(&self) -> Result<Option<ServerRecords>, ObservabilityError> {
145 if self.request_count == 0 {
146 return Ok(None);
147 }
148
149 debug!("Collecting metrics: {:?}", self.request_latency);
150
151 let route_metrics = self
152 .request_latency
153 .clone()
154 .into_par_iter()
155 .map(|(route, route_latency)| {
156 let mut latency_array = Array1::from_vec(
157 route_latency
158 .request_latency
159 .iter()
160 .map(|&x| n64(x))
161 .collect::<Vec<_>>(),
162 );
163 let qs = &[n64(0.05), n64(0.25), n64(0.5), n64(0.95), n64(0.99)];
164 let quantiles =
165 latency_array.quantiles_mut(&Array1::from_vec(qs.to_vec()), &Nearest);
166
167 match quantiles {
168 Ok(quantiles) => Ok(RouteMetrics {
169 metrics: LatencyMetrics {
170 p5: quantiles[0].into(),
171 p25: quantiles[1].into(),
172 p50: quantiles[2].into(),
173 p95: quantiles[3].into(),
174 p99: quantiles[4].into(),
175 },
176 request_count: route_latency.request_count,
177 error_count: route_latency.error_count,
178 error_latency: route_latency.error_latency,
179 status_codes: route_latency.status_codes.clone(),
180 route_name: route,
181 }),
182 Err(e) => Err(ObservabilityError::QuantileError(e.to_string())),
184 }
185 })
186 .collect::<Vec<Result<RouteMetrics, ObservabilityError>>>();
187
188 let route_metrics = route_metrics
190 .into_iter()
191 .filter_map(|x| match x {
192 Ok(route_metrics) => Some(route_metrics),
193 Err(e) => {
194 debug!("Failed to collect metrics for route: {:?}", e);
195 None
196 }
197 })
198 .collect::<Vec<RouteMetrics>>();
199
200 if route_metrics.is_empty() {
202 return Ok(None);
203 }
204
205 let record = ServerRecord::Observability(ObservabilityMetrics {
206 space: self.space.clone(),
207 name: self.name.clone(),
208 version: self.version.clone(),
209 request_count: self.request_count,
210 error_count: self.error_count,
211 route_metrics,
212 });
213
214 Ok(Some(ServerRecords {
215 records: vec![record],
216 }))
217 }
218
219 pub fn reset_metrics(&mut self) {
220 self.request_count = 0;
221 self.error_count = 0;
222
223 for (_, route_latency) in self.request_latency.iter_mut() {
225 route_latency.request_latency = vec![];
226 route_latency.request_count = 0;
227 route_latency.error_count = 0;
228 route_latency.error_latency = 0.0;
229 route_latency.status_codes.clear();
230 }
231 }
232}
233
234#[cfg(test)]
235mod tests {
236 use super::*;
237 use rand::Rng;
238
239 const SPACE: &str = "test";
240 const NAME: &str = "test";
241 const VERSION: &str = "test";
242
243 #[test]
244 fn test_increment_request_count() {
245 let mut observer = Observer::new(SPACE.to_string(), NAME.to_string(), VERSION.to_string());
246 observer.increment_request_count();
247 assert_eq!(observer.request_count, 1);
248 }
249
250 #[test]
251 fn test_increment_error_count() {
252 let mut observer = Observer::new(SPACE.to_string(), NAME.to_string(), VERSION.to_string());
253 observer.increment_error_count("ERROR");
254 assert_eq!(observer.error_count, 1);
255 observer.increment_error_count("OK");
256 assert_eq!(observer.error_count, 1);
257 }
258
259 #[test]
260 fn test_update_route_latency() {
261 let mut observer = Observer::new(SPACE.to_string(), NAME.to_string(), VERSION.to_string());
262 observer
263 .update_route_latency("/home", 100.0, "OK", 200)
264 .unwrap();
265 let sum_latency = observer
266 .request_latency
267 .get("/home")
268 .unwrap()
269 .request_latency
270 .iter()
271 .sum::<f64>();
272 assert_eq!(sum_latency, 100.0);
273 assert_eq!(
274 observer.request_latency.get("/home").unwrap().request_count,
275 1
276 );
277
278 observer
279 .update_route_latency("/home", 50.0, "OK", 200)
280 .unwrap();
281 let sum_latency = observer
282 .request_latency
283 .get("/home")
284 .unwrap()
285 .request_latency
286 .iter()
287 .sum::<f64>();
288 assert_eq!(sum_latency, 150.0);
289 assert_eq!(
290 observer.request_latency.get("/home").unwrap().request_count,
291 2
292 );
293
294 observer
295 .update_route_latency("/home", 50.0, "ERROR", 500)
296 .unwrap();
297 let sum_latency = observer
298 .request_latency
299 .get("/home")
300 .unwrap()
301 .request_latency
302 .iter()
303 .sum::<f64>();
304 assert_eq!(sum_latency, 150.0);
305 assert_eq!(
306 observer.request_latency.get("/home").unwrap().error_latency,
307 50.0
308 );
309 assert_eq!(
310 observer.request_latency.get("/home").unwrap().request_count,
311 2
312 );
313 assert_eq!(
314 observer.request_latency.get("/home").unwrap().error_count,
315 1
316 );
317
318 let status_codes = &observer.request_latency.get("/home").unwrap().status_codes;
319 assert_eq!(status_codes.get(&200).unwrap(), &2);
320 assert_eq!(status_codes.get(&500).unwrap(), &1);
321 }
322
323 #[test]
324 fn test_collect_metrics() {
325 let mut observer = Observer::new(SPACE.to_string(), NAME.to_string(), VERSION.to_string());
327 for i in 0..100 {
328 let num1 = rand::thread_rng().gen_range(0..100);
330 let num2 = rand::thread_rng().gen_range(0..100);
331 let num3 = rand::thread_rng().gen_range(0..100);
332 observer.increment("/home", num1 as f64, 200).unwrap();
333 observer.increment("/home", 50.0 + i as f64, 404).unwrap();
334 observer.increment("/about", num2 as f64, 200).unwrap();
335 observer.increment("/contact", num3 as f64, 200).unwrap();
336 }
337
338 let metrics = observer.collect_metrics().unwrap().unwrap();
339 metrics.model_dump_json();
340 metrics.__str__();
341
342 let metrics = metrics.records[0].clone();
345
346 let record = match metrics {
348 ServerRecord::Observability(record) => record,
349 _ => panic!("Expected observability record"),
350 };
351
352 assert_eq!(record.request_count, 400);
353 assert_eq!(record.error_count, 100);
354 assert_eq!(record.space, SPACE);
355 assert_eq!(record.name, NAME);
356 assert_eq!(record.version, VERSION);
357
358 let route_metrics = record.route_metrics;
359
360 let home_metrics = route_metrics
362 .iter()
363 .find(|x| x.route_name == "/home")
364 .unwrap();
365
366 assert_eq!(home_metrics.request_count, 100);
367 assert_eq!(home_metrics.error_count, 100);
368 }
369
370 #[test]
371 fn test_increment() {
372 let mut observer = Observer::new(SPACE.to_string(), NAME.to_string(), VERSION.to_string());
373 observer.increment("/home", 100.0, 200).unwrap();
374 assert_eq!(observer.request_count, 1);
375 assert_eq!(observer.error_count, 0);
376 let sum_latency = observer
377 .request_latency
378 .get("/home")
379 .unwrap()
380 .request_latency
381 .iter()
382 .sum::<f64>();
383 assert_eq!(sum_latency, 100.0);
384
385 assert_eq!(
386 observer.request_latency.get("/home").unwrap().request_count,
387 1
388 );
389
390 observer.increment("/home", 50.0, 500).unwrap();
391 assert_eq!(observer.request_count, 2);
392 assert_eq!(observer.error_count, 1);
393 let sum_latency = observer
394 .request_latency
395 .get("/home")
396 .unwrap()
397 .request_latency
398 .iter()
399 .sum::<f64>();
400 assert_eq!(sum_latency, 100.0);
401
402 assert_eq!(
403 observer.request_latency.get("/home").unwrap().error_latency,
404 50.0
405 );
406 assert_eq!(
407 observer.request_latency.get("/home").unwrap().request_count,
408 1
409 );
410 assert_eq!(
411 observer.request_latency.get("/home").unwrap().error_count,
412 1
413 );
414 }
415
416 #[test]
417 fn test_reset_metrics() {
418 let mut observer = Observer::new(SPACE.to_string(), NAME.to_string(), VERSION.to_string());
419 observer.increment("/home", 100.0, 200).unwrap();
420 observer.increment("/home", 50.0, 500).unwrap();
421
422 observer.reset_metrics();
423 assert_eq!(observer.request_count, 0);
424 assert_eq!(observer.error_count, 0);
425 assert!(observer
426 .request_latency
427 .get("/home")
428 .unwrap()
429 .request_latency
430 .is_empty());
431 assert!(observer.request_latency.get("/home").unwrap().error_latency == 0.0);
432 assert!(observer.request_latency.get("/home").unwrap().request_count == 0);
433 assert!(observer.request_latency.get("/home").unwrap().error_count == 0);
434 }
435}