use std::collections::HashMap;
use crate::controllers::Controllers;
use crate::Module;
use std::io;
use std::process::ExitCode;
use std::sync::Arc;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use serde::Deserialize;
use thiserror::Error;
use shiny_common::clock::SystemClock;
use shiny_common::error::MultilineDisplayAdapter;
use shiny_configuration::{Configuration, GetConfigurationError};
use shiny_configuration::configuration_builder::BuildConfigurationError;
use shiny_configuration::configuration_provider::env_provider::EnvironmentVariablesProvider;
use shiny_configuration::configuration_provider::yaml_provider::{CreateYamlProviderFromFileError, YamlProvider};
use shiny_jobs::job_trigger::cron_trigger::{CronTrigger, NewCronTriggerError};
use shiny_jobs::job_trigger::interval_trigger::{IntervalTriggerFromStrError, IntervalTrigger};
use shiny_jobs::job_trigger::JobTrigger;
use shiny_jobs::jobs_executor::JobsExecutor;
use crate::context_factory::DefaultContextFactory;
/// Application runtime: holds the module's controllers together with the factories that
/// `run` uses to load configuration and to wait for a cancellation signal.
pub struct Application {
/// Controllers built by `Module::create`; `run` takes the jobs to schedule from its `job` member.
controller: Controllers,
/// Produces the future whose completion starts graceful shutdown.
cancellation_signal_factory: Arc<dyn CancellationSignalFactory>,
/// Loads the configuration from which the `jobs` section (schedules, enabled flags) is read.
jobs_configuration_factory: Arc<dyn ConfigurationFactory>,
/// Loads the configuration that is handed to `DefaultContextFactory`.
dynamic_configuration_factory: Arc<dyn ConfigurationFactory>,
}
/// Source of the signal that tells a running `Application` to begin graceful shutdown.
///
/// `Application::run` awaits the returned future: `Ok(())` starts the shutdown sequence, while
/// an `Err` aborts the run with `RunApplicationError::FailedToRegisterCtrlCHandler`.
#[async_trait::async_trait]
pub trait CancellationSignalFactory {
/// Resolves with `Ok(())` once cancellation is requested, or with the `io::Error` reported by
/// the implementation.
async fn cancellation_signal(&self) -> io::Result<()>;
}
/// Default `CancellationSignalFactory`, installed by `Application::new`, that is backed by
/// `tokio::signal::ctrl_c`.
struct CtrlCCancellationSignalFactory;

#[async_trait::async_trait]
impl CancellationSignalFactory for CtrlCCancellationSignalFactory {
    /// Awaits `tokio::signal::ctrl_c` and forwards its result unchanged.
    async fn cancellation_signal(&self) -> io::Result<()> {
        let interrupt = tokio::signal::ctrl_c();
        interrupt.await
    }
}
impl Application {
    /// Time the jobs executor is given to finish once cancellation has been requested.
    const GRACEFUL_SHUTDOWN_PERIOD: Duration = Duration::from_secs(10);

    /// Creates an application from a module and the clients that module requires.
    ///
    /// The controllers are built with `module.create(clients)`. The cancellation signal and
    /// both configuration factories start out as the defaults defined in this file and can be
    /// replaced through the `set_*` methods before calling [`Application::run`].
    pub fn new<TClients, TModule>(clients: TClients, module: TModule) -> Self
    where
        TModule: Module<Clients = TClients>,
    {
        Application {
            controller: module.create(clients),
            cancellation_signal_factory: Arc::new(CtrlCCancellationSignalFactory),
            jobs_configuration_factory: Arc::new(DefaultJobsConfigurationFactory),
            dynamic_configuration_factory: Arc::new(DefaultDynamicConfigurationFactory),
        }
    }

    /// Runs the application until it is cancelled or fails.
    ///
    /// Returns `ExitCode::SUCCESS` after a clean shutdown. On failure the error is logged
    /// through `tracing`, printed to stderr via `MultilineDisplayAdapter`, and
    /// `ExitCode::FAILURE` is returned.
    pub async fn run(self) -> ExitCode {
        match self.run_internal().await {
            Ok(()) => ExitCode::SUCCESS,
            Err(error) => {
                tracing::error!("Application failed: {}", error);
                eprintln!("\n{}", MultilineDisplayAdapter(error));
                ExitCode::FAILURE
            }
        }
    }

    /// Loads configuration, schedules every configured and enabled job, then drives the jobs
    /// executor until the cancellation signal fires and the executor has shut down.
    ///
    /// # Errors
    ///
    /// Fails when configuration cannot be created or read, when a job schedule cannot be
    /// turned into a trigger, when the cancellation signal future resolves to an error, when
    /// the executor returns before cancellation was requested, or when it does not stop within
    /// [`Self::GRACEFUL_SHUTDOWN_PERIOD`].
    async fn run_internal(self) -> Result<(), RunApplicationError> {
        tracing::info!("Starting application");

        let infrastructure_configuration = self.jobs_configuration_factory.configuration()?;
        let jobs_configuration: HashMap<String, JobConfiguration> = infrastructure_configuration
            .get("jobs")
            .map_err(RunApplicationError::FailedToGetJobsConfiguration)?;

        let dynamic_configuration = self.dynamic_configuration_factory.configuration()?;
        let context_factory = DefaultContextFactory::new(dynamic_configuration);
        let mut job_executor = JobsExecutor::new(Arc::new(context_factory));
        let clock = Arc::new(SystemClock);

        for (name, job) in self.controller.job.take_jobs() {
            // A job that is skipped is reported explicitly so that a missing or misspelled
            // configuration key does not go unnoticed.
            let job_configuration = match jobs_configuration.get(&name) {
                Some(job_configuration) => job_configuration,
                None => {
                    tracing::warn!("Job `{name}` has no entry in the `jobs` configuration and will not be scheduled");
                    continue;
                }
            };
            if !job_configuration.enabled.unwrap_or(true) {
                tracing::info!("Job `{name}` is disabled in the `jobs` configuration and will not be scheduled");
                continue;
            }

            // Schedules beginning with "PT" are parsed as intervals; everything else is
            // handed to the cron trigger.
            let trigger: Arc<dyn JobTrigger> = if job_configuration.schedule.starts_with("PT") {
                let trigger = IntervalTrigger::from_str(&job_configuration.schedule)?;
                tracing::info!("Registered job `{name}` with interval `{} seconds`", trigger.interval().as_secs());
                Arc::new(trigger)
            } else {
                let trigger = CronTrigger::new(&job_configuration.schedule, clock.clone())?;
                tracing::info!("Registered job `{name}` with schedule `{}`", job_configuration.schedule);
                Arc::new(trigger)
            };
            job_executor.schedule(name, job, trigger);
        }

        let cancellation_signal = self.cancellation_signal_factory.cancellation_signal();
        let cancellation_token = CancellationToken::new();

        // Phase 1: run until the cancellation signal arrives. The executor returning first is
        // treated as a failure because nothing asked it to stop.
        tokio::select! {
            _ = job_executor.run(cancellation_token.child_token()) => {
                return Err(RunApplicationError::JobExecutorStoppedUnexpectedly);
            }
            result = cancellation_signal => {
                result.map_err(RunApplicationError::FailedToRegisterCtrlCHandler)?;
                tracing::info!("Cancellation signal received, starting graceful shutdown");
                cancellation_token.cancel();
            }
        }

        // Phase 2: give the executor a bounded amount of time to finish.
        // NOTE(review): the `run` future from phase 1 has been dropped at this point and a new
        // one is created with an already-cancelled child token. This presumably relies on
        // `JobsExecutor` keeping in-flight work across `run` calls - confirm against its
        // implementation.
        let graceful_period = tokio::time::sleep(Self::GRACEFUL_SHUTDOWN_PERIOD);
        tokio::select! {
            _ = job_executor.run(cancellation_token.child_token()) => {
                tracing::info!("Application successfully stopped");
                Ok(())
            }
            _ = graceful_period => {
                Err(RunApplicationError::JobExecutorFailedToStopDuringGracefulShutdownPeriod)
            }
        }
    }

    /// Replaces the factory whose signal triggers graceful shutdown.
    pub fn set_cancellation_signal_factory(&mut self, cancellation_signal_factory: Arc<dyn CancellationSignalFactory>) {
        self.cancellation_signal_factory = cancellation_signal_factory;
    }

    /// Replaces the factory for the infrastructure configuration, i.e. the configuration from
    /// which the `jobs` section is read.
    pub fn set_infrastructure_configuration_factory(&mut self, configuration_factory: Arc<dyn ConfigurationFactory>) {
        self.jobs_configuration_factory = configuration_factory;
    }

    /// Replaces the factory for the configuration that is passed to `DefaultContextFactory`.
    pub fn set_dynamic_configuration_factory(&mut self, dynamic_configuration_factory: Arc<dyn ConfigurationFactory>) {
        self.dynamic_configuration_factory = dynamic_configuration_factory;
    }
}
/// Reasons `Application::run_internal` can fail; reported by `Application::run`.
#[derive(Debug, Error)]
enum RunApplicationError {
/// The cancellation signal future resolved to an I/O error instead of a signal.
#[error("Failed to register control-c handler")]
FailedToRegisterCtrlCHandler(#[source] io::Error),
/// One of the configuration factories failed to produce a `Configuration`.
#[error("Failed to create configuration")]
FailedToCreateConfiguration(#[from] CreateInfrastructureConfigurationError),
/// The `jobs` section could not be read from the configuration.
#[error("Failed to get jobs configuration")]
FailedToGetJobsConfiguration(#[from] GetConfigurationError),
/// A schedule starting with `PT` was rejected by `IntervalTrigger::from_str`.
#[error("Failed to create interval job trigger")]
FailedToCreateIntervalJobTrigger(#[from] IntervalTriggerFromStrError),
/// A schedule was rejected by `CronTrigger::new`.
#[error("Failed to create cron job trigger")]
FailedToCreateCronJobTrigger(#[from] NewCronTriggerError),
/// The graceful shutdown period elapsed before the jobs executor returned.
#[error("Job executor failed to stop during graceful shutdown")]
JobExecutorFailedToStopDuringGracefulShutdownPeriod,
/// The jobs executor returned before any cancellation signal was received.
#[error("Job executor stopped unexpectedly")]
JobExecutorStoppedUnexpectedly,
}
/// Builds a `Configuration` on demand; `Application` holds one factory for the jobs
/// configuration and one for the dynamic configuration.
pub trait ConfigurationFactory {
/// Creates the configuration, or reports why it could not be created.
fn configuration(&self) -> Result<Configuration, CreateInfrastructureConfigurationError>;
}
/// Default factory for the configuration that contains the `jobs` section.
struct DefaultJobsConfigurationFactory;

impl ConfigurationFactory for DefaultJobsConfigurationFactory {
    /// Builds a configuration rooted at `./configuration/production/infrastructure.yaml` with
    /// an additional provider registered under the name `environment`.
    fn configuration(&self) -> Result<Configuration, CreateInfrastructureConfigurationError> {
        let yaml = YamlProvider::from_path("./configuration/production/infrastructure.yaml")?;
        let environment = EnvironmentVariablesProvider::new();

        let builder = Configuration::builder(yaml).with_provider("environment", environment);
        let configuration = builder.build()?;
        Ok(configuration)
    }
}
/// Default factory for the dynamic configuration that `Application` passes to
/// `DefaultContextFactory`.
struct DefaultDynamicConfigurationFactory;

impl ConfigurationFactory for DefaultDynamicConfigurationFactory {
    /// Builds a configuration rooted at `./configuration/production/dynamic.yaml` with an
    /// additional provider registered under the name `environment`.
    fn configuration(&self) -> Result<Configuration, CreateInfrastructureConfigurationError> {
        let yaml = YamlProvider::from_path("./configuration/production/dynamic.yaml")?;
        let environment = EnvironmentVariablesProvider::new();

        let builder = Configuration::builder(yaml).with_provider("environment", environment);
        let configuration = builder.build()?;
        Ok(configuration)
    }
}
/// Error returned by `ConfigurationFactory::configuration`.
#[derive(Debug, Error)]
pub enum CreateInfrastructureConfigurationError {
/// The YAML provider could not be created from the configuration file.
#[error("Failed to create yaml provider")]
FailedToCreateYamlProvider(#[from] CreateYamlProviderFromFileError),
/// The configuration builder failed to build the final `Configuration`.
#[error("Failed to create configuration")]
FailedToCreateConfiguration(#[from] BuildConfigurationError),
}
/// Per-job settings, deserialized from the `jobs` configuration section where they are keyed by job name.
#[derive(Deserialize)]
struct JobConfiguration {
/// Schedule expression: values starting with `PT` are parsed as an interval trigger, any
/// other value is passed to the cron trigger.
schedule: String,
/// Whether the job should be scheduled; a missing value is treated as `true`.
enabled: Option<bool>,
}