dynamo_llm/kv_router/
metrics_aggregator.rs1pub use crate::kv_router::protocols::ForwardPassMetrics;
17use crate::kv_router::KV_METRICS_ENDPOINT;
18
19use crate::kv_router::scheduler::Endpoint;
20use crate::kv_router::ProcessedEndpoints;
21use dynamo_runtime::component::Component;
22use dynamo_runtime::{service::EndpointInfo, utils::Duration, Result};
23use tokio::sync::watch;
24use tokio_util::sync::CancellationToken;
25
26pub struct KvMetricsAggregator {
27 pub service_name: String,
28 pub endpoints_rx: watch::Receiver<ProcessedEndpoints>,
29}
30
31impl KvMetricsAggregator {
32 pub async fn new(component: Component, cancellation_token: CancellationToken) -> Self {
33 let (watch_tx, watch_rx) = watch::channel(ProcessedEndpoints::default());
34
35 tokio::spawn(collect_endpoints_task(
36 component.clone(),
37 watch_tx,
38 cancellation_token.clone(),
39 ));
40
41 Self {
42 service_name: component.service_name(),
43 endpoints_rx: watch_rx,
44 }
45 }
46
47 pub fn get_endpoints(&self) -> ProcessedEndpoints {
48 self.endpoints_rx.borrow().clone()
49 }
50
51 pub fn endpoints_watcher(&self) -> watch::Receiver<ProcessedEndpoints> {
52 self.endpoints_rx.clone()
53 }
54}
55
56pub async fn collect_endpoints(
61 component: &Component,
62 subject: &str,
63 timeout: Duration,
64) -> Result<Vec<EndpointInfo>> {
65 let stream = component.scrape_stats(timeout).await?;
67
68 let endpoints = stream
70 .into_endpoints()
71 .filter(|e| e.subject.starts_with(subject))
72 .collect::<Vec<_>>();
73 tracing::debug!("Endpoints: {endpoints:?}");
74
75 if endpoints.is_empty() {
76 tracing::warn!("No endpoints found matching subject {subject}");
77 }
78
79 Ok(endpoints)
80}
81
82pub async fn collect_endpoints_task(
83 component: Component,
84 watch_tx: watch::Sender<ProcessedEndpoints>,
85 cancel: CancellationToken,
86) {
87 let backoff_delay = Duration::from_millis(100);
88 let scrape_timeout = Duration::from_millis(300);
89 let endpoint = component.endpoint(KV_METRICS_ENDPOINT);
90 let service_subject = endpoint.subject();
91
92 loop {
93 tokio::select! {
94 _ = cancel.cancelled() => {
95 tracing::debug!("cancellation token triggered");
96 break;
97 }
98 _ = tokio::time::sleep(backoff_delay) => {
99 tracing::trace!("collecting endpoints for service: {}", service_subject);
100 let unfiltered_endpoints =
101 match collect_endpoints(&component, &service_subject, scrape_timeout).await
102 {
103 Ok(v) => v,
104 Err(e) => {
105 tracing::warn!("Failed to retrieve endpoints for {}: {:?}", service_subject, e);
106 continue;
107 }
108 };
109 tracing::debug!("unfiltered endpoints: {:?}", unfiltered_endpoints);
110
111 let endpoints: Vec<Endpoint> = unfiltered_endpoints
112 .into_iter()
113 .filter(|s| s.data.is_some())
114 .filter_map(|s|
115 match s.data.unwrap().decode::<ForwardPassMetrics>() {
116 Ok(data) => Some(Endpoint {
117 name: s.name,
118 subject: s.subject,
119 data,
120 }),
121 Err(e) => {
122 tracing::debug!("skip endpoint data that can't be parsed as ForwardPassMetrics: {:?}", e);
123 None
124 }
125 }
126 )
127 .collect();
128 tracing::debug!("endpoints: {:?}", endpoints);
129
130 tracing::trace!(
131 "found {} endpoints for service: {}",
132 endpoints.len(),
133 service_subject
134 );
135
136 let processed = ProcessedEndpoints::new(endpoints);
137
138 if watch_tx.send(processed).is_err() {
139 tracing::trace!("failed to send processed endpoints; shutting down");
140 break;
141 }
142 }
143 }
144 }
145}