Skip to main content

zlayer_agent/
metrics_providers.rs

//! Metrics provider implementations for autoscaling
//!
//! This module provides implementations of the scheduler's metrics traits
//! that bridge the agent crate's `ServiceManager` and `Runtime` with the
//! scheduler crate's `CgroupsMetricsSource`.
6
7use crate::cgroups_stats::ContainerStats;
8use crate::runtime::{ContainerId, Runtime};
9use crate::service::ServiceManager;
10use async_trait::async_trait;
11use std::collections::HashMap;
12use std::sync::Arc;
13use zlayer_scheduler::metrics::{
14    ContainerStatsProvider, MetricsContainerId, RawContainerStats, ServiceContainerProvider,
15};
16
17/// Provides container IDs for services from `ServiceManager`
18///
19/// Implements the scheduler's `ServiceContainerProvider` trait to
20/// bridge `ServiceManager` with `CgroupsMetricsSource`.
21///
22/// # Example
23///
24/// ```ignore
25/// use zlayer_agent::metrics_providers::ServiceManagerContainerProvider;
26/// use zlayer_agent::ServiceManager;
27/// use zlayer_scheduler::metrics::CgroupsMetricsSource;
28/// use std::sync::Arc;
29///
30/// let manager = Arc::new(ServiceManager::new(runtime));
31/// let provider = Arc::new(ServiceManagerContainerProvider::new(manager.clone()));
32///
33/// // Use with CgroupsMetricsSource
34/// let stats_provider = Arc::new(RuntimeStatsProvider::new(runtime));
35/// let source = CgroupsMetricsSource::new(provider, stats_provider);
36/// ```
37pub struct ServiceManagerContainerProvider {
38    manager: Arc<ServiceManager>,
39}
40
41impl ServiceManagerContainerProvider {
42    /// Create a new provider wrapping a `ServiceManager`
43    pub fn new(manager: Arc<ServiceManager>) -> Self {
44        Self { manager }
45    }
46}
47
48#[async_trait]
49impl ServiceContainerProvider for ServiceManagerContainerProvider {
50    /// Get all container IDs for a given service
51    async fn get_container_ids(&self, service_name: &str) -> Vec<MetricsContainerId> {
52        // Get container IDs from ServiceManager
53        let container_ids = self.manager.get_service_containers(service_name).await;
54
55        container_ids
56            .into_iter()
57            .map(|id| MetricsContainerId {
58                service: id.service,
59                replica: id.replica,
60            })
61            .collect()
62    }
63
64    /// Get all services and their containers
65    async fn get_all_services(&self) -> HashMap<String, Vec<MetricsContainerId>> {
66        let service_names = self.manager.list_services().await;
67        let mut result = HashMap::new();
68
69        for name in service_names {
70            let container_ids = self.get_container_ids(&name).await;
71            if !container_ids.is_empty() {
72                result.insert(name, container_ids);
73            }
74        }
75
76        result
77    }
78}
79
80/// Provides container statistics from the Runtime
81///
82/// Implements the scheduler's `ContainerStatsProvider` trait to
83/// bridge the Runtime's `get_container_stats` with `CgroupsMetricsSource`.
84///
85/// # Example
86///
87/// ```ignore
88/// use zlayer_agent::metrics_providers::RuntimeStatsProvider;
89/// use zlayer_agent::YoukiRuntime;
90/// use std::sync::Arc;
91///
92/// let runtime = Arc::new(YoukiRuntime::with_defaults().await?);
93/// let stats_provider = Arc::new(RuntimeStatsProvider::new(runtime));
94/// ```
95pub struct RuntimeStatsProvider {
96    runtime: Arc<dyn Runtime + Send + Sync>,
97}
98
99impl RuntimeStatsProvider {
100    /// Create a new stats provider wrapping a Runtime
101    pub fn new(runtime: Arc<dyn Runtime + Send + Sync>) -> Self {
102        Self { runtime }
103    }
104}
105
106#[async_trait]
107impl ContainerStatsProvider for RuntimeStatsProvider {
108    /// Get raw container statistics from cgroups
109    async fn get_stats(&self, id: &MetricsContainerId) -> Result<RawContainerStats, String> {
110        // Convert MetricsContainerId to ContainerId
111        let container_id = ContainerId {
112            service: id.service.clone(),
113            replica: id.replica,
114        };
115
116        // Get stats from Runtime
117        let stats = self
118            .runtime
119            .get_container_stats(&container_id)
120            .await
121            .map_err(|e| e.to_string())?;
122
123        // Convert ContainerStats to RawContainerStats
124        Ok(container_stats_to_raw(&stats))
125    }
126}
127
128/// Convert agent's `ContainerStats` to scheduler's `RawContainerStats`
129///
130/// This function bridges the two stats types, allowing the agent crate
131/// to not depend on scheduler for its core types while still providing
132/// the necessary data for metrics collection.
133fn container_stats_to_raw(stats: &ContainerStats) -> RawContainerStats {
134    RawContainerStats {
135        cpu_usage_usec: stats.cpu_usage_usec,
136        memory_bytes: stats.memory_bytes,
137        memory_limit: stats.memory_limit,
138        timestamp: stats.timestamp,
139    }
140}
141
// NOTE(review): presumably `allow(deprecated)` silences a deprecation warning
// from an API these tests exercise — confirm which one and document it here.
#[cfg(test)]
#[allow(deprecated)]
mod tests {
    use super::*;
    use crate::runtime::MockRuntime;
    use std::sync::Arc;

    /// Parse a minimal single-service deployment and return the spec of its
    /// `test` service, for use as a fixture when registering services.
    fn mock_spec() -> zlayer_spec::ServiceSpec {
        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
            r"
version: v1
deployment: test
services:
  test:
    rtype: service
    image:
      name: test:latest
    endpoints:
      - name: http
        protocol: http
        port: 8080
    scale:
      mode: fixed
      replicas: 1
",
        )
        .unwrap()
        .services
        .remove("test")
        .unwrap()
    }

    /// With nothing registered, both lookups come back empty.
    #[tokio::test]
    async fn test_service_manager_container_provider_empty() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = Arc::new(ServiceManager::new(runtime));
        let provider = ServiceManagerContainerProvider::new(manager);

        // No services registered yet
        let containers = provider.get_container_ids("nonexistent").await;
        assert!(containers.is_empty());

        let all = provider.get_all_services().await;
        assert!(all.is_empty());
    }

    /// A service scaled to three replicas is reported with three container IDs.
    #[tokio::test]
    async fn test_service_manager_container_provider_with_service() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = Arc::new(ServiceManager::new(runtime));

        // Add and scale a service
        manager
            .upsert_service("api".to_string(), mock_spec())
            .await
            .unwrap();
        manager.scale_service("api", 3).await.unwrap();

        let provider = ServiceManagerContainerProvider::new(manager);

        // Should have 3 containers
        let containers = provider.get_container_ids("api").await;
        assert_eq!(containers.len(), 3);

        // Verify container IDs
        for c in &containers {
            assert_eq!(c.service, "api");
            // Replicas are 1-indexed
            assert!(c.replica >= 1 && c.replica <= 3);
        }

        // Test get_all_services
        let all = provider.get_all_services().await;
        assert_eq!(all.len(), 1);
        assert!(all.contains_key("api"));
        assert_eq!(all["api"].len(), 3);
    }

    /// Stats for a running container are surfaced through the provider.
    #[tokio::test]
    async fn test_runtime_stats_provider() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        // The manager takes the runtime by value; clone so the stats provider
        // below can share the same runtime instance.
        let manager = Arc::new(ServiceManager::new(runtime.clone()));

        // Add and scale a service
        manager
            .upsert_service("test".to_string(), mock_spec())
            .await
            .unwrap();
        manager.scale_service("test", 1).await.unwrap();

        let stats_provider = RuntimeStatsProvider::new(runtime);

        let id = MetricsContainerId {
            service: "test".to_string(),
            replica: 1,
        };

        // MockRuntime returns predefined stats
        let stats = stats_provider.get_stats(&id).await.unwrap();
        assert_eq!(stats.cpu_usage_usec, 1_000_000);
        assert_eq!(stats.memory_bytes, 50 * 1024 * 1024);
        assert_eq!(stats.memory_limit, 256 * 1024 * 1024);
    }

    /// Asking for a container the runtime does not know yields an error.
    #[tokio::test]
    async fn test_runtime_stats_provider_not_found() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let stats_provider = RuntimeStatsProvider::new(runtime);

        let id = MetricsContainerId {
            service: "nonexistent".to_string(),
            replica: 1,
        };

        // Should return error for non-existent container
        let result = stats_provider.get_stats(&id).await;
        assert!(result.is_err());
    }

    /// Every field, including the timestamp, is carried over unchanged.
    #[test]
    fn test_container_stats_to_raw() {
        use std::time::Instant;

        let stats = ContainerStats {
            cpu_usage_usec: 1_000_000,
            memory_bytes: 100 * 1024 * 1024,
            memory_limit: 256 * 1024 * 1024,
            timestamp: Instant::now(),
        };

        let raw = container_stats_to_raw(&stats);

        assert_eq!(raw.cpu_usage_usec, stats.cpu_usage_usec);
        assert_eq!(raw.memory_bytes, stats.memory_bytes);
        assert_eq!(raw.memory_limit, stats.memory_limit);
        // The conversion copies the `Instant` verbatim, so the two values
        // must be exactly equal, not merely close.
        assert_eq!(raw.timestamp, stats.timestamp);
    }
}