use crate::{
app::NodeInfo,
channels::{RxBundle, TxBundle},
codelet::{CodeletInstance, CodeletStatus, LifecycleStatus, NodeId, Statistics},
opt_vec::OptVec,
prelude::{Acqtime, Codelet, DefaultStatus, Pubtime, SignalTimeValue, SignalValue, Signals},
signals::SignalKind,
};
use serde::{Deserialize, Serialize};
use std::{
collections::HashMap,
fmt::{Debug, Display},
hash::Hash,
sync::{Arc, PoisonError, RwLock},
};
pub trait Monitor: Send + Sync + Debug + Display {
fn dtype(&self) -> Option<GaugeDataType>;
fn check(
&self,
pubtime: Pubtime,
value: &GaugeValue,
) -> Result<MonitorStatus, MonitorCheckError>;
}
#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)]
pub enum MonitorCheckError {
#[error("invalid data type: {0:?}")]
InvalidDataType(GaugeDataType),
#[error("monitor check failed: {0}")]
Other(String),
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum GaugeDataType {
Bool,
Int64,
Usize,
Float64,
String,
Pubtime,
Acqtime,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum GaugeValue {
Bool(bool),
Int64(i64),
Usize(usize),
Float64(f64),
String(String),
Pubtime(Pubtime),
Acqtime(Acqtime),
}
impl GaugeValue {
pub fn dtype(&self) -> GaugeDataType {
match self {
GaugeValue::Bool(_) => GaugeDataType::Bool,
GaugeValue::Int64(_) => GaugeDataType::Int64,
GaugeValue::Usize(_) => GaugeDataType::Usize,
GaugeValue::Float64(_) => GaugeDataType::Float64,
GaugeValue::String(_) => GaugeDataType::String,
GaugeValue::Pubtime(_) => GaugeDataType::Pubtime,
GaugeValue::Acqtime(_) => GaugeDataType::Acqtime,
}
}
}
impl From<SignalValue> for GaugeValue {
fn from(other: SignalValue) -> Self {
match other {
SignalValue::Bool(v) => GaugeValue::Bool(v),
SignalValue::Int64(v) => GaugeValue::Int64(v),
SignalValue::Usize(v) => GaugeValue::Usize(v),
SignalValue::Float64(v) => GaugeValue::Float64(v),
SignalValue::String(v) => GaugeValue::String(v),
}
}
}
impl std::fmt::Display for GaugeValue {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
GaugeValue::Bool(v) => write!(f, "{}", v),
GaugeValue::Int64(v) => write!(f, "{}", v),
GaugeValue::Usize(v) => write!(f, "{}", v),
GaugeValue::Float64(v) => write!(f, "{}", v),
GaugeValue::String(v) => write!(f, "{}", v),
GaugeValue::Pubtime(v) => write!(f, "{}", v),
GaugeValue::Acqtime(v) => write!(f, "{}", v),
}
}
}
impl From<bool> for GaugeValue {
fn from(other: bool) -> Self {
GaugeValue::Bool(other)
}
}
impl From<i64> for GaugeValue {
fn from(other: i64) -> Self {
GaugeValue::Int64(other)
}
}
impl From<usize> for GaugeValue {
fn from(other: usize) -> Self {
GaugeValue::Usize(other)
}
}
impl From<f64> for GaugeValue {
fn from(other: f64) -> Self {
GaugeValue::Float64(other)
}
}
impl From<&str> for GaugeValue {
fn from(other: &str) -> Self {
GaugeValue::String(other.into())
}
}
impl From<String> for GaugeValue {
fn from(other: String) -> Self {
GaugeValue::String(other)
}
}
impl From<Pubtime> for GaugeValue {
fn from(other: Pubtime) -> Self {
GaugeValue::Pubtime(other)
}
}
impl From<Acqtime> for GaugeValue {
fn from(other: Acqtime) -> Self {
GaugeValue::Acqtime(other)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MonitorStatus {
Nominal,
Warning,
Critical,
}
impl MonitorStatus {
pub fn combine(self, other: MonitorStatus) -> MonitorStatus {
match (self, other) {
(MonitorStatus::Critical, _) | (_, MonitorStatus::Critical) => MonitorStatus::Critical,
(MonitorStatus::Warning, _) | (_, MonitorStatus::Warning) => MonitorStatus::Warning,
(MonitorStatus::Nominal, MonitorStatus::Nominal) => MonitorStatus::Nominal,
}
}
}
#[derive(Debug, Clone, Eq, Hash, PartialEq, Serialize, Deserialize)]
pub enum GaugeKey {
SignalValue(String),
SignalPubtime(String),
RxAvailable(String),
TxTotal(String),
TxPubtime(String),
}
#[derive(Default)]
pub(crate) struct AppMonitor {
monitor_def: Option<AppMonitorDef>,
node_to_id: HashMap<String, usize>,
node_names: OptVec<String>,
observables: OptVec<RwLock<NodeObservables>>,
monitors: OptVec<RwLock<NodeMonitors>>,
}
impl AppMonitor {
pub fn to_data(&self) -> AppMonitorData {
AppMonitorData {
observables: self.observables.map(|g| g.read().unwrap().clone()),
monitors: self
.monitors
.iter()
.flat_map(|(nid, g)| {
g.read()
.unwrap()
.monitors
.iter()
.flat_map(move |(key, mg)| {
mg.checks.iter().map(move |(info, _)| MonitorData {
info: info.clone(),
node_id: NodeId(nid),
key: key.clone(),
pubtime: mg.last_time,
value: mg.last_value.clone(),
status: mg.last_status.clone(),
})
})
.collect::<Vec<_>>()
})
.collect(),
}
}
}
pub struct AppMonitorData {
pub observables: OptVec<NodeObservables>,
pub monitors: Vec<MonitorData>,
}
pub struct MonitorData {
pub info: String,
pub node_id: NodeId,
pub key: GaugeKey,
pub pubtime: Option<Pubtime>,
pub value: Result<Option<GaugeValue>, GetValueError>,
pub status: Result<MonitorStatus, MonitorError>,
}
#[derive(Default, Clone)]
pub struct NodeObservables {
pub info: Arc<NodeInfo>,
pub lifecycle_status: LifecycleStatus,
pub status: Option<(String, DefaultStatus)>,
pub signals: Vec<SignalEntry>,
pub statistics: Statistics,
}
impl NodeObservables {
pub fn get_signal(&self, key: &str) -> Result<Option<SignalTimeValue>, GetValueError> {
Ok(self
.signals
.iter()
.find(|e| e.key == *key)
.ok_or(GetValueError::InvalidSignal(key.into()))?
.cell
.clone())
}
pub fn get(&self, key: &GaugeKey) -> Result<Option<GaugeValue>, GetValueError> {
Ok(match key {
GaugeKey::SignalValue(skey) => self.get_signal(skey)?.map(|stv| stv.value.into()),
GaugeKey::SignalPubtime(skey) => self
.get_signal(skey)?
.map(|stv| GaugeValue::Pubtime(stv.time)),
GaugeKey::RxAvailable(ch_name) => Some(GaugeValue::Usize(
self.statistics.rx_available_messages_count[self.rx_index_by_name(ch_name)?],
)),
GaugeKey::TxTotal(ch_name) => Some(GaugeValue::Usize(
self.statistics.tx_published_message_count[self.tx_index_by_name(ch_name)?],
)),
GaugeKey::TxPubtime(ch_name) => self.statistics.tx_last_pubtime
[self.tx_index_by_name(ch_name)?]
.map(|v| GaugeValue::Pubtime(v)),
})
}
fn rx_index_by_name(&self, name: &str) -> Result<usize, GetValueError> {
self.info
.rx_index_by_name(name)
.ok_or_else(|| GetValueError::InvalidRx(name.to_string()))
}
fn tx_index_by_name(&self, name: &str) -> Result<usize, GetValueError> {
self.info
.tx_index_by_name(name)
.ok_or_else(|| GetValueError::InvalidTx(name.to_string()))
}
}
#[derive(Clone, Serialize, Deserialize)]
pub struct SignalEntry {
pub key: String,
pub cell: Option<SignalTimeValue>,
}
struct NodeMonitors {
monitors: HashMap<GaugeKey, MonitorGroup>,
last: Result<MonitorStatus, MonitorError>,
}
impl Default for NodeMonitors {
fn default() -> Self {
Self {
monitors: Default::default(),
last: Err(MonitorError::NeverEvaluated),
}
}
}
struct MonitorGroup {
checks: Vec<(String, Box<dyn Monitor>)>,
last_time: Option<Pubtime>,
last_value: Result<Option<GaugeValue>, GetValueError>,
last_status: Result<MonitorStatus, MonitorError>,
}
impl Default for MonitorGroup {
fn default() -> Self {
Self {
checks: Default::default(),
last_time: None,
last_value: Ok(None),
last_status: Err(MonitorError::NeverEvaluated),
}
}
}
impl MonitorGroup {
pub fn add(&mut self, info: String, monitor: Box<dyn Monitor>) {
self.checks.push((info, monitor));
}
pub fn update(
&mut self,
gauge_key: GaugeKey,
pubtime: Pubtime,
value: Result<Option<GaugeValue>, GetValueError>,
) {
match &value {
Err(_) => {
self.last_status = Err(MonitorError::UnknownKey(gauge_key.clone()));
}
Ok(None) => {
self.last_status = Err(MonitorError::ValueNotSet(gauge_key.clone()));
}
Ok(Some(v)) => {
let ret =
self.checks
.iter_mut()
.try_fold(MonitorStatus::Nominal, |acc, (_, monitor)| {
match monitor.check(pubtime, v) {
Ok(status) => Ok(acc.combine(status)),
Err(err) => Err(MonitorError::CheckError(err)),
}
});
self.last_time = Some(pubtime);
self.last_value = value;
self.last_status = ret;
}
}
}
}
impl AppMonitor {
pub(crate) fn add_node<C: Codelet>(
&mut self,
info: Arc<NodeInfo>,
instance: &mut CodeletInstance<C>,
) {
let rx_count = instance.rx.channel_count();
let tx_count = instance.tx.channel_count();
let node_signals = NodeObservables {
info,
lifecycle_status: instance.lifecycle_status,
status: None,
signals: <C::Signals as Signals>::Kind::list()
.iter()
.map(|e| SignalEntry {
key: e.to_string(),
cell: None,
})
.collect(),
statistics: Statistics {
rx_available_messages_count: vec![0; rx_count],
tx_published_message_count: vec![0; tx_count],
tx_last_pubtime: vec![None; tx_count],
..Default::default()
},
};
let mut monitors = NodeMonitors::default();
if let Some(def) = self.monitor_def.as_mut() {
let mut i = 0;
while i < def.entries.len() {
let entry = &def.entries[i];
if entry.0 == instance.name {
let (_node, signal, info, monitor) = def.entries.swap_remove(i);
monitors
.monitors
.entry(signal)
.or_default()
.add(info, monitor);
} else {
i += 1;
}
}
}
let id = instance.id.expect("internal error: setup not called").0;
self.observables.insert(id, RwLock::new(node_signals));
self.monitors.insert(id, RwLock::new(monitors));
self.node_names.insert(id, instance.name.clone());
self.node_to_id.insert(instance.name.clone(), id);
}
pub(crate) fn update_node<C>(
&self,
pubtime: Pubtime,
instance: &CodeletInstance<C>,
) -> Result<(), UpdateNodeError>
where
C: Codelet,
{
let id = instance.id.expect("internal error: setup not called").0;
let observables = &mut self.observables[id].write()?;
let collection = &mut self.monitors[id].write()?;
observables.lifecycle_status = instance.lifecycle_status;
observables.status = instance
.status
.as_ref()
.map(|s| (s.label().to_string(), s.as_default_status()));
observables.statistics.copy_from(&instance.statistics);
update_signals_impl(&instance.signals, &mut observables.signals);
for (gauge_key, monitors) in collection.monitors.iter_mut() {
monitors.update(gauge_key.clone(), pubtime, observables.get(gauge_key));
}
collection.last = collection
.monitors
.iter()
.try_fold(MonitorStatus::Nominal, |acc, (_, e)| {
Ok(acc.combine(e.last_status.clone()?))
});
Ok(())
}
pub fn status(&self) -> Result<MonitorStatus, MonitorError> {
self.monitors
.iter()
.try_fold(MonitorStatus::Nominal, |acc, (_, e)| {
Ok(acc.combine(e.read()?.last.clone()?))
})
}
pub fn get_failed_observables(&self) -> Result<Vec<(String, GaugeKey)>, GetFailedError> {
let mut out = Vec::new();
for (node_id, group) in self.monitors.iter() {
let node_name = &self.node_names[node_id];
for (key, value) in group.read()?.monitors.iter() {
if value.last_status != Ok(MonitorStatus::Nominal) {
out.push((node_name.clone(), key.clone()));
}
}
}
Ok(out)
}
pub fn copy_node_observables(&self, node: &str) -> Result<NodeObservables, GetValueError> {
let id = *self
.node_to_id
.get(node)
.ok_or(GetValueError::InvalidNode(node.into()))?;
Ok(self.observables[id].read()?.clone())
}
pub fn get_signal(
&self,
node: &str,
key: &str,
) -> Result<Option<SignalTimeValue>, GetValueError> {
let id = *self
.node_to_id
.get(node)
.ok_or(GetValueError::InvalidNode(node.into()))?;
self.observables[id].read()?.get_signal(key)
}
pub fn get(&self, node: &str, key: &GaugeKey) -> Result<Option<GaugeValue>, GetValueError> {
let id = *self
.node_to_id
.get(node)
.ok_or(GetValueError::InvalidNode(node.into()))?;
self.observables[id].read()?.get(key)
}
pub fn as_report_string(&self, opts: MonitorReportOptions) -> Result<String, LockError> {
let mut out = String::new();
for (node_id, entry) in self.monitors.iter() {
let node_name = &self.node_names[node_id];
Self::as_report_string_impl(
&mut out,
node_name,
entry.read()?.monitors.iter(),
opts.include_nominal,
opts.always_show_monitors,
);
}
Ok(out)
}
fn as_report_string_impl<'a, I>(
out: &mut String,
node_name: &str,
iter: I,
include_nominal: bool,
always_show_monitors: bool,
) where
I: Iterator<Item = (&'a GaugeKey, &'a MonitorGroup)>,
{
fn colorize(text: &str, color: &str) -> String {
format!("\x1b[{}m{}\x1b[0m", color, text)
}
for (key, group) in iter {
if !include_nominal && group.last_status == Ok(MonitorStatus::Nominal) {
continue;
}
let status_str = match group.last_status {
Err(_) => colorize("ERR", "91"),
Ok(status) => match status {
MonitorStatus::Nominal => colorize(" OK", "92"),
MonitorStatus::Warning => colorize("WRN", "93"),
MonitorStatus::Critical => colorize("CRT", "91"),
},
};
let value_str = match &group.last_value {
Err(_) => colorize("(error)", "91"),
Ok(None) => colorize("N/A", "90"),
Ok(Some(v)) => colorize(&format!("{v}"), "96"),
};
*out += &format!("{} {node_name}/{key:?}): {value_str}\n", status_str);
if group.last_status != Ok(MonitorStatus::Nominal) || always_show_monitors {
for (info, monitor) in group.checks.iter() {
if info != "" {
*out += &format!(" {} {info}\n", colorize("M", "96"));
*out += &format!(" {}\n", colorize(&format!("{monitor}"), "37"));
} else {
*out += &format!(" {} {monitor}\n", colorize("M", "96"));
}
}
}
if let Err(err) = &group.last_status {
*out += &format!(" {}\n", colorize(&format!("{err:?}"), "91"));
}
}
}
}
fn update_signals_impl<G>(src: &G, dst: &mut Vec<SignalEntry>)
where
G: Signals,
{
let iter = src.as_time_value_iter();
assert_eq!(dst.len(), iter.len());
for (e, v) in std::iter::zip(dst.iter_mut(), iter) {
e.cell = v;
}
}
#[derive(thiserror::Error, Debug, Clone)]
pub enum GetValueError {
#[error("lock poisoned")]
LockPoisoned,
#[error("invalid node: {0}")]
InvalidNode(String),
#[error("invalid signal: {0}")]
InvalidSignal(String),
#[error("invalid Rx channel name: {0}")]
InvalidRx(String),
#[error("invalid TX channel name: {0}")]
InvalidTx(String),
}
impl<T> From<PoisonError<T>> for GetValueError {
fn from(_: PoisonError<T>) -> Self {
GetValueError::LockPoisoned
}
}
#[derive(thiserror::Error, Debug)]
pub enum GetFailedError {
#[error("lock poisoned")]
LockPoisoned,
}
impl<T> From<PoisonError<T>> for GetFailedError {
fn from(_: PoisonError<T>) -> Self {
GetFailedError::LockPoisoned
}
}
#[derive(Default, Debug, Clone)]
pub struct MonitorReportOptions {
pub include_nominal: bool,
pub always_show_monitors: bool,
}
#[derive(thiserror::Error, Debug)]
pub enum UpdateNodeError {
#[error("lock poisoned")]
LockPoisoned,
#[error("invalid node name: {0}")]
InvalidNode(String),
#[error("invalid signal name: {0}/{1}")]
InvalidSignal(String, String),
#[error("invalid data type: actual={0:?}, expected={1:?}")]
InvalidDataType(GaugeDataType, GaugeDataType),
}
impl<T> From<PoisonError<T>> for UpdateNodeError {
fn from(_: PoisonError<T>) -> Self {
UpdateNodeError::LockPoisoned
}
}
#[derive(Clone)]
pub struct SharedAppMonitor(Arc<RwLock<AppMonitor>>);
impl SharedAppMonitor {
pub fn copy_node_observables(&self, node: &str) -> Result<NodeObservables, GetValueError> {
self.0.read()?.copy_node_observables(node)
}
pub fn get_signal(
&self,
node: &str,
key: &str,
) -> Result<Option<SignalTimeValue>, GetValueError> {
self.0.read()?.get_signal(node, key)
}
pub fn get(&self, node: &str, key: &GaugeKey) -> Result<Option<GaugeValue>, GetValueError> {
self.0.read()?.get(node, key)
}
pub fn get_failed_observables(&self) -> Result<Vec<(String, GaugeKey)>, GetFailedError> {
self.0.read()?.get_failed_observables()
}
pub fn setup_node<C: Codelet>(
&self,
info: Arc<NodeInfo>,
instance: &mut CodeletInstance<C>,
) -> Result<(), LockError> {
self.0.write()?.add_node(info, instance);
instance.monitor = Some(self.clone());
Ok(())
}
pub fn update_node<C>(
&self,
pubtime: Pubtime,
instance: &CodeletInstance<C>,
) -> Result<(), UpdateNodeError>
where
C: Codelet,
{
self.0.read()?.update_node::<C>(pubtime, instance)
}
pub fn status(&self) -> Result<MonitorStatus, MonitorError> {
self.0.read()?.status()
}
pub fn as_report_string(&self, opts: MonitorReportOptions) -> Result<String, LockError> {
self.0.read()?.as_report_string(opts)
}
pub fn to_data(&self) -> Result<AppMonitorData, LockError> {
Ok(self.0.read()?.to_data())
}
}
#[derive(thiserror::Error, Debug, Clone, PartialEq)]
pub enum LockError {
#[error("lock poisoned")]
LockPoisoned,
}
impl<T> From<PoisonError<T>> for LockError {
fn from(_: PoisonError<T>) -> Self {
LockError::LockPoisoned
}
}
#[derive(thiserror::Error, Debug, Clone, PartialEq)]
pub enum MonitorError {
#[error("lock poisoned")]
LockPoisoned,
#[error("never evaluated")]
NeverEvaluated,
#[error("unknown key: {0:?}")]
UnknownKey(GaugeKey),
#[error("value not set: {0:?}")]
ValueNotSet(GaugeKey),
#[error("{0}")]
CheckError(MonitorCheckError),
}
impl<T> From<PoisonError<T>> for MonitorError {
fn from(_: PoisonError<T>) -> Self {
MonitorError::LockPoisoned
}
}
#[derive(Default)]
pub struct AppMonitorDef {
entries: Vec<(String, GaugeKey, String, Box<dyn Monitor>)>,
}
impl AppMonitorDef {
pub fn new() -> Self {
Self::default()
}
pub fn push<M, S1, S3>(
&mut self,
node: S1,
key: GaugeKey,
info: S3,
monitor: M,
) -> Result<(), AppMonitorDefPushError>
where
S1: Into<String>,
S3: Into<String>,
M: 'static + Monitor,
{
self.entries
.push((node.into(), key, info.into(), Box::new(monitor)));
Ok(())
}
}
#[derive(thiserror::Error, Debug)]
pub enum AppMonitorDefPushError {}
impl From<AppMonitorDef> for AppMonitor {
fn from(other: AppMonitorDef) -> Self {
let mut out = AppMonitor::default();
out.monitor_def = Some(other);
out
}
}
impl From<AppMonitorDef> for SharedAppMonitor {
fn from(other: AppMonitorDef) -> Self {
Self(Arc::new(RwLock::new(other.into())))
}
}
macro_rules! impl_binary_signal_monitor {
($type:ty, $dtype:ident) => {
impl Monitor for $type {
fn dtype(&self) -> Option<GaugeDataType> {
Some(GaugeDataType::$dtype)
}
fn check(
&self,
_: Pubtime,
value: &GaugeValue,
) -> Result<MonitorStatus, MonitorCheckError> {
match value {
GaugeValue::$dtype(value) => Ok(if self.check_impl((*value).into()) {
MonitorStatus::Nominal
} else {
MonitorStatus::Critical
}),
other => Err(MonitorCheckError::InvalidDataType(other.dtype())),
}
}
}
};
}
pub mod monitors {
use crate::{
monitors::{GaugeDataType, GaugeValue, Monitor, MonitorCheckError, MonitorStatus},
prelude::Pubtime,
};
use std::{collections::HashSet, fmt::Debug, ops::RangeBounds, time::Duration};
#[derive(Debug)]
pub struct MaxAge {
warn_max_age: Option<Duration>,
crit_max_age: Duration,
}
impl MaxAge {
pub fn new_warn_crit(
warn_max_age: Duration,
crit_max_age: Duration,
) -> Result<Self, MaxAgeError> {
if warn_max_age >= crit_max_age {
Err(MaxAgeError::WarnMaxAgeTooLarge)
} else {
Ok(Self {
warn_max_age: Some(warn_max_age),
crit_max_age,
})
}
}
pub fn new_crit(crit_max_age: Duration) -> Self {
Self {
warn_max_age: None,
crit_max_age,
}
}
}
#[derive(thiserror::Error, Debug)]
pub enum MaxAgeError {
#[error("warn max age must be smaller than crit max age")]
WarnMaxAgeTooLarge,
}
impl Monitor for MaxAge {
fn dtype(&self) -> Option<GaugeDataType> {
Some(GaugeDataType::Pubtime)
}
fn check(
&self,
pubtime: Pubtime,
value: &GaugeValue,
) -> Result<MonitorStatus, MonitorCheckError> {
match value {
GaugeValue::Pubtime(pt) => Ok(if *pubtime > **pt + self.crit_max_age {
MonitorStatus::Critical
} else {
if let Some(warn_max_age) = self.warn_max_age {
if *pubtime > **pt + warn_max_age {
MonitorStatus::Warning
} else {
MonitorStatus::Nominal
}
} else {
MonitorStatus::Nominal
}
}),
other => Err(MonitorCheckError::InvalidDataType(other.dtype())),
}
}
}
impl std::fmt::Display for MaxAge {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if let Some(warn_max_age) = self.warn_max_age {
write!(
f,
"Max age: warning {:?}, critical{:?}",
warn_max_age, self.crit_max_age
)
} else {
write!(f, "Max age {:?}", self.crit_max_age)
}
}
}
#[derive(Debug)]
pub struct MinAge {
warn_min_age: Option<Duration>,
crit_min_age: Duration,
}
impl MinAge {
pub fn new_warn_crit(
warn_min_age: Duration,
crit_min_age: Duration,
) -> Result<Self, MinAgeError> {
if warn_min_age <= crit_min_age {
Err(MinAgeError::WarnMinAgeTooSmall)
} else {
Ok(Self {
warn_min_age: Some(warn_min_age),
crit_min_age,
})
}
}
pub fn new_crit(crit_min_age: Duration) -> Self {
Self {
warn_min_age: None,
crit_min_age,
}
}
}
#[derive(thiserror::Error, Debug)]
pub enum MinAgeError {
#[error("warn min age must be greater than crit min age")]
WarnMinAgeTooSmall,
}
impl Monitor for MinAge {
fn dtype(&self) -> Option<GaugeDataType> {
Some(GaugeDataType::Pubtime)
}
fn check(
&self,
pubtime: Pubtime,
value: &GaugeValue,
) -> Result<MonitorStatus, MonitorCheckError> {
match value {
GaugeValue::Pubtime(pt) => Ok(if *pubtime < **pt + self.crit_min_age {
MonitorStatus::Critical
} else {
if let Some(warn_min_age) = self.warn_min_age {
if *pubtime < **pt + warn_min_age {
MonitorStatus::Warning
} else {
MonitorStatus::Nominal
}
} else {
MonitorStatus::Nominal
}
}),
other => Err(MonitorCheckError::InvalidDataType(other.dtype())),
}
}
}
impl std::fmt::Display for MinAge {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if let Some(warn_min_age) = self.warn_min_age {
write!(
f,
"Max age: warning {:?}, critical{:?}",
warn_min_age, self.crit_min_age
)
} else {
write!(f, "Max age {:?}", self.crit_min_age)
}
}
}
#[derive(Debug)]
pub struct Equals(GaugeValue);
impl Equals {
pub fn new<T>(value: T) -> Self
where
T: Into<GaugeValue>,
{
Self(value.into())
}
}
impl Monitor for Equals {
fn dtype(&self) -> Option<GaugeDataType> {
Some(self.0.dtype())
}
fn check(
&self,
_: Pubtime,
value: &GaugeValue,
) -> Result<MonitorStatus, MonitorCheckError> {
if value.dtype() == self.0.dtype() {
Ok(if *value == self.0 {
MonitorStatus::Nominal
} else {
MonitorStatus::Critical
})
} else {
Err(MonitorCheckError::InvalidDataType(value.dtype()))
}
}
}
impl std::fmt::Display for Equals {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Equals ")?;
write!(f, "{}", self.0)
}
}
#[derive(Debug)]
pub struct CheckTrue;
impl CheckTrue {
fn check_impl(&self, value: bool) -> bool {
value
}
}
impl_binary_signal_monitor!(CheckTrue, Bool);
impl std::fmt::Display for CheckTrue {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Check true")
}
}
#[derive(Debug)]
pub struct IntAllowList(HashSet<i64>);
impl IntAllowList {
pub fn from_iter<I>(items: I) -> Self
where
I: IntoIterator<Item = i64>,
{
Self(items.into_iter().collect())
}
fn check_impl(&self, value: i64) -> bool {
self.0.contains(&value)
}
}
impl_binary_signal_monitor!(IntAllowList, Int64);
impl std::fmt::Display for IntAllowList {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut values: Vec<i64> = self.0.iter().copied().collect();
values.sort();
write!(f, "Check in allowed list: {:?}", values)
}
}
#[derive(Debug)]
pub struct FloatAllowRange<R>(R);
impl<R> FloatAllowRange<R> {
pub fn new(range: R) -> Self
where
R: RangeBounds<f64>,
{
Self(range)
}
fn check_impl(&self, value: f64) -> bool
where
R: RangeBounds<f64>,
{
self.0.contains(&value)
}
}
impl<R> Monitor for FloatAllowRange<R>
where
R: RangeBounds<f64> + Send + Sync + Debug,
{
fn dtype(&self) -> Option<GaugeDataType> {
Some(GaugeDataType::Float64)
}
fn check(
&self,
_: Pubtime,
value: &GaugeValue,
) -> Result<MonitorStatus, MonitorCheckError> {
match value {
GaugeValue::Float64(value) => Ok(if self.check_impl(*value) {
MonitorStatus::Nominal
} else {
MonitorStatus::Critical
}),
other => Err(MonitorCheckError::InvalidDataType(other.dtype())),
}
}
}
impl<R> std::fmt::Display for FloatAllowRange<R>
where
R: RangeBounds<f64> + std::fmt::Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Check in range: {:?}", self.0)
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_max_age() {
let m = MaxAge::new_warn_crit(Duration::from_millis(500), Duration::from_millis(1000))
.unwrap();
assert_eq!(m.dtype(), Some(GaugeDataType::Pubtime));
assert_eq!(
m.check(
Duration::from_millis(2000).into(),
&GaugeValue::Pubtime(Duration::from_millis(2300).into())
),
Ok(MonitorStatus::Nominal)
);
assert_eq!(
m.check(
Duration::from_millis(2000).into(),
&GaugeValue::Pubtime(Duration::from_millis(1800).into())
),
Ok(MonitorStatus::Nominal)
);
assert_eq!(
m.check(
Duration::from_millis(2000).into(),
&GaugeValue::Pubtime(Duration::from_millis(1500).into())
),
Ok(MonitorStatus::Nominal)
);
assert_eq!(
m.check(
Duration::from_millis(2000).into(),
&GaugeValue::Pubtime(Duration::from_millis(1499).into())
),
Ok(MonitorStatus::Warning)
);
assert_eq!(
m.check(
Duration::from_millis(2000).into(),
&GaugeValue::Pubtime(Duration::from_millis(1000).into())
),
Ok(MonitorStatus::Warning)
);
assert_eq!(
m.check(
Duration::from_millis(2000).into(),
&GaugeValue::Pubtime(Duration::from_millis(999).into())
),
Ok(MonitorStatus::Critical)
);
}
#[test]
fn test_min_age() {
let m = MinAge::new_warn_crit(Duration::from_millis(1000), Duration::from_millis(500))
.unwrap();
assert_eq!(m.dtype(), Some(GaugeDataType::Pubtime));
assert_eq!(
m.check(
Duration::from_millis(2000).into(),
&GaugeValue::Pubtime(Duration::from_millis(800).into())
),
Ok(MonitorStatus::Nominal)
);
assert_eq!(
m.check(
Duration::from_millis(2000).into(),
&GaugeValue::Pubtime(Duration::from_millis(1000).into())
),
Ok(MonitorStatus::Nominal)
);
assert_eq!(
m.check(
Duration::from_millis(2000).into(),
&GaugeValue::Pubtime(Duration::from_millis(1001).into())
),
Ok(MonitorStatus::Warning)
);
assert_eq!(
m.check(
Duration::from_millis(2000).into(),
&GaugeValue::Pubtime(Duration::from_millis(1500).into())
),
Ok(MonitorStatus::Warning)
);
assert_eq!(
m.check(
Duration::from_millis(2000).into(),
&GaugeValue::Pubtime(Duration::from_millis(1501).into())
),
Ok(MonitorStatus::Critical)
);
assert_eq!(
m.check(
Duration::from_millis(2000).into(),
&GaugeValue::Pubtime(Duration::from_millis(2100).into())
),
Ok(MonitorStatus::Critical)
);
}
#[test]
fn test_check_true() {
let m = CheckTrue;
assert_eq!(m.dtype(), Some(GaugeDataType::Bool));
assert_eq!(
m.check(Duration::default().into(), &GaugeValue::Bool(true)),
Ok(MonitorStatus::Nominal)
);
assert_eq!(
m.check(Duration::default().into(), &GaugeValue::Bool(false)),
Ok(MonitorStatus::Critical)
);
}
#[test]
fn test_int_allow_list() {
let m = IntAllowList::from_iter([3, 10, 9]);
assert_eq!(m.dtype(), Some(GaugeDataType::Int64));
assert_eq!(
m.check(Duration::default().into(), &GaugeValue::Int64(9)),
Ok(MonitorStatus::Nominal)
);
assert_eq!(
m.check(Duration::default().into(), &GaugeValue::Int64(4)),
Ok(MonitorStatus::Critical)
);
}
#[test]
fn test_float_allow_range_1() {
let m = FloatAllowRange::new(3.1..2.7);
assert_eq!(
m.check(Duration::default().into(), &GaugeValue::Float64(2.9)),
Ok(MonitorStatus::Critical)
);
}
#[test]
fn test_float_allow_range_2() {
let m = FloatAllowRange::new(2.7..3.1);
assert_eq!(m.dtype(), Some(GaugeDataType::Float64));
assert_eq!(
m.check(Duration::default().into(), &GaugeValue::Float64(2.6999)),
Ok(MonitorStatus::Critical)
);
assert_eq!(
m.check(Duration::default().into(), &GaugeValue::Float64(2.7)),
Ok(MonitorStatus::Nominal)
);
assert_eq!(
m.check(Duration::default().into(), &GaugeValue::Float64(3.0)),
Ok(MonitorStatus::Nominal)
);
assert_eq!(
m.check(Duration::default().into(), &GaugeValue::Float64(3.1)),
Ok(MonitorStatus::Critical)
);
assert_eq!(
m.check(Duration::default().into(), &GaugeValue::Float64(3.9)),
Ok(MonitorStatus::Critical)
);
}
#[test]
fn test_float_allow_range_3() {
let m = FloatAllowRange::new(2.7..=3.1);
assert_eq!(m.dtype(), Some(GaugeDataType::Float64));
assert_eq!(
m.check(Duration::default().into(), &GaugeValue::Float64(3.1)),
Ok(MonitorStatus::Nominal)
);
}
}
}