Skip to main content

zlayer_agent/
autoscale_controller.rs

1//! `AutoscaleController` - Connects autoscaling decisions to container scaling
2//!
3//! This module provides an `AutoscaleController` that bridges the scheduler's
4//! autoscaling logic with the agent's `ServiceManager` to automatically scale
5//! services based on resource utilization.
6//!
7//! It drives three orthogonal control loops over the same metrics tick:
8//!
9//! 1. **Horizontal** (replica count) — the original adaptive autoscaler that
10//!    grows/shrinks replicas to hit CPU / memory / RPS targets.
11//! 2. **Scale-to-zero** (Phase 2) — when an adaptive service declares
12//!    `idle_window` and `min == 0`, the controller reaps every replica after
13//!    the service has been idle (no meaningful CPU / RPS) for the window. The
14//!    proxy activator wakes it again by calling [`AutoscaleController::mark_active`]
15//!    on the next inbound request.
16//! 3. **Vertical** (right-sizing, Phase 3) — when an adaptive service declares
17//!    a `vertical` block, a [`VpaEngine`] observes per-replica usage and emits
18//!    CPU-millis / memory-MiB recommendations. In `Recommend` mode they are
19//!    logged; in `Auto` mode they are applied via
20//!    [`Runtime::update_container_resources`], with a rolling restart fallback
21//!    when the runtime cannot live-update a running container's cgroup.
22//!
23//! # Architecture
24//!
//! ```text
//! ┌────────────────────────────────────────────────────────────────────┐
//! │                     AutoscaleController                            │
//! │  ┌─────────────────┐  ┌──────────────┐  ┌──────────────────┐       │
//! │  │ CgroupsMetrics  │  │ Autoscaler   │  │ ServiceManager   │       │
//! │  │    Source       │──│ + VpaEngine  │──│  (scaling)       │       │
//! │  └─────────────────┘  └──────────────┘  └──────────────────┘       │
//! └────────────────────────────────────────────────────────────────────┘
//! ```
34//!
35//! # Example
36//!
//! ```ignore
//! use zlayer_agent::autoscale_controller::AutoscaleController;
//! use zlayer_agent::{ServiceManager, RuntimeConfig, create_runtime};
//! use std::sync::Arc;
//! use std::time::Duration;
//! use tokio::sync::RwLock;
//!
//! // Create runtime and service manager
//! let runtime = create_runtime(RuntimeConfig::Mock).await?;
//! let manager = Arc::new(RwLock::new(ServiceManager::new(runtime.clone())));
//!
//! // Create autoscale controller (wrapped in an `Arc` so one handle can run
//! // the loop while another is kept for shutdown)
//! let controller = Arc::new(AutoscaleController::new(
//!     manager.clone(),
//!     runtime.clone(),
//!     Duration::from_secs(10),
//! ));
//!
//! // Register services with adaptive scaling
//! controller.register_service("api", &scale_spec, 2).await;
//!
//! // Run the autoscaling loop (in background)
//! let loop_controller = controller.clone();
//! let handle = tokio::spawn(async move {
//!     loop_controller.run_loop().await
//! });
//!
//! // Later, shutdown
//! controller.shutdown();
//! ```
65
66use crate::error::{AgentError, Result};
67use crate::metrics_providers::{LockedServiceManagerContainerProvider, RuntimeStatsProvider};
68use crate::runtime::{ContainerId, ContainerResourceUpdate, Runtime};
69use crate::service::ServiceManager;
70use std::collections::HashMap;
71use std::sync::Arc;
72use std::time::{Duration, Instant};
73use tokio::sync::RwLock;
74use tracing::{debug, error, info, warn};
75use zlayer_scheduler::metrics::{
76    CgroupsMetricsSource, ContainerStatsProvider, MetricsCollector, MetricsContainerId,
77    MetricsSource, RawContainerStats,
78};
79use zlayer_scheduler::Autoscaler;
80use zlayer_spec::{ScaleSpec, ServiceSpec, VerticalMode, VerticalScaleSpec};
81
/// Default autoscaling evaluation interval: 10 seconds between evaluation
/// ticks.
pub const DEFAULT_AUTOSCALE_INTERVAL: Duration = Duration::from_secs(10);
84
/// CPU usage (microseconds-per-second-equivalent) below which a replica is
/// considered idle for scale-to-zero accounting. The cgroups stats provider
/// reports cumulative `cpu_usage_usec`; the [`VpaEngine`] turns successive
/// samples into a per-second rate, and any service whose busiest replica
/// stays under this rate is treated as "no meaningful work".
///
/// `5_000` µs/s ≈ 0.5% of one core — low enough to ignore idle bookkeeping
/// (health probes, GC ticks) but high enough that a real request bumps it.
///
/// Unit note: [`VpaEngine::observe`] returns its rate in *millicores*
/// (µs/s ÷ 1000), so this threshold corresponds to 5 millicores; any
/// comparison against that return value must convert units first.
const IDLE_CPU_RATE_USEC_PER_SEC: f64 = 5_000.0;
94
/// Fraction by which a fresh vertical recommendation must differ from the
/// last-applied value before it is re-applied. Prevents thrashing the cgroup
/// hierarchy (and, in the rolling-restart fallback, the containers themselves)
/// on sub-percent jitter.
///
/// The comparison in `outside_deadband` is strict, so 0.10 means "only act on
/// a change of *more than* 10%" — a move of exactly 10% stays inside the
/// deadband.
const VERTICAL_DEADBAND: f64 = 0.10;
100
/// A vertical right-sizing recommendation for a single container: the target
/// CPU allotment in millicores and the target memory in MiB.
///
/// Produced by [`VpaEngine::recommend`], which never emits a value below `1`
/// on either axis. Mirrors the shape the agent applies through
/// [`Runtime::update_container_resources`]: `cpu_millis` becomes a CFS
/// quota/period pair and `memory_mib` a byte limit.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct VpaRecommendation {
    /// Recommended CPU allotment in millicores (1000 == one full core).
    pub cpu_millis: u32,
    /// Recommended memory limit in MiB.
    pub memory_mib: u32,
}
114
/// Per-container vertical-pod-autoscaler engine.
///
/// Keeps a small ring buffer of recent usage samples per container, derives a
/// CPU rate (millicores) from successive cumulative-CPU readings, and emits a
/// percentile-based recommendation clamped to the service's
/// [`VerticalScaleSpec`] bounds. This is the agent-local engine used by
/// [`AutoscaleController`]; it deliberately lives here (rather than in
/// `zlayer-scheduler`) so the whole vertical-apply path is self-contained in
/// the controller.
#[derive(Debug, Default)]
pub struct VpaEngine {
    /// Per-container usage history, keyed by the container's display id.
    /// Entries are created on first [`VpaEngine::observe`] for an id and
    /// removed by [`VpaEngine::forget`].
    history: HashMap<String, ContainerUsageHistory>,
}
129
/// Rolling usage history for a single container. Bounded to the most recent
/// [`Self::CAPACITY`] samples so the percentile computation has a stable,
/// memory-bounded window.
#[derive(Debug, Default)]
struct ContainerUsageHistory {
    /// Recent CPU rates in millicores, oldest first.
    cpu_millis: std::collections::VecDeque<f64>,
    /// Recent memory readings in MiB, oldest first.
    memory_mib: std::collections::VecDeque<f64>,
    /// Last cumulative CPU reading (µs) + the wallclock instant it was taken,
    /// used to derive a rate from the next sample.
    last_cpu: Option<(u64, Instant)>,
}

impl ContainerUsageHistory {
    /// Maximum number of samples retained per container.
    const CAPACITY: usize = 32;

    /// Append `sample` to `window`, evicting the oldest entries so the window
    /// never holds more than [`Self::CAPACITY`] samples.
    fn push_bounded(window: &mut std::collections::VecDeque<f64>, sample: f64) {
        while window.len() >= Self::CAPACITY {
            window.pop_front();
        }
        window.push_back(sample);
    }

    /// Record one CPU-rate sample (millicores).
    fn push_cpu(&mut self, millis: f64) {
        Self::push_bounded(&mut self.cpu_millis, millis);
    }

    /// Record one memory sample (MiB).
    fn push_memory(&mut self, mib: f64) {
        Self::push_bounded(&mut self.memory_mib, mib);
    }

    /// Percentile (0-100) over a sample window, nearest-rank. Returns `None`
    /// when the window is empty.
    ///
    /// `pct` values above 100 are treated as 100. The rank is
    /// `ceil(pct * n / 100)` computed in integer arithmetic, so it cannot be
    /// pushed one slot too high by floating-point rounding (for example
    /// `0.28 * 25` evaluates to `7.000000000000001` in `f64`). A rank of zero
    /// (`pct == 0`) selects the smallest sample.
    fn percentile(samples: &std::collections::VecDeque<f64>, pct: u8) -> Option<f64> {
        if samples.is_empty() {
            return None;
        }
        let mut sorted: Vec<f64> = samples.iter().copied().collect();
        // `total_cmp` is a genuine total order even if a NaN sneaks in, which
        // the sort routines require of their comparator.
        sorted.sort_unstable_by(f64::total_cmp);

        let rank = (usize::from(pct.min(100)) * sorted.len()).div_ceil(100);
        let idx = rank.saturating_sub(1).min(sorted.len() - 1);
        sorted.get(idx).copied()
    }
}
183
184impl VpaEngine {
185    /// Construct an empty engine.
186    #[must_use]
187    pub fn new() -> Self {
188        Self::default()
189    }
190
191    /// Record one raw stats sample for `container` and return the busiest-replica
192    /// CPU rate (millicores) this sample implies, so callers can drive
193    /// scale-to-zero idle accounting off the same observation.
194    ///
195    /// CPU rate is derived from the delta between this sample's cumulative
196    /// `cpu_usage_usec` and the previous one over the elapsed wallclock time:
197    /// `millis = (Δusec / Δsec) / 1000`. The first sample for a container has no
198    /// predecessor, so it contributes a memory reading but a `0.0` CPU rate.
199    pub fn observe(&mut self, container: &str, stats: &RawContainerStats) -> f64 {
200        let now = Instant::now();
201        let hist = self.history.entry(container.to_string()).or_default();
202
203        let cpu_millis = if let Some((prev_usec, prev_at)) = hist.last_cpu {
204            let elapsed = now.duration_since(prev_at).as_secs_f64();
205            let delta_usec = stats.cpu_usage_usec.saturating_sub(prev_usec);
206            if elapsed > 0.0 {
207                // (µs busy / s) / 1000 µs-per-ms-of-core == millicores.
208                #[allow(clippy::cast_precision_loss)]
209                let rate = delta_usec as f64 / elapsed / 1000.0;
210                hist.push_cpu(rate);
211                rate
212            } else {
213                0.0
214            }
215        } else {
216            0.0
217        };
218        hist.last_cpu = Some((stats.cpu_usage_usec, now));
219
220        #[allow(clippy::cast_precision_loss)]
221        let mem_mib = stats.memory_bytes as f64 / (1024.0 * 1024.0);
222        hist.push_memory(mem_mib);
223
224        cpu_millis
225    }
226
227    /// Compute a recommendation for `container` from its observed history,
228    /// clamped to `spec`'s bounds. Returns `None` until enough samples exist to
229    /// produce a stable percentile (at least one CPU rate observation).
230    #[must_use]
231    pub fn recommend(
232        &self,
233        container: &str,
234        spec: &VerticalScaleSpec,
235    ) -> Option<VpaRecommendation> {
236        let hist = self.history.get(container)?;
237        let cpu_pct = ContainerUsageHistory::percentile(&hist.cpu_millis, spec.percentile)?;
238        let mem_pct = ContainerUsageHistory::percentile(&hist.memory_mib, spec.percentile)?;
239
240        // Round up to whole units and clamp to the configured bounds.
241        #[allow(
242            clippy::cast_possible_truncation,
243            clippy::cast_sign_loss,
244            clippy::cast_precision_loss
245        )]
246        let cpu_millis = {
247            let mut v = cpu_pct.ceil().max(0.0) as u32;
248            if let Some(min) = spec.min_cpu_millis {
249                v = v.max(min);
250            }
251            if let Some(max) = spec.max_cpu_millis {
252                v = v.min(max);
253            }
254            v.max(1)
255        };
256        #[allow(
257            clippy::cast_possible_truncation,
258            clippy::cast_sign_loss,
259            clippy::cast_precision_loss
260        )]
261        let memory_mib = {
262            let mut v = mem_pct.ceil().max(0.0) as u32;
263            if let Some(min) = spec.min_memory_mib {
264                v = v.max(min);
265            }
266            if let Some(max) = spec.max_memory_mib {
267                v = v.min(max);
268            }
269            v.max(1)
270        };
271
272        Some(VpaRecommendation {
273            cpu_millis,
274            memory_mib,
275        })
276    }
277
278    /// Drop all history for a container (e.g. after it is removed by a
279    /// scale-down or rolling restart) so a recreated replica starts fresh.
280    pub fn forget(&mut self, container: &str) {
281        self.history.remove(container);
282    }
283}
284
285/// True when a fresh recommendation differs from the last-applied one by more
286/// than [`VERTICAL_DEADBAND`] on either axis. A `None` previous value always
287/// passes (first application).
288fn outside_deadband(prev: Option<VpaRecommendation>, next: VpaRecommendation) -> bool {
289    let Some(prev) = prev else { return true };
290    let exceeds = |old: u32, new: u32| {
291        if old == 0 {
292            return new != 0;
293        }
294        let delta = (f64::from(new) - f64::from(old)).abs();
295        delta / f64::from(old) > VERTICAL_DEADBAND
296    };
297    exceeds(prev.cpu_millis, next.cpu_millis) || exceeds(prev.memory_mib, next.memory_mib)
298}
299
300/// Build the [`ContainerResourceUpdate`] that applies `rec` to a container:
301/// `cpu_millis` becomes a CFS quota over a fixed 100 ms period, and
302/// `memory_mib` becomes a byte limit.
303fn resource_update_for(rec: VpaRecommendation) -> ContainerResourceUpdate {
304    ContainerResourceUpdate {
305        cpu_period: Some(100_000),
306        cpu_quota: Some(100_000 * i64::from(rec.cpu_millis) / 1000),
307        memory: Some(i64::from(rec.memory_mib) * 1024 * 1024),
308        ..Default::default()
309    }
310}
311
/// Controller that connects autoscaling decisions to actual container scaling
///
/// The `AutoscaleController` periodically collects metrics from running containers,
/// evaluates whether scaling is needed using the `Autoscaler`, and executes scaling
/// decisions through the `ServiceManager`. On the same tick it runs the
/// scale-to-zero idle reaper and the vertical right-sizing pass.
///
/// All per-service maps below are keyed by service name and are cleared for a
/// service by [`AutoscaleController::unregister_service`].
pub struct AutoscaleController {
    /// Service manager for executing scaling operations. Shared with the rest
    /// of the daemon behind an `Arc<RwLock<..>>`; each call site takes a short
    /// read guard.
    service_manager: Arc<RwLock<ServiceManager>>,
    /// Metrics collector with cgroups source
    metrics: Arc<MetricsCollector>,
    /// Autoscaler decision engine
    autoscaler: Arc<RwLock<Autoscaler>>,
    /// Service specs for scale configuration (`service_name` -> spec). Only
    /// adaptive specs are stored; presence of a key is what
    /// [`AutoscaleController::is_registered`] reports.
    service_specs: Arc<RwLock<HashMap<String, ScaleSpec>>>,
    /// Last scale times for cooldown tracking (`service_name` -> instant)
    last_scale_times: Arc<RwLock<HashMap<String, Instant>>>,
    /// Evaluation interval
    interval: Duration,
    /// Shutdown signal; [`AutoscaleController::run_loop`] exits when it is
    /// notified.
    shutdown: Arc<tokio::sync::Notify>,
    // --- Phase 2 / Phase 3 state (kept at the struct end so the horizontal
    //     path above is untouched) ---
    /// Container runtime handle, retained so the vertical-apply path can call
    /// [`Runtime::update_container_resources`] and the rolling-restart fallback
    /// can recreate containers. Cloned before the same `Arc` is moved into the
    /// stats provider in [`AutoscaleController::new`].
    runtime: Arc<dyn Runtime + Send + Sync>,
    /// The stats provider wrapping the runtime, retained so the vertical pass
    /// can fetch per-replica [`RawContainerStats`] directly (the horizontal
    /// path goes through the aggregating [`MetricsCollector`]).
    stats_provider: Arc<RuntimeStatsProvider>,
    /// Last time each service showed meaningful activity, for scale-to-zero.
    /// Seeded at registration and refreshed by
    /// [`AutoscaleController::mark_active`].
    last_active: Arc<RwLock<HashMap<String, Instant>>>,
    /// Idle window per service. An entry exists only for services whose
    /// adaptive spec declares `idle_window`; its presence enables
    /// scale-to-zero when `min == 0`.
    idle_window: Arc<RwLock<HashMap<String, Duration>>>,
    /// Minimum replicas per service (scale-to-zero only fires when `min == 0`).
    min_replicas: Arc<RwLock<HashMap<String, u32>>>,
    /// Vertical right-sizing spec per service. An entry exists only when the
    /// spec's `vertical` block is in `Recommend` or `Auto` mode; its presence
    /// enables the VPA pass.
    vertical_specs: Arc<RwLock<HashMap<String, VerticalScaleSpec>>>,
    /// Full base [`ServiceSpec`] per service, supplied via
    /// [`AutoscaleController::set_service_template`]. Required for the
    /// rolling-restart fallback so a recreated replica keeps its endpoints,
    /// env, volumes, etc. while picking up the new resources.
    service_templates: Arc<RwLock<HashMap<String, ServiceSpec>>>,
    /// Vertical decision engine + last-applied recommendation per service, used
    /// to apply the deadband.
    vpa: Arc<RwLock<VpaState>>,
}
363
/// Internal vertical-autoscaler state: the engine plus the last recommendation
/// actually applied to each service (for deadband comparison).
#[derive(Default)]
struct VpaState {
    /// Usage-history engine that produces the recommendations.
    engine: VpaEngine,
    /// Last-applied recommendation, keyed by service name. The entry for a
    /// service is dropped when the service is unregistered.
    last_applied: HashMap<String, VpaRecommendation>,
}
371
372impl AutoscaleController {
373    /// Create a new autoscale controller
374    ///
375    /// # Arguments
376    /// * `service_manager` - The service manager used to execute scaling operations
377    /// * `runtime` - The container runtime for collecting metrics and applying
378    ///   vertical resource updates
379    /// * `interval` - How often to evaluate scaling decisions
380    ///
381    /// The `runtime` handle is retained on the controller (for the vertical
382    /// apply / rolling-restart paths) *and* moved into the metrics stats
383    /// provider, so it is cloned once here.
384    ///
385    /// # Example
386    ///
387    /// ```ignore
388    /// let controller = AutoscaleController::new(
389    ///     service_manager,
390    ///     runtime,
391    ///     Duration::from_secs(10),
392    /// );
393    /// ```
394    #[must_use]
395    pub fn new(
396        service_manager: Arc<RwLock<ServiceManager>>,
397        runtime: Arc<dyn Runtime + Send + Sync>,
398        interval: Duration,
399    ) -> Self {
400        // Create metrics collector with cgroups source
401        let mut metrics = MetricsCollector::new();
402
403        // Create the stats provider wrapping the runtime. Clone the runtime
404        // first so the controller keeps its own handle for vertical apply.
405        let runtime_for_controller = runtime.clone();
406        let stats_provider = Arc::new(RuntimeStatsProvider::new(runtime));
407
408        // Create the service container provider wrapping the locked service
409        // manager.
410        let service_provider = Arc::new(LockedServiceManagerContainerProvider::new(
411            service_manager.clone(),
412        ));
413
414        // Create cgroups metrics source
415        let source: Arc<dyn MetricsSource> = Arc::new(CgroupsMetricsSource::new(
416            service_provider,
417            stats_provider.clone(),
418        ));
419        metrics.add_source(source);
420
421        Self {
422            service_manager,
423            metrics: Arc::new(metrics),
424            autoscaler: Arc::new(RwLock::new(Autoscaler::new())),
425            service_specs: Arc::new(RwLock::new(HashMap::new())),
426            last_scale_times: Arc::new(RwLock::new(HashMap::new())),
427            interval,
428            shutdown: Arc::new(tokio::sync::Notify::new()),
429            runtime: runtime_for_controller,
430            stats_provider,
431            last_active: Arc::new(RwLock::new(HashMap::new())),
432            idle_window: Arc::new(RwLock::new(HashMap::new())),
433            min_replicas: Arc::new(RwLock::new(HashMap::new())),
434            vertical_specs: Arc::new(RwLock::new(HashMap::new())),
435            service_templates: Arc::new(RwLock::new(HashMap::new())),
436            vpa: Arc::new(RwLock::new(VpaState::default())),
437        }
438    }
439
440    /// Create with a custom metrics collector (useful for testing).
441    ///
442    /// Requires a `runtime` handle so the vertical-apply path is still wired in
443    /// tests; pass a mock runtime when only the horizontal/scale-to-zero paths
444    /// are under test.
445    #[must_use]
446    pub fn with_custom_metrics(
447        service_manager: Arc<RwLock<ServiceManager>>,
448        runtime: Arc<dyn Runtime + Send + Sync>,
449        metrics: MetricsCollector,
450        interval: Duration,
451    ) -> Self {
452        let runtime_for_controller = runtime.clone();
453        let stats_provider = Arc::new(RuntimeStatsProvider::new(runtime));
454        Self {
455            service_manager,
456            metrics: Arc::new(metrics),
457            autoscaler: Arc::new(RwLock::new(Autoscaler::new())),
458            service_specs: Arc::new(RwLock::new(HashMap::new())),
459            last_scale_times: Arc::new(RwLock::new(HashMap::new())),
460            interval,
461            shutdown: Arc::new(tokio::sync::Notify::new()),
462            runtime: runtime_for_controller,
463            stats_provider,
464            last_active: Arc::new(RwLock::new(HashMap::new())),
465            idle_window: Arc::new(RwLock::new(HashMap::new())),
466            min_replicas: Arc::new(RwLock::new(HashMap::new())),
467            vertical_specs: Arc::new(RwLock::new(HashMap::new())),
468            service_templates: Arc::new(RwLock::new(HashMap::new())),
469            vpa: Arc::new(RwLock::new(VpaState::default())),
470        }
471    }
472
473    /// Push an additional [`MetricsSource`] into the controller's collector.
474    ///
475    /// The daemon uses this to feed real requests-per-second from the L7 proxy
476    /// (wrapped in a [`zlayer_scheduler::metrics::ProxyRpsMetricsSource`]) into
477    /// the same [`MetricsCollector`] that drives the horizontal pass, so
478    /// [`zlayer_scheduler::metrics::AggregatedMetrics::total_rps`] is populated
479    /// and RPS targets / triggers fire on live traffic.
480    ///
481    /// This is a builder method that must be called **before** the controller is
482    /// shared (the collector is still uniquely owned at construction time, so
483    /// the in-place mutation always succeeds). If the collector has already been
484    /// cloned, the source is dropped with a warning rather than panicking.
485    #[must_use]
486    pub fn with_extra_metrics_source(mut self, source: Arc<dyn MetricsSource>) -> Self {
487        if let Some(collector) = Arc::get_mut(&mut self.metrics) {
488            collector.add_source(source);
489        } else {
490            warn!(
491                "with_extra_metrics_source called after the metrics collector was shared; \
492                 source ignored"
493            );
494        }
495        self
496    }
497
498    /// Register a service for autoscaling
499    ///
500    /// Only services with `ScaleSpec::Adaptive` will be evaluated for autoscaling.
501    /// Services with `Fixed` or `Manual` scaling are ignored by the autoscaler loop.
502    ///
503    /// Beyond the horizontal registration, this captures the Phase 2 / Phase 3
504    /// configuration carried on the adaptive spec: `idle_window` + `min` (for
505    /// scale-to-zero) and `vertical` (for right-sizing). The service is seeded
506    /// as freshly-active so a just-registered service is not immediately reaped.
507    ///
508    /// # Arguments
509    /// * `name` - Service name
510    /// * `spec` - The service's scale specification
511    /// * `initial_replicas` - Current number of replicas
512    pub async fn register_service(&self, name: &str, spec: &ScaleSpec, initial_replicas: u32) {
513        // Only register adaptive services
514        let ScaleSpec::Adaptive {
515            min,
516            idle_window,
517            vertical,
518            ..
519        } = spec
520        else {
521            debug!(
522                service = name,
523                "Skipping registration for non-adaptive service"
524            );
525            return;
526        };
527
528        // Register with autoscaler
529        {
530            let mut autoscaler = self.autoscaler.write().await;
531            autoscaler.register_service(name, spec.clone(), initial_replicas);
532        }
533
534        // Store spec for reference
535        {
536            let mut specs = self.service_specs.write().await;
537            specs.insert(name.to_string(), spec.clone());
538        }
539
540        // Phase 2: scale-to-zero bookkeeping.
541        {
542            let mut mins = self.min_replicas.write().await;
543            mins.insert(name.to_string(), *min);
544        }
545        if let Some(window) = idle_window {
546            self.idle_window
547                .write()
548                .await
549                .insert(name.to_string(), *window);
550        } else {
551            self.idle_window.write().await.remove(name);
552        }
553        // Seed activity so a just-registered service has a full idle window
554        // before it can be reaped.
555        self.last_active
556            .write()
557            .await
558            .insert(name.to_string(), Instant::now());
559
560        // Phase 3: vertical right-sizing.
561        if let Some(v) = vertical {
562            if matches!(v.mode, VerticalMode::Recommend | VerticalMode::Auto) {
563                self.vertical_specs
564                    .write()
565                    .await
566                    .insert(name.to_string(), v.clone());
567            } else {
568                self.vertical_specs.write().await.remove(name);
569            }
570        } else {
571            self.vertical_specs.write().await.remove(name);
572        }
573
574        info!(
575            service = name,
576            initial_replicas,
577            idle_window_secs = idle_window.as_ref().map(Duration::as_secs),
578            min = *min,
579            vertical = vertical.is_some(),
580            "Registered service for autoscaling"
581        );
582    }
583
584    /// Supply (or refresh) the full base [`ServiceSpec`] for a service.
585    ///
586    /// Optional, but required for the vertical-apply **rolling restart**
587    /// fallback (`update_container_resources` → `Unsupported`): recreating a
588    /// replica needs its endpoints / env / volumes, which the [`ScaleSpec`]
589    /// alone does not carry. The daemon wires this alongside
590    /// [`AutoscaleController::register_service`]. Without a template, the
591    /// fallback degrades to a full `scale 0 → scale n` bounce.
592    pub async fn set_service_template(&self, name: &str, spec: ServiceSpec) {
593        self.service_templates
594            .write()
595            .await
596            .insert(name.to_string(), spec);
597    }
598
599    /// Mark a service as active *now*, resetting its scale-to-zero idle clock.
600    ///
601    /// Called by the proxy activator when an inbound request wakes (or keeps
602    /// awake) a service so the idle reaper does not tear it down while it is
603    /// actively serving traffic. Safe to call for services that are not
604    /// registered for scale-to-zero — it simply records a timestamp that the
605    /// idle pass ignores.
606    pub fn mark_active(&self, service: &str) {
607        // Synchronous entry point for callers outside an async context: take
608        // the lock with `blocking_write` only when we must, but the common
609        // path (inside the proxy's async task) should prefer the async helper.
610        let last_active = self.last_active.clone();
611        let service = service.to_string();
612        if let Ok(mut guard) = last_active.try_write() {
613            guard.insert(service, Instant::now());
614            return;
615        }
616        // Lock contended: fall back to a detached task so we never block the
617        // caller. The activation timestamp is still recorded promptly.
618        tokio::spawn(async move {
619            last_active.write().await.insert(service, Instant::now());
620        });
621    }
622
623    /// Async variant of [`AutoscaleController::mark_active`] for callers already
624    /// inside an async context (avoids the `try_write`/spawn dance).
625    pub async fn mark_active_async(&self, service: &str) {
626        self.last_active
627            .write()
628            .await
629            .insert(service.to_string(), Instant::now());
630    }
631
632    /// Unregister a service from autoscaling
633    pub async fn unregister_service(&self, name: &str) {
634        {
635            let mut autoscaler = self.autoscaler.write().await;
636            autoscaler.unregister_service(name);
637        }
638
639        self.service_specs.write().await.remove(name);
640        self.last_scale_times.write().await.remove(name);
641        self.last_active.write().await.remove(name);
642        self.idle_window.write().await.remove(name);
643        self.min_replicas.write().await.remove(name);
644        self.vertical_specs.write().await.remove(name);
645        self.service_templates.write().await.remove(name);
646        self.vpa.write().await.last_applied.remove(name);
647
648        info!(service = name, "Unregistered service from autoscaling");
649    }
650
651    /// Check if a service is registered for autoscaling
652    pub async fn is_registered(&self, name: &str) -> bool {
653        let specs = self.service_specs.read().await;
654        specs.contains_key(name)
655    }
656
657    /// Check if a service is in cooldown period
658    ///
659    /// Returns true if the service was scaled recently and is still in cooldown.
660    async fn should_scale(&self, service_name: &str) -> bool {
661        // Get the cooldown duration from the spec
662        let cooldown = {
663            let specs = self.service_specs.read().await;
664            match specs.get(service_name) {
665                Some(ScaleSpec::Adaptive { cooldown, .. }) => {
666                    cooldown.unwrap_or(zlayer_scheduler::DEFAULT_COOLDOWN)
667                }
668                _ => return false, // Not adaptive, shouldn't scale
669            }
670        };
671
672        // Check if we're past the cooldown period
673        let last_scale_times = self.last_scale_times.read().await;
674        if let Some(last_time) = last_scale_times.get(service_name) {
675            if last_time.elapsed() < cooldown {
676                let remaining = cooldown
677                    .checked_sub(last_time.elapsed())
678                    .unwrap_or_default();
679                debug!(
680                    service = service_name,
681                    remaining_secs = remaining.as_secs(),
682                    "Service in cooldown"
683                );
684                return false;
685            }
686        }
687
688        true
689    }
690
691    /// Record that a scale action occurred
692    async fn record_scale_action(&self, service_name: &str) {
693        let mut times = self.last_scale_times.write().await;
694        times.insert(service_name.to_string(), Instant::now());
695    }
696
697    /// Run the autoscaling loop
698    ///
699    /// This method should be spawned as a background task. It will continuously
700    /// evaluate scaling decisions at the configured interval until shutdown is
701    /// signaled.
702    ///
703    /// # Returns
704    /// Returns `Ok(())` when shutdown is signaled, or an error if something
705    /// goes wrong.
706    ///
707    /// # Example
708    ///
709    /// ```ignore
710    /// let controller = Arc::new(AutoscaleController::new(...));
711    /// let controller_clone = controller.clone();
712    ///
713    /// // Spawn the autoscale loop
714    /// let handle = tokio::spawn(async move {
715    ///     controller_clone.run_loop().await
716    /// });
717    ///
718    /// // Later, shutdown
719    /// controller.shutdown();
720    /// handle.await.unwrap();
721    /// ```
722    /// # Errors
723    /// Returns an error if the autoscale loop encounters an unrecoverable error.
724    #[allow(clippy::cast_possible_truncation)]
725    pub async fn run_loop(&self) -> Result<()> {
726        let mut ticker = tokio::time::interval(self.interval);
727
728        info!(
729            interval_ms = self.interval.as_millis() as u64,
730            "Starting autoscale controller loop"
731        );
732
733        loop {
734            tokio::select! {
735                _ = ticker.tick() => {
736                    // Boxed: the evaluation future is large; keep it off the
737                    // select!'s stack future (clippy::large_future).
738                    Box::pin(self.evaluate_all_services()).await;
739                }
740                () = self.shutdown.notified() => {
741                    info!("Autoscale controller shutting down");
742                    break;
743                }
744            }
745        }
746
747        Ok(())
748    }
749
750    /// Self-discover services from the live [`ServiceManager`] and (re)register
751    /// any adaptive service whose [`ScaleSpec`] is new or has changed since the
752    /// last tick. Non-adaptive services are ignored by `register_service`.
753    ///
754    /// This is what makes the controller scale *real* services: the daemon never
755    /// has to imperatively call `register_service` on deploy — every tick the
756    /// controller reconciles its registration set against the manager's current
757    /// view. A service whose adaptive spec is unchanged is left alone (so its
758    /// cooldown / idle clocks are preserved); a removed service is unregistered.
759    async fn discover_services(&self) {
760        let live = self.service_manager.read().await.service_specs().await;
761
762        // Register / refresh adaptive services.
763        let mut seen_adaptive: Vec<String> = Vec::new();
764        for (name, spec) in &live {
765            if !matches!(spec.scale, ScaleSpec::Adaptive { .. }) {
766                continue;
767            }
768            seen_adaptive.push(name.clone());
769
770            // Only (re)register when the adaptive spec is new or changed, so we
771            // don't reset cooldown/idle bookkeeping on every tick.
772            let needs_register = {
773                let specs = self.service_specs.read().await;
774                specs.get(name) != Some(&spec.scale)
775            };
776            if needs_register {
777                // Seed the initial replica count from the live service so the
778                // autoscaler's internal state starts from reality.
779                let initial = u32::try_from(
780                    self.service_manager
781                        .read()
782                        .await
783                        .service_replica_count(name)
784                        .await
785                        .unwrap_or(0),
786                )
787                .unwrap_or(0);
788                self.register_service(name, &spec.scale, initial).await;
789            }
790            // Always refresh the full template so the rolling-restart fallback
791            // and any vertical apply use the current spec (cheap clone).
792            self.set_service_template(name, spec.clone()).await;
793        }
794
795        // Unregister services that vanished from the manager (or stopped being
796        // adaptive) since the last tick.
797        let registered: Vec<String> = {
798            let specs = self.service_specs.read().await;
799            specs.keys().cloned().collect()
800        };
801        for name in registered {
802            if !seen_adaptive.contains(&name) {
803                self.unregister_service(&name).await;
804            }
805        }
806    }
807
808    /// Evaluate and potentially scale all registered services
809    async fn evaluate_all_services(&self) {
810        // Reconcile the registration set against the live ServiceManager before
811        // evaluating, so newly-deployed adaptive services start scaling without
812        // any explicit registration call.
813        self.discover_services().await;
814
815        // Get list of registered services
816        let service_names: Vec<String> = {
817            let specs = self.service_specs.read().await;
818            specs.keys().cloned().collect()
819        };
820
821        for service_name in service_names {
822            // Vertical (right-sizing) pass first so a recommendation applied
823            // this tick is reflected in the next horizontal observation.
824            if let Err(e) = Box::pin(self.evaluate_vertical(&service_name)).await {
825                warn!(
826                    service = %service_name,
827                    error = %e,
828                    "Failed vertical (VPA) evaluation"
829                );
830            }
831
832            // Scale-to-zero idle reaper. Runs before the horizontal pass so a
833            // reaped service short-circuits the (now pointless) replica math.
834            match self.evaluate_idle(&service_name).await {
835                Ok(true) => continue, // Service was reaped to zero this tick.
836                Ok(false) => {}
837                Err(e) => warn!(
838                    service = %service_name,
839                    error = %e,
840                    "Failed scale-to-zero evaluation"
841                ),
842            }
843
844            if let Err(e) = self.evaluate_and_scale(&service_name).await {
845                // Log but don't fail the entire loop
846                warn!(
847                    service = %service_name,
848                    error = %e,
849                    "Failed to evaluate/scale service"
850                );
851            }
852        }
853    }
854
855    /// Scale-to-zero idle reaper (Phase 2).
856    ///
857    /// Bumps `last_active` whenever the service's busiest replica shows
858    /// non-trivial CPU usage, then — for a service with `idle_window: Some(w)`
859    /// and `min == 0` — tears every replica down once it has been idle longer
860    /// than `w` (respecting the scaling cooldown). Returns `Ok(true)` when the
861    /// service was reaped this tick so the caller can skip the horizontal pass.
862    async fn evaluate_idle(&self, service_name: &str) -> Result<bool> {
863        // Per-replica stats drive activity detection. The cgroups stats
864        // provider reports cumulative CPU; the VPA engine turns that into a
865        // per-second rate. Any replica over the idle threshold counts as
866        // activity. (RPS-based activity flows through `mark_active` from the
867        // proxy, so even a zero-CPU request keeps the service warm.)
868        let containers = self
869            .service_manager
870            .read()
871            .await
872            .get_service_containers(service_name)
873            .await;
874        let mut busiest_cpu_millis = 0.0_f64;
875        {
876            let mut vpa = self.vpa.write().await;
877            for id in &containers {
878                let metrics_id = MetricsContainerId {
879                    service: id.service.clone(),
880                    replica: id.replica,
881                };
882                if let Ok(stats) = self.stats_provider.get_stats(&metrics_id).await {
883                    let rate = vpa.engine.observe(&id.to_string(), &stats);
884                    if rate > busiest_cpu_millis {
885                        busiest_cpu_millis = rate;
886                    }
887                }
888            }
889        }
890
891        if busiest_cpu_millis * 1000.0 >= IDLE_CPU_RATE_USEC_PER_SEC {
892            self.last_active
893                .write()
894                .await
895                .insert(service_name.to_string(), Instant::now());
896        }
897
898        // Scale-to-zero only applies when an idle window is configured *and*
899        // the service's floor is zero.
900        let window = {
901            let windows = self.idle_window.read().await;
902            match windows.get(service_name) {
903                Some(w) => *w,
904                None => return Ok(false),
905            }
906        };
907        let min = self
908            .min_replicas
909            .read()
910            .await
911            .get(service_name)
912            .copied()
913            .unwrap_or(1);
914        if min != 0 {
915            return Ok(false);
916        }
917
918        // Nothing to reap if already at zero.
919        let current = self
920            .service_manager
921            .read()
922            .await
923            .service_replica_count(service_name)
924            .await
925            .unwrap_or(0);
926        if current == 0 {
927            return Ok(false);
928        }
929
930        let idle_for = {
931            let last_active = self.last_active.read().await;
932            last_active
933                .get(service_name)
934                .map_or(Duration::ZERO, Instant::elapsed)
935        };
936        if idle_for <= window {
937            return Ok(false);
938        }
939
940        // Respect the scaling cooldown so a flapping service doesn't get
941        // reaped and re-woken in a tight loop.
942        if !self.should_scale(service_name).await {
943            return Ok(false);
944        }
945
946        info!(
947            service = service_name,
948            idle_secs = idle_for.as_secs(),
949            window_secs = window.as_secs(),
950            "Scaling service to zero (idle past window)"
951        );
952        self.service_manager
953            .read()
954            .await
955            .scale_service(service_name, 0)
956            .await?;
957        self.record_scale_action(service_name).await;
958        {
959            let mut autoscaler = self.autoscaler.write().await;
960            if let Err(e) = autoscaler.record_scale_action(service_name, 0) {
961                warn!(
962                    service = service_name,
963                    error = %e,
964                    "Failed to record scale-to-zero in autoscaler"
965                );
966            }
967        }
968        // Forget per-replica history so a re-woken service starts fresh.
969        {
970            let mut vpa = self.vpa.write().await;
971            for id in &containers {
972                vpa.engine.forget(&id.to_string());
973            }
974        }
975        Ok(true)
976    }
977
978    /// Vertical right-sizing pass (Phase 3).
979    ///
980    /// For each running replica, observes usage into the [`VpaEngine`] and, in
981    /// `Auto` mode, applies the recommendation via
982    /// [`Runtime::update_container_resources`] (subject to the deadband). In
983    /// `Recommend` mode it only logs. An `Unsupported` runtime triggers a real
984    /// rolling restart; a `NotFound` container is skipped.
985    async fn evaluate_vertical(&self, service_name: &str) -> Result<()> {
986        let spec = {
987            let specs = self.vertical_specs.read().await;
988            match specs.get(service_name) {
989                Some(s) => s.clone(),
990                None => return Ok(()), // Vertical not enabled for this service.
991            }
992        };
993
994        let containers = self
995            .service_manager
996            .read()
997            .await
998            .get_service_containers(service_name)
999            .await;
1000        if containers.is_empty() {
1001            return Ok(());
1002        }
1003
1004        // Observe every replica and pick the busiest recommendation: sizing to
1005        // the hottest replica avoids starving any single one.
1006        let mut chosen: Option<VpaRecommendation> = None;
1007        {
1008            let mut vpa = self.vpa.write().await;
1009            for id in &containers {
1010                let metrics_id = MetricsContainerId {
1011                    service: id.service.clone(),
1012                    replica: id.replica,
1013                };
1014                match self.stats_provider.get_stats(&metrics_id).await {
1015                    Ok(stats) => {
1016                        vpa.engine.observe(&id.to_string(), &stats);
1017                        if let Some(rec) = vpa.engine.recommend(&id.to_string(), &spec) {
1018                            chosen = Some(match chosen {
1019                                Some(c) => VpaRecommendation {
1020                                    cpu_millis: c.cpu_millis.max(rec.cpu_millis),
1021                                    memory_mib: c.memory_mib.max(rec.memory_mib),
1022                                },
1023                                None => rec,
1024                            });
1025                        }
1026                    }
1027                    Err(e) => debug!(
1028                        service = service_name,
1029                        container = %id,
1030                        error = %e,
1031                        "vertical: no stats for replica; skipping"
1032                    ),
1033                }
1034            }
1035        }
1036
1037        let Some(rec) = chosen else {
1038            // Not enough samples yet to recommend anything.
1039            return Ok(());
1040        };
1041
1042        match spec.mode {
1043            VerticalMode::Off => Ok(()),
1044            VerticalMode::Recommend => {
1045                info!(
1046                    service = service_name,
1047                    cpu_millis = rec.cpu_millis,
1048                    memory_mib = rec.memory_mib,
1049                    "vertical recommendation (recommend mode; not applied)"
1050                );
1051                Ok(())
1052            }
1053            VerticalMode::Auto => {
1054                Box::pin(self.apply_vertical(service_name, rec, &containers)).await
1055            }
1056        }
1057    }
1058
1059    /// Apply a vertical recommendation in `Auto` mode, honoring the deadband
1060    /// and falling back to a rolling restart when the runtime cannot live-update.
1061    async fn apply_vertical(
1062        &self,
1063        service_name: &str,
1064        rec: VpaRecommendation,
1065        containers: &[ContainerId],
1066    ) -> Result<()> {
1067        // Deadband: skip if the recommendation barely moved.
1068        let prev = self
1069            .vpa
1070            .read()
1071            .await
1072            .last_applied
1073            .get(service_name)
1074            .copied();
1075        if !outside_deadband(prev, rec) {
1076            debug!(
1077                service = service_name,
1078                cpu_millis = rec.cpu_millis,
1079                memory_mib = rec.memory_mib,
1080                "vertical recommendation within deadband; skipping"
1081            );
1082            return Ok(());
1083        }
1084
1085        let update = resource_update_for(rec);
1086        let mut needs_rolling_restart = false;
1087
1088        for id in containers {
1089            match self.runtime.update_container_resources(id, &update).await {
1090                Ok(outcome) => {
1091                    if !outcome.warnings.is_empty() {
1092                        debug!(
1093                            service = service_name,
1094                            container = %id,
1095                            warnings = ?outcome.warnings,
1096                            "vertical apply produced warnings"
1097                        );
1098                    }
1099                }
1100                Err(AgentError::Unsupported(reason)) => {
1101                    debug!(
1102                        service = service_name,
1103                        container = %id,
1104                        reason = %reason,
1105                        "runtime cannot live-update resources; will roll the service"
1106                    );
1107                    needs_rolling_restart = true;
1108                    break;
1109                }
1110                Err(AgentError::NotFound { .. }) => {
1111                    debug!(
1112                        service = service_name,
1113                        container = %id,
1114                        "vertical apply: container vanished; skipping"
1115                    );
1116                }
1117                Err(e) => return Err(e),
1118            }
1119        }
1120
1121        if needs_rolling_restart {
1122            Box::pin(self.rolling_restart_with_resources(service_name, rec)).await?;
1123        }
1124
1125        info!(
1126            service = service_name,
1127            cpu_millis = rec.cpu_millis,
1128            memory_mib = rec.memory_mib,
1129            rolled = needs_rolling_restart,
1130            "applied vertical recommendation"
1131        );
1132        self.vpa
1133            .write()
1134            .await
1135            .last_applied
1136            .insert(service_name.to_string(), rec);
1137        Ok(())
1138    }
1139
1140    /// Recreate a service's replicas one at a time, picking up `rec`'s
1141    /// resources, when the runtime cannot live-update a running container.
1142    ///
1143    /// REAL rolling restart: with a registered base [`ServiceSpec`] template
1144    /// (see [`AutoscaleController::set_service_template`]) we mutate its
1145    /// `resources` to the new CPU/memory and recreate each replica
1146    /// `stop → remove → create → start` individually, so the service never
1147    /// drops to zero capacity. Without a template we fall back to a
1148    /// `scale 0 → scale n` bounce (the recreated replicas then inherit the
1149    /// service's stored spec; resources are applied on the next live-update
1150    /// attempt where supported).
1151    async fn rolling_restart_with_resources(
1152        &self,
1153        service_name: &str,
1154        rec: VpaRecommendation,
1155    ) -> Result<()> {
1156        let template = self
1157            .service_templates
1158            .read()
1159            .await
1160            .get(service_name)
1161            .cloned();
1162        let containers = self
1163            .service_manager
1164            .read()
1165            .await
1166            .get_service_containers(service_name)
1167            .await;
1168
1169        let Some(mut spec) = template else {
1170            // No template: bounce the whole service. Recreating replicas is a
1171            // real restart even if we can't re-create them one-by-one with the
1172            // exact runtime resources here.
1173            warn!(
1174                service = service_name,
1175                "rolling restart without a base ServiceSpec template; bouncing the service \
1176                 (call set_service_template to enable one-at-a-time recreation)"
1177            );
1178            let count = u32::try_from(containers.len()).unwrap_or(u32::MAX);
1179            {
1180                let sm = self.service_manager.read().await;
1181                sm.scale_service(service_name, 0).await?;
1182                sm.scale_service(service_name, count).await?;
1183            }
1184            self.record_scale_action(service_name).await;
1185            return Ok(());
1186        };
1187
1188        // Mutate the template's resources to the recommendation. CPU is in
1189        // cores (millis/1000); memory is a Docker-style size string.
1190        let mut resources = spec.resources.clone();
1191        resources.cpu = Some(f64::from(rec.cpu_millis) / 1000.0);
1192        resources.memory = Some(format!("{}Mi", rec.memory_mib));
1193        spec.resources = resources;
1194
1195        // Recreate replicas one at a time so the service keeps serving.
1196        for id in &containers {
1197            info!(
1198                service = service_name,
1199                container = %id,
1200                cpu_millis = rec.cpu_millis,
1201                memory_mib = rec.memory_mib,
1202                "rolling restart: recreating replica with new resources"
1203            );
1204            // Best-effort teardown of the old container, then recreate with the
1205            // resized spec. Errors on an individual replica are logged and the
1206            // roll continues so one bad replica doesn't strand the rest.
1207            if let Err(e) = self
1208                .runtime
1209                .stop_container(id, Duration::from_secs(10))
1210                .await
1211            {
1212                debug!(service = service_name, container = %id, error = %e, "rolling restart: stop failed (continuing)");
1213            }
1214            if let Err(e) = self.runtime.remove_container(id).await {
1215                debug!(service = service_name, container = %id, error = %e, "rolling restart: remove failed (continuing)");
1216            }
1217            self.vpa.write().await.engine.forget(&id.to_string());
1218
1219            if let Err(e) = self.runtime.create_container(id, &spec).await {
1220                error!(service = service_name, container = %id, error = %e, "rolling restart: recreate failed");
1221                return Err(e);
1222            }
1223            if let Err(e) = self.runtime.start_container(id).await {
1224                error!(service = service_name, container = %id, error = %e, "rolling restart: start failed");
1225                return Err(e);
1226            }
1227        }
1228        self.record_scale_action(service_name).await;
1229        Ok(())
1230    }
1231
1232    /// Evaluate a single service and execute scaling if needed
1233    async fn evaluate_and_scale(&self, service_name: &str) -> Result<()> {
1234        // Check cooldown first
1235        if !self.should_scale(service_name).await {
1236            return Ok(());
1237        }
1238
1239        // Collect metrics
1240        let aggregated = match self.metrics.collect(service_name).await {
1241            Ok(m) => m,
1242            Err(e) => {
1243                // Missing metrics is not necessarily an error - the service might
1244                // not have any running containers yet
1245                debug!(
1246                    service = service_name,
1247                    error = %e,
1248                    "No metrics available for service"
1249                );
1250                return Ok(());
1251            }
1252        };
1253
1254        // Make scaling decision
1255        let decision = {
1256            let mut autoscaler = self.autoscaler.write().await;
1257            match autoscaler.evaluate(service_name, &aggregated) {
1258                Ok(d) => d,
1259                Err(e) => {
1260                    debug!(
1261                        service = service_name,
1262                        error = %e,
1263                        "Failed to evaluate scaling"
1264                    );
1265                    return Ok(());
1266                }
1267            }
1268        };
1269
1270        debug!(
1271            service = service_name,
1272            ?decision,
1273            cpu = aggregated.avg_cpu_percent,
1274            memory = aggregated.avg_memory_percent,
1275            instances = aggregated.instance_count,
1276            "Autoscale evaluation"
1277        );
1278
1279        // Execute scaling if needed
1280        if let Some(target) = decision.target_replicas() {
1281            info!(
1282                service = service_name,
1283                target_replicas = target,
1284                decision = ?decision,
1285                "Executing autoscale"
1286            );
1287
1288            // Execute the scaling
1289            if let Err(e) = self
1290                .service_manager
1291                .read()
1292                .await
1293                .scale_service(service_name, target)
1294                .await
1295            {
1296                error!(
1297                    service = service_name,
1298                    target = target,
1299                    error = %e,
1300                    "Failed to scale service"
1301                );
1302                return Err(e);
1303            }
1304
1305            // A non-zero scale event counts as activity so the idle reaper
1306            // doesn't immediately tear back down what the autoscaler grew.
1307            if target > 0 {
1308                self.last_active
1309                    .write()
1310                    .await
1311                    .insert(service_name.to_string(), Instant::now());
1312            }
1313
1314            // Record the scale action
1315            self.record_scale_action(service_name).await;
1316
1317            // Update the autoscaler's internal state
1318            {
1319                let mut autoscaler = self.autoscaler.write().await;
1320                if let Err(e) = autoscaler.record_scale_action(service_name, target) {
1321                    warn!(
1322                        service = service_name,
1323                        error = %e,
1324                        "Failed to record scale action in autoscaler"
1325                    );
1326                }
1327            }
1328        }
1329
1330        Ok(())
1331    }
1332
    /// Signal shutdown of the autoscale loop.
    ///
    /// Calls `notify_one` on the shutdown notifier that
    /// [`AutoscaleController::run_loop`] waits on; the loop logs the shutdown
    /// and returns once it sees the notification. This method does not wait
    /// for the loop to finish — await the spawned task's handle for that.
    pub fn shutdown(&self) {
        self.shutdown.notify_one();
    }
1337
    /// Get the current evaluation interval.
    ///
    /// This is the period [`AutoscaleController::run_loop`] uses for its
    /// ticker.
    #[must_use]
    pub fn interval(&self) -> Duration {
        self.interval
    }
1343
1344    /// Get registered service count
1345    pub async fn registered_service_count(&self) -> usize {
1346        let specs = self.service_specs.read().await;
1347        specs.len()
1348    }
1349}
1350
1351/// Check if any service in a deployment has adaptive scaling
1352///
1353/// This is a helper function to determine if the autoscale controller should
1354/// be started for a deployment.
1355#[must_use]
1356#[allow(clippy::implicit_hasher)]
1357pub fn has_adaptive_scaling(services: &HashMap<String, zlayer_spec::ServiceSpec>) -> bool {
1358    services
1359        .values()
1360        .any(|s| matches!(s.scale, ScaleSpec::Adaptive { .. }))
1361}
1362
1363#[cfg(test)]
1364#[allow(deprecated)]
1365mod tests {
1366    use super::*;
1367    use crate::runtime::MockRuntime;
1368    use zlayer_scheduler::metrics::{MockMetricsSource, ServiceMetrics};
1369    use zlayer_spec::ScaleTargets;
1370
1371    fn mock_spec() -> zlayer_spec::ServiceSpec {
1372        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
1373            r"
1374version: v1
1375deployment: test
1376services:
1377  test:
1378    rtype: service
1379    image:
1380      name: test:latest
1381    endpoints:
1382      - name: http
1383        protocol: http
1384        port: 8080
1385    scale:
1386      mode: fixed
1387      replicas: 1
1388",
1389        )
1390        .unwrap()
1391        .services
1392        .remove("test")
1393        .unwrap()
1394    }
1395
1396    fn adaptive_spec(
1397        min: u32,
1398        max: u32,
1399        cpu_target: Option<u8>,
1400        memory_target: Option<u8>,
1401    ) -> ScaleSpec {
1402        ScaleSpec::Adaptive {
1403            min,
1404            max,
1405            cooldown: Some(Duration::from_secs(0)), // No cooldown for tests
1406            targets: ScaleTargets {
1407                cpu: cpu_target,
1408                memory: memory_target,
1409                rps: None,
1410                custom: Vec::new(),
1411                external: Vec::new(),
1412            },
1413            behavior: None,
1414            triggers: Vec::new(),
1415            idle_window: None,
1416            vertical: None,
1417            predictive: None,
1418        }
1419    }
1420
1421    fn raw_stats(cpu_usec: u64, mem_bytes: u64) -> RawContainerStats {
1422        RawContainerStats {
1423            cpu_usage_usec: cpu_usec,
1424            memory_bytes: mem_bytes,
1425            memory_limit: 512 * 1024 * 1024,
1426            timestamp: Instant::now(),
1427        }
1428    }
1429
1430    #[test]
1431    fn test_vpa_percentile_nearest_rank() {
1432        let mut samples = std::collections::VecDeque::new();
1433        for v in [10.0, 20.0, 30.0, 40.0, 50.0] {
1434            samples.push_back(v);
1435        }
1436        assert_eq!(ContainerUsageHistory::percentile(&samples, 100), Some(50.0));
1437        assert_eq!(ContainerUsageHistory::percentile(&samples, 0), Some(10.0));
1438        // p90 nearest-rank of 5 samples => index ceil(0.9*5)-1 = 4 => 50.0
1439        assert_eq!(ContainerUsageHistory::percentile(&samples, 90), Some(50.0));
1440    }
1441
1442    #[test]
1443    fn test_vpa_recommend_clamps_to_bounds() {
1444        let mut engine = VpaEngine::new();
1445        let id = "svc-rep-1";
1446        // Two samples with ZERO CPU delta: the container is idle, so the derived
1447        // rate is 0 millicores regardless of the wall-clock gap between samples
1448        // (deterministic — no flaky timing dependence). Memory holds steady at
1449        // 300 MiB. The recommendation should therefore clamp CPU *up* to the
1450        // 500 floor and memory *down* to the 256 ceiling.
1451        engine.observe(id, &raw_stats(1_000_000, 300 * 1024 * 1024));
1452        std::thread::sleep(Duration::from_millis(5));
1453        engine.observe(id, &raw_stats(1_000_000, 300 * 1024 * 1024));
1454
1455        let spec = VerticalScaleSpec {
1456            mode: VerticalMode::Auto,
1457            min_cpu_millis: Some(500),
1458            max_cpu_millis: Some(2000),
1459            min_memory_mib: Some(128),
1460            max_memory_mib: Some(256),
1461            percentile: 90,
1462        };
1463        let rec = engine.recommend(id, &spec).expect("recommendation");
1464        // CPU clamped up to the 500 floor (idle => 0 raw); memory clamped down to
1465        // the 256 ceiling (300 MiB raw).
1466        assert_eq!(rec.cpu_millis, 500);
1467        assert_eq!(rec.memory_mib, 256);
1468    }
1469
1470    #[test]
1471    fn test_deadband() {
1472        let base = VpaRecommendation {
1473            cpu_millis: 1000,
1474            memory_mib: 512,
1475        };
1476        // No previous => always apply.
1477        assert!(outside_deadband(None, base));
1478        // 5% change => within deadband.
1479        assert!(!outside_deadband(
1480            Some(base),
1481            VpaRecommendation {
1482                cpu_millis: 1050,
1483                memory_mib: 512
1484            }
1485        ));
1486        // 20% change => outside.
1487        assert!(outside_deadband(
1488            Some(base),
1489            VpaRecommendation {
1490                cpu_millis: 1200,
1491                memory_mib: 512
1492            }
1493        ));
1494    }
1495
1496    #[test]
1497    fn test_resource_update_for() {
1498        let rec = VpaRecommendation {
1499            cpu_millis: 1500,
1500            memory_mib: 256,
1501        };
1502        let update = resource_update_for(rec);
1503        assert_eq!(update.cpu_period, Some(100_000));
1504        // 1500 millicores over a 100ms period => 150ms quota.
1505        assert_eq!(update.cpu_quota, Some(150_000));
1506        assert_eq!(update.memory, Some(256 * 1024 * 1024));
1507    }
1508
1509    /// Wrap a freshly-built `ServiceManager` in the `Arc<RwLock<_>>` shape the
1510    /// controller now consumes (mirrors the daemon's post-`try_unwrap` handle).
1511    fn locked(runtime: &Arc<dyn Runtime + Send + Sync>) -> Arc<RwLock<ServiceManager>> {
1512        Arc::new(RwLock::new(ServiceManager::new(runtime.clone())))
1513    }
1514
1515    #[tokio::test]
1516    async fn test_autoscale_controller_creation() {
1517        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
1518        let manager = locked(&runtime);
1519
1520        let controller = AutoscaleController::new(manager, runtime, Duration::from_secs(10));
1521
1522        assert_eq!(controller.interval(), Duration::from_secs(10));
1523        assert_eq!(controller.registered_service_count().await, 0);
1524    }
1525
1526    #[tokio::test]
1527    async fn test_register_service() {
1528        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
1529        let manager = locked(&runtime);
1530
1531        let controller = AutoscaleController::new(manager, runtime, Duration::from_secs(10));
1532
1533        // Register an adaptive service
1534        let spec = adaptive_spec(1, 10, Some(70), None);
1535        controller.register_service("api", &spec, 2).await;
1536
1537        assert!(controller.is_registered("api").await);
1538        assert_eq!(controller.registered_service_count().await, 1);
1539    }
1540
1541    #[tokio::test]
1542    async fn test_register_fixed_service_ignored() {
1543        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
1544        let manager = locked(&runtime);
1545
1546        let controller = AutoscaleController::new(manager, runtime, Duration::from_secs(10));
1547
1548        // Try to register a fixed service - should be ignored
1549        let spec = ScaleSpec::Fixed { replicas: 3 };
1550        controller.register_service("api", &spec, 3).await;
1551
1552        assert!(!controller.is_registered("api").await);
1553        assert_eq!(controller.registered_service_count().await, 0);
1554    }
1555
1556    #[tokio::test]
1557    async fn test_unregister_service() {
1558        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
1559        let manager = locked(&runtime);
1560
1561        let controller = AutoscaleController::new(manager, runtime, Duration::from_secs(10));
1562
1563        let spec = adaptive_spec(1, 10, Some(70), None);
1564        controller.register_service("api", &spec, 2).await;
1565
1566        assert!(controller.is_registered("api").await);
1567
1568        controller.unregister_service("api").await;
1569
1570        assert!(!controller.is_registered("api").await);
1571        assert_eq!(controller.registered_service_count().await, 0);
1572    }
1573
1574    #[tokio::test]
1575    async fn test_has_adaptive_scaling() {
1576        let mut services = HashMap::new();
1577
1578        // Add a fixed service
1579        let mut fixed_spec = mock_spec();
1580        fixed_spec.scale = ScaleSpec::Fixed { replicas: 3 };
1581        services.insert("web".to_string(), fixed_spec);
1582
1583        // No adaptive services yet
1584        assert!(!has_adaptive_scaling(&services));
1585
1586        // Add an adaptive service
1587        let mut adaptive = mock_spec();
1588        adaptive.scale = adaptive_spec(1, 10, Some(70), None);
1589        services.insert("api".to_string(), adaptive);
1590
1591        // Now has adaptive
1592        assert!(has_adaptive_scaling(&services));
1593    }
1594
1595    #[tokio::test]
1596    async fn test_autoscale_controller_with_mock_metrics() {
1597        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
1598        let manager = locked(&runtime);
1599
1600        // Create mock metrics source
1601        let mock = Arc::new(MockMetricsSource::new());
1602
1603        // Set high CPU metrics
1604        mock.set_metrics(
1605            "api",
1606            vec![
1607                ServiceMetrics {
1608                    cpu_percent: 85.0,
1609                    memory_bytes: 100 * 1024 * 1024,
1610                    memory_limit: 512 * 1024 * 1024,
1611                    rps: None,
1612                    timestamp: Some(Instant::now()),
1613                    ..Default::default()
1614                },
1615                ServiceMetrics {
1616                    cpu_percent: 90.0,
1617                    memory_bytes: 150 * 1024 * 1024,
1618                    memory_limit: 512 * 1024 * 1024,
1619                    rps: None,
1620                    timestamp: Some(Instant::now()),
1621                    ..Default::default()
1622                },
1623            ],
1624        )
1625        .await;
1626
1627        // Create controller with custom metrics
1628        let mut metrics = MetricsCollector::new();
1629        metrics.add_source(mock);
1630
1631        let controller = AutoscaleController::with_custom_metrics(
1632            manager.clone(),
1633            runtime,
1634            metrics,
1635            Duration::from_secs(10),
1636        );
1637
1638        // Register service
1639        Box::pin(
1640            manager
1641                .read()
1642                .await
1643                .upsert_service("api".to_string(), mock_spec()),
1644        )
1645        .await
1646        .unwrap();
1647        manager.read().await.scale_service("api", 2).await.unwrap();
1648
1649        let spec = adaptive_spec(1, 10, Some(70), None);
1650        controller.register_service("api", &spec, 2).await;
1651
1652        // Evaluate - should want to scale up due to high CPU
1653        controller.evaluate_and_scale("api").await.unwrap();
1654
1655        // Check that scale happened (from 2 to 3)
1656        let count = manager
1657            .read()
1658            .await
1659            .service_replica_count("api")
1660            .await
1661            .unwrap();
1662        assert_eq!(count, 3);
1663    }
1664
1665    #[tokio::test]
1666    async fn test_autoscale_controller_cooldown() {
1667        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
1668        let manager = locked(&runtime);
1669
1670        let controller = AutoscaleController::new(manager, runtime, Duration::from_secs(10));
1671
1672        // Use a spec with 1 second cooldown
1673        let spec = ScaleSpec::Adaptive {
1674            min: 1,
1675            max: 10,
1676            cooldown: Some(Duration::from_secs(60)), // Long cooldown
1677            targets: ScaleTargets {
1678                cpu: Some(70),
1679                memory: None,
1680                rps: None,
1681                custom: Vec::new(),
1682                external: Vec::new(),
1683            },
1684            behavior: None,
1685            triggers: Vec::new(),
1686            idle_window: None,
1687            vertical: None,
1688            predictive: None,
1689        };
1690
1691        controller.register_service("api", &spec, 2).await;
1692
1693        // Initially should be able to scale
1694        assert!(controller.should_scale("api").await);
1695
1696        // Record a scale action
1697        controller.record_scale_action("api").await;
1698
1699        // Now should be in cooldown
1700        assert!(!controller.should_scale("api").await);
1701    }
1702
1703    #[tokio::test]
1704    async fn test_scale_to_zero_after_idle_window() {
1705        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
1706        let manager = locked(&runtime);
1707
1708        let controller =
1709            AutoscaleController::new(manager.clone(), runtime, Duration::from_secs(10));
1710
1711        // Stand up a service with two replicas.
1712        Box::pin(
1713            manager
1714                .read()
1715                .await
1716                .upsert_service("api".to_string(), mock_spec()),
1717        )
1718        .await
1719        .unwrap();
1720        manager.read().await.scale_service("api", 2).await.unwrap();
1721
1722        // min == 0 + a tiny idle window so scale-to-zero is eligible.
1723        let spec = ScaleSpec::Adaptive {
1724            min: 0,
1725            max: 10,
1726            cooldown: Some(Duration::from_secs(0)),
1727            targets: ScaleTargets::default(),
1728            behavior: None,
1729            triggers: Vec::new(),
1730            idle_window: Some(Duration::from_millis(10)),
1731            vertical: None,
1732            predictive: None,
1733        };
1734        controller.register_service("api", &spec, 2).await;
1735
1736        // Force the idle clock well into the past so the window has elapsed.
1737        controller.last_active.write().await.insert(
1738            "api".to_string(),
1739            Instant::now().checked_sub(Duration::from_secs(60)).unwrap(),
1740        );
1741
1742        // First eval reaps to zero (MockRuntime stats are low CPU, so no
1743        // activity bump fights the reaper).
1744        let reaped = controller.evaluate_idle("api").await.unwrap();
1745        assert!(reaped, "service should have been reaped to zero");
1746        assert_eq!(
1747            manager
1748                .read()
1749                .await
1750                .service_replica_count("api")
1751                .await
1752                .unwrap(),
1753            0
1754        );
1755    }
1756
1757    #[tokio::test]
1758    async fn test_mark_active_resets_idle_clock() {
1759        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
1760        let manager = locked(&runtime);
1761
1762        let controller =
1763            AutoscaleController::new(manager.clone(), runtime, Duration::from_secs(10));
1764
1765        Box::pin(
1766            manager
1767                .read()
1768                .await
1769                .upsert_service("api".to_string(), mock_spec()),
1770        )
1771        .await
1772        .unwrap();
1773        manager.read().await.scale_service("api", 1).await.unwrap();
1774
1775        let spec = ScaleSpec::Adaptive {
1776            min: 0,
1777            max: 10,
1778            cooldown: Some(Duration::from_secs(0)),
1779            targets: ScaleTargets::default(),
1780            behavior: None,
1781            triggers: Vec::new(),
1782            idle_window: Some(Duration::from_secs(300)),
1783            vertical: None,
1784            predictive: None,
1785        };
1786        controller.register_service("api", &spec, 1).await;
1787
1788        // Push the clock back, then mark active to reset it.
1789        controller.last_active.write().await.insert(
1790            "api".to_string(),
1791            Instant::now()
1792                .checked_sub(Duration::from_secs(600))
1793                .unwrap(),
1794        );
1795        controller.mark_active_async("api").await;
1796
1797        // Not idle anymore => not reaped.
1798        let reaped = controller.evaluate_idle("api").await.unwrap();
1799        assert!(!reaped, "marked-active service must not be reaped");
1800        assert_eq!(
1801            manager
1802                .read()
1803                .await
1804                .service_replica_count("api")
1805                .await
1806                .unwrap(),
1807            1
1808        );
1809    }
1810
1811    #[tokio::test]
1812    async fn test_no_scale_to_zero_when_min_nonzero() {
1813        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
1814        let manager = locked(&runtime);
1815
1816        let controller =
1817            AutoscaleController::new(manager.clone(), runtime, Duration::from_secs(10));
1818
1819        Box::pin(
1820            manager
1821                .read()
1822                .await
1823                .upsert_service("api".to_string(), mock_spec()),
1824        )
1825        .await
1826        .unwrap();
1827        manager.read().await.scale_service("api", 2).await.unwrap();
1828
1829        // min == 1 disables scale-to-zero even with an elapsed idle window.
1830        let spec = ScaleSpec::Adaptive {
1831            min: 1,
1832            max: 10,
1833            cooldown: Some(Duration::from_secs(0)),
1834            targets: ScaleTargets::default(),
1835            behavior: None,
1836            triggers: Vec::new(),
1837            idle_window: Some(Duration::from_millis(1)),
1838            vertical: None,
1839            predictive: None,
1840        };
1841        controller.register_service("api", &spec, 2).await;
1842        controller.last_active.write().await.insert(
1843            "api".to_string(),
1844            Instant::now().checked_sub(Duration::from_secs(60)).unwrap(),
1845        );
1846
1847        let reaped = controller.evaluate_idle("api").await.unwrap();
1848        assert!(!reaped, "min>0 must never scale to zero");
1849        assert_eq!(
1850            manager
1851                .read()
1852                .await
1853                .service_replica_count("api")
1854                .await
1855                .unwrap(),
1856            2
1857        );
1858    }
1859
1860    #[tokio::test]
1861    async fn test_autoscale_controller_shutdown() {
1862        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
1863        let manager = locked(&runtime);
1864
1865        let controller = Arc::new(AutoscaleController::new(
1866            manager,
1867            runtime,
1868            Duration::from_millis(100), // Fast interval for test
1869        ));
1870
1871        let controller_clone = controller.clone();
1872
1873        // Spawn the loop (boxed: run_loop's future is large — clippy::large_future).
1874        let handle = tokio::spawn(async move { Box::pin(controller_clone.run_loop()).await });
1875
1876        // Let it run briefly
1877        tokio::time::sleep(Duration::from_millis(50)).await;
1878
1879        // Signal shutdown
1880        controller.shutdown();
1881
1882        // Should complete without error
1883        let result = handle.await.unwrap();
1884        assert!(result.is_ok());
1885    }
1886}