Skip to main content

hyperi_rustlib/cli/
runtime.rs

1// Project:   hyperi-rustlib
2// File:      src/cli/runtime.rs
3// Purpose:   ServiceRuntime -- pre-built infrastructure for DFE service apps
4// Language:  Rust
5//
6// License:   FSL-1.1-ALv2
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
//! Pre-built service infrastructure for DFE pipeline applications.
//!
//! [`ServiceRuntime`] is created by [`super::run_app`] before calling
//! [`DfeApp::run_service`]. It contains all the common infrastructure
//! that every DFE app needs -- metrics, memory guard, scaling, shutdown,
//! worker pool, and runtime context. Apps receive it fully wired.
//!
//! This eliminates ~50 lines of identical boilerplate per DFE app.
//!
//! ## What's included (always)
//!
//! - [`MetricsManager`] -- server started during build, serving `/metrics`,
//!   `/healthz`, `/readyz` (a failure to start the server is logged, not returned)
//! - [`DfeMetrics`](crate::metrics::DfeMetrics) -- platform `dfe_*` metrics registered
//! - [`CancellationToken`] -- signal handler installed with K8s pre-stop delay
//! - [`RuntimeContext`] -- K8s/Docker/BareMetal metadata
//!
//! ## What's included (when features enabled)
//!
//! - `MemoryGuard` -- cgroup-aware, configured from the env prefix (`memory` feature)
//! - `AdaptiveWorkerPool` -- rayon + tokio hybrid (`worker-pool` feature)
//! - `BatchEngine` -- batch processing on top of the worker pool (`worker-batch` feature)
//! - `ScalingPressure` -- KEDA signals (`scaling` feature)
//!
//! ## What stays app-specific
//!
//! - Readiness check criteria (each app defines "ready" differently)
//! - Config hot-reload (optional, app-specific reload logic)
//! - Pipeline creation (100% domain-specific)
//! - DLQ setup (varies per app)
//! - App-specific metric groups (ConsumerMetrics, BufferMetrics, etc.)
38
39use std::sync::Arc;
40
41use tokio_util::sync::CancellationToken;
42
43use crate::env::{RuntimeContext, runtime_context};
44#[cfg(feature = "memory")]
45use crate::memory::{MemoryGuard, MemoryGuardConfig};
46use crate::metrics::MetricsManager;
47
48use super::error::CliError;
49
50/// Pre-built service infrastructure. Created by `run_app()` before `run_service()`.
51///
52/// Apps receive this fully wired -- they just use the fields. No boilerplate needed.
53///
54/// On bare metal, K8s-specific features (pre-stop delay, pod metadata in logs)
55/// are automatically disabled. On K8s, they're automatically enabled.
56pub struct ServiceRuntime {
57    /// Metrics manager -- already started, serving endpoints.
58    /// Use for registering app-specific metrics and metric groups.
59    pub metrics: MetricsManager,
60
61    /// Platform DFE metrics (`dfe_*` counters/gauges). Already registered.
62    pub dfe: Arc<crate::metrics::DfeMetrics>,
63
64    /// Cgroup-aware memory guard. Tracks memory usage for backpressure.
65    /// Auto-detected from env prefix + cgroup limits.
66    #[cfg(feature = "memory")]
67    pub memory_guard: Arc<MemoryGuard>,
68
69    /// Shutdown token. Cancelled on SIGTERM/SIGINT (with K8s pre-stop delay).
70    /// Clone and pass to your pipeline loops.
71    pub shutdown: CancellationToken,
72
73    /// Runtime context -- K8s/Docker/BareMetal metadata (pod_name, namespace, etc.).
74    pub context: &'static RuntimeContext,
75
76    /// Adaptive worker pool for parallel batch processing (`worker` feature).
77    /// `None` if the `worker` feature is not enabled or config fails.
78    #[cfg(feature = "worker-pool")]
79    pub worker_pool: Option<Arc<crate::worker::AdaptiveWorkerPool>>,
80
81    /// Batch processing engine with SIMD parsing and pre-route filtering
82    /// (`worker-batch` feature). `None` if `worker-batch` is not enabled
83    /// or worker pool creation failed.
84    #[cfg(feature = "worker-batch")]
85    pub batch_engine: Option<Arc<crate::worker::BatchEngine>>,
86
87    /// Scaling pressure calculator for KEDA autoscaling (`scaling` feature).
88    /// `None` if the `scaling` feature is not enabled.
89    #[cfg(feature = "scaling")]
90    pub scaling: Option<Arc<crate::ScalingPressure>>,
91}
92
93impl ServiceRuntime {
94    /// Build the service runtime from app configuration.
95    ///
96    /// This is called by `run_app()` -- apps don't call it directly.
97    ///
98    /// # Errors
99    ///
100    /// Returns `CliError` if the metrics server fails to start.
101    pub(crate) async fn build(
102        app_name: &str,
103        env_prefix: &str,
104        metrics_addr: &str,
105        #[cfg_attr(not(feature = "metrics-dfe"), allow(unused_variables))] version: &str,
106        #[cfg_attr(not(feature = "metrics-dfe"), allow(unused_variables))] commit: &str,
107        #[cfg(feature = "scaling")] scaling_components: Vec<crate::ScalingComponent>,
108    ) -> Result<Self, CliError> {
109        let ctx = runtime_context();
110
111        // --- Metrics ---
112        let mut metrics = MetricsManager::new(app_name);
113        let dfe = Arc::new(crate::metrics::DfeMetrics::register(&metrics));
114
115        // App info metric (version, commit, service name)
116        #[cfg(feature = "metrics-dfe")]
117        {
118            let _app_metrics =
119                crate::metrics::dfe_groups::AppMetrics::new(&metrics, version, commit);
120        }
121
122        // --- Memory guard ---
123        #[cfg(feature = "memory")]
124        let memory_guard = Arc::new(MemoryGuard::new(MemoryGuardConfig::from_env(env_prefix)));
125
126        // --- Scaling pressure ---
127        #[cfg(feature = "scaling")]
128        let scaling = {
129            let config = crate::ScalingPressureConfig::from_cascade();
130            let pressure = Arc::new(crate::ScalingPressure::new(config, scaling_components));
131            metrics.set_scaling_pressure(Arc::clone(&pressure));
132            Some(pressure)
133        };
134
135        // --- Worker pool ---
136        #[cfg(feature = "worker-pool")]
137        let worker_pool = {
138            match crate::worker::AdaptiveWorkerPool::from_cascade("worker_pool") {
139                Ok(pool) => {
140                    let pool = Arc::new(pool);
141                    pool.register_metrics(&metrics);
142                    #[cfg(feature = "memory")]
143                    pool.set_memory_guard(Arc::clone(&memory_guard));
144                    #[cfg(feature = "scaling")]
145                    if let Some(ref sp) = scaling {
146                        pool.set_scaling_pressure(Arc::clone(sp));
147                    }
148                    tracing::info!(
149                        max_threads = pool.max_threads(),
150                        "Adaptive worker pool enabled"
151                    );
152                    Some(pool)
153                }
154                Err(e) => {
155                    tracing::warn!(
156                        error = %e,
157                        "Worker pool not configured, falling back to sequential"
158                    );
159                    None
160                }
161            }
162        };
163
164        // --- Batch engine (worker-batch tier only) ---
165        #[cfg(feature = "worker-batch")]
166        let batch_engine = {
167            if let Some(ref pool) = worker_pool {
168                let config =
169                    crate::worker::engine::BatchProcessingConfig::from_cascade("batch_processing")
170                        .unwrap_or_default();
171                let mut engine = crate::worker::BatchEngine::with_pool(Arc::clone(pool), config);
172                engine.auto_wire(
173                    &metrics,
174                    #[cfg(feature = "memory")]
175                    Some(&memory_guard),
176                );
177                Some(Arc::new(engine))
178            } else {
179                None
180            }
181        };
182
183        // --- Shutdown ---
184        let shutdown = crate::shutdown::install_signal_handler();
185
186        // Start worker pool scaling loop after shutdown token exists
187        #[cfg(feature = "worker-pool")]
188        if let Some(ref pool) = worker_pool {
189            pool.start_scaling_loop(shutdown.clone());
190        }
191
192        // --- Start metrics server ---
193        if let Err(e) = metrics.start_server(metrics_addr).await {
194            tracing::error!(error = %e, addr = metrics_addr, "Failed to start metrics server");
195        }
196
197        // --- Version check (fire-and-forget) ---
198        #[cfg(feature = "version-check")]
199        {
200            crate::VersionCheck::new(crate::VersionCheckConfig {
201                product: app_name.to_string(),
202                current_version: version.to_string(),
203                ..Default::default()
204            })
205            .check_on_startup();
206        }
207
208        // Log runtime context
209        tracing::info!(
210            environment = %ctx.environment,
211            pod_name = ?ctx.pod_name,
212            namespace = ?ctx.namespace,
213            "Service runtime initialised"
214        );
215
216        Ok(Self {
217            metrics,
218            dfe,
219            #[cfg(feature = "memory")]
220            memory_guard,
221            shutdown,
222            context: ctx,
223            #[cfg(feature = "worker-pool")]
224            worker_pool,
225            #[cfg(feature = "worker-batch")]
226            batch_engine,
227            #[cfg(feature = "scaling")]
228            scaling,
229        })
230    }
231
232    /// Set the readiness check callback.
233    ///
234    /// Each app defines its own readiness criteria. Call this in `run_service()`
235    /// once you know what "ready" means for your app.
236    pub fn set_readiness_check<F: Fn() -> bool + Send + Sync + 'static>(&mut self, check: F) {
237        self.metrics.set_readiness_check(check);
238    }
239
240    /// Return the batch processing engine, if the `worker-batch` feature is
241    /// enabled and the worker pool was successfully created.
242    #[cfg(feature = "worker-batch")]
243    #[must_use]
244    pub fn batch_engine(&self) -> Option<&Arc<crate::worker::BatchEngine>> {
245        self.batch_engine.as_ref()
246    }
247}