use std::time::{Duration, Instant};
use crossbeam_channel::{self as channel, Receiver};
use cockpit::Cockpit;
use instruments::Panel;
use snapshot::{ItemKind, Snapshot};
use util;
use Descriptive;
use {HandlesObservations, Observation, ObservationLike, PutsSnapshot, TelemetryTransmitter};
/// Implemented by types that collect telemetry processors and snapshooters.
///
/// `ProcessorMount` in this file is one implementor.
pub trait AggregatesProcessors {
/// Hands `processor` over to the implementor, which takes ownership of it.
fn add_processor<P: ProcessesTelemetryMessages>(&mut self, processor: P);
/// Hands `snapshooter` over to the implementor, which takes ownership of it.
fn add_snapshooter<S: PutsSnapshot>(&mut self, snapshooter: S);
}
/// Messages a `TelemetryProcessor` receives through its channel.
pub(crate) enum TelemetryMessage<L> {
/// An observation to be passed to the processor's cockpits and handlers.
Observation(Observation<L>),
/// A cockpit to be added to the processor.
AddCockpit(Cockpit<L>),
/// A handler to be added to the processor.
AddHandler(Box<HandlesObservations<Label = L>>),
/// A panel to be added to the first cockpit whose name equals `cockpit_name`.
AddPanel {
/// Name of the cockpit that should receive the panel.
cockpit_name: String,
/// The panel to add.
panel: Panel<L>,
},
}
/// Counters describing what a call to `process` did.
///
/// The default value has all counters set to zero.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct ProcessingOutcome {
    /// Number of messages that were handled.
    pub processed: usize,
    /// Number of observations that were rejected instead of being handled.
    pub dropped: usize,
    /// Sum of the instrument updates reported while handling observations.
    pub instruments_updated: usize,
}

impl ProcessingOutcome {
    /// Adds the counters of `other` to the counters of `self`.
    ///
    /// The additions saturate at `usize::max_value()` so that merging
    /// outcomes can neither panic (debug builds) nor wrap around
    /// (release builds).
    pub fn combine_with(&mut self, other: &ProcessingOutcome) {
        self.processed = self.processed.saturating_add(other.processed);
        self.dropped = self.dropped.saturating_add(other.dropped);
        self.instruments_updated = self
            .instruments_updated
            .saturating_add(other.instruments_updated);
    }

    /// Returns `true` if at least one message was processed or dropped.
    ///
    /// `instruments_updated` is not taken into account.
    pub fn something_happened(&self) -> bool {
        self.processed > 0 || self.dropped > 0
    }
}
/// Determines which observations are processed and which are dropped.
///
/// The `Default` implementation yields `DropOlderThan` with 60 seconds.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum ProcessingStrategy {
/// Process every observation.
ProcessAll,
/// Drop every observation.
DropAll,
/// Drop observations that are older than the given duration, measured
/// from the moment the strategy is turned into a `ProcessingDecider`.
DropOlderThan(Duration),
}
impl ProcessingStrategy {
    /// Resolves this strategy against the current time.
    ///
    /// For `DropOlderThan(max_age)` the deadline is "now minus `max_age`".
    /// If that point in time cannot be represented by an `Instant` (e.g. a
    /// very large `max_age`), no observation can be older than it, so the
    /// result is `ProcessingDecider::ProcessAll`.
    pub(crate) fn decider(&self) -> ProcessingDecider {
        match *self {
            ProcessingStrategy::ProcessAll => ProcessingDecider::ProcessAll,
            ProcessingStrategy::DropAll => ProcessingDecider::DropAll,
            ProcessingStrategy::DropOlderThan(max_age) => {
                // `Instant - Duration` panics when the result is not
                // representable, so use the checked variant instead.
                match Instant::now().checked_sub(max_age) {
                    Some(deadline) => ProcessingDecider::DropBeforeDeadline(deadline),
                    None => ProcessingDecider::ProcessAll,
                }
            }
        }
    }
}
impl Default for ProcessingStrategy {
    /// The default strategy drops observations older than 60 seconds.
    fn default() -> Self {
        let one_minute = Duration::from_secs(60);
        ProcessingStrategy::DropOlderThan(one_minute)
    }
}
/// Decides for individual observations whether they should be processed.
///
/// Created from a `ProcessingStrategy` by `ProcessingStrategy::decider`.
pub enum ProcessingDecider {
/// Every observation is processed.
ProcessAll,
/// No observation is processed.
DropAll,
/// Only observations with a timestamp later than the contained
/// `Instant` are processed.
DropBeforeDeadline(Instant),
}
impl ProcessingDecider {
    /// Tells whether `observation` should be processed (`true`) or
    /// dropped (`false`).
    ///
    /// With `DropBeforeDeadline` an observation is only processed if its
    /// timestamp is strictly later than the deadline.
    pub fn should_be_processed<T: ObservationLike>(&self, observation: &T) -> bool {
        match *self {
            ProcessingDecider::ProcessAll => true,
            ProcessingDecider::DropAll => false,
            ProcessingDecider::DropBeforeDeadline(deadline) => observation.timestamp() > deadline,
        }
    }
}
/// Something that processes pending telemetry messages and can also
/// contribute to a snapshot (via the `PutsSnapshot` supertrait).
pub trait ProcessesTelemetryMessages: PutsSnapshot + Send + 'static {
/// Processes pending messages and reports what was done.
///
/// `max` limits the work done in one call: `TelemetryProcessor` receives
/// at most `max` messages per call, `ProcessorMount` passes `max` on to
/// each of its processors. `strategy` determines which observations are
/// processed and which are dropped.
fn process(&mut self, max: usize, strategy: ProcessingStrategy) -> ProcessingOutcome;
}
/// Receives `TelemetryMessage`s from a channel and dispatches observations
/// to its cockpits and handlers.
pub struct TelemetryProcessor<L> {
/// Optional name. When set, the snapshot of this processor is nested
/// under an item with this name.
name: Option<String>,
/// Optional title, returned by `Descriptive::title`.
title: Option<String>,
/// Optional description, returned by `Descriptive::description`.
description: Option<String>,
/// Cockpits that are handed every processed observation.
cockpits: Vec<Cockpit<L>>,
/// Handlers that are handed every processed observation.
handlers: Vec<Box<HandlesObservations<Label = L>>>,
/// Receiving end of the channel the messages arrive on.
receiver: Receiver<TelemetryMessage<L>>,
/// Additional snapshot providers included when a snapshot is taken.
snapshooters: Vec<Box<PutsSnapshot>>,
/// Creation time, or the last time `process` processed or dropped a message.
last_activity_at: Instant,
/// If set, snapshots report the processor as inactive once more than
/// this duration has elapsed since `last_activity_at`.
max_inactivity_duration: Option<Duration>,
}
impl<L> TelemetryProcessor<L>
where
    L: Clone + Eq + Send + 'static,
{
    /// Creates a transmitter and a processor named `name` which are
    /// connected by an unbounded channel.
    pub fn new_pair<T: Into<String>>(name: T) -> (TelemetryTransmitter<L>, TelemetryProcessor<L>) {
        let (transmitter, mut processor) = Self::new_pair_without_name();
        processor.name = Some(name.into());
        (transmitter, processor)
    }

    /// Creates a transmitter and an unnamed processor which are connected
    /// by an unbounded channel.
    ///
    /// The processor starts without cockpits, handlers, snapshooters or
    /// an inactivity limit.
    pub fn new_pair_without_name() -> (TelemetryTransmitter<L>, TelemetryProcessor<L>) {
        let (sender, receiver) = channel::unbounded();
        let processor = TelemetryProcessor {
            name: None,
            title: None,
            description: None,
            cockpits: Vec::new(),
            handlers: Vec::new(),
            snapshooters: Vec::new(),
            receiver,
            last_activity_at: Instant::now(),
            max_inactivity_duration: None,
        };
        (TelemetryTransmitter { sender }, processor)
    }

    /// Appends `cockpit` to the cockpits of this processor.
    pub fn add_cockpit(&mut self, cockpit: Cockpit<L>) {
        self.cockpits.push(cockpit);
    }

    /// Returns references to all cockpits in insertion order.
    pub fn cockpits(&self) -> Vec<&Cockpit<L>> {
        self.cockpits.iter().collect()
    }

    /// Appends `handler` to the handlers of this processor.
    pub fn add_handler(&mut self, handler: Box<HandlesObservations<Label = L>>) {
        self.handlers.push(handler);
    }

    /// Returns references to all handlers in insertion order.
    pub fn handlers(&self) -> Vec<&HandlesObservations<Label = L>> {
        self.handlers.iter().map(|boxed| &**boxed).collect()
    }

    /// Boxes `snapshooter` and appends it to the snapshooters of this
    /// processor.
    pub fn add_snapshooter<S: PutsSnapshot>(&mut self, snapshooter: S) {
        let boxed = Box::new(snapshooter);
        self.snapshooters.push(boxed);
    }

    /// Returns references to all snapshooters in insertion order.
    pub fn snapshooters(&self) -> Vec<&PutsSnapshot> {
        self.snapshooters.iter().map(|boxed| &**boxed).collect()
    }

    /// Returns the name of this processor, if it has one.
    pub fn name(&self) -> Option<&str> {
        self.name.as_ref().map(String::as_str)
    }

    /// Sets or replaces the name of this processor.
    pub fn set_name<T: Into<String>>(&mut self, name: T) {
        self.name = Some(name.into());
    }

    /// Sets the duration after which a processor without activity reports
    /// itself as inactive in snapshots.
    pub fn set_inactivity_limit(&mut self, limit: Duration) {
        self.max_inactivity_duration = Some(limit);
    }

    /// Writes the items of this processor directly into `into`.
    ///
    /// If an inactivity limit is set, the flags `_inactive` and `_active`
    /// are added. While the processor is inactive nothing else is added
    /// after these flags.
    fn put_values_into_snapshot(&self, into: &mut Snapshot, descriptive: bool) {
        util::put_default_descriptives(self, into, descriptive);

        if let Some(limit) = self.max_inactivity_duration {
            let is_inactive = self.last_activity_at.elapsed() > limit;
            into.items
                .push(("_inactive".to_string(), ItemKind::Boolean(is_inactive)));
            into.items
                .push(("_active".to_string(), ItemKind::Boolean(!is_inactive)));
            if is_inactive {
                // An inactive processor only reports its flags.
                return;
            }
        }

        for cockpit in &self.cockpits {
            cockpit.put_snapshot(into, descriptive);
        }
        for handler in &self.handlers {
            handler.put_snapshot(into, descriptive);
        }
        for snapshooter in &self.snapshooters {
            snapshooter.put_snapshot(into, descriptive);
        }
    }
}
impl<L> ProcessesTelemetryMessages for TelemetryProcessor<L>
where
L: Clone + Eq + Send + 'static,
{
fn process(&mut self, max: usize, strategy: ProcessingStrategy) -> ProcessingOutcome {
let mut num_received = 0;
let mut processed = 0;
let mut instruments_updated = 0;
let mut dropped = 0;
let decider = strategy.decider();
while num_received < max {
match self.receiver.try_recv() {
Ok(TelemetryMessage::Observation(obs)) => {
if decider.should_be_processed(&obs) {
self.cockpits
.iter_mut()
.for_each(|c| instruments_updated += c.handle_observation(&obs));
self.handlers
.iter_mut()
.for_each(|h| instruments_updated += h.handle_observation(&obs));
processed += 1;
} else {
dropped += 1;
}
}
Ok(TelemetryMessage::AddCockpit(c)) => {
self.add_cockpit(c);
processed += 1;
}
Ok(TelemetryMessage::AddHandler(h)) => {
self.add_handler(h);
processed += 1;
}
Ok(TelemetryMessage::AddPanel {
cockpit_name,
panel,
}) => {
if let Some(ref mut cockpit) = self
.cockpits
.iter_mut()
.find(|c| c.name() == Some(&cockpit_name))
{
let _ = cockpit.add_panel(panel);
}
processed += 1;
}
Err(err) => {
util::log_error(format!("Failed to process messages: {}", err));
break;
}
};
num_received += 1;
}
let outcome = ProcessingOutcome {
processed,
dropped,
instruments_updated,
};
if outcome.something_happened() {
self.last_activity_at = Instant::now();
}
outcome
}
}
impl<L> PutsSnapshot for TelemetryProcessor<L>
where
    L: Clone + Eq + Send + 'static,
{
    /// Adds the items of this processor to `into`.
    ///
    /// A named processor puts its items into a nested snapshot stored
    /// under its name. An unnamed processor adds them to `into` directly.
    fn put_snapshot(&self, into: &mut Snapshot, descriptive: bool) {
        match self.name {
            Some(ref name) => {
                let mut nested = Snapshot::default();
                self.put_values_into_snapshot(&mut nested, descriptive);
                let entry = (name.clone(), ItemKind::Snapshot(nested));
                into.items.push(entry);
            }
            None => self.put_values_into_snapshot(into, descriptive),
        }
    }
}
impl<L> Descriptive for TelemetryProcessor<L> {
    /// Returns the title of this processor, if one is set.
    fn title(&self) -> Option<&str> {
        self.title.as_ref().map(String::as_str)
    }

    /// Returns the description of this processor, if one is set.
    fn description(&self) -> Option<&str> {
        self.description.as_ref().map(String::as_str)
    }
}
/// Groups several processors and snapshooters so that they can be driven
/// and snapshotted as one unit.
pub struct ProcessorMount {
/// Optional name. When set, the snapshot of this mount is nested under
/// an item with this name.
name: Option<String>,
/// Optional title, returned by `Descriptive::title`.
title: Option<String>,
/// Optional description, returned by `Descriptive::description`.
description: Option<String>,
/// Processors that are driven when `process` is called on the mount.
processors: Vec<Box<ProcessesTelemetryMessages>>,
/// Additional snapshot providers included when a snapshot is taken.
snapshooters: Vec<Box<PutsSnapshot>>,
/// Creation time, or the last time `process` reported that something happened.
last_activity_at: Instant,
/// If set, snapshots report the mount as inactive once more than this
/// duration has elapsed since `last_activity_at`.
max_inactivity_duration: Option<Duration>,
}
impl ProcessorMount {
    /// Creates an empty mount with the given `name`.
    pub fn new<T: Into<String>>(name: T) -> ProcessorMount {
        let mut named = ProcessorMount::default();
        named.name = Some(name.into());
        named
    }

    /// Returns the name of this mount, if it has one.
    pub fn name(&self) -> Option<&str> {
        self.name.as_ref().map(String::as_str)
    }

    /// Sets or replaces the name of this mount.
    pub fn set_name<T: Into<String>>(&mut self, name: T) {
        self.name = Some(name.into());
    }

    /// Sets the duration after which a mount without activity reports
    /// itself as inactive in snapshots.
    pub fn set_inactivity_limit(&mut self, limit: Duration) {
        self.max_inactivity_duration = Some(limit);
    }

    /// Returns references to all mounted processors in insertion order.
    pub fn processors(&self) -> Vec<&ProcessesTelemetryMessages> {
        self.processors.iter().map(|boxed| &**boxed).collect()
    }

    /// Returns references to all snapshooters in insertion order.
    pub fn snapshooters(&self) -> Vec<&PutsSnapshot> {
        self.snapshooters.iter().map(|boxed| &**boxed).collect()
    }

    /// Writes the items of this mount directly into `into`.
    ///
    /// If an inactivity limit is set, the flags `_inactive` and `_active`
    /// are added. While the mount is inactive nothing else is added after
    /// these flags.
    fn put_values_into_snapshot(&self, into: &mut Snapshot, descriptive: bool) {
        util::put_default_descriptives(self, into, descriptive);

        if let Some(limit) = self.max_inactivity_duration {
            let is_inactive = self.last_activity_at.elapsed() > limit;
            into.items
                .push(("_inactive".to_string(), ItemKind::Boolean(is_inactive)));
            into.items
                .push(("_active".to_string(), ItemKind::Boolean(!is_inactive)));
            if is_inactive {
                // An inactive mount only reports its flags.
                return;
            }
        }

        for processor in &self.processors {
            processor.put_snapshot(into, descriptive);
        }
        for snapshooter in &self.snapshooters {
            snapshooter.put_snapshot(into, descriptive);
        }
    }
}
impl Default for ProcessorMount {
    /// Creates an unnamed, empty mount without an inactivity limit.
    ///
    /// The time of creation is taken as the time of the last activity.
    fn default() -> ProcessorMount {
        let created_at = Instant::now();
        ProcessorMount {
            name: None,
            title: None,
            description: None,
            processors: Vec::new(),
            snapshooters: Vec::new(),
            last_activity_at: created_at,
            max_inactivity_duration: None,
        }
    }
}
impl AggregatesProcessors for ProcessorMount {
    /// Boxes `processor` and appends it to the mounted processors.
    fn add_processor<P: ProcessesTelemetryMessages>(&mut self, processor: P) {
        let boxed = Box::new(processor);
        self.processors.push(boxed);
    }

    /// Boxes `snapshooter` and appends it to the snapshooters of the mount.
    fn add_snapshooter<S: PutsSnapshot>(&mut self, snapshooter: S) {
        let boxed = Box::new(snapshooter);
        self.snapshooters.push(boxed);
    }
}
impl ProcessesTelemetryMessages for ProcessorMount {
    /// Calls `process` on every mounted processor, in insertion order, and
    /// returns the combined outcome.
    ///
    /// Each processor is called with the same `max` and `strategy`. If the
    /// combined outcome shows that something happened, the time of the
    /// last activity is updated.
    fn process(&mut self, max: usize, strategy: ProcessingStrategy) -> ProcessingOutcome {
        let total = self.processors.iter_mut().fold(
            ProcessingOutcome::default(),
            |mut acc, processor| {
                let partial = processor.process(max, strategy);
                acc.combine_with(&partial);
                acc
            },
        );
        if total.something_happened() {
            self.last_activity_at = Instant::now();
        }
        total
    }
}
impl PutsSnapshot for ProcessorMount {
    /// Adds the items of this mount to `into`.
    ///
    /// A named mount puts its items into a nested snapshot stored under
    /// its name. An unnamed mount adds them to `into` directly.
    fn put_snapshot(&self, into: &mut Snapshot, descriptive: bool) {
        match self.name {
            Some(ref name) => {
                let mut nested = Snapshot::default();
                self.put_values_into_snapshot(&mut nested, descriptive);
                let entry = (name.clone(), ItemKind::Snapshot(nested));
                into.items.push(entry);
            }
            None => self.put_values_into_snapshot(into, descriptive),
        }
    }
}
impl Descriptive for ProcessorMount {
    /// Returns the title of this mount, if one is set.
    fn title(&self) -> Option<&str> {
        self.title.as_ref().map(String::as_str)
    }

    /// Returns the description of this mount, if one is set.
    fn description(&self) -> Option<&str> {
        self.description.as_ref().map(String::as_str)
    }
}