use std::sync::Arc;
use std::time::Duration;
use sysinfo::System;
use tokio::task::JoinHandle;
use wasmcloud_tracing::{
Counter, Gauge, Histogram, KeyValue, Meter, ObservableGauge, UpDownCounter,
};
/// Interval between system-metric refreshes used by [`HostMetrics::new`] when the caller
/// passes `None` for `refresh_time`.
const DEFAULT_REFRESH_TIME: Duration = Duration::from_secs(5);
/// Bundle of metric instruments recorded by the host, together with the identifiers supplied
/// at construction time.
///
/// The struct is `Clone`; all clones share one background refresh task through the
/// reference-counted `_refresh_task_handle`.
#[derive(Clone, Debug)]
#[allow(clippy::module_name_repetitions)]
pub struct HostMetrics {
/// Histogram registered as `wasmcloud_host.handle_rpc_message.duration`; its declared unit
/// is nanoseconds.
pub handle_rpc_message_duration_ns: Histogram<u64>,
/// Counter of component invocations.
pub component_invocations: Counter<u64>,
/// Counter of component invocations that were recorded as errors.
pub component_errors: Counter<u64>,
/// Up/down counter tracking the number of active component instances.
pub component_active_instances: UpDownCounter<i64>,
/// Gauge holding the maximum number of component instances.
pub component_max_instances: Gauge<u64>,
/// Observable gauge reporting the most recently sampled `System::total_memory()` value.
pub system_total_memory_bytes: ObservableGauge<u64>,
/// Observable gauge reporting the most recently sampled `System::used_memory()` value.
pub system_used_memory_bytes: ObservableGauge<u64>,
/// Observable gauge reporting the most recently sampled `System::global_cpu_usage()` value.
pub system_cpu_usage: ObservableGauge<f64>,
/// Host identifier passed to `HostMetrics::new`.
pub host_id: String,
/// Lattice identifier passed to `HostMetrics::new`.
pub lattice_id: String,
// Keeps the background refresh task alive. `RefreshWrapper` aborts the task on drop, so the
// task is stopped once the last `Arc` reference (i.e. the last clone of this struct) goes away.
_refresh_task_handle: Arc<RefreshWrapper>,
}
/// Most recent system readings, published through a `tokio::sync::watch` channel by the
/// refresh task in `HostMetrics::new` and read by the observable-gauge callbacks.
struct SystemMetrics {
/// Value of `System::total_memory()` at the last refresh.
system_total_memory_bytes: u64,
/// Value of `System::used_memory()` at the last refresh.
system_used_memory_bytes: u64,
/// Value of `System::global_cpu_usage()` at the last refresh, converted to `f64`.
system_cpu_usage: f64,
}
/// Newtype around the refresh task's `JoinHandle` whose `Drop` implementation aborts the task,
/// tying the task's lifetime to the lifetime of this value.
#[derive(Debug)]
struct RefreshWrapper(JoinHandle<()>);
impl Drop for RefreshWrapper {
fn drop(&mut self) {
self.0.abort();
}
}
impl HostMetrics {
pub fn new(
meter: &Meter,
host_id: String,
lattice_id: String,
refresh_time: Option<Duration>,
) -> anyhow::Result<Self> {
let wasmcloud_host_handle_rpc_message_duration_ns = meter
.u64_histogram("wasmcloud_host.handle_rpc_message.duration")
.with_description("Duration in nanoseconds each handle_rpc_message operation took")
.with_unit("nanoseconds")
.build();
let component_invocation_count = meter
.u64_counter("wasmcloud_host.component.invocations")
.with_description("Number of component invocations")
.build();
let component_error_count = meter
.u64_counter("wasmcloud_host.component.invocation.errors")
.with_description("Number of component errors")
.build();
let component_active_instances = meter
.i64_up_down_counter("wasmcloud_host.component.active_instances")
.with_description("Number of active component instances")
.build();
let component_max_instances = meter
.u64_gauge("wasmcloud_host.component.max_instances")
.with_description("Maximum number of component instances")
.build();
let mut system = System::new();
system.refresh_memory();
system.refresh_cpu_usage();
let initial_metrics = SystemMetrics {
system_total_memory_bytes: system.total_memory(),
system_used_memory_bytes: system.used_memory(),
system_cpu_usage: system.global_cpu_usage() as f64,
};
let (tx, rx) = tokio::sync::watch::channel(initial_metrics);
let refresh_time = refresh_time.unwrap_or(DEFAULT_REFRESH_TIME);
let refresh_task_handle = tokio::spawn(async move {
loop {
system.refresh_memory();
system.refresh_cpu_usage();
tx.send_modify(|current| {
current.system_total_memory_bytes = system.total_memory();
current.system_used_memory_bytes = system.used_memory();
current.system_cpu_usage = system.global_cpu_usage() as f64;
});
tokio::time::sleep(refresh_time).await;
}
});
let system_memory_total_bytes = meter
.u64_observable_gauge("wasmcloud_host.process.memory.total.bytes")
.with_description("The total amount of memory in bytes")
.with_unit("bytes")
.with_callback({
let rx = rx.clone();
move |observer| {
let metrics = rx.borrow();
observer.observe(metrics.system_total_memory_bytes, &[]);
}
})
.build();
let system_memory_used_bytes = meter
.u64_observable_gauge("wasmcloud_host.process.memory.used.bytes")
.with_description("The used amount of memory in bytes")
.with_unit("bytes")
.with_callback({
let rx_clone = rx.clone();
move |observer| {
let metrics = rx_clone.borrow();
observer.observe(metrics.system_used_memory_bytes, &[]);
}
})
.build();
let system_cpu_usage = meter
.f64_observable_gauge("wasmcloud_host.process.cpu.usage")
.with_description("The CPU usage of the process")
.with_unit("percentage")
.with_callback({
let rx = rx.clone();
move |observer| {
let metrics = rx.borrow();
observer.observe(metrics.system_cpu_usage, &[]);
}
})
.build();
Ok(Self {
handle_rpc_message_duration_ns: wasmcloud_host_handle_rpc_message_duration_ns,
component_invocations: component_invocation_count,
component_errors: component_error_count,
component_active_instances,
component_max_instances,
system_total_memory_bytes: system_memory_total_bytes,
system_used_memory_bytes: system_memory_used_bytes,
system_cpu_usage,
host_id,
lattice_id,
_refresh_task_handle: Arc::new(RefreshWrapper(refresh_task_handle)),
})
}
pub(crate) fn increment_active_instance(&self, attributes: &[KeyValue]) {
self.component_active_instances.add(1, attributes);
}
pub(crate) fn decrement_active_instance(&self, attributes: &[KeyValue]) {
self.component_active_instances.add(-1, attributes);
}
pub(crate) fn set_max_instances(&self, max: u64, attributes: &[KeyValue]) {
self.component_max_instances.record(max, attributes);
}
pub(crate) fn record_component_invocation(
&self,
elapsed: u64,
attributes: &[KeyValue],
error: bool,
) {
self.handle_rpc_message_duration_ns
.record(elapsed, attributes);
self.component_invocations.add(1, attributes);
if error {
self.component_errors.add(1, attributes);
}
}
}