xapi_rs/lrs/resources/
stats.rs1use crate::MyError;
11use dashmap::DashMap;
12use rocket::{
13 fairing::{Fairing, Info, Kind},
14 get,
15 http::Method,
16 routes,
17 serde::json::Json,
18 Orbit, Rocket, Route,
19};
20use serde::Serialize;
21use std::sync::{
22 atomic::{AtomicU64, Ordering},
23 Arc, OnceLock,
24};
25use tracing::{error, info};
26
27#[derive(Debug, Eq, Hash, PartialEq)]
29struct RouteAttributes {
30 method: Method,
31 path: String,
32 mime: String,
33 rank: isize,
34}
35
36impl From<&Route> for RouteAttributes {
37 fn from(route: &Route) -> RouteAttributes {
38 let mime = if route.format.is_none() {
39 "N/A".to_owned()
40 } else {
41 route.format.as_ref().unwrap().to_string()
42 };
43 RouteAttributes {
44 method: route.method,
45 path: route.uri.origin.path().to_string(),
46 mime,
47 rank: route.rank,
48 }
49 }
50}
51
/// Call counter and duration figures gathered for a single route.
///
/// All fields are atomics, so they can be read and updated through a shared
/// reference. Durations are presumably expressed in nanoseconds, going by the
/// unit printed in the shutdown log message.
#[derive(Debug)]
struct RouteStats {
    /// Number of calls recorded so far.
    count: AtomicU64,
    /// Shortest duration recorded so far.
    min: AtomicU64,
    /// Running (integer) mean of the recorded durations.
    avg: AtomicU64,
    /// Longest duration recorded so far.
    max: AtomicU64,
}
62
63impl Default for RouteStats {
64 fn default() -> Self {
65 Self {
66 count: Default::default(),
67 min: AtomicU64::new(u64::MAX),
68 avg: Default::default(),
69 max: Default::default(),
70 }
71 }
72}
73
74static ENDPOINTS: OnceLock<Arc<DashMap<RouteAttributes, RouteStats>>> = OnceLock::new();
75fn endpoints() -> Arc<DashMap<RouteAttributes, RouteStats>> {
76 ENDPOINTS.get_or_init(|| Arc::new(DashMap::new())).clone()
77}
78
79pub(crate) struct StatsFairing;
81
82#[rocket::async_trait]
83impl Fairing for StatsFairing {
84 fn info(&self) -> Info {
85 Info {
86 name: "Routes Statistics",
87 kind: Kind::Liftoff | Kind::Shutdown,
88 }
89 }
90
91 async fn on_liftoff(&self, r: &Rocket<Orbit>) {
93 for route in r.routes() {
94 let key = RouteAttributes::from(route);
95 endpoints().insert(key, RouteStats::default());
96 }
97 }
98
99 async fn on_shutdown(&self, _: &Rocket<Orbit>) {
101 let stats = endpoints();
102 let (total_count, total_avg): (u64, u64) = stats
103 .iter()
104 .filter(|e| e.count.load(Ordering::Relaxed) > 0)
105 .fold((0, 0), |(sum_count, sum_avg), e| {
106 (
107 sum_count + e.count.load(Ordering::Relaxed),
108 sum_avg + e.avg.load(Ordering::Relaxed),
109 )
110 });
111 let average_duration = if total_count > 0 {
112 total_avg / total_count
113 } else {
114 0
115 };
116 info!("LaRS stats\n{:?}", stats);
117 info!(
118 "*** Total calls = {}; Average duration = {} ns",
119 total_count, average_duration
120 );
121 }
122}
123
124pub(crate) fn update_stats(route: &Route, duration: u64) {
126 let key = RouteAttributes::from(route);
127 let tmp = endpoints();
128 let tmp = tmp.get_mut(&key);
129 match tmp {
130 Some(endpoint) => {
131 endpoint.min.fetch_min(duration, Ordering::Relaxed);
132 endpoint.max.fetch_max(duration, Ordering::Relaxed);
133 let old_count = endpoint.count.fetch_add(1, Ordering::Relaxed);
134 let old_avg = endpoint.avg.fetch_add(0, Ordering::Relaxed);
135 let new_avg = (old_count * old_avg + duration) / (old_count + 1);
136 endpoint.avg.store(new_avg, Ordering::Relaxed);
137 }
138 _ => error!("Failed finding stats for {}", route),
139 }
140}
141
142#[doc(hidden)]
143pub fn routes() -> Vec<rocket::Route> {
144 routes![stats]
145}
146
147#[derive(Debug, Serialize)]
148struct StatsRecord {
149 method: String,
150 path: String,
151 mime: String,
152 rank: isize,
153 count: u64,
154 min: u64,
155 avg: u64,
156 max: u64,
157}
158
159#[get("/")]
160async fn stats() -> Result<Json<Vec<StatsRecord>>, MyError> {
161 let result = endpoints()
162 .iter()
163 .filter(|x| x.count.load(Ordering::Relaxed) > 0)
164 .map(|x| {
165 let (k, v) = x.pair();
166 StatsRecord {
167 method: k.method.to_string(),
168 path: k.path.clone(),
169 mime: k.mime.clone(),
170 rank: k.rank,
171 count: v.count.load(Ordering::Relaxed),
172 min: v.min.load(Ordering::Relaxed),
173 avg: v.avg.load(Ordering::Relaxed),
174 max: v.max.load(Ordering::Relaxed),
175 }
176 })
177 .collect();
178 Ok(Json(result))
179}