use std::num::NonZero;
use std::time::{Duration, Instant};
use microsandbox_metrics::{MetricsError, MetricsSlotWriter, SampleWrite};
/// Exported default sampling period of one second.
///
/// `run_metrics_sampler` does not read this constant; it takes its period from
/// its `interval_ms` argument.
pub const DEFAULT_SAMPLE_INTERVAL: Duration = Duration::from_millis(1_000);
/// Source of the network byte counters copied into each metrics sample.
///
/// The values are written verbatim into the sample's `net_rx_bytes` /
/// `net_tx_bytes` fields. NOTE(review): whether they are cumulative totals or
/// per-interval deltas is up to the implementor (presumably cumulative — confirm).
pub trait NetworkMetrics: Send + Sync {
    /// Bytes received, reported as `net_rx_bytes`.
    fn rx_bytes(&self) -> u64;

    /// Bytes transmitted, reported as `net_tx_bytes`.
    fn tx_bytes(&self) -> u64;
}

/// No-op implementation that always reports zero traffic in both directions.
impl NetworkMetrics for () {
    fn rx_bytes(&self) -> u64 {
        0
    }

    fn tx_bytes(&self) -> u64 {
        0
    }
}
// Adapter so the network crate's metrics handle can be passed to the sampler
// as a `NetworkMetrics`; only compiled when the `net` feature is enabled.
#[cfg(feature = "net")]
impl NetworkMetrics for microsandbox_network::network::MetricsHandle {
// Forwards via the fully-qualified type path rather than `self.tx_bytes()`.
// NOTE(review): presumably this targets an inherent `tx_bytes` on
// `MetricsHandle`; if no inherent method existed, this path would resolve to
// this trait method and recurse — confirm against the network crate.
fn tx_bytes(&self) -> u64 {
microsandbox_network::network::MetricsHandle::tx_bytes(self)
}
// Same forwarding pattern as `tx_bytes`, for received bytes.
fn rx_bytes(&self) -> u64 {
microsandbox_network::network::MetricsHandle::rx_bytes(self)
}
}
/// Samples VM (and optionally network) metrics on a fixed period and publishes
/// each sample through `writer`.
///
/// A first sample is written immediately, with no CPU percentage. CPU usage is
/// derived from the change in aggregate vCPU time between two consecutive
/// snapshots, and no earlier snapshot exists yet. After that, one sample is
/// written after each `interval_ms`-millisecond sleep.
///
/// The sampler returns only when a write reports a generation mismatch. That is
/// treated as the metrics slot having been reclaimed. Any other write error is
/// logged as a warning and sampling continues.
///
/// `sandbox_id` and `pid` are used only as tracing fields. When
/// `network_metrics` is `None`, network byte counters are reported as zero.
pub async fn run_metrics_sampler(
    writer: MetricsSlotWriter,
    sandbox_id: i32,
    pid: u32,
    interval_ms: NonZero<u64>,
    krun_metrics: msb_krun::MetricsHandle,
    network_metrics: Option<Box<dyn NetworkMetrics>>,
) {
    let interval = Duration::from_millis(interval_ms.get());
    let mut previous = krun_metrics.aggregate_snapshot();
    let mut previous_instant = Instant::now();

    // Initial sample: there is nothing to diff against, so CPU percent is None.
    match write_sample(&writer, None, &previous, network_metrics.as_deref()) {
        Ok(()) => {}
        Err(SampleWriteError::Generation) => {
            tracing::info!(
                sandbox_id,
                pid,
                "metrics slot reclaimed before first sample; stopping sampler"
            );
            return;
        }
        Err(SampleWriteError::Other(err)) => {
            tracing::warn!(sandbox_id, pid, error = %err, "failed to write initial sandbox metrics");
        }
    }

    loop {
        tokio::time::sleep(interval).await;

        let current = krun_metrics.aggregate_snapshot();
        let now = Instant::now();
        // Use the measured elapsed time instead of assuming `interval`. The sleep
        // may overshoot, and writing the previous sample also takes time. A
        // zero fallback makes the CPU percentage None rather than bogus.
        let wall_secs = now
            .checked_duration_since(previous_instant)
            .map(|d| d.as_secs_f64())
            .unwrap_or(0.0);
        let cpu_percent = cpu_percent_from_vcpu_time(
            current.cpu.vcpu_time_ns,
            previous.cpu.vcpu_time_ns,
            wall_secs,
        );

        // `&current` (this line was previously mis-encoded as `¤t`, which does
        // not compile).
        match write_sample(&writer, cpu_percent, &current, network_metrics.as_deref()) {
            Ok(()) => {}
            Err(SampleWriteError::Generation) => {
                tracing::info!(sandbox_id, pid, "metrics slot reclaimed; stopping sampler");
                break;
            }
            Err(SampleWriteError::Other(err)) => {
                tracing::warn!(sandbox_id, pid, error = %err, "failed to write sandbox metrics");
            }
        }

        // Advance the baseline even after a failed write, so the next CPU
        // percentage covers only the next interval.
        previous = current;
        previous_instant = now;
    }
}
/// Outcome classes for a failed `write_sample` call.
///
/// They are kept separate so the sampler can stop on a reclaimed slot but
/// only warn on other failures.
enum SampleWriteError {
    /// Any `MetricsError` other than a generation mismatch.
    Other(MetricsError),
    /// The writer returned `MetricsError::GenerationMismatch`. The sampler
    /// treats this as the metrics slot having been reclaimed.
    Generation,
}
/// Builds a `SampleWrite` from a krun snapshot plus optional network counters,
/// timestamps it with the current UTC time, and writes it to the slot.
///
/// Network byte counts are reported as zero when `network_metrics` is `None`.
///
/// # Errors
///
/// Returns `SampleWriteError::Generation` when the writer reports
/// `MetricsError::GenerationMismatch`. Any other `MetricsError` is wrapped in
/// `SampleWriteError::Other`.
fn write_sample(
    writer: &MetricsSlotWriter,
    cpu_percent: Option<f32>,
    krun: &msb_krun::VmMetrics,
    network_metrics: Option<&dyn NetworkMetrics>,
) -> Result<(), SampleWriteError> {
    // Read rx before tx, matching the order the counters are reported in.
    let (net_rx_bytes, net_tx_bytes) =
        network_metrics.map_or((0, 0), |net| (net.rx_bytes(), net.tx_bytes()));

    let cpu = &krun.cpu;
    let memory = &krun.memory;
    let block = &krun.block;

    writer
        .write_sample(SampleWrite {
            sampled_at: chrono::Utc::now(),
            cpu_percent,
            vcpu_time_ns: cpu.vcpu_time_ns,
            memory_bytes: memory.used_bytes,
            memory_available_bytes: memory.available_bytes,
            memory_host_resident_bytes: memory.host_resident_bytes,
            disk_read_bytes: block.read_bytes,
            disk_write_bytes: block.write_bytes,
            net_rx_bytes,
            net_tx_bytes,
        })
        .map_err(|err| match err {
            MetricsError::GenerationMismatch { .. } => SampleWriteError::Generation,
            other => SampleWriteError::Other(other),
        })
}
/// Converts the vCPU time consumed between two snapshots into a percentage of
/// one CPU over `wall_secs` seconds of wall-clock time.
///
/// The result can exceed 100. For example, 2 s of vCPU time over 1 s of wall
/// time yields 200.
///
/// Returns `None` in these cases:
/// - either reading is missing;
/// - `wall_secs` is not strictly positive (this includes NaN).
///
/// If the vCPU counter went backwards, the delta saturates to zero and the
/// result is `Some(0.0)`.
fn cpu_percent_from_vcpu_time(
    current_ns: Option<u64>,
    previous_ns: Option<u64>,
    wall_secs: f64,
) -> Option<f32> {
    const NANOS_PER_SEC: f64 = 1_000_000_000.0;

    current_ns
        .zip(previous_ns)
        .filter(|_| wall_secs > 0.0)
        .map(|(now_ns, then_ns)| {
            let busy_secs = now_ns.saturating_sub(then_ns) as f64 / NANOS_PER_SEC;
            (busy_secs / wall_secs * 100.0) as f32
        })
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn cpu_percent_uses_vcpu_seconds_over_wall_seconds() {
        const SEC_NS: u64 = 1_000_000_000;

        // (current_ns, previous_ns, wall_secs, expected)
        let cases: [(Option<u64>, Option<u64>, f64, Option<f32>); 5] = [
            // One vCPU-second over one wall-second: one fully busy CPU.
            (Some(2 * SEC_NS), Some(SEC_NS), 1.0, Some(100.0)),
            // Two vCPU-seconds over one wall-second: two fully busy CPUs.
            (Some(3 * SEC_NS), Some(SEC_NS), 1.0, Some(200.0)),
            // Counter went backwards: saturates to zero instead of going negative.
            (Some(SEC_NS), Some(2 * SEC_NS), 1.0, Some(0.0)),
            // Missing reading.
            (None, Some(0), 1.0, None),
            // No elapsed wall time.
            (Some(0), Some(0), 0.0, None),
        ];

        for (current, previous, wall, expected) in cases {
            assert_eq!(
                cpu_percent_from_vcpu_time(current, previous, wall),
                expected,
                "current={current:?} previous={previous:?} wall={wall}"
            );
        }
    }
}