Skip to main content

zlayer_agent/
metrics_providers.rs

//! Metrics provider implementations for autoscaling.
//!
//! This module provides implementations of the scheduler's metrics traits
//! that bridge the agent crate's `ServiceManager` and `Runtime` with the
//! scheduler crate's `CgroupsMetricsSource`.
6
7use crate::cgroups_stats::ContainerStats;
8use crate::runtime::{ContainerId, Runtime};
9use crate::service::ServiceManager;
10use async_trait::async_trait;
11use std::collections::HashMap;
12use std::sync::Arc;
13use tokio::sync::RwLock;
14use zlayer_scheduler::metrics::{
15    ContainerStatsProvider, MetricsContainerId, RawContainerStats, ServiceContainerProvider,
16};
17
18/// Provides container IDs for services from `ServiceManager`
19///
20/// Implements the scheduler's `ServiceContainerProvider` trait to
21/// bridge `ServiceManager` with `CgroupsMetricsSource`.
22///
23/// # Example
24///
25/// ```ignore
26/// use zlayer_agent::metrics_providers::ServiceManagerContainerProvider;
27/// use zlayer_agent::ServiceManager;
28/// use zlayer_scheduler::metrics::CgroupsMetricsSource;
29/// use std::sync::Arc;
30///
31/// let manager = Arc::new(ServiceManager::new(runtime));
32/// let provider = Arc::new(ServiceManagerContainerProvider::new(manager.clone()));
33///
34/// // Use with CgroupsMetricsSource
35/// let stats_provider = Arc::new(RuntimeStatsProvider::new(runtime));
36/// let source = CgroupsMetricsSource::new(provider, stats_provider);
37/// ```
38pub struct ServiceManagerContainerProvider {
39    manager: Arc<ServiceManager>,
40}
41
42impl ServiceManagerContainerProvider {
43    /// Create a new provider wrapping a `ServiceManager`
44    pub fn new(manager: Arc<ServiceManager>) -> Self {
45        Self { manager }
46    }
47}
48
49#[async_trait]
50impl ServiceContainerProvider for ServiceManagerContainerProvider {
51    /// Get all container IDs for a given service
52    async fn get_container_ids(&self, service_name: &str) -> Vec<MetricsContainerId> {
53        // Get container IDs from ServiceManager
54        let container_ids = self.manager.get_service_containers(service_name).await;
55
56        container_ids
57            .into_iter()
58            .map(|id| MetricsContainerId {
59                service: id.service,
60                replica: id.replica,
61            })
62            .collect()
63    }
64
65    /// Get all services and their containers
66    async fn get_all_services(&self) -> HashMap<String, Vec<MetricsContainerId>> {
67        let service_names = self.manager.list_services().await;
68        let mut result = HashMap::new();
69
70        for name in service_names {
71            let container_ids = self.get_container_ids(&name).await;
72            if !container_ids.is_empty() {
73                result.insert(name, container_ids);
74            }
75        }
76
77        result
78    }
79}
80
81/// Provides container IDs for services from a lock-wrapped `ServiceManager`.
82///
83/// Identical to [`ServiceManagerContainerProvider`] but holds the daemon's
84/// post-`Arc::try_unwrap` `Arc<RwLock<ServiceManager>>` (the shape the running
85/// daemon owns). Each query takes a short read guard and forwards to the same
86/// `ServiceManager` accessors, so the cgroups metrics source works unchanged.
87pub struct LockedServiceManagerContainerProvider {
88    manager: Arc<RwLock<ServiceManager>>,
89}
90
91impl LockedServiceManagerContainerProvider {
92    /// Create a provider wrapping a lock-guarded `ServiceManager`.
93    #[must_use]
94    pub fn new(manager: Arc<RwLock<ServiceManager>>) -> Self {
95        Self { manager }
96    }
97}
98
99#[async_trait]
100impl ServiceContainerProvider for LockedServiceManagerContainerProvider {
101    async fn get_container_ids(&self, service_name: &str) -> Vec<MetricsContainerId> {
102        let container_ids = self
103            .manager
104            .read()
105            .await
106            .get_service_containers(service_name)
107            .await;
108
109        container_ids
110            .into_iter()
111            .map(|id| MetricsContainerId {
112                service: id.service,
113                replica: id.replica,
114            })
115            .collect()
116    }
117
118    async fn get_all_services(&self) -> HashMap<String, Vec<MetricsContainerId>> {
119        let service_names = self.manager.read().await.list_services().await;
120        let mut result = HashMap::new();
121
122        for name in service_names {
123            let container_ids = self.get_container_ids(&name).await;
124            if !container_ids.is_empty() {
125                result.insert(name, container_ids);
126            }
127        }
128
129        result
130    }
131}
132
133/// Provides container statistics from the Runtime
134///
135/// Implements the scheduler's `ContainerStatsProvider` trait to
136/// bridge the Runtime's `get_container_stats` with `CgroupsMetricsSource`.
137///
138/// # Example
139///
140/// ```ignore
141/// use zlayer_agent::metrics_providers::RuntimeStatsProvider;
142/// use zlayer_agent::YoukiRuntime;
143/// use std::sync::Arc;
144///
145/// let runtime = Arc::new(YoukiRuntime::with_defaults().await?);
146/// let stats_provider = Arc::new(RuntimeStatsProvider::new(runtime));
147/// ```
148pub struct RuntimeStatsProvider {
149    runtime: Arc<dyn Runtime + Send + Sync>,
150}
151
152impl RuntimeStatsProvider {
153    /// Create a new stats provider wrapping a Runtime
154    pub fn new(runtime: Arc<dyn Runtime + Send + Sync>) -> Self {
155        Self { runtime }
156    }
157}
158
159#[async_trait]
160impl ContainerStatsProvider for RuntimeStatsProvider {
161    /// Get raw container statistics from cgroups
162    async fn get_stats(&self, id: &MetricsContainerId) -> Result<RawContainerStats, String> {
163        // Convert MetricsContainerId to ContainerId
164        let container_id = ContainerId::new(id.service.clone(), id.replica);
165
166        // Get stats from Runtime
167        let stats = self
168            .runtime
169            .get_container_stats(&container_id)
170            .await
171            .map_err(|e| e.to_string())?;
172
173        // Convert ContainerStats to RawContainerStats
174        Ok(container_stats_to_raw(&stats))
175    }
176}
177
178/// Convert agent's `ContainerStats` to scheduler's `RawContainerStats`
179///
180/// This function bridges the two stats types, allowing the agent crate
181/// to not depend on scheduler for its core types while still providing
182/// the necessary data for metrics collection.
183fn container_stats_to_raw(stats: &ContainerStats) -> RawContainerStats {
184    RawContainerStats {
185        cpu_usage_usec: stats.cpu_usage_usec,
186        memory_bytes: stats.memory_bytes,
187        memory_limit: stats.memory_limit,
188        timestamp: stats.timestamp,
189    }
190}
191
#[cfg(test)]
#[allow(deprecated)]
mod tests {
    use super::*;
    use crate::runtime::MockRuntime;
    use std::sync::Arc;

    /// Parse a minimal single-service deployment and return its `test` service.
    fn mock_spec() -> zlayer_spec::ServiceSpec {
        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
            r"
version: v1
deployment: test
services:
  test:
    rtype: service
    image:
      name: test:latest
    endpoints:
      - name: http
        protocol: http
        port: 8080
    scale:
      mode: fixed
      replicas: 1
",
        )
        .unwrap()
        .services
        .remove("test")
        .unwrap()
    }

    #[tokio::test]
    async fn test_service_manager_container_provider_empty() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = Arc::new(ServiceManager::new(runtime));
        let provider = ServiceManagerContainerProvider::new(manager);

        // No services registered yet
        let containers = provider.get_container_ids("nonexistent").await;
        assert!(containers.is_empty());

        let all = provider.get_all_services().await;
        assert!(all.is_empty());
    }

    #[tokio::test]
    async fn test_service_manager_container_provider_with_service() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = Arc::new(ServiceManager::new(runtime));

        // Add and scale a service
        Box::pin(manager.upsert_service("api".to_string(), mock_spec()))
            .await
            .unwrap();
        manager.scale_service("api", 3).await.unwrap();

        let provider = ServiceManagerContainerProvider::new(manager);

        // Should have 3 containers
        let containers = provider.get_container_ids("api").await;
        assert_eq!(containers.len(), 3);

        // Verify container IDs
        for c in &containers {
            assert_eq!(c.service, "api");
            // Replicas are 1-indexed
            assert!((1..=3).contains(&c.replica));
        }

        // Test get_all_services
        let all = provider.get_all_services().await;
        assert_eq!(all.len(), 1);
        assert!(all.contains_key("api"));
        assert_eq!(all["api"].len(), 3);
    }

    #[tokio::test]
    async fn test_locked_service_manager_container_provider_empty() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = Arc::new(RwLock::new(ServiceManager::new(runtime)));
        let provider = LockedServiceManagerContainerProvider::new(manager);

        assert!(provider.get_container_ids("nonexistent").await.is_empty());
        assert!(provider.get_all_services().await.is_empty());
    }

    #[tokio::test]
    async fn test_locked_service_manager_container_provider_with_service() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);

        // Populate the manager before wrapping it, mirroring how the daemon
        // ends up owning an `Arc<RwLock<ServiceManager>>`.
        Box::pin(manager.upsert_service("api".to_string(), mock_spec()))
            .await
            .unwrap();
        manager.scale_service("api", 3).await.unwrap();

        let provider =
            LockedServiceManagerContainerProvider::new(Arc::new(RwLock::new(manager)));

        let containers = provider.get_container_ids("api").await;
        assert_eq!(containers.len(), 3);
        for c in &containers {
            assert_eq!(c.service, "api");
            assert!((1..=3).contains(&c.replica));
        }

        // Exercises the path that takes a guard for the listing and then a
        // fresh guard per service; it must complete without deadlocking.
        let all = provider.get_all_services().await;
        assert_eq!(all.len(), 1);
        assert_eq!(all["api"].len(), 3);
    }

    #[tokio::test]
    async fn test_runtime_stats_provider() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = Arc::new(ServiceManager::new(runtime.clone()));

        // Add and scale a service
        Box::pin(manager.upsert_service("test".to_string(), mock_spec()))
            .await
            .unwrap();
        manager.scale_service("test", 1).await.unwrap();

        let stats_provider = RuntimeStatsProvider::new(runtime);

        let id = MetricsContainerId {
            service: "test".to_string(),
            replica: 1,
        };

        // MockRuntime returns predefined stats
        let stats = stats_provider.get_stats(&id).await.unwrap();
        assert_eq!(stats.cpu_usage_usec, 1_000_000);
        assert_eq!(stats.memory_bytes, 50 * 1024 * 1024);
        assert_eq!(stats.memory_limit, 256 * 1024 * 1024);
    }

    #[tokio::test]
    async fn test_runtime_stats_provider_not_found() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let stats_provider = RuntimeStatsProvider::new(runtime);

        let id = MetricsContainerId {
            service: "nonexistent".to_string(),
            replica: 1,
        };

        // Should return error for non-existent container
        let result = stats_provider.get_stats(&id).await;
        assert!(result.is_err());
    }

    #[test]
    fn test_container_stats_to_raw() {
        use std::time::Instant;

        let stats = ContainerStats {
            cpu_usage_usec: 1_000_000,
            memory_bytes: 100 * 1024 * 1024,
            memory_limit: 256 * 1024 * 1024,
            timestamp: Instant::now(),
        };

        let raw = container_stats_to_raw(&stats);

        assert_eq!(raw.cpu_usage_usec, stats.cpu_usage_usec);
        assert_eq!(raw.memory_bytes, stats.memory_bytes);
        assert_eq!(raw.memory_limit, stats.memory_limit);
        // The conversion copies the `Instant` verbatim, so equality is exact.
        assert_eq!(raw.timestamp, stats.timestamp);
    }
}