use std::time::{Duration, Instant};
use crossbeam_channel::{self as channel, Receiver, TryRecvError};
use crate::cockpit::Cockpit;
use crate::instruments::Panel;
use crate::snapshot::{ItemKind, Snapshot};
use crate::util;
use crate::Descriptive;
use crate::{
HandlesObservations, Observation, ObservationLike, PutsSnapshot, TelemetryTransmitter,
};
pub trait AggregatesProcessors {
fn add_processor<P: ProcessesTelemetryMessages>(&mut self, processor: P);
fn add_snapshooter<S: PutsSnapshot>(&mut self, snapshooter: S);
}
pub(crate) enum TelemetryMessage<L> {
Observation(Observation<L>),
AddCockpit(Cockpit<L>),
RemoveCockpit(String),
AddHandler(Box<dyn HandlesObservations<Label = L>>),
AddPanelToCockpit {
cockpit_name: String,
panel: Panel<L>,
},
RemovePanelFromCockpit {
cockpit_name: String,
panel_name: String,
},
}
pub struct ProcessingOutcome {
pub processed: usize,
pub dropped: usize,
pub instruments_updated: usize,
pub observations_enqueued: usize,
}
impl ProcessingOutcome {
pub fn combine_with(&mut self, other: &ProcessingOutcome) {
self.processed += other.processed;
self.dropped += other.dropped;
self.instruments_updated += other.instruments_updated;
self.observations_enqueued += other.observations_enqueued;
}
pub fn something_happened(&self) -> bool {
self.processed > 0 || self.dropped > 0
}
}
impl Default for ProcessingOutcome {
fn default() -> ProcessingOutcome {
ProcessingOutcome {
processed: 0,
dropped: 0,
instruments_updated: 0,
observations_enqueued: 0,
}
}
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum ProcessingStrategy {
ProcessAll,
DropAll,
DropOlderThan(Duration),
}
impl ProcessingStrategy {
pub(crate) fn decider(&self) -> ProcessingDecider {
match *self {
ProcessingStrategy::ProcessAll => ProcessingDecider::ProcessAll,
ProcessingStrategy::DropAll => ProcessingDecider::DropAll,
ProcessingStrategy::DropOlderThan(max_age) => {
ProcessingDecider::DropBeforeDeadline(Instant::now() - max_age)
}
}
}
}
impl Default for ProcessingStrategy {
fn default() -> Self {
ProcessingStrategy::DropOlderThan(Duration::from_secs(60))
}
}
pub enum ProcessingDecider {
ProcessAll,
DropAll,
DropBeforeDeadline(Instant),
}
impl ProcessingDecider {
pub fn should_be_processed<T: ObservationLike>(&self, observation: &T) -> bool {
match self {
ProcessingDecider::ProcessAll => true,
ProcessingDecider::DropAll => false,
ProcessingDecider::DropBeforeDeadline(drop_deadline) => {
observation.timestamp() > *drop_deadline
}
}
}
}
pub trait ProcessesTelemetryMessages: PutsSnapshot + Send + 'static {
fn process(&mut self, max: usize, strategy: ProcessingStrategy) -> ProcessingOutcome;
}
pub struct TelemetryProcessor<L> {
name: Option<String>,
title: Option<String>,
description: Option<String>,
cockpits: Vec<Cockpit<L>>,
handlers: Vec<Box<dyn HandlesObservations<Label = L>>>,
receiver: Receiver<TelemetryMessage<L>>,
snapshooters: Vec<Box<dyn PutsSnapshot>>,
last_activity_at: Instant,
max_inactivity_duration: Option<Duration>,
is_disconnected: bool,
}
impl<L> TelemetryProcessor<L>
where
L: Clone + Eq + Send + 'static,
{
pub fn new_pair<T: Into<String>>(name: T) -> (TelemetryTransmitter<L>, TelemetryProcessor<L>) {
Self::create(Some(name.into()), None, true)
}
pub fn new_pair_bounded<T: Into<String>>(
name: T,
cap: usize,
block_on_full: bool,
) -> (TelemetryTransmitter<L>, TelemetryProcessor<L>) {
Self::create(Some(name.into()), Some(cap), block_on_full)
}
pub fn new_pair_without_name() -> (TelemetryTransmitter<L>, TelemetryProcessor<L>) {
Self::create(None, None, true)
}
pub fn new_pair_bounded_without_name(
cap: usize,
block_on_full: bool,
) -> (TelemetryTransmitter<L>, TelemetryProcessor<L>) {
Self::create(None, Some(cap), block_on_full)
}
fn create(
name: Option<String>,
bounded: Option<usize>,
use_send: bool,
) -> (TelemetryTransmitter<L>, TelemetryProcessor<L>) {
let (tx, receiver) = if let Some(bound) = bounded {
channel::bounded(bound)
} else {
channel::unbounded()
};
let transmitter = TelemetryTransmitter {
use_send,
sender: tx,
};
let last_activity_at = Instant::now();
let max_inactivity_duration = None;
let receiver = TelemetryProcessor {
name,
title: None,
description: None,
cockpits: Vec::new(),
handlers: Vec::new(),
snapshooters: Vec::new(),
receiver,
last_activity_at,
max_inactivity_duration,
is_disconnected: false,
};
(transmitter, receiver)
}
pub fn add_cockpit(&mut self, cockpit: Cockpit<L>) {
if let Some(name) = cockpit.get_name() {
if self
.cockpits
.iter()
.any(|c| c.get_name().map(|n| n == name).unwrap_or(false))
{
return;
}
}
self.cockpits.push(cockpit)
}
fn remove_cockpit<T: AsRef<str>>(&mut self, name: T) {
self.cockpits
.retain(|c| c.get_name().map(|n| n != name.as_ref()).unwrap_or(true))
}
pub fn cockpit(mut self, cockpit: Cockpit<L>) -> Self {
self.add_cockpit(cockpit);
self
}
#[deprecated(
since = "0.10.6",
note = "use get_cockpits. this method will change its signature"
)]
pub fn cockpits(&self) -> Vec<&Cockpit<L>> {
self.get_cockpits()
}
pub fn get_cockpits(&self) -> Vec<&Cockpit<L>> {
self.cockpits.iter().map(|p| p).collect()
}
pub fn add_handler<T: HandlesObservations<Label = L>>(&mut self, handler: T) {
self.handlers.push(Box::new(handler));
}
pub fn handler<T: HandlesObservations<Label = L>>(mut self, handler: T) -> Self {
self.add_handler(handler);
self
}
#[deprecated(
since = "0.10.6",
note = "use get_handlers. this method will change its signature"
)]
pub fn handlers(&self) -> Vec<&dyn HandlesObservations<Label = L>> {
self.get_handlers()
}
pub fn get_handlers(&self) -> Vec<&dyn HandlesObservations<Label = L>> {
self.handlers.iter().map(|h| &**h).collect()
}
pub fn add_snapshooter<S: PutsSnapshot>(&mut self, snapshooter: S) {
self.snapshooters.push(Box::new(snapshooter));
}
pub fn snapshooter<S: PutsSnapshot>(mut self, snapshooter: S) -> Self {
self.add_snapshooter(snapshooter);
self
}
#[deprecated(
since = "0.10.6",
note = "use get_snapshooters. this method will change its signature"
)]
pub fn snapshooters(&self) -> Vec<&dyn PutsSnapshot> {
self.get_snapshooters()
}
pub fn get_snapshooters(&self) -> Vec<&dyn PutsSnapshot> {
self.snapshooters.iter().map(|p| &**p).collect()
}
#[deprecated(
since = "0.10.6",
note = "use get_name. this method will change its signature"
)]
pub fn name(&self) -> Option<&str> {
self.get_name()
}
pub fn get_name(&self) -> Option<&str> {
self.name.as_ref().map(|n| &**n)
}
pub fn set_name<T: Into<String>>(&mut self, name: T) {
self.name = Some(name.into())
}
pub fn set_inactivity_limit(&mut self, limit: Duration) {
self.max_inactivity_duration = Some(limit);
}
pub fn inactivity_limit(mut self, limit: Duration) -> Self {
self.set_inactivity_limit(limit);
self
}
fn put_values_into_snapshot(&self, into: &mut Snapshot, descriptive: bool) {
util::put_default_descriptives(self, into, descriptive);
if let Some(d) = self.max_inactivity_duration {
if self.last_activity_at.elapsed() > d {
into.items
.push(("_inactive".to_string(), ItemKind::Boolean(true)));
into.items
.push(("_active".to_string(), ItemKind::Boolean(false)));
return;
} else {
into.items
.push(("_inactive".to_string(), ItemKind::Boolean(false)));
into.items
.push(("_active".to_string(), ItemKind::Boolean(true)));
}
};
self.cockpits
.iter()
.for_each(|c| c.put_snapshot(into, descriptive));
self.handlers
.iter()
.for_each(|h| h.put_snapshot(into, descriptive));
self.snapshooters
.iter()
.for_each(|s| s.put_snapshot(into, descriptive));
}
}
impl<L> ProcessesTelemetryMessages for TelemetryProcessor<L>
where
L: Clone + Eq + Send + 'static,
{
fn process(&mut self, max: usize, strategy: ProcessingStrategy) -> ProcessingOutcome {
if self.is_disconnected {
return ProcessingOutcome::default();
}
let mut num_received = 0;
let mut processed = 0;
let mut instruments_updated = 0;
let mut dropped = 0;
let decider = strategy.decider();
while num_received < max {
match self.receiver.try_recv() {
Ok(TelemetryMessage::Observation(obs)) => {
if decider.should_be_processed(&obs) {
self.cockpits
.iter_mut()
.for_each(|c| instruments_updated += c.handle_observation(&obs));
self.handlers
.iter_mut()
.for_each(|h| instruments_updated += h.handle_observation(&obs));
processed += 1;
} else {
dropped += 1;
}
}
Ok(TelemetryMessage::AddCockpit(c)) => {
self.add_cockpit(c);
processed += 1;
}
Ok(TelemetryMessage::RemoveCockpit(name)) => {
self.remove_cockpit(name);
processed += 1;
}
Ok(TelemetryMessage::AddHandler(h)) => {
self.handlers.push(h);
processed += 1;
}
Ok(TelemetryMessage::AddPanelToCockpit {
cockpit_name,
panel,
}) => {
if let Some(ref mut cockpit) = self
.cockpits
.iter_mut()
.find(|c| c.get_name() == Some(&cockpit_name))
{
cockpit.add_panel(panel);
}
processed += 1;
}
Ok(TelemetryMessage::RemovePanelFromCockpit {
cockpit_name,
panel_name,
}) => {
if let Some(ref mut cockpit) = self
.cockpits
.iter_mut()
.find(|c| c.get_name() == Some(&cockpit_name))
{
cockpit.remove_panel(panel_name);
}
processed += 1;
}
Err(TryRecvError::Empty) => {}
Err(TryRecvError::Disconnected) => {
let name = self
.name
.as_ref()
.map(|n| &**n)
.unwrap_or_else(|| "<no name>");
util::log_warning(format!(
"Processor '{}' failed to receive message. Channel disconnected. Exiting",
name
));
self.is_disconnected = true;
break;
}
};
num_received += 1;
}
let outcome = ProcessingOutcome {
processed,
dropped,
instruments_updated,
observations_enqueued: self.receiver.len(),
};
if outcome.something_happened() {
self.last_activity_at = Instant::now();
}
outcome
}
}
impl<L> PutsSnapshot for TelemetryProcessor<L>
where
L: Clone + Eq + Send + 'static,
{
fn put_snapshot(&self, into: &mut Snapshot, descriptive: bool) {
if let Some(ref name) = self.name {
let mut new_level = Snapshot::default();
self.put_values_into_snapshot(&mut new_level, descriptive);
into.items
.push((name.clone(), ItemKind::Snapshot(new_level)));
} else {
self.put_values_into_snapshot(into, descriptive);
}
}
}
impl<L> Descriptive for TelemetryProcessor<L> {
fn title(&self) -> Option<&str> {
self.title.as_ref().map(|n| &**n)
}
fn description(&self) -> Option<&str> {
self.description.as_ref().map(|n| &**n)
}
}
pub struct ProcessorMount {
name: Option<String>,
title: Option<String>,
description: Option<String>,
processors: Vec<Box<dyn ProcessesTelemetryMessages>>,
snapshooters: Vec<Box<dyn PutsSnapshot>>,
last_activity_at: Instant,
max_inactivity_duration: Option<Duration>,
}
impl ProcessorMount {
pub fn new<T: Into<String>>(name: T) -> ProcessorMount {
let mut mount = ProcessorMount::default();
mount.set_name(name);
mount
}
pub fn name(&self) -> Option<&str> {
self.name.as_ref().map(|n| &**n)
}
pub fn set_name<T: Into<String>>(&mut self, name: T) {
self.name = Some(name.into())
}
pub fn set_inactivity_limit(&mut self, limit: Duration) {
self.max_inactivity_duration = Some(limit);
}
pub fn processors(&self) -> Vec<&dyn ProcessesTelemetryMessages> {
self.processors.iter().map(|p| &**p).collect()
}
pub fn snapshooters(&self) -> Vec<&dyn PutsSnapshot> {
self.snapshooters.iter().map(|s| &**s).collect()
}
fn put_values_into_snapshot(&self, into: &mut Snapshot, descriptive: bool) {
util::put_default_descriptives(self, into, descriptive);
if let Some(d) = self.max_inactivity_duration {
if self.last_activity_at.elapsed() > d {
into.items
.push(("_inactive".to_string(), ItemKind::Boolean(true)));
into.items
.push(("_active".to_string(), ItemKind::Boolean(false)));
return;
} else {
into.items
.push(("_inactive".to_string(), ItemKind::Boolean(false)));
into.items
.push(("_active".to_string(), ItemKind::Boolean(true)));
}
};
self.processors
.iter()
.for_each(|p| p.put_snapshot(into, descriptive));
self.snapshooters
.iter()
.for_each(|s| s.put_snapshot(into, descriptive));
}
}
impl Default for ProcessorMount {
fn default() -> ProcessorMount {
ProcessorMount {
name: None,
title: None,
description: None,
processors: Vec::new(),
snapshooters: Vec::new(),
last_activity_at: Instant::now(),
max_inactivity_duration: None,
}
}
}
impl AggregatesProcessors for ProcessorMount {
fn add_processor<P: ProcessesTelemetryMessages>(&mut self, processor: P) {
self.processors.push(Box::new(processor));
}
fn add_snapshooter<S: PutsSnapshot>(&mut self, snapshooter: S) {
self.snapshooters.push(Box::new(snapshooter));
}
}
impl ProcessesTelemetryMessages for ProcessorMount {
fn process(&mut self, max: usize, strategy: ProcessingStrategy) -> ProcessingOutcome {
let mut outcome = ProcessingOutcome::default();
for processor in self.processors.iter_mut() {
outcome.combine_with(&processor.process(max, strategy));
}
if outcome.something_happened() {
self.last_activity_at = Instant::now();
}
outcome
}
}
impl PutsSnapshot for ProcessorMount {
fn put_snapshot(&self, into: &mut Snapshot, descriptive: bool) {
if let Some(ref name) = self.name {
let mut new_level = Snapshot::default();
self.put_values_into_snapshot(&mut new_level, descriptive);
into.items
.push((name.clone(), ItemKind::Snapshot(new_level)));
} else {
self.put_values_into_snapshot(into, descriptive);
}
}
}
impl Descriptive for ProcessorMount {
fn title(&self) -> Option<&str> {
self.title.as_ref().map(|n| &**n)
}
fn description(&self) -> Option<&str> {
self.description.as_ref().map(|n| &**n)
}
}
#[test]
fn the_telemetry_transmitter_is_sync() {
fn is_sync<T>(_proc: T)
where
T: super::TransmitsTelemetryData<()> + Sync,
{
}
let (tx, _rx) = TelemetryProcessor::new_pair_without_name();
is_sync(tx);
}