Skip to main content

zlayer_agent/
autoscale_controller.rs

1//! `AutoscaleController` - Connects autoscaling decisions to container scaling
2//!
3//! This module provides an `AutoscaleController` that bridges the scheduler's
4//! autoscaling logic with the agent's `ServiceManager` to automatically scale
5//! services based on resource utilization.
6//!
7//! # Architecture
8//!
9//! ```text
10//! ┌────────────────────────────────────────────────────────────────────┐
11//! │                     AutoscaleController                            │
12//! │  ┌─────────────────┐  ┌────────────┐  ┌──────────────────┐       │
13//! │  │ CgroupsMetrics  │  │ Autoscaler │  │ ServiceManager   │       │
14//! │  │    Source       │──│            │──│  (scaling)       │       │
15//! │  └─────────────────┘  └────────────┘  └──────────────────┘       │
16//! └────────────────────────────────────────────────────────────────────┘
17//! ```
18//!
19//! # Example
20//!
//! ```ignore
//! use zlayer_agent::autoscale_controller::AutoscaleController;
//! use zlayer_agent::{ServiceManager, RuntimeConfig, create_runtime};
//! use std::sync::Arc;
//! use std::time::Duration;
//!
//! // Create runtime and service manager
//! let runtime = create_runtime(RuntimeConfig::Mock).await?;
//! let manager = Arc::new(ServiceManager::new(runtime.clone()));
//!
//! // Create the autoscale controller. It is wrapped in an `Arc` so one handle can
//! // be moved into the background task while another is kept for shutdown.
//! let controller = Arc::new(AutoscaleController::new(
//!     manager.clone(),
//!     runtime.clone(),
//!     Duration::from_secs(10),
//! ));
//!
//! // Register services with adaptive scaling
//! // (`scale_spec` is a `ScaleSpec::Adaptive { .. }` taken from the service spec)
//! controller.register_service("api", &scale_spec, 2).await;
//!
//! // Run the autoscaling loop in the background
//! let loop_controller = Arc::clone(&controller);
//! let handle = tokio::spawn(async move {
//!     loop_controller.run_loop().await
//! });
//!
//! // Later, shut down and wait for the loop to exit
//! controller.shutdown();
//! handle.await.expect("autoscale task panicked")?;
//! ```
49
50use crate::error::Result;
51use crate::metrics_providers::{RuntimeStatsProvider, ServiceManagerContainerProvider};
52use crate::runtime::Runtime;
53use crate::service::ServiceManager;
54use std::collections::HashMap;
55use std::sync::Arc;
56use std::time::{Duration, Instant};
57use tokio::sync::RwLock;
58use tracing::{debug, error, info, warn};
59use zlayer_scheduler::metrics::{CgroupsMetricsSource, MetricsCollector, MetricsSource};
60use zlayer_scheduler::Autoscaler;
61use zlayer_spec::ScaleSpec;
62
/// Default period between two autoscaling evaluation passes (ten seconds).
pub const DEFAULT_AUTOSCALE_INTERVAL: Duration = Duration::from_millis(10_000);
65
66/// Controller that connects autoscaling decisions to actual container scaling
67///
68/// The `AutoscaleController` periodically collects metrics from running containers,
69/// evaluates whether scaling is needed using the `Autoscaler`, and executes scaling
70/// decisions through the `ServiceManager`.
71pub struct AutoscaleController {
72    /// Service manager for executing scaling operations
73    service_manager: Arc<ServiceManager>,
74    /// Metrics collector with cgroups source
75    metrics: Arc<MetricsCollector>,
76    /// Autoscaler decision engine
77    autoscaler: Arc<RwLock<Autoscaler>>,
78    /// Service specs for scale configuration (`service_name` -> spec)
79    service_specs: Arc<RwLock<HashMap<String, ScaleSpec>>>,
80    /// Last scale times for cooldown tracking (`service_name` -> instant)
81    last_scale_times: Arc<RwLock<HashMap<String, Instant>>>,
82    /// Evaluation interval
83    interval: Duration,
84    /// Shutdown signal
85    shutdown: Arc<tokio::sync::Notify>,
86}
87
88impl AutoscaleController {
89    /// Create a new autoscale controller
90    ///
91    /// # Arguments
92    /// * `service_manager` - The service manager used to execute scaling operations
93    /// * `runtime` - The container runtime for collecting metrics
94    /// * `interval` - How often to evaluate scaling decisions
95    ///
96    /// # Example
97    ///
98    /// ```ignore
99    /// let controller = AutoscaleController::new(
100    ///     service_manager,
101    ///     runtime,
102    ///     Duration::from_secs(10),
103    /// );
104    /// ```
105    pub fn new(
106        service_manager: Arc<ServiceManager>,
107        runtime: Arc<dyn Runtime + Send + Sync>,
108        interval: Duration,
109    ) -> Self {
110        // Create metrics collector with cgroups source
111        let mut metrics = MetricsCollector::new();
112
113        // Create the stats provider wrapping the runtime
114        let stats_provider = Arc::new(RuntimeStatsProvider::new(runtime));
115
116        // Create the service container provider wrapping the service manager
117        let service_provider = Arc::new(ServiceManagerContainerProvider::new(
118            service_manager.clone(),
119        ));
120
121        // Create cgroups metrics source
122        let source: Arc<dyn MetricsSource> =
123            Arc::new(CgroupsMetricsSource::new(service_provider, stats_provider));
124        metrics.add_source(source);
125
126        Self {
127            service_manager,
128            metrics: Arc::new(metrics),
129            autoscaler: Arc::new(RwLock::new(Autoscaler::new())),
130            service_specs: Arc::new(RwLock::new(HashMap::new())),
131            last_scale_times: Arc::new(RwLock::new(HashMap::new())),
132            interval,
133            shutdown: Arc::new(tokio::sync::Notify::new()),
134        }
135    }
136
137    /// Create with a custom metrics collector (useful for testing)
138    pub fn with_custom_metrics(
139        service_manager: Arc<ServiceManager>,
140        metrics: MetricsCollector,
141        interval: Duration,
142    ) -> Self {
143        Self {
144            service_manager,
145            metrics: Arc::new(metrics),
146            autoscaler: Arc::new(RwLock::new(Autoscaler::new())),
147            service_specs: Arc::new(RwLock::new(HashMap::new())),
148            last_scale_times: Arc::new(RwLock::new(HashMap::new())),
149            interval,
150            shutdown: Arc::new(tokio::sync::Notify::new()),
151        }
152    }
153
154    /// Register a service for autoscaling
155    ///
156    /// Only services with `ScaleSpec::Adaptive` will be evaluated for autoscaling.
157    /// Services with `Fixed` or `Manual` scaling are ignored by the autoscaler loop.
158    ///
159    /// # Arguments
160    /// * `name` - Service name
161    /// * `spec` - The service's scale specification
162    /// * `initial_replicas` - Current number of replicas
163    pub async fn register_service(&self, name: &str, spec: &ScaleSpec, initial_replicas: u32) {
164        // Only register adaptive services
165        if !matches!(spec, ScaleSpec::Adaptive { .. }) {
166            debug!(
167                service = name,
168                "Skipping registration for non-adaptive service"
169            );
170            return;
171        }
172
173        // Register with autoscaler
174        {
175            let mut autoscaler = self.autoscaler.write().await;
176            autoscaler.register_service(name, spec.clone(), initial_replicas);
177        }
178
179        // Store spec for reference
180        {
181            let mut specs = self.service_specs.write().await;
182            specs.insert(name.to_string(), spec.clone());
183        }
184
185        info!(
186            service = name,
187            initial_replicas, "Registered service for autoscaling"
188        );
189    }
190
191    /// Unregister a service from autoscaling
192    pub async fn unregister_service(&self, name: &str) {
193        {
194            let mut autoscaler = self.autoscaler.write().await;
195            autoscaler.unregister_service(name);
196        }
197
198        {
199            let mut specs = self.service_specs.write().await;
200            specs.remove(name);
201        }
202
203        {
204            let mut times = self.last_scale_times.write().await;
205            times.remove(name);
206        }
207
208        info!(service = name, "Unregistered service from autoscaling");
209    }
210
211    /// Check if a service is registered for autoscaling
212    pub async fn is_registered(&self, name: &str) -> bool {
213        let specs = self.service_specs.read().await;
214        specs.contains_key(name)
215    }
216
217    /// Check if a service is in cooldown period
218    ///
219    /// Returns true if the service was scaled recently and is still in cooldown.
220    async fn should_scale(&self, service_name: &str) -> bool {
221        // Get the cooldown duration from the spec
222        let cooldown = {
223            let specs = self.service_specs.read().await;
224            match specs.get(service_name) {
225                Some(ScaleSpec::Adaptive { cooldown, .. }) => {
226                    cooldown.unwrap_or(zlayer_scheduler::DEFAULT_COOLDOWN)
227                }
228                _ => return false, // Not adaptive, shouldn't scale
229            }
230        };
231
232        // Check if we're past the cooldown period
233        let last_scale_times = self.last_scale_times.read().await;
234        if let Some(last_time) = last_scale_times.get(service_name) {
235            if last_time.elapsed() < cooldown {
236                let remaining = cooldown
237                    .checked_sub(last_time.elapsed())
238                    .unwrap_or_default();
239                debug!(
240                    service = service_name,
241                    remaining_secs = remaining.as_secs(),
242                    "Service in cooldown"
243                );
244                return false;
245            }
246        }
247
248        true
249    }
250
251    /// Record that a scale action occurred
252    async fn record_scale_action(&self, service_name: &str) {
253        let mut times = self.last_scale_times.write().await;
254        times.insert(service_name.to_string(), Instant::now());
255    }
256
257    /// Run the autoscaling loop
258    ///
259    /// This method should be spawned as a background task. It will continuously
260    /// evaluate scaling decisions at the configured interval until shutdown is
261    /// signaled.
262    ///
263    /// # Returns
264    /// Returns `Ok(())` when shutdown is signaled, or an error if something
265    /// goes wrong.
266    ///
267    /// # Example
268    ///
269    /// ```ignore
270    /// let controller = Arc::new(AutoscaleController::new(...));
271    /// let controller_clone = controller.clone();
272    ///
273    /// // Spawn the autoscale loop
274    /// let handle = tokio::spawn(async move {
275    ///     controller_clone.run_loop().await
276    /// });
277    ///
278    /// // Later, shutdown
279    /// controller.shutdown();
280    /// handle.await.unwrap();
281    /// ```
282    /// # Errors
283    /// Returns an error if the autoscale loop encounters an unrecoverable error.
284    #[allow(clippy::cast_possible_truncation)]
285    pub async fn run_loop(&self) -> Result<()> {
286        let mut ticker = tokio::time::interval(self.interval);
287
288        info!(
289            interval_ms = self.interval.as_millis() as u64,
290            "Starting autoscale controller loop"
291        );
292
293        loop {
294            tokio::select! {
295                _ = ticker.tick() => {
296                    self.evaluate_all_services().await;
297                }
298                () = self.shutdown.notified() => {
299                    info!("Autoscale controller shutting down");
300                    break;
301                }
302            }
303        }
304
305        Ok(())
306    }
307
308    /// Evaluate and potentially scale all registered services
309    async fn evaluate_all_services(&self) {
310        // Get list of registered services
311        let service_names: Vec<String> = {
312            let specs = self.service_specs.read().await;
313            specs.keys().cloned().collect()
314        };
315
316        for service_name in service_names {
317            if let Err(e) = self.evaluate_and_scale(&service_name).await {
318                // Log but don't fail the entire loop
319                warn!(
320                    service = %service_name,
321                    error = %e,
322                    "Failed to evaluate/scale service"
323                );
324            }
325        }
326    }
327
328    /// Evaluate a single service and execute scaling if needed
329    async fn evaluate_and_scale(&self, service_name: &str) -> Result<()> {
330        // Check cooldown first
331        if !self.should_scale(service_name).await {
332            return Ok(());
333        }
334
335        // Collect metrics
336        let aggregated = match self.metrics.collect(service_name).await {
337            Ok(m) => m,
338            Err(e) => {
339                // Missing metrics is not necessarily an error - the service might
340                // not have any running containers yet
341                debug!(
342                    service = service_name,
343                    error = %e,
344                    "No metrics available for service"
345                );
346                return Ok(());
347            }
348        };
349
350        // Make scaling decision
351        let decision = {
352            let mut autoscaler = self.autoscaler.write().await;
353            match autoscaler.evaluate(service_name, &aggregated) {
354                Ok(d) => d,
355                Err(e) => {
356                    debug!(
357                        service = service_name,
358                        error = %e,
359                        "Failed to evaluate scaling"
360                    );
361                    return Ok(());
362                }
363            }
364        };
365
366        debug!(
367            service = service_name,
368            ?decision,
369            cpu = aggregated.avg_cpu_percent,
370            memory = aggregated.avg_memory_percent,
371            instances = aggregated.instance_count,
372            "Autoscale evaluation"
373        );
374
375        // Execute scaling if needed
376        if let Some(target) = decision.target_replicas() {
377            info!(
378                service = service_name,
379                target_replicas = target,
380                decision = ?decision,
381                "Executing autoscale"
382            );
383
384            // Execute the scaling
385            if let Err(e) = self
386                .service_manager
387                .scale_service(service_name, target)
388                .await
389            {
390                error!(
391                    service = service_name,
392                    target = target,
393                    error = %e,
394                    "Failed to scale service"
395                );
396                return Err(e);
397            }
398
399            // Record the scale action
400            self.record_scale_action(service_name).await;
401
402            // Update the autoscaler's internal state
403            {
404                let mut autoscaler = self.autoscaler.write().await;
405                if let Err(e) = autoscaler.record_scale_action(service_name, target) {
406                    warn!(
407                        service = service_name,
408                        error = %e,
409                        "Failed to record scale action in autoscaler"
410                    );
411                }
412            }
413        }
414
415        Ok(())
416    }
417
418    /// Signal shutdown of the autoscale loop
419    pub fn shutdown(&self) {
420        self.shutdown.notify_one();
421    }
422
423    /// Get the current evaluation interval
424    #[must_use]
425    pub fn interval(&self) -> Duration {
426        self.interval
427    }
428
429    /// Get registered service count
430    pub async fn registered_service_count(&self) -> usize {
431        let specs = self.service_specs.read().await;
432        specs.len()
433    }
434}
435
436/// Check if any service in a deployment has adaptive scaling
437///
438/// This is a helper function to determine if the autoscale controller should
439/// be started for a deployment.
440#[must_use]
441#[allow(clippy::implicit_hasher)]
442pub fn has_adaptive_scaling(services: &HashMap<String, zlayer_spec::ServiceSpec>) -> bool {
443    services
444        .values()
445        .any(|s| matches!(s.scale, ScaleSpec::Adaptive { .. }))
446}
447
#[cfg(test)]
// NOTE(review): presumably silences deprecation warnings from APIs used below — confirm.
#[allow(deprecated)]
mod tests {
    use super::*;
    use crate::runtime::MockRuntime;
    use zlayer_scheduler::metrics::{MockMetricsSource, ServiceMetrics};
    use zlayer_spec::ScaleTargets;

    /// Build a mock runtime plus a `ServiceManager` that uses it.
    fn mock_runtime_and_manager() -> (Arc<dyn Runtime + Send + Sync>, Arc<ServiceManager>) {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = Arc::new(ServiceManager::new(runtime.clone()));
        (runtime, manager)
    }

    /// Parse a minimal deployment and return its `test` service (fixed scaling, 1 replica).
    fn mock_spec() -> zlayer_spec::ServiceSpec {
        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
            r"
version: v1
deployment: test
services:
  test:
    rtype: service
    image:
      name: test:latest
    endpoints:
      - name: http
        protocol: http
        port: 8080
    scale:
      mode: fixed
      replicas: 1
",
        )
        .unwrap()
        .services
        .remove("test")
        .unwrap()
    }

    /// Build an adaptive scale spec with no cooldown.
    fn adaptive_spec(
        min: u32,
        max: u32,
        cpu_target: Option<u8>,
        memory_target: Option<u8>,
    ) -> ScaleSpec {
        ScaleSpec::Adaptive {
            min,
            max,
            cooldown: Some(Duration::from_secs(0)), // No cooldown for tests
            targets: ScaleTargets {
                cpu: cpu_target,
                memory: memory_target,
                rps: None,
            },
        }
    }

    #[tokio::test]
    async fn test_autoscale_controller_creation() {
        let (runtime, manager) = mock_runtime_and_manager();
        let controller = AutoscaleController::new(manager, runtime, Duration::from_secs(10));

        assert_eq!(controller.interval(), Duration::from_secs(10));
        assert_eq!(controller.registered_service_count().await, 0);
    }

    #[tokio::test]
    async fn test_register_service() {
        let (runtime, manager) = mock_runtime_and_manager();
        let controller = AutoscaleController::new(manager, runtime, Duration::from_secs(10));

        // Register an adaptive service
        let spec = adaptive_spec(1, 10, Some(70), None);
        controller.register_service("api", &spec, 2).await;

        assert!(controller.is_registered("api").await);
        assert_eq!(controller.registered_service_count().await, 1);
    }

    #[tokio::test]
    async fn test_register_fixed_service_ignored() {
        let (runtime, manager) = mock_runtime_and_manager();
        let controller = AutoscaleController::new(manager, runtime, Duration::from_secs(10));

        // Try to register a fixed service - should be ignored
        let spec = ScaleSpec::Fixed { replicas: 3 };
        controller.register_service("api", &spec, 3).await;

        assert!(!controller.is_registered("api").await);
        assert_eq!(controller.registered_service_count().await, 0);
    }

    #[tokio::test]
    async fn test_unregister_service() {
        let (runtime, manager) = mock_runtime_and_manager();
        let controller = AutoscaleController::new(manager, runtime, Duration::from_secs(10));

        let spec = adaptive_spec(1, 10, Some(70), None);
        controller.register_service("api", &spec, 2).await;
        assert!(controller.is_registered("api").await);

        controller.unregister_service("api").await;

        assert!(!controller.is_registered("api").await);
        assert_eq!(controller.registered_service_count().await, 0);
    }

    // Purely synchronous: no runtime needed.
    #[test]
    fn test_has_adaptive_scaling() {
        let mut services = HashMap::new();

        // Add a fixed service
        let mut fixed_spec = mock_spec();
        fixed_spec.scale = ScaleSpec::Fixed { replicas: 3 };
        services.insert("web".to_string(), fixed_spec);

        // No adaptive services yet
        assert!(!has_adaptive_scaling(&services));

        // Add an adaptive service
        let mut adaptive = mock_spec();
        adaptive.scale = adaptive_spec(1, 10, Some(70), None);
        services.insert("api".to_string(), adaptive);

        // Now has adaptive
        assert!(has_adaptive_scaling(&services));
    }

    #[tokio::test]
    async fn test_autoscale_controller_with_mock_metrics() {
        let (_runtime, manager) = mock_runtime_and_manager();

        // Create mock metrics source
        let mock = Arc::new(MockMetricsSource::new());

        // Set high CPU metrics
        mock.set_metrics(
            "api",
            vec![
                ServiceMetrics {
                    cpu_percent: 85.0,
                    memory_bytes: 100 * 1024 * 1024,
                    memory_limit: 512 * 1024 * 1024,
                    rps: None,
                    timestamp: Some(Instant::now()),
                },
                ServiceMetrics {
                    cpu_percent: 90.0,
                    memory_bytes: 150 * 1024 * 1024,
                    memory_limit: 512 * 1024 * 1024,
                    rps: None,
                    timestamp: Some(Instant::now()),
                },
            ],
        )
        .await;

        // Create controller with custom metrics
        let mut metrics = MetricsCollector::new();
        metrics.add_source(mock);

        let controller = AutoscaleController::with_custom_metrics(
            manager.clone(),
            metrics,
            Duration::from_secs(10),
        );

        // Register service
        manager
            .upsert_service("api".to_string(), mock_spec())
            .await
            .unwrap();
        manager.scale_service("api", 2).await.unwrap();

        let spec = adaptive_spec(1, 10, Some(70), None);
        controller.register_service("api", &spec, 2).await;

        // Evaluate - should want to scale up due to high CPU
        controller.evaluate_and_scale("api").await.unwrap();

        // Check that scale happened (from 2 to 3)
        let count = manager.service_replica_count("api").await.unwrap();
        assert_eq!(count, 3);
    }

    #[tokio::test]
    async fn test_autoscale_controller_cooldown() {
        let (runtime, manager) = mock_runtime_and_manager();
        let controller = AutoscaleController::new(manager, runtime, Duration::from_secs(10));

        // Use a spec with a 60 second cooldown so it cannot expire during the test
        let spec = ScaleSpec::Adaptive {
            min: 1,
            max: 10,
            cooldown: Some(Duration::from_secs(60)),
            targets: ScaleTargets {
                cpu: Some(70),
                memory: None,
                rps: None,
            },
        };

        controller.register_service("api", &spec, 2).await;

        // Initially should be able to scale
        assert!(controller.should_scale("api").await);

        // Record a scale action
        controller.record_scale_action("api").await;

        // Now should be in cooldown
        assert!(!controller.should_scale("api").await);
    }

    #[tokio::test]
    async fn test_autoscale_controller_shutdown() {
        let (runtime, manager) = mock_runtime_and_manager();

        let controller = Arc::new(AutoscaleController::new(
            manager,
            runtime,
            Duration::from_millis(100), // Fast interval for test
        ));

        let controller_clone = controller.clone();

        // Spawn the loop
        let handle = tokio::spawn(async move { controller_clone.run_loop().await });

        // Let it run briefly
        tokio::time::sleep(Duration::from_millis(50)).await;

        // Signal shutdown
        controller.shutdown();

        // Bound the wait so a broken shutdown fails the test instead of hanging it.
        let result = tokio::time::timeout(Duration::from_secs(5), handle)
            .await
            .expect("autoscale loop did not stop after shutdown")
            .expect("autoscale loop task panicked");
        assert!(result.is_ok());
    }
}