Skip to main content

hyperi_rustlib/cli/
runtime.rs

1// Project:   hyperi-rustlib
2// File:      src/cli/runtime.rs
3// Purpose:   ServiceRuntime -- pre-built infrastructure for DFE service apps
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Pre-built service infrastructure for DFE pipeline applications.
10//!
11//! [`ServiceRuntime`] is created by [`super::run_app`] before calling
12//! [`DfeApp::run_service`]. Apps receive it fully wired -- eliminates
13//! ~50 lines of identical boilerplate per DFE app.
14//!
15//! ## What's included (always)
16//!
17//! - [`MetricsManager`] -- started, serving `/metrics`, `/healthz`, `/readyz`
18//! - [`DfeMetrics`] -- platform `dfe_*` metrics registered
//! - [`MemoryGuard`] -- cgroup-aware, auto-detected from env prefix (requires the `memory` feature)
20//! - [`CancellationToken`] -- signal handler installed with K8s pre-stop delay
21//! - [`RuntimeContext`] -- K8s/Docker/BareMetal metadata
22//!
23//! ## What's included (when features enabled)
24//!
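//! - [`AdaptiveWorkerPool`] -- rayon + tokio hybrid (`worker-pool` feature)
//! - [`BatchEngine`](crate::worker::BatchEngine) -- batch processing engine
//!   built on the worker pool (`worker-batch` feature)
//! - [`ScalingPressure`] -- KEDA signals (`scaling` feature)
//! - [`SelfRegulationGovernor`](crate::SelfRegulationGovernor) -- shared
//!   pressure and byte budget (`governor` feature)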
27//!
28//! ## What stays app-specific
29//!
30//! - Readiness check criteria (each app defines "ready" differently)
31//! - Config hot-reload (optional, app-specific reload logic)
32//! - Pipeline creation (100% domain-specific)
33//! - DLQ setup (varies per app)
34//! - App-specific metric groups (ConsumerMetrics, BufferMetrics, etc.)
35
36use std::sync::Arc;
37
38use tokio_util::sync::CancellationToken;
39
40use crate::env::{RuntimeContext, runtime_context};
41#[cfg(feature = "memory")]
42use crate::memory::{MemoryGuard, MemoryGuardConfig};
43use crate::metrics::MetricsManager;
44
45use super::error::CliError;
46
/// Pre-built service infrastructure. Created by `run_app()` before `run_service()`.
///
/// Apps receive this fully wired -- they just use the fields. No boilerplate needed.
///
/// On bare metal, K8s-specific features (pre-stop delay, pod metadata in logs)
/// are automatically disabled. On K8s, they're automatically enabled.
///
/// Several fields are feature-gated: when the corresponding Cargo feature is
/// disabled the field does not exist at all (it is not merely `None`).
pub struct ServiceRuntime {
    /// Metrics manager -- already started, serving endpoints.
    /// Use for registering app-specific metrics and metric groups.
    pub metrics: MetricsManager,

    /// Platform DFE metrics (`dfe_*` counters/gauges). Already registered.
    pub dfe: Arc<crate::metrics::DfeMetrics>,

    /// Cgroup-aware memory guard (`memory` feature). Tracks memory usage for
    /// backpressure. Configured from the app's env prefix via
    /// `MemoryGuardConfig::from_env`.
    #[cfg(feature = "memory")]
    pub memory_guard: Arc<MemoryGuard>,

    /// Shutdown token. Cancelled on SIGTERM/SIGINT (with K8s pre-stop delay).
    /// Clone and pass to your pipeline loops.
    pub shutdown: CancellationToken,

    /// Runtime context -- K8s/Docker/BareMetal metadata (pod_name, namespace, etc.).
    pub context: &'static RuntimeContext,

    /// Adaptive worker pool for parallel batch processing (`worker-pool`
    /// feature). `None` if the pool could not be built from config; in that
    /// case `build` logs a warning instead of failing.
    #[cfg(feature = "worker-pool")]
    pub worker_pool: Option<Arc<crate::worker::AdaptiveWorkerPool>>,

    /// Batch processing engine with SIMD parsing and pre-route filtering
    /// (`worker-batch` feature). `None` if worker pool creation failed, since
    /// the engine is only built on top of an existing pool.
    #[cfg(feature = "worker-batch")]
    pub batch_engine: Option<Arc<crate::worker::BatchEngine>>,

    /// Scaling pressure calculator for KEDA autoscaling (`scaling` feature).
    /// `build` always sets this to `Some` when the feature is enabled.
    #[cfg(feature = "scaling")]
    pub scaling: Option<Arc<crate::ScalingPressure>>,

    /// Self-regulation governor (`governor` feature). Default-ON, opt-out via
    /// `self_regulation.enabled = false`. `None` when disabled -- nothing is
    /// constructed and the data path is byte-identical to pre-governor.
    ///
    /// Thread [`pressure`](crate::SelfRegulationGovernor::pressure) into your
    /// receive transports' inbound gate / `with_pressure` hooks. The
    /// [`budget`](crate::SelfRegulationGovernor::budget) is already wired into
    /// the [`batch_engine`](Self::batch_engine) governed run path.
    #[cfg(feature = "governor")]
    pub governor: Option<crate::SelfRegulationGovernor>,
}
100
101impl ServiceRuntime {
102    /// Build the service runtime from app configuration.
103    ///
104    /// This is called by `run_app()` -- apps don't call it directly.
105    ///
106    /// # Errors
107    ///
108    /// Returns `CliError` if the metrics server fails to start.
109    pub(crate) async fn build(
110        app_name: &str,
111        env_prefix: &str,
112        metrics_addr: &str,
113        #[cfg_attr(not(feature = "metrics-dfe"), allow(unused_variables))] version: &str,
114        #[cfg_attr(not(feature = "metrics-dfe"), allow(unused_variables))] commit: &str,
115        #[cfg(feature = "scaling")] scaling_components: Vec<crate::ScalingComponent>,
116    ) -> Result<Self, CliError> {
117        let ctx = runtime_context();
118
119        // --- Metrics ---
120        let mut metrics = MetricsManager::new(app_name);
121        let dfe = Arc::new(crate::metrics::DfeMetrics::register(&metrics));
122
123        // App info metric (version, commit, service name)
124        #[cfg(feature = "metrics-dfe")]
125        {
126            let _app_metrics =
127                crate::metrics::dfe_groups::AppMetrics::new(&metrics, version, commit);
128        }
129
130        // --- Memory guard ---
131        #[cfg(feature = "memory")]
132        let memory_guard = Arc::new(MemoryGuard::new(MemoryGuardConfig::from_env(env_prefix)));
133
134        // --- Self-regulation governor (default-ON, opt-out) ---
135        //
136        // Constructed HERE -- before the worker pool, batch engine, and the
137        // transports the app builds in run_service() -- so the shared pressure
138        // and byte budget can be threaded into all of them. When
139        // `self_regulation.enabled = false`, `build` returns None and nothing
140        // is constructed: every downstream Option stays None and the data path
141        // is byte-identical to pre-governor behaviour.
142        #[cfg(feature = "governor")]
143        let governor = crate::SelfRegulationConfig::from_cascade().build(Arc::clone(&memory_guard));
144
145        // --- Scaling pressure ---
146        #[cfg(feature = "scaling")]
147        let scaling = {
148            let config = crate::ScalingPressureConfig::from_cascade();
149            let pressure = Arc::new(crate::ScalingPressure::new(config, scaling_components));
150            metrics.set_scaling_pressure(Arc::clone(&pressure));
151            Some(pressure)
152        };
153
154        // --- Worker pool ---
155        #[cfg(feature = "worker-pool")]
156        let worker_pool = {
157            match crate::worker::AdaptiveWorkerPool::from_cascade("worker_pool") {
158                Ok(pool) => {
159                    let pool = Arc::new(pool);
160                    pool.register_metrics(&metrics);
161                    #[cfg(feature = "memory")]
162                    pool.set_memory_guard(Arc::clone(&memory_guard));
163                    #[cfg(feature = "scaling")]
164                    if let Some(ref sp) = scaling {
165                        pool.set_scaling_pressure(Arc::clone(sp));
166                    }
167                    tracing::info!(
168                        max_threads = pool.max_threads(),
169                        "Adaptive worker pool enabled"
170                    );
171                    Some(pool)
172                }
173                Err(e) => {
174                    tracing::warn!(
175                        error = %e,
176                        "Worker pool not configured, falling back to sequential"
177                    );
178                    None
179                }
180            }
181        };
182
183        // --- Batch engine (worker-batch tier only) ---
184        #[cfg(feature = "worker-batch")]
185        let batch_engine = {
186            if let Some(ref pool) = worker_pool {
187                let config =
188                    crate::worker::engine::BatchProcessingConfig::from_cascade("batch_processing")
189                        .unwrap_or_default();
190                let mut engine = crate::worker::BatchEngine::with_pool(Arc::clone(pool), config);
191                engine.auto_wire(
192                    &metrics,
193                    #[cfg(feature = "memory")]
194                    Some(&memory_guard),
195                );
196                // Wire the governor's byte-budget lever so the engine's governed
197                // run path streams in budget-sized sub-blocks. None (governor
198                // off) leaves the engine on the whole-batch loop.
199                #[cfg(feature = "governor")]
200                if let Some(ref gov) = governor {
201                    engine.set_byte_budget(gov.budget());
202                }
203                Some(Arc::new(engine))
204            } else {
205                None
206            }
207        };
208
209        // --- Shutdown ---
210        let shutdown = crate::shutdown::install_signal_handler();
211
212        // Start worker pool scaling loop after shutdown token exists
213        #[cfg(feature = "worker-pool")]
214        if let Some(ref pool) = worker_pool {
215            pool.start_scaling_loop(shutdown.clone());
216        }
217
218        // --- Start metrics server ---
219        if let Err(e) = metrics.start_server(metrics_addr).await {
220            tracing::error!(error = %e, addr = metrics_addr, "Failed to start metrics server");
221        }
222
223        // --- Version check (fire-and-forget) ---
224        #[cfg(feature = "version-check")]
225        {
226            crate::VersionCheck::new(crate::VersionCheckConfig {
227                product: app_name.to_string(),
228                current_version: version.to_string(),
229                ..Default::default()
230            })
231            .check_on_startup();
232        }
233
234        // Log runtime context
235        tracing::info!(
236            environment = %ctx.environment,
237            pod_name = ?ctx.pod_name,
238            namespace = ?ctx.namespace,
239            "Service runtime initialised"
240        );
241
242        Ok(Self {
243            metrics,
244            dfe,
245            #[cfg(feature = "memory")]
246            memory_guard,
247            shutdown,
248            context: ctx,
249            #[cfg(feature = "worker-pool")]
250            worker_pool,
251            #[cfg(feature = "worker-batch")]
252            batch_engine,
253            #[cfg(feature = "scaling")]
254            scaling,
255            #[cfg(feature = "governor")]
256            governor,
257        })
258    }
259
260    /// Set the readiness check callback.
261    ///
262    /// Each app defines its own readiness criteria. Call this in `run_service()`
263    /// once you know what "ready" means for your app.
264    pub fn set_readiness_check<F: Fn() -> bool + Send + Sync + 'static>(&mut self, check: F) {
265        self.metrics.set_readiness_check(check);
266    }
267
268    /// Return the batch processing engine, if the `worker-batch` feature is
269    /// enabled and the worker pool was successfully created.
270    #[cfg(feature = "worker-batch")]
271    #[must_use]
272    pub fn batch_engine(&self) -> Option<&Arc<crate::worker::BatchEngine>> {
273        self.batch_engine.as_ref()
274    }
275
276    /// Build a governed receive transport from config in ONE call
277    /// (`governor` + `transport` features).
278    ///
279    /// Reads the transport config at `key` and threads the runtime's
280    /// [`governor`](Self::governor) pressure into the receiver's inbound brake
281    /// (Kafka pause-partitions gate, HTTP/gRPC 503/`unavailable` shed) so apps
282    /// skip the `gate_actuator -> InboundGate -> with_inbound_gate` dance.
283    ///
284    /// When the governor is disabled (`self_regulation.enabled = false`,
285    /// [`governor`](Self::governor) is `None`) this falls back to the plain
286    /// [`AnyReceiver::from_config`](crate::transport::factory::AnyReceiver::from_config)
287    /// -- data path stays byte-identical to pre-governor.
288    ///
289    /// # Errors
290    ///
291    /// Returns the underlying transport error if the config is missing/invalid
292    /// or the backend fails to construct.
293    #[cfg(all(feature = "governor", feature = "transport"))]
294    pub async fn governed_receiver(
295        &self,
296        key: &str,
297    ) -> Result<crate::transport::factory::AnyReceiver, crate::transport::TransportError> {
298        use crate::transport::factory::AnyReceiver;
299        match self.governor {
300            Some(ref gov) => AnyReceiver::from_config_with_governor(key, gov).await,
301            None => AnyReceiver::from_config(key).await,
302        }
303    }
304}