use std::collections::HashMap;
use std::error::Error;
use std::fmt::Display;
use crate::controllers::Controllers;
use crate::Module;
use std::io;
use std::pin::pin;
use std::process::ExitCode;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use serde::Deserialize;
use thiserror::Error;
use shiny_common::pointer_utils::ToArc;
use shiny_common::clock::SystemClock;
use shiny_configuration::{Configuration, GetConfigurationError};
use shiny_configuration::configuration_builder::BuildConfigurationError;
use shiny_configuration::configuration_provider::env_provider::EnvironmentVariablesProvider;
use shiny_configuration::configuration_provider::json5_provider::{CreateJson5ProviderFromFileError, Json5Provider};
use shiny_jobs::job_trigger::cron_trigger::{CronTrigger, NewCronTriggerError};
use shiny_jobs::job_trigger::interval_trigger::{IntervalTriggerFromStrError, IntervalTrigger};
use shiny_jobs::job_trigger::JobTrigger;
use shiny_jobs::jobs_executor::JobsExecutor;
use crate::context_factory::DefaultContextFactory;
/// Top-level application: holds the controllers created by the module together with the
/// pluggable factories used while running (cancellation signal and configuration sources).
pub struct Application {
// Controllers produced by `Module::create`; their jobs are taken and scheduled on run.
controller: Controllers,
// Produces the future whose completion triggers graceful shutdown.
cancellation_signal_factory: Arc<dyn CancellationSignalFactory>,
// Produces the configuration from which the `jobs` section is read.
jobs_configuration_factory: Arc<dyn ConfigurationFactory>,
// Produces the configuration handed to `DefaultContextFactory`.
dynamic_configuration_factory: Arc<dyn ConfigurationFactory>,
}
/// Source of the signal that tells the application to begin shutting down.
#[async_trait::async_trait]
pub trait CancellationSignalFactory {
/// Waits for the shutdown request.
///
/// The application treats `Ok(())` as the request to start a graceful shutdown and
/// an `Err` as a failure that aborts the run.
async fn cancellation_signal(&self) -> io::Result<()>;
}
/// Default [`CancellationSignalFactory`]: the signal is `tokio::signal::ctrl_c()`.
struct CtrlCCancellationSignalFactory;

#[async_trait::async_trait]
impl CancellationSignalFactory for CtrlCCancellationSignalFactory {
    /// Completes with whatever `tokio::signal::ctrl_c()` resolves to.
    async fn cancellation_signal(&self) -> io::Result<()> {
        let ctrl_c = tokio::signal::ctrl_c();
        ctrl_c.await
    }
}
impl Application {
pub fn new<TClients, TModule>(clients: TClients, module: TModule) -> Self
where
TModule: Module<Clients=TClients>,
{
Application {
controller: module.create(clients),
cancellation_signal_factory: CtrlCCancellationSignalFactory.arc(),
jobs_configuration_factory: DefaultJobsConfigurationFactory.arc(),
dynamic_configuration_factory: DefaultDynamicConfigurationFactory.arc(),
}
}
pub async fn run(self) -> ExitCode {
match self.run_internal().await {
Ok(()) => ExitCode::SUCCESS,
Err(error) => {
tracing::error!("Application failed: {}", error);
eprintln!("\n{}", MultilineDisplayAdapter(error));
ExitCode::FAILURE
}
}
}
async fn run_internal(self) -> Result<(), RunApplicationError> {
tracing::info!("Starting application");
let configuration = self.jobs_configuration_factory.configuration()?;
let jobs_configuration: HashMap<String, JobConfiguration> = configuration.get("jobs").map_err(RunApplicationError::FailedToGetJobsConfiguration)?;
let configuration = self.dynamic_configuration_factory.configuration()?;
let context_factory = DefaultContextFactory::new(configuration).arc();
let mut job_executor = JobsExecutor::new(context_factory);
let clock = SystemClock.arc();
for (name, job) in self.controller.job.take_jobs() {
if let Some(configuration) = jobs_configuration.get(&name) {
if configuration.enabled.unwrap_or(true) {
let trigger: Arc<dyn JobTrigger> = if configuration.schedule.starts_with("PT") {
let trigger = IntervalTrigger::from_str(&configuration.schedule)?;
tracing::info!("Registered job `{name}` with interval `{} seconds`", trigger.interval().as_secs());
trigger.arc()
} else {
let trigger = CronTrigger::new(&configuration.schedule, clock.clone())?;
tracing::info!("Registered job `{name}` with schedule `{}`", configuration.schedule);
trigger.arc()
};
job_executor.schedule(name, job, trigger);
} else {
tracing::info!("Job `{name}` disabled, skiping");
}
} else {
return Err(RunApplicationError::MissingConfigurationForJob(name));
}
}
let cancellation_signal = self.cancellation_signal_factory.cancellation_signal();
let cancellation_token = CancellationToken::new();
let mut future = pin!(job_executor.run(cancellation_token.child_token()));
tokio::select! {
_ = &mut future => {
return Err(RunApplicationError::JobExecutorStoppedUnexpectedly)
}
result = cancellation_signal => {
match result {
Ok(_) => {
tracing::info!("Cancellation signal received, starting graceful shutdown");
cancellation_token.cancel();
},
Err(err) => { return Err(RunApplicationError::FailedToRegisterCtrlCHandler(err)) }
}
}
}
let graceful_period = tokio::time::sleep(Duration::from_secs(10));
tokio::select! {
_ = &mut future => {
tracing::info!("Application successfully stopped");
Ok(())
}
_ = graceful_period => {
Err(RunApplicationError::JobExecutorFailedToStopDuringGracefulShutdownPeriod)
}
}
}
pub fn set_cancellation_signal_factory(&mut self, cancellation_signal_factory: Arc<dyn CancellationSignalFactory>) {
self.cancellation_signal_factory = cancellation_signal_factory;
}
pub fn set_infrastructure_configuration_factory(&mut self, configuration_factory: Arc<dyn ConfigurationFactory>) {
self.jobs_configuration_factory = configuration_factory;
}
pub fn set_dynamic_configuration_factory(&mut self, dynamic_configuration_factory: Arc<dyn ConfigurationFactory>) {
self.dynamic_configuration_factory = dynamic_configuration_factory;
}
}
/// Reasons why running the application can fail.
#[derive(Debug, Error)]
enum RunApplicationError {
/// The cancellation signal future resolved with an I/O error.
#[error("Failed to register control-c handler")]
FailedToRegisterCtrlCHandler(#[source] io::Error),
/// One of the configuration factories failed to produce a configuration.
#[error("Failed to create configuration")]
FailedToCreateConfiguration(#[from] CreateInfrastructureConfigurationError),
/// The `jobs` section could not be read from the jobs configuration.
#[error("Failed to get jobs configuration")]
FailedToGetJobsConfiguration(#[from] GetConfigurationError),
/// A schedule starting with `PT` could not be parsed as an interval trigger.
#[error("Failed to create interval job trigger")]
FailedToCreateIntervalJobTrigger(#[from] IntervalTriggerFromStrError),
/// A schedule could not be turned into a cron trigger.
#[error("Failed to create cron job trigger")]
FailedToCreateCronJobTrigger(#[from] NewCronTriggerError),
/// A job has no entry in the `jobs` configuration; carries the job name.
#[error("Missing configuration for job {0}")]
MissingConfigurationForJob(String),
/// The job executor did not finish before the graceful shutdown timer elapsed.
#[error("Job executor failed to stop during graceful shutdown")]
JobExecutorFailedToStopDuringGracefulShutdownPeriod,
/// The job executor finished before any cancellation signal was received.
#[error("Job executor stopped unexpectedly")]
JobExecutorStoppedUnexpectedly,
}
/// Creates the `Configuration` instances used by the application; implementations can be
/// swapped in through the `Application` setters.
pub trait ConfigurationFactory {
/// Builds a configuration, or reports why it could not be created.
fn configuration(&self) -> Result<Configuration, CreateInfrastructureConfigurationError>;
}
/// Default source of the jobs configuration: `./configuration/jobs.json5` as the root
/// provider plus an environment-variables provider registered as `environment`.
struct DefaultJobsConfigurationFactory;

impl ConfigurationFactory for DefaultJobsConfigurationFactory {
    /// Loads the JSON5 file and assembles the configuration.
    ///
    /// Fails if the file provider cannot be created or the configuration cannot be built.
    fn configuration(&self) -> Result<Configuration, CreateInfrastructureConfigurationError> {
        let json5_source = Json5Provider::from_path("./configuration/jobs.json5")?;
        let environment_source = EnvironmentVariablesProvider::new();

        let jobs_configuration = Configuration::builder(json5_source)
            .with_provider("environment", environment_source)
            .build()?;
        Ok(jobs_configuration)
    }
}
/// Default source of the dynamic configuration: `./configuration/dynamic.json5` as the root
/// provider plus an environment-variables provider registered as `environment`.
struct DefaultDynamicConfigurationFactory;

impl ConfigurationFactory for DefaultDynamicConfigurationFactory {
    /// Loads the JSON5 file and assembles the configuration.
    ///
    /// Fails if the file provider cannot be created or the configuration cannot be built.
    fn configuration(&self) -> Result<Configuration, CreateInfrastructureConfigurationError> {
        let json5_source = Json5Provider::from_path("./configuration/dynamic.json5")?;
        let environment_source = EnvironmentVariablesProvider::new();

        let dynamic_configuration = Configuration::builder(json5_source)
            .with_provider("environment", environment_source)
            .build()?;
        Ok(dynamic_configuration)
    }
}
#[derive(Debug, Error)]
pub enum CreateInfrastructureConfigurationError {
#[error("Failed to create yaml provider")]
FailedToCreateJson5Provider(#[from] CreateJson5ProviderFromFileError),
#[error("Failed to create configuration")]
FailedToCreateConfiguration(#[from] BuildConfigurationError),
}
/// Settings of a single job, deserialized from the `jobs` configuration section where
/// entries are keyed by job name.
#[derive(Deserialize)]
struct JobConfiguration {
// Values starting with "PT" are parsed as an interval trigger; anything else is handed
// to the cron trigger.
schedule: String,
// When absent the job is treated as enabled.
enabled: Option<bool>,
}
/// Wraps an error so that its `Display` output lists the error followed by every error in
/// its `source()` chain, one per line.
pub struct MultilineDisplayAdapter<E: Error>(pub E);

impl<E: Error> Display for MultilineDisplayAdapter<E> {
    /// Writes `Error: <error>` and, when the error has sources, a `Caused by:` header
    /// followed by one line per source, outermost first.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        writeln!(f, "Error: {}", self.0)?;

        // Walk the chain lazily: each step asks the current link for its own source.
        let mut causes =
            std::iter::successors(self.0.source(), |cause| (*cause).source()).peekable();

        // The header is printed only when there is at least one cause to list.
        if causes.peek().is_some() {
            writeln!(f, "Caused by:")?;
        }
        causes.try_for_each(|cause| writeln!(f, " {}", cause))
    }
}