use std::io::Read;
use eyre::{eyre, Result};
use itertools::Itertools;
use log::{debug, log_enabled, trace, warn};
use tiny_http::{Method, Request, Response};
use crate::{
collectd::payload::Payload,
http_server::{HttpHandler, HttpHandlerResult},
metrics::{
system_metrics::{
SystemMetricConfig, CPU_METRIC_NAMESPACE, DISKSPACE_METRIC_NAMESPACE_LEGACY,
DISKSTATS_METRIC_NAMESPACE, MEMORY_METRIC_NAMESPACE,
NETWORK_INTERFACE_METRIC_NAMESPACE, PROCESSES_METRIC_NAMESPACE,
THERMAL_METRIC_NAMESPACE,
},
KeyedMetricReading, MetricsMBox,
},
};
#[derive(Clone)]
pub struct CollectdHandler {
data_collection_enabled: bool,
builtin_system_metric_collection_enabled: bool,
metrics_mbox: MetricsMBox,
filtered_namespaces: Vec<String>,
}
impl CollectdHandler {
pub fn new(
data_collection_enabled: bool,
system_metric_config: SystemMetricConfig,
metrics_mbox: MetricsMBox,
) -> Self {
let builtin_system_metric_collection_enabled = system_metric_config.enable;
let mut filtered_namespaces = vec![];
if builtin_system_metric_collection_enabled {
if system_metric_config.cpu_metrics_enabled() {
filtered_namespaces.push(CPU_METRIC_NAMESPACE.to_string() + "/")
}
if system_metric_config.memory_metrics_enabled() {
filtered_namespaces.push(MEMORY_METRIC_NAMESPACE.to_string() + "/")
}
if system_metric_config.thermal_metrics_enabled() {
filtered_namespaces.push(THERMAL_METRIC_NAMESPACE.to_string() + "/")
}
if system_metric_config.disk_space_metrics_enabled() {
filtered_namespaces.push(DISKSPACE_METRIC_NAMESPACE_LEGACY.to_string() + "/")
}
if system_metric_config.diskstats_metrics_enabled() {
filtered_namespaces.push(DISKSTATS_METRIC_NAMESPACE.to_string() + "/")
}
if system_metric_config.process_metrics_enabled() {
filtered_namespaces.push(PROCESSES_METRIC_NAMESPACE.to_string() + "/")
}
if system_metric_config.network_metrics_enabled() {
filtered_namespaces.push(NETWORK_INTERFACE_METRIC_NAMESPACE.to_string() + "/")
}
}
CollectdHandler {
data_collection_enabled,
builtin_system_metric_collection_enabled,
metrics_mbox,
filtered_namespaces,
}
}
fn parse_request(stream: &mut dyn Read) -> Result<Vec<KeyedMetricReading>> {
let payloads: Vec<Payload> = if log_enabled!(log::Level::Debug) {
let mut buf = vec![];
stream.read_to_end(&mut buf)?;
let s = String::from_utf8_lossy(&buf);
trace!("Received JSON: {}", s);
match serde_json::from_slice(&buf) {
Ok(payloads) => payloads,
Err(e) => {
debug!("Error parsing JSON: {}\n{}", e, String::from_utf8(buf)?);
return Err(eyre!("Error parsing JSON: {}", e));
}
}
} else {
serde_json::from_reader(stream)?
};
Ok(payloads
.into_iter()
.flat_map(Vec::<KeyedMetricReading>::from)
.collect())
}
}
impl HttpHandler for CollectdHandler {
fn handle_request(&self, request: &mut Request) -> HttpHandlerResult {
if request.url() != "/v1/collectd" || *request.method() != Method::Post {
return HttpHandlerResult::NotHandled;
}
if self.data_collection_enabled {
match Self::parse_request(request.as_reader()) {
Ok(mut readings) => {
if self.builtin_system_metric_collection_enabled {
readings = readings
.into_iter()
.filter(|reading| {
!self
.filtered_namespaces
.iter()
.any(|namespace| reading.name.as_str().starts_with(namespace))
})
.collect_vec()
}
if !readings.is_empty() && self.metrics_mbox.send_and_forget(readings).is_err()
{
return HttpHandlerResult::Response(Response::empty(500).boxed());
}
}
Err(e) => {
warn!("Error parsing request: {}", e);
}
}
}
HttpHandlerResult::Response(Response::empty(200).boxed())
}
}
#[cfg(test)]
mod tests {
use std::{collections::HashSet, time::Duration};
use insta::assert_json_snapshot;
use rstest::{fixture, rstest};
use ssf::ServiceMock;
use tiny_http::{Method, TestRequest};
use crate::{
http_server::{HttpHandler, HttpHandlerResult},
metrics::{
system_metrics::{
CpuMetricsConfig, MemoryMetricsConfig, OuiMetricsConfig, SystemMetricConfig,
ThermalMetricsConfig,
},
KeyedMetricReading, TakeMetrics,
},
};
use super::CollectdHandler;
#[rstest]
fn handle_push(mut fixture: Fixture) {
let r = TestRequest::new().with_method(Method::Post).with_path("/v1/collectd").with_body(
r#"[{"values":[0],"dstypes":["derive"],"dsnames":["value"],"time":1619712000.000,"interval":10.000,"host":"localhost","plugin":"cpu","plugin_instance":"0","type":"cpu","type_instance":"idle"}]"#,
);
assert!(matches!(
fixture.handler.handle_request(&mut r.into()),
HttpHandlerResult::Response(_)
));
assert_json_snapshot!(fixture.mock.take_metrics().unwrap());
}
#[rstest]
fn ignores_data_when_data_collection_is_off() {
let mut mock = ServiceMock::new();
let handler =
CollectdHandler::new(false, system_metric_config_disabled(), mock.mbox.clone());
let r = TestRequest::new().with_method(Method::Post).with_path("/v1/collectd").with_body(
r#"[{"values":[0],"dstypes":["derive"],"dsnames":["value"],"time":1619712000.000,"interval":10.000,"host":"localhost","plugin":"cpu","plugin_instance":"0","type":"cpu","type_instance":"idle"}]"#,
);
assert!(matches!(
handler.handle_request(&mut r.into()),
HttpHandlerResult::Response(_)
));
assert_eq!(mock.take_messages().len(), 0)
}
#[rstest]
fn drops_cpu_metrics_when_builtin_system_metrics_are_enabled() {
let mut mock = ServiceMock::new();
let handler = CollectdHandler::new(true, system_metric_config_enabled(), mock.mbox.clone());
let r = TestRequest::new().with_method(Method::Post).with_path("/v1/collectd").with_body(
r#"[{"values":[0],"dstypes":["derive"],"dsnames":["value"],"time":1619712000.000,"interval":10.000,"host":"localhost","plugin":"cpu","plugin_instance":"0","type":"cpu","type_instance":"idle"}]"#,
);
assert!(matches!(
handler.handle_request(&mut r.into()),
HttpHandlerResult::Response(_)
));
let r = TestRequest::new().with_method(Method::Post).with_path("/v1/collectd").with_body(
r#"[{"values":[0],"dstypes":["derive"],"dsnames":["value"],"time":1619712000.000,"interval":10.000,"host":"localhost","plugin":"cpufreq","plugin_instance":"0","type":"cpu","type_instance":"idle"}]"#,
);
assert!(matches!(
handler.handle_request(&mut r.into()),
HttpHandlerResult::Response(_)
));
let r = TestRequest::new().with_method(Method::Post).with_path("/v1/collectd").with_body(
r#"[{"values":[0],"dstypes":["derive"],"dsnames":["value"],"time":1619712000.000,"interval":10.000,"host":"localhost","plugin":"mockplugin","plugin_instance":"0","type":"mock","type_instance":"test"}]"#,
);
assert!(matches!(
handler.handle_request(&mut r.into()),
HttpHandlerResult::Response(_)
));
assert_json_snapshot!(mock.take_metrics().unwrap());
}
#[rstest]
fn drops_enabled_metrics_when_builtin_system_metrics_are_partially_enabled() {
let mut mock = ServiceMock::new();
let handler = CollectdHandler::new(true, system_metric_config_mixed(), mock.mbox.clone());
let r = TestRequest::new().with_method(Method::Post).with_path("/v1/collectd").with_body(
r#"[{"values":[0],"dstypes":["derive"],"dsnames":["value"],"time":1619712000.000,"interval":10.000,"host":"localhost","plugin":"cpu","plugin_instance":"0","type":"cpu","type_instance":"idle"}]"#,
);
assert!(matches!(
handler.handle_request(&mut r.into()),
HttpHandlerResult::Response(_)
));
let r = TestRequest::new().with_method(Method::Post).with_path("/v1/collectd").with_body(
r#"[{"values":[0],"dstypes":["derive"],"dsnames":["value"],"time":1619712000.000,"interval":10.000,"host":"localhost","plugin":"processes","plugin_instance":"0","type":"memfaultd","type_instance":"utime"}]"#,
);
assert!(matches!(
handler.handle_request(&mut r.into()),
HttpHandlerResult::Response(_)
));
let r = TestRequest::new().with_method(Method::Post).with_path("/v1/collectd").with_body(
r#"[{"values":[0],"dstypes":["derive"],"dsnames":["value"],"time":1619712000.000,"interval":10.000,"host":"localhost","plugin":"interface","plugin_instance":"0","type":"wlan0","type_instance":"bytes_tx"}]"#,
);
assert!(matches!(
handler.handle_request(&mut r.into()),
HttpHandlerResult::Response(_)
));
assert_json_snapshot!(mock.take_metrics().unwrap());
}
fn system_metric_config_mixed() -> SystemMetricConfig {
SystemMetricConfig {
poll_interval_seconds: Duration::from_secs(10),
enable: true,
processes: Some(HashSet::new()),
disk_space: Some(HashSet::new()),
diskstats: None,
network_interfaces: None,
cpu: Some(CpuMetricsConfig { enable: true }),
memory: Some(MemoryMetricsConfig { enable: true }),
thermal: Some(ThermalMetricsConfig { enable: true }),
oui: Some(OuiMetricsConfig { enable: true }),
}
}
fn system_metric_config_disabled() -> SystemMetricConfig {
SystemMetricConfig {
poll_interval_seconds: Duration::from_secs(10),
enable: false,
processes: None,
disk_space: None,
diskstats: None,
network_interfaces: None,
cpu: Some(CpuMetricsConfig { enable: false }),
memory: Some(MemoryMetricsConfig { enable: false }),
thermal: Some(ThermalMetricsConfig { enable: false }),
oui: Some(OuiMetricsConfig { enable: false }),
}
}
fn system_metric_config_enabled() -> SystemMetricConfig {
SystemMetricConfig {
poll_interval_seconds: Duration::from_secs(10),
enable: true,
processes: None,
disk_space: None,
diskstats: None,
network_interfaces: None,
cpu: Some(CpuMetricsConfig { enable: true }),
memory: Some(MemoryMetricsConfig { enable: true }),
thermal: Some(ThermalMetricsConfig { enable: true }),
oui: Some(OuiMetricsConfig { enable: true }),
}
}
struct Fixture {
handler: CollectdHandler,
mock: ServiceMock<Vec<KeyedMetricReading>>,
}
#[fixture]
fn fixture() -> Fixture {
let mock = ServiceMock::new();
Fixture {
handler: CollectdHandler::new(true, system_metric_config_disabled(), mock.mbox.clone()),
mock,
}
}
}