shinyframework_application 0.1.2

Shiny Application
Documentation
use std::collections::HashMap;
use std::error::Error;
use std::fmt::Display;
use crate::controllers::Controllers;
use crate::Module;
use std::io;
use std::pin::pin;
use std::process::ExitCode;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use serde::Deserialize;
use thiserror::Error;
use shiny_common::pointer_utils::ToArc;
use shiny_common::clock::SystemClock;
use shiny_configuration::{Configuration, GetConfigurationError};
use shiny_configuration::configuration_builder::BuildConfigurationError;
use shiny_configuration::configuration_provider::env_provider::EnvironmentVariablesProvider;
use shiny_configuration::configuration_provider::json5_provider::{CreateJson5ProviderFromFileError, Json5Provider};
use shiny_jobs::job_trigger::cron_trigger::{CronTrigger, NewCronTriggerError};
use shiny_jobs::job_trigger::interval_trigger::{IntervalTriggerFromStrError, IntervalTrigger};
use shiny_jobs::job_trigger::JobTrigger;
use shiny_jobs::jobs_executor::JobsExecutor;
use crate::context_factory::DefaultContextFactory;

pub struct Application {
    controller: Controllers,
    cancellation_signal_factory: Arc<dyn CancellationSignalFactory>,
    jobs_configuration_factory: Arc<dyn ConfigurationFactory>,
    dynamic_configuration_factory: Arc<dyn ConfigurationFactory>,
}

#[async_trait::async_trait]
pub trait CancellationSignalFactory {
    async fn cancellation_signal(&self) -> io::Result<()>;
}

struct CtrlCCancellationSignalFactory;

#[async_trait::async_trait]
impl CancellationSignalFactory for CtrlCCancellationSignalFactory {
    async fn cancellation_signal(&self) -> io::Result<()> {
        tokio::signal::ctrl_c().await
    }
}

impl Application {
    pub fn new<TClients, TModule>(clients: TClients, module: TModule) -> Self
        where
            TModule: Module<Clients=TClients>,
    {
        Application {
            controller: module.create(clients),
            cancellation_signal_factory: CtrlCCancellationSignalFactory.arc(),
            jobs_configuration_factory: DefaultJobsConfigurationFactory.arc(),
            dynamic_configuration_factory: DefaultDynamicConfigurationFactory.arc(),
        }
    }

    pub async fn run(self) -> ExitCode {
        match self.run_internal().await {
            Ok(()) => ExitCode::SUCCESS,
            Err(error) => {
                tracing::error!("Application failed: {}", error);

                eprintln!("\n{}", MultilineDisplayAdapter(error));
                ExitCode::FAILURE
            }
        }
    }

    async fn run_internal(self) -> Result<(), RunApplicationError> {
        tracing::info!("Starting application");

        let configuration = self.jobs_configuration_factory.configuration()?;
        let jobs_configuration: HashMap<String, JobConfiguration> = configuration.get("jobs").map_err(RunApplicationError::FailedToGetJobsConfiguration)?;

        let configuration = self.dynamic_configuration_factory.configuration()?;

        let context_factory = DefaultContextFactory::new(configuration).arc();
        let mut job_executor = JobsExecutor::new(context_factory);

        let clock = SystemClock.arc();

        for (name, job) in self.controller.job.take_jobs() {
            if let Some(configuration) = jobs_configuration.get(&name) {
                if configuration.enabled.unwrap_or(true) {
                    let trigger: Arc<dyn JobTrigger> = if configuration.schedule.starts_with("PT") {
                        let trigger = IntervalTrigger::from_str(&configuration.schedule)?;
                        tracing::info!("Registered job `{name}` with interval `{} seconds`", trigger.interval().as_secs());
                        trigger.arc()
                    } else {
                        let trigger = CronTrigger::new(&configuration.schedule, clock.clone())?;
                        tracing::info!("Registered job `{name}` with schedule `{}`", configuration.schedule);
                        trigger.arc()
                    };

                    job_executor.schedule(name, job, trigger);
                } else {
                    tracing::info!("Job `{name}` disabled, skiping");
                }
            } else {
                return Err(RunApplicationError::MissingConfigurationForJob(name));
            }
        }

        let cancellation_signal = self.cancellation_signal_factory.cancellation_signal();
        let cancellation_token = CancellationToken::new();
        let mut future = pin!(job_executor.run(cancellation_token.child_token()));

        tokio::select! {
            _ = &mut future => {
                return Err(RunApplicationError::JobExecutorStoppedUnexpectedly)
            }
            result = cancellation_signal => {
                match result {
                    Ok(_) => {
                        tracing::info!("Cancellation signal received, starting graceful shutdown");
                        cancellation_token.cancel();
                    },
                    Err(err) => { return Err(RunApplicationError::FailedToRegisterCtrlCHandler(err)) }
                }
            }
        }

        // Cancellation signal received
        let graceful_period = tokio::time::sleep(Duration::from_secs(10));
        tokio::select! {
            _ = &mut future => {
                tracing::info!("Application successfully stopped");
                Ok(())
            }
            _ = graceful_period => {
                Err(RunApplicationError::JobExecutorFailedToStopDuringGracefulShutdownPeriod)
            }
        }
    }

    pub fn set_cancellation_signal_factory(&mut self, cancellation_signal_factory: Arc<dyn CancellationSignalFactory>) {
        self.cancellation_signal_factory = cancellation_signal_factory;
    }

    pub fn set_infrastructure_configuration_factory(&mut self, configuration_factory: Arc<dyn ConfigurationFactory>) {
        self.jobs_configuration_factory = configuration_factory;
    }


    pub fn set_dynamic_configuration_factory(&mut self, dynamic_configuration_factory: Arc<dyn ConfigurationFactory>) {
        self.dynamic_configuration_factory = dynamic_configuration_factory;
    }
}

#[derive(Debug, Error)]
enum RunApplicationError {
    #[error("Failed to register control-c handler")]
    FailedToRegisterCtrlCHandler(#[source] io::Error),
    #[error("Failed to create configuration")]
    FailedToCreateConfiguration(#[from] CreateInfrastructureConfigurationError),
    #[error("Failed to get jobs configuration")]
    FailedToGetJobsConfiguration(#[from] GetConfigurationError),
    #[error("Failed to create interval job trigger")]
    FailedToCreateIntervalJobTrigger(#[from] IntervalTriggerFromStrError),
    #[error("Failed to create cron job trigger")]
    FailedToCreateCronJobTrigger(#[from] NewCronTriggerError),
    #[error("Missing configuration for job {0}")]
    MissingConfigurationForJob(String),
    #[error("Job executor failed to stop during graceful shutdown")]
    JobExecutorFailedToStopDuringGracefulShutdownPeriod,
    #[error("Job executor stopped unexpectedly")]
    JobExecutorStoppedUnexpectedly,
}

pub trait ConfigurationFactory {
    fn configuration(&self) -> Result<Configuration, CreateInfrastructureConfigurationError>;
}

struct DefaultJobsConfigurationFactory;

impl ConfigurationFactory for DefaultJobsConfigurationFactory {
    fn configuration(&self) -> Result<Configuration, CreateInfrastructureConfigurationError> {
        let root_provider = Json5Provider::from_path("./configuration/jobs.json5")?;
        let env_provider = EnvironmentVariablesProvider::new();

        Ok(
            Configuration::builder(root_provider)
                .with_provider("environment", env_provider)
                .build()?
        )
    }
}


struct DefaultDynamicConfigurationFactory;

impl ConfigurationFactory for DefaultDynamicConfigurationFactory {
    fn configuration(&self) -> Result<Configuration, CreateInfrastructureConfigurationError> {
        let root_provider = Json5Provider::from_path("./configuration/dynamic.json5")?;
        let env_provider = EnvironmentVariablesProvider::new();

        Ok(
            Configuration::builder(root_provider)
                .with_provider("environment", env_provider)
                .build()?
        )
    }
}

#[derive(Debug, Error)]
pub enum CreateInfrastructureConfigurationError {
    #[error("Failed to create yaml provider")]
    FailedToCreateJson5Provider(#[from] CreateJson5ProviderFromFileError),
    #[error("Failed to create configuration")]
    FailedToCreateConfiguration(#[from] BuildConfigurationError),
}

#[derive(Deserialize)]
struct JobConfiguration {
    schedule: String,
    enabled: Option<bool>,
}


pub struct MultilineDisplayAdapter<E: Error>(pub E);

impl<E: Error> Display for MultilineDisplayAdapter<E> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        writeln!(f, "Error: {}", self.0)?;

        let mut cause = self.0.source();
        if self.0.source().is_some() {
            writeln!(f, "Caused by:")?;
        }

        while let Some(error) = cause {
            writeln!(f, "   {}", error)?;
            cause = error.source();
        }

        Ok(())
    }
}