use std::collections::HashMap;
use std::sync::Arc;
use dynamo_runtime::metrics::{
MetricsHierarchy, PrometheusExpositionFormatCallback, create_metric, prometheus_names::labels,
};
use crate::engine::EngineConfig;
use crate::error::{BackendError, DynamoError, ErrorType};
/// Handle to the metrics hierarchy an engine registers its metrics under,
/// paired with the label set derived from that hierarchy at construction.
pub struct EngineMetrics {
/// Shared hierarchy node; exposed by `hierarchy()` and used by
/// `add_expfmt_callback` to reach the metrics registry.
hierarchy: Arc<dyn MetricsHierarchy>,
/// Labels computed once by `compute_auto_labels`: positional hierarchy labels,
/// worker id (when a connection id exists) and model labels (when configured).
auto_labels: Arc<HashMap<String, String>>,
}
impl EngineMetrics {
    /// Wraps `hierarchy` without any model information.
    ///
    /// Auto-labels come only from the hierarchy's basename chain and its
    /// connection id.
    pub fn from_hierarchy<H>(hierarchy: H) -> Self
    where
        H: MetricsHierarchy + 'static,
    {
        Self::from_shared_hierarchy(Arc::new(hierarchy), None, None)
    }

    /// Like [`EngineMetrics::from_hierarchy`], but also derives the `model` and
    /// `model_name` labels from `engine_config`.
    ///
    /// A non-empty `served_model_name` takes precedence for `model_name`.
    pub fn with_engine_config<H>(hierarchy: H, engine_config: &EngineConfig) -> Self
    where
        H: MetricsHierarchy + 'static,
    {
        Self::from_shared_hierarchy(
            Arc::new(hierarchy),
            Some(engine_config.model.as_str()),
            engine_config.served_model_name.as_deref(),
        )
    }

    /// Shared tail of both constructors: computes the auto-labels once and
    /// stores them alongside the type-erased hierarchy.
    fn from_shared_hierarchy(
        hierarchy: Arc<dyn MetricsHierarchy>,
        model: Option<&str>,
        served_model_name: Option<&str>,
    ) -> Self {
        let auto_labels = Arc::new(compute_auto_labels(&*hierarchy, model, served_model_name));
        Self {
            hierarchy,
            auto_labels,
        }
    }

    /// Shared handle to the underlying metrics hierarchy.
    pub fn hierarchy(&self) -> &Arc<dyn MetricsHierarchy> {
        &self.hierarchy
    }

    /// Labels derived from the hierarchy (and engine config, if supplied) at
    /// construction time.
    pub fn auto_labels(&self) -> &Arc<HashMap<String, String>> {
        &self.auto_labels
    }

    /// Registers a Prometheus exposition-format callback on the hierarchy's
    /// metrics registry so its text is included in that registry's output.
    pub fn add_expfmt_callback(&self, callback: PrometheusExpositionFormatCallback) {
        let registry = self.hierarchy.get_metrics_registry();
        registry.add_expfmt_callback(callback);
    }
}
/// Returns the auto-labels of `metrics` as borrowed `(key, value)` pairs,
/// excluding namespace, component, endpoint and worker id.
///
/// NOTE(review): those four are presumably injected by `create_metric` from the
/// hierarchy itself, so passing them again would duplicate them — confirm
/// against `create_metric`.
fn const_labels(metrics: &EngineMetrics) -> Vec<(&str, &str)> {
    let skipped = [
        labels::NAMESPACE,
        labels::COMPONENT,
        labels::ENDPOINT,
        labels::WORKER_ID,
    ];
    let mut pairs = Vec::new();
    for (key, value) in metrics.auto_labels().iter() {
        if skipped.contains(&key.as_str()) {
            continue;
        }
        pairs.push((key.as_str(), value.as_str()));
    }
    pairs
}
/// Wraps a failure to create/register the gauge `name` as a backend
/// `DynamoError` with `BackendError::Unknown`.
///
/// The alternate `{:#}` formatter is used so the message carries the whole
/// `anyhow` cause chain ("outer: inner: ..."). Plain `{}` would keep only the
/// outermost context and drop the underlying registration error.
fn gauge_err(name: &str, e: anyhow::Error) -> DynamoError {
    DynamoError::builder()
        .error_type(ErrorType::Backend(BackendError::Unknown))
        .message(format!("gauge create {name}: {e:#}"))
        .build()
}
/// Lifecycle timing gauges (seconds) for cleanup, drain and model load.
pub struct LifecycleGauges {
/// Set through `observe_cleanup_time`.
cleanup_time_seconds: prometheus::Gauge,
/// Set through `observe_drain_time`.
drain_time_seconds: prometheus::Gauge,
/// Set once in `new` and never read again by this type, hence the
/// `dead_code` allow. NOTE(review): presumably kept so the handle stays
/// alongside its siblings — confirm whether a setter is intended.
#[allow(dead_code)]
model_load_time_seconds: prometheus::Gauge,
}
impl LifecycleGauges {
    /// Registers the cleanup, drain and model-load gauges on `metrics`'
    /// hierarchy, then records `model_load_time_seconds` right away.
    ///
    /// Each gauge carries the pairs returned by `const_labels`. Registration
    /// happens in the order cleanup, drain, model-load, and the first failure
    /// is returned.
    ///
    /// # Errors
    /// Returns the `DynamoError` built by `gauge_err` if any registration fails.
    pub fn new(metrics: &EngineMetrics, model_load_time_seconds: f64) -> Result<Self, DynamoError> {
        let extra_labels = const_labels(metrics);
        let parent = metrics.hierarchy().as_ref();
        let register = |metric_name: &str, description: &str| {
            create_metric::<prometheus::Gauge, _>(
                parent,
                metric_name,
                description,
                &extra_labels,
                None,
                None,
            )
            .map_err(|err| gauge_err(metric_name, err))
        };

        // Struct-literal fields are evaluated in source order, so the gauges are
        // registered in exactly the order listed here.
        let gauges = Self {
            cleanup_time_seconds: register(
                "cleanup_time_seconds",
                "Time spent releasing engine resources during shutdown. Set \
                 by the framework once after engine.cleanup() returns.",
            )?,
            drain_time_seconds: register(
                "drain_time_seconds",
                "Time spent draining in-flight work before cleanup. Stays at \
                 0 for engines without a drain hook.",
            )?,
            model_load_time_seconds: register(
                "model_load_time_seconds",
                "Time engine.start() took to return. Set once at Worker setup.",
            )?,
        };
        gauges.model_load_time_seconds.set(model_load_time_seconds);
        Ok(gauges)
    }

    /// Sets the cleanup-duration gauge to `seconds`.
    pub fn observe_cleanup_time(&self, seconds: f64) {
        let gauge = &self.cleanup_time_seconds;
        gauge.set(seconds);
    }

    /// Sets the drain-duration gauge to `seconds`.
    pub fn observe_drain_time(&self, seconds: f64) {
        let gauge = &self.drain_time_seconds;
        gauge.set(seconds);
    }
}
/// Per-data-parallel-rank component gauges, each carrying a `dp_rank` label.
pub struct ComponentGauges {
/// KV cache blocks available on the worker (`total_blocks`).
total_blocks: prometheus::IntGaugeVec,
/// GPU cache usage; its help text describes it as a 0.0-1.0 fraction
/// despite the `_percent` name.
gpu_cache_usage_percent: prometheus::GaugeVec,
/// Prefix cache hit rate (0.0-1.0); written only when a snapshot provides one.
kv_cache_hit_rate: prometheus::GaugeVec,
}
impl ComponentGauges {
    /// Registers the `dp_rank`-labelled component gauges and seeds a zero
    /// series for every rank in `dp_ranks`.
    ///
    /// Only `total_blocks` and `gpu_cache_usage_percent` are seeded.
    /// `kv_cache_hit_rate` gets a series for a rank only once `update` sees a
    /// hit rate for it.
    ///
    /// # Errors
    /// Returns the `DynamoError` built by `gauge_err` if any registration fails.
    pub fn new(metrics: &EngineMetrics, dp_ranks: &[u32]) -> Result<Self, DynamoError> {
        let const_label_values = const_labels(metrics);
        let hierarchy = metrics.hierarchy().as_ref();
        let build_int = |name: &str, help: &str| {
            create_metric::<prometheus::IntGaugeVec, _>(
                hierarchy,
                name,
                help,
                &const_label_values,
                None,
                Some(&["dp_rank"]),
            )
            .map_err(|e| gauge_err(name, e))
        };
        let build_f64 = |name: &str, help: &str| {
            create_metric::<prometheus::GaugeVec, _>(
                hierarchy,
                name,
                help,
                &const_label_values,
                None,
                Some(&["dp_rank"]),
            )
            .map_err(|e| gauge_err(name, e))
        };
        let total_blocks = build_int(
            "total_blocks",
            "Total number of KV cache blocks available on the worker.",
        )?;
        let gpu_cache_usage_percent = build_f64(
            "gpu_cache_usage_percent",
            "GPU cache usage as a percentage (0.0-1.0).",
        )?;
        let kv_cache_hit_rate = build_f64(
            "kv_cache_hit_rate",
            "Prefix cache hit rate (0.0-1.0). Portable across engines.",
        )?;
        // Pre-create a zero series per rank so dashboards see every rank before
        // the first snapshot arrives.
        for &rank in dp_ranks {
            let r = rank.to_string();
            total_blocks.with_label_values(&[&r]).set(0);
            gpu_cache_usage_percent.with_label_values(&[&r]).set(0.0);
        }
        Ok(Self {
            total_blocks,
            gpu_cache_usage_percent,
            kv_cache_hit_rate,
        })
    }

    /// Writes one component snapshot into the gauges for `snap.dp_rank`.
    ///
    /// `kv_total_blocks` saturates at `i64::MAX` instead of wrapping. The hit-rate
    /// gauge is written only when the snapshot carries a value, so a missing
    /// value leaves the previous one for that rank in place.
    pub fn update(&self, snap: &crate::engine::ComponentSnapshot) {
        let rank = snap.dp_rank.to_string();
        // An `as i64` cast would silently wrap an out-of-range unsigned count
        // into a negative gauge value; saturate instead.
        let total_blocks = i64::try_from(snap.kv_total_blocks).unwrap_or(i64::MAX);
        self.total_blocks
            .with_label_values(&[&rank])
            .set(total_blocks);
        self.gpu_cache_usage_percent
            .with_label_values(&[&rank])
            .set(snap.gpu_cache_usage as f64);
        if let Some(hr) = snap.kv_cache_hit_rate {
            self.kv_cache_hit_rate
                .with_label_values(&[&rank])
                .set(hr as f64);
        }
    }
}
/// Minimal root `MetricsHierarchy` for tests: basename `"test"`, no parents,
/// and its own default-constructed `MetricsRegistry`.
#[cfg(any(test, feature = "testing"))]
#[derive(Default)]
pub struct TestHierarchy {
registry: dynamo_runtime::metrics::MetricsRegistry,
}
#[cfg(any(test, feature = "testing"))]
impl TestHierarchy {
    /// Builds a test hierarchy around a default-constructed registry.
    pub fn new() -> Self {
        Self {
            registry: Default::default(),
        }
    }
}
#[cfg(any(test, feature = "testing"))]
impl MetricsHierarchy for TestHierarchy {
    /// Fixed basename for the test node.
    fn basename(&self) -> String {
        String::from("test")
    }

    /// The test node is a root: it reports no parents.
    fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
        vec![]
    }

    /// The registry owned by this test node.
    fn get_metrics_registry(&self) -> &dynamo_runtime::metrics::MetricsRegistry {
        &self.registry
    }
}
/// Derives the label set attached to engine metrics.
///
/// The basename chain is `[parents..., self]`. Positions 1, 2 and 3 map to
/// `labels::NAMESPACE`, `labels::COMPONENT` and `labels::ENDPOINT`. Position 0
/// is not used, and empty segments are skipped. A `connection_id`, when
/// present, becomes `labels::WORKER_ID` in lowercase hex. A non-empty `model`
/// sets both `labels::MODEL` and `labels::MODEL_NAME`. A non-empty
/// `served_model_name` then overrides `labels::MODEL_NAME`.
fn compute_auto_labels(
    hierarchy: &dyn MetricsHierarchy,
    model: Option<&str>,
    served_model_name: Option<&str>,
) -> HashMap<String, String> {
    // `once_with` defers the own-basename call until after the parents, keeping
    // the same call order as a push-after-collect.
    let segments: Vec<String> = hierarchy
        .parent_hierarchies()
        .iter()
        .map(|parent| parent.basename())
        .chain(std::iter::once_with(|| hierarchy.basename()))
        .collect();

    let mut result = HashMap::new();

    let positional = [
        (1usize, labels::NAMESPACE),
        (2, labels::COMPONENT),
        (3, labels::ENDPOINT),
    ];
    for &(position, key) in positional.iter() {
        match segments.get(position) {
            Some(segment) if !segment.is_empty() => {
                result.insert(key.to_string(), segment.clone());
            }
            _ => {}
        }
    }

    if let Some(connection) = hierarchy.connection_id() {
        result.insert(labels::WORKER_ID.to_string(), format!("{:x}", connection));
    }

    match model {
        Some(m) if !m.is_empty() => {
            result.insert(labels::MODEL.to_string(), m.to_string());
            result.insert(labels::MODEL_NAME.to_string(), m.to_string());
        }
        _ => {}
    }

    // Applied last so the served name wins over the model-derived `model_name`.
    match served_model_name {
        Some(served) if !served.is_empty() => {
            result.insert(labels::MODEL_NAME.to_string(), served.to_string());
        }
        _ => {}
    }

    result
}
// Unit tests for label derivation, the expfmt callback, and lifecycle gauge output.
#[cfg(test)]
mod tests {
use super::*;
// A served name must override `model_name`, while `model` keeps the raw path.
#[test]
fn with_engine_config_populates_model_labels() {
let config = EngineConfig {
model: "/local/path/qwen".to_string(),
served_model_name: Some("qwen3-0.6b".to_string()),
..Default::default()
};
let labels = EngineMetrics::with_engine_config(TestHierarchy::new(), &config)
.auto_labels()
.clone();
assert_eq!(labels.get("model").unwrap(), "/local/path/qwen");
assert_eq!(labels.get("model_name").unwrap(), "qwen3-0.6b");
}
// Empty strings are treated as absent: neither model label may be emitted.
#[test]
fn empty_model_strings_do_not_emit_labels() {
let config = EngineConfig {
model: String::new(),
served_model_name: Some(String::new()),
..Default::default()
};
let labels = EngineMetrics::with_engine_config(TestHierarchy::new(), &config)
.auto_labels()
.clone();
assert!(!labels.contains_key("model"));
assert!(!labels.contains_key("model_name"));
}
// Text returned by a registered callback must appear in the combined scrape.
#[test]
fn add_expfmt_callback_appears_in_combined_scrape() {
let m = EngineMetrics::from_hierarchy(TestHierarchy::new());
m.add_expfmt_callback(Arc::new(|| Ok("# external metric\n".to_string())));
let text = m
.hierarchy()
.get_metrics_registry()
.prometheus_expfmt_combined()
.expect("expfmt");
assert!(text.contains("# external metric"));
}
/// Test hierarchy with a configurable basename and an explicit, flat parent list.
struct NamedHierarchy {
registry: dynamo_runtime::metrics::MetricsRegistry,
name: String,
parents: Vec<NamedHierarchy>,
}
impl MetricsHierarchy for NamedHierarchy {
fn basename(&self) -> String {
self.name.clone()
}
// Returns the direct `parents` vector only; it does not recurse into the
// parents' own parents.
fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
self.parents
.iter()
.map(|p| p as &dyn MetricsHierarchy)
.collect()
}
fn get_metrics_registry(&self) -> &dynamo_runtime::metrics::MetricsRegistry {
&self.registry
}
}
/// Builds a parentless `NamedHierarchy` with the given basename.
fn leaf(name: &str) -> NamedHierarchy {
NamedHierarchy {
registry: Default::default(),
name: name.to_string(),
parents: Vec::new(),
}
}
// End-to-end: positional labels from a four-segment chain, then the values
// written through the lifecycle observers must show up in the scrape.
#[test]
fn lifecycle_gauges_register_and_observe() {
// Basename chain is [drt, dyn, backend, generate]; compute_auto_labels
// skips index 0 and maps indices 1..=3 to namespace/component/endpoint.
let endpoint = NamedHierarchy {
registry: Default::default(),
name: "generate".to_string(),
parents: vec![leaf("drt"), leaf("dyn"), leaf("backend")],
};
let config = EngineConfig {
model: "test-model".to_string(),
..Default::default()
};
let metrics = EngineMetrics::with_engine_config(endpoint, &config);
let auto = metrics.auto_labels();
assert_eq!(auto.get(labels::NAMESPACE).map(String::as_str), Some("dyn"));
assert_eq!(
auto.get(labels::COMPONENT).map(String::as_str),
Some("backend")
);
assert_eq!(
auto.get(labels::ENDPOINT).map(String::as_str),
Some("generate")
);
let lifecycle = LifecycleGauges::new(&metrics, 0.0).expect("construct lifecycle gauges");
lifecycle.observe_cleanup_time(1.5);
lifecycle.observe_drain_time(0.25);
let text = metrics
.hierarchy()
.get_metrics_registry()
.prometheus_expfmt_combined()
.expect("expfmt");
// First non-comment exposition line for the named metric.
let data_line = |gauge: &str| -> Option<&str> {
text.lines()
.find(|l| l.starts_with(gauge) && !l.starts_with('#'))
};
// NOTE(review): the `dynamo_component_` prefix is assumed to be added by
// `create_metric` (not visible in this file).
let cleanup = data_line("dynamo_component_cleanup_time_seconds")
.unwrap_or_else(|| panic!("cleanup gauge data row missing: {text}"));
let drain = data_line("dynamo_component_drain_time_seconds")
.unwrap_or_else(|| panic!("drain gauge data row missing: {text}"));
assert!(cleanup.ends_with(" 1.5"), "cleanup value wrong: {cleanup}");
assert!(drain.ends_with(" 0.25"), "drain value wrong: {drain}");
}
}