dynamo_llm/kv_router/
metrics_aggregator.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16pub use crate::kv_router::protocols::ForwardPassMetrics;
17use crate::kv_router::KV_METRICS_ENDPOINT;
18
19use crate::kv_router::scheduler::Endpoint;
20use crate::kv_router::ProcessedEndpoints;
21use dynamo_runtime::component::Component;
22use dynamo_runtime::{service::EndpointInfo, utils::Duration, Result};
23use tokio::sync::watch;
24use tokio_util::sync::CancellationToken;
25
/// Handle onto the most recently aggregated KV metrics of a component.
///
/// Instances are created with [`KvMetricsAggregator::new`], which spawns
/// [`collect_endpoints_task`] to publish fresh [`ProcessedEndpoints`] values
/// into the watch channel read through `endpoints_rx`.
pub struct KvMetricsAggregator {
    /// Service name, as returned by `Component::service_name()` at construction time.
    pub service_name: String,
    /// Receiving half of the watch channel holding the latest processed endpoints.
    pub endpoints_rx: watch::Receiver<ProcessedEndpoints>,
}
30
31impl KvMetricsAggregator {
32    pub async fn new(component: Component, cancellation_token: CancellationToken) -> Self {
33        let (watch_tx, watch_rx) = watch::channel(ProcessedEndpoints::default());
34
35        tokio::spawn(collect_endpoints_task(
36            component.clone(),
37            watch_tx,
38            cancellation_token.clone(),
39        ));
40
41        Self {
42            service_name: component.service_name(),
43            endpoints_rx: watch_rx,
44        }
45    }
46
47    pub fn get_endpoints(&self) -> ProcessedEndpoints {
48        self.endpoints_rx.borrow().clone()
49    }
50
51    pub fn endpoints_watcher(&self) -> watch::Receiver<ProcessedEndpoints> {
52        self.endpoints_rx.clone()
53    }
54}
55
56/// [gluo TODO] 'collect_endpoints' is from component/metrics,
57/// should consolidate these functions into generic metrics aggregator
58/// functions and shared by KvMetricsAggregator and component/metrics.
59/// Collect endpoints from a component
60pub async fn collect_endpoints(
61    component: &Component,
62    subject: &str,
63    timeout: Duration,
64) -> Result<Vec<EndpointInfo>> {
65    // Collect stats from each backend
66    let stream = component.scrape_stats(timeout).await?;
67
68    // Filter the stats by the service subject
69    let endpoints = stream
70        .into_endpoints()
71        .filter(|e| e.subject.starts_with(subject))
72        .collect::<Vec<_>>();
73    tracing::debug!("Endpoints: {endpoints:?}");
74
75    if endpoints.is_empty() {
76        tracing::warn!("No endpoints found matching subject {subject}");
77    }
78
79    Ok(endpoints)
80}
81
82pub async fn collect_endpoints_task(
83    component: Component,
84    watch_tx: watch::Sender<ProcessedEndpoints>,
85    cancel: CancellationToken,
86) {
87    let backoff_delay = Duration::from_millis(100);
88    let scrape_timeout = Duration::from_millis(300);
89    let endpoint = component.endpoint(KV_METRICS_ENDPOINT);
90    let service_subject = endpoint.subject();
91
92    loop {
93        tokio::select! {
94            _ = cancel.cancelled() => {
95                tracing::debug!("cancellation token triggered");
96                break;
97            }
98            _ = tokio::time::sleep(backoff_delay) => {
99                tracing::trace!("collecting endpoints for service: {}", service_subject);
100                let unfiltered_endpoints =
101                    match collect_endpoints(&component, &service_subject, scrape_timeout).await
102                    {
103                        Ok(v) => v,
104                        Err(e) => {
105                            tracing::warn!("Failed to retrieve endpoints for {}: {:?}", service_subject, e);
106                            continue;
107                        }
108                    };
109                tracing::debug!("unfiltered endpoints: {:?}", unfiltered_endpoints);
110
111                let endpoints: Vec<Endpoint> = unfiltered_endpoints
112                    .into_iter()
113                    .filter(|s| s.data.is_some())
114                    .filter_map(|s|
115                        match s.data.unwrap().decode::<ForwardPassMetrics>() {
116                            Ok(data) => Some(Endpoint {
117                                name: s.name,
118                                subject: s.subject,
119                                data,
120                            }),
121                            Err(e) => {
122                                tracing::debug!("skip endpoint data that can't be parsed as ForwardPassMetrics: {:?}", e);
123                                None
124                            }
125                        }
126                    )
127                    .collect();
128                tracing::debug!("endpoints: {:?}", endpoints);
129
130                tracing::trace!(
131                    "found {} endpoints for service: {}",
132                    endpoints.len(),
133                    service_subject
134                );
135
136                let processed = ProcessedEndpoints::new(endpoints);
137
138                if watch_tx.send(processed).is_err() {
139                    tracing::trace!("failed to send processed endpoints; shutting down");
140                    break;
141                }
142            }
143        }
144    }
145}