Skip to main content

hyperi_rustlib/cli/
runtime.rs

1// Project:   hyperi-rustlib
2// File:      src/cli/runtime.rs
3// Purpose:   ServiceRuntime -- pre-built infrastructure for DFE service apps
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Pre-built service infrastructure for DFE pipeline applications.
10//!
11//! [`ServiceRuntime`] is created by [`super::run_app`] before calling
12//! [`DfeApp::run_service`]. It contains all the common infrastructure
13//! that every DFE app needs -- metrics, memory guard, scaling, shutdown,
14//! worker pool, and runtime context. Apps receive it fully wired.
15//!
16//! This eliminates ~50 lines of identical boilerplate per DFE app.
17//!
18//! ## What's included (always)
19//!
20//! - [`MetricsManager`] -- started, serving `/metrics`, `/healthz`, `/readyz`
21//! - [`DfeMetrics`] -- platform `dfe_*` metrics registered
22//! - [`MemoryGuard`] -- cgroup-aware, auto-detected from env prefix (compiled in with the `memory` feature)
23//! - [`CancellationToken`] -- signal handler installed with K8s pre-stop delay
24//! - [`RuntimeContext`] -- K8s/Docker/BareMetal metadata
25//!
26//! ## What's included (when features enabled)
27//!
28//! - [`AdaptiveWorkerPool`] -- rayon + tokio hybrid (`worker-pool` feature)
29//! - [`ScalingPressure`] -- KEDA signals (`scaling` feature)
30//!
31//! ## What stays app-specific
32//!
33//! - Readiness check criteria (each app defines "ready" differently)
34//! - Config hot-reload (optional, app-specific reload logic)
35//! - Pipeline creation (100% domain-specific)
36//! - DLQ setup (varies per app)
37//! - App-specific metric groups (ConsumerMetrics, BufferMetrics, etc.)
38
39use std::sync::Arc;
40
41use tokio_util::sync::CancellationToken;
42
43use crate::env::{RuntimeContext, runtime_context};
44#[cfg(feature = "memory")]
45use crate::memory::{MemoryGuard, MemoryGuardConfig};
46use crate::metrics::MetricsManager;
47
48use super::error::CliError;
49
50/// Pre-built service infrastructure. Created by `run_app()` before `run_service()`.
51///
52/// Apps receive this fully wired -- they just use the fields. No boilerplate needed.
53///
54/// On bare metal, K8s-specific features (pre-stop delay, pod metadata in logs)
55/// are automatically disabled. On K8s, they're automatically enabled.
56pub struct ServiceRuntime {
57    /// Metrics manager -- already started, serving endpoints.
58    /// Use for registering app-specific metrics and metric groups.
59    pub metrics: MetricsManager,
60
61    /// Platform DFE metrics (`dfe_*` counters/gauges). Already registered.
62    pub dfe: Arc<crate::metrics::DfeMetrics>,
63
64    /// Cgroup-aware memory guard. Tracks memory usage for backpressure.
65    /// Auto-detected from env prefix + cgroup limits.
66    #[cfg(feature = "memory")]
67    pub memory_guard: Arc<MemoryGuard>,
68
69    /// Shutdown token. Cancelled on SIGTERM/SIGINT (with K8s pre-stop delay).
70    /// Clone and pass to your pipeline loops.
71    pub shutdown: CancellationToken,
72
73    /// Runtime context -- K8s/Docker/BareMetal metadata (pod_name, namespace, etc.).
74    pub context: &'static RuntimeContext,
75
76    /// Adaptive worker pool for parallel batch processing (`worker` feature).
77    /// `None` if the `worker` feature is not enabled or config fails.
78    #[cfg(feature = "worker-pool")]
79    pub worker_pool: Option<Arc<crate::worker::AdaptiveWorkerPool>>,
80
81    /// Batch processing engine with SIMD parsing and pre-route filtering
82    /// (`worker-batch` feature). `None` if `worker-batch` is not enabled
83    /// or worker pool creation failed.
84    #[cfg(feature = "worker-batch")]
85    pub batch_engine: Option<Arc<crate::worker::BatchEngine>>,
86
87    /// Scaling pressure calculator for KEDA autoscaling (`scaling` feature).
88    /// `None` if the `scaling` feature is not enabled.
89    #[cfg(feature = "scaling")]
90    pub scaling: Option<Arc<crate::ScalingPressure>>,
91
92    /// Self-regulation governor (`governor` feature). Built default-ON (opt-out
93    /// via `self_regulation.enabled = false`) from the cascade. `None` when
94    /// disabled by config -- in which case nothing was constructed and the data
95    /// path is byte-identical to pre-governor behaviour.
96    ///
97    /// Thread [`pressure`](crate::SelfRegulationGovernor::pressure) into your
98    /// receive transports' inbound gate / `with_pressure` hooks, and the
99    /// [`budget`](crate::SelfRegulationGovernor::budget) is already wired into
100    /// the [`batch_engine`](Self::batch_engine) governed run path.
101    #[cfg(feature = "governor")]
102    pub governor: Option<crate::SelfRegulationGovernor>,
103}
104
105impl ServiceRuntime {
106    /// Build the service runtime from app configuration.
107    ///
108    /// This is called by `run_app()` -- apps don't call it directly.
109    ///
110    /// # Errors
111    ///
112    /// Returns `CliError` if the metrics server fails to start.
113    pub(crate) async fn build(
114        app_name: &str,
115        env_prefix: &str,
116        metrics_addr: &str,
117        #[cfg_attr(not(feature = "metrics-dfe"), allow(unused_variables))] version: &str,
118        #[cfg_attr(not(feature = "metrics-dfe"), allow(unused_variables))] commit: &str,
119        #[cfg(feature = "scaling")] scaling_components: Vec<crate::ScalingComponent>,
120    ) -> Result<Self, CliError> {
121        let ctx = runtime_context();
122
123        // --- Metrics ---
124        let mut metrics = MetricsManager::new(app_name);
125        let dfe = Arc::new(crate::metrics::DfeMetrics::register(&metrics));
126
127        // App info metric (version, commit, service name)
128        #[cfg(feature = "metrics-dfe")]
129        {
130            let _app_metrics =
131                crate::metrics::dfe_groups::AppMetrics::new(&metrics, version, commit);
132        }
133
134        // --- Memory guard ---
135        #[cfg(feature = "memory")]
136        let memory_guard = Arc::new(MemoryGuard::new(MemoryGuardConfig::from_env(env_prefix)));
137
138        // --- Self-regulation governor (default-ON, opt-out) ---
139        //
140        // Constructed HERE -- before the worker pool, batch engine, and the
141        // transports the app builds in run_service() -- so the shared pressure
142        // and byte budget can be threaded into all of them. When
143        // `self_regulation.enabled = false`, `build` returns None and nothing
144        // is constructed: every downstream Option stays None and the data path
145        // is byte-identical to pre-governor behaviour.
146        #[cfg(feature = "governor")]
147        let governor = crate::SelfRegulationConfig::from_cascade().build(Arc::clone(&memory_guard));
148
149        // --- Scaling pressure ---
150        #[cfg(feature = "scaling")]
151        let scaling = {
152            let config = crate::ScalingPressureConfig::from_cascade();
153            let pressure = Arc::new(crate::ScalingPressure::new(config, scaling_components));
154            metrics.set_scaling_pressure(Arc::clone(&pressure));
155            Some(pressure)
156        };
157
158        // --- Worker pool ---
159        #[cfg(feature = "worker-pool")]
160        let worker_pool = {
161            match crate::worker::AdaptiveWorkerPool::from_cascade("worker_pool") {
162                Ok(pool) => {
163                    let pool = Arc::new(pool);
164                    pool.register_metrics(&metrics);
165                    #[cfg(feature = "memory")]
166                    pool.set_memory_guard(Arc::clone(&memory_guard));
167                    #[cfg(feature = "scaling")]
168                    if let Some(ref sp) = scaling {
169                        pool.set_scaling_pressure(Arc::clone(sp));
170                    }
171                    tracing::info!(
172                        max_threads = pool.max_threads(),
173                        "Adaptive worker pool enabled"
174                    );
175                    Some(pool)
176                }
177                Err(e) => {
178                    tracing::warn!(
179                        error = %e,
180                        "Worker pool not configured, falling back to sequential"
181                    );
182                    None
183                }
184            }
185        };
186
187        // --- Batch engine (worker-batch tier only) ---
188        #[cfg(feature = "worker-batch")]
189        let batch_engine = {
190            if let Some(ref pool) = worker_pool {
191                let config =
192                    crate::worker::engine::BatchProcessingConfig::from_cascade("batch_processing")
193                        .unwrap_or_default();
194                let mut engine = crate::worker::BatchEngine::with_pool(Arc::clone(pool), config);
195                engine.auto_wire(
196                    &metrics,
197                    #[cfg(feature = "memory")]
198                    Some(&memory_guard),
199                );
200                // Wire the governor's byte-budget lever so the engine's governed
201                // run path streams in budget-sized sub-blocks. None (governor
202                // off) leaves the engine on the whole-batch loop.
203                #[cfg(feature = "governor")]
204                if let Some(ref gov) = governor {
205                    engine.set_byte_budget(gov.budget());
206                }
207                Some(Arc::new(engine))
208            } else {
209                None
210            }
211        };
212
213        // --- Shutdown ---
214        let shutdown = crate::shutdown::install_signal_handler();
215
216        // Start worker pool scaling loop after shutdown token exists
217        #[cfg(feature = "worker-pool")]
218        if let Some(ref pool) = worker_pool {
219            pool.start_scaling_loop(shutdown.clone());
220        }
221
222        // --- Start metrics server ---
223        if let Err(e) = metrics.start_server(metrics_addr).await {
224            tracing::error!(error = %e, addr = metrics_addr, "Failed to start metrics server");
225        }
226
227        // --- Version check (fire-and-forget) ---
228        #[cfg(feature = "version-check")]
229        {
230            crate::VersionCheck::new(crate::VersionCheckConfig {
231                product: app_name.to_string(),
232                current_version: version.to_string(),
233                ..Default::default()
234            })
235            .check_on_startup();
236        }
237
238        // Log runtime context
239        tracing::info!(
240            environment = %ctx.environment,
241            pod_name = ?ctx.pod_name,
242            namespace = ?ctx.namespace,
243            "Service runtime initialised"
244        );
245
246        Ok(Self {
247            metrics,
248            dfe,
249            #[cfg(feature = "memory")]
250            memory_guard,
251            shutdown,
252            context: ctx,
253            #[cfg(feature = "worker-pool")]
254            worker_pool,
255            #[cfg(feature = "worker-batch")]
256            batch_engine,
257            #[cfg(feature = "scaling")]
258            scaling,
259            #[cfg(feature = "governor")]
260            governor,
261        })
262    }
263
264    /// Set the readiness check callback.
265    ///
266    /// Each app defines its own readiness criteria. Call this in `run_service()`
267    /// once you know what "ready" means for your app.
268    pub fn set_readiness_check<F: Fn() -> bool + Send + Sync + 'static>(&mut self, check: F) {
269        self.metrics.set_readiness_check(check);
270    }
271
272    /// Return the batch processing engine, if the `worker-batch` feature is
273    /// enabled and the worker pool was successfully created.
274    #[cfg(feature = "worker-batch")]
275    #[must_use]
276    pub fn batch_engine(&self) -> Option<&Arc<crate::worker::BatchEngine>> {
277        self.batch_engine.as_ref()
278    }
279
280    /// Build a governed receive transport from config in ONE call
281    /// (`governor` + `transport` features).
282    ///
283    /// This is the receive-side analogue of the byte-budget wiring the runtime
284    /// already does for the [`batch_engine`](Self::batch_engine): it reads the
285    /// transport config at `key` and threads the runtime's
286    /// [`governor`](Self::governor) pressure into the receiver's inbound brake
287    /// (Kafka pause-partitions gate, HTTP/gRPC 503/`unavailable` shed) so an app
288    /// does NOT repeat the `gate_actuator -> InboundGate -> with_inbound_gate`
289    /// dance by hand.
290    ///
291    /// Construction order is respected for free: the
292    /// [`governor`](Self::governor) was built in `build` BEFORE
293    /// the app calls this in `run_service()`, so the pressure latch already
294    /// exists -- this just clones it (an `Arc` bump) into the transport.
295    ///
296    /// When the governor is disabled (`self_regulation.enabled = false`,
297    /// [`governor`](Self::governor) is `None`) this falls back to the plain
298    /// [`AnyReceiver::from_config`](crate::transport::factory::AnyReceiver::from_config)
299    /// so the data path stays byte-identical to pre-governor behaviour.
300    ///
301    /// # Errors
302    ///
303    /// Returns the underlying transport error if the config is missing/invalid
304    /// or the backend fails to construct.
305    #[cfg(all(feature = "governor", feature = "transport"))]
306    pub async fn governed_receiver(
307        &self,
308        key: &str,
309    ) -> Result<crate::transport::factory::AnyReceiver, crate::transport::TransportError> {
310        use crate::transport::factory::AnyReceiver;
311        match self.governor {
312            Some(ref gov) => AnyReceiver::from_config_with_governor(key, gov).await,
313            None => AnyReceiver::from_config(key).await,
314        }
315    }
316}