#[cfg(feature = "log")]
#[macro_use]
extern crate log;
use std::time::{Duration, Instant};
use snapshot::Snapshot;
use cockpit::Cockpit;
use instruments::Panel;
use processor::TelemetryMessage;
pub use observation::*;
pub use processor::AggregatesProcessors;
pub mod attached_mount;
pub mod cockpit;
pub mod driver;
pub mod instruments;
mod observation;
pub mod processor;
pub mod snapshot;
pub(crate) mod util;
/// A consumer of `Observation`s that can also contribute to a `Snapshot`.
///
/// Implementors must be `PutsSnapshot`, `Send` and `'static`.
pub trait HandlesObservations
where
    Self: PutsSnapshot + Send + 'static,
{
    /// The label type carried by the observations this handler accepts.
    type Label: Send + 'static;

    /// Processes a single borrowed observation.
    ///
    /// NOTE(review): the meaning of the returned `usize` is not visible in
    /// this file (presumably a count of items that reacted) — confirm
    /// against the implementors.
    fn handle_observation(&mut self, observation: &Observation<Self::Label>) -> usize;
}
/// Field-less marker value named for an "increment" operation.
///
/// NOTE(review): presumably converted into an observed value elsewhere in
/// the crate (step of one) — confirm against the `observation` module.
#[derive(Clone, Copy, Debug)]
pub struct Increment;
/// Marker value named for an "increment by N" operation; wraps the
/// unsigned amount as its only public field.
///
/// NOTE(review): how the amount is applied is defined outside this file —
/// confirm against the `observation` module.
#[derive(Clone, Copy, Debug)]
pub struct IncrementBy(pub u32);
/// Field-less marker value named for a "decrement" operation.
///
/// NOTE(review): presumably converted into an observed value elsewhere in
/// the crate (step of one) — confirm against the `observation` module.
#[derive(Clone, Copy, Debug)]
pub struct Decrement;
/// Marker value named for a "decrement by N" operation; wraps the
/// unsigned amount as its only public field.
///
/// NOTE(review): how the amount is applied is defined outside this file —
/// confirm against the `observation` module.
#[derive(Clone, Copy, Debug)]
pub struct DecrementBy(pub u32);
/// Marker value named for a signed "change by N" operation; wraps the
/// signed delta as its only public field.
///
/// NOTE(review): how the delta is applied is defined outside this file —
/// confirm against the `observation` module.
#[derive(Clone, Copy, Debug)]
pub struct ChangeBy(pub i64);
/// Front-end API for submitting observations labelled with `L` and for
/// adding or removing handlers, cockpits and panels.
///
/// Every method returns `&Self`, so calls can be chained.
pub trait TransmitsTelemetryData<L> {
    /// Forwards a fully built `Observation`.
    ///
    /// The default implementations of all `observed*` helpers and of
    /// `measure_time` ultimately funnel into this method.
    fn transmit(&self, observation: Observation<L>) -> &Self;

    /// Reports `count` occurrences for `label`, stamped with `timestamp`.
    fn observed(&self, label: L, count: u64, timestamp: Instant) -> &Self {
        let observation = Observation::Observed {
            label,
            count,
            timestamp,
        };
        self.transmit(observation)
    }

    /// Reports a single occurrence for `label`, stamped with `timestamp`.
    fn observed_one(&self, label: L, timestamp: Instant) -> &Self {
        let observation = Observation::ObservedOne { label, timestamp };
        self.transmit(observation)
    }

    /// Reports a single occurrence for `label` that carries a value.
    ///
    /// `value` may be anything convertible into an `ObservedValue`.
    fn observed_one_value<V: Into<ObservedValue>>(
        &self,
        label: L,
        value: V,
        timestamp: Instant,
    ) -> &Self {
        let value = value.into();
        let observation = Observation::ObservedOneValue {
            label,
            value,
            timestamp,
        };
        self.transmit(observation)
    }

    /// Reports a `Duration` as the value of a single occurrence.
    ///
    /// Delegates to `observed_one_value`, relying on the conversion of
    /// `Duration` into `ObservedValue`.
    fn observed_duration(&self, label: L, duration: Duration, timestamp: Instant) -> &Self {
        self.observed_one_value(label, duration, timestamp)
    }

    /// Like `observed`, with the timestamp taken from `Instant::now()`.
    fn observed_now(&self, label: L, count: u64) -> &Self {
        let timestamp = Instant::now();
        self.observed(label, count, timestamp)
    }

    /// Like `observed_one`, with the timestamp taken from `Instant::now()`.
    fn observed_one_now(&self, label: L) -> &Self {
        let timestamp = Instant::now();
        self.observed_one(label, timestamp)
    }

    /// Like `observed_one_value`, with the timestamp taken from `Instant::now()`.
    fn observed_one_value_now<V: Into<ObservedValue>>(&self, label: L, value: V) -> &Self {
        let timestamp = Instant::now();
        self.observed_one_value(label, value, timestamp)
    }

    /// Like `observed_duration`, with the timestamp taken from `Instant::now()`.
    fn observed_one_duration_now(&self, label: L, duration: Duration) -> &Self {
        let timestamp = Instant::now();
        self.observed_duration(label, duration, timestamp)
    }

    /// Reports the time elapsed between `from` and now as a duration.
    ///
    /// Nothing is reported when `from` lies after the current instant.
    fn measure_time(&self, label: L, from: Instant) -> &Self {
        let now = Instant::now();
        if from > now {
            // A start point in the future has no elapsed time to report.
            return self;
        }
        self.observed_duration(label, now - from, now);
        self
    }

    /// Registers a handler for observations labelled with `L`.
    fn add_handler<H>(&self, handler: H) -> &Self
    where
        H: HandlesObservations<Label = L>,
        L: Send + 'static;

    /// Registers a cockpit.
    fn add_cockpit(&self, cockpit: Cockpit<L>) -> &Self;

    /// Requests removal of the cockpit identified by `name`.
    fn remove_cockpit<T: Into<String>>(&self, name: T) -> &Self;

    /// Requests that `panel` be added to the cockpit identified by `cockpit_name`.
    fn add_panel_to_cockpit<T: Into<String>>(&self, cockpit_name: T, panel: Panel<L>) -> &Self;

    /// Requests removal of the panel `panel_name` from the cockpit `cockpit_name`.
    fn remove_panel_from_cockpit<U: Into<String>, V: Into<String>>(
        &self,
        cockpit_name: U,
        panel_name: V,
    ) -> &Self;
}
/// Sending side that pushes `TelemetryMessage`s into a crossbeam channel.
///
/// `Clone` is implemented by hand instead of being derived:
/// `#[derive(Clone)]` would add an `L: Clone` bound, although cloning only
/// duplicates the channel sender and a flag, neither of which requires the
/// label type to be `Clone`.
pub struct TelemetryTransmitter<L> {
    /// Channel endpoint the messages are written to.
    sender: crossbeam_channel::Sender<TelemetryMessage<L>>,
    /// Selects how messages are delivered: `Sender::send` when `true`,
    /// `Sender::try_send` when `false`.
    use_send: bool,
}

impl<L> Clone for TelemetryTransmitter<L> {
    /// Creates another transmitter feeding the same channel with the same
    /// delivery mode.
    fn clone(&self) -> Self {
        TelemetryTransmitter {
            sender: self.sender.clone(),
            use_send: self.use_send,
        }
    }
}
impl<L: Send> TelemetryTransmitter<L> {
    /// Pushes `msg` into the channel and returns `self` for chaining.
    ///
    /// Uses `Sender::send` when `use_send` is set and `Sender::try_send`
    /// otherwise. A failed delivery is not propagated; it is only reported
    /// through `util::log_warning`.
    fn send(&self, msg: TelemetryMessage<L>) -> &Self {
        // Single place that formats the warning for both delivery modes.
        let warn = |reason: &dyn std::fmt::Display| {
            util::log_warning(format!("failed to send telemetry message: {}", reason))
        };

        if self.use_send {
            if let Err(err) = self.sender.send(msg) {
                warn(&err)
            }
        } else if let Err(err) = self.sender.try_send(msg) {
            warn(&err)
        }

        self
    }

    /// Whether the underlying channel reports itself as full.
    pub fn is_queue_full(&self) -> bool {
        let sender = &self.sender;
        sender.is_full()
    }

    /// Whether the underlying channel reports itself as empty.
    pub fn is_queue_empty(&self) -> bool {
        let sender = &self.sender;
        sender.is_empty()
    }

    /// Number of messages the underlying channel currently reports.
    pub fn queue_size(&self) -> usize {
        let sender = &self.sender;
        sender.len()
    }

    /// Capacity of the underlying channel, as reported by the channel
    /// itself (`None` when it reports no capacity).
    pub fn queue_capacity(&self) -> Option<usize> {
        let sender = &self.sender;
        sender.capacity()
    }
}
/// Each operation is wrapped in the matching `TelemetryMessage` variant and
/// handed to `TelemetryTransmitter::send`.
impl<L: Send + 'static> TransmitsTelemetryData<L> for TelemetryTransmitter<L> {
    /// Sends the observation as `TelemetryMessage::Observation`.
    fn transmit(&self, observation: Observation<L>) -> &Self {
        let message = TelemetryMessage::Observation(observation);
        self.send(message)
    }

    /// Boxes the handler and sends it as `TelemetryMessage::AddHandler`.
    fn add_handler<H>(&self, handler: H) -> &Self
    where
        H: HandlesObservations<Label = L>,
    {
        let message = TelemetryMessage::AddHandler(Box::new(handler));
        self.send(message)
    }

    /// Sends the cockpit as `TelemetryMessage::AddCockpit`.
    fn add_cockpit(&self, cockpit: Cockpit<L>) -> &Self {
        let message = TelemetryMessage::AddCockpit(cockpit);
        self.send(message)
    }

    /// Sends the cockpit name as `TelemetryMessage::RemoveCockpit`.
    fn remove_cockpit<T: Into<String>>(&self, name: T) -> &Self {
        let cockpit_name: String = name.into();
        self.send(TelemetryMessage::RemoveCockpit(cockpit_name))
    }

    /// Sends the panel and its target cockpit name as
    /// `TelemetryMessage::AddPanelToCockpit`.
    fn add_panel_to_cockpit<T: Into<String>>(&self, cockpit_name: T, panel: Panel<L>) -> &Self {
        let cockpit_name: String = cockpit_name.into();
        let message = TelemetryMessage::AddPanelToCockpit {
            cockpit_name,
            panel,
        };
        self.send(message)
    }

    /// Sends both names as `TelemetryMessage::RemovePanelFromCockpit`.
    fn remove_panel_from_cockpit<U: Into<String>, V: Into<String>>(
        &self,
        cockpit_name: U,
        panel_name: V,
    ) -> &Self {
        let cockpit_name: String = cockpit_name.into();
        let panel_name: String = panel_name.into();
        let message = TelemetryMessage::RemovePanelFromCockpit {
            cockpit_name,
            panel_name,
        };
        self.send(message)
    }
}
/// Optional human-readable metadata for an item.
///
/// Both accessors default to `None`, so implementors only override the
/// ones they can actually provide.
pub trait Descriptive {
    /// A title for the item; `None` unless overridden.
    fn title(&self) -> Option<&str> { None }

    /// A longer description of the item; `None` unless overridden.
    fn description(&self) -> Option<&str> { None }
}
/// Something that can write its current state into a `Snapshot`.
///
/// Implementors must be `Send` and `'static`.
pub trait PutsSnapshot
where
    Self: Send + 'static,
{
    /// Adds this item's data to `into`.
    ///
    /// NOTE(review): `descriptive` presumably controls whether titles and
    /// descriptions are included — confirm against the implementors.
    fn put_snapshot(&self, into: &mut Snapshot, descriptive: bool);
}