use crate::erlang::{MSAccThread, RpcClient, SystemVersion};
use crate::Options;
use std::collections::BTreeMap;
use std::sync::mpsc;
use std::time::{Duration, Instant};
type MetricsReceiver = mpsc::Receiver<Metrics>;
type MetricsSender = mpsc::Sender<Metrics>;
#[derive(Debug, Clone)]
pub struct Metrics {
pub timestamp: Instant,
pub items: BTreeMap<String, MetricValue>,
}
impl Metrics {
fn new() -> Self {
Self {
timestamp: Instant::now(),
items: BTreeMap::new(),
}
}
fn insert(&mut self, name: &str, value: MetricValue) {
self.items.insert(name.to_owned(), value);
}
pub fn root_items(&self) -> impl Iterator<Item = (&str, &MetricValue)> {
self.items
.iter()
.filter(|(_, v)| v.parent().is_none())
.map(|(k, v)| (k.as_str(), v))
}
pub fn child_items<'a, 'b>(
&'a self,
parent: &'b str,
) -> impl 'a + Iterator<Item = (&'a str, &'a MetricValue)>
where
'b: 'a,
{
self.items
.iter()
.filter(move |(_, v)| v.parent().as_ref().map_or(false, |&x| x == parent))
.map(|(k, v)| (k.as_str(), v))
}
fn calc_delta(&mut self, prev: &Self) {
let duration = self.timestamp - prev.timestamp;
for (name, value) in &mut self.items {
if let MetricValue::Counter {
raw_value, value, ..
} = value
{
if let Some(MetricValue::Counter {
raw_value: prev, ..
}) = prev.items.get(name)
{
if let Some(delta) = raw_value.checked_sub(*prev) {
*value = Some(delta as f64 / duration.as_secs_f64());
}
}
}
}
}
}
#[derive(Debug, Clone)]
pub enum MetricValue {
Gauge {
value: u64,
parent: Option<String>,
},
Counter {
raw_value: u64,
value: Option<f64>, parent: Option<String>,
},
Utilization {
value: f64,
parent: Option<String>,
},
}
impl MetricValue {
pub fn utilization(value: f64) -> Self {
Self::Utilization {
value,
parent: None,
}
}
fn utilization_with_parent(value: f64, parent: &str) -> Self {
Self::Utilization {
value,
parent: Some(parent.to_owned()),
}
}
fn gauge(value: u64) -> Self {
Self::Gauge {
value,
parent: None,
}
}
fn gauge_with_parent(value: u64, parent: &str) -> Self {
Self::Gauge {
value,
parent: Some(parent.to_owned()),
}
}
fn counter(raw_value: u64) -> Self {
Self::Counter {
raw_value,
value: None,
parent: None,
}
}
fn counter_with_parent(raw_value: u64, parent: &str) -> Self {
Self::Counter {
raw_value,
value: None,
parent: Some(parent.to_owned()),
}
}
pub fn as_f64(&self) -> Option<f64> {
match self {
Self::Gauge { value, .. } => Some(*value as f64),
Self::Counter { value: Some(v), .. } => Some(v.round()),
Self::Counter { .. } => None,
Self::Utilization { value, .. } => Some(*value),
}
}
fn parent(&self) -> Option<&str> {
match self {
Self::Gauge { parent, .. } => parent.as_ref().map(|x| x.as_str()),
Self::Counter { parent, .. } => parent.as_ref().map(|x| x.as_str()),
Self::Utilization { parent, .. } => parent.as_ref().map(|x| x.as_str()),
}
}
}
impl std::fmt::Display for MetricValue {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
Self::Gauge { value, .. } => {
write!(f, "{}", format_u64(*value, " "))
}
Self::Utilization { value, .. } => {
write!(f, "{:.1} %", value)
}
Self::Counter {
value: Some(value), ..
} => {
write!(f, "{}", format_u64(value.round() as u64, "/s"))
}
Self::Counter { .. } => {
write!(f, "")
}
}
}
}
impl std::ops::AddAssign for MetricValue {
fn add_assign(&mut self, rhs: Self) {
match (self, rhs) {
(Self::Gauge { value, .. }, Self::Gauge { value: rhs, .. }) => {
*value += rhs;
}
(Self::Utilization { value, .. }, Self::Utilization { value: rhs, .. }) => {
*value += rhs;
}
(Self::Counter { value: lhs, .. }, Self::Counter { value: rhs, .. }) => {
if let (Some(lhs), Some(rhs)) = (lhs.as_mut(), rhs) {
*lhs += rhs;
} else {
*lhs = rhs;
}
}
(lhs, rhs) => {
panic!("cannot apply `MetricValue::add_assign()` to {lhs:?} and {rhs:?}",);
}
}
}
}
impl std::ops::SubAssign for MetricValue {
fn sub_assign(&mut self, rhs: Self) {
match (self, rhs) {
(Self::Gauge { value, .. }, Self::Gauge { value: rhs, .. }) => {
*value -= rhs;
}
(Self::Utilization { value, .. }, Self::Utilization { value: rhs, .. }) => {
*value -= rhs;
}
(Self::Counter { value: lhs, .. }, Self::Counter { value: rhs, .. }) => {
if let (Some(lhs), Some(rhs)) = (lhs.as_mut(), rhs) {
*lhs -= rhs;
}
}
(lhs, rhs) => {
panic!("cannot apply `MetricValue::sub_assign()` to {lhs:?} and {rhs:?}",);
}
}
}
}
pub fn format_u64(mut n: u64, suffix: &str) -> String {
let mut s = Vec::new();
for i in 0.. {
if i % 3 == 0 && i != 0 {
s.push(b',');
}
let m = n % 10;
s.push(b'0' + m as u8);
n /= 10;
if n == 0 {
break;
}
}
s.reverse();
let mut s = String::from_utf8(s).expect("unreachable");
s.push_str(suffix);
s
}
#[derive(Debug)]
pub struct MetricsPoller {
pub rx: MetricsReceiver,
pub system_version: SystemVersion,
rpc_client: RpcClient,
old_microstate_accounting_flag: bool,
}
impl MetricsPoller {
pub fn start_thread(options: Options) -> anyhow::Result<Self> {
MetricsPollerThread::start_thread(options)
}
}
impl Drop for MetricsPoller {
fn drop(&mut self) {
if !self.old_microstate_accounting_flag {
if let Err(e) = smol::block_on(
self.rpc_client
.set_system_flag_bool("microstate_accounting", "false"),
) {
log::warn!("faild to disable microstate accounting: {e}");
} else {
log::debug!("disabled microstate accounting");
}
}
}
}
#[derive(Debug)]
struct MetricsPollerThread {
options: Options,
rpc_client: RpcClient,
tx: MetricsSender,
prev_metrics: Metrics,
}
impl MetricsPollerThread {
fn start_thread(options: Options) -> anyhow::Result<MetricsPoller> {
let (tx, rx) = mpsc::channel();
let rpc_client: RpcClient = smol::block_on(async {
let cookie = options.find_cookie()?;
let client = RpcClient::connect(&options.erlang_node, &cookie).await?;
Ok(client) as anyhow::Result<_>
})?;
let system_version = smol::block_on(rpc_client.get_system_version())?;
let old_microstate_accounting_flag =
smol::block_on(rpc_client.set_system_flag_bool("microstate_accounting", "true"))?;
log::debug!(
"enabled microstate accounting (old flag state is {old_microstate_accounting_flag})"
);
let poller = MetricsPoller {
rx,
system_version,
rpc_client: rpc_client.clone(),
old_microstate_accounting_flag,
};
std::thread::spawn(|| {
Self {
options,
rpc_client,
tx,
prev_metrics: Metrics::new(),
}
.run()
});
Ok(poller)
}
fn run(mut self) {
let interval = Duration::from_secs(self.options.polling_interval.get() as u64);
smol::block_on(async {
loop {
match self.poll_once().await {
Err(e) => {
log::error!("faild to poll metrics: {e}");
break;
}
Ok(metrics) => {
let elapsed = metrics.timestamp.elapsed();
if self.tx.send(metrics).is_err() {
log::debug!("the main thread has terminated");
break;
}
if let Some(sleep_duration) = interval.checked_sub(elapsed) {
std::thread::sleep(sleep_duration);
}
}
}
}
})
}
fn insert_msacc_metrics(&self, metrics: &mut Metrics, msacc_threads: &[MSAccThread]) {
let mut aggregated_per_type = BTreeMap::<_, ThreadTime>::new();
let mut aggregated_per_state_per_type = BTreeMap::<_, BTreeMap<&str, u64>>::new();
let mut aggregated_per_thread_per_type = BTreeMap::<_, BTreeMap<u64, ThreadTime>>::new();
for thread in msacc_threads {
let x = aggregated_per_type.entry(&thread.thread_type).or_default();
let realtime = thread.counters.values().copied().sum::<u64>();
let sleeptime = thread.counters["sleep"];
x.realtime += realtime;
x.runtime += realtime - sleeptime;
let x = aggregated_per_thread_per_type
.entry(&thread.thread_type)
.or_default()
.entry(thread.thread_id)
.or_default();
x.realtime += realtime;
x.runtime += realtime - sleeptime;
for (state, value) in &thread.counters {
*aggregated_per_state_per_type
.entry(&thread.thread_type)
.or_default()
.entry(state)
.or_default() += *value;
}
}
for (ty, time) in aggregated_per_type {
let root_name = format!("utilization.{ty}");
metrics.insert(&root_name, MetricValue::utilization(time.utilization()));
for (state, value) in &aggregated_per_state_per_type[ty] {
let u = *value as f64 / time.realtime as f64 * 100.0;
metrics.insert(
&format!("{root_name}.state.{state}"),
MetricValue::utilization_with_parent(u, &root_name),
);
}
let id_width = aggregated_per_thread_per_type[ty]
.keys()
.map(|id| id / 10 + 1)
.max()
.unwrap_or(1) as usize;
for (thread_id, time) in &aggregated_per_thread_per_type[ty] {
metrics.insert(
&format!("{root_name}.thread.{:0id_width$}", thread_id),
MetricValue::utilization_with_parent(time.utilization(), &root_name),
);
}
}
}
async fn poll_once(&mut self) -> anyhow::Result<Metrics> {
let mut metrics = Metrics::new();
let msacc = self
.rpc_client
.get_statistics_microstate_accounting()
.await?;
self.insert_msacc_metrics(&mut metrics, &msacc);
let processes = self.rpc_client.get_system_info_u64("process_count").await?;
metrics.insert("system_info.process_count", MetricValue::gauge(processes));
let ports = self.rpc_client.get_system_info_u64("port_count").await?;
metrics.insert("system_info.port_count", MetricValue::gauge(ports));
let atoms = self.rpc_client.get_system_info_u64("atom_count").await?;
metrics.insert("system_info.atom_count", MetricValue::gauge(atoms));
let ets_tables = self.rpc_client.get_system_info_u64("ets_count").await?;
metrics.insert("system_info.ets_count", MetricValue::gauge(ets_tables));
let context_switches = self
.rpc_client
.get_statistics_1st_u64("context_switches")
.await?;
metrics.insert(
"statistics.context_switches",
MetricValue::counter(context_switches),
);
let exact_reductions = self
.rpc_client
.get_statistics_1st_u64("exact_reductions")
.await?;
metrics.insert(
"statistics.exact_reductions",
MetricValue::counter(exact_reductions),
);
let garbage_collection = self
.rpc_client
.get_statistics_1st_u64("garbage_collection")
.await?;
metrics.insert(
"statistics.garbage_collection",
MetricValue::counter(garbage_collection),
);
let runtime = self.rpc_client.get_statistics_1st_u64("runtime").await?;
metrics.insert("statistics.runtime", MetricValue::counter(runtime));
let (in_bytes, out_bytes) = self.rpc_client.get_statistics_io().await?;
metrics.insert(
"statistics.io.total_bytes",
MetricValue::counter(in_bytes + out_bytes),
);
metrics.insert(
"statistics.io.input_bytes",
MetricValue::counter_with_parent(in_bytes, "statistics.io.total_bytes"),
);
metrics.insert(
"statistics.io.output_bytes",
MetricValue::counter_with_parent(out_bytes, "statistics.io.total_bytes"),
);
let run_queue_lengths = self
.rpc_client
.get_statistics_u64_list("run_queue_lengths_all")
.await?;
let run_queue_total = run_queue_lengths.iter().copied().sum();
metrics.insert("statistics.run_queue", MetricValue::gauge(run_queue_total));
let width = run_queue_lengths.len() / 10 + 1;
for (i, n) in run_queue_lengths.into_iter().enumerate() {
metrics.insert(
&format!("statistics.run_queue.{:0width$}", i),
MetricValue::gauge_with_parent(n, "statistics.run_queue"),
);
}
let mut memory = self.rpc_client.get_memory().await?;
metrics.insert(
"memory.total_bytes",
MetricValue::gauge(memory.remove("total").expect("unreachable")),
);
for (k, v) in memory {
metrics.insert(
&format!("memory.{k}_bytes"),
MetricValue::gauge_with_parent(v, "memory.total_bytes"),
);
}
self.rpc_client
.set_system_flag_bool("microstate_accounting", "reset")
.await?;
log::debug!(
"MetricsPoller::poll_once(): elapsed={:?}",
metrics.timestamp.elapsed()
);
metrics.calc_delta(&self.prev_metrics);
self.prev_metrics = metrics.clone();
Ok(metrics)
}
}
#[derive(Debug, Default)]
struct ThreadTime {
runtime: u64,
realtime: u64,
}
impl ThreadTime {
fn utilization(&self) -> f64 {
self.runtime as f64 / self.realtime as f64 * 100.0
}
}