use crate::{
app::{health::HealthChecker, RuntimeConfig, Startup},
core::{config::ConfigFromEnv, Spawner},
health::HealthChecked,
};
use futures_core::future::LocalBoxFuture;
use futures_util::future::FutureExt;
use humantime::format_duration;
use prometheus::{Encoder, TextEncoder};
use std::future::Future;
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
#[cfg(feature = "actix")]
use crate::app::health::HealthServer;
pub struct Main<'m> {
sub: SubMain<'m>,
}
impl<'m> Default for Main<'m> {
fn default() -> Self {
Self::new(RuntimeConfig::default())
}
}
impl<'m> Deref for Main<'m> {
type Target = SubMain<'m>;
fn deref(&self) -> &Self::Target {
&self.sub
}
}
impl<'m> DerefMut for Main<'m> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.sub
}
}
impl<'m> Main<'m> {
pub fn new(config: RuntimeConfig) -> Self {
Self {
sub: SubMain::new(config, Default::default()),
}
}
pub fn from_env() -> anyhow::Result<Self> {
Ok(Self::new(RuntimeConfig::from_env_prefix("RUNTIME")?))
}
pub fn add_tasks<I>(mut self, tasks: I) -> Self
where
I: IntoIterator<Item = LocalBoxFuture<'m, anyhow::Result<()>>>,
{
self.extend(tasks);
self
}
pub fn add_checks<I>(&mut self, i: I)
where
I: IntoIterator<Item = Box<dyn HealthChecked>>,
{
self.sub.health.extend(i);
}
pub async fn run(mut self) -> anyhow::Result<()> {
log::info!("Starting main ...");
log::debug!("Runtime configuration: {:#?}", self.config);
self.run_console_metrics();
self.run_health_server();
self.sub.run().await
}
#[cfg(feature = "actix")]
fn run_health_server(&mut self) {
log::info!("Health server: {}", self.config.health.enabled);
if self.config.health.enabled {
let health = HealthServer::new(
self.config.health.clone(),
self.health.clone(),
Some(prometheus::default_registry().clone()),
);
self.tasks.push(health.run().boxed());
}
}
#[cfg(not(feature = "actix"))]
fn run_health_server(&self) {
log::info!(
"No health server implementation (required?: {})",
self.config.health.enabled
);
if self.config.health.enabled {
panic!("Unable to run health endpoint without 'actix' feature. Either enable 'actix' during compilation or disable the health server during runtime.");
}
}
fn run_console_metrics(&mut self) {
if self.config.console_metrics.enabled {
let period = self.config.console_metrics.period;
self.tasks.push(
async move {
log::info!(
"Starting console metrics loop ({})...",
format_duration(period)
);
let encoder = TextEncoder::new();
loop {
let metric_families = prometheus::gather();
{
let mut out = std::io::stdout().lock();
encoder.encode(&metric_families, &mut out).unwrap();
}
tokio::time::sleep(period).await;
}
}
.boxed(),
);
}
}
}
impl Spawner for Main<'_> {
fn spawn_boxed(&mut self, future: Pin<Box<dyn Future<Output = anyhow::Result<()>>>>) {
SubMain::spawn_boxed(self, future)
}
}
impl Startup for Main<'_> {
fn check_boxed(&mut self, check: Box<dyn HealthChecked>) {
SubMain::check_boxed(self, check)
}
fn use_tracing(&self) -> bool {
SubMain::use_tracing(self)
}
fn runtime_config(&self) -> &RuntimeConfig {
SubMain::runtime_config(self)
}
}
pub struct SubMain<'m> {
config: RuntimeConfig,
tasks: Vec<LocalBoxFuture<'m, anyhow::Result<()>>>,
health: HealthChecker,
}
impl SubMain<'_> {
pub(crate) fn new(config: RuntimeConfig, health: HealthChecker) -> Self {
Self {
config,
tasks: Default::default(),
health,
}
}
pub fn is_empty(&self) -> bool {
self.tasks.is_empty()
}
pub fn sub_main(&self) -> SubMain {
self.sub_main_seed().into()
}
pub fn sub_main_seed(&self) -> SubMainSeed {
SubMainSeed::new(self.config.clone(), self.health.clone())
}
pub async fn run(self) -> anyhow::Result<()> {
log::info!("Running {} tasks in this main instance", self.tasks.len());
let (result, _, _) = futures_util::future::select_all(self.tasks).await;
log::warn!("One of the main runners returned: {result:?}");
log::warn!("Exiting application...");
Ok(())
}
}
impl<'m> Extend<LocalBoxFuture<'m, Result<(), anyhow::Error>>> for SubMain<'m> {
fn extend<T: IntoIterator<Item = LocalBoxFuture<'m, anyhow::Result<()>>>>(&mut self, iter: T) {
self.tasks.extend(iter)
}
}
impl<'m> Spawner for SubMain<'m> {
fn spawn_boxed(&mut self, future: Pin<Box<dyn Future<Output = anyhow::Result<()>>>>) {
self.tasks.push(future);
}
}
impl<'m> Startup for SubMain<'m> {
fn check_boxed(&mut self, check: Box<dyn HealthChecked>) {
self.health.push(check);
}
fn use_tracing(&self) -> bool {
self.config.tracing.is_enabled()
}
fn runtime_config(&self) -> &RuntimeConfig {
&self.config
}
}
pub struct SubMainSeed {
config: RuntimeConfig,
health: HealthChecker,
}
impl SubMainSeed {
fn new(config: RuntimeConfig, health: HealthChecker) -> Self {
Self { config, health }
}
}
impl From<SubMainSeed> for SubMain<'_> {
fn from(seed: SubMainSeed) -> Self {
Self {
config: seed.config,
health: seed.health,
tasks: Default::default(),
}
}
}