use alloc::{fmt::Debug, string::String, vec::Vec};
use core::{fmt, time::Duration};
use std::{
boxed::Box,
sync::{atomic::AtomicU64, Arc},
thread,
};
use futures::executor::block_on;
use prometheus_client::{
encoding::text::{encode, Encode, SendSyncEncodeMetric},
metrics::{family::Family, gauge::Gauge},
registry::Registry,
};
use tide::Request;
use crate::{
bolts::{current_time, format_duration_hms},
monitors::{ClientStats, Monitor, UserStats},
};
/// A [`Monitor`] that publishes fuzzing statistics as Prometheus gauges and
/// additionally reports a formatted status line through `print_fn`.
///
/// Every gauge family is keyed by [`Labels`] (client id plus a stat name).
/// The constructors hand clones of these families to the metrics server
/// thread. NOTE(review): this relies on `Family::clone` sharing the underlying
/// metric storage with the original — confirm against `prometheus_client`.
#[derive(Clone)]
pub struct PrometheusMonitor<F>
where
F: FnMut(String),
{
/// Callback that receives the formatted status line on every `display` call.
print_fn: F,
/// Start of the fuzzing run; the reported run time is measured from here.
start_time: Duration,
/// Per-client statistics exposed through the [`Monitor`] accessors.
client_stats: Vec<ClientStats>,
/// Gauge family set from `corpus_size()` in `display`.
corpus_count: Family<Labels, Gauge>,
/// Gauge family set from `objective_size()` in `display`.
objective_count: Family<Labels, Gauge>,
/// Gauge family set from `total_execs()` in `display`.
executions: Family<Labels, Gauge>,
/// Gauge family set from `execs_per_sec()` (truncated to an integer) in `display`.
exec_rate: Family<Labels, Gauge>,
/// Gauge family holding the elapsed run time in whole seconds.
runtime: Family<Labels, Gauge>,
/// Gauge family holding the number of entries in `client_stats`.
clients_count: Family<Labels, Gauge>,
/// Floating-point gauge family for user-defined stats; the stat name is
/// carried in the `stat` label.
custom_stat: Family<Labels, Gauge<f64, AtomicU64>>,
}
/// Hand-written so that the print callback `F` does not need to implement
/// `Debug`. Only `start_time` and `client_stats` are printed; the callback and
/// the gauge families are left out.
impl<F> Debug for PrometheusMonitor<F>
where
    F: FnMut(String),
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("PrometheusMonitor")
            .field("start_time", &self.start_time)
            .field("client_stats", &self.client_stats)
            // Several fields are intentionally omitted, so mark the output as
            // non-exhaustive (`..`) instead of pretending it is complete.
            .finish_non_exhaustive()
    }
}
impl<F> Monitor for PrometheusMonitor<F>
where
    F: FnMut(String),
{
    /// Mutable access to the per-client statistics.
    fn client_stats_mut(&mut self) -> &mut Vec<ClientStats> {
        &mut self.client_stats
    }

    /// Read-only access to the per-client statistics.
    fn client_stats(&self) -> &[ClientStats] {
        &self.client_stats
    }

    /// The start time stored in this monitor.
    fn start_time(&mut self) -> Duration {
        self.start_time
    }

    /// Updates the Prometheus gauges and prints a status line.
    ///
    /// The built-in gauges (corpus, objectives, executions, exec rate, run
    /// time, client count) are written under the label `client = sender_id`
    /// with an empty `stat`. The formatted status line is then passed to
    /// `print_fn`. Finally, every user-defined stat of the sending client is
    /// printed to stdout and mirrored into the `custom_stat` gauge family,
    /// with the stat's key as the `stat` label.
    #[allow(clippy::cast_sign_loss)]
    fn display(&mut self, event_msg: String, sender_id: u32) {
        // All built-in gauges share the same label set: the sending client and
        // an empty stat name.
        let labels = Labels {
            client: sender_id,
            stat: String::new(),
        };

        let corpus_size = self.corpus_size();
        self.corpus_count.get_or_create(&labels).set(corpus_size);

        let objective_size = self.objective_size();
        self.objective_count
            .get_or_create(&labels)
            .set(objective_size);

        let total_execs = self.total_execs();
        self.executions.get_or_create(&labels).set(total_execs);

        // The gauge stores integers, so the rate is truncated.
        let execs_per_sec = self.execs_per_sec() as u64;
        self.exec_rate.get_or_create(&labels).set(execs_per_sec);

        // `saturating_sub` avoids a panic if `start_time` lies ahead of the
        // current time (e.g. a start time supplied through `with_time`).
        let run_time = current_time().saturating_sub(self.start_time).as_secs();
        self.runtime.get_or_create(&labels).set(run_time);

        let total_clients = self.client_stats().len().try_into().unwrap();
        self.clients_count.get_or_create(&labels).set(total_clients);

        let status_line = format!(
            "[Prometheus] [{} #{}] run time: {}, clients: {}, corpus: {}, objectives: {}, executions: {}, exec/sec: {}",
            event_msg,
            sender_id,
            format_duration_hms(&current_time().saturating_sub(self.start_time)),
            self.client_stats().len(),
            self.corpus_size(),
            self.objective_size(),
            self.total_execs(),
            self.execs_per_sec_pretty()
        );
        (self.print_fn)(status_line);

        // Clone only the user-stat map (not the whole `ClientStats`) so the
        // mutable borrow of `self` ends before `custom_stat` is touched.
        let user_stats = self.client_stats_mut_for(sender_id).user_monitor.clone();
        for (key, val) in user_stats {
            println!("{key}: {val}");
            #[allow(clippy::cast_precision_loss)]
            let value: f64 = match val {
                UserStats::Number(n) => n as f64,
                UserStats::Float(f) => f,
                // Strings have no numeric representation; export a constant.
                UserStats::String(_s) => 0.0,
                // Exported as a percentage. A zero denominator would yield
                // NaN/inf, which is useless in a gauge, so report 0 instead.
                UserStats::Ratio(a, b) => {
                    if b == 0 {
                        0.0
                    } else {
                        (a as f64 / b as f64) * 100.0
                    }
                }
            };
            self.custom_stat
                .get_or_create(&Labels {
                    client: sender_id,
                    stat: key,
                })
                .set(value);
        }
    }
}
impl<F> PrometheusMonitor<F>
where
F: FnMut(String),
{
pub fn new(listener: String, print_fn: F) -> Self {
let corpus_count = Family::<Labels, Gauge>::default();
let corpus_count_clone = corpus_count.clone();
let objective_count = Family::<Labels, Gauge>::default();
let objective_count_clone = objective_count.clone();
let executions = Family::<Labels, Gauge>::default();
let executions_clone = executions.clone();
let exec_rate = Family::<Labels, Gauge>::default();
let exec_rate_clone = exec_rate.clone();
let runtime = Family::<Labels, Gauge>::default();
let runtime_clone = runtime.clone();
let clients_count = Family::<Labels, Gauge>::default();
let clients_count_clone = clients_count.clone();
let custom_stat = Family::<Labels, Gauge<f64, AtomicU64>>::default();
let custom_stat_clone = custom_stat.clone();
thread::spawn(move || {
block_on(serve_metrics(
listener,
corpus_count_clone,
objective_count_clone,
executions_clone,
exec_rate_clone,
runtime_clone,
clients_count_clone,
custom_stat_clone,
))
.map_err(|err| println!("{err:?}"))
.ok();
});
Self {
print_fn,
start_time: current_time(),
client_stats: vec![],
corpus_count,
objective_count,
executions,
exec_rate,
runtime,
clients_count,
custom_stat,
}
}
pub fn with_time(listener: String, print_fn: F, start_time: Duration) -> Self {
let corpus_count = Family::<Labels, Gauge>::default();
let corpus_count_clone = corpus_count.clone();
let objective_count = Family::<Labels, Gauge>::default();
let objective_count_clone = objective_count.clone();
let executions = Family::<Labels, Gauge>::default();
let executions_clone = executions.clone();
let exec_rate = Family::<Labels, Gauge>::default();
let exec_rate_clone = exec_rate.clone();
let runtime = Family::<Labels, Gauge>::default();
let runtime_clone = runtime.clone();
let clients_count = Family::<Labels, Gauge>::default();
let clients_count_clone = clients_count.clone();
let custom_stat = Family::<Labels, Gauge<f64, AtomicU64>>::default();
let custom_stat_clone = custom_stat.clone();
thread::spawn(move || {
block_on(serve_metrics(
listener,
corpus_count_clone,
objective_count_clone,
executions_clone,
exec_rate_clone,
runtime_clone,
clients_count_clone,
custom_stat_clone,
))
.map_err(|err| println!("{err:?}"))
.ok();
});
Self {
print_fn,
start_time,
client_stats: vec![],
corpus_count,
objective_count,
executions,
exec_rate,
runtime,
clients_count,
custom_stat,
}
}
}
/// Serves the given gauge families over HTTP until the server stops.
///
/// All families are registered in a fresh registry, and a `tide` application
/// is started on `listener` with two routes:
///
/// * `/` answers with a fixed banner string.
/// * `/metrics` answers with the text encoding of the registry.
///
/// # Errors
///
/// Returns the I/O error reported by the server's `listen` call.
#[allow(clippy::too_many_arguments)]
pub async fn serve_metrics(
    listener: String,
    corpus: Family<Labels, Gauge>,
    objectives: Family<Labels, Gauge>,
    executions: Family<Labels, Gauge>,
    exec_rate: Family<Labels, Gauge>,
    runtime: Family<Labels, Gauge>,
    clients_count: Family<Labels, Gauge>,
    custom_stat: Family<Labels, Gauge<f64, AtomicU64>>,
) -> Result<(), std::io::Error> {
    tide::log::start();

    // (metric name, help text, family) for the integer gauges, kept in the
    // order in which they are registered.
    let integer_gauges = [
        ("corpus_count", "Number of test cases in the corpus", corpus),
        (
            "objective_count",
            "Number of times the objective has been achieved (e.g., crashes)",
            objectives,
        ),
        (
            "executions_total",
            "Number of executions the fuzzer has done",
            executions,
        ),
        ("execution_rate", "Rate of executions per second", exec_rate),
        (
            "runtime",
            "How long the fuzzer has been running for (seconds)",
            runtime,
        ),
        (
            "clients_count",
            "How many clients have been spawned for the fuzzing job",
            clients_count,
        ),
    ];

    let mut registry = <Registry>::default();
    for (name, help, family) in integer_gauges {
        registry.register(name, help, Box::new(family));
    }
    // The user-stat family has a different value type, so it is registered
    // separately (and last).
    registry.register(
        "custom_stat",
        "A metric to contain custom stats returned by feedbacks, filterable by label",
        Box::new(custom_stat),
    );

    let state = State {
        registry: Arc::new(registry),
    };
    let mut server = tide::with_state(state);

    server
        .at("/")
        .get(|_| async { Ok("LibAFL Prometheus Monitor") });

    server
        .at("/metrics")
        .get(|request: Request<State>| async move {
            let mut body = Vec::new();
            encode(&mut body, &request.state().registry).unwrap();
            Ok(tide::Response::builder(200)
                .body(body)
                .content_type("application/openmetrics-text; version=1.0.0; charset=utf-8")
                .build())
        });

    server.listen(listener).await
}
/// Label set attached to every gauge exported by the monitor.
#[derive(Clone, Hash, PartialEq, Eq, Encode, Debug)]
pub struct Labels {
    /// Id of the client the value was reported for.
    client: u32,
    /// Name of the user-defined stat; left empty for the built-in gauges.
    stat: String,
}
/// Application state shared with the `tide` request handlers.
#[derive(Clone)]
struct State {
/// Registry that the `/metrics` handler encodes into the response body.
/// NOTE(review): the `dead_code` allow looks unnecessary, since the handler
/// reads this field — confirm before removing.
#[allow(dead_code)]
registry: Arc<Registry<Box<dyn SendSyncEncodeMetric>>>,
}