conjure_runtime/
host_metrics.rs

// Copyright 2020 Palantir Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use hyper::StatusCode;
use parking_lot::Mutex;
use std::collections::hash_map;
use std::collections::HashMap;
use std::iter::FusedIterator;
use std::sync::Arc;
use std::time::{Duration, Instant};
use witchcraft_metrics::{Meter, Timer};
21
22/// A collection of metrics about requests to service hosts.
23#[derive(Default)]
24pub struct HostMetricsRegistry(Mutex<Arc<HashMap<HostId, Arc<HostMetrics>>>>);
25
26impl HostMetricsRegistry {
27    /// Returns a new, empty registry.
28    pub fn new() -> HostMetricsRegistry {
29        Default::default()
30    }
31
32    pub(crate) fn get(&self, service: &str, host: &str, port: u16) -> Arc<HostMetrics> {
33        let key = HostId {
34            service: service.to_string(),
35            host: host.to_string(),
36            port,
37        };
38
39        Arc::make_mut(&mut *self.0.lock())
40            .entry(key)
41            .or_insert_with(|| Arc::new(HostMetrics::new(service, host, port)))
42            .clone()
43    }
44
45    /// Returns a snapshot of the hosts in the registry.
46    ///
47    /// Modifications to the registry after this method is called will not affect the contents of the returned `Hosts`.
48    pub fn hosts(&self) -> Hosts {
49        let mut map = self.0.lock();
50
51        // use this as an opportunity to clear out dead nodes
52        Arc::make_mut(&mut *map).retain(|_, v| Arc::strong_count(v) > 1);
53
54        Hosts(map.clone())
55    }
56}
57
/// Key identifying a host entry in the registry: the service name together with the host's
/// name and port.
#[derive(Clone, PartialEq, Eq, Hash)]
struct HostId {
    service: String,
    host: String,
    port: u16,
}
64
/// Metrics about requests made to a specific host of a service.
///
/// The registry stores these behind an `Arc`. Every method takes `&self`; recording goes
/// through the `Mutex` and the metric types' own `&self` update methods.
pub struct HostMetrics {
    // Identity of the host these metrics describe.
    service_name: String,
    hostname: String,
    port: u16,
    // Time of the most recent recorded request; initialized to the construction time.
    last_update: Mutex<Instant>,
    // Response timers bucketed by status class. 429 and 503 responses are recorded in
    // `response_qos` rather than the 4xx/5xx timers; statuses outside 100-599 go to
    // `response_other`.
    response_1xx: Timer,
    response_2xx: Timer,
    response_3xx: Timer,
    response_4xx: Timer,
    response_5xx: Timer,
    response_qos: Timer,
    response_other: Timer,
    // Marked once per request reported through `update_io_error`.
    io_error: Meter,
}
80
81impl HostMetrics {
82    pub(crate) fn new(service: &str, host: &str, port: u16) -> HostMetrics {
83        HostMetrics {
84            service_name: service.to_string(),
85            hostname: host.to_string(),
86            port,
87            last_update: Mutex::new(Instant::now()),
88            response_1xx: Timer::default(),
89            response_2xx: Timer::default(),
90            response_3xx: Timer::default(),
91            response_4xx: Timer::default(),
92            response_5xx: Timer::default(),
93            response_qos: Timer::default(),
94            response_other: Timer::default(),
95            io_error: Meter::default(),
96        }
97    }
98
99    pub(crate) fn update(&self, status: StatusCode, duration: Duration) {
100        *self.last_update.lock() = Instant::now();
101        #[allow(clippy::match_overlapping_arm)]
102        let timer = match status.as_u16() {
103            429 | 503 => &self.response_qos,
104            100..=199 => &self.response_1xx,
105            200..=299 => &self.response_2xx,
106            300..=399 => &self.response_3xx,
107            400..=499 => &self.response_4xx,
108            500..=599 => &self.response_5xx,
109            _ => &self.response_other,
110        };
111        timer.update(duration);
112    }
113
114    pub(crate) fn update_io_error(&self) {
115        *self.last_update.lock() = Instant::now();
116        self.io_error.mark(1);
117    }
118
119    /// Returns the name of the service running on this host.
120    pub fn service_name(&self) -> &str {
121        &self.service_name
122    }
123
124    /// Returns the hostname of the node.
125    pub fn hostname(&self) -> &str {
126        &self.hostname
127    }
128
129    /// Returns the port of the node.
130    pub fn port(&self) -> u16 {
131        self.port
132    }
133
134    /// Returns the time of the last update to the node's health metrics.
135    pub fn last_update(&self) -> Instant {
136        *self.last_update.lock()
137    }
138
139    /// Returns a timer recording requests to this host which returned a 1xx HTTP response.
140    pub fn response_1xx(&self) -> &Timer {
141        &self.response_1xx
142    }
143
144    /// Returns a timer recording requests to this host which returned a 2xx HTTP response.
145    pub fn response_2xx(&self) -> &Timer {
146        &self.response_2xx
147    }
148
149    /// Returns a timer recording requests to this host which returned a 3xx HTTP response.
150    pub fn response_3xx(&self) -> &Timer {
151        &self.response_3xx
152    }
153
154    /// Returns a timer recording requests to this host which returned a 4xx HTTP response (other than 429).
155    pub fn response_4xx(&self) -> &Timer {
156        &self.response_4xx
157    }
158
159    /// Returns a timer recording requests to this host which returned a 5xx HTTP response (other than 503).
160    pub fn response_5xx(&self) -> &Timer {
161        &self.response_5xx
162    }
163
164    /// Returns a timer recording requests to this host which returned a QoS error (a 429 or 503).
165    pub fn response_qos(&self) -> &Timer {
166        &self.response_qos
167    }
168
169    /// Returns a timer recording requests to this host which returned an HTTP response not in the range 100-599.
170    pub fn response_other(&self) -> &Timer {
171        &self.response_other
172    }
173
174    /// Returns a meter recording requests to this host which returned an IO error.
175    pub fn io_error(&self) -> &Meter {
176        &self.io_error
177    }
178}
179
/// A snapshot of the nodes in a registry.
///
/// Returned by `HostMetricsRegistry::hosts`. The map is shared with the registry behind an
/// `Arc`. The registry never mutates a map that a snapshot still references; it works on a
/// private copy instead, so a snapshot's contents do not change after it is taken.
pub struct Hosts(Arc<HashMap<HostId, Arc<HostMetrics>>>);
182
183impl Hosts {
184    /// Returns an iterator over the nodes.
185    pub fn iter(&self) -> HostsIter<'_> {
186        HostsIter(self.0.values())
187    }
188}
189
190impl<'a> IntoIterator for &'a Hosts {
191    type Item = &'a HostMetrics;
192    type IntoIter = HostsIter<'a>;
193
194    fn into_iter(self) -> HostsIter<'a> {
195        self.iter()
196    }
197}
198
/// An iterator over host metrics.
///
/// Created by `Hosts::iter` or by iterating `&Hosts`. Yields a `&HostMetrics` for each host in
/// the snapshot, in unspecified order.
pub struct HostsIter<'a>(hash_map::Values<'a, HostId, Arc<HostMetrics>>);
201
202impl<'a> Iterator for HostsIter<'a> {
203    type Item = &'a HostMetrics;
204
205    #[inline]
206    fn next(&mut self) -> Option<&'a HostMetrics> {
207        self.0.next().map(|h| &**h)
208    }
209
210    fn size_hint(&self) -> (usize, Option<usize>) {
211        self.0.size_hint()
212    }
213}
214
#[cfg(test)]
mod test {
    use crate::HostMetricsRegistry;
    use hyper::StatusCode;
    use std::collections::HashMap;
    use std::time::Duration;

    #[test]
    fn node_gc() {
        let registry = HostMetricsRegistry::new();

        let health1 = registry.get("service", "localhost", 1234);
        let health2 = registry.get("service", "localhost", 5678);

        health1.update(StatusCode::OK, Duration::from_secs(1));
        health2.update(StatusCode::INTERNAL_SERVER_ERROR, Duration::from_secs(1));

        let snapshot = registry.hosts();
        let nodes = snapshot
            .iter()
            .map(|h| (h.port(), h))
            .collect::<HashMap<_, _>>();
        assert_eq!(nodes.len(), 2);
        assert_eq!(nodes[&1234].response_2xx().count(), 1);
        assert_eq!(nodes[&5678].response_5xx().count(), 1);
        drop(nodes);
        drop(snapshot);

        drop(health2);

        let snapshot = registry.hosts();
        let nodes = snapshot
            .iter()
            .map(|h| (h.port(), h))
            .collect::<HashMap<_, _>>();
        assert_eq!(nodes.len(), 1);
        assert_eq!(nodes[&1234].response_2xx().count(), 1);
    }

    #[test]
    fn status_routing() {
        let registry = HostMetricsRegistry::new();
        let metrics = registry.get("service", "localhost", 1234);

        // 429 and 503 fall inside the 4xx/5xx ranges but must be recorded as QoS responses;
        // 600 is a valid status code outside 100-599 and must land in "other".
        for &code in &[100, 200, 301, 404, 429, 500, 503, 600] {
            metrics.update(StatusCode::from_u16(code).unwrap(), Duration::from_millis(1));
        }
        metrics.update_io_error();

        assert_eq!(metrics.response_1xx().count(), 1);
        assert_eq!(metrics.response_2xx().count(), 1);
        assert_eq!(metrics.response_3xx().count(), 1);
        assert_eq!(metrics.response_4xx().count(), 1);
        assert_eq!(metrics.response_5xx().count(), 1);
        assert_eq!(metrics.response_qos().count(), 2);
        assert_eq!(metrics.response_other().count(), 1);
        assert_eq!(metrics.io_error().count(), 1);
    }
}