zlayer_agent/
metrics_providers.rs1use crate::cgroups_stats::ContainerStats;
8use crate::runtime::{ContainerId, Runtime};
9use crate::service::ServiceManager;
10use async_trait::async_trait;
11use std::collections::HashMap;
12use std::sync::Arc;
13use tokio::sync::RwLock;
14use zlayer_scheduler::metrics::{
15 ContainerStatsProvider, MetricsContainerId, RawContainerStats, ServiceContainerProvider,
16};
17
18pub struct ServiceManagerContainerProvider {
39 manager: Arc<ServiceManager>,
40}
41
42impl ServiceManagerContainerProvider {
43 pub fn new(manager: Arc<ServiceManager>) -> Self {
45 Self { manager }
46 }
47}
48
49#[async_trait]
50impl ServiceContainerProvider for ServiceManagerContainerProvider {
51 async fn get_container_ids(&self, service_name: &str) -> Vec<MetricsContainerId> {
53 let container_ids = self.manager.get_service_containers(service_name).await;
55
56 container_ids
57 .into_iter()
58 .map(|id| MetricsContainerId {
59 service: id.service,
60 replica: id.replica,
61 })
62 .collect()
63 }
64
65 async fn get_all_services(&self) -> HashMap<String, Vec<MetricsContainerId>> {
67 let service_names = self.manager.list_services().await;
68 let mut result = HashMap::new();
69
70 for name in service_names {
71 let container_ids = self.get_container_ids(&name).await;
72 if !container_ids.is_empty() {
73 result.insert(name, container_ids);
74 }
75 }
76
77 result
78 }
79}
80
81pub struct LockedServiceManagerContainerProvider {
88 manager: Arc<RwLock<ServiceManager>>,
89}
90
91impl LockedServiceManagerContainerProvider {
92 #[must_use]
94 pub fn new(manager: Arc<RwLock<ServiceManager>>) -> Self {
95 Self { manager }
96 }
97}
98
99#[async_trait]
100impl ServiceContainerProvider for LockedServiceManagerContainerProvider {
101 async fn get_container_ids(&self, service_name: &str) -> Vec<MetricsContainerId> {
102 let container_ids = self
103 .manager
104 .read()
105 .await
106 .get_service_containers(service_name)
107 .await;
108
109 container_ids
110 .into_iter()
111 .map(|id| MetricsContainerId {
112 service: id.service,
113 replica: id.replica,
114 })
115 .collect()
116 }
117
118 async fn get_all_services(&self) -> HashMap<String, Vec<MetricsContainerId>> {
119 let service_names = self.manager.read().await.list_services().await;
120 let mut result = HashMap::new();
121
122 for name in service_names {
123 let container_ids = self.get_container_ids(&name).await;
124 if !container_ids.is_empty() {
125 result.insert(name, container_ids);
126 }
127 }
128
129 result
130 }
131}
132
133pub struct RuntimeStatsProvider {
149 runtime: Arc<dyn Runtime + Send + Sync>,
150}
151
152impl RuntimeStatsProvider {
153 pub fn new(runtime: Arc<dyn Runtime + Send + Sync>) -> Self {
155 Self { runtime }
156 }
157}
158
159#[async_trait]
160impl ContainerStatsProvider for RuntimeStatsProvider {
161 async fn get_stats(&self, id: &MetricsContainerId) -> Result<RawContainerStats, String> {
163 let container_id = ContainerId::new(id.service.clone(), id.replica);
165
166 let stats = self
168 .runtime
169 .get_container_stats(&container_id)
170 .await
171 .map_err(|e| e.to_string())?;
172
173 Ok(container_stats_to_raw(&stats))
175 }
176}
177
178fn container_stats_to_raw(stats: &ContainerStats) -> RawContainerStats {
184 RawContainerStats {
185 cpu_usage_usec: stats.cpu_usage_usec,
186 memory_bytes: stats.memory_bytes,
187 memory_limit: stats.memory_limit,
188 timestamp: stats.timestamp,
189 }
190}
191
#[cfg(test)]
// NOTE(review): silences deprecation warnings from some API used below; which one
// is not visible from this file — confirm and name it here.
#[allow(deprecated)]
mod tests {
    use super::*;
    use crate::runtime::MockRuntime;
    use std::sync::Arc;

    /// Parses a one-service deployment and returns its `test` service spec;
    /// the fixture for tests that register a service with the manager.
    fn mock_spec() -> zlayer_spec::ServiceSpec {
        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
            r"
version: v1
deployment: test
services:
  test:
    rtype: service
    image:
      name: test:latest
    endpoints:
      - name: http
        protocol: http
        port: 8080
    scale:
      mode: fixed
      replicas: 1
",
        )
        .unwrap()
        .services
        .remove("test")
        .unwrap()
    }

    #[tokio::test]
    async fn test_service_manager_container_provider_empty() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = Arc::new(ServiceManager::new(runtime));
        let provider = ServiceManagerContainerProvider::new(manager);

        let containers = provider.get_container_ids("nonexistent").await;
        assert!(containers.is_empty());

        let all = provider.get_all_services().await;
        assert!(all.is_empty());
    }

    #[tokio::test]
    async fn test_service_manager_container_provider_with_service() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = Arc::new(ServiceManager::new(runtime));

        Box::pin(manager.upsert_service("api".to_string(), mock_spec()))
            .await
            .unwrap();
        manager.scale_service("api", 3).await.unwrap();

        let provider = ServiceManagerContainerProvider::new(manager);

        let containers = provider.get_container_ids("api").await;
        assert_eq!(containers.len(), 3);

        for c in &containers {
            assert_eq!(c.service, "api");
            assert!((1..=3).contains(&c.replica));
        }

        let all = provider.get_all_services().await;
        assert_eq!(all.len(), 1);
        assert!(all.contains_key("api"));
        assert_eq!(all["api"].len(), 3);
    }

    #[tokio::test]
    async fn test_locked_service_manager_container_provider() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);

        Box::pin(manager.upsert_service("api".to_string(), mock_spec()))
            .await
            .unwrap();
        manager.scale_service("api", 2).await.unwrap();

        let provider =
            LockedServiceManagerContainerProvider::new(Arc::new(RwLock::new(manager)));

        let containers = provider.get_container_ids("api").await;
        assert_eq!(containers.len(), 2);
        assert!(containers.iter().all(|c| c.service == "api"));

        assert!(provider.get_container_ids("nonexistent").await.is_empty());

        let all = provider.get_all_services().await;
        assert_eq!(all.len(), 1);
        assert_eq!(all["api"].len(), 2);
    }

    #[tokio::test]
    async fn test_runtime_stats_provider() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = Arc::new(ServiceManager::new(Arc::clone(&runtime)));

        Box::pin(manager.upsert_service("test".to_string(), mock_spec()))
            .await
            .unwrap();
        manager.scale_service("test", 1).await.unwrap();

        let stats_provider = RuntimeStatsProvider::new(runtime);

        let id = MetricsContainerId {
            service: "test".to_string(),
            replica: 1,
        };

        let stats = stats_provider.get_stats(&id).await.unwrap();
        assert_eq!(stats.cpu_usage_usec, 1_000_000);
        assert_eq!(stats.memory_bytes, 50 * 1024 * 1024);
        assert_eq!(stats.memory_limit, 256 * 1024 * 1024);
    }

    #[tokio::test]
    async fn test_runtime_stats_provider_not_found() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let stats_provider = RuntimeStatsProvider::new(runtime);

        let id = MetricsContainerId {
            service: "nonexistent".to_string(),
            replica: 1,
        };

        let result = stats_provider.get_stats(&id).await;
        assert!(result.is_err());
    }

    #[test]
    fn test_container_stats_to_raw() {
        use std::time::Instant;

        let stats = ContainerStats {
            cpu_usage_usec: 1_000_000,
            memory_bytes: 100 * 1024 * 1024,
            memory_limit: 256 * 1024 * 1024,
            timestamp: Instant::now(),
        };

        let raw = container_stats_to_raw(&stats);

        assert_eq!(raw.cpu_usage_usec, stats.cpu_usage_usec);
        assert_eq!(raw.memory_bytes, stats.memory_bytes);
        assert_eq!(raw.memory_limit, stats.memory_limit);
        assert_eq!(raw.timestamp, stats.timestamp);
    }
}