hyperi_rustlib/cli/runtime.rs
1// Project: hyperi-rustlib
2// File: src/cli/runtime.rs
3// Purpose: ServiceRuntime -- pre-built infrastructure for DFE service apps
4// Language: Rust
5//
6// License: BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Pre-built service infrastructure for DFE pipeline applications.
10//!
11//! [`ServiceRuntime`] is created by [`super::run_app`] before calling
12//! [`DfeApp::run_service`]. It contains all the common infrastructure
13//! that every DFE app needs -- metrics, memory guard, scaling, shutdown,
14//! worker pool, and runtime context. Apps receive it fully wired.
15//!
16//! This eliminates ~50 lines of identical boilerplate per DFE app.
17//!
18//! ## What's included (always)
19//!
20//! - [`MetricsManager`] -- started, serving `/metrics`, `/healthz`, `/readyz`
21//! - [`DfeMetrics`] -- platform `dfe_*` metrics registered
//! - [`MemoryGuard`] -- cgroup-aware, auto-detected from env prefix (compiled in only with the `memory` feature)
23//! - [`CancellationToken`] -- signal handler installed with K8s pre-stop delay
24//! - [`RuntimeContext`] -- K8s/Docker/BareMetal metadata
25//!
26//! ## What's included (when features enabled)
27//!
//! - [`AdaptiveWorkerPool`] -- rayon + tokio hybrid (`worker-pool` feature)
29//! - [`ScalingPressure`] -- KEDA signals (`scaling` feature)
30//!
31//! ## What stays app-specific
32//!
33//! - Readiness check criteria (each app defines "ready" differently)
34//! - Config hot-reload (optional, app-specific reload logic)
35//! - Pipeline creation (100% domain-specific)
36//! - DLQ setup (varies per app)
37//! - App-specific metric groups (ConsumerMetrics, BufferMetrics, etc.)
38
39use std::sync::Arc;
40
41use tokio_util::sync::CancellationToken;
42
43use crate::env::{RuntimeContext, runtime_context};
44#[cfg(feature = "memory")]
45use crate::memory::{MemoryGuard, MemoryGuardConfig};
46use crate::metrics::MetricsManager;
47
48use super::error::CliError;
49
/// Pre-built service infrastructure. Created by `run_app()` before `run_service()`.
///
/// Apps receive this fully wired -- they just use the fields. No boilerplate needed.
///
/// Several fields are gated behind Cargo features (`memory`, `worker-pool`,
/// `worker-batch`, `scaling`). A gated field does not exist at all when its
/// feature is disabled; it is not merely `None`.
///
/// On bare metal, K8s-specific features (pre-stop delay, pod metadata in logs)
/// are automatically disabled. On K8s, they're automatically enabled.
pub struct ServiceRuntime {
    /// Metrics manager, constructed with the application name.
    /// `build()` attempts to start its server before returning; a start
    /// failure is logged rather than propagated, so the endpoints are not
    /// guaranteed to be serving.
    /// Use for registering app-specific metrics and metric groups.
    pub metrics: MetricsManager,

    /// Platform DFE metrics (`dfe_*` counters/gauges), registered against
    /// `metrics` during `build()`.
    pub dfe: Arc<crate::metrics::DfeMetrics>,

    /// Cgroup-aware memory guard. Tracks memory usage for backpressure.
    /// Configured via `MemoryGuardConfig::from_env` with the app's env prefix.
    /// Only present with the `memory` feature.
    #[cfg(feature = "memory")]
    pub memory_guard: Arc<MemoryGuard>,

    /// Shutdown token returned by `crate::shutdown::install_signal_handler()`.
    /// Cancelled on SIGTERM/SIGINT (with K8s pre-stop delay).
    /// Clone and pass to your pipeline loops.
    pub shutdown: CancellationToken,

    /// Runtime context -- K8s/Docker/BareMetal metadata (environment,
    /// pod_name, namespace, etc.). Shared `'static` reference obtained from
    /// `runtime_context()`.
    pub context: &'static RuntimeContext,

    /// Adaptive worker pool for parallel batch processing.
    /// Only present with the `worker-pool` feature. `None` when
    /// `AdaptiveWorkerPool::from_cascade` returned an error during `build()`
    /// (a warning is logged in that case).
    #[cfg(feature = "worker-pool")]
    pub worker_pool: Option<Arc<crate::worker::AdaptiveWorkerPool>>,

    /// Batch processing engine with SIMD parsing and pre-route filtering.
    /// Only present with the `worker-batch` feature. `None` when no worker
    /// pool could be created, because the engine is built on top of the pool.
    #[cfg(feature = "worker-batch")]
    pub batch_engine: Option<Arc<crate::worker::BatchEngine>>,

    /// Scaling pressure calculator for KEDA autoscaling.
    /// Only present with the `scaling` feature. `build()` always stores
    /// `Some(..)` here; the same instance is also handed to `metrics`.
    #[cfg(feature = "scaling")]
    pub scaling: Option<Arc<crate::ScalingPressure>>,
}
92
93impl ServiceRuntime {
94 /// Build the service runtime from app configuration.
95 ///
96 /// This is called by `run_app()` -- apps don't call it directly.
97 ///
98 /// # Errors
99 ///
100 /// Returns `CliError` if the metrics server fails to start.
101 pub(crate) async fn build(
102 app_name: &str,
103 env_prefix: &str,
104 metrics_addr: &str,
105 #[cfg_attr(not(feature = "metrics-dfe"), allow(unused_variables))] version: &str,
106 #[cfg_attr(not(feature = "metrics-dfe"), allow(unused_variables))] commit: &str,
107 #[cfg(feature = "scaling")] scaling_components: Vec<crate::ScalingComponent>,
108 ) -> Result<Self, CliError> {
109 let ctx = runtime_context();
110
111 // --- Metrics ---
112 let mut metrics = MetricsManager::new(app_name);
113 let dfe = Arc::new(crate::metrics::DfeMetrics::register(&metrics));
114
115 // App info metric (version, commit, service name)
116 #[cfg(feature = "metrics-dfe")]
117 {
118 let _app_metrics =
119 crate::metrics::dfe_groups::AppMetrics::new(&metrics, version, commit);
120 }
121
122 // --- Memory guard ---
123 #[cfg(feature = "memory")]
124 let memory_guard = Arc::new(MemoryGuard::new(MemoryGuardConfig::from_env(env_prefix)));
125
126 // --- Scaling pressure ---
127 #[cfg(feature = "scaling")]
128 let scaling = {
129 let config = crate::ScalingPressureConfig::from_cascade();
130 let pressure = Arc::new(crate::ScalingPressure::new(config, scaling_components));
131 metrics.set_scaling_pressure(Arc::clone(&pressure));
132 Some(pressure)
133 };
134
135 // --- Worker pool ---
136 #[cfg(feature = "worker-pool")]
137 let worker_pool = {
138 match crate::worker::AdaptiveWorkerPool::from_cascade("worker_pool") {
139 Ok(pool) => {
140 let pool = Arc::new(pool);
141 pool.register_metrics(&metrics);
142 #[cfg(feature = "memory")]
143 pool.set_memory_guard(Arc::clone(&memory_guard));
144 #[cfg(feature = "scaling")]
145 if let Some(ref sp) = scaling {
146 pool.set_scaling_pressure(Arc::clone(sp));
147 }
148 tracing::info!(
149 max_threads = pool.max_threads(),
150 "Adaptive worker pool enabled"
151 );
152 Some(pool)
153 }
154 Err(e) => {
155 tracing::warn!(
156 error = %e,
157 "Worker pool not configured, falling back to sequential"
158 );
159 None
160 }
161 }
162 };
163
164 // --- Batch engine (worker-batch tier only) ---
165 #[cfg(feature = "worker-batch")]
166 let batch_engine = {
167 if let Some(ref pool) = worker_pool {
168 let config =
169 crate::worker::engine::BatchProcessingConfig::from_cascade("batch_processing")
170 .unwrap_or_default();
171 let mut engine = crate::worker::BatchEngine::with_pool(Arc::clone(pool), config);
172 engine.auto_wire(
173 &metrics,
174 #[cfg(feature = "memory")]
175 Some(&memory_guard),
176 );
177 Some(Arc::new(engine))
178 } else {
179 None
180 }
181 };
182
183 // --- Shutdown ---
184 let shutdown = crate::shutdown::install_signal_handler();
185
186 // Start worker pool scaling loop after shutdown token exists
187 #[cfg(feature = "worker-pool")]
188 if let Some(ref pool) = worker_pool {
189 pool.start_scaling_loop(shutdown.clone());
190 }
191
192 // --- Start metrics server ---
193 if let Err(e) = metrics.start_server(metrics_addr).await {
194 tracing::error!(error = %e, addr = metrics_addr, "Failed to start metrics server");
195 }
196
197 // --- Version check (fire-and-forget) ---
198 #[cfg(feature = "version-check")]
199 {
200 crate::VersionCheck::new(crate::VersionCheckConfig {
201 product: app_name.to_string(),
202 current_version: version.to_string(),
203 ..Default::default()
204 })
205 .check_on_startup();
206 }
207
208 // Log runtime context
209 tracing::info!(
210 environment = %ctx.environment,
211 pod_name = ?ctx.pod_name,
212 namespace = ?ctx.namespace,
213 "Service runtime initialised"
214 );
215
216 Ok(Self {
217 metrics,
218 dfe,
219 #[cfg(feature = "memory")]
220 memory_guard,
221 shutdown,
222 context: ctx,
223 #[cfg(feature = "worker-pool")]
224 worker_pool,
225 #[cfg(feature = "worker-batch")]
226 batch_engine,
227 #[cfg(feature = "scaling")]
228 scaling,
229 })
230 }
231
232 /// Set the readiness check callback.
233 ///
234 /// Each app defines its own readiness criteria. Call this in `run_service()`
235 /// once you know what "ready" means for your app.
236 pub fn set_readiness_check<F: Fn() -> bool + Send + Sync + 'static>(&mut self, check: F) {
237 self.metrics.set_readiness_check(check);
238 }
239
240 /// Return the batch processing engine, if the `worker-batch` feature is
241 /// enabled and the worker pool was successfully created.
242 #[cfg(feature = "worker-batch")]
243 #[must_use]
244 pub fn batch_engine(&self) -> Option<&Arc<crate::worker::BatchEngine>> {
245 self.batch_engine.as_ref()
246 }
247}