Skip to main content

hyperi_rustlib/cli/
runtime.rs

1// Project:   hyperi-rustlib
2// File:      src/cli/runtime.rs
3// Purpose:   ServiceRuntime -- pre-built infrastructure for DFE service apps
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Pre-built service infrastructure for DFE pipeline applications.
10//!
11//! [`ServiceRuntime`] is created by [`super::run_app`] before calling
12//! [`DfeApp::run_service`]. Apps receive it fully wired -- eliminates
13//! ~50 lines of identical boilerplate per DFE app.
14//!
15//! ## What's included (always)
16//!
17//! - [`MetricsManager`] -- started, serving `/metrics`, `/healthz`, `/readyz`
18//! - [`DfeMetrics`] -- platform `dfe_*` metrics registered
//! - [`MemoryGuard`] -- cgroup-aware, auto-detected from env prefix (compiled in with the `memory` feature)
20//! - [`CancellationToken`] -- signal handler installed with K8s pre-stop delay
21//! - [`RuntimeContext`] -- K8s/Docker/BareMetal metadata
22//!
23//! ## What's included (when features enabled)
24//!
//! - [`AdaptiveWorkerPool`] -- rayon + tokio hybrid (`worker-pool` feature)
26//! - [`ScalingPressure`] -- KEDA signals (`scaling` feature)
27//!
28//! ## What stays app-specific
29//!
30//! - Readiness check criteria (each app defines "ready" differently)
31//! - Config hot-reload (optional, app-specific reload logic)
32//! - Pipeline creation (100% domain-specific)
33//! - DLQ setup (varies per app)
34//! - App-specific metric groups (ConsumerMetrics, BufferMetrics, etc.)
35
36use std::sync::Arc;
37
38use tokio_util::sync::CancellationToken;
39
40use crate::env::{RuntimeContext, runtime_context};
41#[cfg(feature = "memory")]
42use crate::memory::{MemoryGuard, MemoryGuardConfig};
43use crate::metrics::MetricsManager;
44
45use super::error::CliError;
46
47/// Pre-built service infrastructure. Created by `run_app()` before `run_service()`.
48///
49/// Apps receive this fully wired -- they just use the fields. No boilerplate needed.
50///
51/// On bare metal, K8s-specific features (pre-stop delay, pod metadata in logs)
52/// are automatically disabled. On K8s, they're automatically enabled.
53pub struct ServiceRuntime {
54    /// Metrics manager -- already started, serving endpoints.
55    /// Use for registering app-specific metrics and metric groups.
56    pub metrics: MetricsManager,
57
58    /// Platform DFE metrics (`dfe_*` counters/gauges). Already registered.
59    pub dfe: Arc<crate::metrics::DfeMetrics>,
60
61    /// Cgroup-aware memory guard. Tracks memory usage for backpressure.
62    /// Auto-detected from env prefix + cgroup limits.
63    #[cfg(feature = "memory")]
64    pub memory_guard: Arc<MemoryGuard>,
65
66    /// Shutdown token. Cancelled on SIGTERM/SIGINT (with K8s pre-stop delay).
67    /// Clone and pass to your pipeline loops.
68    pub shutdown: CancellationToken,
69
70    /// Runtime context -- K8s/Docker/BareMetal metadata (pod_name, namespace, etc.).
71    pub context: &'static RuntimeContext,
72
73    /// Adaptive worker pool for parallel batch processing (`worker` feature).
74    /// `None` if the `worker` feature is not enabled or config fails.
75    #[cfg(feature = "worker-pool")]
76    pub worker_pool: Option<Arc<crate::worker::AdaptiveWorkerPool>>,
77
78    /// Batch processing engine with SIMD parsing and pre-route filtering
79    /// (`worker-batch` feature). `None` if `worker-batch` is not enabled
80    /// or worker pool creation failed.
81    #[cfg(feature = "worker-batch")]
82    pub batch_engine: Option<Arc<crate::worker::BatchEngine>>,
83
84    /// Scaling pressure calculator for KEDA autoscaling (`scaling` feature).
85    /// `None` if the `scaling` feature is not enabled.
86    #[cfg(feature = "scaling")]
87    pub scaling: Option<Arc<crate::ScalingPressure>>,
88
89    /// Horizontal scaling-pressure ENGINE (CEL over local, correlated metrics).
90    /// Emits `{ns}_scaling_pressure{name}` per configured pressure plus the
91    /// gratis compound `{ns}_transport_{inbound,outbound}_pressure_ratio` and
92    /// `{ns}_scaling_circuit_open`. `None` unless both `scaling` + `expression`
93    /// are enabled. Runs its own periodic tick (CPU sampled internally).
94    #[cfg(all(feature = "scaling", feature = "expression"))]
95    pub scaling_engine: Option<Arc<crate::scaling::ScalingEngine>>,
96
97    /// Lock-free cell for pushing per-pod transport scaling signals (kafka
98    /// assigned-lag, in-flight, shed rate, circuit, ...) that the
99    /// `scaling_engine` reads each tick. Update it from
100    /// your receive/send loops; CPU is sampled by the engine itself. When no
101    /// signals are pushed, the smart default reduces to CPU-only (ACR F2).
102    #[cfg(feature = "scaling")]
103    pub scaling_signals: Arc<crate::scaling::ScalingSignalsCell>,
104
105    /// Self-regulation governor (`governor` feature). Default-ON, opt-out via
106    /// `self_regulation.enabled = false`. `None` when disabled -- nothing is
107    /// constructed and the data path is byte-identical to pre-governor.
108    ///
109    /// Thread [`pressure`](crate::SelfRegulationGovernor::pressure) into your
110    /// receive transports' inbound gate / `with_pressure` hooks. The
111    /// [`budget`](crate::SelfRegulationGovernor::budget) is already wired into
112    /// the [`batch_engine`](Self::batch_engine) governed run path.
113    #[cfg(feature = "governor")]
114    pub governor: Option<crate::SelfRegulationGovernor>,
115}
116
117impl ServiceRuntime {
118    /// Build the service runtime from app configuration.
119    ///
120    /// This is called by `run_app()` -- apps don't call it directly.
121    ///
122    /// # Errors
123    ///
124    /// Returns `CliError` if the metrics server fails to start.
125    pub(crate) async fn build(
126        app_name: &str,
127        env_prefix: &str,
128        metrics_addr: &str,
129        #[cfg_attr(not(feature = "metrics-dfe"), allow(unused_variables))] version: &str,
130        #[cfg_attr(not(feature = "metrics-dfe"), allow(unused_variables))] commit: &str,
131        #[cfg(feature = "scaling")] scaling_components: Vec<crate::ScalingComponent>,
132    ) -> Result<Self, CliError> {
133        let ctx = runtime_context();
134
135        // --- Metrics ---
136        let mut metrics = MetricsManager::new(app_name);
137        let dfe = Arc::new(crate::metrics::DfeMetrics::register(&metrics));
138
139        // App info metric (version, commit, service name)
140        #[cfg(feature = "metrics-dfe")]
141        {
142            let _app_metrics =
143                crate::metrics::dfe_groups::AppMetrics::new(&metrics, version, commit);
144        }
145
146        // --- Memory guard ---
147        #[cfg(feature = "memory")]
148        let memory_guard = Arc::new(MemoryGuard::new(MemoryGuardConfig::from_env(env_prefix)));
149
150        // --- Self-regulation governor (default-ON, opt-out) ---
151        //
152        // Constructed HERE -- before the worker pool, batch engine, and the
153        // transports the app builds in run_service() -- so the shared pressure
154        // and byte budget can be threaded into all of them. When
155        // `self_regulation.enabled = false`, `build` returns None and nothing
156        // is constructed: every downstream Option stays None and the data path
157        // is byte-identical to pre-governor behaviour.
158        #[cfg(feature = "governor")]
159        let governor = crate::SelfRegulationConfig::from_cascade().build(Arc::clone(&memory_guard));
160
161        // --- Scaling pressure ---
162        #[cfg(feature = "scaling")]
163        let scaling = {
164            let config = crate::ScalingPressureConfig::from_cascade();
165            let pressure = Arc::new(crate::ScalingPressure::new(config, scaling_components));
166            metrics.set_scaling_pressure(Arc::clone(&pressure));
167            Some(pressure)
168        };
169
170        // --- Worker pool ---
171        #[cfg(feature = "worker-pool")]
172        let worker_pool = {
173            match crate::worker::AdaptiveWorkerPool::from_cascade("worker_pool") {
174                Ok(pool) => {
175                    let pool = Arc::new(pool);
176                    pool.register_metrics(&metrics);
177                    #[cfg(feature = "memory")]
178                    pool.set_memory_guard(Arc::clone(&memory_guard));
179                    #[cfg(feature = "scaling")]
180                    if let Some(ref sp) = scaling {
181                        pool.set_scaling_pressure(Arc::clone(sp));
182                    }
183                    tracing::info!(
184                        max_threads = pool.max_threads(),
185                        "Adaptive worker pool enabled"
186                    );
187                    Some(pool)
188                }
189                Err(e) => {
190                    tracing::warn!(
191                        error = %e,
192                        "Worker pool not configured, falling back to sequential"
193                    );
194                    None
195                }
196            }
197        };
198
199        // --- Batch engine (worker-batch tier only) ---
200        #[cfg(feature = "worker-batch")]
201        let batch_engine = {
202            if let Some(ref pool) = worker_pool {
203                let config =
204                    crate::worker::engine::BatchProcessingConfig::from_cascade("batch_processing")
205                        .unwrap_or_default();
206                let mut engine = crate::worker::BatchEngine::with_pool(Arc::clone(pool), config);
207                engine.auto_wire(
208                    &metrics,
209                    #[cfg(feature = "memory")]
210                    Some(&memory_guard),
211                );
212                // Wire the governor's byte-budget lever so the engine's governed
213                // run path streams in budget-sized sub-blocks. None (governor
214                // off) leaves the engine on the whole-batch loop.
215                #[cfg(feature = "governor")]
216                if let Some(ref gov) = governor {
217                    engine.set_byte_budget(gov.budget());
218                }
219                Some(Arc::new(engine))
220            } else {
221                None
222            }
223        };
224
225        // --- Shutdown ---
226        let shutdown = crate::shutdown::install_signal_handler();
227
228        // Start worker pool scaling loop after shutdown token exists
229        #[cfg(feature = "worker-pool")]
230        if let Some(ref pool) = worker_pool {
231            pool.start_scaling_loop(shutdown.clone());
232        }
233
234        // --- Horizontal scaling-pressure engine (CEL over local metrics) ---
235        #[cfg(feature = "scaling")]
236        let scaling_signals = Arc::new(crate::scaling::ScalingSignalsCell::new());
237
238        #[cfg(all(feature = "scaling", feature = "expression"))]
239        let scaling_engine = {
240            let sp_cfg = crate::scaling::ScalingEngineConfig::from_cascade();
241            let inbound = sp_cfg.transport.inbound.as_deref().map_or(
242                crate::scaling::ScalingTransport::Other,
243                crate::scaling::ScalingTransport::from_label,
244            );
245            let outbound = sp_cfg.transport.outbound.as_deref().map_or(
246                crate::scaling::ScalingTransport::Other,
247                crate::scaling::ScalingTransport::from_label,
248            );
249            let (engine, errors) =
250                crate::scaling::ScalingEngine::new(app_name, &sp_cfg, inbound, outbound);
251            for e in &errors {
252                tracing::error!(target: "scaling", "{e}");
253            }
254            let engine = Arc::new(engine);
255            if engine.is_enabled() {
256                tokio::spawn(run_scaling_pressure_loop(
257                    Arc::clone(&engine),
258                    Arc::clone(&scaling_signals),
259                    sp_cfg.interval_secs,
260                    #[cfg(feature = "memory")]
261                    Arc::clone(&memory_guard),
262                    shutdown.clone(),
263                ));
264            }
265            Some(engine)
266        };
267
268        // --- Start metrics server ---
269        if let Err(e) = metrics.start_server(metrics_addr).await {
270            tracing::error!(error = %e, addr = metrics_addr, "Failed to start metrics server");
271        }
272
273        // --- Version check (fire-and-forget) ---
274        #[cfg(feature = "version-check")]
275        {
276            crate::VersionCheck::new(crate::VersionCheckConfig {
277                product: app_name.to_string(),
278                current_version: version.to_string(),
279                ..Default::default()
280            })
281            .check_on_startup();
282        }
283
284        // Log runtime context
285        tracing::info!(
286            environment = %ctx.environment,
287            pod_name = ?ctx.pod_name,
288            namespace = ?ctx.namespace,
289            "Service runtime initialised"
290        );
291
292        Ok(Self {
293            metrics,
294            dfe,
295            #[cfg(feature = "memory")]
296            memory_guard,
297            shutdown,
298            context: ctx,
299            #[cfg(feature = "worker-pool")]
300            worker_pool,
301            #[cfg(feature = "worker-batch")]
302            batch_engine,
303            #[cfg(feature = "scaling")]
304            scaling,
305            #[cfg(all(feature = "scaling", feature = "expression"))]
306            scaling_engine,
307            #[cfg(feature = "scaling")]
308            scaling_signals,
309            #[cfg(feature = "governor")]
310            governor,
311        })
312    }
313
314    /// Set the readiness check callback.
315    ///
316    /// Each app defines its own readiness criteria. Call this in `run_service()`
317    /// once you know what "ready" means for your app.
318    pub fn set_readiness_check<F: Fn() -> bool + Send + Sync + 'static>(&mut self, check: F) {
319        self.metrics.set_readiness_check(check);
320    }
321
322    /// Return the batch processing engine, if the `worker-batch` feature is
323    /// enabled and the worker pool was successfully created.
324    #[cfg(feature = "worker-batch")]
325    #[must_use]
326    pub fn batch_engine(&self) -> Option<&Arc<crate::worker::BatchEngine>> {
327        self.batch_engine.as_ref()
328    }
329
330    /// Build a governed receive transport from config in ONE call
331    /// (`governor` + `transport` features).
332    ///
333    /// Reads the transport config at `key` and threads the runtime's
334    /// [`governor`](Self::governor) pressure into the receiver's inbound brake
335    /// (Kafka pause-partitions gate, HTTP/gRPC 503/`unavailable` shed) so apps
336    /// skip the `gate_actuator -> InboundGate -> with_inbound_gate` dance.
337    ///
338    /// When the governor is disabled (`self_regulation.enabled = false`,
339    /// [`governor`](Self::governor) is `None`) this falls back to the plain
340    /// [`AnyReceiver::from_config`](crate::transport::factory::AnyReceiver::from_config)
341    /// -- data path stays byte-identical to pre-governor.
342    ///
343    /// # Errors
344    ///
345    /// Returns the underlying transport error if the config is missing/invalid
346    /// or the backend fails to construct.
347    #[cfg(all(feature = "governor", feature = "transport"))]
348    pub async fn governed_receiver(
349        &self,
350        key: &str,
351    ) -> Result<crate::transport::factory::AnyReceiver, crate::transport::TransportError> {
352        use crate::transport::factory::AnyReceiver;
353        match self.governor {
354            Some(ref gov) => AnyReceiver::from_config_with_governor(key, gov).await,
355            None => AnyReceiver::from_config(key).await,
356        }
357    }
358}
359
/// Periodic scaling-pressure tick: sample CPU (rate of the cumulative counter
/// over the wall window / cores), read the pushed transport signals, and let the
/// engine evaluate + publish its gauges. Off the data hot-path (interval-driven).
///
/// * `engine` -- receives one `tick` per interval.
/// * `signals` -- cell whose snapshot is passed to every tick.
/// * `interval_secs` -- tick period in seconds; values below 1 are raised to 1.
/// * `memory_guard` -- source of the memory pressure ratio (`memory` feature);
///   without the feature the ratio passed to the engine is a constant `0.0`.
/// * `shutdown` -- cancelling this token ends the loop.
///
/// When the cumulative CPU counter is unavailable on either side of a window,
/// the CPU ratio for that tick is reported as `0.0`.
#[cfg(all(feature = "scaling", feature = "expression"))]
async fn run_scaling_pressure_loop(
    engine: Arc<crate::scaling::ScalingEngine>,
    signals: Arc<crate::scaling::ScalingSignalsCell>,
    interval_secs: u64,
    #[cfg(feature = "memory")] memory_guard: Arc<MemoryGuard>,
    shutdown: CancellationToken,
) {
    use std::time::{Duration, Instant};

    // CPU utilisation denominator: the cgroup CPU limit, else the visible core
    // count. Never 0. The positivity check runs on the cgroup value FIRST so a
    // non-positive limit falls through to the visible core count instead of
    // jumping straight to the 1.0 last resort.
    let cores = crate::metrics::cpu_limit_cores()
        .filter(|c| *c > 0.0)
        .or_else(|| {
            std::thread::available_parallelism()
                .ok()
                .map(|n| n.get() as f64)
        })
        .unwrap_or(1.0);

    let mut last_cpu = crate::metrics::cumulative_cpu_seconds();
    let mut last_at = Instant::now();
    let mut ticker = tokio::time::interval(Duration::from_secs(interval_secs.max(1)));
    // The default missed-tick behaviour is Burst: after a stall the overdue
    // ticks fire back-to-back, so a sample window can shrink to ~0 and the
    // CPU rate spikes. Delay re-arms a full period after each late tick, so
    // every sample window spans at least one interval.
    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
    // tokio's first interval tick fires immediately -- consume it so the first
    // real sample below spans a full interval (no divide-by-near-zero CPU spike).
    ticker.tick().await;

    loop {
        tokio::select! {
            () = shutdown.cancelled() => break,
            _ = ticker.tick() => {
                let now_cpu = crate::metrics::cumulative_cpu_seconds();
                let now_at = Instant::now();
                let cpu_ratio = match (last_cpu, now_cpu) {
                    (Some(prev), Some(cur)) => {
                        // Floor the window at 1 ms as a last-resort divide guard.
                        let elapsed = now_at.duration_since(last_at).as_secs_f64().max(1e-3);
                        // (cur - prev) can go negative on a counter reset -> floor 0.
                        ((cur - prev).max(0.0) / elapsed) / cores
                    }
                    // No CPU reading on one side of the window: report idle.
                    _ => 0.0,
                };
                last_cpu = now_cpu;
                last_at = now_at;

                #[cfg(feature = "memory")]
                let memory_ratio = memory_guard.pressure_ratio();
                #[cfg(not(feature = "memory"))]
                let memory_ratio = 0.0;

                engine.tick(&signals.snapshot(), cpu_ratio, memory_ratio);
            }
        }
    }

    tracing::info!(target: "scaling", "Scaling-pressure loop shutting down");
}