metrix/
lib.rs

//! # metrix
//!
//! Metrics for monitoring applications and alerting.
//!
//! ## Goal
//!
//! Applications/services can have a lot of metrics and one of the greatest
//! challenges is organizing them. This is what `metrix` tries to help with.
//!
//! **Metrix** does not aim to provide exact numbers. It is intended for
//! application monitoring only.
//!
//! This crate is in a very **early** stage and the API might still change.
//! There may be backends provided for monitoring solutions in the future
//! but currently only a snapshot that can be
//! serialized to JSON is provided.
//!
//! ## How does it work
//!
//! **Metrix** is based on observations collected while running your
//! application. These observations are then sent to a backend where
//! the actual metrics (counters etc.) are updated. A snapshot of the
//! configured metrics can be queried.
//!
//! The primary focus of **metrix** is to organize these metrics. There are
//! several building blocks available. Most of them can have a name that will
//! then be part of a path within a snapshot.
//!
//! ### Labels
//!
//! Labels link observations to panels. Labels can be of any type that
//! implements `Clone + Eq + Send + 'static`. An `enum` is a good choice for a
//! label.
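//!
//! A minimal sketch of a label type. The enum `RequestLabel` and the helper
//! `assert_label` are hypothetical and not part of **metrix**. The helper
//! only checks the bounds stated above at compile time.
//!
//! ```rust
//! // Hypothetical label type for illustration only.
//! #[derive(Clone, Copy, Debug, PartialEq, Eq)]
//! enum RequestLabel {
//!     Successful,
//!     Failed,
//! }
//!
//! // Compiles only if `L` satisfies the bounds required of a label.
//! fn assert_label<L: Clone + Eq + Send + 'static>(label: L) -> L {
//!     label
//! }
//! ```
//!
//! Calling `assert_label(RequestLabel::Successful)` compiles because the
//! derived traits cover all of the required bounds.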
//!
//! ### Observations
//!
//! An observation is made somewhere within your application. When an
//! observation is sent to the backend it must have a label attached. This label
//! is then matched against the label of a panel to determine whether an
//! observation is handled for updating or not.
//!
//! ### Instruments
//!
//! Instruments are gauges, meters, etc. An instrument gets updated by an
//! observation where an update is meaningful. Instruments are grouped by
//! `Panel`s.
//!
//! You can find instruments in the module `instruments`.
//!
//! ### Panels
//!
//! A `Panel` groups instruments under the same label. So each instrument
//! within a panel will be updated by observations that have the same label as
//! the panel.
//!
//! Let's say you defined a label `OutgoingRequests`. If you are interested
//! in the request rate and the latencies, you would create a panel with the
//! label `OutgoingRequests` and add a histogram and a meter.
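//!
//! The label matching described above can be sketched as follows. The types
//! `Label` and `SketchPanel` are hypothetical stand-ins, not the `Panel`
//! provided by **metrix**, and a plain counter takes the place of real
//! instruments.
//!
//! ```rust
//! // Hypothetical stand-ins, not the types provided by metrix.
//! #[derive(Clone, Copy, PartialEq, Eq)]
//! enum Label {
//!     OutgoingRequests,
//!     IncomingRequests,
//! }
//!
//! struct SketchPanel {
//!     label: Label,
//!     observed: u64,
//! }
//!
//! impl SketchPanel {
//!     // Updates the counter only if the labels match and
//!     // returns whether the observation was handled.
//!     fn handle(&mut self, label: Label, count: u64) -> bool {
//!         if self.label == label {
//!             self.observed += count;
//!             true
//!         } else {
//!             false
//!         }
//!     }
//! }
//!
//! fn demo() -> u64 {
//!     let mut panel = SketchPanel {
//!         label: Label::OutgoingRequests,
//!         observed: 0,
//!     };
//!     panel.handle(Label::OutgoingRequests, 2);
//!     // Ignored because the label differs from the panel's label.
//!     panel.handle(Label::IncomingRequests, 5);
//!     panel.observed
//! }
//! ```
//!
//! In this sketch only the observation labelled `OutgoingRequests` changes
//! the counter of the panel.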
//!
//! ### Cockpit
//!
//! A cockpit aggregates multiple `Panel`s. A cockpit can be used to monitor
//! different tasks/parts of a component or workflow. A cockpit
//! is bound to a label type.
//!
//! For example, you might have a service component that calls an external
//! HTTP client. You could be interested in successful calls and failed calls
//! individually. So for both cases you would create a value for your label
//! and then add two panels to the cockpit.
//!
//! Cockpits are in the module `cockpit`.
//!
//! ### Processors
//!
//! The most important processor is the `TelemetryProcessor`. It has
//! a label type as a type parameter and consists of a `TelemetryTransmitter`
//! that sends observations to the backend (used within your app)
//! and the actual `TelemetryProcessor` that forms the backend and
//! processes observations. The `TelemetryProcessor`
//! can **own** several cockpits for a label type.
//!
//! There is also a `ProcessorMount` that is label agnostic and can group
//! several processors. It can also have a name that will be included in the
//! snapshot.
//!
//! The processors can be found in the module `processor`.
//!
//! ### Driver
//!
//! The driver **owns** processors and asks the **owned** processors
//! to process their messages. You need to add your processors to
//! a driver to start the machinery. A driver is also a processor
//! which means it can have a name and it can also be part of another
//! hierarchy.
//!
//! Each driver has its own thread for polling its processors
//! so even when attached to another
//! hierarchy all processors registered with the driver will only
//! be driven by that driver.
//!
//! ## Contributing
//!
//! Contributing is welcome. Criticism is also welcome!
//!
//! ## License
//!
//! Metrix is primarily distributed under the terms of
//! both the MIT license and the Apache License (Version 2.0).
//!
//! Copyright (c) 2018 Christian Douven
//!
#[cfg(feature = "log")]
#[macro_use]
extern crate log;

use std::time::{Duration, Instant};

use snapshot::Snapshot;

use cockpit::Cockpit;
use instruments::Panel;
use processor::TelemetryMessage;

pub use observation::*;
pub use processor::AggregatesProcessors;

pub mod attached_mount;
pub mod cockpit;
pub mod driver;
pub mod instruments;
mod observation;
pub mod processor;
pub mod snapshot;

pub(crate) mod util;

/// Something that can react to `Observation`s where
/// the `Label` is the type of the label.
///
/// You can use this to implement your own metrics.
pub trait HandlesObservations: PutsSnapshot + Send + 'static {
    type Label: Send + 'static;
    fn handle_observation(&mut self, observation: &Observation<Self::Label>) -> usize;
}

/// Increments a value by one (e.g. in a `Gauge`)
#[derive(Debug, Copy, Clone)]
pub struct Increment;
/// Increments a value by the given amount (e.g. in a `Gauge`)
#[derive(Debug, Copy, Clone)]
pub struct IncrementBy(pub u32);
/// Decrements a value by one (e.g. in a `Gauge`)
#[derive(Debug, Copy, Clone)]
pub struct Decrement;
/// Decrements a value by the given amount (e.g. in a `Gauge`)
#[derive(Debug, Copy, Clone)]
pub struct DecrementBy(pub u32);
/// Changes a value by the given amount (e.g. in a `Gauge`)
#[derive(Debug, Copy, Clone)]
pub struct ChangeBy(pub i64);

/// Transmits telemetry data to the backend.
///
/// Implementors should transfer `Observation`s to
/// a backend and manipulate the instruments there so as not
/// to interfere too much with the actual task being measured/observed.
pub trait TransmitsTelemetryData<L> {
    /// Transmit an observation to the backend.
    fn transmit(&self, observation: Observation<L>) -> &Self;

    /// Observed `count` occurrences at time `timestamp`
    ///
    /// Convenience method. Simply calls `transmit`
    fn observed(&self, label: L, count: u64, timestamp: Instant) -> &Self {
        self.transmit(Observation::Observed {
            label,
            count,
            timestamp,
        })
    }

    /// Observed one occurrence at time `timestamp`
    ///
    /// Convenience method. Simply calls `transmit`
    fn observed_one(&self, label: L, timestamp: Instant) -> &Self {
        self.transmit(Observation::ObservedOne { label, timestamp })
    }

    /// Observed one occurrence with value `value` at time `timestamp`
    ///
    /// Convenience method. Simply calls `transmit`
    fn observed_one_value<V: Into<ObservedValue>>(
        &self,
        label: L,
        value: V,
        timestamp: Instant,
    ) -> &Self {
        self.transmit(Observation::ObservedOneValue {
            label,
            value: value.into(),
            timestamp,
        })
    }

    /// Sends a `Duration` as an observed value observed at `timestamp`.
    /// The `Duration` is converted to nanoseconds.
    fn observed_duration(&self, label: L, duration: Duration, timestamp: Instant) -> &Self {
        self.observed_one_value(label, duration, timestamp)
    }

    /// Observed `count` occurrences now.
    ///
    /// Convenience method. Simply calls `observed` with
    /// the current timestamp.
    fn observed_now(&self, label: L, count: u64) -> &Self {
        self.observed(label, count, Instant::now())
    }

    /// Observed one occurrence now
    ///
    /// Convenience method. Simply calls `observed_one` with
    /// the current timestamp.
    fn observed_one_now(&self, label: L) -> &Self {
        self.observed_one(label, Instant::now())
    }

    /// Observed one occurrence with value `value` now
    ///
    /// Convenience method. Simply calls `observed_one_value` with
    /// the current timestamp.
    fn observed_one_value_now<V: Into<ObservedValue>>(&self, label: L, value: V) -> &Self {
        self.observed_one_value(label, value, Instant::now())
    }

    /// Sends a `Duration` as an observed value observed with the current
    /// timestamp.
    ///
    /// The `Duration` is converted to nanoseconds internally.
    fn observed_one_duration_now(&self, label: L, duration: Duration) -> &Self {
        self.observed_duration(label, duration, Instant::now())
    }

    /// Measures the time from `from` until now.
    ///
    /// The resulting duration is an observed value
    /// with the measured duration in nanoseconds.
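    ///
    /// A minimal sketch of the guard applied before the duration is
    /// transmitted. The free function `elapsed_since` is hypothetical and
    /// only illustrates that nothing is observed when `from` lies in the
    /// future.
    ///
    /// ```rust
    /// use std::time::{Duration, Instant};
    ///
    /// // Returns the elapsed time, or `None` if `from` is later than `now`.
    /// fn elapsed_since(from: Instant, now: Instant) -> Option<Duration> {
    ///     if from <= now {
    ///         Some(now - from)
    ///     } else {
    ///         None
    ///     }
    /// }
    /// ```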
    fn measure_time(&self, label: L, from: Instant) -> &Self {
        let now = Instant::now();
        if from <= now {
            self.observed_duration(label, now - from, now);
        }

        self
    }

    /// Add a handler.
    fn add_handler<H: HandlesObservations<Label = L>>(&self, handler: H) -> &Self
    where
        L: Send + 'static;

    /// Add a `Cockpit`
    ///
    /// If the cockpit has a name and another cockpit with
    /// the same name is already present the cockpit will
    /// not be added.
    fn add_cockpit(&self, cockpit: Cockpit<L>) -> &Self;

    /// Removes the cockpit with the given name.
    fn remove_cockpit<T: Into<String>>(&self, name: T) -> &Self;

    /// Add a `Panel` to a `Cockpit` if that `Cockpit` has the
    /// given name.
    fn add_panel_to_cockpit<T: Into<String>>(&self, cockpit_name: T, panel: Panel<L>) -> &Self;

    /// Removes the panel with the given name from a cockpit
    /// with the given name.
    ///
    /// This means the cockpit and the panel must have a name set.
    fn remove_panel_from_cockpit<U: Into<String>, V: Into<String>>(
        &self,
        cockpit_name: U,
        panel_name: V,
    ) -> &Self;
}

/// Transmits `Observation`s to the backend
///
/// It is important that the corresponding `TelemetryProcessor`
/// gets mounted on a driver soon
/// since otherwise the internal queue will get flooded
/// with unprocessed observations.
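///
/// What happens to a bounded queue that is never drained can be sketched
/// as follows. The sketch assumes the standard library's `sync_channel` as a
/// stand-in for the `crossbeam_channel` queue used here, and the function
/// `dropped_without_consumer` is hypothetical.
///
/// ```rust
/// use std::sync::mpsc::{sync_channel, TrySendError};
///
/// // Returns how many of `n` messages are rejected when sending without
/// // blocking into a bounded queue of `capacity` that nobody drains.
/// fn dropped_without_consumer(capacity: usize, n: usize) -> usize {
///     // The receiver is kept alive but never polled.
///     let (sender, _receiver) = sync_channel::<usize>(capacity);
///     let mut dropped = 0;
///     for i in 0..n {
///         if let Err(TrySendError::Full(_)) = sender.try_send(i) {
///             dropped += 1;
///         }
///     }
///     dropped
/// }
/// ```
///
/// Once the queue is full, every further non-blocking send fails until a
/// consumer takes messages out of the queue.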
#[derive(Clone)]
pub struct TelemetryTransmitter<L> {
    sender: crossbeam_channel::Sender<TelemetryMessage<L>>,
    // True if we want to block in case the queue
    // is full. Has no effect on an unbounded queue.
    use_send: bool,
}

impl<L: Send> TelemetryTransmitter<L> {
    fn send(&self, msg: TelemetryMessage<L>) -> &Self {
        if self.use_send {
            // Blocks while a bounded queue is full.
            if let Err(err) = self.sender.send(msg) {
                util::log_warning(format!("failed to send telemetry message: {}", err))
            }
        // Never blocks. The message is dropped if the queue is full.
        } else if let Err(err) = self.sender.try_send(msg) {
            util::log_warning(format!("failed to send telemetry message: {}", err))
        }
        self
    }

    /// Returns `true` if the internal queue is full.
    ///
    /// Always `false` on an unbounded queue
    pub fn is_queue_full(&self) -> bool {
        self.sender.is_full()
    }

    /// Returns `true` if the internal queue contains no messages.
    pub fn is_queue_empty(&self) -> bool {
        self.sender.is_empty()
    }

    /// Returns the number of messages currently in the internal queue.
    pub fn queue_size(&self) -> usize {
        self.sender.len()
    }

    /// Returns the capacity of the internal queue.
    ///
    /// `None` on an unbounded queue
    pub fn queue_capacity(&self) -> Option<usize> {
        self.sender.capacity()
    }
}

impl<L: Send + 'static> TransmitsTelemetryData<L> for TelemetryTransmitter<L> {
    fn transmit(&self, observation: Observation<L>) -> &Self {
        self.send(TelemetryMessage::Observation(observation))
    }

    fn add_handler<H: HandlesObservations<Label = L>>(&self, handler: H) -> &Self {
        self.send(TelemetryMessage::AddHandler(Box::new(handler)))
    }

    fn add_cockpit(&self, cockpit: Cockpit<L>) -> &Self {
        self.send(TelemetryMessage::AddCockpit(cockpit))
    }

    fn remove_cockpit<T: Into<String>>(&self, name: T) -> &Self {
        self.send(TelemetryMessage::RemoveCockpit(name.into()))
    }

    fn add_panel_to_cockpit<T: Into<String>>(&self, cockpit_name: T, panel: Panel<L>) -> &Self {
        self.send(TelemetryMessage::AddPanelToCockpit {
            cockpit_name: cockpit_name.into(),
            panel,
        })
    }

    fn remove_panel_from_cockpit<U: Into<String>, V: Into<String>>(
        &self,
        cockpit_name: U,
        panel_name: V,
    ) -> &Self {
        self.send(TelemetryMessage::RemovePanelFromCockpit {
            cockpit_name: cockpit_name.into(),
            panel_name: panel_name.into(),
        })
    }
}

/// Something that has a title and a description
///
/// This is mostly useful for snapshots. When a `Snapshot`
/// is taken there is usually a parameter `descriptive`
/// that determines whether title and description should
/// be part of a `Snapshot`. See also `PutsSnapshot`.
pub trait Descriptive {
    fn title(&self) -> Option<&str> {
        None
    }

    fn description(&self) -> Option<&str> {
        None
    }
}

/// Implementors are able to write their current data into the given `Snapshot`.
///
/// Guidelines for writing snapshots:
///
/// * A `PutsSnapshot` that has a name should create a new sub snapshot
/// and add its values there
///
/// * A `PutsSnapshot` that does not have a name should add its values
/// directly to the given snapshot
///
/// * When `descriptive` is set to `true` `PutsSnapshot` should put
/// its `title` and `description` into the same `Snapshot` it put
/// its values (exception: instruments) thereby not overwriting already
/// existing descriptions so that the more general top level ones survive.
///
/// * When `descriptive` is set to `true` on an instrument the instrument
/// should put its description into the snapshot it got passed thereby adding the
/// suffixes "_title" and "_description" to its name.
///
/// Implementors of this trait can be added to almost all components via
/// the `add_snapshooter` method which is also defined on trait
/// `AggregatesProcessors`.
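///
/// The first two guidelines can be sketched as follows. `SketchSnapshot` and
/// `Counter` are hypothetical stand-ins and do not reflect the actual
/// `Snapshot` type or its methods. The sketch also leaves out the
/// `descriptive` parameter.
///
/// ```rust
/// // Hypothetical stand-in for a snapshot, not the `Snapshot` of metrix.
/// #[derive(Debug, Default, PartialEq)]
/// struct SketchSnapshot {
///     values: Vec<(String, u64)>,
///     children: Vec<(String, SketchSnapshot)>,
/// }
///
/// // Hypothetical component with an optional name.
/// struct Counter {
///     name: Option<String>,
///     count: u64,
/// }
///
/// impl Counter {
///     fn put_snapshot(&self, into: &mut SketchSnapshot) {
///         match &self.name {
///             // Named: the values go into a new sub snapshot.
///             Some(name) => {
///                 let mut sub = SketchSnapshot::default();
///                 sub.values.push(("count".to_string(), self.count));
///                 into.children.push((name.clone(), sub));
///             }
///             // Unnamed: the values go directly into the given snapshot.
///             None => into.values.push(("count".to_string(), self.count)),
///         }
///     }
/// }
/// ```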
pub trait PutsSnapshot: Send + 'static {
    /// Puts the current snapshot values into the given `Snapshot` thereby
    /// following the guidelines of `PutsSnapshot`.
    fn put_snapshot(&self, into: &mut Snapshot, descriptive: bool);
}