use crate::action;
use crate::alarm;
use crate::alarm::{Alarm, AlarmBase, DataSink};
use crate::config;
use crate::filter;
use crate::filter::FilterFactory;
use crate::measurement;
use crate::ActionMap;
use crate::{Error, PlaceholderMap, Result};
use async_trait::async_trait;
#[cfg(feature = "docker")]
mod docker_container_status;
mod filesystem_usage;
mod memory_usage;
mod network_throughput;
mod pressure_average;
mod process_exit_status;
mod process_output_integer;
mod process_output_match;
mod systemd_unit_status;
#[cfg(feature = "sensors")]
mod temperature;
/// Interface of a check that can be triggered and identified by name.
///
/// `from_check_config` hands checks out as `Box<dyn Check>`, so this trait is
/// the only surface callers of that function interact with.
#[async_trait]
pub trait Check: Send + Sync {
/// Runs one cycle of the check.
///
/// The `CheckBase` implementation fetches data from its data source once
/// and forwards every resulting value or error to its alarms.
async fn trigger(&mut self);
/// Returns the interval configured for this check.
// NOTE(review): presumably the period between two `trigger` calls; the
// scheduler is not part of this file — confirm against the caller.
fn interval(&self) -> std::time::Duration;
/// Returns the name of the check.
fn name(&self) -> &str;
}
/// Provider of the measurements a check evaluates.
///
/// A data source reports on a list of ids (see [`DataSource::ids`]);
/// `CheckBase::trigger` pairs the entries returned by
/// [`DataSource::get_data`] with those ids by position.
#[async_trait]
pub trait DataSource: Send + Sync {
/// Type of a single measurement produced by this data source.
type Item: Send + Sync + measurement::Measurement;
/// Fetches the current data.
///
/// `placeholders` is passed mutably; `CheckBase::trigger` clones the map
/// for every alarm after this call, so entries added here are visible to
/// the alarms.
///
/// How `CheckBase::trigger` interprets the result:
/// - outer `Err`: the error is cloned once per id and handled like a
///   per-id error,
/// - inner `Ok(Some(item))`: a measurement that is passed to the alarms,
/// - inner `Ok(None)`: logged as "warming up"; the alarms are not fed,
/// - inner `Err`: logged as a warning and reported to the alarms as an error.
async fn get_data(
&mut self,
placeholders: &mut PlaceholderMap,
) -> Result<Vec<Result<Option<Self::Item>>>>;
/// Renders a measurement as text; `CheckBase::trigger` uses it for its
/// debug log message.
fn format_data(&self, data: &Self::Item) -> String;
/// Returns the ids this data source reports on.
// NOTE(review): implementations are presumably expected to return one
// `get_data` entry per id, in the same order — confirm against the
// implementing modules.
fn ids(&self) -> &[String];
}
/// Generic check that combines a data source `T` with alarms of type `U`.
///
/// Per-id collections (`filter`, `alarms`) are matched with the data returned
/// by the data source by position.
pub struct CheckBase<T, U>
where
T: DataSource,
U: Alarm,
{
/// Value returned by `Check::interval`; `new` rejects a zero interval.
interval: std::time::Duration,
/// Name of the check; exposed as the `check_name` placeholder and used in
/// log messages.
name: String,
/// Upper bound for a single `get_data` call.
timeout: std::time::Duration,
/// Check-level placeholders merged into the global placeholders on every
/// trigger.
placeholders: PlaceholderMap,
/// Optional filters applied to the fetched data, one per data entry.
filter: Option<Vec<Box<dyn filter::Filter<T::Item>>>>,
/// Source the measurements are fetched from.
data_source: T,
/// Alarms grouped per data entry: the outer vector is matched with the data
/// by position, the inner vector holds all alarms fed with that entry.
alarms: Vec<Vec<U>>,
}
impl<T, U> CheckBase<T, U>
where
T: DataSource,
U: Alarm<Item = T::Item>,
{
fn new(
interval: std::time::Duration,
name: String,
timeout: Option<std::time::Duration>,
placeholders: PlaceholderMap,
filter: Option<Vec<Box<dyn filter::Filter<T::Item>>>>,
data_source: T,
alarms: Vec<Vec<U>>,
) -> Result<Self> {
if interval.is_zero() {
Err(Error(String::from("'interval' cannot be 0.")))
} else if name.is_empty() {
Err(Error(String::from("'name' cannot be empty.")))
} else if matches!(timeout, Some(timeout) if timeout.is_zero()) {
Err(Error(String::from("'timeout' cannot be 0.")))
} else if matches!(timeout, Some(timeout) if timeout > interval) {
Err(Error(String::from(
"'timeout' cannot be greater than 'interval'.",
)))
} else {
let timeout = timeout.unwrap_or_else(|| {
interval.min(std::time::Duration::from_secs(
config::default::check_timeout().into(),
))
});
Ok(Self {
interval,
name,
timeout,
placeholders,
filter,
data_source,
alarms,
})
}
}
}
#[async_trait]
impl<T, U> Check for CheckBase<T, U>
where
T: DataSource,
U: Alarm<Item = T::Item>,
{
async fn trigger(&mut self) {
let mut placeholders = crate::global_placeholders();
crate::merge_placeholders(&mut placeholders, &self.placeholders);
placeholders.insert(String::from("check_name"), self.name.clone());
let res =
tokio::time::timeout(self.timeout, self.data_source.get_data(&mut placeholders)).await;
let ids = self.data_source.ids();
let data_vec = match res {
Ok(inner) => inner,
Err(_) => Err(Error(format!(
"Timed out after {} seconds.",
self.timeout.as_secs()
))),
};
let mut data_vec = data_vec.unwrap_or_else(|x| {
let mut res = Vec::new();
for _ in 0..ids.len() {
res.push(Err(x.clone()))
}
res
});
if let Some(filter) = &mut self.filter {
data_vec = data_vec
.into_iter()
.zip(filter.iter_mut())
.map(|(data, filter)| match data {
Ok(Some(data)) => Ok(Some(filter.filter(data))),
Ok(None) => Ok(None),
Err(x) => {
filter.error();
Err(x)
}
})
.collect();
}
for ((i, data), alarms) in data_vec.iter().enumerate().zip(self.alarms.iter_mut()) {
match data {
Ok(data) => match data {
Some(data) => log::debug!(
"Check '{}' got {} for id '{}'.",
self.name,
self.data_source.format_data(data),
ids[i]
),
None => log::debug!("Check '{}' for id '{}' is warming up.", self.name, ids[i]),
},
Err(err) => log::warn!(
"Check '{}' got no data for id '{}': {}",
self.name,
ids[i],
err
),
}
for alarm in alarms.iter_mut() {
let mut placeholders = placeholders.clone();
let result = match data {
Ok(data) => match data {
Some(data) => alarm.put_data(data, placeholders).await,
None => Ok(()),
},
Err(err) => {
placeholders.insert(String::from("check_error"), err.to_string());
alarm.put_error(err, placeholders).await
}
};
if let Err(err) = result {
log::error!("{} had an error: {}", alarm.log_id(), err);
}
}
}
}
fn interval(&self) -> std::time::Duration {
self.interval
}
fn name(&self) -> &str {
self.name.as_str()
}
}
/// Builds a boxed check with data source type `T` and alarm data sink type `U`.
///
/// For every id of the data source, one alarm is created per enabled entry in
/// `check_config.alarms`. If the check has a filter configured, one filter
/// instance is created per id.
///
/// # Errors
///
/// Fails if the data source, a data sink, a state machine, an alarm, a filter
/// or the check itself cannot be created, if an action lookup fails, or if
/// two alarms of the check share the same name.
fn factory<'a, T, U>(check_config: &'a config::Check, actions: &ActionMap) -> Result<Box<dyn Check>>
where
    T: DataSource + TryFrom<&'a config::Check, Error = Error> + 'static,
    U: DataSink<Item = T::Item> + TryFrom<&'a config::Alarm, Error = Error> + 'static,
    T::Item: filter::FilterFactory,
{
    let data_source = T::try_from(check_config)?;
    let check_name = &check_config.name;

    let mut alarms_per_id: Vec<Vec<AlarmBase<U>>> = Vec::new();
    for (index, id) in data_source.ids().iter().enumerate() {
        let mut seen_names = std::collections::HashSet::new();
        let mut id_alarms: Vec<AlarmBase<U>> = Vec::new();
        for alarm_config in check_config.alarms.iter() {
            // Names are checked before the `disable` flag, so disabled alarms
            // count as duplicates too.
            let is_new_name = seen_names.insert(alarm_config.name.clone());
            if !is_new_name {
                return Err(Error(format!(
                    "Found duplicate alarm name '{}' for check '{}'.",
                    alarm_config.name, check_name
                )));
            }
            let alarm_log_id = format!(
                "Alarm '{}', id '{}' from check '{}'",
                alarm_config.name, id, check_name
            );
            if alarm_config.disable {
                log::info!("{alarm_log_id} is disabled.");
                continue;
            }
            // The cycle settings do not depend on the id; log them only once.
            if index == 0 {
                log::info!(
                    "Alarm '{}' from check '{}' will be triggered after {} bad cycles and recover after {} good cycles.",
                    alarm_config.name,
                    check_name,
                    alarm_config.cycles,
                    alarm_config.recover_cycles
                );
            }

            let data_sink = U::try_from(alarm_config)?;
            let state_machine = alarm::StateMachine::new(
                alarm_config.cycles,
                alarm_config.repeat_cycles,
                alarm_config.recover_cycles,
                alarm_config.error_repeat_cycles,
                alarm_log_id.clone(),
            )?;
            let alarm_action = action::get_action(&alarm_config.action, actions)?;
            let alarm_filter = alarm_config
                .filter
                .as_ref()
                .map(T::Item::filter_factory)
                .transpose()?;
            let recover_action = alarm_config
                .recover_action
                .as_ref()
                .map(|name| action::get_action(name, actions))
                .transpose()?;
            let error_action = alarm_config
                .error_action
                .as_ref()
                .map(|name| action::get_action(name, actions))
                .transpose()?;
            let error_recover_action = alarm_config
                .error_recover_action
                .as_ref()
                .map(|name| action::get_action(name, actions))
                .transpose()?;

            id_alarms.push(alarm::AlarmBase::new(
                alarm_config.name.clone(),
                id.clone(),
                alarm_action,
                alarm_config.placeholders.clone(),
                alarm_filter,
                recover_action,
                alarm_config.recover_placeholders.clone(),
                error_action,
                alarm_config.error_placeholders.clone(),
                error_recover_action,
                alarm_config.error_recover_placeholders.clone(),
                alarm_config.invert,
                state_machine,
                data_sink,
                alarm_log_id,
            )?);
        }
        alarms_per_id.push(id_alarms);
    }

    // One independent filter instance per id.
    let filter = check_config
        .filter
        .as_ref()
        .map(|filter_config| {
            data_source
                .ids()
                .iter()
                .map(|_| T::Item::filter_factory(filter_config))
                .collect()
        })
        .transpose()?;

    let check = CheckBase::new(
        std::time::Duration::from_secs(check_config.interval.into()),
        check_config.name.clone(),
        check_config
            .timeout
            .map(|secs| std::time::Duration::from_secs(secs.into())),
        check_config.placeholders.clone(),
        filter,
        data_source,
        alarms_per_id,
    )?;
    Ok(Box::new(check))
}
/// Creates the check described by `check_config`.
///
/// The configured check type selects the data source and the alarm data sink
/// type; the actual construction is done by `factory`.
///
/// # Errors
///
/// Any construction error is wrapped in a message that names the check.
pub fn from_check_config(
    check_config: &config::Check,
    actions: &ActionMap,
) -> Result<Box<dyn Check>> {
    let created = match &check_config.type_ {
        #[cfg(feature = "docker")]
        config::CheckType::DockerContainerStatus(_) => {
            factory::<docker_container_status::DockerContainerStatus, alarm::BinaryState>(
                check_config,
                actions,
            )
        }
        config::CheckType::FilesystemUsage(_) => {
            factory::<filesystem_usage::FilesystemUsage, alarm::Level>(check_config, actions)
        }
        config::CheckType::MemoryUsage(_) => {
            factory::<memory_usage::MemoryUsage, alarm::Level>(check_config, actions)
        }
        config::CheckType::NetworkThroughput(_) => {
            factory::<network_throughput::NetworkThroughput, alarm::DataSize>(check_config, actions)
        }
        config::CheckType::PressureAverage(_) => {
            factory::<pressure_average::PressureAverage, alarm::Level>(check_config, actions)
        }
        config::CheckType::ProcessExitStatus(_) => {
            factory::<process_exit_status::ProcessExitStatus, alarm::StatusCode>(
                check_config,
                actions,
            )
        }
        config::CheckType::ProcessOutputInteger(_) => {
            factory::<process_output_integer::ProcessOutputInteger, alarm::Integer>(
                check_config,
                actions,
            )
        }
        config::CheckType::ProcessOutputMatch(_) => {
            factory::<process_output_match::ProcessOutputMatch, alarm::BinaryState>(
                check_config,
                actions,
            )
        }
        config::CheckType::SystemdUnitStatus(_) => {
            factory::<systemd_unit_status::SystemdUnitStatus, alarm::BinaryState>(
                check_config,
                actions,
            )
        }
        #[cfg(feature = "sensors")]
        config::CheckType::Temperature(_) => {
            factory::<temperature::Temperature, alarm::Temperature>(check_config, actions)
        }
    };
    created.map_err(|cause| {
        Error(format!(
            "Failed to create check '{}' from config: {}",
            check_config.name, cause
        ))
    })
}