use std::sync::{Arc, LazyLock, OnceLock};
use std::time::Duration;
use dynamo_runtime::component::Component;
use dynamo_runtime::metrics::MetricsHierarchy;
use dynamo_runtime::metrics::prometheus_names::{
frontend_service, labels, name_prefix, router, router_request, routing_overhead,
};
fn router_metric(suffix: &str) -> String {
format!("{}{}", router_request::METRIC_PREFIX, suffix)
}
use dynamo_runtime::traits::DistributedRuntimeProvider;
use prometheus::{HistogramOpts, IntGaugeVec, Opts};
use crate::http::service::metrics::generate_log_buckets;
fn compute_overhead_buckets() -> Vec<f64> {
prometheus::exponential_buckets(0.001, 2.0, 15).unwrap()
}
fn async_overhead_buckets() -> Vec<f64> {
prometheus::exponential_buckets(0.01, 3.0, 17).unwrap()
}
pub struct WorkerLoadMetrics {
pub active_decode_blocks: IntGaugeVec,
pub active_prefill_tokens: IntGaugeVec,
}
impl WorkerLoadMetrics {
pub fn observe(
&self,
worker_id: u64,
dp_rank: u32,
worker_type: &str,
active_blocks: usize,
active_tokens: usize,
) {
let worker_id_str = worker_id.to_string();
let dp_rank_str = dp_rank.to_string();
let labels = &[worker_id_str.as_str(), dp_rank_str.as_str(), worker_type];
self.active_decode_blocks
.with_label_values(labels)
.set(active_blocks as i64);
self.active_prefill_tokens
.with_label_values(labels)
.set(active_tokens as i64);
}
}
pub static WORKER_LOAD_METRICS: LazyLock<WorkerLoadMetrics> = LazyLock::new(|| WorkerLoadMetrics {
active_decode_blocks: IntGaugeVec::new(
Opts::new(
format!(
"{}_{}",
name_prefix::FRONTEND,
frontend_service::WORKER_ACTIVE_DECODE_BLOCKS
),
"Active KV cache decode blocks per worker",
),
&[labels::WORKER_ID, labels::DP_RANK, labels::WORKER_TYPE],
)
.expect("Failed to create worker_active_decode_blocks gauge"),
active_prefill_tokens: IntGaugeVec::new(
Opts::new(
format!(
"{}_{}",
name_prefix::FRONTEND,
frontend_service::WORKER_ACTIVE_PREFILL_TOKENS
),
"Active prefill tokens queued per worker",
),
&[labels::WORKER_ID, labels::DP_RANK, labels::WORKER_TYPE],
)
.expect("Failed to create worker_active_prefill_tokens gauge"),
});
pub fn register_worker_load_metrics(
registry: &prometheus::Registry,
) -> Result<(), prometheus::Error> {
let m = &*WORKER_LOAD_METRICS;
registry.register(Box::new(m.active_decode_blocks.clone()))?;
registry.register(Box::new(m.active_prefill_tokens.clone()))?;
Ok(())
}
pub struct RouterQueueMetrics {
pub pending_requests: IntGaugeVec,
pub pending_isl_tokens: IntGaugeVec,
}
pub static ROUTER_QUEUE_METRICS: LazyLock<RouterQueueMetrics> =
LazyLock::new(|| RouterQueueMetrics {
pending_requests: IntGaugeVec::new(
Opts::new(
format!(
"{}_{}",
name_prefix::FRONTEND,
frontend_service::ROUTER_QUEUE_PENDING_REQUESTS
),
"Number of requests pending in the router scheduler queue",
),
&[labels::WORKER_TYPE],
)
.expect("Failed to create router_queue_pending_requests gauge"),
pending_isl_tokens: IntGaugeVec::new(
Opts::new(
format!("{}_router_queue_pending_isl_tokens", name_prefix::FRONTEND),
"Sum of isl_tokens for requests pending in the router scheduler queue",
),
&[labels::WORKER_TYPE],
)
.expect("Failed to create router_queue_pending_isl_tokens gauge"),
});
impl RouterQueueMetrics {
pub fn set_pending(&self, worker_type: &str, count: usize) {
self.pending_requests
.with_label_values(&[worker_type])
.set(count as i64);
}
pub fn set_pending_isl_tokens(&self, worker_type: &str, tokens: usize) {
self.pending_isl_tokens
.with_label_values(&[worker_type])
.set(tokens as i64);
}
}
pub fn register_router_queue_metrics(
registry: &prometheus::Registry,
) -> Result<(), prometheus::Error> {
let m = &*ROUTER_QUEUE_METRICS;
registry.register(Box::new(m.pending_requests.clone()))?;
registry.register(Box::new(m.pending_isl_tokens.clone()))?;
Ok(())
}
pub struct RoutingOverheadMetrics {
pub block_hashing: prometheus::Histogram,
pub indexer_find_matches: prometheus::Histogram,
pub seq_hashing: prometheus::Histogram,
pub scheduling: prometheus::Histogram,
pub total: prometheus::Histogram,
}
static ROUTING_OVERHEAD_METRICS: OnceLock<Arc<RoutingOverheadMetrics>> = OnceLock::new();
impl RoutingOverheadMetrics {
pub fn register(
registry: &prometheus::Registry,
instance_id: u64,
) -> Result<(), prometheus::Error> {
let m = ROUTING_OVERHEAD_METRICS.get_or_init(|| {
let compute_buckets = compute_overhead_buckets();
let async_buckets = async_overhead_buckets();
let router_id = instance_id.to_string();
let make = |suffix: &str, help: &str, buckets: Vec<f64>| {
let name = format!("{}_{}", name_prefix::ROUTER, suffix);
prometheus::Histogram::with_opts(
HistogramOpts::new(name, help)
.const_label(labels::ROUTER_ID, &router_id)
.buckets(buckets),
)
};
let block_hashing = make(
routing_overhead::BLOCK_HASHING_MS,
"Time spent computing block hashes in milliseconds",
compute_buckets.clone(),
)
.expect("overhead_block_hashing_ms");
let indexer_find_matches = make(
routing_overhead::INDEXER_FIND_MATCHES_MS,
"Time spent in indexer find_matches in milliseconds",
async_buckets.clone(),
)
.expect("overhead_indexer_find_matches_ms");
let seq_hashing = make(
routing_overhead::SEQ_HASHING_MS,
"Time spent computing sequence hashes in milliseconds",
compute_buckets,
)
.expect("overhead_seq_hashing_ms");
let scheduling = make(
routing_overhead::SCHEDULING_MS,
"Time spent in scheduler worker selection in milliseconds",
async_buckets.clone(),
)
.expect("overhead_scheduling_ms");
let total = make(
routing_overhead::TOTAL_MS,
"Total routing overhead per request in milliseconds",
async_buckets,
)
.expect("overhead_total_ms");
Arc::new(Self {
block_hashing,
indexer_find_matches,
seq_hashing,
scheduling,
total,
})
});
registry.register(Box::new(m.block_hashing.clone()))?;
registry.register(Box::new(m.indexer_find_matches.clone()))?;
registry.register(Box::new(m.seq_hashing.clone()))?;
registry.register(Box::new(m.scheduling.clone()))?;
registry.register(Box::new(m.total.clone()))?;
Ok(())
}
pub fn get() -> Option<Arc<Self>> {
ROUTING_OVERHEAD_METRICS.get().cloned()
}
pub fn observe(
&self,
hash_elapsed: Duration,
seq_hash_elapsed: Duration,
find_matches_elapsed: Duration,
total_elapsed: Duration,
) {
self.block_hashing
.observe(hash_elapsed.as_secs_f64() * 1000.0);
self.seq_hashing
.observe(seq_hash_elapsed.saturating_sub(hash_elapsed).as_secs_f64() * 1000.0);
self.indexer_find_matches.observe(
find_matches_elapsed
.saturating_sub(seq_hash_elapsed)
.as_secs_f64()
* 1000.0,
);
self.scheduling.observe(
total_elapsed
.saturating_sub(find_matches_elapsed)
.as_secs_f64()
* 1000.0,
);
self.total.observe(total_elapsed.as_secs_f64() * 1000.0);
}
}
pub struct RouterRequestMetrics {
pub requests_total: prometheus::IntCounter,
pub time_to_first_token_seconds: prometheus::Histogram,
pub inter_token_latency_seconds: prometheus::Histogram,
pub input_sequence_tokens: prometheus::Histogram,
pub output_sequence_tokens: prometheus::Histogram,
pub kv_hit_rate: prometheus::Histogram,
pub kv_transfer_estimated_latency_seconds: prometheus::Histogram,
}
static ROUTER_REQUEST_METRICS: OnceLock<Arc<RouterRequestMetrics>> = OnceLock::new();
impl RouterRequestMetrics {
pub fn from_component(component: &Component) -> Arc<Self> {
ROUTER_REQUEST_METRICS
.get_or_init(|| {
let instance_id = component.drt().discovery().instance_id();
let router_id = instance_id.to_string();
let extra_labels: &[(&str, &str)] = &[(labels::ROUTER_ID, &router_id)];
let metrics = component.metrics();
let requests_total = metrics
.create_intcounter(
&router_metric(frontend_service::REQUESTS_TOTAL),
"Total number of requests processed by the router",
extra_labels,
)
.expect("failed to create router_requests_total");
let time_to_first_token_seconds = metrics
.create_histogram(
&router_metric(frontend_service::TIME_TO_FIRST_TOKEN_SECONDS),
"Time to first token observed at the router",
extra_labels,
Some(generate_log_buckets(0.001, 480.0, 18)),
)
.expect("failed to create router_time_to_first_token_seconds");
let inter_token_latency_seconds = metrics
.create_histogram(
&router_metric(frontend_service::INTER_TOKEN_LATENCY_SECONDS),
"Average inter-token latency observed at the router",
extra_labels,
Some(generate_log_buckets(0.001, 2.0, 13)),
)
.expect("failed to create router_inter_token_latency_seconds");
let input_sequence_tokens = metrics
.create_histogram(
&router_metric(frontend_service::INPUT_SEQUENCE_TOKENS),
"Input sequence length in tokens observed at the router",
extra_labels,
Some(generate_log_buckets(50.0, 128000.0, 12)),
)
.expect("failed to create router_input_sequence_tokens");
let output_sequence_tokens = metrics
.create_histogram(
&router_metric(frontend_service::OUTPUT_SEQUENCE_TOKENS),
"Output sequence length in tokens observed at the router",
extra_labels,
Some(generate_log_buckets(50.0, 32000.0, 10)),
)
.expect("failed to create router_output_sequence_tokens");
let kv_hit_rate = metrics
.create_histogram(
&router_metric(frontend_service::KV_HIT_RATE),
"Predicted KV cache hit rate at routing time (0.0-1.0)",
extra_labels,
Some(prometheus::linear_buckets(0.0, 0.05, 21).unwrap()),
)
.expect("failed to create router_kv_hit_rate");
let kv_transfer_estimated_latency_seconds = metrics
.create_histogram(
&router_metric(frontend_service::KV_TRANSFER_ESTIMATED_LATENCY_SECONDS),
"Upper-bound estimation of KV cache transfer latency in disaggregated serving (prefill_complete to first_token)",
extra_labels,
Some(generate_log_buckets(0.001, 10.0, 15)),
)
.expect("failed to create router_kv_transfer_estimated_latency_seconds");
Arc::new(Self {
requests_total,
time_to_first_token_seconds,
inter_token_latency_seconds,
input_sequence_tokens,
output_sequence_tokens,
kv_hit_rate,
kv_transfer_estimated_latency_seconds,
})
})
.clone()
}
}
pub struct RemoteIndexerMetrics {
pub query_failures_total: prometheus::IntCounter,
pub write_failures_total: prometheus::IntCounter,
}
static REMOTE_INDEXER_METRICS: OnceLock<Arc<RemoteIndexerMetrics>> = OnceLock::new();
impl RemoteIndexerMetrics {
pub fn from_component(component: &Component) -> Arc<Self> {
REMOTE_INDEXER_METRICS
.get_or_init(|| {
let instance_id = component.drt().discovery().instance_id();
let router_id = instance_id.to_string();
let extra_labels: &[(&str, &str)] = &[(labels::ROUTER_ID, &router_id)];
let metrics = component.metrics();
let query_failures_total = metrics
.create_intcounter(
router::REMOTE_INDEXER_QUERY_FAILURES_TOTAL,
"Total number of remote indexer overlap queries that failed",
extra_labels,
)
.expect("failed to create router_remote_indexer_query_failures_total");
let write_failures_total = metrics
.create_intcounter(
router::REMOTE_INDEXER_WRITE_FAILURES_TOTAL,
"Total number of remote indexer routing-decision writes that failed",
extra_labels,
)
.expect("failed to create router_remote_indexer_write_failures_total");
Arc::new(Self {
query_failures_total,
write_failures_total,
})
})
.clone()
}
pub fn increment_query_failures(&self) {
self.query_failures_total.inc();
}
pub fn increment_write_failures(&self) {
self.write_failures_total.inc();
}
}
#[cfg(test)]
mod tests {
use super::*;
use prometheus::{Encoder, TextEncoder};
fn gather_pef(registry: &prometheus::Registry) -> String {
let encoder = TextEncoder::new();
let mut buffer = Vec::new();
encoder.encode(®istry.gather(), &mut buffer).unwrap();
String::from_utf8(buffer).unwrap()
}
#[test]
fn test_worker_load_metrics_pef() {
let registry = prometheus::Registry::new();
let metrics = WorkerLoadMetrics {
active_decode_blocks: IntGaugeVec::new(
Opts::new(
format!(
"{}_{}",
name_prefix::FRONTEND,
frontend_service::WORKER_ACTIVE_DECODE_BLOCKS
),
"Active KV cache decode blocks per worker",
),
&[labels::WORKER_ID, labels::DP_RANK, labels::WORKER_TYPE],
)
.unwrap(),
active_prefill_tokens: IntGaugeVec::new(
Opts::new(
format!(
"{}_{}",
name_prefix::FRONTEND,
frontend_service::WORKER_ACTIVE_PREFILL_TOKENS
),
"Active prefill tokens queued per worker",
),
&[labels::WORKER_ID, labels::DP_RANK, labels::WORKER_TYPE],
)
.unwrap(),
};
registry
.register(Box::new(metrics.active_decode_blocks.clone()))
.unwrap();
registry
.register(Box::new(metrics.active_prefill_tokens.clone()))
.unwrap();
metrics.observe(123, 0, "decode", 42, 100);
let output = gather_pef(®istry);
let expected = "\
# HELP dynamo_frontend_worker_active_decode_blocks Active KV cache decode blocks per worker
# TYPE dynamo_frontend_worker_active_decode_blocks gauge
dynamo_frontend_worker_active_decode_blocks{dp_rank=\"0\",worker_id=\"123\",worker_type=\"decode\"} 42
# HELP dynamo_frontend_worker_active_prefill_tokens Active prefill tokens queued per worker
# TYPE dynamo_frontend_worker_active_prefill_tokens gauge
dynamo_frontend_worker_active_prefill_tokens{dp_rank=\"0\",worker_id=\"123\",worker_type=\"decode\"} 100
";
assert_eq!(
output, expected,
"\nActual PEF:\n{output}\nExpected PEF:\n{expected}"
);
}
#[test]
fn test_router_queue_metrics_pef() {
let registry = prometheus::Registry::new();
let metrics = RouterQueueMetrics {
pending_requests: IntGaugeVec::new(
Opts::new(
format!(
"{}_{}",
name_prefix::FRONTEND,
frontend_service::ROUTER_QUEUE_PENDING_REQUESTS
),
"Number of requests pending in the router scheduler queue",
),
&[labels::WORKER_TYPE],
)
.unwrap(),
pending_isl_tokens: IntGaugeVec::new(
Opts::new(
format!("{}_router_queue_pending_isl_tokens", name_prefix::FRONTEND),
"Sum of isl_tokens for requests pending in the router scheduler queue",
),
&[labels::WORKER_TYPE],
)
.unwrap(),
};
registry
.register(Box::new(metrics.pending_requests.clone()))
.unwrap();
registry
.register(Box::new(metrics.pending_isl_tokens.clone()))
.unwrap();
metrics.set_pending("decode", 5);
metrics.set_pending_isl_tokens("decode", 1024);
let output = gather_pef(®istry);
let expected = "\
# HELP dynamo_frontend_router_queue_pending_isl_tokens Sum of isl_tokens for requests pending in the router scheduler queue
# TYPE dynamo_frontend_router_queue_pending_isl_tokens gauge
dynamo_frontend_router_queue_pending_isl_tokens{worker_type=\"decode\"} 1024
# HELP dynamo_frontend_router_queue_pending_requests Number of requests pending in the router scheduler queue
# TYPE dynamo_frontend_router_queue_pending_requests gauge
dynamo_frontend_router_queue_pending_requests{worker_type=\"decode\"} 5
";
assert_eq!(
output, expected,
"\nActual PEF:\n{output}\nExpected PEF:\n{expected}"
);
}
#[test]
fn test_routing_overhead_metric_names_pef() {
let registry = prometheus::Registry::new();
let buckets = async_overhead_buckets();
let prefix = name_prefix::ROUTER;
let name = format!("{}_{}", prefix, routing_overhead::TOTAL_MS);
let total = prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
name,
"Total routing overhead per request in milliseconds",
)
.buckets(buckets),
)
.unwrap();
registry.register(Box::new(total.clone())).unwrap();
total.observe(1.5);
let output = gather_pef(®istry);
assert!(
output.contains("# HELP dynamo_router_overhead_total_ms"),
"PEF missing HELP for routing overhead metric"
);
assert!(
output.contains("# TYPE dynamo_router_overhead_total_ms histogram"),
"PEF missing TYPE for routing overhead metric"
);
assert!(
output.contains("dynamo_router_overhead_total_ms_count 1"),
"PEF missing observation count"
);
}
#[test]
fn test_routing_overhead_saturating_sub() {
let buckets = prometheus::exponential_buckets(0.0001, 2.0, 18).unwrap();
let make = |name: &str| {
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(name, "test").buckets(buckets.clone()),
)
.unwrap()
};
let metrics = RoutingOverheadMetrics {
block_hashing: make("test_block_hashing_ms"),
indexer_find_matches: make("test_find_matches_ms"),
seq_hashing: make("test_seq_hashing_ms"),
scheduling: make("test_scheduling_ms"),
total: make("test_total_ms"),
};
metrics.observe(
Duration::from_millis(10),
Duration::from_millis(5),
Duration::from_millis(3),
Duration::from_millis(1),
);
}
#[test]
fn test_kv_transfer_estimated_latency_metric_pef() {
let registry = prometheus::Registry::new();
let name = format!(
"{}{}",
router_request::METRIC_PREFIX,
frontend_service::KV_TRANSFER_ESTIMATED_LATENCY_SECONDS,
);
let buckets = generate_log_buckets(0.001, 10.0, 15);
let histogram = prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
&name,
"Upper-bound estimation of KV cache transfer latency in disaggregated serving (prefill_complete to first_token)",
)
.buckets(buckets),
)
.unwrap();
registry.register(Box::new(histogram.clone())).unwrap();
histogram.observe(0.005);
let output = gather_pef(®istry);
assert!(
output.contains("# HELP router_kv_transfer_estimated_latency_seconds"),
"PEF missing HELP line. Got:\n{output}"
);
assert!(
output.contains("# TYPE router_kv_transfer_estimated_latency_seconds histogram"),
"PEF missing TYPE line. Got:\n{output}"
);
assert!(
output.contains("router_kv_transfer_estimated_latency_seconds_count 1"),
"PEF missing observation count. Got:\n{output}"
);
assert!(
output.contains("router_kv_transfer_estimated_latency_seconds_sum 0.005"),
"PEF missing observation sum. Got:\n{output}"
);
}
}