zlayer_agent/
metrics_providers.rs1use crate::cgroups_stats::ContainerStats;
8use crate::runtime::{ContainerId, Runtime};
9use crate::service::ServiceManager;
10use async_trait::async_trait;
11use std::collections::HashMap;
12use std::sync::Arc;
13use zlayer_scheduler::metrics::{
14 ContainerStatsProvider, MetricsContainerId, RawContainerStats, ServiceContainerProvider,
15};
16
17pub struct ServiceManagerContainerProvider {
38 manager: Arc<ServiceManager>,
39}
40
41impl ServiceManagerContainerProvider {
42 pub fn new(manager: Arc<ServiceManager>) -> Self {
44 Self { manager }
45 }
46}
47
48#[async_trait]
49impl ServiceContainerProvider for ServiceManagerContainerProvider {
50 async fn get_container_ids(&self, service_name: &str) -> Vec<MetricsContainerId> {
52 let container_ids = self.manager.get_service_containers(service_name).await;
54
55 container_ids
56 .into_iter()
57 .map(|id| MetricsContainerId {
58 service: id.service,
59 replica: id.replica,
60 })
61 .collect()
62 }
63
64 async fn get_all_services(&self) -> HashMap<String, Vec<MetricsContainerId>> {
66 let service_names = self.manager.list_services().await;
67 let mut result = HashMap::new();
68
69 for name in service_names {
70 let container_ids = self.get_container_ids(&name).await;
71 if !container_ids.is_empty() {
72 result.insert(name, container_ids);
73 }
74 }
75
76 result
77 }
78}
79
80pub struct RuntimeStatsProvider {
96 runtime: Arc<dyn Runtime + Send + Sync>,
97}
98
99impl RuntimeStatsProvider {
100 pub fn new(runtime: Arc<dyn Runtime + Send + Sync>) -> Self {
102 Self { runtime }
103 }
104}
105
106#[async_trait]
107impl ContainerStatsProvider for RuntimeStatsProvider {
108 async fn get_stats(&self, id: &MetricsContainerId) -> Result<RawContainerStats, String> {
110 let container_id = ContainerId {
112 service: id.service.clone(),
113 replica: id.replica,
114 };
115
116 let stats = self
118 .runtime
119 .get_container_stats(&container_id)
120 .await
121 .map_err(|e| e.to_string())?;
122
123 Ok(container_stats_to_raw(&stats))
125 }
126}
127
128fn container_stats_to_raw(stats: &ContainerStats) -> RawContainerStats {
134 RawContainerStats {
135 cpu_usage_usec: stats.cpu_usage_usec,
136 memory_bytes: stats.memory_bytes,
137 memory_limit: stats.memory_limit,
138 timestamp: stats.timestamp,
139 }
140}
141
// Unit tests for the two provider adapters and the stats conversion helper.
// NOTE(review): the reason for `#[allow(deprecated)]` is not visible in this
// module — confirm which deprecated item it covers, or remove it.
#[cfg(test)]
#[allow(deprecated)]
mod tests {
    use super::*;
    use crate::runtime::MockRuntime;
    use std::sync::Arc;

    /// Parses a one-service deployment from inline YAML and returns the spec
    /// of its only service, `test`.
    ///
    /// Panics if the YAML fails to parse or the `test` service is missing.
    fn mock_spec() -> zlayer_spec::ServiceSpec {
        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
            r"
version: v1
deployment: test
services:
  test:
    rtype: service
    image:
      name: test:latest
    endpoints:
      - name: http
        protocol: http
        port: 8080
    scale:
      mode: fixed
      replicas: 1
",
        )
        .unwrap()
        .services
        .remove("test")
        .unwrap()
    }

    /// With no services registered, both provider queries come back empty.
    #[tokio::test]
    async fn test_service_manager_container_provider_empty() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = Arc::new(ServiceManager::new(runtime));
        let provider = ServiceManagerContainerProvider::new(manager);

        // Looking up a service that was never registered yields no ids.
        let containers = provider.get_container_ids("nonexistent").await;
        assert!(containers.is_empty());

        let all = provider.get_all_services().await;
        assert!(all.is_empty());
    }

    /// After registering `api` and scaling it to three, the provider reports
    /// three container ids for it, both directly and via the service map.
    #[tokio::test]
    async fn test_service_manager_container_provider_with_service() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = Arc::new(ServiceManager::new(runtime));

        // The future is boxed before being awaited (presumably to keep a
        // large future off the stack — confirm).
        Box::pin(manager.upsert_service("api".to_string(), mock_spec()))
            .await
            .unwrap();
        manager.scale_service("api", 3).await.unwrap();

        let provider = ServiceManagerContainerProvider::new(manager);

        let containers = provider.get_container_ids("api").await;
        assert_eq!(containers.len(), 3);

        // Every id belongs to `api` and has a replica number in 1..=3.
        for c in &containers {
            assert_eq!(c.service, "api");
            assert!(c.replica >= 1 && c.replica <= 3);
        }

        // The aggregated view contains exactly this one service.
        let all = provider.get_all_services().await;
        assert_eq!(all.len(), 1);
        assert!(all.contains_key("api"));
        assert_eq!(all["api"].len(), 3);
    }

    /// Stats for a running container are fetched from the runtime and
    /// converted into raw stats.
    #[tokio::test]
    async fn test_runtime_stats_provider() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        // The manager and the stats provider share the same runtime instance.
        let manager = Arc::new(ServiceManager::new(runtime.clone()));

        Box::pin(manager.upsert_service("test".to_string(), mock_spec()))
            .await
            .unwrap();
        manager.scale_service("test", 1).await.unwrap();

        let stats_provider = RuntimeStatsProvider::new(runtime);

        let id = MetricsContainerId {
            service: "test".to_string(),
            replica: 1,
        };

        // Expected values are presumably the fixed figures MockRuntime
        // reports for any known container — verify against MockRuntime.
        let stats = stats_provider.get_stats(&id).await.unwrap();
        assert_eq!(stats.cpu_usage_usec, 1_000_000);
        assert_eq!(stats.memory_bytes, 50 * 1024 * 1024);
        assert_eq!(stats.memory_limit, 256 * 1024 * 1024);
    }

    /// Asking for stats of a container the runtime was never told about
    /// results in an error.
    #[tokio::test]
    async fn test_runtime_stats_provider_not_found() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let stats_provider = RuntimeStatsProvider::new(runtime);

        let id = MetricsContainerId {
            service: "nonexistent".to_string(),
            replica: 1,
        };

        let result = stats_provider.get_stats(&id).await;
        assert!(result.is_err());
    }

    /// The conversion helper carries the numeric fields over unchanged.
    #[test]
    fn test_container_stats_to_raw() {
        use std::time::Instant;

        let stats = ContainerStats {
            cpu_usage_usec: 1_000_000,
            memory_bytes: 100 * 1024 * 1024,
            memory_limit: 256 * 1024 * 1024,
            timestamp: Instant::now(),
        };

        let raw = container_stats_to_raw(&stats);

        assert_eq!(raw.cpu_usage_usec, stats.cpu_usage_usec);
        assert_eq!(raw.memory_bytes, stats.memory_bytes);
        assert_eq!(raw.memory_limit, stats.memory_limit);
    }
}