zlayer_agent/
metrics_providers.rs1use crate::cgroups_stats::ContainerStats;
8use crate::runtime::{ContainerId, Runtime};
9use crate::service::ServiceManager;
10use async_trait::async_trait;
11use std::collections::HashMap;
12use std::sync::Arc;
13use zlayer_scheduler::metrics::{
14 ContainerStatsProvider, MetricsContainerId, RawContainerStats, ServiceContainerProvider,
15};
16
17pub struct ServiceManagerContainerProvider {
38 manager: Arc<ServiceManager>,
39}
40
41impl ServiceManagerContainerProvider {
42 pub fn new(manager: Arc<ServiceManager>) -> Self {
44 Self { manager }
45 }
46}
47
48#[async_trait]
49impl ServiceContainerProvider for ServiceManagerContainerProvider {
50 async fn get_container_ids(&self, service_name: &str) -> Vec<MetricsContainerId> {
52 let container_ids = self.manager.get_service_containers(service_name).await;
54
55 container_ids
56 .into_iter()
57 .map(|id| MetricsContainerId {
58 service: id.service,
59 replica: id.replica,
60 })
61 .collect()
62 }
63
64 async fn get_all_services(&self) -> HashMap<String, Vec<MetricsContainerId>> {
66 let service_names = self.manager.list_services().await;
67 let mut result = HashMap::new();
68
69 for name in service_names {
70 let container_ids = self.get_container_ids(&name).await;
71 if !container_ids.is_empty() {
72 result.insert(name, container_ids);
73 }
74 }
75
76 result
77 }
78}
79
80pub struct RuntimeStatsProvider {
96 runtime: Arc<dyn Runtime + Send + Sync>,
97}
98
99impl RuntimeStatsProvider {
100 pub fn new(runtime: Arc<dyn Runtime + Send + Sync>) -> Self {
102 Self { runtime }
103 }
104}
105
106#[async_trait]
107impl ContainerStatsProvider for RuntimeStatsProvider {
108 async fn get_stats(&self, id: &MetricsContainerId) -> Result<RawContainerStats, String> {
110 let container_id = ContainerId::new(id.service.clone(), id.replica);
112
113 let stats = self
115 .runtime
116 .get_container_stats(&container_id)
117 .await
118 .map_err(|e| e.to_string())?;
119
120 Ok(container_stats_to_raw(&stats))
122 }
123}
124
125fn container_stats_to_raw(stats: &ContainerStats) -> RawContainerStats {
131 RawContainerStats {
132 cpu_usage_usec: stats.cpu_usage_usec,
133 memory_bytes: stats.memory_bytes,
134 memory_limit: stats.memory_limit,
135 timestamp: stats.timestamp,
136 }
137}
138
#[cfg(test)]
// NOTE(review): the deprecated item this allowance covers is not visible in
// this module — confirm it is still required.
#[allow(deprecated)]
mod tests {
    use super::*;
    use crate::runtime::MockRuntime;
    use std::sync::Arc;

    /// Parses an inline single-service deployment and returns the spec of its
    /// `test` service.
    fn mock_spec() -> zlayer_spec::ServiceSpec {
        let mut deployment = serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
            r"
version: v1
deployment: test
services:
  test:
    rtype: service
    image:
      name: test:latest
    endpoints:
      - name: http
        protocol: http
        port: 8080
    scale:
      mode: fixed
      replicas: 1
",
        )
        .unwrap();

        deployment.services.remove("test").unwrap()
    }

    /// With no services registered, both lookups come back empty.
    #[tokio::test]
    async fn test_service_manager_container_provider_empty() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let provider = ServiceManagerContainerProvider::new(Arc::new(ServiceManager::new(runtime)));

        let ids = provider.get_container_ids("nonexistent").await;
        assert!(ids.is_empty());

        let services = provider.get_all_services().await;
        assert!(services.is_empty());
    }

    /// A service scaled to three replicas is reported with three container ids.
    #[tokio::test]
    async fn test_service_manager_container_provider_with_service() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = Arc::new(ServiceManager::new(runtime));

        Box::pin(manager.upsert_service("api".to_string(), mock_spec()))
            .await
            .unwrap();
        manager.scale_service("api", 3).await.unwrap();

        let provider = ServiceManagerContainerProvider::new(manager);

        let ids = provider.get_container_ids("api").await;
        assert_eq!(ids.len(), 3);

        for entry in &ids {
            assert_eq!(entry.service, "api");
            // Replica numbers are expected to fall in 1..=3 for three replicas.
            assert!((1..=3).contains(&entry.replica));
        }

        let services = provider.get_all_services().await;
        assert_eq!(services.len(), 1);
        assert!(services.contains_key("api"));
        assert_eq!(services["api"].len(), 3);
    }

    /// Stats for a running replica are fetched through the runtime and converted.
    #[tokio::test]
    async fn test_runtime_stats_provider() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = Arc::new(ServiceManager::new(runtime.clone()));

        Box::pin(manager.upsert_service("test".to_string(), mock_spec()))
            .await
            .unwrap();
        manager.scale_service("test", 1).await.unwrap();

        let provider = RuntimeStatsProvider::new(runtime);
        let target = MetricsContainerId {
            service: "test".to_string(),
            replica: 1,
        };

        // The expected numbers are the values this test relies on the mock
        // runtime to report.
        let raw = provider.get_stats(&target).await.unwrap();
        assert_eq!(raw.cpu_usage_usec, 1_000_000);
        assert_eq!(raw.memory_bytes, 50 * 1024 * 1024);
        assert_eq!(raw.memory_limit, 256 * 1024 * 1024);
    }

    /// Asking for a container the runtime does not know yields an error.
    #[tokio::test]
    async fn test_runtime_stats_provider_not_found() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let provider = RuntimeStatsProvider::new(runtime);

        let missing = MetricsContainerId {
            service: "nonexistent".to_string(),
            replica: 1,
        };

        let outcome = provider.get_stats(&missing).await;
        assert!(outcome.is_err());
    }

    /// The conversion helper copies the numeric fields unchanged.
    #[test]
    fn test_container_stats_to_raw() {
        use std::time::Instant;

        let sample = ContainerStats {
            cpu_usage_usec: 1_000_000,
            memory_bytes: 100 * 1024 * 1024,
            memory_limit: 256 * 1024 * 1024,
            timestamp: Instant::now(),
        };

        let converted = container_stats_to_raw(&sample);

        assert_eq!(converted.cpu_usage_usec, sample.cpu_usage_usec);
        assert_eq!(converted.memory_bytes, sample.memory_bytes);
        assert_eq!(converted.memory_limit, sample.memory_limit);
    }
}