use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use dynamo_llm::kv_router::publisher::WorkerMetricsPublisher;
use parking_lot::Mutex;
use crate::engine::ComponentSnapshot;
use crate::metrics::ComponentGauges;
/// Publishes per-rank component snapshots to the local gauges and, when a
/// router publisher is registered for the rank, to that router publisher.
///
/// Router publish failures are logged at `warn` level the first time they
/// occur for a given rank and at `debug` level afterwards.
pub struct SnapshotPublisher {
/// Gauge set updated with every snapshot passed to `publish`.
gauges: Arc<ComponentGauges>,
/// Router publishers keyed by `dp_rank`; ranks without an entry skip the
/// router publish step.
router_publishers: HashMap<u32, Arc<WorkerMetricsPublisher>>,
/// Ranks whose router publish failure has already been logged at `warn`
/// level. Entries are only ever inserted here, never removed.
warned_ranks: Mutex<HashSet<u32>>,
}
impl SnapshotPublisher {
pub fn new(
gauges: Arc<ComponentGauges>,
router_publishers: HashMap<u32, Arc<WorkerMetricsPublisher>>,
) -> Self {
Self {
gauges,
router_publishers,
warned_ranks: Mutex::new(HashSet::new()),
}
}
pub fn publish(&self, dp_rank: u32, snap: ComponentSnapshot) {
self.gauges.update(&snap);
if let Some(rp) = self.router_publishers.get(&dp_rank)
&& let Err(e) = rp.publish(Some(dp_rank), None, Some(snap.kv_used_blocks))
{
if self.warned_ranks.lock().insert(dp_rank) {
tracing::warn!(dp_rank, error = %e, "router signal publish failed; suppressing further");
} else {
tracing::debug!(dp_rank, error = %e, "router signal publish failed");
}
}
}
pub fn dp_ranks(&self) -> Vec<u32> {
let mut ranks: Vec<u32> = self.router_publishers.keys().copied().collect();
ranks.sort_unstable();
ranks
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::ComponentSnapshot;
    use crate::metrics::{ComponentGauges, EngineMetrics, TestHierarchy};

    /// Builds a publisher whose gauges are created for `&[0]` and which has
    /// no router publishers registered.
    fn make_publisher() -> SnapshotPublisher {
        let metrics = EngineMetrics::from_hierarchy(TestHierarchy::new());
        let component_gauges = ComponentGauges::new(&metrics, &[0]).expect("component gauges");
        SnapshotPublisher::new(Arc::new(component_gauges), HashMap::new())
    }

    /// Publishing for a rank with no router publisher must complete without panicking.
    #[test]
    fn publish_unknown_rank_is_noop() {
        let snapshot = ComponentSnapshot {
            kv_used_blocks: 5,
            kv_total_blocks: 100,
            gpu_cache_usage: 0.5,
            kv_cache_hit_rate: Some(0.3),
            dp_rank: 42,
        };
        make_publisher().publish(42, snapshot);
    }

    /// Gauge values must be visible in the rendered metrics right after `publish` returns.
    #[test]
    fn publish_updates_gauges_synchronously() {
        // Built by hand rather than via `make_publisher` because the test
        // needs to keep `metrics` around to render the registry afterwards.
        let metrics = EngineMetrics::from_hierarchy(TestHierarchy::new());
        let component_gauges = ComponentGauges::new(&metrics, &[0]).expect("component gauges");
        let publisher = SnapshotPublisher::new(Arc::new(component_gauges), HashMap::new());

        let snapshot = ComponentSnapshot {
            kv_used_blocks: 7,
            kv_total_blocks: 100,
            gpu_cache_usage: 0.07,
            kv_cache_hit_rate: Some(0.25),
            dp_rank: 0,
        };
        publisher.publish(0, snapshot);

        let rendered = metrics
            .hierarchy()
            .get_metrics_registry()
            .prometheus_expfmt_combined()
            .expect("expfmt");

        assert!(
            rendered.contains("dynamo_component_total_blocks"),
            "total_blocks not in /metrics: {rendered}"
        );
        assert!(
            rendered.contains("100"),
            "total_blocks not in /metrics: {rendered}"
        );
        assert!(
            rendered.contains("dynamo_component_gpu_cache_usage_percent"),
            "gpu_cache_usage_percent not in /metrics: {rendered}"
        );
    }

    /// Constructing the gauges alone must expose the requested ranks, while
    /// the hit-rate metric must stay absent until something is published.
    #[test]
    fn seeded_ranks_render_in_metrics() {
        let metrics = EngineMetrics::from_hierarchy(TestHierarchy::new());
        // Kept bound for the whole test so the gauges stay alive while rendering.
        let _seeded = ComponentGauges::new(&metrics, &[0, 1]).expect("component gauges");

        let rendered = metrics
            .hierarchy()
            .get_metrics_registry()
            .prometheus_expfmt_combined()
            .expect("expfmt");

        assert!(
            rendered.contains(r#"dynamo_component_total_blocks{"#),
            "rank 0 total_blocks not seeded: {rendered}"
        );
        assert!(
            rendered.contains(r#"dp_rank="0""#),
            "rank 0 total_blocks not seeded: {rendered}"
        );
        assert!(
            rendered.contains(r#"dp_rank="1""#),
            "rank 1 total_blocks not seeded: {rendered}"
        );

        let hit_rate_present = rendered.contains("dynamo_component_kv_cache_hit_rate");
        assert!(
            !hit_rate_present,
            "kv_cache_hit_rate should not be seeded: {rendered}"
        );
    }
}