use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use crate::error::{VmRuntimeError, VmRuntimeResult};
/// Settings for starting a [`MetricsPoller`].
#[derive(Clone, Debug)]
pub struct MetricsConfig {
    /// Path the background reader opens and reads newline-delimited JSON metrics
    /// from; may be a FIFO or a regular file.
    pub fifo_path: PathBuf,
}
/// One parsed metrics record.
///
/// When built by `VmMetricsSnapshot::from_json_line`, each named field is read from a
/// fixed JSON path; a path that is missing (or whose value is not an unsigned integer)
/// yields `0`. The complete parsed document is kept in `raw` so callers can reach
/// counters that have no dedicated field.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct VmMetricsSnapshot {
    /// Top-level `utc_timestamp_ms` value.
    pub timestamp_ms: u64,
    /// Sum of `vcpu.<agg>.sum_us` over the `exit_io_in_agg`, `exit_io_out_agg`,
    /// `exit_mmio_read_agg` and `exit_mmio_write_agg` aggregates.
    /// NOTE(review): only these exit-handler aggregates are summed — confirm this is the
    /// intended measure of "vCPU usage".
    pub vcpu_usage_us: u64,
    /// First non-zero of `vmm.memory_used_bytes`, `balloon.actual_bytes`, or
    /// `balloon.actual_mib` converted to bytes (× 1024 × 1024).
    pub memory_used_bytes: u64,
    /// `net.rx_bytes_count`.
    pub network_rx_bytes: u64,
    /// `net.tx_bytes_count`.
    pub network_tx_bytes: u64,
    /// `net.rx_packets_count`.
    pub network_rx_packets: u64,
    /// `net.tx_packets_count`.
    pub network_tx_packets: u64,
    /// `block.read_bytes`.
    pub block_read_bytes: u64,
    /// `block.write_bytes`.
    pub block_write_bytes: u64,
    /// The entire parsed JSON document for this line.
    pub raw: Value,
}
impl VmMetricsSnapshot {
    /// Bytes per MiB, used to convert `balloon.actual_mib` into bytes.
    const BYTES_PER_MIB: u64 = 1024 * 1024;

    /// `vcpu.<name>.sum_us` aggregates that are added together into `vcpu_usage_us`.
    const VCPU_EXIT_AGGREGATES: [&'static str; 4] = [
        "exit_io_in_agg",
        "exit_io_out_agg",
        "exit_mmio_read_agg",
        "exit_mmio_write_agg",
    ];

    /// Parses one line of JSON metrics output into a snapshot.
    ///
    /// Known counters are read from fixed JSON paths; any that are absent or not
    /// unsigned integers become `0`. The whole parsed document is kept in `raw`.
    ///
    /// # Errors
    ///
    /// Returns [`VmRuntimeError::Metrics`] if `line` is not valid JSON.
    pub fn from_json_line(line: &str) -> VmRuntimeResult<Self> {
        let raw: Value = serde_json::from_str(line)
            .map_err(|e| VmRuntimeError::Metrics(format!("invalid json line: {e}")))?;
        Ok(Self::from_value(raw))
    }

    /// Extracts the well-known counters from an already-parsed JSON document.
    fn from_value(raw: Value) -> Self {
        let timestamp_ms = u64_at(&raw, &["utc_timestamp_ms"]);

        // These values come from another process. A plain `.sum()` panics on overflow
        // in debug builds, which would take down the reader thread, so saturate instead.
        let vcpu_usage_us = Self::VCPU_EXIT_AGGREGATES
            .iter()
            .map(|&bucket| u64_at(&raw, &["vcpu", bucket, "sum_us"]))
            .fold(0, u64::saturating_add);

        // Prefer byte-denominated sources; fall back to the balloon's MiB figure,
        // scaled to bytes, only when neither of those is present and non-zero.
        let memory_used_bytes = match first_nonzero_u64(
            &raw,
            &[&["vmm", "memory_used_bytes"], &["balloon", "actual_bytes"]],
        ) {
            0 => u64_at(&raw, &["balloon", "actual_mib"]).saturating_mul(Self::BYTES_PER_MIB),
            bytes => bytes,
        };

        Self {
            timestamp_ms,
            vcpu_usage_us,
            memory_used_bytes,
            network_rx_bytes: u64_at(&raw, &["net", "rx_bytes_count"]),
            network_tx_bytes: u64_at(&raw, &["net", "tx_bytes_count"]),
            network_rx_packets: u64_at(&raw, &["net", "rx_packets_count"]),
            network_tx_packets: u64_at(&raw, &["net", "tx_packets_count"]),
            block_read_bytes: u64_at(&raw, &["block", "read_bytes"]),
            block_write_bytes: u64_at(&raw, &["block", "write_bytes"]),
            raw,
        }
    }
}
fn u64_at(root: &Value, path: &[&str]) -> u64 {
let mut cursor = root;
for segment in path {
match cursor.get(*segment) {
Some(next) => cursor = next,
None => return 0,
}
}
cursor.as_u64().unwrap_or(0)
}
/// Looks up each candidate path in order (via `u64_at`) and returns the first
/// value that is not zero, or `0` if every candidate is zero or missing.
fn first_nonzero_u64(root: &Value, paths: &[&[&str]]) -> u64 {
    paths
        .iter()
        .map(|candidate| u64_at(root, candidate))
        .find(|&value| value != 0)
        .unwrap_or(0)
}
/// Latest parsed snapshot, written by the reader thread and read by pollers;
/// `None` until the first line has been parsed successfully.
type SharedSnapshot = Arc<Mutex<Option<VmMetricsSnapshot>>>;

/// Background reader that keeps the most recently parsed [`VmMetricsSnapshot`]
/// available for polling.
///
/// Dropping the poller signals the reader thread to stop and waits briefly for it.
pub struct MetricsPoller {
    /// Set to `true` to ask the reader thread to stop.
    shutdown: Arc<AtomicBool>,
    /// Most recent successfully parsed snapshot.
    snapshot: SharedSnapshot,
    /// Reader thread handle; taken (left as `None`) once shutdown has run.
    handle: Option<JoinHandle<()>>,
}
impl MetricsPoller {
    /// How long `shutdown` waits for the reader thread before detaching it.
    const SHUTDOWN_JOIN_TIMEOUT: Duration = Duration::from_millis(200);

    /// Spawns a named background thread that reads metrics lines from
    /// `config.fifo_path` and keeps the latest parsed snapshot.
    ///
    /// Returns immediately. The path is opened on the background thread, so a path
    /// that cannot be opened is not reported here: the thread just exits and
    /// [`snapshot`](Self::snapshot) keeps returning `None`.
    ///
    /// # Errors
    ///
    /// Returns [`VmRuntimeError::Metrics`] if the reader thread cannot be spawned.
    pub fn start(config: MetricsConfig) -> VmRuntimeResult<Self> {
        let snapshot: SharedSnapshot = Arc::new(Mutex::new(None));
        let shutdown = Arc::new(AtomicBool::new(false));
        // `config` is owned, so move the path into the thread rather than cloning it.
        let fifo_path = config.fifo_path;
        let thread_name = format!("microvm-metrics:{}", fifo_path.display());
        let handle = {
            let snapshot = Arc::clone(&snapshot);
            let shutdown = Arc::clone(&shutdown);
            thread::Builder::new()
                .name(thread_name)
                .spawn(move || reader_loop(fifo_path, snapshot, shutdown))
                .map_err(|e| VmRuntimeError::Metrics(format!("spawn reader thread: {e}")))?
        };
        Ok(Self {
            snapshot,
            handle: Some(handle),
            shutdown,
        })
    }

    /// Returns a copy of the most recent snapshot, or `None` if no line has been
    /// parsed yet. A poisoned lock is recovered rather than propagated.
    pub fn snapshot(&self) -> Option<VmMetricsSnapshot> {
        self.snapshot
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .clone()
    }

    /// Signals the reader thread to stop and waits up to 200 ms for it to exit.
    ///
    /// Safe to call more than once: the thread handle is taken on the first call,
    /// so later calls (including the one from `Drop`) only re-set the flag. A thread
    /// that does not finish in time (e.g. one blocked waiting for input) is left
    /// detached.
    pub fn shutdown(&mut self) {
        self.shutdown.store(true, Ordering::SeqCst);
        if let Some(handle) = self.handle.take() {
            join_with_timeout(handle, Self::SHUTDOWN_JOIN_TIMEOUT);
        }
    }
}
impl std::fmt::Debug for MetricsPoller {
    /// Summarises the poller's state without exposing snapshot contents.
    ///
    /// Presence is checked under the lock instead of via `snapshot()`, so the
    /// snapshot (including its `raw` JSON tree) is not cloned just to format this.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let has_snapshot = match self.snapshot.lock() {
            Ok(guard) => guard.is_some(),
            Err(poisoned) => poisoned.into_inner().is_some(),
        };
        f.debug_struct("MetricsPoller")
            .field("has_snapshot", &has_snapshot)
            .field("shutdown", &self.shutdown.load(Ordering::Relaxed))
            .finish()
    }
}
impl Drop for MetricsPoller {
fn drop(&mut self) {
self.shutdown();
}
}
/// Body of the metrics reader thread: reads newline-delimited JSON from
/// `fifo_path` and publishes each successfully parsed line into `snapshot`.
///
/// Best-effort by design:
/// - If the path cannot be opened, or a non-recoverable read error occurs, the thread
///   exits quietly and the last published snapshot (if any) stays in place.
/// - Blank lines, lines that are not valid UTF-8, and lines that are not valid JSON
///   are skipped.
/// - The loop ends at end of input, or when `shutdown` is seen set after a read returns.
///
/// NOTE(review): `shutdown` is only checked between reads, so a thread blocked in
/// `File::open` (opening a FIFO for reading typically blocks until a writer appears)
/// or inside `read_line` will not notice it until more input or EOF arrives.
fn reader_loop(fifo_path: PathBuf, snapshot: SharedSnapshot, shutdown: Arc<AtomicBool>) {
    let Ok(file) = File::open(&fifo_path) else {
        return;
    };
    let mut reader = BufReader::new(file);
    // One buffer reused for every line instead of a fresh `String` per line.
    let mut line = String::new();
    loop {
        line.clear();
        let read = reader.read_line(&mut line);
        if shutdown.load(Ordering::SeqCst) {
            break;
        }
        match read {
            // End of input: every writer has closed its end.
            Ok(0) => break,
            Ok(_) => {}
            // `read_line` has already consumed the offending bytes up to the newline,
            // so one malformed (non-UTF-8) line is skipped instead of killing the poller.
            Err(e) if e.kind() == std::io::ErrorKind::InvalidData => continue,
            Err(_) => break,
        }
        let trimmed = line.trim();
        if trimmed.is_empty() {
            continue;
        }
        if let Ok(snap) = VmMetricsSnapshot::from_json_line(trimmed) {
            store_snapshot(&snapshot, snap);
        }
    }
}
/// Publishes `value` as the latest snapshot, replacing whatever was stored.
///
/// A poisoned mutex is recovered rather than treated as fatal, because the slot is
/// overwritten with a complete new value either way.
fn store_snapshot(snapshot: &SharedSnapshot, value: VmMetricsSnapshot) {
    let mut slot = snapshot
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    *slot = Some(value);
}
/// Joins `handle` if its thread finishes within roughly `timeout`; otherwise drops
/// the handle, which detaches the still-running thread.
///
/// Polls every 10 ms. The thread's state is always checked at least once, and again
/// after the deadline passes, so a thread that finishes during the last sleep (or is
/// already finished when `timeout` is zero) is still joined rather than detached.
/// Any panic payload from the joined thread is discarded.
///
/// Returns `true` if the thread was joined, `false` if it was detached.
fn join_with_timeout(handle: JoinHandle<()>, timeout: Duration) -> bool {
    let deadline = Instant::now() + timeout;
    loop {
        if handle.is_finished() {
            let _ = handle.join();
            return true;
        }
        if Instant::now() >= deadline {
            // Dropping a `JoinHandle` detaches the thread; it keeps running on its own.
            drop(handle);
            return false;
        }
        thread::sleep(Duration::from_millis(10));
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use std::time::{Duration, Instant};

    // A metrics line that populates every path `VmMetricsSnapshot` extracts, plus
    // `vmm.device_events`, which has no dedicated field and must survive in `raw`.
    // Memory is supplied only as `balloon.actual_mib`, exercising the MiB fallback.
    fn sample_full_line() -> String {
        serde_json::json!({
            "utc_timestamp_ms": 1_700_000_000_123u64,
            "vcpu": {
                "exit_io_in": 1,
                "exit_io_out": 2,
                "exit_mmio_read": 3,
                "exit_mmio_write": 4,
                "exit_io_in_agg": { "min_us": 0, "max_us": 9, "sum_us": 100 },
                "exit_io_out_agg": { "min_us": 0, "max_us": 9, "sum_us": 200 },
                "exit_mmio_read_agg": { "min_us": 0, "max_us": 9, "sum_us": 300 },
                "exit_mmio_write_agg": { "min_us": 0, "max_us": 9, "sum_us": 400 },
            },
            "net": {
                "rx_bytes_count": 1024u64,
                "tx_bytes_count": 2048u64,
                "rx_packets_count": 10u64,
                "tx_packets_count": 20u64,
            },
            "block": {
                "read_bytes": 4096u64,
                "write_bytes": 8192u64,
            },
            "balloon": {
                "actual_mib": 256u64,
            },
            "vmm": {
                "device_events": 7u64,
            }
        })
        .to_string()
    }

    #[test]
    fn parses_well_known_fields() {
        let snap = VmMetricsSnapshot::from_json_line(&sample_full_line()).unwrap();
        assert_eq!(snap.timestamp_ms, 1_700_000_000_123);
        assert_eq!(snap.vcpu_usage_us, 100 + 200 + 300 + 400);
        assert_eq!(snap.memory_used_bytes, 256 * 1024 * 1024);
        assert_eq!(snap.network_rx_bytes, 1024);
        assert_eq!(snap.network_tx_bytes, 2048);
        assert_eq!(snap.network_rx_packets, 10);
        assert_eq!(snap.network_tx_packets, 20);
        assert_eq!(snap.block_read_bytes, 4096);
        assert_eq!(snap.block_write_bytes, 8192);
        assert_eq!(snap.raw["vmm"]["device_events"].as_u64(), Some(7));
    }

    #[test]
    fn missing_fields_default_to_zero() {
        let snap = VmMetricsSnapshot::from_json_line("{}").unwrap();
        assert_eq!(snap.timestamp_ms, 0);
        assert_eq!(snap.vcpu_usage_us, 0);
        assert_eq!(snap.memory_used_bytes, 0);
        assert_eq!(snap.network_rx_bytes, 0);
        assert_eq!(snap.network_tx_bytes, 0);
        assert_eq!(snap.network_rx_packets, 0);
        assert_eq!(snap.network_tx_packets, 0);
        assert_eq!(snap.block_read_bytes, 0);
        assert_eq!(snap.block_write_bytes, 0);
        assert!(snap.raw.is_object());
    }

    #[test]
    fn partial_payload_keeps_present_fields_drops_missing() {
        let line = serde_json::json!({
            "utc_timestamp_ms": 42u64,
            "net": { "rx_bytes_count": 7u64 }
        })
        .to_string();
        let snap = VmMetricsSnapshot::from_json_line(&line).unwrap();
        assert_eq!(snap.timestamp_ms, 42);
        assert_eq!(snap.network_rx_bytes, 7);
        assert_eq!(snap.network_tx_bytes, 0);
        assert_eq!(snap.vcpu_usage_us, 0);
    }

    #[test]
    fn invalid_json_is_rejected() {
        let err = VmMetricsSnapshot::from_json_line("not json").unwrap_err();
        match err {
            VmRuntimeError::Metrics(msg) => assert!(msg.contains("invalid json line")),
            other => panic!("unexpected error variant: {other:?}"),
        }
    }

    #[test]
    fn balloon_actual_bytes_takes_priority_over_mib() {
        let line = serde_json::json!({
            "balloon": { "actual_bytes": 12345u64, "actual_mib": 999u64 }
        })
        .to_string();
        let snap = VmMetricsSnapshot::from_json_line(&line).unwrap();
        assert_eq!(snap.memory_used_bytes, 12345);
    }

    #[test]
    fn poller_updates_snapshot_with_latest_line() {
        let dir = tempfile::tempdir().expect("tempdir");
        // A regular file (not a FIFO): the reader sees both lines, then EOF.
        let path = dir.path().join("metrics.fifo");
        let first = serde_json::json!({
            "utc_timestamp_ms": 1u64,
            "net": { "rx_bytes_count": 100u64 }
        });
        let second = serde_json::json!({
            "utc_timestamp_ms": 2u64,
            "net": { "rx_bytes_count": 200u64 }
        });
        let mut writer = std::fs::File::create(&path).expect("create");
        writeln!(writer, "{first}").unwrap();
        writeln!(writer, "{second}").unwrap();
        writer.flush().unwrap();
        drop(writer);
        let poller = MetricsPoller::start(MetricsConfig { fifo_path: path }).expect("start");
        // Wait specifically for the second line. Waiting for *any* snapshot could
        // observe the first line before the reader has stored the second, which made
        // this test flaky. If the latest line never wins, this times out and fails.
        let snap = wait_for_snapshot_where(&poller, Duration::from_secs(2), |s| {
            s.timestamp_ms == 2
        })
        .expect("snapshot from second line");
        assert_eq!(snap.timestamp_ms, 2);
        assert_eq!(snap.network_rx_bytes, 200);
        drop(poller);
    }

    #[test]
    fn poller_snapshot_is_none_until_first_line() {
        let dir = tempfile::tempdir().expect("tempdir");
        let path = dir.path().join("metrics.fifo");
        mkfifo_or_skip(&path);
        let poller = MetricsPoller::start(MetricsConfig {
            fifo_path: path.clone(),
        })
        .expect("start");
        // No writer has opened the FIFO yet. Opening a FIFO for reading typically
        // blocks until one does, so the reader cannot have published anything.
        assert!(poller.snapshot().is_none());
        let writer_path = path.clone();
        let writer = thread::spawn(move || {
            let mut f = std::fs::OpenOptions::new()
                .write(true)
                .open(&writer_path)
                .expect("open writer");
            writeln!(f, "{}", serde_json::json!({"utc_timestamp_ms": 99u64})).unwrap();
            f.flush().unwrap();
        });
        let snap = wait_for_snapshot(&poller, Duration::from_secs(2)).expect("snapshot");
        assert_eq!(snap.timestamp_ms, 99);
        writer.join().unwrap();
        drop(poller);
    }

    // Polls `poller` every 20 ms until it holds a snapshot accepted by `accept`;
    // returns `None` once `timeout` has elapsed without one.
    fn wait_for_snapshot_where(
        poller: &MetricsPoller,
        timeout: Duration,
        accept: impl Fn(&VmMetricsSnapshot) -> bool,
    ) -> Option<VmMetricsSnapshot> {
        let deadline = Instant::now() + timeout;
        loop {
            if let Some(snap) = poller.snapshot().filter(|s| accept(s)) {
                return Some(snap);
            }
            if Instant::now() >= deadline {
                return None;
            }
            thread::sleep(Duration::from_millis(20));
        }
    }

    // Waits for the first snapshot of any kind (see `wait_for_snapshot_where`).
    fn wait_for_snapshot(poller: &MetricsPoller, timeout: Duration) -> Option<VmMetricsSnapshot> {
        wait_for_snapshot_where(poller, timeout, |_| true)
    }

    // Creates a FIFO at `path` with mode 0o600.
    // NOTE(review): despite the name this never skips. It fails the test if mkfifo
    // fails, and panics outright on non-unix hosts; consider `#[cfg(unix)]` on the
    // FIFO test instead.
    fn mkfifo_or_skip(path: &std::path::Path) {
        #[cfg(unix)]
        {
            use std::ffi::CString;
            let c = CString::new(path.as_os_str().as_encoded_bytes()).expect("cstring");
            // SAFETY: `c` is a valid NUL-terminated C string that outlives this call,
            // and `mkfifo` only reads the path during the call.
            let rc = unsafe { libc_mkfifo(c.as_ptr(), 0o600) };
            assert_eq!(rc, 0, "mkfifo failed for {}", path.display());
        }
        #[cfg(not(unix))]
        {
            let _ = path;
            panic!("FIFO tests require a unix host");
        }
    }

    // NOTE(review): `mode` is declared as `u32`, but the C `mode_t` is narrower on some
    // platforms (e.g. 16-bit on macOS). Confirm this matches every target the tests
    // run on.
    #[cfg(unix)]
    unsafe extern "C" {
        #[link_name = "mkfifo"]
        fn libc_mkfifo(path: *const std::ffi::c_char, mode: u32) -> i32;
    }
}