hyperi_rustlib/cli/runtime.rs
1// Project: hyperi-rustlib
2// File: src/cli/runtime.rs
3// Purpose: ServiceRuntime -- pre-built infrastructure for DFE service apps
4// Language: Rust
5//
6// License: BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Pre-built service infrastructure for DFE pipeline applications.
10//!
11//! [`ServiceRuntime`] is created by [`super::run_app`] before calling
12//! [`DfeApp::run_service`]. Apps receive it fully wired -- eliminates
13//! ~50 lines of identical boilerplate per DFE app.
14//!
15//! ## What's included (always)
16//!
17//! - [`MetricsManager`] -- started, serving `/metrics`, `/healthz`, `/readyz`
18//! - [`DfeMetrics`] -- platform `dfe_*` metrics registered
//! - [`MemoryGuard`] -- cgroup-aware, auto-detected from env prefix (requires the `memory` feature)
20//! - [`CancellationToken`] -- signal handler installed with K8s pre-stop delay
21//! - [`RuntimeContext`] -- K8s/Docker/BareMetal metadata
22//!
23//! ## What's included (when features enabled)
24//!
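//! - [`AdaptiveWorkerPool`] -- rayon + tokio hybrid (`worker-pool` feature)
//! - [`BatchEngine`](crate::worker::BatchEngine) -- batch processing on top of the worker pool (`worker-batch` feature)
//! - [`ScalingPressure`] -- KEDA signals (`scaling` feature)
//! - [`SelfRegulationGovernor`](crate::SelfRegulationGovernor) -- self-regulation pressure and byte budget (`governor` feature)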
27//!
28//! ## What stays app-specific
29//!
30//! - Readiness check criteria (each app defines "ready" differently)
31//! - Config hot-reload (optional, app-specific reload logic)
32//! - Pipeline creation (100% domain-specific)
33//! - DLQ setup (varies per app)
34//! - App-specific metric groups (ConsumerMetrics, BufferMetrics, etc.)
35
36use std::sync::Arc;
37
38use tokio_util::sync::CancellationToken;
39
40use crate::env::{RuntimeContext, runtime_context};
41#[cfg(feature = "memory")]
42use crate::memory::{MemoryGuard, MemoryGuardConfig};
43use crate::metrics::MetricsManager;
44
45use super::error::CliError;
46
47/// Pre-built service infrastructure. Created by `run_app()` before `run_service()`.
48///
49/// Apps receive this fully wired -- they just use the fields. No boilerplate needed.
50///
51/// On bare metal, K8s-specific features (pre-stop delay, pod metadata in logs)
52/// are automatically disabled. On K8s, they're automatically enabled.
53pub struct ServiceRuntime {
54 /// Metrics manager -- already started, serving endpoints.
55 /// Use for registering app-specific metrics and metric groups.
56 pub metrics: MetricsManager,
57
58 /// Platform DFE metrics (`dfe_*` counters/gauges). Already registered.
59 pub dfe: Arc<crate::metrics::DfeMetrics>,
60
61 /// Cgroup-aware memory guard. Tracks memory usage for backpressure.
62 /// Auto-detected from env prefix + cgroup limits.
63 #[cfg(feature = "memory")]
64 pub memory_guard: Arc<MemoryGuard>,
65
66 /// Shutdown token. Cancelled on SIGTERM/SIGINT (with K8s pre-stop delay).
67 /// Clone and pass to your pipeline loops.
68 pub shutdown: CancellationToken,
69
70 /// Runtime context -- K8s/Docker/BareMetal metadata (pod_name, namespace, etc.).
71 pub context: &'static RuntimeContext,
72
73 /// Adaptive worker pool for parallel batch processing (`worker` feature).
74 /// `None` if the `worker` feature is not enabled or config fails.
75 #[cfg(feature = "worker-pool")]
76 pub worker_pool: Option<Arc<crate::worker::AdaptiveWorkerPool>>,
77
78 /// Batch processing engine with SIMD parsing and pre-route filtering
79 /// (`worker-batch` feature). `None` if `worker-batch` is not enabled
80 /// or worker pool creation failed.
81 #[cfg(feature = "worker-batch")]
82 pub batch_engine: Option<Arc<crate::worker::BatchEngine>>,
83
84 /// Scaling pressure calculator for KEDA autoscaling (`scaling` feature).
85 /// `None` if the `scaling` feature is not enabled.
86 #[cfg(feature = "scaling")]
87 pub scaling: Option<Arc<crate::ScalingPressure>>,
88
89 /// Self-regulation governor (`governor` feature). Default-ON, opt-out via
90 /// `self_regulation.enabled = false`. `None` when disabled -- nothing is
91 /// constructed and the data path is byte-identical to pre-governor.
92 ///
93 /// Thread [`pressure`](crate::SelfRegulationGovernor::pressure) into your
94 /// receive transports' inbound gate / `with_pressure` hooks. The
95 /// [`budget`](crate::SelfRegulationGovernor::budget) is already wired into
96 /// the [`batch_engine`](Self::batch_engine) governed run path.
97 #[cfg(feature = "governor")]
98 pub governor: Option<crate::SelfRegulationGovernor>,
99}
100
101impl ServiceRuntime {
102 /// Build the service runtime from app configuration.
103 ///
104 /// This is called by `run_app()` -- apps don't call it directly.
105 ///
106 /// # Errors
107 ///
108 /// Returns `CliError` if the metrics server fails to start.
109 pub(crate) async fn build(
110 app_name: &str,
111 env_prefix: &str,
112 metrics_addr: &str,
113 #[cfg_attr(not(feature = "metrics-dfe"), allow(unused_variables))] version: &str,
114 #[cfg_attr(not(feature = "metrics-dfe"), allow(unused_variables))] commit: &str,
115 #[cfg(feature = "scaling")] scaling_components: Vec<crate::ScalingComponent>,
116 ) -> Result<Self, CliError> {
117 let ctx = runtime_context();
118
119 // --- Metrics ---
120 let mut metrics = MetricsManager::new(app_name);
121 let dfe = Arc::new(crate::metrics::DfeMetrics::register(&metrics));
122
123 // App info metric (version, commit, service name)
124 #[cfg(feature = "metrics-dfe")]
125 {
126 let _app_metrics =
127 crate::metrics::dfe_groups::AppMetrics::new(&metrics, version, commit);
128 }
129
130 // --- Memory guard ---
131 #[cfg(feature = "memory")]
132 let memory_guard = Arc::new(MemoryGuard::new(MemoryGuardConfig::from_env(env_prefix)));
133
134 // --- Self-regulation governor (default-ON, opt-out) ---
135 //
136 // Constructed HERE -- before the worker pool, batch engine, and the
137 // transports the app builds in run_service() -- so the shared pressure
138 // and byte budget can be threaded into all of them. When
139 // `self_regulation.enabled = false`, `build` returns None and nothing
140 // is constructed: every downstream Option stays None and the data path
141 // is byte-identical to pre-governor behaviour.
142 #[cfg(feature = "governor")]
143 let governor = crate::SelfRegulationConfig::from_cascade().build(Arc::clone(&memory_guard));
144
145 // --- Scaling pressure ---
146 #[cfg(feature = "scaling")]
147 let scaling = {
148 let config = crate::ScalingPressureConfig::from_cascade();
149 let pressure = Arc::new(crate::ScalingPressure::new(config, scaling_components));
150 metrics.set_scaling_pressure(Arc::clone(&pressure));
151 Some(pressure)
152 };
153
154 // --- Worker pool ---
155 #[cfg(feature = "worker-pool")]
156 let worker_pool = {
157 match crate::worker::AdaptiveWorkerPool::from_cascade("worker_pool") {
158 Ok(pool) => {
159 let pool = Arc::new(pool);
160 pool.register_metrics(&metrics);
161 #[cfg(feature = "memory")]
162 pool.set_memory_guard(Arc::clone(&memory_guard));
163 #[cfg(feature = "scaling")]
164 if let Some(ref sp) = scaling {
165 pool.set_scaling_pressure(Arc::clone(sp));
166 }
167 tracing::info!(
168 max_threads = pool.max_threads(),
169 "Adaptive worker pool enabled"
170 );
171 Some(pool)
172 }
173 Err(e) => {
174 tracing::warn!(
175 error = %e,
176 "Worker pool not configured, falling back to sequential"
177 );
178 None
179 }
180 }
181 };
182
183 // --- Batch engine (worker-batch tier only) ---
184 #[cfg(feature = "worker-batch")]
185 let batch_engine = {
186 if let Some(ref pool) = worker_pool {
187 let config =
188 crate::worker::engine::BatchProcessingConfig::from_cascade("batch_processing")
189 .unwrap_or_default();
190 let mut engine = crate::worker::BatchEngine::with_pool(Arc::clone(pool), config);
191 engine.auto_wire(
192 &metrics,
193 #[cfg(feature = "memory")]
194 Some(&memory_guard),
195 );
196 // Wire the governor's byte-budget lever so the engine's governed
197 // run path streams in budget-sized sub-blocks. None (governor
198 // off) leaves the engine on the whole-batch loop.
199 #[cfg(feature = "governor")]
200 if let Some(ref gov) = governor {
201 engine.set_byte_budget(gov.budget());
202 }
203 Some(Arc::new(engine))
204 } else {
205 None
206 }
207 };
208
209 // --- Shutdown ---
210 let shutdown = crate::shutdown::install_signal_handler();
211
212 // Start worker pool scaling loop after shutdown token exists
213 #[cfg(feature = "worker-pool")]
214 if let Some(ref pool) = worker_pool {
215 pool.start_scaling_loop(shutdown.clone());
216 }
217
218 // --- Start metrics server ---
219 if let Err(e) = metrics.start_server(metrics_addr).await {
220 tracing::error!(error = %e, addr = metrics_addr, "Failed to start metrics server");
221 }
222
223 // --- Version check (fire-and-forget) ---
224 #[cfg(feature = "version-check")]
225 {
226 crate::VersionCheck::new(crate::VersionCheckConfig {
227 product: app_name.to_string(),
228 current_version: version.to_string(),
229 ..Default::default()
230 })
231 .check_on_startup();
232 }
233
234 // Log runtime context
235 tracing::info!(
236 environment = %ctx.environment,
237 pod_name = ?ctx.pod_name,
238 namespace = ?ctx.namespace,
239 "Service runtime initialised"
240 );
241
242 Ok(Self {
243 metrics,
244 dfe,
245 #[cfg(feature = "memory")]
246 memory_guard,
247 shutdown,
248 context: ctx,
249 #[cfg(feature = "worker-pool")]
250 worker_pool,
251 #[cfg(feature = "worker-batch")]
252 batch_engine,
253 #[cfg(feature = "scaling")]
254 scaling,
255 #[cfg(feature = "governor")]
256 governor,
257 })
258 }
259
260 /// Set the readiness check callback.
261 ///
262 /// Each app defines its own readiness criteria. Call this in `run_service()`
263 /// once you know what "ready" means for your app.
264 pub fn set_readiness_check<F: Fn() -> bool + Send + Sync + 'static>(&mut self, check: F) {
265 self.metrics.set_readiness_check(check);
266 }
267
268 /// Return the batch processing engine, if the `worker-batch` feature is
269 /// enabled and the worker pool was successfully created.
270 #[cfg(feature = "worker-batch")]
271 #[must_use]
272 pub fn batch_engine(&self) -> Option<&Arc<crate::worker::BatchEngine>> {
273 self.batch_engine.as_ref()
274 }
275
276 /// Build a governed receive transport from config in ONE call
277 /// (`governor` + `transport` features).
278 ///
279 /// Reads the transport config at `key` and threads the runtime's
280 /// [`governor`](Self::governor) pressure into the receiver's inbound brake
281 /// (Kafka pause-partitions gate, HTTP/gRPC 503/`unavailable` shed) so apps
282 /// skip the `gate_actuator -> InboundGate -> with_inbound_gate` dance.
283 ///
284 /// When the governor is disabled (`self_regulation.enabled = false`,
285 /// [`governor`](Self::governor) is `None`) this falls back to the plain
286 /// [`AnyReceiver::from_config`](crate::transport::factory::AnyReceiver::from_config)
287 /// -- data path stays byte-identical to pre-governor.
288 ///
289 /// # Errors
290 ///
291 /// Returns the underlying transport error if the config is missing/invalid
292 /// or the backend fails to construct.
293 #[cfg(all(feature = "governor", feature = "transport"))]
294 pub async fn governed_receiver(
295 &self,
296 key: &str,
297 ) -> Result<crate::transport::factory::AnyReceiver, crate::transport::TransportError> {
298 use crate::transport::factory::AnyReceiver;
299 match self.governor {
300 Some(ref gov) => AnyReceiver::from_config_with_governor(key, gov).await,
301 None => AnyReceiver::from_config(key).await,
302 }
303 }
304}