use std::sync::Arc;
use tokio_util::sync::CancellationToken;
use crate::env::{RuntimeContext, runtime_context};
#[cfg(feature = "memory")]
use crate::memory::{MemoryGuard, MemoryGuardConfig};
use crate::metrics::MetricsManager;
use super::error::CliError;
/// Long-lived infrastructure handles shared by a running service: metrics,
/// cooperative shutdown signalling, and — behind feature gates — memory
/// guarding, adaptive worker pool, batch processing, and scaling pressure.
///
/// Constructed via [`ServiceRuntime::build`].
pub struct ServiceRuntime {
    /// Metrics registry / server manager, created per application name.
    pub metrics: MetricsManager,
    /// DFE metrics group, registered against `metrics` at build time.
    pub dfe: Arc<crate::metrics::DfeMetrics>,
    /// Memory guard configured from `<env_prefix>`-scoped environment variables.
    #[cfg(feature = "memory")]
    pub memory_guard: Arc<MemoryGuard>,
    /// Cancelled by the process signal handler installed during `build`.
    pub shutdown: CancellationToken,
    /// Process-wide runtime context (environment, pod name, namespace).
    pub context: &'static RuntimeContext,
    /// `None` when no worker-pool config was found (service runs sequentially).
    #[cfg(feature = "worker-pool")]
    pub worker_pool: Option<Arc<crate::worker::AdaptiveWorkerPool>>,
    /// Present only when a worker pool was successfully configured.
    #[cfg(feature = "worker-batch")]
    pub batch_engine: Option<Arc<crate::worker::BatchEngine>>,
    /// Scaling-pressure tracker, also wired into `metrics` at build time.
    #[cfg(feature = "scaling")]
    pub scaling: Option<Arc<crate::ScalingPressure>>,
}
impl ServiceRuntime {
    /// Build the full service runtime: metrics registry + server, signal
    /// handling, and every enabled feature-gated subsystem (memory guard,
    /// scaling pressure, worker pool, batch engine, version check).
    ///
    /// Initialisation is best-effort: worker-pool and metrics-server failures
    /// are logged and the service continues in a degraded mode.
    ///
    /// # Errors
    /// Returns `CliError` on fatal initialisation failure. NOTE(review): the
    /// current body logs-and-continues on every failure path and never
    /// actually returns `Err` — presumably kept fallible for future use;
    /// confirm before relying on error propagation here.
    pub(crate) async fn build(
        app_name: &str,
        env_prefix: &str,
        metrics_addr: &str,
        // `version`/`commit` are only consumed under `metrics-dfe` (and, for
        // `version`, `version-check`); silence unused-variable lints otherwise.
        #[cfg_attr(not(feature = "metrics-dfe"), allow(unused_variables))] version: &str,
        #[cfg_attr(not(feature = "metrics-dfe"), allow(unused_variables))] commit: &str,
        #[cfg(feature = "scaling")] scaling_components: Vec<crate::ScalingComponent>,
    ) -> Result<Self, CliError> {
        let ctx = runtime_context();
        let mut metrics = MetricsManager::new(app_name);
        let dfe = Arc::new(crate::metrics::DfeMetrics::register(&metrics));
        #[cfg(feature = "metrics-dfe")]
        {
            // Registers app-level version/commit metrics. The handle is
            // dropped immediately — presumably registration persists inside
            // `metrics`; TODO confirm against `AppMetrics::new`.
            let _app_metrics =
                crate::metrics::dfe_groups::AppMetrics::new(&metrics, version, commit);
        }
        #[cfg(feature = "memory")]
        let memory_guard = Arc::new(MemoryGuard::new(MemoryGuardConfig::from_env(env_prefix)));
        #[cfg(feature = "scaling")]
        let scaling = {
            let config = crate::ScalingPressureConfig::from_cascade();
            let pressure = Arc::new(crate::ScalingPressure::new(config, scaling_components));
            // Also expose scaling pressure through the metrics manager.
            metrics.set_scaling_pressure(Arc::clone(&pressure));
            Some(pressure)
        };
        #[cfg(feature = "worker-pool")]
        let worker_pool = {
            // Pool config comes from the "worker_pool" cascade; a missing or
            // invalid config is non-fatal — fall back to sequential execution.
            match crate::worker::AdaptiveWorkerPool::from_cascade("worker_pool") {
                Ok(pool) => {
                    let pool = Arc::new(pool);
                    pool.register_metrics(&metrics);
                    #[cfg(feature = "memory")]
                    pool.set_memory_guard(Arc::clone(&memory_guard));
                    #[cfg(feature = "scaling")]
                    if let Some(ref sp) = scaling {
                        pool.set_scaling_pressure(Arc::clone(sp));
                    }
                    tracing::info!(
                        max_threads = pool.max_threads(),
                        "Adaptive worker pool enabled"
                    );
                    Some(pool)
                }
                Err(e) => {
                    tracing::warn!(
                        error = %e,
                        "Worker pool not configured, falling back to sequential"
                    );
                    None
                }
            }
        };
        // NOTE(review): this block reads `worker_pool`, which only exists when
        // `worker-pool` is enabled — assumes `worker-batch` implies
        // `worker-pool` in Cargo.toml; confirm the feature dependency, as
        // `worker-batch` alone would not compile.
        #[cfg(feature = "worker-batch")]
        let batch_engine = {
            if let Some(ref pool) = worker_pool {
                let config =
                    crate::worker::engine::BatchProcessingConfig::from_cascade("batch_processing")
                        .unwrap_or_default();
                let mut engine = crate::worker::BatchEngine::with_pool(Arc::clone(pool), config);
                engine.auto_wire(
                    &metrics,
                    #[cfg(feature = "memory")]
                    Some(&memory_guard),
                );
                Some(Arc::new(engine))
            } else {
                // No pool configured — batching is unavailable.
                None
            }
        };
        // Token cancelled by OS shutdown signals (see crate::shutdown).
        let shutdown = crate::shutdown::install_signal_handler();
        #[cfg(feature = "worker-pool")]
        if let Some(ref pool) = worker_pool {
            // Background loop stops when the shutdown token is cancelled.
            pool.start_scaling_loop(shutdown.clone());
        }
        // A metrics-server bind failure is logged but deliberately non-fatal.
        if let Err(e) = metrics.start_server(metrics_addr).await {
            tracing::error!(error = %e, addr = metrics_addr, "Failed to start metrics server");
        }
        #[cfg(feature = "version-check")]
        {
            // Startup check that the running version is current; defaults
            // fill every field other than product/current_version.
            crate::VersionCheck::new(crate::VersionCheckConfig {
                product: app_name.to_string(),
                current_version: version.to_string(),
                ..Default::default()
            })
            .check_on_startup();
        }
        tracing::info!(
            environment = %ctx.environment,
            pod_name = ?ctx.pod_name,
            namespace = ?ctx.namespace,
            "Service runtime initialised"
        );
        Ok(Self {
            metrics,
            dfe,
            #[cfg(feature = "memory")]
            memory_guard,
            shutdown,
            context: ctx,
            #[cfg(feature = "worker-pool")]
            worker_pool,
            #[cfg(feature = "worker-batch")]
            batch_engine,
            #[cfg(feature = "scaling")]
            scaling,
        })
    }

    /// Install the readiness probe exposed by the metrics manager's server.
    /// `check` should return `true` once the service is ready to take traffic.
    pub fn set_readiness_check<F: Fn() -> bool + Send + Sync + 'static>(&mut self, check: F) {
        self.metrics.set_readiness_check(check);
    }

    /// Shared batch engine, or `None` when no worker pool was configured.
    #[cfg(feature = "worker-batch")]
    #[must_use]
    pub fn batch_engine(&self) -> Option<&Arc<crate::worker::BatchEngine>> {
        self.batch_engine.as_ref()
    }
}