#![allow(clippy::doc_markdown)]
use alloc::{borrow::Cow, string::String, vec::Vec};
use std::net::UdpSocket;
use cadence::{BufferedUdpMetricSink, Gauged, QueuingMetricSink, StatsdClient};
use libafl_bolts::{ClientId, Error};
use super::{
Monitor,
stats::{ClientStatsManager, EdgeCoverage, ItemGeometry, manager::GlobalStats},
};
const METRIC_PREFIX: &str = "fuzzing";
#[derive(Debug)]
pub enum StatsdMonitorTagFlavor {
DogStatsd {
tag_identifier: Cow<'static, str>,
custom_tags: Vec<(Cow<'static, str>, Cow<'static, str>)>,
},
None,
}
impl Default for StatsdMonitorTagFlavor {
fn default() -> Self {
Self::DogStatsd {
tag_identifier: "default".into(),
custom_tags: vec![],
}
}
}
#[derive(Debug)]
pub struct StatsdMonitor {
target_host: String,
target_port: u16,
tag_flavor: StatsdMonitorTagFlavor,
statsd_client: Option<StatsdClient>,
}
impl StatsdMonitor {
#[must_use]
pub fn new(target_host: String, target_port: u16, tag_flavor: StatsdMonitorTagFlavor) -> Self {
let mut this = Self {
target_host,
target_port,
tag_flavor,
statsd_client: None,
};
this.setup_statsd_client();
this
}
fn setup_statsd_client(&mut self) {
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
socket.set_nonblocking(true).unwrap();
let Ok(udp_sink) =
BufferedUdpMetricSink::from((self.target_host.as_str(), self.target_port), socket)
else {
log::warn!(
"Statsd monitor failed to connect target host {}:{}",
self.target_host,
self.target_port
);
return;
};
let queuing_sink = QueuingMetricSink::builder()
.with_error_handler(|e| {
log::warn!("Statsd monitor failed to send to target host: {e:?}");
})
.build(udp_sink);
let mut client_builder = StatsdClient::builder(METRIC_PREFIX, queuing_sink);
if let StatsdMonitorTagFlavor::DogStatsd {
tag_identifier,
custom_tags,
} = &self.tag_flavor
{
client_builder = client_builder
.with_tag("banner", tag_identifier.as_ref())
.with_tag("afl_version", env!("CARGO_PKG_VERSION"));
for (tag_key, tag_value) in custom_tags {
client_builder = client_builder.with_tag(tag_key.as_ref(), tag_value.as_ref());
}
}
let client = client_builder.build();
self.statsd_client = Some(client);
}
#[expect(clippy::cast_precision_loss)]
fn try_display(&mut self, client_stats_manager: &mut ClientStatsManager) -> Option<()> {
if self.statsd_client.is_none() {
self.setup_statsd_client();
}
let Some(statsd_client) = &mut self.statsd_client else {
return Some(());
};
let GlobalStats {
total_execs,
execs_per_sec,
corpus_size,
objective_size,
..
} = client_stats_manager.global_stats();
let total_execs = *total_execs;
let execs_per_sec = *execs_per_sec;
let corpus_size = *corpus_size;
let objective_size = *objective_size;
let ItemGeometry {
pending,
pend_fav,
own_finds,
imported,
stability,
} = client_stats_manager.item_geometry();
let edges_coverage = client_stats_manager.edges_coverage();
statsd_client.gauge("execs_done", total_execs).ok()?;
statsd_client.gauge("execs_per_sec", execs_per_sec).ok()?;
statsd_client.gauge("corpus_count", corpus_size).ok()?;
statsd_client.gauge("corpus_found", own_finds).ok()?;
statsd_client.gauge("corpus_imported", imported).ok()?;
if let Some(stability) = stability {
statsd_client.gauge("stability", stability).ok()?; }
statsd_client.gauge("pending_favs", pend_fav).ok()?;
statsd_client.gauge("pending_total", pending).ok()?;
statsd_client
.gauge("saved_solutions", objective_size)
.ok()?; if let Some(EdgeCoverage {
edges_hit,
edges_total,
}) = edges_coverage
{
statsd_client.gauge("edges_found", edges_hit).ok()?;
statsd_client
.gauge("map_density", (edges_hit as f64) / (edges_total as f64))
.ok()?; }
Some(())
}
}
impl Monitor for StatsdMonitor {
fn display(
&mut self,
client_stats_manager: &mut ClientStatsManager,
_event_msg: &str,
_sender_id: ClientId,
) -> Result<(), Error> {
if self.try_display(client_stats_manager).is_none() {
self.statsd_client = None;
}
Ok(())
}
}