use std::sync::OnceLock;
use std::time::Duration;
use opentelemetry::KeyValue;
use opentelemetry::global;
use opentelemetry::metrics::{Counter, Gauge, Histogram};
const METER_NAME: &str = "beamr";
struct Instruments {
processes_alive: Gauge<u64>,
scheduler_utilization: Gauge<f64>,
gc_collections: Counter<u64>,
gc_duration: Histogram<f64>,
messages_sent: Counter<u64>,
memory_heap_words: Gauge<u64>,
process_message_queue_len: Gauge<u64>,
process_reductions: Counter<u64>,
}
impl Instruments {
fn new() -> Self {
let meter = global::meter(METER_NAME);
Self {
processes_alive: meter
.u64_gauge("beamr.processes.alive")
.with_description("Current number of live Beamr processes")
.with_unit("{process}")
.build(),
scheduler_utilization: meter
.f64_gauge("beamr.scheduler.utilization")
.with_description(
"Fraction of scheduler time spent executing processes rather than idle",
)
.with_unit("1")
.build(),
gc_collections: meter
.u64_counter("beamr.gc.collections")
.with_description("Total number of Beamr garbage collections")
.with_unit("{collection}")
.build(),
gc_duration: meter
.f64_histogram("beamr.gc.duration")
.with_description("Beamr garbage collection duration")
.with_unit("s")
.build(),
messages_sent: meter
.u64_counter("beamr.messages.sent")
.with_description("Total number of Beamr messages sent")
.with_unit("{message}")
.build(),
memory_heap_words: meter
.u64_gauge("beamr.memory.heap_words")
.with_description("Total process heap words allocated")
.with_unit("{word}")
.build(),
process_message_queue_len: meter
.u64_gauge("beamr.process.message_queue_len")
.with_description(
"Current process mailbox depth sampled at scheduler slice boundaries",
)
.with_unit("{message}")
.build(),
process_reductions: meter
.u64_counter("beamr.process.reductions")
.with_description("Total scheduler reductions consumed by process")
.with_unit("{reduction}")
.build(),
}
}
}
fn instruments() -> &'static Instruments {
static INSTRUMENTS: OnceLock<Instruments> = OnceLock::new();
INSTRUMENTS.get_or_init(Instruments::new)
}
pub(crate) fn record_vm_health(
processes_alive: usize,
heap_words: usize,
scheduler_utilization: f64,
) {
let instruments = instruments();
instruments
.processes_alive
.record(usize_to_u64(processes_alive), &[]);
instruments
.memory_heap_words
.record(usize_to_u64(heap_words), &[]);
instruments
.scheduler_utilization
.record(scheduler_utilization.clamp(0.0, 1.0), &[]);
}
pub(crate) fn record_gc_collection(kind: &'static str, duration: Duration) {
let attributes = [KeyValue::new("gc.kind", kind)];
let instruments = instruments();
instruments.gc_collections.add(1, &attributes);
instruments
.gc_duration
.record(duration.as_secs_f64(), &attributes);
}
pub(crate) fn record_message_sent() {
instruments().messages_sent.add(1, &[]);
}
pub(crate) fn record_process_slice(pid: u64, reductions: u32, message_queue_len: usize) {
let pid_value = match i64::try_from(pid) {
Ok(value) => value,
Err(_) => i64::MAX,
};
let attributes = [KeyValue::new("pid", pid_value)];
let instruments = instruments();
instruments
.process_reductions
.add(u64::from(reductions), &attributes);
instruments
.process_message_queue_len
.record(usize_to_u64(message_queue_len), &attributes);
}
fn usize_to_u64(value: usize) -> u64 {
match u64::try_from(value) {
Ok(value) => value,
Err(_) => u64::MAX,
}
}