hyperi_rustlib/cli/runtime.rs
1// Project: hyperi-rustlib
2// File: src/cli/runtime.rs
3// Purpose: ServiceRuntime -- pre-built infrastructure for DFE service apps
4// Language: Rust
5//
6// License: BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Pre-built service infrastructure for DFE pipeline applications.
10//!
11//! [`ServiceRuntime`] is created by [`super::run_app`] before calling
12//! [`DfeApp::run_service`]. It contains all the common infrastructure
13//! that every DFE app needs -- metrics, memory guard, scaling, shutdown,
14//! worker pool, and runtime context. Apps receive it fully wired.
15//!
16//! This eliminates ~50 lines of identical boilerplate per DFE app.
17//!
18//! ## What's included (always)
19//!
20//! - [`MetricsManager`] -- started, serving `/metrics`, `/healthz`, `/readyz`
21//! - [`DfeMetrics`] -- platform `dfe_*` metrics registered
//! - [`MemoryGuard`] -- cgroup-aware, auto-detected from env prefix (requires the `memory` feature)
23//! - [`CancellationToken`] -- signal handler installed with K8s pre-stop delay
24//! - [`RuntimeContext`] -- K8s/Docker/BareMetal metadata
25//!
26//! ## What's included (when features enabled)
27//!
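//! - [`AdaptiveWorkerPool`](crate::worker::AdaptiveWorkerPool) -- rayon + tokio hybrid (`worker-pool` feature)
//! - [`BatchEngine`](crate::worker::BatchEngine) -- SIMD parsing + pre-route filtering on the worker pool (`worker-batch` feature)
//! - [`ScalingPressure`](crate::ScalingPressure) -- KEDA signals (`scaling` feature)
//! - [`SelfRegulationGovernor`](crate::SelfRegulationGovernor) -- shared pressure + byte budget (`governor` feature; opt out with `self_regulation.enabled = false`)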
30//!
31//! ## What stays app-specific
32//!
33//! - Readiness check criteria (each app defines "ready" differently)
34//! - Config hot-reload (optional, app-specific reload logic)
35//! - Pipeline creation (100% domain-specific)
36//! - DLQ setup (varies per app)
37//! - App-specific metric groups (ConsumerMetrics, BufferMetrics, etc.)
38
39use std::sync::Arc;
40
41use tokio_util::sync::CancellationToken;
42
43use crate::env::{RuntimeContext, runtime_context};
44#[cfg(feature = "memory")]
45use crate::memory::{MemoryGuard, MemoryGuardConfig};
46use crate::metrics::MetricsManager;
47
48use super::error::CliError;
49
50/// Pre-built service infrastructure. Created by `run_app()` before `run_service()`.
51///
52/// Apps receive this fully wired -- they just use the fields. No boilerplate needed.
53///
54/// On bare metal, K8s-specific features (pre-stop delay, pod metadata in logs)
55/// are automatically disabled. On K8s, they're automatically enabled.
56pub struct ServiceRuntime {
57 /// Metrics manager -- already started, serving endpoints.
58 /// Use for registering app-specific metrics and metric groups.
59 pub metrics: MetricsManager,
60
61 /// Platform DFE metrics (`dfe_*` counters/gauges). Already registered.
62 pub dfe: Arc<crate::metrics::DfeMetrics>,
63
64 /// Cgroup-aware memory guard. Tracks memory usage for backpressure.
65 /// Auto-detected from env prefix + cgroup limits.
66 #[cfg(feature = "memory")]
67 pub memory_guard: Arc<MemoryGuard>,
68
69 /// Shutdown token. Cancelled on SIGTERM/SIGINT (with K8s pre-stop delay).
70 /// Clone and pass to your pipeline loops.
71 pub shutdown: CancellationToken,
72
73 /// Runtime context -- K8s/Docker/BareMetal metadata (pod_name, namespace, etc.).
74 pub context: &'static RuntimeContext,
75
76 /// Adaptive worker pool for parallel batch processing (`worker` feature).
77 /// `None` if the `worker` feature is not enabled or config fails.
78 #[cfg(feature = "worker-pool")]
79 pub worker_pool: Option<Arc<crate::worker::AdaptiveWorkerPool>>,
80
81 /// Batch processing engine with SIMD parsing and pre-route filtering
82 /// (`worker-batch` feature). `None` if `worker-batch` is not enabled
83 /// or worker pool creation failed.
84 #[cfg(feature = "worker-batch")]
85 pub batch_engine: Option<Arc<crate::worker::BatchEngine>>,
86
87 /// Scaling pressure calculator for KEDA autoscaling (`scaling` feature).
88 /// `None` if the `scaling` feature is not enabled.
89 #[cfg(feature = "scaling")]
90 pub scaling: Option<Arc<crate::ScalingPressure>>,
91
92 /// Self-regulation governor (`governor` feature). Built default-ON (opt-out
93 /// via `self_regulation.enabled = false`) from the cascade. `None` when
94 /// disabled by config -- in which case nothing was constructed and the data
95 /// path is byte-identical to pre-governor behaviour.
96 ///
97 /// Thread [`pressure`](crate::SelfRegulationGovernor::pressure) into your
98 /// receive transports' inbound gate / `with_pressure` hooks, and the
99 /// [`budget`](crate::SelfRegulationGovernor::budget) is already wired into
100 /// the [`batch_engine`](Self::batch_engine) governed run path.
101 #[cfg(feature = "governor")]
102 pub governor: Option<crate::SelfRegulationGovernor>,
103}
104
105impl ServiceRuntime {
106 /// Build the service runtime from app configuration.
107 ///
108 /// This is called by `run_app()` -- apps don't call it directly.
109 ///
110 /// # Errors
111 ///
112 /// Returns `CliError` if the metrics server fails to start.
113 pub(crate) async fn build(
114 app_name: &str,
115 env_prefix: &str,
116 metrics_addr: &str,
117 #[cfg_attr(not(feature = "metrics-dfe"), allow(unused_variables))] version: &str,
118 #[cfg_attr(not(feature = "metrics-dfe"), allow(unused_variables))] commit: &str,
119 #[cfg(feature = "scaling")] scaling_components: Vec<crate::ScalingComponent>,
120 ) -> Result<Self, CliError> {
121 let ctx = runtime_context();
122
123 // --- Metrics ---
124 let mut metrics = MetricsManager::new(app_name);
125 let dfe = Arc::new(crate::metrics::DfeMetrics::register(&metrics));
126
127 // App info metric (version, commit, service name)
128 #[cfg(feature = "metrics-dfe")]
129 {
130 let _app_metrics =
131 crate::metrics::dfe_groups::AppMetrics::new(&metrics, version, commit);
132 }
133
134 // --- Memory guard ---
135 #[cfg(feature = "memory")]
136 let memory_guard = Arc::new(MemoryGuard::new(MemoryGuardConfig::from_env(env_prefix)));
137
138 // --- Self-regulation governor (default-ON, opt-out) ---
139 //
140 // Constructed HERE -- before the worker pool, batch engine, and the
141 // transports the app builds in run_service() -- so the shared pressure
142 // and byte budget can be threaded into all of them. When
143 // `self_regulation.enabled = false`, `build` returns None and nothing
144 // is constructed: every downstream Option stays None and the data path
145 // is byte-identical to pre-governor behaviour.
146 #[cfg(feature = "governor")]
147 let governor = crate::SelfRegulationConfig::from_cascade().build(Arc::clone(&memory_guard));
148
149 // --- Scaling pressure ---
150 #[cfg(feature = "scaling")]
151 let scaling = {
152 let config = crate::ScalingPressureConfig::from_cascade();
153 let pressure = Arc::new(crate::ScalingPressure::new(config, scaling_components));
154 metrics.set_scaling_pressure(Arc::clone(&pressure));
155 Some(pressure)
156 };
157
158 // --- Worker pool ---
159 #[cfg(feature = "worker-pool")]
160 let worker_pool = {
161 match crate::worker::AdaptiveWorkerPool::from_cascade("worker_pool") {
162 Ok(pool) => {
163 let pool = Arc::new(pool);
164 pool.register_metrics(&metrics);
165 #[cfg(feature = "memory")]
166 pool.set_memory_guard(Arc::clone(&memory_guard));
167 #[cfg(feature = "scaling")]
168 if let Some(ref sp) = scaling {
169 pool.set_scaling_pressure(Arc::clone(sp));
170 }
171 tracing::info!(
172 max_threads = pool.max_threads(),
173 "Adaptive worker pool enabled"
174 );
175 Some(pool)
176 }
177 Err(e) => {
178 tracing::warn!(
179 error = %e,
180 "Worker pool not configured, falling back to sequential"
181 );
182 None
183 }
184 }
185 };
186
187 // --- Batch engine (worker-batch tier only) ---
188 #[cfg(feature = "worker-batch")]
189 let batch_engine = {
190 if let Some(ref pool) = worker_pool {
191 let config =
192 crate::worker::engine::BatchProcessingConfig::from_cascade("batch_processing")
193 .unwrap_or_default();
194 let mut engine = crate::worker::BatchEngine::with_pool(Arc::clone(pool), config);
195 engine.auto_wire(
196 &metrics,
197 #[cfg(feature = "memory")]
198 Some(&memory_guard),
199 );
200 // Wire the governor's byte-budget lever so the engine's governed
201 // run path streams in budget-sized sub-blocks. None (governor
202 // off) leaves the engine on the whole-batch loop.
203 #[cfg(feature = "governor")]
204 if let Some(ref gov) = governor {
205 engine.set_byte_budget(gov.budget());
206 }
207 Some(Arc::new(engine))
208 } else {
209 None
210 }
211 };
212
213 // --- Shutdown ---
214 let shutdown = crate::shutdown::install_signal_handler();
215
216 // Start worker pool scaling loop after shutdown token exists
217 #[cfg(feature = "worker-pool")]
218 if let Some(ref pool) = worker_pool {
219 pool.start_scaling_loop(shutdown.clone());
220 }
221
222 // --- Start metrics server ---
223 if let Err(e) = metrics.start_server(metrics_addr).await {
224 tracing::error!(error = %e, addr = metrics_addr, "Failed to start metrics server");
225 }
226
227 // --- Version check (fire-and-forget) ---
228 #[cfg(feature = "version-check")]
229 {
230 crate::VersionCheck::new(crate::VersionCheckConfig {
231 product: app_name.to_string(),
232 current_version: version.to_string(),
233 ..Default::default()
234 })
235 .check_on_startup();
236 }
237
238 // Log runtime context
239 tracing::info!(
240 environment = %ctx.environment,
241 pod_name = ?ctx.pod_name,
242 namespace = ?ctx.namespace,
243 "Service runtime initialised"
244 );
245
246 Ok(Self {
247 metrics,
248 dfe,
249 #[cfg(feature = "memory")]
250 memory_guard,
251 shutdown,
252 context: ctx,
253 #[cfg(feature = "worker-pool")]
254 worker_pool,
255 #[cfg(feature = "worker-batch")]
256 batch_engine,
257 #[cfg(feature = "scaling")]
258 scaling,
259 #[cfg(feature = "governor")]
260 governor,
261 })
262 }
263
264 /// Set the readiness check callback.
265 ///
266 /// Each app defines its own readiness criteria. Call this in `run_service()`
267 /// once you know what "ready" means for your app.
268 pub fn set_readiness_check<F: Fn() -> bool + Send + Sync + 'static>(&mut self, check: F) {
269 self.metrics.set_readiness_check(check);
270 }
271
272 /// Return the batch processing engine, if the `worker-batch` feature is
273 /// enabled and the worker pool was successfully created.
274 #[cfg(feature = "worker-batch")]
275 #[must_use]
276 pub fn batch_engine(&self) -> Option<&Arc<crate::worker::BatchEngine>> {
277 self.batch_engine.as_ref()
278 }
279
280 /// Build a governed receive transport from config in ONE call
281 /// (`governor` + `transport` features).
282 ///
283 /// This is the receive-side analogue of the byte-budget wiring the runtime
284 /// already does for the [`batch_engine`](Self::batch_engine): it reads the
285 /// transport config at `key` and threads the runtime's
286 /// [`governor`](Self::governor) pressure into the receiver's inbound brake
287 /// (Kafka pause-partitions gate, HTTP/gRPC 503/`unavailable` shed) so an app
288 /// does NOT repeat the `gate_actuator -> InboundGate -> with_inbound_gate`
289 /// dance by hand.
290 ///
291 /// Construction order is respected for free: the
292 /// [`governor`](Self::governor) was built in `build` BEFORE
293 /// the app calls this in `run_service()`, so the pressure latch already
294 /// exists -- this just clones it (an `Arc` bump) into the transport.
295 ///
296 /// When the governor is disabled (`self_regulation.enabled = false`,
297 /// [`governor`](Self::governor) is `None`) this falls back to the plain
298 /// [`AnyReceiver::from_config`](crate::transport::factory::AnyReceiver::from_config)
299 /// so the data path stays byte-identical to pre-governor behaviour.
300 ///
301 /// # Errors
302 ///
303 /// Returns the underlying transport error if the config is missing/invalid
304 /// or the backend fails to construct.
305 #[cfg(all(feature = "governor", feature = "transport"))]
306 pub async fn governed_receiver(
307 &self,
308 key: &str,
309 ) -> Result<crate::transport::factory::AnyReceiver, crate::transport::TransportError> {
310 use crate::transport::factory::AnyReceiver;
311 match self.governor {
312 Some(ref gov) => AnyReceiver::from_config_with_governor(key, gov).await,
313 None => AnyReceiver::from_config(key).await,
314 }
315 }
316}