shiny_application/
application.rs

1use std::collections::HashMap;
2use std::error::Error;
3use std::fmt::Display;
4use crate::controllers::Controllers;
5use crate::Module;
6use std::io;
7use std::pin::pin;
8use std::process::ExitCode;
9use std::str::FromStr;
10use std::sync::Arc;
11use std::time::Duration;
12use tokio_util::sync::CancellationToken;
13use serde::Deserialize;
14use thiserror::Error;
15use shiny_common::pointer_utils::ToArc;
16use shiny_common::clock::SystemClock;
17use shiny_configuration::{Configuration, GetConfigurationError};
18use shiny_configuration::configuration_builder::BuildConfigurationError;
19use shiny_configuration::configuration_provider::env_provider::EnvironmentVariablesProvider;
20use shiny_configuration::configuration_provider::json5_provider::{CreateJson5ProviderFromFileError, Json5Provider};
21use shiny_jobs::job_trigger::cron_trigger::{CronTrigger, NewCronTriggerError};
22use shiny_jobs::job_trigger::interval_trigger::{IntervalTriggerFromStrError, IntervalTrigger};
23use shiny_jobs::job_trigger::JobTrigger;
24use shiny_jobs::jobs_executor::JobsExecutor;
25use crate::context_factory::DefaultContextFactory;
26
27pub struct Application {
28    controller: Controllers,
29    cancellation_signal_factory: Arc<dyn CancellationSignalFactory>,
30    jobs_configuration_factory: Arc<dyn ConfigurationFactory>,
31    dynamic_configuration_factory: Arc<dyn ConfigurationFactory>,
32}
33
34#[async_trait::async_trait]
35pub trait CancellationSignalFactory {
36    async fn cancellation_signal(&self) -> io::Result<()>;
37}
38
39struct CtrlCCancellationSignalFactory;
40
41#[async_trait::async_trait]
42impl CancellationSignalFactory for CtrlCCancellationSignalFactory {
43    async fn cancellation_signal(&self) -> io::Result<()> {
44        tokio::signal::ctrl_c().await
45    }
46}
47
48impl Application {
49    pub fn new<TClients, TModule>(clients: TClients, module: TModule) -> Self
50        where
51            TModule: Module<Clients=TClients>,
52    {
53        Application {
54            controller: module.create(clients),
55            cancellation_signal_factory: CtrlCCancellationSignalFactory.arc(),
56            jobs_configuration_factory: DefaultJobsConfigurationFactory.arc(),
57            dynamic_configuration_factory: DefaultDynamicConfigurationFactory.arc(),
58        }
59    }
60
61    pub async fn run(self) -> ExitCode {
62        match self.run_internal().await {
63            Ok(()) => ExitCode::SUCCESS,
64            Err(error) => {
65                tracing::error!("Application failed: {}", error);
66
67                eprintln!("\n{}", MultilineDisplayAdapter(error));
68                ExitCode::FAILURE
69            }
70        }
71    }
72
73    async fn run_internal(self) -> Result<(), RunApplicationError> {
74        tracing::info!("Starting application");
75
76        let configuration = self.jobs_configuration_factory.configuration()?;
77        let jobs_configuration: HashMap<String, JobConfiguration> = configuration.get("jobs").map_err(RunApplicationError::FailedToGetJobsConfiguration)?;
78
79        let configuration = self.dynamic_configuration_factory.configuration()?;
80
81        let context_factory = DefaultContextFactory::new(configuration).arc();
82        let mut job_executor = JobsExecutor::new(context_factory);
83
84        let clock = SystemClock.arc();
85
86        for (name, job) in self.controller.job.take_jobs() {
87            if let Some(configuration) = jobs_configuration.get(&name) {
88                if configuration.enabled.unwrap_or(true) {
89                    let trigger: Arc<dyn JobTrigger> = if configuration.schedule.starts_with("PT") {
90                        let trigger = IntervalTrigger::from_str(&configuration.schedule)?;
91                        tracing::info!("Registered job `{name}` with interval `{} seconds`", trigger.interval().as_secs());
92                        trigger.arc()
93                    } else {
94                        let trigger = CronTrigger::new(&configuration.schedule, clock.clone())?;
95                        tracing::info!("Registered job `{name}` with schedule `{}`", configuration.schedule);
96                        trigger.arc()
97                    };
98
99                    job_executor.schedule(name, job, trigger);
100                } else {
101                    tracing::info!("Job `{name}` disabled, skiping");
102                }
103            } else {
104                return Err(RunApplicationError::MissingConfigurationForJob(name));
105            }
106        }
107
108        let cancellation_signal = self.cancellation_signal_factory.cancellation_signal();
109        let cancellation_token = CancellationToken::new();
110        let mut future = pin!(job_executor.run(cancellation_token.child_token()));
111
112        tokio::select! {
113            _ = &mut future => {
114                return Err(RunApplicationError::JobExecutorStoppedUnexpectedly)
115            }
116            result = cancellation_signal => {
117                match result {
118                    Ok(_) => {
119                        tracing::info!("Cancellation signal received, starting graceful shutdown");
120                        cancellation_token.cancel();
121                    },
122                    Err(err) => { return Err(RunApplicationError::FailedToRegisterCtrlCHandler(err)) }
123                }
124            }
125        }
126
127        // Cancellation signal received
128        let graceful_period = tokio::time::sleep(Duration::from_secs(10));
129        tokio::select! {
130            _ = &mut future => {
131                tracing::info!("Application successfully stopped");
132                Ok(())
133            }
134            _ = graceful_period => {
135                Err(RunApplicationError::JobExecutorFailedToStopDuringGracefulShutdownPeriod)
136            }
137        }
138    }
139
140    pub fn set_cancellation_signal_factory(&mut self, cancellation_signal_factory: Arc<dyn CancellationSignalFactory>) {
141        self.cancellation_signal_factory = cancellation_signal_factory;
142    }
143
144    pub fn set_infrastructure_configuration_factory(&mut self, configuration_factory: Arc<dyn ConfigurationFactory>) {
145        self.jobs_configuration_factory = configuration_factory;
146    }
147
148
149    pub fn set_dynamic_configuration_factory(&mut self, dynamic_configuration_factory: Arc<dyn ConfigurationFactory>) {
150        self.dynamic_configuration_factory = dynamic_configuration_factory;
151    }
152}
153
154#[derive(Debug, Error)]
155enum RunApplicationError {
156    #[error("Failed to register control-c handler")]
157    FailedToRegisterCtrlCHandler(#[source] io::Error),
158    #[error("Failed to create configuration")]
159    FailedToCreateConfiguration(#[from] CreateInfrastructureConfigurationError),
160    #[error("Failed to get jobs configuration")]
161    FailedToGetJobsConfiguration(#[from] GetConfigurationError),
162    #[error("Failed to create interval job trigger")]
163    FailedToCreateIntervalJobTrigger(#[from] IntervalTriggerFromStrError),
164    #[error("Failed to create cron job trigger")]
165    FailedToCreateCronJobTrigger(#[from] NewCronTriggerError),
166    #[error("Missing configuration for job {0}")]
167    MissingConfigurationForJob(String),
168    #[error("Job executor failed to stop during graceful shutdown")]
169    JobExecutorFailedToStopDuringGracefulShutdownPeriod,
170    #[error("Job executor stopped unexpectedly")]
171    JobExecutorStoppedUnexpectedly,
172}
173
174pub trait ConfigurationFactory {
175    fn configuration(&self) -> Result<Configuration, CreateInfrastructureConfigurationError>;
176}
177
178struct DefaultJobsConfigurationFactory;
179
180impl ConfigurationFactory for DefaultJobsConfigurationFactory {
181    fn configuration(&self) -> Result<Configuration, CreateInfrastructureConfigurationError> {
182        let root_provider = Json5Provider::from_path("./configuration/jobs.json5")?;
183        let env_provider = EnvironmentVariablesProvider::new();
184
185        Ok(
186            Configuration::builder(root_provider)
187                .with_provider("environment", env_provider)
188                .build()?
189        )
190    }
191}
192
193
194struct DefaultDynamicConfigurationFactory;
195
196impl ConfigurationFactory for DefaultDynamicConfigurationFactory {
197    fn configuration(&self) -> Result<Configuration, CreateInfrastructureConfigurationError> {
198        let root_provider = Json5Provider::from_path("./configuration/dynamic.json5")?;
199        let env_provider = EnvironmentVariablesProvider::new();
200
201        Ok(
202            Configuration::builder(root_provider)
203                .with_provider("environment", env_provider)
204                .build()?
205        )
206    }
207}
208
209#[derive(Debug, Error)]
210pub enum CreateInfrastructureConfigurationError {
211    #[error("Failed to create yaml provider")]
212    FailedToCreateJson5Provider(#[from] CreateJson5ProviderFromFileError),
213    #[error("Failed to create configuration")]
214    FailedToCreateConfiguration(#[from] BuildConfigurationError),
215}
216
217#[derive(Deserialize)]
218struct JobConfiguration {
219    schedule: String,
220    enabled: Option<bool>,
221}
222
223
224pub struct MultilineDisplayAdapter<E: Error>(pub E);
225
226impl<E: Error> Display for MultilineDisplayAdapter<E> {
227    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
228        writeln!(f, "Error: {}", self.0)?;
229
230        let mut cause = self.0.source();
231        if self.0.source().is_some() {
232            writeln!(f, "Caused by:")?;
233        }
234
235        while let Some(error) = cause {
236            writeln!(f, "   {}", error)?;
237            cause = error.source();
238        }
239
240        Ok(())
241    }
242}