hyperi_rustlib/cli/runtime.rs
1// Project: hyperi-rustlib
2// File: src/cli/runtime.rs
3// Purpose: ServiceRuntime -- pre-built infrastructure for DFE service apps
4// Language: Rust
5//
6// License: BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Pre-built service infrastructure for DFE pipeline applications.
10//!
11//! [`ServiceRuntime`] is created by [`super::run_app`] before calling
12//! [`DfeApp::run_service`]. Apps receive it fully wired -- eliminates
13//! ~50 lines of identical boilerplate per DFE app.
14//!
15//! ## What's included (always)
16//!
17//! - [`MetricsManager`] -- started, serving `/metrics`, `/healthz`, `/readyz`
18//! - [`DfeMetrics`] -- platform `dfe_*` metrics registered
19//! - [`MemoryGuard`] -- cgroup-aware, auto-detected from env prefix
20//! - [`CancellationToken`] -- signal handler installed with K8s pre-stop delay
21//! - [`RuntimeContext`] -- K8s/Docker/BareMetal metadata
22//!
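//! ## What's included (when features enabled)
//!
//! - [`AdaptiveWorkerPool`] -- rayon + tokio hybrid (`worker-pool` feature)
//! - `BatchEngine` -- batch processing on top of the worker pool (`worker-batch` feature)
//! - [`ScalingPressure`] -- KEDA signals (`scaling` feature)
//! - `ScalingEngine` -- horizontal scaling-pressure engine (`scaling` + `expression` features)
//! - `SelfRegulationGovernor` -- self-regulation governor (`governor` feature)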
27//!
28//! ## What stays app-specific
29//!
30//! - Readiness check criteria (each app defines "ready" differently)
31//! - Config hot-reload (optional, app-specific reload logic)
32//! - Pipeline creation (100% domain-specific)
33//! - DLQ setup (varies per app)
34//! - App-specific metric groups (ConsumerMetrics, BufferMetrics, etc.)
35
36use std::sync::Arc;
37
38use tokio_util::sync::CancellationToken;
39
40use crate::env::{RuntimeContext, runtime_context};
41#[cfg(feature = "memory")]
42use crate::memory::{MemoryGuard, MemoryGuardConfig};
43use crate::metrics::MetricsManager;
44
45use super::error::CliError;
46
/// Pre-built service infrastructure. Created by `run_app()` before `run_service()`.
///
/// Apps receive this fully wired -- they just use the fields. No boilerplate needed.
///
/// On bare metal, K8s-specific features (pre-stop delay, pod metadata in logs)
/// are automatically disabled. On K8s, they're automatically enabled.
///
/// Feature-gated fields are compiled out entirely when their Cargo feature is
/// off. Where a field is an `Option`, `None` describes a runtime outcome
/// (config missing, component disabled), never a disabled Cargo feature.
pub struct ServiceRuntime {
    /// Metrics manager. `build` attempts to start its server; a start failure
    /// is logged rather than returned, so the endpoints are not guaranteed to
    /// be up. Use for registering app-specific metrics and metric groups.
    pub metrics: MetricsManager,

    /// Platform DFE metrics (`dfe_*` counters/gauges). Registered against
    /// `metrics` during `build`.
    pub dfe: Arc<crate::metrics::DfeMetrics>,

    /// Cgroup-aware memory guard. Tracks memory usage for backpressure.
    /// Configured from the cascade `memory:` section with the flat
    /// `{PREFIX}_MEMORY_*` env overlaid on top (`memory` feature).
    #[cfg(feature = "memory")]
    pub memory_guard: Arc<MemoryGuard>,

    /// Shutdown token. Cancelled on SIGTERM/SIGINT (with K8s pre-stop delay).
    /// Clone and pass to your pipeline loops.
    pub shutdown: CancellationToken,

    /// Runtime context -- K8s/Docker/BareMetal metadata (pod_name, namespace, etc.).
    pub context: &'static RuntimeContext,

    /// Adaptive worker pool for parallel batch processing (`worker-pool`
    /// feature). `None` when the pool could not be built from the
    /// `worker_pool` config section; `build` logs a warning in that case.
    #[cfg(feature = "worker-pool")]
    pub worker_pool: Option<Arc<crate::worker::AdaptiveWorkerPool>>,

    /// Batch processing engine with SIMD parsing and pre-route filtering
    /// (`worker-batch` feature). `None` when worker pool creation failed,
    /// because the engine is only built on top of an existing pool.
    #[cfg(feature = "worker-batch")]
    pub batch_engine: Option<Arc<crate::worker::BatchEngine>>,

    /// Scaling pressure calculator for KEDA autoscaling (`scaling` feature).
    /// `build` always populates this with `Some`.
    // NOTE(review): the `Option` wrapper looks like an API-compat leftover -- confirm.
    #[cfg(feature = "scaling")]
    pub scaling: Option<Arc<crate::ScalingPressure>>,

    /// Horizontal scaling-pressure ENGINE (CEL over local, correlated metrics).
    /// Emits `{ns}_scaling_pressure{name}` per configured pressure plus the
    /// gratis compound `{ns}_transport_{inbound,outbound}_pressure_ratio` and
    /// `{ns}_scaling_circuit_open`. Only present when both `scaling` and
    /// `expression` are enabled; `build` then always stores `Some`, and spawns
    /// the periodic tick (CPU sampled internally) only if the engine reports
    /// itself enabled.
    #[cfg(all(feature = "scaling", feature = "expression"))]
    pub scaling_engine: Option<Arc<crate::scaling::ScalingEngine>>,

    /// Lock-free cell for pushing per-pod transport scaling signals (kafka
    /// assigned-lag, in-flight, shed rate, circuit, ...) that the
    /// `scaling_engine` reads each tick. Update it from
    /// your receive/send loops; CPU is sampled by the engine itself. When no
    /// signals are pushed, the smart default reduces to CPU-only (ACR F2).
    #[cfg(feature = "scaling")]
    pub scaling_signals: Arc<crate::scaling::ScalingSignalsCell>,

    /// Self-regulation governor (`governor` feature). Default-ON, opt-out via
    /// `self_regulation.enabled = false`. `None` when disabled -- nothing is
    /// constructed and the data path is byte-identical to pre-governor.
    ///
    /// Thread [`pressure`](crate::SelfRegulationGovernor::pressure) into your
    /// receive transports' inbound gate / `with_pressure` hooks. The
    /// [`budget`](crate::SelfRegulationGovernor::budget) is already wired into
    /// the [`batch_engine`](Self::batch_engine) governed run path.
    #[cfg(feature = "governor")]
    pub governor: Option<crate::SelfRegulationGovernor>,
}
116
117impl ServiceRuntime {
118 /// Build the service runtime from app configuration.
119 ///
120 /// This is called by `run_app()` -- apps don't call it directly.
121 ///
122 /// # Errors
123 ///
124 /// Returns `CliError` if the metrics server fails to start.
125 pub(crate) async fn build(
126 app_name: &str,
127 env_prefix: &str,
128 metrics_addr: &str,
129 #[cfg_attr(not(feature = "metrics-dfe"), allow(unused_variables))] version: &str,
130 #[cfg_attr(not(feature = "metrics-dfe"), allow(unused_variables))] commit: &str,
131 #[cfg(feature = "scaling")] scaling_components: Vec<crate::ScalingComponent>,
132 ) -> Result<Self, CliError> {
133 let ctx = runtime_context();
134
135 // --- Metrics ---
136 let mut metrics = MetricsManager::new(app_name);
137 let dfe = Arc::new(crate::metrics::DfeMetrics::register(&metrics));
138
139 // App info metric (version, commit, service name)
140 #[cfg(feature = "metrics-dfe")]
141 {
142 let _app_metrics =
143 crate::metrics::dfe_groups::AppMetrics::new(&metrics, version, commit);
144 }
145
146 // --- Memory guard ---
147 // Cascade `memory:` section is the base, flat {PREFIX}_MEMORY_* env
148 // overlaid on top. `from_env` alone (pre-2.8.12) read only the flat
149 // env and silently ignored the YAML `memory:` section.
150 #[cfg(feature = "memory")]
151 let memory_guard = Arc::new(MemoryGuard::new(MemoryGuardConfig::from_cascade_with_env(
152 env_prefix,
153 )));
154
155 // --- Self-regulation governor (default-ON, opt-out) ---
156 //
157 // Constructed HERE -- before the worker pool, batch engine, and the
158 // transports the app builds in run_service() -- so the shared pressure
159 // and byte budget can be threaded into all of them. When
160 // `self_regulation.enabled = false`, `build` returns None and nothing
161 // is constructed: every downstream Option stays None and the data path
162 // is byte-identical to pre-governor behaviour.
163 #[cfg(feature = "governor")]
164 let governor = crate::SelfRegulationConfig::from_cascade().build(Arc::clone(&memory_guard));
165
166 // --- Scaling pressure ---
167 #[cfg(feature = "scaling")]
168 let scaling = {
169 let config = crate::ScalingPressureConfig::from_cascade();
170 let pressure = Arc::new(crate::ScalingPressure::new(config, scaling_components));
171 metrics.set_scaling_pressure(Arc::clone(&pressure));
172 Some(pressure)
173 };
174
175 // --- Worker pool ---
176 #[cfg(feature = "worker-pool")]
177 let worker_pool = {
178 match crate::worker::AdaptiveWorkerPool::from_cascade("worker_pool") {
179 Ok(pool) => {
180 let pool = Arc::new(pool);
181 pool.register_metrics(&metrics);
182 #[cfg(feature = "memory")]
183 pool.set_memory_guard(Arc::clone(&memory_guard));
184 #[cfg(feature = "scaling")]
185 if let Some(ref sp) = scaling {
186 pool.set_scaling_pressure(Arc::clone(sp));
187 }
188 tracing::info!(
189 max_threads = pool.max_threads(),
190 "Adaptive worker pool enabled"
191 );
192 Some(pool)
193 }
194 Err(e) => {
195 tracing::warn!(
196 error = %e,
197 "Worker pool not configured, falling back to sequential"
198 );
199 None
200 }
201 }
202 };
203
204 // --- Batch engine (worker-batch tier only) ---
205 #[cfg(feature = "worker-batch")]
206 let batch_engine = {
207 if let Some(ref pool) = worker_pool {
208 let config =
209 crate::worker::engine::BatchProcessingConfig::from_cascade("batch_processing")
210 .unwrap_or_default();
211 let mut engine = crate::worker::BatchEngine::with_pool(Arc::clone(pool), config);
212 engine.auto_wire(
213 &metrics,
214 #[cfg(feature = "memory")]
215 Some(&memory_guard),
216 );
217 // Wire the governor's byte-budget lever so the engine's governed
218 // run path streams in budget-sized sub-blocks. None (governor
219 // off) leaves the engine on the whole-batch loop.
220 #[cfg(feature = "governor")]
221 if let Some(ref gov) = governor {
222 engine.set_byte_budget(gov.budget());
223 }
224 Some(Arc::new(engine))
225 } else {
226 None
227 }
228 };
229
230 // --- Shutdown ---
231 let shutdown = crate::shutdown::install_signal_handler();
232
233 // Start worker pool scaling loop after shutdown token exists
234 #[cfg(feature = "worker-pool")]
235 if let Some(ref pool) = worker_pool {
236 pool.start_scaling_loop(shutdown.clone());
237 }
238
239 // --- Horizontal scaling-pressure engine (CEL over local metrics) ---
240 #[cfg(feature = "scaling")]
241 let scaling_signals = Arc::new(crate::scaling::ScalingSignalsCell::new());
242
243 #[cfg(all(feature = "scaling", feature = "expression"))]
244 let scaling_engine = {
245 let sp_cfg = crate::scaling::ScalingEngineConfig::from_cascade();
246 let inbound = sp_cfg.transport.inbound.as_deref().map_or(
247 crate::scaling::ScalingTransport::Other,
248 crate::scaling::ScalingTransport::from_label,
249 );
250 let outbound = sp_cfg.transport.outbound.as_deref().map_or(
251 crate::scaling::ScalingTransport::Other,
252 crate::scaling::ScalingTransport::from_label,
253 );
254 let (engine, errors) =
255 crate::scaling::ScalingEngine::new(app_name, &sp_cfg, inbound, outbound);
256 for e in &errors {
257 tracing::error!(target: "scaling", "{e}");
258 }
259 let engine = Arc::new(engine);
260 if engine.is_enabled() {
261 tokio::spawn(run_scaling_pressure_loop(
262 Arc::clone(&engine),
263 Arc::clone(&scaling_signals),
264 sp_cfg.interval_secs,
265 #[cfg(feature = "memory")]
266 Arc::clone(&memory_guard),
267 shutdown.clone(),
268 ));
269 }
270 Some(engine)
271 };
272
273 // --- Start metrics server ---
274 if let Err(e) = metrics.start_server(metrics_addr).await {
275 tracing::error!(error = %e, addr = metrics_addr, "Failed to start metrics server");
276 }
277
278 // --- Version check (fire-and-forget) ---
279 #[cfg(feature = "version-check")]
280 {
281 crate::VersionCheck::new(crate::VersionCheckConfig {
282 product: app_name.to_string(),
283 current_version: version.to_string(),
284 ..Default::default()
285 })
286 .check_on_startup();
287 }
288
289 // Turns the previously-silent "from_cascade defaulted everything"
290 // failure into one observable startup line.
291 #[cfg(feature = "config")]
292 log_cascade_section_summary();
293
294 // Log runtime context
295 tracing::info!(
296 environment = %ctx.environment,
297 pod_name = ?ctx.pod_name,
298 namespace = ?ctx.namespace,
299 "Service runtime initialised"
300 );
301
302 Ok(Self {
303 metrics,
304 dfe,
305 #[cfg(feature = "memory")]
306 memory_guard,
307 shutdown,
308 context: ctx,
309 #[cfg(feature = "worker-pool")]
310 worker_pool,
311 #[cfg(feature = "worker-batch")]
312 batch_engine,
313 #[cfg(feature = "scaling")]
314 scaling,
315 #[cfg(all(feature = "scaling", feature = "expression"))]
316 scaling_engine,
317 #[cfg(feature = "scaling")]
318 scaling_signals,
319 #[cfg(feature = "governor")]
320 governor,
321 })
322 }
323
324 /// Set the readiness check callback.
325 ///
326 /// Each app defines its own readiness criteria. Call this in `run_service()`
327 /// once you know what "ready" means for your app.
328 pub fn set_readiness_check<F: Fn() -> bool + Send + Sync + 'static>(&mut self, check: F) {
329 self.metrics.set_readiness_check(check);
330 }
331
332 /// Return the batch processing engine, if the `worker-batch` feature is
333 /// enabled and the worker pool was successfully created.
334 #[cfg(feature = "worker-batch")]
335 #[must_use]
336 pub fn batch_engine(&self) -> Option<&Arc<crate::worker::BatchEngine>> {
337 self.batch_engine.as_ref()
338 }
339
340 /// Build a governed receive transport from config in ONE call
341 /// (`governor` + `transport` features).
342 ///
343 /// Reads the transport config at `key` and threads the runtime's
344 /// [`governor`](Self::governor) pressure into the receiver's inbound brake
345 /// (Kafka pause-partitions gate, HTTP/gRPC 503/`unavailable` shed) so apps
346 /// skip the `gate_actuator -> InboundGate -> with_inbound_gate` dance.
347 ///
348 /// When the governor is disabled (`self_regulation.enabled = false`,
349 /// [`governor`](Self::governor) is `None`) this falls back to the plain
350 /// [`AnyReceiver::from_config`](crate::transport::factory::AnyReceiver::from_config)
351 /// -- data path stays byte-identical to pre-governor.
352 ///
353 /// # Errors
354 ///
355 /// Returns the underlying transport error if the config is missing/invalid
356 /// or the backend fails to construct.
357 #[cfg(all(feature = "governor", feature = "transport"))]
358 pub async fn governed_receiver(
359 &self,
360 key: &str,
361 ) -> Result<crate::transport::factory::AnyReceiver, crate::transport::TransportError> {
362 use crate::transport::factory::AnyReceiver;
363 match self.governor {
364 Some(ref gov) => AnyReceiver::from_config_with_governor(key, gov).await,
365 None => AnyReceiver::from_config(key).await,
366 }
367 }
368}
369
/// Log a single startup event reporting, per platform config section, whether
/// the section key is present in the cascade or the defaults are in effect.
/// Only key presence is checked -- nothing is deserialised.
///
/// This is the observable counterpart to the silent pre-2.8.11 failure where
/// `from_cascade` defaulted everything because the cascade was never populated.
#[cfg(feature = "config")]
fn log_cascade_section_summary() {
    let cascade = crate::config::try_get();
    // An uninitialised cascade reports every section as absent.
    let has_section = |section: &str| -> bool {
        match cascade {
            Some(loaded) => loaded.contains(section),
            None => false,
        }
    };
    tracing::info!(
        cascade_initialised = cascade.is_some(),
        self_regulation = has_section("self_regulation"),
        worker_pool = has_section("worker_pool"),
        batch_processing = has_section("batch_processing"),
        scaling = has_section("scaling"),
        expression = has_section("expression"),
        "Config cascade sections (true = found in config, false = using defaults)"
    );
}
388
/// Periodic scaling-pressure tick: sample CPU (rate of the cumulative counter
/// over the wall window / cores), read the pushed transport signals, and let the
/// engine evaluate + publish its gauges. Off the data hot-path (interval-driven).
///
/// * `engine` -- receives one `tick` per interval.
/// * `signals` -- cell whose snapshot is passed to every tick.
/// * `interval_secs` -- tick period in seconds; values below 1 are raised to 1.
/// * `memory_guard` -- source of the memory pressure ratio (`memory` feature);
///   without the feature the ratio passed to the engine is `0.0`.
/// * `shutdown` -- cancelling this token ends the loop.
#[cfg(all(feature = "scaling", feature = "expression"))]
async fn run_scaling_pressure_loop(
    engine: Arc<crate::scaling::ScalingEngine>,
    signals: Arc<crate::scaling::ScalingSignalsCell>,
    interval_secs: u64,
    #[cfg(feature = "memory")] memory_guard: Arc<MemoryGuard>,
    shutdown: CancellationToken,
) {
    use std::time::{Duration, Instant};

    // CPU utilisation denominator: the cgroup CPU limit, else the visible core
    // count. Never 0.
    let cores = crate::metrics::cpu_limit_cores()
        .or_else(|| {
            std::thread::available_parallelism()
                .ok()
                .map(|n| n.get() as f64)
        })
        .filter(|c| *c > 0.0)
        .unwrap_or(1.0);

    let mut last_cpu = crate::metrics::cumulative_cpu_seconds();
    let mut last_at = Instant::now();
    let mut ticker = tokio::time::interval(Duration::from_secs(interval_secs.max(1)));
    // The default `Burst` policy replays missed ticks back-to-back after a
    // stall, which would sample CPU over near-zero windows and produce exactly
    // the rate spikes this loop tries to avoid. `Delay` restarts the period
    // from the late tick, so every sample spans at least one full interval.
    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
    // tokio's first interval tick fires immediately -- consume it so the first
    // real sample below spans a full interval (no divide-by-near-zero CPU spike).
    ticker.tick().await;

    loop {
        tokio::select! {
            () = shutdown.cancelled() => break,
            _ = ticker.tick() => {
                let now_cpu = crate::metrics::cumulative_cpu_seconds();
                let now_at = Instant::now();
                // No ratio can be derived unless both samples are available.
                let cpu_ratio = match (last_cpu, now_cpu) {
                    (Some(prev), Some(cur)) => {
                        // Floor the window so a tiny elapsed time cannot blow up the rate.
                        let elapsed = now_at.duration_since(last_at).as_secs_f64().max(1e-3);
                        // (cur - prev) can go negative on a counter reset -> floor 0.
                        ((cur - prev).max(0.0) / elapsed) / cores
                    }
                    _ => 0.0,
                };
                last_cpu = now_cpu;
                last_at = now_at;

                #[cfg(feature = "memory")]
                let memory_ratio = memory_guard.pressure_ratio();
                #[cfg(not(feature = "memory"))]
                let memory_ratio = 0.0;

                engine.tick(&signals.snapshot(), cpu_ratio, memory_ratio);
            }
        }
    }

    tracing::info!(target: "scaling", "Scaling-pressure loop shutting down");
}