Skip to main content

zlayer_agent/
metrics_providers.rs

1//! Metrics provider implementations for autoscaling
2//!
3//! This module provides implementations of the scheduler's metrics traits
4//! that bridge the agent crate's `ServiceManager` and Runtime with the
5//! scheduler crate's `CgroupsMetricsSource`.
6
7use crate::cgroups_stats::ContainerStats;
8use crate::runtime::{ContainerId, Runtime};
9use crate::service::ServiceManager;
10use async_trait::async_trait;
11use std::collections::HashMap;
12use std::sync::Arc;
13use zlayer_scheduler::metrics::{
14    ContainerStatsProvider, MetricsContainerId, RawContainerStats, ServiceContainerProvider,
15};
16
17/// Provides container IDs for services from `ServiceManager`
18///
19/// Implements the scheduler's `ServiceContainerProvider` trait to
20/// bridge `ServiceManager` with `CgroupsMetricsSource`.
21///
22/// # Example
23///
24/// ```ignore
25/// use zlayer_agent::metrics_providers::ServiceManagerContainerProvider;
26/// use zlayer_agent::ServiceManager;
27/// use zlayer_scheduler::metrics::CgroupsMetricsSource;
28/// use std::sync::Arc;
29///
30/// let manager = Arc::new(ServiceManager::new(runtime));
31/// let provider = Arc::new(ServiceManagerContainerProvider::new(manager.clone()));
32///
33/// // Use with CgroupsMetricsSource
34/// let stats_provider = Arc::new(RuntimeStatsProvider::new(runtime));
35/// let source = CgroupsMetricsSource::new(provider, stats_provider);
36/// ```
37pub struct ServiceManagerContainerProvider {
38    manager: Arc<ServiceManager>,
39}
40
41impl ServiceManagerContainerProvider {
42    /// Create a new provider wrapping a `ServiceManager`
43    pub fn new(manager: Arc<ServiceManager>) -> Self {
44        Self { manager }
45    }
46}
47
48#[async_trait]
49impl ServiceContainerProvider for ServiceManagerContainerProvider {
50    /// Get all container IDs for a given service
51    async fn get_container_ids(&self, service_name: &str) -> Vec<MetricsContainerId> {
52        // Get container IDs from ServiceManager
53        let container_ids = self.manager.get_service_containers(service_name).await;
54
55        container_ids
56            .into_iter()
57            .map(|id| MetricsContainerId {
58                service: id.service,
59                replica: id.replica,
60            })
61            .collect()
62    }
63
64    /// Get all services and their containers
65    async fn get_all_services(&self) -> HashMap<String, Vec<MetricsContainerId>> {
66        let service_names = self.manager.list_services().await;
67        let mut result = HashMap::new();
68
69        for name in service_names {
70            let container_ids = self.get_container_ids(&name).await;
71            if !container_ids.is_empty() {
72                result.insert(name, container_ids);
73            }
74        }
75
76        result
77    }
78}
79
80/// Provides container statistics from the Runtime
81///
82/// Implements the scheduler's `ContainerStatsProvider` trait to
83/// bridge the Runtime's `get_container_stats` with `CgroupsMetricsSource`.
84///
85/// # Example
86///
87/// ```ignore
88/// use zlayer_agent::metrics_providers::RuntimeStatsProvider;
89/// use zlayer_agent::YoukiRuntime;
90/// use std::sync::Arc;
91///
92/// let runtime = Arc::new(YoukiRuntime::with_defaults().await?);
93/// let stats_provider = Arc::new(RuntimeStatsProvider::new(runtime));
94/// ```
95pub struct RuntimeStatsProvider {
96    runtime: Arc<dyn Runtime + Send + Sync>,
97}
98
99impl RuntimeStatsProvider {
100    /// Create a new stats provider wrapping a Runtime
101    pub fn new(runtime: Arc<dyn Runtime + Send + Sync>) -> Self {
102        Self { runtime }
103    }
104}
105
106#[async_trait]
107impl ContainerStatsProvider for RuntimeStatsProvider {
108    /// Get raw container statistics from cgroups
109    async fn get_stats(&self, id: &MetricsContainerId) -> Result<RawContainerStats, String> {
110        // Convert MetricsContainerId to ContainerId
111        let container_id = ContainerId::new(id.service.clone(), id.replica);
112
113        // Get stats from Runtime
114        let stats = self
115            .runtime
116            .get_container_stats(&container_id)
117            .await
118            .map_err(|e| e.to_string())?;
119
120        // Convert ContainerStats to RawContainerStats
121        Ok(container_stats_to_raw(&stats))
122    }
123}
124
125/// Convert agent's `ContainerStats` to scheduler's `RawContainerStats`
126///
127/// This function bridges the two stats types, allowing the agent crate
128/// to not depend on scheduler for its core types while still providing
129/// the necessary data for metrics collection.
130fn container_stats_to_raw(stats: &ContainerStats) -> RawContainerStats {
131    RawContainerStats {
132        cpu_usage_usec: stats.cpu_usage_usec,
133        memory_bytes: stats.memory_bytes,
134        memory_limit: stats.memory_limit,
135        timestamp: stats.timestamp,
136    }
137}
138
#[cfg(test)]
// NOTE(review): `deprecated` is silenced for the whole module with no stated reason.
// Presumably one of the APIs exercised below is deprecated; confirm which, and
// narrow the allow to that call if possible.
#[allow(deprecated)]
mod tests {
    use super::*;
    use crate::runtime::MockRuntime;
    use std::sync::Arc;

    /// Build a minimal `ServiceSpec` by parsing an inline deployment document
    /// and extracting its `test` service.
    fn mock_spec() -> zlayer_spec::ServiceSpec {
        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
            r"
version: v1
deployment: test
services:
  test:
    rtype: service
    image:
      name: test:latest
    endpoints:
      - name: http
        protocol: http
        port: 8080
    scale:
      mode: fixed
      replicas: 1
",
        )
        .unwrap()
        .services
        .remove("test")
        .unwrap()
    }

    #[tokio::test]
    async fn test_service_manager_container_provider_empty() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = Arc::new(ServiceManager::new(runtime));
        let provider = ServiceManagerContainerProvider::new(manager);

        // No services registered yet
        let containers = provider.get_container_ids("nonexistent").await;
        assert!(containers.is_empty());

        let all = provider.get_all_services().await;
        assert!(all.is_empty());
    }

    #[tokio::test]
    async fn test_service_manager_container_provider_with_service() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = Arc::new(ServiceManager::new(runtime));

        // Add and scale a service
        Box::pin(manager.upsert_service("api".to_string(), mock_spec()))
            .await
            .unwrap();
        manager.scale_service("api", 3).await.unwrap();

        let provider = ServiceManagerContainerProvider::new(manager);

        // Should have 3 containers
        let containers = provider.get_container_ids("api").await;
        assert_eq!(containers.len(), 3);

        // Verify container IDs
        for c in &containers {
            assert_eq!(c.service, "api");
            // Replicas are 1-indexed
            assert!((1..=3).contains(&c.replica));
        }

        // The range check alone would accept duplicates; each replica must be unique.
        let mut replicas: Vec<_> = containers.iter().map(|c| c.replica).collect();
        replicas.sort_unstable();
        replicas.dedup();
        assert_eq!(replicas.len(), 3);

        // Test get_all_services
        let all = provider.get_all_services().await;
        assert_eq!(all.len(), 1);
        assert!(all.contains_key("api"));
        assert_eq!(all["api"].len(), 3);
    }

    #[tokio::test]
    async fn test_runtime_stats_provider() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = Arc::new(ServiceManager::new(runtime.clone()));

        // Add and scale a service
        Box::pin(manager.upsert_service("test".to_string(), mock_spec()))
            .await
            .unwrap();
        manager.scale_service("test", 1).await.unwrap();

        let stats_provider = RuntimeStatsProvider::new(runtime);

        let id = MetricsContainerId {
            service: "test".to_string(),
            replica: 1,
        };

        // MockRuntime returns predefined stats
        let stats = stats_provider.get_stats(&id).await.unwrap();
        assert_eq!(stats.cpu_usage_usec, 1_000_000);
        assert_eq!(stats.memory_bytes, 50 * 1024 * 1024);
        assert_eq!(stats.memory_limit, 256 * 1024 * 1024);
    }

    #[tokio::test]
    async fn test_runtime_stats_provider_not_found() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let stats_provider = RuntimeStatsProvider::new(runtime);

        let id = MetricsContainerId {
            service: "nonexistent".to_string(),
            replica: 1,
        };

        // Should return error for non-existent container
        let result = stats_provider.get_stats(&id).await;
        assert!(result.is_err());
    }

    #[test]
    fn test_container_stats_to_raw() {
        use std::time::Instant;

        let stats = ContainerStats {
            cpu_usage_usec: 1_000_000,
            memory_bytes: 100 * 1024 * 1024,
            memory_limit: 256 * 1024 * 1024,
            timestamp: Instant::now(),
        };

        let raw = container_stats_to_raw(&stats);

        assert_eq!(raw.cpu_usage_usec, stats.cpu_usage_usec);
        assert_eq!(raw.memory_bytes, stats.memory_bytes);
        assert_eq!(raw.memory_limit, stats.memory_limit);
        // `Instant` is `Copy + Eq`, and the conversion copies it verbatim,
        // so the timestamps must match exactly.
        assert_eq!(raw.timestamp, stats.timestamp);
    }
}