metrix/lib.rs
//! # metrix
//!
//! Metrics for monitoring applications and alerting.
//!
//! ## Goal
//!
//! Applications/services can have a lot of metrics and one of the greatest
//! challenges is organizing them. This is what `metrix` tries to help with.
//!
//! **Metrix** does not aim to provide exact numbers; it is meant for
//! application monitoring only.
//!
//! This crate is in a very **early** stage and the API might still change.
//! There may be backends provided for monitoring solutions in the future
//! but currently only a snapshot that can be
//! serialized to JSON is provided.
//!
//! ## How does it work
//!
//! **Metrix** is based on observations collected while running your
//! application. These observations are sent to a backend where
//! the actual metrics (counters etc.) are updated. A snapshot can be
//! queried for the configured metrics.
//!
//! The primary focus of **metrix** is to organize these metrics. There are
//! several building blocks available. Most of them can have a name that will
//! then be part of a path within a snapshot.
//!
//! ### Labels
//!
//! Labels link observations to panels. Labels can be of any type that
//! implements `Clone + Eq + Send + 'static`. An `enum` is a good choice for a
//! label.
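//!
//! A minimal sketch of such a label type, using only the standard library.
//! The names `RequestLabel` and `assert_is_label` are hypothetical and only
//! serve this illustration; they are not part of this crate.
//!
//! ```rust
//! // Hypothetical label type: one variant per kind of observation.
//! #[derive(Clone, Copy, Debug, PartialEq, Eq)]
//! enum RequestLabel {
//!     Successful,
//!     Failed,
//! }
//!
//! // Hypothetical helper: compiles only for types that satisfy the
//! // bounds required of a label.
//! fn assert_is_label<L: Clone + Eq + Send + 'static>(_label: &L) {}
//! ```
//!
//! Calling `assert_is_label(&RequestLabel::Successful)` compiles, whereas a
//! type that lacks `Eq` would be rejected at compile time.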
//!
//! ### Observations
//!
//! An observation is made somewhere within your application. When an
//! observation is sent to the backend it must have a label attached. This label
//! is then matched against the label of a panel to determine whether the
//! observation is used to update that panel's instruments.
//!
//! ### Instruments
//!
//! Instruments are gauges, meters, etc. An instrument gets updated by an
//! observation where an update is meaningful. Instruments are grouped by
//! `Panel`s.
//!
//! You can find instruments in the module `instruments`.
//!
//! ### Panels
//!
//! A `Panel` groups instruments under the same label. So each instrument
//! within a panel will be updated by observations that have the same label as
//! the panel.
//!
//! Let's say you defined a label `OutgoingRequests`. If you are interested
//! in the request rate and the latencies, you would create a panel with the
//! label `OutgoingRequests` and add a histogram and a meter.
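//!
//! The label matching described above can be sketched as follows. `Label`,
//! `SketchPanel` and `count_handled` are hypothetical stand-ins written for
//! this example; they do not mirror the real `Panel` API.
//!
//! ```rust
//! #[derive(Clone, Copy, PartialEq, Eq)]
//! enum Label {
//!     OutgoingRequests,
//!     IncomingRequests,
//! }
//!
//! // Hypothetical stand-in for a panel: it only counts the observations
//! // whose label equals its own label.
//! struct SketchPanel {
//!     label: Label,
//!     handled: usize,
//! }
//!
//! impl SketchPanel {
//!     // Returns `true` if the observation was handled by this panel.
//!     fn handle(&mut self, observed: Label) -> bool {
//!         if observed == self.label {
//!             self.handled += 1;
//!             true
//!         } else {
//!             false
//!         }
//!     }
//! }
//!
//! // Feeds all observed labels to one panel and returns how many matched.
//! fn count_handled(panel_label: Label, observed: &[Label]) -> usize {
//!     let mut panel = SketchPanel {
//!         label: panel_label,
//!         handled: 0,
//!     };
//!     for label in observed {
//!         panel.handle(*label);
//!     }
//!     panel.handled
//! }
//! ```
//!
//! In this sketch a panel labelled `OutgoingRequests` ignores every
//! observation labelled `IncomingRequests`.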
//!
//! ### Cockpit
//!
//! A cockpit aggregates multiple `Panel`s. A cockpit can be used to monitor
//! different tasks/parts of a component or workflow. A cockpit
//! is bound to a label type.
//!
//! An example can be that you have a service component that calls an external
//! HTTP client. You could be interested in successful calls and failed calls
//! individually. So for both cases you would create a value for your label
//! and then add two panels to the cockpit.
//!
//! Cockpits are in the module `cockpit`.
//!
//! ### Processors
//!
//! The most important processor is the `TelemetryProcessor`. It has
//! a label type as a type parameter and consists of a `TelemetryTransmitter`
//! that sends observations to the backend (used within your app)
//! and the actual `TelemetryProcessor` that forms the backend and
//! processes observations. The `TelemetryProcessor`
//! can **own** several cockpits for a label type.
//!
//! There is also a `ProcessorMount` that is label agnostic and can group
//! several processors. It can also have a name that will be included in the
//! snapshot.
//!
//! The processors can be found in the module `processor`.
//!
//! ### Driver
//!
//! The driver **owns** processors and asks the **owned** processors
//! to process their messages. You need to add your processors to
//! a driver to start the machinery. A driver is also a processor
//! which means it can have a name and it can also be part of another
//! hierarchy.
//!
//! Each driver has its own thread for polling its processors
//! so even when attached to another
//! hierarchy all processors registered with the driver will only
//! be driven by that driver.
//!
//! ## Contributing
//!
//! Contributing is welcome. Criticism is also welcome!
//!
//! ## License
//!
//! Metrix is primarily distributed under the terms of
//! both the MIT license and the Apache License (Version 2.0).
//!
//! Copyright (c) 2018 Christian Douven
//!
#[cfg(feature = "log")]
#[macro_use]
extern crate log;

use std::time::{Duration, Instant};

use snapshot::Snapshot;

use cockpit::Cockpit;
use instruments::Panel;
use processor::TelemetryMessage;

pub use observation::*;
pub use processor::AggregatesProcessors;

pub mod attached_mount;
pub mod cockpit;
pub mod driver;
pub mod instruments;
mod observation;
pub mod processor;
pub mod snapshot;

pub(crate) mod util;

/// Something that can react to `Observation`s where
/// the `Label` is the type of the label.
///
/// You can use this to implement your own metrics.
pub trait HandlesObservations: PutsSnapshot + Send + 'static {
    type Label: Send + 'static;
    fn handle_observation(&mut self, observation: &Observation<Self::Label>) -> usize;
}

/// Increments a value by one (e.g. in a `Gauge`)
#[derive(Debug, Copy, Clone)]
pub struct Increment;
/// Increments a value by the given amount (e.g. in a `Gauge`)
#[derive(Debug, Copy, Clone)]
pub struct IncrementBy(pub u32);
/// Decrements a value by one (e.g. in a `Gauge`)
#[derive(Debug, Copy, Clone)]
pub struct Decrement;
/// Decrements a value by the given amount (e.g. in a `Gauge`)
#[derive(Debug, Copy, Clone)]
pub struct DecrementBy(pub u32);
/// Changes a value by the given amount (e.g. in a `Gauge`)
#[derive(Debug, Copy, Clone)]
pub struct ChangeBy(pub i64);

/// Transmits telemetry data to the backend.
///
/// Implementors should transfer `Observation`s to
/// a backend and manipulate the instruments there so as not
/// to interfere too much with the actual task being measured/observed.
pub trait TransmitsTelemetryData<L> {
    /// Transmit an observation to the backend.
    fn transmit(&self, observation: Observation<L>) -> &Self;

    /// Observed `count` occurrences at time `timestamp`
    ///
    /// Convenience method. Simply calls `transmit`
    fn observed(&self, label: L, count: u64, timestamp: Instant) -> &Self {
        self.transmit(Observation::Observed {
            label,
            count,
            timestamp,
        })
    }

    /// Observed one occurrence at time `timestamp`
    ///
    /// Convenience method. Simply calls `transmit`
    fn observed_one(&self, label: L, timestamp: Instant) -> &Self {
        self.transmit(Observation::ObservedOne { label, timestamp })
    }

    /// Observed one occurrence with value `value` at time `timestamp`
    ///
    /// Convenience method. Simply calls `transmit`
    fn observed_one_value<V: Into<ObservedValue>>(
        &self,
        label: L,
        value: V,
        timestamp: Instant,
    ) -> &Self {
        self.transmit(Observation::ObservedOneValue {
            label,
            value: value.into(),
            timestamp,
        })
    }

    /// Sends a `Duration` as an observed value observed at `timestamp`.
    /// The `Duration` is converted to nanoseconds.
    fn observed_duration(&self, label: L, duration: Duration, timestamp: Instant) -> &Self {
        self.observed_one_value(label, duration, timestamp)
    }

    /// Observed `count` occurrences now.
    ///
    /// Convenience method. Simply calls `observed` with
    /// the current timestamp.
    fn observed_now(&self, label: L, count: u64) -> &Self {
        self.observed(label, count, Instant::now())
    }

    /// Observed one occurrence now
    ///
    /// Convenience method. Simply calls `observed_one` with
    /// the current timestamp.
    fn observed_one_now(&self, label: L) -> &Self {
        self.observed_one(label, Instant::now())
    }

    /// Observed one occurrence with value `value` now
    ///
    /// Convenience method. Simply calls `observed_one_value` with
    /// the current timestamp.
    fn observed_one_value_now<V: Into<ObservedValue>>(&self, label: L, value: V) -> &Self {
        self.observed_one_value(label, value, Instant::now())
    }

    /// Sends a `Duration` as an observed value observed with the current
    /// timestamp.
    ///
    /// The `Duration` is converted to nanoseconds internally.
    fn observed_one_duration_now(&self, label: L, duration: Duration) -> &Self {
        self.observed_duration(label, duration, Instant::now())
    }

    /// Measures the time from `from` until now.
    ///
    /// The resulting duration is an observed value
    /// with the measured duration in nanoseconds.
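    ///
    /// A minimal sketch of the same measurement using only the standard
    /// library. `elapsed_nanos` is a hypothetical helper that is not part of
    /// this crate, and the `u128` result is an assumption made for the
    /// sketch; the real conversion goes through `Into<ObservedValue>`.
    ///
    /// ```rust
    /// use std::time::{Duration, Instant};
    ///
    /// // Hypothetical helper: returns the nanoseconds between `from` and
    /// // `now`, or `None` if `from` lies after `now` (nothing is observed
    /// // in that case).
    /// fn elapsed_nanos(from: Instant, now: Instant) -> Option<u128> {
    ///     if from <= now {
    ///         let elapsed: Duration = now - from;
    ///         Some(elapsed.as_nanos())
    ///     } else {
    ///         None
    ///     }
    /// }
    /// ```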
    fn measure_time(&self, label: L, from: Instant) -> &Self {
        let now = Instant::now();
        if from <= now {
            self.observed_duration(label, now - from, now);
        }

        self
    }

    /// Add a handler.
    fn add_handler<H: HandlesObservations<Label = L>>(&self, handler: H) -> &Self
    where
        L: Send + 'static;

    /// Add a `Cockpit`
    ///
    /// If the cockpit has a name and another cockpit with
    /// the same name is already present the cockpit will
    /// not be added.
    fn add_cockpit(&self, cockpit: Cockpit<L>) -> &Self;

    /// Removes the cockpit with the given name.
    fn remove_cockpit<T: Into<String>>(&self, name: T) -> &Self;

    /// Add a `Panel` to a `Cockpit` if that `Cockpit` has the
    /// given name.
    fn add_panel_to_cockpit<T: Into<String>>(&self, cockpit_name: T, panel: Panel<L>) -> &Self;

    /// Removes the panel with the given name from a cockpit
    /// with the given name.
    ///
    /// This means the cockpit and the panel must have a name set.
    fn remove_panel_from_cockpit<U: Into<String>, V: Into<String>>(
        &self,
        cockpit_name: U,
        panel_name: V,
    ) -> &Self;
}

/// Transmits `Observation`s to the backend
///
/// It is important that the corresponding `TelemetryProcessor`
/// gets mounted on a driver soon
/// since otherwise the internal queue will get flooded
/// with unprocessed observations.
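///
/// What happens to a bounded queue that nobody drains can be sketched with
/// the standard library. This is only an analogy: the transmitter itself
/// uses a `crossbeam_channel` sender, and `rejected_messages` is a
/// hypothetical helper that is not part of this crate.
///
/// ```rust
/// use std::sync::mpsc::{sync_channel, TrySendError};
///
/// // Hypothetical helper: tries to enqueue `attempts` messages into a
/// // bounded queue of the given `capacity` that is never drained and
/// // returns how many messages were rejected because the queue was full.
/// fn rejected_messages(capacity: usize, attempts: usize) -> usize {
///     // Keep the receiver alive so that the channel stays connected.
///     let (tx, _rx) = sync_channel::<usize>(capacity);
///     let mut rejected = 0;
///     for n in 0..attempts {
///         // `try_send` never blocks. A blocking `send` would wait here
///         // forever once the queue is full.
///         if let Err(TrySendError::Full(_)) = tx.try_send(n) {
///             rejected += 1;
///         }
///     }
///     rejected
/// }
/// ```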
#[derive(Clone)]
pub struct TelemetryTransmitter<L> {
    sender: crossbeam_channel::Sender<TelemetryMessage<L>>,
    // True if we want to block in case the queue
    // is full. Has no effect on an unbounded queue.
    use_send: bool,
}

impl<L: Send> TelemetryTransmitter<L> {
    fn send(&self, msg: TelemetryMessage<L>) -> &Self {
        if self.use_send {
            if let Err(err) = self.sender.send(msg) {
                util::log_warning(format!("failed to send telemetry message: {}", err))
            }
        } else if let Err(err) = self.sender.try_send(msg) {
            util::log_warning(format!("failed to send telemetry message: {}", err))
        }
        self
    }

    /// Returns `true` if the internal queue is full.
    ///
    /// Always `false` on an unbounded queue
    pub fn is_queue_full(&self) -> bool {
        self.sender.is_full()
    }

    /// Returns `true` if the internal queue is empty.
    ///
    /// An unbounded queue is empty only while it holds no messages.
    pub fn is_queue_empty(&self) -> bool {
        self.sender.is_empty()
    }

    /// Returns the number of messages currently in the internal queue.
    pub fn queue_size(&self) -> usize {
        self.sender.len()
    }

    /// Returns the capacity of the internal queue.
    ///
    /// `None` on an unbounded queue
    pub fn queue_capacity(&self) -> Option<usize> {
        self.sender.capacity()
    }
}

impl<L: Send + 'static> TransmitsTelemetryData<L> for TelemetryTransmitter<L> {
    fn transmit(&self, observation: Observation<L>) -> &Self {
        self.send(TelemetryMessage::Observation(observation))
    }

    fn add_handler<H: HandlesObservations<Label = L>>(&self, handler: H) -> &Self {
        self.send(TelemetryMessage::AddHandler(Box::new(handler)))
    }

    fn add_cockpit(&self, cockpit: Cockpit<L>) -> &Self {
        self.send(TelemetryMessage::AddCockpit(cockpit))
    }

    fn remove_cockpit<T: Into<String>>(&self, name: T) -> &Self {
        self.send(TelemetryMessage::RemoveCockpit(name.into()))
    }

    fn add_panel_to_cockpit<T: Into<String>>(&self, cockpit_name: T, panel: Panel<L>) -> &Self {
        self.send(TelemetryMessage::AddPanelToCockpit {
            cockpit_name: cockpit_name.into(),
            panel,
        })
    }

    fn remove_panel_from_cockpit<U: Into<String>, V: Into<String>>(
        &self,
        cockpit_name: U,
        panel_name: V,
    ) -> &Self {
        self.send(TelemetryMessage::RemovePanelFromCockpit {
            cockpit_name: cockpit_name.into(),
            panel_name: panel_name.into(),
        })
    }
}

/// Something that has a title and a description
///
/// This is mostly useful for snapshots. When a `Snapshot`
/// is taken there is usually a parameter `descriptive`
/// that determines whether title and description should
/// be part of a `Snapshot`. See also `PutsSnapshot`.
pub trait Descriptive {
    fn title(&self) -> Option<&str> {
        None
    }

    fn description(&self) -> Option<&str> {
        None
    }
}

/// Implementors are able to write their current data into the given `Snapshot`.
///
/// Guidelines for writing snapshots:
///
/// * A `PutsSnapshot` that has a name should create a new sub snapshot
///   and add its values there
///
/// * A `PutsSnapshot` that does not have a name should add its values
///   directly to the given snapshot
///
/// * When `descriptive` is set to `true`, a `PutsSnapshot` should put
///   its `title` and `description` into the same `Snapshot` it put
///   its values into (exception: instruments), without overwriting already
///   existing descriptions, so that the more general top level ones survive.
///
/// * When `descriptive` is set to `true` on an instrument, the instrument
///   should put its description into the snapshot it got passed, thereby
///   adding the suffixes "_title" and "_description" to its name.
///
/// Implementors of this trait can be added to almost all components via
/// the `add_snapshooter` method which is also defined on trait
/// `AggregatesProcessors`.
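///
/// The first two guidelines can be sketched as follows. `SketchSnapshot`
/// and `put_value` are hypothetical and far simpler than the real
/// `Snapshot` type; they only illustrate where values end up depending on
/// whether a name is set.
///
/// ```rust
/// // Hypothetical stand-in for a snapshot: plain values plus named sub
/// // snapshots.
/// #[derive(Debug, Default, PartialEq)]
/// struct SketchSnapshot {
///     values: Vec<(String, u64)>,
///     children: Vec<(String, SketchSnapshot)>,
/// }
///
/// // Writes `value` under `key`. With a name the value goes into a new sub
/// // snapshot, without a name it goes directly into the given snapshot.
/// fn put_value(name: Option<&str>, key: &str, value: u64, into: &mut SketchSnapshot) {
///     match name {
///         Some(name) => {
///             let mut sub = SketchSnapshot::default();
///             sub.values.push((key.to_string(), value));
///             into.children.push((name.to_string(), sub));
///         }
///         None => into.values.push((key.to_string(), value)),
///     }
/// }
/// ```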
pub trait PutsSnapshot: Send + 'static {
    /// Puts the current snapshot values into the given `Snapshot` thereby
    /// following the guidelines of `PutsSnapshot`.
    fn put_snapshot(&self, into: &mut Snapshot, descriptive: bool);
}
416}