1use crate::error::Result;
51use crate::metrics_providers::{RuntimeStatsProvider, ServiceManagerContainerProvider};
52use crate::runtime::Runtime;
53use crate::service::ServiceManager;
54use std::collections::HashMap;
55use std::sync::Arc;
56use std::time::{Duration, Instant};
57use tokio::sync::RwLock;
58use tracing::{debug, error, info, warn};
59use zlayer_scheduler::metrics::{CgroupsMetricsSource, MetricsCollector, MetricsSource};
60use zlayer_scheduler::Autoscaler;
61use zlayer_spec::ScaleSpec;
62
/// Default period between autoscale evaluation passes: 10 seconds.
///
/// NOTE(review): not referenced in the visible part of this file; presumably
/// callers pass it as the `interval` when constructing an `AutoscaleController`.
pub const DEFAULT_AUTOSCALE_INTERVAL: Duration = Duration::from_secs(10);
65
/// Periodically evaluates registered adaptive services and applies the
/// scaling decisions produced by the [`Autoscaler`] through the
/// [`ServiceManager`].
pub struct AutoscaleController {
    /// Service manager used to apply scaling (`scale_service`).
    service_manager: Arc<ServiceManager>,
    /// Collector queried for per-service metrics on every evaluation.
    metrics: Arc<MetricsCollector>,
    /// Decision engine; also told about every scale action that was applied.
    autoscaler: Arc<RwLock<Autoscaler>>,
    /// Scale specs of the registered services, keyed by service name.
    service_specs: Arc<RwLock<HashMap<String, ScaleSpec>>>,
    /// Time of the most recent scale action per service, used for cooldowns.
    last_scale_times: Arc<RwLock<HashMap<String, Instant>>>,
    /// Period between evaluation passes in `run_loop`.
    interval: Duration,
    /// Notifier used by `shutdown` to stop `run_loop`.
    shutdown: Arc<tokio::sync::Notify>,
}
87
88impl AutoscaleController {
89 pub fn new(
106 service_manager: Arc<ServiceManager>,
107 runtime: Arc<dyn Runtime + Send + Sync>,
108 interval: Duration,
109 ) -> Self {
110 let mut metrics = MetricsCollector::new();
112
113 let stats_provider = Arc::new(RuntimeStatsProvider::new(runtime));
115
116 let service_provider = Arc::new(ServiceManagerContainerProvider::new(
118 service_manager.clone(),
119 ));
120
121 let source: Arc<dyn MetricsSource> =
123 Arc::new(CgroupsMetricsSource::new(service_provider, stats_provider));
124 metrics.add_source(source);
125
126 Self {
127 service_manager,
128 metrics: Arc::new(metrics),
129 autoscaler: Arc::new(RwLock::new(Autoscaler::new())),
130 service_specs: Arc::new(RwLock::new(HashMap::new())),
131 last_scale_times: Arc::new(RwLock::new(HashMap::new())),
132 interval,
133 shutdown: Arc::new(tokio::sync::Notify::new()),
134 }
135 }
136
137 pub fn with_custom_metrics(
139 service_manager: Arc<ServiceManager>,
140 metrics: MetricsCollector,
141 interval: Duration,
142 ) -> Self {
143 Self {
144 service_manager,
145 metrics: Arc::new(metrics),
146 autoscaler: Arc::new(RwLock::new(Autoscaler::new())),
147 service_specs: Arc::new(RwLock::new(HashMap::new())),
148 last_scale_times: Arc::new(RwLock::new(HashMap::new())),
149 interval,
150 shutdown: Arc::new(tokio::sync::Notify::new()),
151 }
152 }
153
154 pub async fn register_service(&self, name: &str, spec: &ScaleSpec, initial_replicas: u32) {
164 if !matches!(spec, ScaleSpec::Adaptive { .. }) {
166 debug!(
167 service = name,
168 "Skipping registration for non-adaptive service"
169 );
170 return;
171 }
172
173 {
175 let mut autoscaler = self.autoscaler.write().await;
176 autoscaler.register_service(name, spec.clone(), initial_replicas);
177 }
178
179 {
181 let mut specs = self.service_specs.write().await;
182 specs.insert(name.to_string(), spec.clone());
183 }
184
185 info!(
186 service = name,
187 initial_replicas, "Registered service for autoscaling"
188 );
189 }
190
191 pub async fn unregister_service(&self, name: &str) {
193 {
194 let mut autoscaler = self.autoscaler.write().await;
195 autoscaler.unregister_service(name);
196 }
197
198 {
199 let mut specs = self.service_specs.write().await;
200 specs.remove(name);
201 }
202
203 {
204 let mut times = self.last_scale_times.write().await;
205 times.remove(name);
206 }
207
208 info!(service = name, "Unregistered service from autoscaling");
209 }
210
211 pub async fn is_registered(&self, name: &str) -> bool {
213 let specs = self.service_specs.read().await;
214 specs.contains_key(name)
215 }
216
217 async fn should_scale(&self, service_name: &str) -> bool {
221 let cooldown = {
223 let specs = self.service_specs.read().await;
224 match specs.get(service_name) {
225 Some(ScaleSpec::Adaptive { cooldown, .. }) => {
226 cooldown.unwrap_or(zlayer_scheduler::DEFAULT_COOLDOWN)
227 }
228 _ => return false, }
230 };
231
232 let last_scale_times = self.last_scale_times.read().await;
234 if let Some(last_time) = last_scale_times.get(service_name) {
235 if last_time.elapsed() < cooldown {
236 let remaining = cooldown
237 .checked_sub(last_time.elapsed())
238 .unwrap_or_default();
239 debug!(
240 service = service_name,
241 remaining_secs = remaining.as_secs(),
242 "Service in cooldown"
243 );
244 return false;
245 }
246 }
247
248 true
249 }
250
251 async fn record_scale_action(&self, service_name: &str) {
253 let mut times = self.last_scale_times.write().await;
254 times.insert(service_name.to_string(), Instant::now());
255 }
256
257 #[allow(clippy::cast_possible_truncation)]
285 pub async fn run_loop(&self) -> Result<()> {
286 let mut ticker = tokio::time::interval(self.interval);
287
288 info!(
289 interval_ms = self.interval.as_millis() as u64,
290 "Starting autoscale controller loop"
291 );
292
293 loop {
294 tokio::select! {
295 _ = ticker.tick() => {
296 self.evaluate_all_services().await;
297 }
298 () = self.shutdown.notified() => {
299 info!("Autoscale controller shutting down");
300 break;
301 }
302 }
303 }
304
305 Ok(())
306 }
307
308 async fn evaluate_all_services(&self) {
310 let service_names: Vec<String> = {
312 let specs = self.service_specs.read().await;
313 specs.keys().cloned().collect()
314 };
315
316 for service_name in service_names {
317 if let Err(e) = self.evaluate_and_scale(&service_name).await {
318 warn!(
320 service = %service_name,
321 error = %e,
322 "Failed to evaluate/scale service"
323 );
324 }
325 }
326 }
327
328 async fn evaluate_and_scale(&self, service_name: &str) -> Result<()> {
330 if !self.should_scale(service_name).await {
332 return Ok(());
333 }
334
335 let aggregated = match self.metrics.collect(service_name).await {
337 Ok(m) => m,
338 Err(e) => {
339 debug!(
342 service = service_name,
343 error = %e,
344 "No metrics available for service"
345 );
346 return Ok(());
347 }
348 };
349
350 let decision = {
352 let mut autoscaler = self.autoscaler.write().await;
353 match autoscaler.evaluate(service_name, &aggregated) {
354 Ok(d) => d,
355 Err(e) => {
356 debug!(
357 service = service_name,
358 error = %e,
359 "Failed to evaluate scaling"
360 );
361 return Ok(());
362 }
363 }
364 };
365
366 debug!(
367 service = service_name,
368 ?decision,
369 cpu = aggregated.avg_cpu_percent,
370 memory = aggregated.avg_memory_percent,
371 instances = aggregated.instance_count,
372 "Autoscale evaluation"
373 );
374
375 if let Some(target) = decision.target_replicas() {
377 info!(
378 service = service_name,
379 target_replicas = target,
380 decision = ?decision,
381 "Executing autoscale"
382 );
383
384 if let Err(e) = self
386 .service_manager
387 .scale_service(service_name, target)
388 .await
389 {
390 error!(
391 service = service_name,
392 target = target,
393 error = %e,
394 "Failed to scale service"
395 );
396 return Err(e);
397 }
398
399 self.record_scale_action(service_name).await;
401
402 {
404 let mut autoscaler = self.autoscaler.write().await;
405 if let Err(e) = autoscaler.record_scale_action(service_name, target) {
406 warn!(
407 service = service_name,
408 error = %e,
409 "Failed to record scale action in autoscaler"
410 );
411 }
412 }
413 }
414
415 Ok(())
416 }
417
/// Signals `run_loop` to stop.
///
/// Sends a single notification on the shutdown notifier that `run_loop`
/// waits on in its `select!`; the loop then logs and returns `Ok(())`.
pub fn shutdown(&self) {
    self.shutdown.notify_one();
}
422
/// Returns the evaluation interval this controller was constructed with.
#[must_use]
pub fn interval(&self) -> Duration {
    self.interval
}
428
429 pub async fn registered_service_count(&self) -> usize {
431 let specs = self.service_specs.read().await;
432 specs.len()
433 }
434}
435
436#[must_use]
441#[allow(clippy::implicit_hasher)]
442pub fn has_adaptive_scaling(services: &HashMap<String, zlayer_spec::ServiceSpec>) -> bool {
443 services
444 .values()
445 .any(|s| matches!(s.scale, ScaleSpec::Adaptive { .. }))
446}
447
448#[cfg(test)]
449#[allow(deprecated)]
450mod tests {
451 use super::*;
452 use crate::runtime::MockRuntime;
453 use zlayer_scheduler::metrics::{MockMetricsSource, ServiceMetrics};
454 use zlayer_spec::ScaleTargets;
455
    /// Builds a minimal `ServiceSpec` for tests.
    ///
    /// Parses a one-service YAML deployment and extracts its `test` service
    /// (fixed scale, one replica, a single HTTP endpoint on port 8080).
    /// Panics via `unwrap` if the YAML does not parse or has no `test` service.
    ///
    /// NOTE(review): the nested YAML indentation below was restored from the
    /// document structure; confirm it matches the upstream fixture.
    fn mock_spec() -> zlayer_spec::ServiceSpec {
        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
            r"
version: v1
deployment: test
services:
  test:
    rtype: service
    image:
      name: test:latest
    endpoints:
      - name: http
        protocol: http
        port: 8080
    scale:
      mode: fixed
      replicas: 1
",
        )
        .unwrap()
        .services
        .remove("test")
        .unwrap()
    }
480
481 fn adaptive_spec(
482 min: u32,
483 max: u32,
484 cpu_target: Option<u8>,
485 memory_target: Option<u8>,
486 ) -> ScaleSpec {
487 ScaleSpec::Adaptive {
488 min,
489 max,
490 cooldown: Some(Duration::from_secs(0)), targets: ScaleTargets {
492 cpu: cpu_target,
493 memory: memory_target,
494 rps: None,
495 },
496 }
497 }
498
499 #[tokio::test]
500 async fn test_autoscale_controller_creation() {
501 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
502 let manager = Arc::new(ServiceManager::new(runtime.clone()));
503
504 let controller = AutoscaleController::new(manager, runtime, Duration::from_secs(10));
505
506 assert_eq!(controller.interval(), Duration::from_secs(10));
507 assert_eq!(controller.registered_service_count().await, 0);
508 }
509
510 #[tokio::test]
511 async fn test_register_service() {
512 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
513 let manager = Arc::new(ServiceManager::new(runtime.clone()));
514
515 let controller = AutoscaleController::new(manager, runtime, Duration::from_secs(10));
516
517 let spec = adaptive_spec(1, 10, Some(70), None);
519 controller.register_service("api", &spec, 2).await;
520
521 assert!(controller.is_registered("api").await);
522 assert_eq!(controller.registered_service_count().await, 1);
523 }
524
525 #[tokio::test]
526 async fn test_register_fixed_service_ignored() {
527 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
528 let manager = Arc::new(ServiceManager::new(runtime.clone()));
529
530 let controller = AutoscaleController::new(manager, runtime, Duration::from_secs(10));
531
532 let spec = ScaleSpec::Fixed { replicas: 3 };
534 controller.register_service("api", &spec, 3).await;
535
536 assert!(!controller.is_registered("api").await);
537 assert_eq!(controller.registered_service_count().await, 0);
538 }
539
540 #[tokio::test]
541 async fn test_unregister_service() {
542 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
543 let manager = Arc::new(ServiceManager::new(runtime.clone()));
544
545 let controller = AutoscaleController::new(manager, runtime, Duration::from_secs(10));
546
547 let spec = adaptive_spec(1, 10, Some(70), None);
548 controller.register_service("api", &spec, 2).await;
549
550 assert!(controller.is_registered("api").await);
551
552 controller.unregister_service("api").await;
553
554 assert!(!controller.is_registered("api").await);
555 assert_eq!(controller.registered_service_count().await, 0);
556 }
557
/// `has_adaptive_scaling` is `false` for fixed-only maps and becomes `true`
/// once any service uses an adaptive spec.
///
/// This is a plain synchronous test: nothing here awaits, so no async
/// runtime is needed.
#[test]
fn test_has_adaptive_scaling() {
    let mut services = HashMap::new();

    let mut fixed_spec = mock_spec();
    fixed_spec.scale = ScaleSpec::Fixed { replicas: 3 };
    services.insert("web".to_string(), fixed_spec);

    // Only a fixed-scale service so far.
    assert!(!has_adaptive_scaling(&services));

    let mut adaptive = mock_spec();
    adaptive.scale = adaptive_spec(1, 10, Some(70), None);
    services.insert("api".to_string(), adaptive);

    // One adaptive service is enough.
    assert!(has_adaptive_scaling(&services));
}
578
579 #[tokio::test]
580 async fn test_autoscale_controller_with_mock_metrics() {
581 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
582 let manager = Arc::new(ServiceManager::new(runtime.clone()));
583
584 let mock = Arc::new(MockMetricsSource::new());
586
587 mock.set_metrics(
589 "api",
590 vec![
591 ServiceMetrics {
592 cpu_percent: 85.0,
593 memory_bytes: 100 * 1024 * 1024,
594 memory_limit: 512 * 1024 * 1024,
595 rps: None,
596 timestamp: Some(Instant::now()),
597 },
598 ServiceMetrics {
599 cpu_percent: 90.0,
600 memory_bytes: 150 * 1024 * 1024,
601 memory_limit: 512 * 1024 * 1024,
602 rps: None,
603 timestamp: Some(Instant::now()),
604 },
605 ],
606 )
607 .await;
608
609 let mut metrics = MetricsCollector::new();
611 metrics.add_source(mock);
612
613 let controller = AutoscaleController::with_custom_metrics(
614 manager.clone(),
615 metrics,
616 Duration::from_secs(10),
617 );
618
619 manager
621 .upsert_service("api".to_string(), mock_spec())
622 .await
623 .unwrap();
624 manager.scale_service("api", 2).await.unwrap();
625
626 let spec = adaptive_spec(1, 10, Some(70), None);
627 controller.register_service("api", &spec, 2).await;
628
629 controller.evaluate_and_scale("api").await.unwrap();
631
632 let count = manager.service_replica_count("api").await.unwrap();
634 assert_eq!(count, 3);
635 }
636
637 #[tokio::test]
638 async fn test_autoscale_controller_cooldown() {
639 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
640 let manager = Arc::new(ServiceManager::new(runtime.clone()));
641
642 let controller = AutoscaleController::new(manager, runtime, Duration::from_secs(10));
643
644 let spec = ScaleSpec::Adaptive {
646 min: 1,
647 max: 10,
648 cooldown: Some(Duration::from_secs(60)), targets: ScaleTargets {
650 cpu: Some(70),
651 memory: None,
652 rps: None,
653 },
654 };
655
656 controller.register_service("api", &spec, 2).await;
657
658 assert!(controller.should_scale("api").await);
660
661 controller.record_scale_action("api").await;
663
664 assert!(!controller.should_scale("api").await);
666 }
667
668 #[tokio::test]
669 async fn test_autoscale_controller_shutdown() {
670 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
671 let manager = Arc::new(ServiceManager::new(runtime.clone()));
672
673 let controller = Arc::new(AutoscaleController::new(
674 manager,
675 runtime,
676 Duration::from_millis(100), ));
678
679 let controller_clone = controller.clone();
680
681 let handle = tokio::spawn(async move { controller_clone.run_loop().await });
683
684 tokio::time::sleep(Duration::from_millis(50)).await;
686
687 controller.shutdown();
689
690 let result = handle.await.unwrap();
692 assert!(result.is_ok());
693 }
694}