conjure_runtime/
host_metrics.rs1use hyper::StatusCode;
15use parking_lot::Mutex;
16use std::collections::hash_map;
17use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20use witchcraft_metrics::{Meter, Timer};
21
22#[derive(Default)]
24pub struct HostMetricsRegistry(Mutex<Arc<HashMap<HostId, Arc<HostMetrics>>>>);
25
26impl HostMetricsRegistry {
27 pub fn new() -> HostMetricsRegistry {
29 Default::default()
30 }
31
32 pub(crate) fn get(&self, service: &str, host: &str, port: u16) -> Arc<HostMetrics> {
33 let key = HostId {
34 service: service.to_string(),
35 host: host.to_string(),
36 port,
37 };
38
39 Arc::make_mut(&mut *self.0.lock())
40 .entry(key)
41 .or_insert_with(|| Arc::new(HostMetrics::new(service, host, port)))
42 .clone()
43 }
44
45 pub fn hosts(&self) -> Hosts {
49 let mut map = self.0.lock();
50
51 Arc::make_mut(&mut *map).retain(|_, v| Arc::strong_count(v) > 1);
53
54 Hosts(map.clone())
55 }
56}
57
58#[derive(Clone, PartialEq, Eq, Hash)]
59struct HostId {
60 service: String,
61 host: String,
62 port: u16,
63}
64
65pub struct HostMetrics {
67 service_name: String,
68 hostname: String,
69 port: u16,
70 last_update: Mutex<Instant>,
71 response_1xx: Timer,
72 response_2xx: Timer,
73 response_3xx: Timer,
74 response_4xx: Timer,
75 response_5xx: Timer,
76 response_qos: Timer,
77 response_other: Timer,
78 io_error: Meter,
79}
80
81impl HostMetrics {
82 pub(crate) fn new(service: &str, host: &str, port: u16) -> HostMetrics {
83 HostMetrics {
84 service_name: service.to_string(),
85 hostname: host.to_string(),
86 port,
87 last_update: Mutex::new(Instant::now()),
88 response_1xx: Timer::default(),
89 response_2xx: Timer::default(),
90 response_3xx: Timer::default(),
91 response_4xx: Timer::default(),
92 response_5xx: Timer::default(),
93 response_qos: Timer::default(),
94 response_other: Timer::default(),
95 io_error: Meter::default(),
96 }
97 }
98
99 pub(crate) fn update(&self, status: StatusCode, duration: Duration) {
100 *self.last_update.lock() = Instant::now();
101 #[allow(clippy::match_overlapping_arm)]
102 let timer = match status.as_u16() {
103 429 | 503 => &self.response_qos,
104 100..=199 => &self.response_1xx,
105 200..=299 => &self.response_2xx,
106 300..=399 => &self.response_3xx,
107 400..=499 => &self.response_4xx,
108 500..=599 => &self.response_5xx,
109 _ => &self.response_other,
110 };
111 timer.update(duration);
112 }
113
114 pub(crate) fn update_io_error(&self) {
115 *self.last_update.lock() = Instant::now();
116 self.io_error.mark(1);
117 }
118
119 pub fn service_name(&self) -> &str {
121 &self.service_name
122 }
123
124 pub fn hostname(&self) -> &str {
126 &self.hostname
127 }
128
129 pub fn port(&self) -> u16 {
131 self.port
132 }
133
134 pub fn last_update(&self) -> Instant {
136 *self.last_update.lock()
137 }
138
139 pub fn response_1xx(&self) -> &Timer {
141 &self.response_1xx
142 }
143
144 pub fn response_2xx(&self) -> &Timer {
146 &self.response_2xx
147 }
148
149 pub fn response_3xx(&self) -> &Timer {
151 &self.response_3xx
152 }
153
154 pub fn response_4xx(&self) -> &Timer {
156 &self.response_4xx
157 }
158
159 pub fn response_5xx(&self) -> &Timer {
161 &self.response_5xx
162 }
163
164 pub fn response_qos(&self) -> &Timer {
166 &self.response_qos
167 }
168
169 pub fn response_other(&self) -> &Timer {
171 &self.response_other
172 }
173
174 pub fn io_error(&self) -> &Meter {
176 &self.io_error
177 }
178}
179
180pub struct Hosts(Arc<HashMap<HostId, Arc<HostMetrics>>>);
182
183impl Hosts {
184 pub fn iter(&self) -> HostsIter<'_> {
186 HostsIter(self.0.values())
187 }
188}
189
190impl<'a> IntoIterator for &'a Hosts {
191 type Item = &'a HostMetrics;
192 type IntoIter = HostsIter<'a>;
193
194 fn into_iter(self) -> HostsIter<'a> {
195 self.iter()
196 }
197}
198
199pub struct HostsIter<'a>(hash_map::Values<'a, HostId, Arc<HostMetrics>>);
201
202impl<'a> Iterator for HostsIter<'a> {
203 type Item = &'a HostMetrics;
204
205 #[inline]
206 fn next(&mut self) -> Option<&'a HostMetrics> {
207 self.0.next().map(|h| &**h)
208 }
209
210 fn size_hint(&self) -> (usize, Option<usize>) {
211 self.0.size_hint()
212 }
213}
214
215#[cfg(test)]
216mod test {
217 use crate::HostMetricsRegistry;
218 use hyper::StatusCode;
219 use std::collections::HashMap;
220 use std::time::Duration;
221
222 #[test]
223 fn node_gc() {
224 let registry = HostMetricsRegistry::new();
225
226 let health1 = registry.get("service", "localhost", 1234);
227 let health2 = registry.get("service", "localhost", 5678);
228
229 health1.update(StatusCode::OK, Duration::from_secs(1));
230 health2.update(StatusCode::INTERNAL_SERVER_ERROR, Duration::from_secs(1));
231
232 let snapshot = registry.hosts();
233 let nodes = snapshot
234 .iter()
235 .map(|h| (h.port(), h))
236 .collect::<HashMap<_, _>>();
237 assert_eq!(nodes.len(), 2);
238 assert_eq!(nodes[&1234].response_2xx().count(), 1);
239 assert_eq!(nodes[&5678].response_5xx().count(), 1);
240 drop(nodes);
241 drop(snapshot);
242
243 drop(health2);
244
245 let snapshot = registry.hosts();
246 let nodes = snapshot
247 .iter()
248 .map(|h| (h.port(), h))
249 .collect::<HashMap<_, _>>();
250 assert_eq!(nodes.len(), 1);
251 assert_eq!(nodes[&1234].response_2xx().count(), 1);
252 }
253}