#[cfg(feature = "log")]
#[macro_use]
extern crate log;
use snapshot::Snapshot;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use cockpit::Cockpit;
use instruments::Panel;
use processor::TelemetryMessage;
pub use observation::*;
pub use processor::AggregatesProcessors;
pub mod cockpit;
pub mod driver;
pub mod instruments;
mod observation;
pub mod processor;
pub mod snapshot;
pub(crate) mod util;
pub trait HandlesObservations: PutsSnapshot + Send + 'static {
type Label: Send + 'static;
fn handle_observation(&mut self, observation: &Observation<Self::Label>) -> usize;
}
#[deprecated(since = "0.10.0", note = "use a bool directly")]
pub const TRUE: u64 = 1;
#[deprecated(since = "0.10.0", note = "use a bool directly")]
pub const FALSE: u64 = 0;
#[deprecated(since = "0.10.0", note = "use crate::Increment")]
pub const INCR: u64 = std::u64::MAX;
#[deprecated(since = "0.10.0", note = "use crate::Decrement")]
pub const DECR: u64 = std::u64::MAX - 1;
#[derive(Debug, Copy, Clone)]
pub struct Increment;
#[derive(Debug, Copy, Clone)]
pub struct IncrementBy(pub u32);
#[derive(Debug, Copy, Clone)]
pub struct Decrement;
#[derive(Debug, Copy, Clone)]
pub struct DecrementBy(u32);
#[derive(Debug, Copy, Clone)]
pub struct ChangeBy(pub i64);
pub trait TransmitsTelemetryData<L> {
fn transmit(&self, observation: Observation<L>) -> &Self;
fn observed(&self, label: L, count: u64, timestamp: Instant) -> &Self {
self.transmit(Observation::Observed {
label,
count,
timestamp,
})
}
fn observed_one(&self, label: L, timestamp: Instant) -> &Self {
self.transmit(Observation::ObservedOne { label, timestamp })
}
fn observed_one_value<V: Into<ObservedValue>>(
&self,
label: L,
value: V,
timestamp: Instant,
) -> &Self {
self.transmit(Observation::ObservedOneValue {
label,
value: value.into(),
timestamp,
})
}
fn observed_duration(&self, label: L, duration: Duration, timestamp: Instant) -> &Self {
self.observed_one_value(label, duration, timestamp)
}
fn observed_now(&self, label: L, count: u64) -> &Self {
self.observed(label, count, Instant::now())
}
fn observed_one_now(&self, label: L) -> &Self {
self.observed_one(label, Instant::now())
}
fn observed_one_value_now<V: Into<ObservedValue>>(&self, label: L, value: V) -> &Self {
self.observed_one_value(label, value, Instant::now())
}
fn observed_one_duration_now(&self, label: L, duration: Duration) -> &Self {
self.observed_duration(label, duration, Instant::now())
}
fn measure_time(&self, label: L, from: Instant) -> &Self {
let now = Instant::now();
if from <= now {
self.observed_duration(label, now - from, now);
}
self
}
fn add_handler<H: HandlesObservations<Label = L>>(&self, handler: H) -> &Self
where
L: Send + 'static;
fn add_cockpit(&self, cockpit: Cockpit<L>) -> &Self;
fn remove_cockpit<T: Into<String>>(&self, name: T) -> &Self;
fn add_panel_to_cockpit<T: Into<String>>(&self, cockpit_name: T, panel: Panel<L>) -> &Self;
fn remove_panel_from_cockpit<U: Into<String>, V: Into<String>>(
&self,
cockpit_name: U,
panel_name: V,
) -> &Self;
}
#[derive(Clone)]
pub struct TelemetryTransmitter<L> {
sender: crossbeam_channel::Sender<TelemetryMessage<L>>,
}
impl<L> TelemetryTransmitter<L>
where
L: Send + 'static,
{
#[deprecated(since = "0.10.16", note = "Use TelemetryTransmitter since it is sync")]
pub fn synced(&self) -> TelemetryTransmitterSync<L> {
TelemetryTransmitterSync {
sender: Arc::new(Mutex::new(self.sender.clone())),
}
}
}
impl<L> TransmitsTelemetryData<L> for TelemetryTransmitter<L> {
fn transmit(&self, observation: Observation<L>) -> &Self {
if let Err(err) = self.sender.send(TelemetryMessage::Observation(observation)) {
util::log_error(format!("Failed to transmit observation: {}", err));
};
self
}
fn add_handler<H: HandlesObservations<Label = L>>(&self, handler: H) -> &Self
where
L: Send + 'static,
{
if let Err(err) = self
.sender
.send(TelemetryMessage::AddHandler(Box::new(handler)))
{
util::log_error(format!("Failed to add handler: {}", err));
};
self
}
fn add_cockpit(&self, cockpit: Cockpit<L>) -> &Self {
if let Err(err) = self.sender.send(TelemetryMessage::AddCockpit(cockpit)) {
util::log_error(format!("Failed to add cockpit: {}", err));
};
self
}
fn remove_cockpit<T: Into<String>>(&self, name: T) -> &Self {
if let Err(err) = self
.sender
.send(TelemetryMessage::RemoveCockpit(name.into()))
{
util::log_error(format!("Failed to remove cockpit: {}", err));
};
self
}
fn add_panel_to_cockpit<T: Into<String>>(&self, cockpit_name: T, panel: Panel<L>) -> &Self {
if let Err(err) = self.sender.send(TelemetryMessage::AddPanelToCockpit {
cockpit_name: cockpit_name.into(),
panel,
}) {
util::log_error(format!("Failed to add panel to cockpit: {}", err));
};
self
}
fn remove_panel_from_cockpit<U: Into<String>, V: Into<String>>(
&self,
cockpit_name: U,
panel_name: V,
) -> &Self {
if let Err(err) = self.sender.send(TelemetryMessage::RemovePanelFromCockpit {
cockpit_name: cockpit_name.into(),
panel_name: panel_name.into(),
}) {
util::log_error(format!("Failed to add panel to cockpit: {}", err));
};
self
}
}
#[derive(Clone)]
#[deprecated(since = "0.10.16", note = "Use TelemetryTransmitter since it is sync")]
pub struct TelemetryTransmitterSync<L> {
sender: Arc<Mutex<crossbeam_channel::Sender<TelemetryMessage<L>>>>,
}
impl<L> TelemetryTransmitterSync<L> where L: Send + 'static {}
impl<L> TransmitsTelemetryData<L> for TelemetryTransmitterSync<L> {
fn transmit(&self, observation: Observation<L>) -> &Self {
if let Err(err) = self
.sender
.lock()
.unwrap()
.send(TelemetryMessage::Observation(observation))
{
util::log_error(format!("Failed to transmit observation: {}", err));
};
self
}
fn add_handler<H: HandlesObservations<Label = L>>(&self, handler: H) -> &Self
where
L: Send + 'static,
{
if let Err(err) = self
.sender
.lock()
.unwrap()
.send(TelemetryMessage::AddHandler(Box::new(handler)))
{
util::log_error(format!("Failed to add handler: {}", err));
};
self
}
fn add_cockpit(&self, cockpit: Cockpit<L>) -> &Self {
if let Err(err) = self
.sender
.lock()
.unwrap()
.send(TelemetryMessage::AddCockpit(cockpit))
{
util::log_error(format!("Failed to add cockpit: {}", err));
};
self
}
fn remove_cockpit<T: Into<String>>(&self, name: T) -> &Self {
if let Err(err) = self
.sender
.lock()
.unwrap()
.send(TelemetryMessage::RemoveCockpit(name.into()))
{
util::log_error(format!("Failed to remove cockpit: {}", err));
};
self
}
fn add_panel_to_cockpit<T: Into<String>>(&self, cockpit_name: T, panel: Panel<L>) -> &Self {
if let Err(err) = self
.sender
.lock()
.unwrap()
.send(TelemetryMessage::AddPanelToCockpit {
cockpit_name: cockpit_name.into(),
panel,
})
{
util::log_error(format!("Failed to add panel to cockpit: {}", err));
};
self
}
fn remove_panel_from_cockpit<U: Into<String>, V: Into<String>>(
&self,
cockpit_name: U,
panel_name: V,
) -> &Self {
if let Err(err) =
self.sender
.lock()
.unwrap()
.send(TelemetryMessage::RemovePanelFromCockpit {
cockpit_name: cockpit_name.into(),
panel_name: panel_name.into(),
})
{
util::log_error(format!("Failed to add panel to cockpit: {}", err));
};
self
}
}
pub trait Descriptive {
fn title(&self) -> Option<&str> {
None
}
fn description(&self) -> Option<&str> {
None
}
}
pub trait PutsSnapshot: Send + 'static {
fn put_snapshot(&self, into: &mut Snapshot, descriptive: bool);
}