1use crate::error::{AgentError, Result};
67use crate::metrics_providers::{LockedServiceManagerContainerProvider, RuntimeStatsProvider};
68use crate::runtime::{ContainerId, ContainerResourceUpdate, Runtime};
69use crate::service::ServiceManager;
70use std::collections::HashMap;
71use std::sync::Arc;
72use std::time::{Duration, Instant};
73use tokio::sync::RwLock;
74use tracing::{debug, error, info, warn};
75use zlayer_scheduler::metrics::{
76 CgroupsMetricsSource, ContainerStatsProvider, MetricsCollector, MetricsContainerId,
77 MetricsSource, RawContainerStats,
78};
79use zlayer_scheduler::Autoscaler;
80use zlayer_spec::{ScaleSpec, ServiceSpec, VerticalMode, VerticalScaleSpec};
81
/// Default autoscale interval (10 seconds).
///
/// NOTE(review): presumably passed as `interval` to the controller
/// constructors by callers; it is not referenced elsewhere in this file view.
pub const DEFAULT_AUTOSCALE_INTERVAL: Duration = Duration::from_secs(10);

/// CPU-time rate, in microseconds of CPU per wall-clock second, at or above
/// which the busiest replica makes `evaluate_idle` mark the service active.
const IDLE_CPU_RATE_USEC_PER_SEC: f64 = 5_000.0;

/// Relative change (10%) in CPU or memory that a new vertical recommendation
/// must exceed, versus the previous one, for `outside_deadband` to report it.
const VERTICAL_DEADBAND: f64 = 0.10;
100
/// A vertical-scaling recommendation: target CPU and memory for a container.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct VpaRecommendation {
    /// Recommended CPU in millicores (`resource_update_for` maps 1000 to one
    /// full CPU period of quota).
    pub cpu_millis: u32,
    /// Recommended memory in MiB (`resource_update_for` multiplies by 1024 * 1024).
    pub memory_mib: u32,
}
114
/// Collects per-container usage samples and derives vertical-scaling
/// recommendations from them.
#[derive(Debug, Default)]
pub struct VpaEngine {
    /// Usage history keyed by the container name passed to `observe`.
    history: HashMap<String, ContainerUsageHistory>,
}
129
/// Rolling window of usage samples for a single container.
#[derive(Debug, Default)]
struct ContainerUsageHistory {
    /// CPU rate samples in millicores, oldest first.
    cpu_millis: std::collections::VecDeque<f64>,
    /// Memory samples in MiB, oldest first.
    memory_mib: std::collections::VecDeque<f64>,
    /// Cumulative CPU counter (microseconds) and the instant it was recorded
    /// at the previous observation; `None` until the first observation.
    last_cpu: Option<(u64, Instant)>,
}

impl ContainerUsageHistory {
    /// Maximum number of samples retained per series.
    const CAPACITY: usize = 32;

    /// Appends `sample` to `series`, evicting the oldest entry once the
    /// series already holds `CAPACITY` samples.
    fn push_bounded(series: &mut std::collections::VecDeque<f64>, sample: f64) {
        if series.len() == Self::CAPACITY {
            series.pop_front();
        }
        series.push_back(sample);
    }

    /// Records a CPU rate sample (millicores).
    fn push_cpu(&mut self, millis: f64) {
        Self::push_bounded(&mut self.cpu_millis, millis);
    }

    /// Records a memory sample (MiB).
    fn push_memory(&mut self, mib: f64) {
        Self::push_bounded(&mut self.memory_mib, mib);
    }

    /// Returns the nearest-rank percentile of `samples`, or `None` when there
    /// are no samples.
    ///
    /// `pct` is clamped to `0..=100`. The rank is `ceil(pct * len / 100)`,
    /// computed with integer arithmetic: the floating-point form
    /// (`pct / 100.0 * len`) can land a hair above an integer (for example
    /// `0.28 * 25 == 7.000000000000001`) and `ceil` then selects one rank too
    /// high.
    fn percentile(samples: &std::collections::VecDeque<f64>, pct: u8) -> Option<f64> {
        if samples.is_empty() {
            return None;
        }
        let mut sorted: Vec<f64> = samples.iter().copied().collect();
        // `total_cmp` is a total order, so no fallback ordering is needed.
        sorted.sort_unstable_by(f64::total_cmp);

        let rank = (usize::from(pct.min(100)) * sorted.len()).div_ceil(100);
        // Rank 0 (pct == 0) maps to the smallest sample.
        let idx = rank.saturating_sub(1).min(sorted.len() - 1);
        sorted.get(idx).copied()
    }
}
183
184impl VpaEngine {
185 #[must_use]
187 pub fn new() -> Self {
188 Self::default()
189 }
190
191 pub fn observe(&mut self, container: &str, stats: &RawContainerStats) -> f64 {
200 let now = Instant::now();
201 let hist = self.history.entry(container.to_string()).or_default();
202
203 let cpu_millis = if let Some((prev_usec, prev_at)) = hist.last_cpu {
204 let elapsed = now.duration_since(prev_at).as_secs_f64();
205 let delta_usec = stats.cpu_usage_usec.saturating_sub(prev_usec);
206 if elapsed > 0.0 {
207 #[allow(clippy::cast_precision_loss)]
209 let rate = delta_usec as f64 / elapsed / 1000.0;
210 hist.push_cpu(rate);
211 rate
212 } else {
213 0.0
214 }
215 } else {
216 0.0
217 };
218 hist.last_cpu = Some((stats.cpu_usage_usec, now));
219
220 #[allow(clippy::cast_precision_loss)]
221 let mem_mib = stats.memory_bytes as f64 / (1024.0 * 1024.0);
222 hist.push_memory(mem_mib);
223
224 cpu_millis
225 }
226
227 #[must_use]
231 pub fn recommend(
232 &self,
233 container: &str,
234 spec: &VerticalScaleSpec,
235 ) -> Option<VpaRecommendation> {
236 let hist = self.history.get(container)?;
237 let cpu_pct = ContainerUsageHistory::percentile(&hist.cpu_millis, spec.percentile)?;
238 let mem_pct = ContainerUsageHistory::percentile(&hist.memory_mib, spec.percentile)?;
239
240 #[allow(
242 clippy::cast_possible_truncation,
243 clippy::cast_sign_loss,
244 clippy::cast_precision_loss
245 )]
246 let cpu_millis = {
247 let mut v = cpu_pct.ceil().max(0.0) as u32;
248 if let Some(min) = spec.min_cpu_millis {
249 v = v.max(min);
250 }
251 if let Some(max) = spec.max_cpu_millis {
252 v = v.min(max);
253 }
254 v.max(1)
255 };
256 #[allow(
257 clippy::cast_possible_truncation,
258 clippy::cast_sign_loss,
259 clippy::cast_precision_loss
260 )]
261 let memory_mib = {
262 let mut v = mem_pct.ceil().max(0.0) as u32;
263 if let Some(min) = spec.min_memory_mib {
264 v = v.max(min);
265 }
266 if let Some(max) = spec.max_memory_mib {
267 v = v.min(max);
268 }
269 v.max(1)
270 };
271
272 Some(VpaRecommendation {
273 cpu_millis,
274 memory_mib,
275 })
276 }
277
278 pub fn forget(&mut self, container: &str) {
281 self.history.remove(container);
282 }
283}
284
285fn outside_deadband(prev: Option<VpaRecommendation>, next: VpaRecommendation) -> bool {
289 let Some(prev) = prev else { return true };
290 let exceeds = |old: u32, new: u32| {
291 if old == 0 {
292 return new != 0;
293 }
294 let delta = (f64::from(new) - f64::from(old)).abs();
295 delta / f64::from(old) > VERTICAL_DEADBAND
296 };
297 exceeds(prev.cpu_millis, next.cpu_millis) || exceeds(prev.memory_mib, next.memory_mib)
298}
299
300fn resource_update_for(rec: VpaRecommendation) -> ContainerResourceUpdate {
304 ContainerResourceUpdate {
305 cpu_period: Some(100_000),
306 cpu_quota: Some(100_000 * i64::from(rec.cpu_millis) / 1000),
307 memory: Some(i64::from(rec.memory_mib) * 1024 * 1024),
308 ..Default::default()
309 }
310}
311
/// Periodically evaluates registered services and drives horizontal scaling,
/// scale-to-zero and vertical (VPA) resource adjustments.
pub struct AutoscaleController {
    /// Service manager used to list containers, read replica counts and scale services.
    service_manager: Arc<RwLock<ServiceManager>>,
    /// Metrics collector queried by `evaluate_and_scale`.
    metrics: Arc<MetricsCollector>,
    /// Horizontal autoscaler that turns aggregated metrics into scale decisions.
    autoscaler: Arc<RwLock<Autoscaler>>,
    /// Registered adaptive scale specs, keyed by service name.
    service_specs: Arc<RwLock<HashMap<String, ScaleSpec>>>,
    /// Instant of the last recorded scale action per service (cooldown tracking).
    last_scale_times: Arc<RwLock<HashMap<String, Instant>>>,
    /// Period between evaluation ticks in `run_loop`.
    interval: Duration,
    /// Notified by `shutdown` to stop `run_loop`.
    shutdown: Arc<tokio::sync::Notify>,
    /// Container runtime used for live resource updates and rolling restarts.
    runtime: Arc<dyn Runtime + Send + Sync>,
    /// Provider of raw per-container stats for idle detection and VPA.
    stats_provider: Arc<RuntimeStatsProvider>,
    /// Last instant each service was marked active.
    last_active: Arc<RwLock<HashMap<String, Instant>>>,
    /// Per-service idle window after which scale-to-zero may happen.
    idle_window: Arc<RwLock<HashMap<String, Duration>>>,
    /// Per-service minimum replica count; scale-to-zero requires 0.
    min_replicas: Arc<RwLock<HashMap<String, u32>>>,
    /// Vertical scaling specs for services in `Recommend` or `Auto` mode.
    vertical_specs: Arc<RwLock<HashMap<String, VerticalScaleSpec>>>,
    /// Base service specs used to recreate replicas during a rolling restart.
    service_templates: Arc<RwLock<HashMap<String, ServiceSpec>>>,
    /// VPA engine plus the last recommendation applied per service.
    vpa: Arc<RwLock<VpaState>>,
}
363
/// Mutable VPA state shared behind a single lock.
#[derive(Default)]
struct VpaState {
    /// Per-container usage history and recommendation logic.
    engine: VpaEngine,
    /// Last recommendation applied per service, used for the deadband check.
    last_applied: HashMap<String, VpaRecommendation>,
}
371
372impl AutoscaleController {
373 #[must_use]
395 pub fn new(
396 service_manager: Arc<RwLock<ServiceManager>>,
397 runtime: Arc<dyn Runtime + Send + Sync>,
398 interval: Duration,
399 ) -> Self {
400 let mut metrics = MetricsCollector::new();
402
403 let runtime_for_controller = runtime.clone();
406 let stats_provider = Arc::new(RuntimeStatsProvider::new(runtime));
407
408 let service_provider = Arc::new(LockedServiceManagerContainerProvider::new(
411 service_manager.clone(),
412 ));
413
414 let source: Arc<dyn MetricsSource> = Arc::new(CgroupsMetricsSource::new(
416 service_provider,
417 stats_provider.clone(),
418 ));
419 metrics.add_source(source);
420
421 Self {
422 service_manager,
423 metrics: Arc::new(metrics),
424 autoscaler: Arc::new(RwLock::new(Autoscaler::new())),
425 service_specs: Arc::new(RwLock::new(HashMap::new())),
426 last_scale_times: Arc::new(RwLock::new(HashMap::new())),
427 interval,
428 shutdown: Arc::new(tokio::sync::Notify::new()),
429 runtime: runtime_for_controller,
430 stats_provider,
431 last_active: Arc::new(RwLock::new(HashMap::new())),
432 idle_window: Arc::new(RwLock::new(HashMap::new())),
433 min_replicas: Arc::new(RwLock::new(HashMap::new())),
434 vertical_specs: Arc::new(RwLock::new(HashMap::new())),
435 service_templates: Arc::new(RwLock::new(HashMap::new())),
436 vpa: Arc::new(RwLock::new(VpaState::default())),
437 }
438 }
439
440 #[must_use]
446 pub fn with_custom_metrics(
447 service_manager: Arc<RwLock<ServiceManager>>,
448 runtime: Arc<dyn Runtime + Send + Sync>,
449 metrics: MetricsCollector,
450 interval: Duration,
451 ) -> Self {
452 let runtime_for_controller = runtime.clone();
453 let stats_provider = Arc::new(RuntimeStatsProvider::new(runtime));
454 Self {
455 service_manager,
456 metrics: Arc::new(metrics),
457 autoscaler: Arc::new(RwLock::new(Autoscaler::new())),
458 service_specs: Arc::new(RwLock::new(HashMap::new())),
459 last_scale_times: Arc::new(RwLock::new(HashMap::new())),
460 interval,
461 shutdown: Arc::new(tokio::sync::Notify::new()),
462 runtime: runtime_for_controller,
463 stats_provider,
464 last_active: Arc::new(RwLock::new(HashMap::new())),
465 idle_window: Arc::new(RwLock::new(HashMap::new())),
466 min_replicas: Arc::new(RwLock::new(HashMap::new())),
467 vertical_specs: Arc::new(RwLock::new(HashMap::new())),
468 service_templates: Arc::new(RwLock::new(HashMap::new())),
469 vpa: Arc::new(RwLock::new(VpaState::default())),
470 }
471 }
472
473 #[must_use]
486 pub fn with_extra_metrics_source(mut self, source: Arc<dyn MetricsSource>) -> Self {
487 if let Some(collector) = Arc::get_mut(&mut self.metrics) {
488 collector.add_source(source);
489 } else {
490 warn!(
491 "with_extra_metrics_source called after the metrics collector was shared; \
492 source ignored"
493 );
494 }
495 self
496 }
497
498 pub async fn register_service(&self, name: &str, spec: &ScaleSpec, initial_replicas: u32) {
513 let ScaleSpec::Adaptive {
515 min,
516 idle_window,
517 vertical,
518 ..
519 } = spec
520 else {
521 debug!(
522 service = name,
523 "Skipping registration for non-adaptive service"
524 );
525 return;
526 };
527
528 {
530 let mut autoscaler = self.autoscaler.write().await;
531 autoscaler.register_service(name, spec.clone(), initial_replicas);
532 }
533
534 {
536 let mut specs = self.service_specs.write().await;
537 specs.insert(name.to_string(), spec.clone());
538 }
539
540 {
542 let mut mins = self.min_replicas.write().await;
543 mins.insert(name.to_string(), *min);
544 }
545 if let Some(window) = idle_window {
546 self.idle_window
547 .write()
548 .await
549 .insert(name.to_string(), *window);
550 } else {
551 self.idle_window.write().await.remove(name);
552 }
553 self.last_active
556 .write()
557 .await
558 .insert(name.to_string(), Instant::now());
559
560 if let Some(v) = vertical {
562 if matches!(v.mode, VerticalMode::Recommend | VerticalMode::Auto) {
563 self.vertical_specs
564 .write()
565 .await
566 .insert(name.to_string(), v.clone());
567 } else {
568 self.vertical_specs.write().await.remove(name);
569 }
570 } else {
571 self.vertical_specs.write().await.remove(name);
572 }
573
574 info!(
575 service = name,
576 initial_replicas,
577 idle_window_secs = idle_window.as_ref().map(Duration::as_secs),
578 min = *min,
579 vertical = vertical.is_some(),
580 "Registered service for autoscaling"
581 );
582 }
583
584 pub async fn set_service_template(&self, name: &str, spec: ServiceSpec) {
593 self.service_templates
594 .write()
595 .await
596 .insert(name.to_string(), spec);
597 }
598
599 pub fn mark_active(&self, service: &str) {
607 let last_active = self.last_active.clone();
611 let service = service.to_string();
612 if let Ok(mut guard) = last_active.try_write() {
613 guard.insert(service, Instant::now());
614 return;
615 }
616 tokio::spawn(async move {
619 last_active.write().await.insert(service, Instant::now());
620 });
621 }
622
623 pub async fn mark_active_async(&self, service: &str) {
626 self.last_active
627 .write()
628 .await
629 .insert(service.to_string(), Instant::now());
630 }
631
632 pub async fn unregister_service(&self, name: &str) {
634 {
635 let mut autoscaler = self.autoscaler.write().await;
636 autoscaler.unregister_service(name);
637 }
638
639 self.service_specs.write().await.remove(name);
640 self.last_scale_times.write().await.remove(name);
641 self.last_active.write().await.remove(name);
642 self.idle_window.write().await.remove(name);
643 self.min_replicas.write().await.remove(name);
644 self.vertical_specs.write().await.remove(name);
645 self.service_templates.write().await.remove(name);
646 self.vpa.write().await.last_applied.remove(name);
647
648 info!(service = name, "Unregistered service from autoscaling");
649 }
650
651 pub async fn is_registered(&self, name: &str) -> bool {
653 let specs = self.service_specs.read().await;
654 specs.contains_key(name)
655 }
656
657 async fn should_scale(&self, service_name: &str) -> bool {
661 let cooldown = {
663 let specs = self.service_specs.read().await;
664 match specs.get(service_name) {
665 Some(ScaleSpec::Adaptive { cooldown, .. }) => {
666 cooldown.unwrap_or(zlayer_scheduler::DEFAULT_COOLDOWN)
667 }
668 _ => return false, }
670 };
671
672 let last_scale_times = self.last_scale_times.read().await;
674 if let Some(last_time) = last_scale_times.get(service_name) {
675 if last_time.elapsed() < cooldown {
676 let remaining = cooldown
677 .checked_sub(last_time.elapsed())
678 .unwrap_or_default();
679 debug!(
680 service = service_name,
681 remaining_secs = remaining.as_secs(),
682 "Service in cooldown"
683 );
684 return false;
685 }
686 }
687
688 true
689 }
690
691 async fn record_scale_action(&self, service_name: &str) {
693 let mut times = self.last_scale_times.write().await;
694 times.insert(service_name.to_string(), Instant::now());
695 }
696
697 #[allow(clippy::cast_possible_truncation)]
725 pub async fn run_loop(&self) -> Result<()> {
726 let mut ticker = tokio::time::interval(self.interval);
727
728 info!(
729 interval_ms = self.interval.as_millis() as u64,
730 "Starting autoscale controller loop"
731 );
732
733 loop {
734 tokio::select! {
735 _ = ticker.tick() => {
736 Box::pin(self.evaluate_all_services()).await;
739 }
740 () = self.shutdown.notified() => {
741 info!("Autoscale controller shutting down");
742 break;
743 }
744 }
745 }
746
747 Ok(())
748 }
749
750 async fn discover_services(&self) {
760 let live = self.service_manager.read().await.service_specs().await;
761
762 let mut seen_adaptive: Vec<String> = Vec::new();
764 for (name, spec) in &live {
765 if !matches!(spec.scale, ScaleSpec::Adaptive { .. }) {
766 continue;
767 }
768 seen_adaptive.push(name.clone());
769
770 let needs_register = {
773 let specs = self.service_specs.read().await;
774 specs.get(name) != Some(&spec.scale)
775 };
776 if needs_register {
777 let initial = u32::try_from(
780 self.service_manager
781 .read()
782 .await
783 .service_replica_count(name)
784 .await
785 .unwrap_or(0),
786 )
787 .unwrap_or(0);
788 self.register_service(name, &spec.scale, initial).await;
789 }
790 self.set_service_template(name, spec.clone()).await;
793 }
794
795 let registered: Vec<String> = {
798 let specs = self.service_specs.read().await;
799 specs.keys().cloned().collect()
800 };
801 for name in registered {
802 if !seen_adaptive.contains(&name) {
803 self.unregister_service(&name).await;
804 }
805 }
806 }
807
808 async fn evaluate_all_services(&self) {
810 self.discover_services().await;
814
815 let service_names: Vec<String> = {
817 let specs = self.service_specs.read().await;
818 specs.keys().cloned().collect()
819 };
820
821 for service_name in service_names {
822 if let Err(e) = Box::pin(self.evaluate_vertical(&service_name)).await {
825 warn!(
826 service = %service_name,
827 error = %e,
828 "Failed vertical (VPA) evaluation"
829 );
830 }
831
832 match self.evaluate_idle(&service_name).await {
835 Ok(true) => continue, Ok(false) => {}
837 Err(e) => warn!(
838 service = %service_name,
839 error = %e,
840 "Failed scale-to-zero evaluation"
841 ),
842 }
843
844 if let Err(e) = self.evaluate_and_scale(&service_name).await {
845 warn!(
847 service = %service_name,
848 error = %e,
849 "Failed to evaluate/scale service"
850 );
851 }
852 }
853 }
854
855 async fn evaluate_idle(&self, service_name: &str) -> Result<bool> {
863 let containers = self
869 .service_manager
870 .read()
871 .await
872 .get_service_containers(service_name)
873 .await;
874 let mut busiest_cpu_millis = 0.0_f64;
875 {
876 let mut vpa = self.vpa.write().await;
877 for id in &containers {
878 let metrics_id = MetricsContainerId {
879 service: id.service.clone(),
880 replica: id.replica,
881 };
882 if let Ok(stats) = self.stats_provider.get_stats(&metrics_id).await {
883 let rate = vpa.engine.observe(&id.to_string(), &stats);
884 if rate > busiest_cpu_millis {
885 busiest_cpu_millis = rate;
886 }
887 }
888 }
889 }
890
891 if busiest_cpu_millis * 1000.0 >= IDLE_CPU_RATE_USEC_PER_SEC {
892 self.last_active
893 .write()
894 .await
895 .insert(service_name.to_string(), Instant::now());
896 }
897
898 let window = {
901 let windows = self.idle_window.read().await;
902 match windows.get(service_name) {
903 Some(w) => *w,
904 None => return Ok(false),
905 }
906 };
907 let min = self
908 .min_replicas
909 .read()
910 .await
911 .get(service_name)
912 .copied()
913 .unwrap_or(1);
914 if min != 0 {
915 return Ok(false);
916 }
917
918 let current = self
920 .service_manager
921 .read()
922 .await
923 .service_replica_count(service_name)
924 .await
925 .unwrap_or(0);
926 if current == 0 {
927 return Ok(false);
928 }
929
930 let idle_for = {
931 let last_active = self.last_active.read().await;
932 last_active
933 .get(service_name)
934 .map_or(Duration::ZERO, Instant::elapsed)
935 };
936 if idle_for <= window {
937 return Ok(false);
938 }
939
940 if !self.should_scale(service_name).await {
943 return Ok(false);
944 }
945
946 info!(
947 service = service_name,
948 idle_secs = idle_for.as_secs(),
949 window_secs = window.as_secs(),
950 "Scaling service to zero (idle past window)"
951 );
952 self.service_manager
953 .read()
954 .await
955 .scale_service(service_name, 0)
956 .await?;
957 self.record_scale_action(service_name).await;
958 {
959 let mut autoscaler = self.autoscaler.write().await;
960 if let Err(e) = autoscaler.record_scale_action(service_name, 0) {
961 warn!(
962 service = service_name,
963 error = %e,
964 "Failed to record scale-to-zero in autoscaler"
965 );
966 }
967 }
968 {
970 let mut vpa = self.vpa.write().await;
971 for id in &containers {
972 vpa.engine.forget(&id.to_string());
973 }
974 }
975 Ok(true)
976 }
977
978 async fn evaluate_vertical(&self, service_name: &str) -> Result<()> {
986 let spec = {
987 let specs = self.vertical_specs.read().await;
988 match specs.get(service_name) {
989 Some(s) => s.clone(),
990 None => return Ok(()), }
992 };
993
994 let containers = self
995 .service_manager
996 .read()
997 .await
998 .get_service_containers(service_name)
999 .await;
1000 if containers.is_empty() {
1001 return Ok(());
1002 }
1003
1004 let mut chosen: Option<VpaRecommendation> = None;
1007 {
1008 let mut vpa = self.vpa.write().await;
1009 for id in &containers {
1010 let metrics_id = MetricsContainerId {
1011 service: id.service.clone(),
1012 replica: id.replica,
1013 };
1014 match self.stats_provider.get_stats(&metrics_id).await {
1015 Ok(stats) => {
1016 vpa.engine.observe(&id.to_string(), &stats);
1017 if let Some(rec) = vpa.engine.recommend(&id.to_string(), &spec) {
1018 chosen = Some(match chosen {
1019 Some(c) => VpaRecommendation {
1020 cpu_millis: c.cpu_millis.max(rec.cpu_millis),
1021 memory_mib: c.memory_mib.max(rec.memory_mib),
1022 },
1023 None => rec,
1024 });
1025 }
1026 }
1027 Err(e) => debug!(
1028 service = service_name,
1029 container = %id,
1030 error = %e,
1031 "vertical: no stats for replica; skipping"
1032 ),
1033 }
1034 }
1035 }
1036
1037 let Some(rec) = chosen else {
1038 return Ok(());
1040 };
1041
1042 match spec.mode {
1043 VerticalMode::Off => Ok(()),
1044 VerticalMode::Recommend => {
1045 info!(
1046 service = service_name,
1047 cpu_millis = rec.cpu_millis,
1048 memory_mib = rec.memory_mib,
1049 "vertical recommendation (recommend mode; not applied)"
1050 );
1051 Ok(())
1052 }
1053 VerticalMode::Auto => {
1054 Box::pin(self.apply_vertical(service_name, rec, &containers)).await
1055 }
1056 }
1057 }
1058
1059 async fn apply_vertical(
1062 &self,
1063 service_name: &str,
1064 rec: VpaRecommendation,
1065 containers: &[ContainerId],
1066 ) -> Result<()> {
1067 let prev = self
1069 .vpa
1070 .read()
1071 .await
1072 .last_applied
1073 .get(service_name)
1074 .copied();
1075 if !outside_deadband(prev, rec) {
1076 debug!(
1077 service = service_name,
1078 cpu_millis = rec.cpu_millis,
1079 memory_mib = rec.memory_mib,
1080 "vertical recommendation within deadband; skipping"
1081 );
1082 return Ok(());
1083 }
1084
1085 let update = resource_update_for(rec);
1086 let mut needs_rolling_restart = false;
1087
1088 for id in containers {
1089 match self.runtime.update_container_resources(id, &update).await {
1090 Ok(outcome) => {
1091 if !outcome.warnings.is_empty() {
1092 debug!(
1093 service = service_name,
1094 container = %id,
1095 warnings = ?outcome.warnings,
1096 "vertical apply produced warnings"
1097 );
1098 }
1099 }
1100 Err(AgentError::Unsupported(reason)) => {
1101 debug!(
1102 service = service_name,
1103 container = %id,
1104 reason = %reason,
1105 "runtime cannot live-update resources; will roll the service"
1106 );
1107 needs_rolling_restart = true;
1108 break;
1109 }
1110 Err(AgentError::NotFound { .. }) => {
1111 debug!(
1112 service = service_name,
1113 container = %id,
1114 "vertical apply: container vanished; skipping"
1115 );
1116 }
1117 Err(e) => return Err(e),
1118 }
1119 }
1120
1121 if needs_rolling_restart {
1122 Box::pin(self.rolling_restart_with_resources(service_name, rec)).await?;
1123 }
1124
1125 info!(
1126 service = service_name,
1127 cpu_millis = rec.cpu_millis,
1128 memory_mib = rec.memory_mib,
1129 rolled = needs_rolling_restart,
1130 "applied vertical recommendation"
1131 );
1132 self.vpa
1133 .write()
1134 .await
1135 .last_applied
1136 .insert(service_name.to_string(), rec);
1137 Ok(())
1138 }
1139
1140 async fn rolling_restart_with_resources(
1152 &self,
1153 service_name: &str,
1154 rec: VpaRecommendation,
1155 ) -> Result<()> {
1156 let template = self
1157 .service_templates
1158 .read()
1159 .await
1160 .get(service_name)
1161 .cloned();
1162 let containers = self
1163 .service_manager
1164 .read()
1165 .await
1166 .get_service_containers(service_name)
1167 .await;
1168
1169 let Some(mut spec) = template else {
1170 warn!(
1174 service = service_name,
1175 "rolling restart without a base ServiceSpec template; bouncing the service \
1176 (call set_service_template to enable one-at-a-time recreation)"
1177 );
1178 let count = u32::try_from(containers.len()).unwrap_or(u32::MAX);
1179 {
1180 let sm = self.service_manager.read().await;
1181 sm.scale_service(service_name, 0).await?;
1182 sm.scale_service(service_name, count).await?;
1183 }
1184 self.record_scale_action(service_name).await;
1185 return Ok(());
1186 };
1187
1188 let mut resources = spec.resources.clone();
1191 resources.cpu = Some(f64::from(rec.cpu_millis) / 1000.0);
1192 resources.memory = Some(format!("{}Mi", rec.memory_mib));
1193 spec.resources = resources;
1194
1195 for id in &containers {
1197 info!(
1198 service = service_name,
1199 container = %id,
1200 cpu_millis = rec.cpu_millis,
1201 memory_mib = rec.memory_mib,
1202 "rolling restart: recreating replica with new resources"
1203 );
1204 if let Err(e) = self
1208 .runtime
1209 .stop_container(id, Duration::from_secs(10))
1210 .await
1211 {
1212 debug!(service = service_name, container = %id, error = %e, "rolling restart: stop failed (continuing)");
1213 }
1214 if let Err(e) = self.runtime.remove_container(id).await {
1215 debug!(service = service_name, container = %id, error = %e, "rolling restart: remove failed (continuing)");
1216 }
1217 self.vpa.write().await.engine.forget(&id.to_string());
1218
1219 if let Err(e) = self.runtime.create_container(id, &spec).await {
1220 error!(service = service_name, container = %id, error = %e, "rolling restart: recreate failed");
1221 return Err(e);
1222 }
1223 if let Err(e) = self.runtime.start_container(id).await {
1224 error!(service = service_name, container = %id, error = %e, "rolling restart: start failed");
1225 return Err(e);
1226 }
1227 }
1228 self.record_scale_action(service_name).await;
1229 Ok(())
1230 }
1231
1232 async fn evaluate_and_scale(&self, service_name: &str) -> Result<()> {
1234 if !self.should_scale(service_name).await {
1236 return Ok(());
1237 }
1238
1239 let aggregated = match self.metrics.collect(service_name).await {
1241 Ok(m) => m,
1242 Err(e) => {
1243 debug!(
1246 service = service_name,
1247 error = %e,
1248 "No metrics available for service"
1249 );
1250 return Ok(());
1251 }
1252 };
1253
1254 let decision = {
1256 let mut autoscaler = self.autoscaler.write().await;
1257 match autoscaler.evaluate(service_name, &aggregated) {
1258 Ok(d) => d,
1259 Err(e) => {
1260 debug!(
1261 service = service_name,
1262 error = %e,
1263 "Failed to evaluate scaling"
1264 );
1265 return Ok(());
1266 }
1267 }
1268 };
1269
1270 debug!(
1271 service = service_name,
1272 ?decision,
1273 cpu = aggregated.avg_cpu_percent,
1274 memory = aggregated.avg_memory_percent,
1275 instances = aggregated.instance_count,
1276 "Autoscale evaluation"
1277 );
1278
1279 if let Some(target) = decision.target_replicas() {
1281 info!(
1282 service = service_name,
1283 target_replicas = target,
1284 decision = ?decision,
1285 "Executing autoscale"
1286 );
1287
1288 if let Err(e) = self
1290 .service_manager
1291 .read()
1292 .await
1293 .scale_service(service_name, target)
1294 .await
1295 {
1296 error!(
1297 service = service_name,
1298 target = target,
1299 error = %e,
1300 "Failed to scale service"
1301 );
1302 return Err(e);
1303 }
1304
1305 if target > 0 {
1308 self.last_active
1309 .write()
1310 .await
1311 .insert(service_name.to_string(), Instant::now());
1312 }
1313
1314 self.record_scale_action(service_name).await;
1316
1317 {
1319 let mut autoscaler = self.autoscaler.write().await;
1320 if let Err(e) = autoscaler.record_scale_action(service_name, target) {
1321 warn!(
1322 service = service_name,
1323 error = %e,
1324 "Failed to record scale action in autoscaler"
1325 );
1326 }
1327 }
1328 }
1329
1330 Ok(())
1331 }
1332
1333 pub fn shutdown(&self) {
1335 self.shutdown.notify_one();
1336 }
1337
1338 #[must_use]
1340 pub fn interval(&self) -> Duration {
1341 self.interval
1342 }
1343
1344 pub async fn registered_service_count(&self) -> usize {
1346 let specs = self.service_specs.read().await;
1347 specs.len()
1348 }
1349}
1350
1351#[must_use]
1356#[allow(clippy::implicit_hasher)]
1357pub fn has_adaptive_scaling(services: &HashMap<String, zlayer_spec::ServiceSpec>) -> bool {
1358 services
1359 .values()
1360 .any(|s| matches!(s.scale, ScaleSpec::Adaptive { .. }))
1361}
1362
1363#[cfg(test)]
1364#[allow(deprecated)]
1365mod tests {
1366 use super::*;
1367 use crate::runtime::MockRuntime;
1368 use zlayer_scheduler::metrics::{MockMetricsSource, ServiceMetrics};
1369 use zlayer_spec::ScaleTargets;
1370
1371 fn mock_spec() -> zlayer_spec::ServiceSpec {
1372 serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
1373 r"
1374version: v1
1375deployment: test
1376services:
1377 test:
1378 rtype: service
1379 image:
1380 name: test:latest
1381 endpoints:
1382 - name: http
1383 protocol: http
1384 port: 8080
1385 scale:
1386 mode: fixed
1387 replicas: 1
1388",
1389 )
1390 .unwrap()
1391 .services
1392 .remove("test")
1393 .unwrap()
1394 }
1395
1396 fn adaptive_spec(
1397 min: u32,
1398 max: u32,
1399 cpu_target: Option<u8>,
1400 memory_target: Option<u8>,
1401 ) -> ScaleSpec {
1402 ScaleSpec::Adaptive {
1403 min,
1404 max,
1405 cooldown: Some(Duration::from_secs(0)), targets: ScaleTargets {
1407 cpu: cpu_target,
1408 memory: memory_target,
1409 rps: None,
1410 custom: Vec::new(),
1411 external: Vec::new(),
1412 },
1413 behavior: None,
1414 triggers: Vec::new(),
1415 idle_window: None,
1416 vertical: None,
1417 predictive: None,
1418 }
1419 }
1420
1421 fn raw_stats(cpu_usec: u64, mem_bytes: u64) -> RawContainerStats {
1422 RawContainerStats {
1423 cpu_usage_usec: cpu_usec,
1424 memory_bytes: mem_bytes,
1425 memory_limit: 512 * 1024 * 1024,
1426 timestamp: Instant::now(),
1427 }
1428 }
1429
1430 #[test]
1431 fn test_vpa_percentile_nearest_rank() {
1432 let mut samples = std::collections::VecDeque::new();
1433 for v in [10.0, 20.0, 30.0, 40.0, 50.0] {
1434 samples.push_back(v);
1435 }
1436 assert_eq!(ContainerUsageHistory::percentile(&samples, 100), Some(50.0));
1437 assert_eq!(ContainerUsageHistory::percentile(&samples, 0), Some(10.0));
1438 assert_eq!(ContainerUsageHistory::percentile(&samples, 90), Some(50.0));
1440 }
1441
1442 #[test]
1443 fn test_vpa_recommend_clamps_to_bounds() {
1444 let mut engine = VpaEngine::new();
1445 let id = "svc-rep-1";
1446 engine.observe(id, &raw_stats(1_000_000, 300 * 1024 * 1024));
1452 std::thread::sleep(Duration::from_millis(5));
1453 engine.observe(id, &raw_stats(1_000_000, 300 * 1024 * 1024));
1454
1455 let spec = VerticalScaleSpec {
1456 mode: VerticalMode::Auto,
1457 min_cpu_millis: Some(500),
1458 max_cpu_millis: Some(2000),
1459 min_memory_mib: Some(128),
1460 max_memory_mib: Some(256),
1461 percentile: 90,
1462 };
1463 let rec = engine.recommend(id, &spec).expect("recommendation");
1464 assert_eq!(rec.cpu_millis, 500);
1467 assert_eq!(rec.memory_mib, 256);
1468 }
1469
1470 #[test]
1471 fn test_deadband() {
1472 let base = VpaRecommendation {
1473 cpu_millis: 1000,
1474 memory_mib: 512,
1475 };
1476 assert!(outside_deadband(None, base));
1478 assert!(!outside_deadband(
1480 Some(base),
1481 VpaRecommendation {
1482 cpu_millis: 1050,
1483 memory_mib: 512
1484 }
1485 ));
1486 assert!(outside_deadband(
1488 Some(base),
1489 VpaRecommendation {
1490 cpu_millis: 1200,
1491 memory_mib: 512
1492 }
1493 ));
1494 }
1495
1496 #[test]
1497 fn test_resource_update_for() {
1498 let rec = VpaRecommendation {
1499 cpu_millis: 1500,
1500 memory_mib: 256,
1501 };
1502 let update = resource_update_for(rec);
1503 assert_eq!(update.cpu_period, Some(100_000));
1504 assert_eq!(update.cpu_quota, Some(150_000));
1506 assert_eq!(update.memory, Some(256 * 1024 * 1024));
1507 }
1508
1509 fn locked(runtime: &Arc<dyn Runtime + Send + Sync>) -> Arc<RwLock<ServiceManager>> {
1512 Arc::new(RwLock::new(ServiceManager::new(runtime.clone())))
1513 }
1514
1515 #[tokio::test]
1516 async fn test_autoscale_controller_creation() {
1517 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
1518 let manager = locked(&runtime);
1519
1520 let controller = AutoscaleController::new(manager, runtime, Duration::from_secs(10));
1521
1522 assert_eq!(controller.interval(), Duration::from_secs(10));
1523 assert_eq!(controller.registered_service_count().await, 0);
1524 }
1525
1526 #[tokio::test]
1527 async fn test_register_service() {
1528 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
1529 let manager = locked(&runtime);
1530
1531 let controller = AutoscaleController::new(manager, runtime, Duration::from_secs(10));
1532
1533 let spec = adaptive_spec(1, 10, Some(70), None);
1535 controller.register_service("api", &spec, 2).await;
1536
1537 assert!(controller.is_registered("api").await);
1538 assert_eq!(controller.registered_service_count().await, 1);
1539 }
1540
1541 #[tokio::test]
1542 async fn test_register_fixed_service_ignored() {
1543 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
1544 let manager = locked(&runtime);
1545
1546 let controller = AutoscaleController::new(manager, runtime, Duration::from_secs(10));
1547
1548 let spec = ScaleSpec::Fixed { replicas: 3 };
1550 controller.register_service("api", &spec, 3).await;
1551
1552 assert!(!controller.is_registered("api").await);
1553 assert_eq!(controller.registered_service_count().await, 0);
1554 }
1555
1556 #[tokio::test]
1557 async fn test_unregister_service() {
1558 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
1559 let manager = locked(&runtime);
1560
1561 let controller = AutoscaleController::new(manager, runtime, Duration::from_secs(10));
1562
1563 let spec = adaptive_spec(1, 10, Some(70), None);
1564 controller.register_service("api", &spec, 2).await;
1565
1566 assert!(controller.is_registered("api").await);
1567
1568 controller.unregister_service("api").await;
1569
1570 assert!(!controller.is_registered("api").await);
1571 assert_eq!(controller.registered_service_count().await, 0);
1572 }
1573
/// `has_adaptive_scaling` is false for a service map containing only a
/// `ScaleSpec::Fixed` service and becomes true once a service built with
/// `adaptive_spec` is added.
///
/// Nothing in this test awaits, so it is a plain synchronous `#[test]` and does
/// not spin up a tokio runtime.
#[test]
fn test_has_adaptive_scaling() {
    let mut services = HashMap::new();

    // Only a fixed-replica service: no adaptive scaling expected.
    let mut fixed_spec = mock_spec();
    fixed_spec.scale = ScaleSpec::Fixed { replicas: 3 };
    services.insert("web".to_string(), fixed_spec);

    assert!(!has_adaptive_scaling(&services));

    // Adding one adaptive service flips the result.
    let mut adaptive = mock_spec();
    adaptive.scale = adaptive_spec(1, 10, Some(70), None);
    services.insert("api".to_string(), adaptive);

    assert!(has_adaptive_scaling(&services));
}
1594
1595 #[tokio::test]
1596 async fn test_autoscale_controller_with_mock_metrics() {
1597 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
1598 let manager = locked(&runtime);
1599
1600 let mock = Arc::new(MockMetricsSource::new());
1602
1603 mock.set_metrics(
1605 "api",
1606 vec![
1607 ServiceMetrics {
1608 cpu_percent: 85.0,
1609 memory_bytes: 100 * 1024 * 1024,
1610 memory_limit: 512 * 1024 * 1024,
1611 rps: None,
1612 timestamp: Some(Instant::now()),
1613 ..Default::default()
1614 },
1615 ServiceMetrics {
1616 cpu_percent: 90.0,
1617 memory_bytes: 150 * 1024 * 1024,
1618 memory_limit: 512 * 1024 * 1024,
1619 rps: None,
1620 timestamp: Some(Instant::now()),
1621 ..Default::default()
1622 },
1623 ],
1624 )
1625 .await;
1626
1627 let mut metrics = MetricsCollector::new();
1629 metrics.add_source(mock);
1630
1631 let controller = AutoscaleController::with_custom_metrics(
1632 manager.clone(),
1633 runtime,
1634 metrics,
1635 Duration::from_secs(10),
1636 );
1637
1638 Box::pin(
1640 manager
1641 .read()
1642 .await
1643 .upsert_service("api".to_string(), mock_spec()),
1644 )
1645 .await
1646 .unwrap();
1647 manager.read().await.scale_service("api", 2).await.unwrap();
1648
1649 let spec = adaptive_spec(1, 10, Some(70), None);
1650 controller.register_service("api", &spec, 2).await;
1651
1652 controller.evaluate_and_scale("api").await.unwrap();
1654
1655 let count = manager
1657 .read()
1658 .await
1659 .service_replica_count("api")
1660 .await
1661 .unwrap();
1662 assert_eq!(count, 3);
1663 }
1664
1665 #[tokio::test]
1666 async fn test_autoscale_controller_cooldown() {
1667 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
1668 let manager = locked(&runtime);
1669
1670 let controller = AutoscaleController::new(manager, runtime, Duration::from_secs(10));
1671
1672 let spec = ScaleSpec::Adaptive {
1674 min: 1,
1675 max: 10,
1676 cooldown: Some(Duration::from_secs(60)), targets: ScaleTargets {
1678 cpu: Some(70),
1679 memory: None,
1680 rps: None,
1681 custom: Vec::new(),
1682 external: Vec::new(),
1683 },
1684 behavior: None,
1685 triggers: Vec::new(),
1686 idle_window: None,
1687 vertical: None,
1688 predictive: None,
1689 };
1690
1691 controller.register_service("api", &spec, 2).await;
1692
1693 assert!(controller.should_scale("api").await);
1695
1696 controller.record_scale_action("api").await;
1698
1699 assert!(!controller.should_scale("api").await);
1701 }
1702
1703 #[tokio::test]
1704 async fn test_scale_to_zero_after_idle_window() {
1705 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
1706 let manager = locked(&runtime);
1707
1708 let controller =
1709 AutoscaleController::new(manager.clone(), runtime, Duration::from_secs(10));
1710
1711 Box::pin(
1713 manager
1714 .read()
1715 .await
1716 .upsert_service("api".to_string(), mock_spec()),
1717 )
1718 .await
1719 .unwrap();
1720 manager.read().await.scale_service("api", 2).await.unwrap();
1721
1722 let spec = ScaleSpec::Adaptive {
1724 min: 0,
1725 max: 10,
1726 cooldown: Some(Duration::from_secs(0)),
1727 targets: ScaleTargets::default(),
1728 behavior: None,
1729 triggers: Vec::new(),
1730 idle_window: Some(Duration::from_millis(10)),
1731 vertical: None,
1732 predictive: None,
1733 };
1734 controller.register_service("api", &spec, 2).await;
1735
1736 controller.last_active.write().await.insert(
1738 "api".to_string(),
1739 Instant::now().checked_sub(Duration::from_secs(60)).unwrap(),
1740 );
1741
1742 let reaped = controller.evaluate_idle("api").await.unwrap();
1745 assert!(reaped, "service should have been reaped to zero");
1746 assert_eq!(
1747 manager
1748 .read()
1749 .await
1750 .service_replica_count("api")
1751 .await
1752 .unwrap(),
1753 0
1754 );
1755 }
1756
1757 #[tokio::test]
1758 async fn test_mark_active_resets_idle_clock() {
1759 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
1760 let manager = locked(&runtime);
1761
1762 let controller =
1763 AutoscaleController::new(manager.clone(), runtime, Duration::from_secs(10));
1764
1765 Box::pin(
1766 manager
1767 .read()
1768 .await
1769 .upsert_service("api".to_string(), mock_spec()),
1770 )
1771 .await
1772 .unwrap();
1773 manager.read().await.scale_service("api", 1).await.unwrap();
1774
1775 let spec = ScaleSpec::Adaptive {
1776 min: 0,
1777 max: 10,
1778 cooldown: Some(Duration::from_secs(0)),
1779 targets: ScaleTargets::default(),
1780 behavior: None,
1781 triggers: Vec::new(),
1782 idle_window: Some(Duration::from_secs(300)),
1783 vertical: None,
1784 predictive: None,
1785 };
1786 controller.register_service("api", &spec, 1).await;
1787
1788 controller.last_active.write().await.insert(
1790 "api".to_string(),
1791 Instant::now()
1792 .checked_sub(Duration::from_secs(600))
1793 .unwrap(),
1794 );
1795 controller.mark_active_async("api").await;
1796
1797 let reaped = controller.evaluate_idle("api").await.unwrap();
1799 assert!(!reaped, "marked-active service must not be reaped");
1800 assert_eq!(
1801 manager
1802 .read()
1803 .await
1804 .service_replica_count("api")
1805 .await
1806 .unwrap(),
1807 1
1808 );
1809 }
1810
1811 #[tokio::test]
1812 async fn test_no_scale_to_zero_when_min_nonzero() {
1813 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
1814 let manager = locked(&runtime);
1815
1816 let controller =
1817 AutoscaleController::new(manager.clone(), runtime, Duration::from_secs(10));
1818
1819 Box::pin(
1820 manager
1821 .read()
1822 .await
1823 .upsert_service("api".to_string(), mock_spec()),
1824 )
1825 .await
1826 .unwrap();
1827 manager.read().await.scale_service("api", 2).await.unwrap();
1828
1829 let spec = ScaleSpec::Adaptive {
1831 min: 1,
1832 max: 10,
1833 cooldown: Some(Duration::from_secs(0)),
1834 targets: ScaleTargets::default(),
1835 behavior: None,
1836 triggers: Vec::new(),
1837 idle_window: Some(Duration::from_millis(1)),
1838 vertical: None,
1839 predictive: None,
1840 };
1841 controller.register_service("api", &spec, 2).await;
1842 controller.last_active.write().await.insert(
1843 "api".to_string(),
1844 Instant::now().checked_sub(Duration::from_secs(60)).unwrap(),
1845 );
1846
1847 let reaped = controller.evaluate_idle("api").await.unwrap();
1848 assert!(!reaped, "min>0 must never scale to zero");
1849 assert_eq!(
1850 manager
1851 .read()
1852 .await
1853 .service_replica_count("api")
1854 .await
1855 .unwrap(),
1856 2
1857 );
1858 }
1859
1860 #[tokio::test]
1861 async fn test_autoscale_controller_shutdown() {
1862 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
1863 let manager = locked(&runtime);
1864
1865 let controller = Arc::new(AutoscaleController::new(
1866 manager,
1867 runtime,
1868 Duration::from_millis(100), ));
1870
1871 let controller_clone = controller.clone();
1872
1873 let handle = tokio::spawn(async move { Box::pin(controller_clone.run_loop()).await });
1875
1876 tokio::time::sleep(Duration::from_millis(50)).await;
1878
1879 controller.shutdown();
1881
1882 let result = handle.await.unwrap();
1884 assert!(result.is_ok());
1885 }
1886}