Skip to main content

hyperi_rustlib/cli/
runtime.rs

// Project:   hyperi-rustlib
// File:      src/cli/runtime.rs
// Purpose:   ServiceRuntime -- pre-built infrastructure for DFE service apps
// Language:  Rust
//
// License:   BUSL-1.1
// Copyright: (c) 2026 HYPERI PTY LIMITED

//! Pre-built service infrastructure for DFE pipeline applications.
//!
//! [`ServiceRuntime`] is created by [`super::run_app`] before calling
//! [`DfeApp::run_service`]. Apps receive it fully wired, which eliminates
//! ~50 lines of identical boilerplate per DFE app.
//!
//! ## What's included (always)
//!
//! - [`MetricsManager`] -- started, serving `/metrics`, `/healthz`, `/readyz`
//! - [`DfeMetrics`] -- platform `dfe_*` metrics registered
//! - [`MemoryGuard`] -- cgroup-aware, auto-detected from env prefix
//! - [`CancellationToken`] -- signal handler installed with K8s pre-stop delay
//! - [`RuntimeContext`] -- K8s/Docker/BareMetal metadata
//!
//! ## What's included (when features enabled)
//!
//! - [`AdaptiveWorkerPool`] -- rayon + tokio hybrid (`worker` feature)
//! - [`ScalingPressure`] -- KEDA signals (`scaling` feature)
//!
//! ## What stays app-specific
//!
//! - Readiness check criteria (each app defines "ready" differently)
//! - Config hot-reload (optional, app-specific reload logic)
//! - Pipeline creation (100% domain-specific)
//! - DLQ setup (varies per app)
//! - App-specific metric groups (ConsumerMetrics, BufferMetrics, etc.)
35
36use std::sync::Arc;
37
38use tokio_util::sync::CancellationToken;
39
40use crate::env::{RuntimeContext, runtime_context};
41#[cfg(feature = "memory")]
42use crate::memory::{MemoryGuard, MemoryGuardConfig};
43use crate::metrics::MetricsManager;
44
45use super::error::CliError;
46
47/// Pre-built service infrastructure. Created by `run_app()` before `run_service()`.
48///
49/// Apps receive this fully wired -- they just use the fields. No boilerplate needed.
50///
51/// On bare metal, K8s-specific features (pre-stop delay, pod metadata in logs)
52/// are automatically disabled. On K8s, they're automatically enabled.
53pub struct ServiceRuntime {
54    /// Metrics manager -- already started, serving endpoints.
55    /// Use for registering app-specific metrics and metric groups.
56    pub metrics: MetricsManager,
57
58    /// Platform DFE metrics (`dfe_*` counters/gauges). Already registered.
59    pub dfe: Arc<crate::metrics::DfeMetrics>,
60
61    /// Cgroup-aware memory guard. Tracks memory usage for backpressure.
62    /// Auto-detected from env prefix + cgroup limits.
63    #[cfg(feature = "memory")]
64    pub memory_guard: Arc<MemoryGuard>,
65
66    /// Shutdown token. Cancelled on SIGTERM/SIGINT (with K8s pre-stop delay).
67    /// Clone and pass to your pipeline loops.
68    pub shutdown: CancellationToken,
69
70    /// Runtime context -- K8s/Docker/BareMetal metadata (pod_name, namespace, etc.).
71    pub context: &'static RuntimeContext,
72
73    /// Adaptive worker pool for parallel batch processing (`worker` feature).
74    /// `None` if the `worker` feature is not enabled or config fails.
75    #[cfg(feature = "worker-pool")]
76    pub worker_pool: Option<Arc<crate::worker::AdaptiveWorkerPool>>,
77
78    /// Batch processing engine with SIMD parsing and pre-route filtering
79    /// (`worker-batch` feature). `None` if `worker-batch` is not enabled
80    /// or worker pool creation failed.
81    #[cfg(feature = "worker-batch")]
82    pub batch_engine: Option<Arc<crate::worker::BatchEngine>>,
83
84    /// Scaling pressure calculator for KEDA autoscaling (`scaling` feature).
85    /// `None` if the `scaling` feature is not enabled.
86    #[cfg(feature = "scaling")]
87    pub scaling: Option<Arc<crate::ScalingPressure>>,
88
89    /// Horizontal scaling-pressure ENGINE (CEL over local, correlated metrics).
90    /// Emits `{ns}_scaling_pressure{name}` per configured pressure plus the
91    /// gratis compound `{ns}_transport_{inbound,outbound}_pressure_ratio` and
92    /// `{ns}_scaling_circuit_open`. `None` unless both `scaling` + `expression`
93    /// are enabled. Runs its own periodic tick (CPU sampled internally).
94    #[cfg(all(feature = "scaling", feature = "expression"))]
95    pub scaling_engine: Option<Arc<crate::scaling::ScalingEngine>>,
96
97    /// Lock-free cell for pushing per-pod transport scaling signals (kafka
98    /// assigned-lag, in-flight, shed rate, circuit, ...) that the
99    /// `scaling_engine` reads each tick. Update it from
100    /// your receive/send loops; CPU is sampled by the engine itself. When no
101    /// signals are pushed, the smart default reduces to CPU-only (ACR F2).
102    #[cfg(feature = "scaling")]
103    pub scaling_signals: Arc<crate::scaling::ScalingSignalsCell>,
104
105    /// Self-regulation governor (`governor` feature). Default-ON, opt-out via
106    /// `self_regulation.enabled = false`. `None` when disabled -- nothing is
107    /// constructed and the data path is byte-identical to pre-governor.
108    ///
109    /// Thread [`pressure`](crate::SelfRegulationGovernor::pressure) into your
110    /// receive transports' inbound gate / `with_pressure` hooks. The
111    /// [`budget`](crate::SelfRegulationGovernor::budget) is already wired into
112    /// the [`batch_engine`](Self::batch_engine) governed run path.
113    #[cfg(feature = "governor")]
114    pub governor: Option<crate::SelfRegulationGovernor>,
115}
116
117impl ServiceRuntime {
118    /// Build the service runtime from app configuration.
119    ///
120    /// This is called by `run_app()` -- apps don't call it directly.
121    ///
122    /// # Errors
123    ///
124    /// Returns `CliError` if the metrics server fails to start.
125    pub(crate) async fn build(
126        app_name: &str,
127        env_prefix: &str,
128        metrics_addr: &str,
129        #[cfg_attr(not(feature = "metrics-dfe"), allow(unused_variables))] version: &str,
130        #[cfg_attr(not(feature = "metrics-dfe"), allow(unused_variables))] commit: &str,
131        #[cfg(feature = "scaling")] scaling_components: Vec<crate::ScalingComponent>,
132    ) -> Result<Self, CliError> {
133        let ctx = runtime_context();
134
135        // --- Metrics ---
136        let mut metrics = MetricsManager::new(app_name);
137        let dfe = Arc::new(crate::metrics::DfeMetrics::register(&metrics));
138
139        // App info metric (version, commit, service name)
140        #[cfg(feature = "metrics-dfe")]
141        {
142            let _app_metrics =
143                crate::metrics::dfe_groups::AppMetrics::new(&metrics, version, commit);
144        }
145
146        // --- Memory guard ---
147        // Cascade `memory:` section is the base, flat {PREFIX}_MEMORY_* env
148        // overlaid on top. `from_env` alone (pre-2.8.12) read only the flat
149        // env and silently ignored the YAML `memory:` section.
150        #[cfg(feature = "memory")]
151        let memory_guard = Arc::new(MemoryGuard::new(MemoryGuardConfig::from_cascade_with_env(
152            env_prefix,
153        )));
154
155        // --- Self-regulation governor (default-ON, opt-out) ---
156        //
157        // Constructed HERE -- before the worker pool, batch engine, and the
158        // transports the app builds in run_service() -- so the shared pressure
159        // and byte budget can be threaded into all of them. When
160        // `self_regulation.enabled = false`, `build` returns None and nothing
161        // is constructed: every downstream Option stays None and the data path
162        // is byte-identical to pre-governor behaviour.
163        #[cfg(feature = "governor")]
164        let governor = crate::SelfRegulationConfig::from_cascade().build(Arc::clone(&memory_guard));
165
166        // --- Scaling pressure ---
167        #[cfg(feature = "scaling")]
168        let scaling = {
169            let config = crate::ScalingPressureConfig::from_cascade();
170            let pressure = Arc::new(crate::ScalingPressure::new(config, scaling_components));
171            metrics.set_scaling_pressure(Arc::clone(&pressure));
172            Some(pressure)
173        };
174
175        // --- Worker pool ---
176        #[cfg(feature = "worker-pool")]
177        let worker_pool = {
178            match crate::worker::AdaptiveWorkerPool::from_cascade("worker_pool") {
179                Ok(pool) => {
180                    let pool = Arc::new(pool);
181                    pool.register_metrics(&metrics);
182                    #[cfg(feature = "memory")]
183                    pool.set_memory_guard(Arc::clone(&memory_guard));
184                    #[cfg(feature = "scaling")]
185                    if let Some(ref sp) = scaling {
186                        pool.set_scaling_pressure(Arc::clone(sp));
187                    }
188                    tracing::info!(
189                        max_threads = pool.max_threads(),
190                        "Adaptive worker pool enabled"
191                    );
192                    Some(pool)
193                }
194                Err(e) => {
195                    tracing::warn!(
196                        error = %e,
197                        "Worker pool not configured, falling back to sequential"
198                    );
199                    None
200                }
201            }
202        };
203
204        // --- Batch engine (worker-batch tier only) ---
205        #[cfg(feature = "worker-batch")]
206        let batch_engine = {
207            if let Some(ref pool) = worker_pool {
208                let config =
209                    crate::worker::engine::BatchProcessingConfig::from_cascade("batch_processing")
210                        .unwrap_or_default();
211                let mut engine = crate::worker::BatchEngine::with_pool(Arc::clone(pool), config);
212                engine.auto_wire(
213                    &metrics,
214                    #[cfg(feature = "memory")]
215                    Some(&memory_guard),
216                );
217                // Wire the governor's byte-budget lever so the engine's governed
218                // run path streams in budget-sized sub-blocks. None (governor
219                // off) leaves the engine on the whole-batch loop.
220                #[cfg(feature = "governor")]
221                if let Some(ref gov) = governor {
222                    engine.set_byte_budget(gov.budget());
223                }
224                Some(Arc::new(engine))
225            } else {
226                None
227            }
228        };
229
230        // --- Shutdown ---
231        let shutdown = crate::shutdown::install_signal_handler();
232
233        // Start worker pool scaling loop after shutdown token exists
234        #[cfg(feature = "worker-pool")]
235        if let Some(ref pool) = worker_pool {
236            pool.start_scaling_loop(shutdown.clone());
237        }
238
239        // --- Horizontal scaling-pressure engine (CEL over local metrics) ---
240        #[cfg(feature = "scaling")]
241        let scaling_signals = Arc::new(crate::scaling::ScalingSignalsCell::new());
242
243        #[cfg(all(feature = "scaling", feature = "expression"))]
244        let scaling_engine = {
245            let sp_cfg = crate::scaling::ScalingEngineConfig::from_cascade();
246            let inbound = sp_cfg.transport.inbound.as_deref().map_or(
247                crate::scaling::ScalingTransport::Other,
248                crate::scaling::ScalingTransport::from_label,
249            );
250            let outbound = sp_cfg.transport.outbound.as_deref().map_or(
251                crate::scaling::ScalingTransport::Other,
252                crate::scaling::ScalingTransport::from_label,
253            );
254            let (engine, errors) =
255                crate::scaling::ScalingEngine::new(app_name, &sp_cfg, inbound, outbound);
256            for e in &errors {
257                tracing::error!(target: "scaling", "{e}");
258            }
259            let engine = Arc::new(engine);
260            if engine.is_enabled() {
261                tokio::spawn(run_scaling_pressure_loop(
262                    Arc::clone(&engine),
263                    Arc::clone(&scaling_signals),
264                    sp_cfg.interval_secs,
265                    #[cfg(feature = "memory")]
266                    Arc::clone(&memory_guard),
267                    shutdown.clone(),
268                ));
269            }
270            Some(engine)
271        };
272
273        // --- Start metrics server ---
274        if let Err(e) = metrics.start_server(metrics_addr).await {
275            tracing::error!(error = %e, addr = metrics_addr, "Failed to start metrics server");
276        }
277
278        // --- Version check (fire-and-forget) ---
279        #[cfg(feature = "version-check")]
280        {
281            crate::VersionCheck::new(crate::VersionCheckConfig {
282                product: app_name.to_string(),
283                current_version: version.to_string(),
284                ..Default::default()
285            })
286            .check_on_startup();
287        }
288
289        // Turns the previously-silent "from_cascade defaulted everything"
290        // failure into one observable startup line.
291        #[cfg(feature = "config")]
292        log_cascade_section_summary();
293
294        // Log runtime context
295        tracing::info!(
296            environment = %ctx.environment,
297            pod_name = ?ctx.pod_name,
298            namespace = ?ctx.namespace,
299            "Service runtime initialised"
300        );
301
302        Ok(Self {
303            metrics,
304            dfe,
305            #[cfg(feature = "memory")]
306            memory_guard,
307            shutdown,
308            context: ctx,
309            #[cfg(feature = "worker-pool")]
310            worker_pool,
311            #[cfg(feature = "worker-batch")]
312            batch_engine,
313            #[cfg(feature = "scaling")]
314            scaling,
315            #[cfg(all(feature = "scaling", feature = "expression"))]
316            scaling_engine,
317            #[cfg(feature = "scaling")]
318            scaling_signals,
319            #[cfg(feature = "governor")]
320            governor,
321        })
322    }
323
324    /// Set the readiness check callback.
325    ///
326    /// Each app defines its own readiness criteria. Call this in `run_service()`
327    /// once you know what "ready" means for your app.
328    pub fn set_readiness_check<F: Fn() -> bool + Send + Sync + 'static>(&mut self, check: F) {
329        self.metrics.set_readiness_check(check);
330    }
331
332    /// Return the batch processing engine, if the `worker-batch` feature is
333    /// enabled and the worker pool was successfully created.
334    #[cfg(feature = "worker-batch")]
335    #[must_use]
336    pub fn batch_engine(&self) -> Option<&Arc<crate::worker::BatchEngine>> {
337        self.batch_engine.as_ref()
338    }
339
340    /// Build a governed receive transport from config in ONE call
341    /// (`governor` + `transport` features).
342    ///
343    /// Reads the transport config at `key` and threads the runtime's
344    /// [`governor`](Self::governor) pressure into the receiver's inbound brake
345    /// (Kafka pause-partitions gate, HTTP/gRPC 503/`unavailable` shed) so apps
346    /// skip the `gate_actuator -> InboundGate -> with_inbound_gate` dance.
347    ///
348    /// When the governor is disabled (`self_regulation.enabled = false`,
349    /// [`governor`](Self::governor) is `None`) this falls back to the plain
350    /// [`AnyReceiver::from_config`](crate::transport::factory::AnyReceiver::from_config)
351    /// -- data path stays byte-identical to pre-governor.
352    ///
353    /// # Errors
354    ///
355    /// Returns the underlying transport error if the config is missing/invalid
356    /// or the backend fails to construct.
357    #[cfg(all(feature = "governor", feature = "transport"))]
358    pub async fn governed_receiver(
359        &self,
360        key: &str,
361    ) -> Result<crate::transport::factory::AnyReceiver, crate::transport::TransportError> {
362        use crate::transport::factory::AnyReceiver;
363        match self.governor {
364            Some(ref gov) => AnyReceiver::from_config_with_governor(key, gov).await,
365            None => AnyReceiver::from_config(key).await,
366        }
367    }
368}
369
/// Log a single startup line showing, per platform config section, whether it
/// was present in the cascade or left to defaults.
///
/// Only key presence is checked -- nothing is deserialised. The line exists to
/// make visible the pre-2.8.11 failure mode in which `from_cascade` silently
/// defaulted everything because the cascade had never been populated.
#[cfg(feature = "config")]
fn log_cascade_section_summary() {
    let cascade = crate::config::try_get();
    // With no cascade at all, every section reports `false`.
    let has_section = |section: &str| match cascade {
        Some(c) => c.contains(section),
        None => false,
    };
    tracing::info!(
        cascade_initialised = cascade.is_some(),
        self_regulation = has_section("self_regulation"),
        worker_pool = has_section("worker_pool"),
        batch_processing = has_section("batch_processing"),
        scaling = has_section("scaling"),
        expression = has_section("expression"),
        "Config cascade sections (true = found in config, false = using defaults)"
    );
}
388
/// Periodic scaling-pressure tick: sample CPU (rate of the cumulative counter
/// over the wall window / cores), read the pushed transport signals, and let the
/// engine evaluate + publish its gauges. Off the data hot-path (interval-driven).
///
/// - `engine`: receives one `tick` per interval.
/// - `signals`: cell whose snapshot is passed to every tick.
/// - `interval_secs`: tick period in seconds; values below 1 are raised to 1.
/// - `memory_guard`: source of the memory pressure ratio (`memory` feature);
///   without it the memory ratio passed to the engine is `0.0`.
/// - `shutdown`: the loop exits as soon as this token is cancelled.
#[cfg(all(feature = "scaling", feature = "expression"))]
async fn run_scaling_pressure_loop(
    engine: Arc<crate::scaling::ScalingEngine>,
    signals: Arc<crate::scaling::ScalingSignalsCell>,
    interval_secs: u64,
    #[cfg(feature = "memory")] memory_guard: Arc<MemoryGuard>,
    shutdown: CancellationToken,
) {
    use std::time::{Duration, Instant};

    // CPU utilisation denominator: the cgroup CPU limit, else the visible core
    // count. Never 0.
    let cores = crate::metrics::cpu_limit_cores()
        .or_else(|| {
            std::thread::available_parallelism()
                .ok()
                // Core counts are tiny, so usize -> f64 is exact here.
                .map(|n| n.get() as f64)
        })
        .filter(|c| *c > 0.0)
        .unwrap_or(1.0);

    let mut last_cpu = crate::metrics::cumulative_cpu_seconds();
    let mut last_at = Instant::now();
    let mut ticker = tokio::time::interval(Duration::from_secs(interval_secs.max(1)));
    // The default `Burst` behaviour replays missed ticks back-to-back after a
    // stall, which would yield near-zero sampling windows and a noisy CPU
    // ratio. `Delay` re-spaces ticks so every sample spans a full interval.
    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
    // tokio's first interval tick fires immediately -- consume it so the first
    // real sample below spans a full interval (no divide-by-near-zero CPU spike).
    ticker.tick().await;

    loop {
        tokio::select! {
            () = shutdown.cancelled() => break,
            _ = ticker.tick() => {
                let now_cpu = crate::metrics::cumulative_cpu_seconds();
                let now_at = Instant::now();
                // Without two valid CPU readings there is no rate; report 0.
                let cpu_ratio = match (last_cpu, now_cpu) {
                    (Some(prev), Some(cur)) => {
                        // Floor the window at 1 ms to keep the division sane.
                        let elapsed = now_at.duration_since(last_at).as_secs_f64().max(1e-3);
                        // (cur - prev) can go negative on a counter reset -> floor 0.
                        ((cur - prev).max(0.0) / elapsed) / cores
                    }
                    _ => 0.0,
                };
                last_cpu = now_cpu;
                last_at = now_at;

                #[cfg(feature = "memory")]
                let memory_ratio = memory_guard.pressure_ratio();
                #[cfg(not(feature = "memory"))]
                let memory_ratio = 0.0;

                engine.tick(&signals.snapshot(), cpu_ratio, memory_ratio);
            }
        }
    }

    tracing::info!(target: "scaling", "Scaling-pressure loop shutting down");
}