hyperi_rustlib/cli/runtime.rs
1// Project: hyperi-rustlib
2// File: src/cli/runtime.rs
3// Purpose: ServiceRuntime -- pre-built infrastructure for DFE service apps
4// Language: Rust
5//
6// License: BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Pre-built service infrastructure for DFE pipeline applications.
10//!
11//! [`ServiceRuntime`] is created by [`super::run_app`] before calling
12//! [`DfeApp::run_service`]. Apps receive it fully wired -- eliminates
13//! ~50 lines of identical boilerplate per DFE app.
14//!
15//! ## What's included (always)
16//!
17//! - [`MetricsManager`] -- started, serving `/metrics`, `/healthz`, `/readyz`
18//! - [`DfeMetrics`] -- platform `dfe_*` metrics registered
19//! - [`MemoryGuard`] -- cgroup-aware, auto-detected from env prefix
20//! - [`CancellationToken`] -- signal handler installed with K8s pre-stop delay
21//! - [`RuntimeContext`] -- K8s/Docker/BareMetal metadata
22//!
23//! ## What's included (when features enabled)
24//!
25//! - [`AdaptiveWorkerPool`] -- rayon + tokio hybrid (`worker` feature)
26//! - [`ScalingPressure`] -- KEDA signals (`scaling` feature)
27//!
28//! ## What stays app-specific
29//!
30//! - Readiness check criteria (each app defines "ready" differently)
31//! - Config hot-reload (optional, app-specific reload logic)
32//! - Pipeline creation (100% domain-specific)
33//! - DLQ setup (varies per app)
34//! - App-specific metric groups (ConsumerMetrics, BufferMetrics, etc.)
35
36use std::sync::Arc;
37
38use tokio_util::sync::CancellationToken;
39
40use crate::env::{RuntimeContext, runtime_context};
41#[cfg(feature = "memory")]
42use crate::memory::{MemoryGuard, MemoryGuardConfig};
43use crate::metrics::MetricsManager;
44
45use super::error::CliError;
46
47/// Pre-built service infrastructure. Created by `run_app()` before `run_service()`.
48///
49/// Apps receive this fully wired -- they just use the fields. No boilerplate needed.
50///
51/// On bare metal, K8s-specific features (pre-stop delay, pod metadata in logs)
52/// are automatically disabled. On K8s, they're automatically enabled.
53pub struct ServiceRuntime {
54 /// Metrics manager -- already started, serving endpoints.
55 /// Use for registering app-specific metrics and metric groups.
56 pub metrics: MetricsManager,
57
58 /// Platform DFE metrics (`dfe_*` counters/gauges). Already registered.
59 pub dfe: Arc<crate::metrics::DfeMetrics>,
60
61 /// Cgroup-aware memory guard. Tracks memory usage for backpressure.
62 /// Auto-detected from env prefix + cgroup limits.
63 #[cfg(feature = "memory")]
64 pub memory_guard: Arc<MemoryGuard>,
65
66 /// Shutdown token. Cancelled on SIGTERM/SIGINT (with K8s pre-stop delay).
67 /// Clone and pass to your pipeline loops.
68 pub shutdown: CancellationToken,
69
70 /// Runtime context -- K8s/Docker/BareMetal metadata (pod_name, namespace, etc.).
71 pub context: &'static RuntimeContext,
72
73 /// Adaptive worker pool for parallel batch processing (`worker` feature).
74 /// `None` if the `worker` feature is not enabled or config fails.
75 #[cfg(feature = "worker-pool")]
76 pub worker_pool: Option<Arc<crate::worker::AdaptiveWorkerPool>>,
77
78 /// Batch processing engine with SIMD parsing and pre-route filtering
79 /// (`worker-batch` feature). `None` if `worker-batch` is not enabled
80 /// or worker pool creation failed.
81 #[cfg(feature = "worker-batch")]
82 pub batch_engine: Option<Arc<crate::worker::BatchEngine>>,
83
84 /// Scaling pressure calculator for KEDA autoscaling (`scaling` feature).
85 /// `None` if the `scaling` feature is not enabled.
86 #[cfg(feature = "scaling")]
87 pub scaling: Option<Arc<crate::ScalingPressure>>,
88
89 /// Horizontal scaling-pressure ENGINE (CEL over local, correlated metrics).
90 /// Emits `{ns}_scaling_pressure{name}` per configured pressure plus the
91 /// gratis compound `{ns}_transport_{inbound,outbound}_pressure_ratio` and
92 /// `{ns}_scaling_circuit_open`. `None` unless both `scaling` + `expression`
93 /// are enabled. Runs its own periodic tick (CPU sampled internally).
94 #[cfg(all(feature = "scaling", feature = "expression"))]
95 pub scaling_engine: Option<Arc<crate::scaling::ScalingEngine>>,
96
97 /// Lock-free cell for pushing per-pod transport scaling signals (kafka
98 /// assigned-lag, in-flight, shed rate, circuit, ...) that the
99 /// `scaling_engine` reads each tick. Update it from
100 /// your receive/send loops; CPU is sampled by the engine itself. When no
101 /// signals are pushed, the smart default reduces to CPU-only (ACR F2).
102 #[cfg(feature = "scaling")]
103 pub scaling_signals: Arc<crate::scaling::ScalingSignalsCell>,
104
105 /// Self-regulation governor (`governor` feature). Default-ON, opt-out via
106 /// `self_regulation.enabled = false`. `None` when disabled -- nothing is
107 /// constructed and the data path is byte-identical to pre-governor.
108 ///
109 /// Thread [`pressure`](crate::SelfRegulationGovernor::pressure) into your
110 /// receive transports' inbound gate / `with_pressure` hooks. The
111 /// [`budget`](crate::SelfRegulationGovernor::budget) is already wired into
112 /// the [`batch_engine`](Self::batch_engine) governed run path.
113 #[cfg(feature = "governor")]
114 pub governor: Option<crate::SelfRegulationGovernor>,
115}
116
117impl ServiceRuntime {
118 /// Build the service runtime from app configuration.
119 ///
120 /// This is called by `run_app()` -- apps don't call it directly.
121 ///
122 /// # Errors
123 ///
124 /// Returns `CliError` if the metrics server fails to start.
125 pub(crate) async fn build(
126 app_name: &str,
127 env_prefix: &str,
128 metrics_addr: &str,
129 #[cfg_attr(not(feature = "metrics-dfe"), allow(unused_variables))] version: &str,
130 #[cfg_attr(not(feature = "metrics-dfe"), allow(unused_variables))] commit: &str,
131 #[cfg(feature = "scaling")] scaling_components: Vec<crate::ScalingComponent>,
132 ) -> Result<Self, CliError> {
133 let ctx = runtime_context();
134
135 // --- Metrics ---
136 let mut metrics = MetricsManager::new(app_name);
137 let dfe = Arc::new(crate::metrics::DfeMetrics::register(&metrics));
138
139 // App info metric (version, commit, service name)
140 #[cfg(feature = "metrics-dfe")]
141 {
142 let _app_metrics =
143 crate::metrics::dfe_groups::AppMetrics::new(&metrics, version, commit);
144 }
145
146 // --- Memory guard ---
147 #[cfg(feature = "memory")]
148 let memory_guard = Arc::new(MemoryGuard::new(MemoryGuardConfig::from_env(env_prefix)));
149
150 // --- Self-regulation governor (default-ON, opt-out) ---
151 //
152 // Constructed HERE -- before the worker pool, batch engine, and the
153 // transports the app builds in run_service() -- so the shared pressure
154 // and byte budget can be threaded into all of them. When
155 // `self_regulation.enabled = false`, `build` returns None and nothing
156 // is constructed: every downstream Option stays None and the data path
157 // is byte-identical to pre-governor behaviour.
158 #[cfg(feature = "governor")]
159 let governor = crate::SelfRegulationConfig::from_cascade().build(Arc::clone(&memory_guard));
160
161 // --- Scaling pressure ---
162 #[cfg(feature = "scaling")]
163 let scaling = {
164 let config = crate::ScalingPressureConfig::from_cascade();
165 let pressure = Arc::new(crate::ScalingPressure::new(config, scaling_components));
166 metrics.set_scaling_pressure(Arc::clone(&pressure));
167 Some(pressure)
168 };
169
170 // --- Worker pool ---
171 #[cfg(feature = "worker-pool")]
172 let worker_pool = {
173 match crate::worker::AdaptiveWorkerPool::from_cascade("worker_pool") {
174 Ok(pool) => {
175 let pool = Arc::new(pool);
176 pool.register_metrics(&metrics);
177 #[cfg(feature = "memory")]
178 pool.set_memory_guard(Arc::clone(&memory_guard));
179 #[cfg(feature = "scaling")]
180 if let Some(ref sp) = scaling {
181 pool.set_scaling_pressure(Arc::clone(sp));
182 }
183 tracing::info!(
184 max_threads = pool.max_threads(),
185 "Adaptive worker pool enabled"
186 );
187 Some(pool)
188 }
189 Err(e) => {
190 tracing::warn!(
191 error = %e,
192 "Worker pool not configured, falling back to sequential"
193 );
194 None
195 }
196 }
197 };
198
199 // --- Batch engine (worker-batch tier only) ---
200 #[cfg(feature = "worker-batch")]
201 let batch_engine = {
202 if let Some(ref pool) = worker_pool {
203 let config =
204 crate::worker::engine::BatchProcessingConfig::from_cascade("batch_processing")
205 .unwrap_or_default();
206 let mut engine = crate::worker::BatchEngine::with_pool(Arc::clone(pool), config);
207 engine.auto_wire(
208 &metrics,
209 #[cfg(feature = "memory")]
210 Some(&memory_guard),
211 );
212 // Wire the governor's byte-budget lever so the engine's governed
213 // run path streams in budget-sized sub-blocks. None (governor
214 // off) leaves the engine on the whole-batch loop.
215 #[cfg(feature = "governor")]
216 if let Some(ref gov) = governor {
217 engine.set_byte_budget(gov.budget());
218 }
219 Some(Arc::new(engine))
220 } else {
221 None
222 }
223 };
224
225 // --- Shutdown ---
226 let shutdown = crate::shutdown::install_signal_handler();
227
228 // Start worker pool scaling loop after shutdown token exists
229 #[cfg(feature = "worker-pool")]
230 if let Some(ref pool) = worker_pool {
231 pool.start_scaling_loop(shutdown.clone());
232 }
233
234 // --- Horizontal scaling-pressure engine (CEL over local metrics) ---
235 #[cfg(feature = "scaling")]
236 let scaling_signals = Arc::new(crate::scaling::ScalingSignalsCell::new());
237
238 #[cfg(all(feature = "scaling", feature = "expression"))]
239 let scaling_engine = {
240 let sp_cfg = crate::scaling::ScalingEngineConfig::from_cascade();
241 let inbound = sp_cfg.transport.inbound.as_deref().map_or(
242 crate::scaling::ScalingTransport::Other,
243 crate::scaling::ScalingTransport::from_label,
244 );
245 let outbound = sp_cfg.transport.outbound.as_deref().map_or(
246 crate::scaling::ScalingTransport::Other,
247 crate::scaling::ScalingTransport::from_label,
248 );
249 let (engine, errors) =
250 crate::scaling::ScalingEngine::new(app_name, &sp_cfg, inbound, outbound);
251 for e in &errors {
252 tracing::error!(target: "scaling", "{e}");
253 }
254 let engine = Arc::new(engine);
255 if engine.is_enabled() {
256 tokio::spawn(run_scaling_pressure_loop(
257 Arc::clone(&engine),
258 Arc::clone(&scaling_signals),
259 sp_cfg.interval_secs,
260 #[cfg(feature = "memory")]
261 Arc::clone(&memory_guard),
262 shutdown.clone(),
263 ));
264 }
265 Some(engine)
266 };
267
268 // --- Start metrics server ---
269 if let Err(e) = metrics.start_server(metrics_addr).await {
270 tracing::error!(error = %e, addr = metrics_addr, "Failed to start metrics server");
271 }
272
273 // --- Version check (fire-and-forget) ---
274 #[cfg(feature = "version-check")]
275 {
276 crate::VersionCheck::new(crate::VersionCheckConfig {
277 product: app_name.to_string(),
278 current_version: version.to_string(),
279 ..Default::default()
280 })
281 .check_on_startup();
282 }
283
284 // Turns the previously-silent "from_cascade defaulted everything"
285 // failure into one observable startup line.
286 #[cfg(feature = "config")]
287 log_cascade_section_summary();
288
289 // Log runtime context
290 tracing::info!(
291 environment = %ctx.environment,
292 pod_name = ?ctx.pod_name,
293 namespace = ?ctx.namespace,
294 "Service runtime initialised"
295 );
296
297 Ok(Self {
298 metrics,
299 dfe,
300 #[cfg(feature = "memory")]
301 memory_guard,
302 shutdown,
303 context: ctx,
304 #[cfg(feature = "worker-pool")]
305 worker_pool,
306 #[cfg(feature = "worker-batch")]
307 batch_engine,
308 #[cfg(feature = "scaling")]
309 scaling,
310 #[cfg(all(feature = "scaling", feature = "expression"))]
311 scaling_engine,
312 #[cfg(feature = "scaling")]
313 scaling_signals,
314 #[cfg(feature = "governor")]
315 governor,
316 })
317 }
318
319 /// Set the readiness check callback.
320 ///
321 /// Each app defines its own readiness criteria. Call this in `run_service()`
322 /// once you know what "ready" means for your app.
323 pub fn set_readiness_check<F: Fn() -> bool + Send + Sync + 'static>(&mut self, check: F) {
324 self.metrics.set_readiness_check(check);
325 }
326
327 /// Return the batch processing engine, if the `worker-batch` feature is
328 /// enabled and the worker pool was successfully created.
329 #[cfg(feature = "worker-batch")]
330 #[must_use]
331 pub fn batch_engine(&self) -> Option<&Arc<crate::worker::BatchEngine>> {
332 self.batch_engine.as_ref()
333 }
334
335 /// Build a governed receive transport from config in ONE call
336 /// (`governor` + `transport` features).
337 ///
338 /// Reads the transport config at `key` and threads the runtime's
339 /// [`governor`](Self::governor) pressure into the receiver's inbound brake
340 /// (Kafka pause-partitions gate, HTTP/gRPC 503/`unavailable` shed) so apps
341 /// skip the `gate_actuator -> InboundGate -> with_inbound_gate` dance.
342 ///
343 /// When the governor is disabled (`self_regulation.enabled = false`,
344 /// [`governor`](Self::governor) is `None`) this falls back to the plain
345 /// [`AnyReceiver::from_config`](crate::transport::factory::AnyReceiver::from_config)
346 /// -- data path stays byte-identical to pre-governor.
347 ///
348 /// # Errors
349 ///
350 /// Returns the underlying transport error if the config is missing/invalid
351 /// or the backend fails to construct.
352 #[cfg(all(feature = "governor", feature = "transport"))]
353 pub async fn governed_receiver(
354 &self,
355 key: &str,
356 ) -> Result<crate::transport::factory::AnyReceiver, crate::transport::TransportError> {
357 use crate::transport::factory::AnyReceiver;
358 match self.governor {
359 Some(ref gov) => AnyReceiver::from_config_with_governor(key, gov).await,
360 None => AnyReceiver::from_config(key).await,
361 }
362 }
363}
364
365/// Emit one startup line summarising which platform config sections were found
366/// in the cascade vs defaulted. Cheap (key-presence checks, no deserialisation).
367/// This is the observable counterpart to the silent pre-2.8.11 failure where
368/// `from_cascade` defaulted everything because the cascade was never populated.
369#[cfg(feature = "config")]
370fn log_cascade_section_summary() {
371 let cfg = crate::config::try_get();
372 let present = |key: &str| cfg.is_some_and(|c| c.contains(key));
373 tracing::info!(
374 cascade_initialised = cfg.is_some(),
375 self_regulation = present("self_regulation"),
376 worker_pool = present("worker_pool"),
377 batch_processing = present("batch_processing"),
378 scaling = present("scaling"),
379 expression = present("expression"),
380 "Config cascade sections (true = found in config, false = using defaults)"
381 );
382}
383
384/// Periodic scaling-pressure tick: sample CPU (rate of the cumulative counter
385/// over the wall window / cores), read the pushed transport signals, and let the
386/// engine evaluate + publish its gauges. Off the data hot-path (interval-driven).
387#[cfg(all(feature = "scaling", feature = "expression"))]
388async fn run_scaling_pressure_loop(
389 engine: Arc<crate::scaling::ScalingEngine>,
390 signals: Arc<crate::scaling::ScalingSignalsCell>,
391 interval_secs: u64,
392 #[cfg(feature = "memory")] memory_guard: Arc<MemoryGuard>,
393 shutdown: CancellationToken,
394) {
395 use std::time::{Duration, Instant};
396
397 // CPU utilisation denominator: the cgroup CPU limit, else the visible core
398 // count. Never 0.
399 let cores = crate::metrics::cpu_limit_cores()
400 .or_else(|| {
401 std::thread::available_parallelism()
402 .ok()
403 .map(|n| n.get() as f64)
404 })
405 .filter(|c| *c > 0.0)
406 .unwrap_or(1.0);
407
408 let mut last_cpu = crate::metrics::cumulative_cpu_seconds();
409 let mut last_at = Instant::now();
410 let mut ticker = tokio::time::interval(Duration::from_secs(interval_secs.max(1)));
411 // tokio's first interval tick fires immediately -- consume it so the first
412 // real sample below spans a full interval (no divide-by-near-zero CPU spike).
413 ticker.tick().await;
414
415 loop {
416 tokio::select! {
417 () = shutdown.cancelled() => break,
418 _ = ticker.tick() => {
419 let now_cpu = crate::metrics::cumulative_cpu_seconds();
420 let now_at = Instant::now();
421 let cpu_ratio = match (last_cpu, now_cpu) {
422 (Some(prev), Some(cur)) => {
423 let elapsed = now_at.duration_since(last_at).as_secs_f64().max(1e-3);
424 // (cur - prev) can go negative on a counter reset -> floor 0.
425 ((cur - prev).max(0.0) / elapsed) / cores
426 }
427 _ => 0.0,
428 };
429 last_cpu = now_cpu;
430 last_at = now_at;
431
432 #[cfg(feature = "memory")]
433 let memory_ratio = memory_guard.pressure_ratio();
434 #[cfg(not(feature = "memory"))]
435 let memory_ratio = 0.0;
436
437 engine.tick(&signals.snapshot(), cpu_ratio, memory_ratio);
438 }
439 }
440 }
441
442 tracing::info!(target: "scaling", "Scaling-pressure loop shutting down");
443}