shiny_application/
application.rs1use std::collections::HashMap;
2use std::error::Error;
3use std::fmt::Display;
4use crate::controllers::Controllers;
5use crate::Module;
6use std::io;
7use std::pin::pin;
8use std::process::ExitCode;
9use std::str::FromStr;
10use std::sync::Arc;
11use std::time::Duration;
12use tokio_util::sync::CancellationToken;
13use serde::Deserialize;
14use thiserror::Error;
15use shiny_common::pointer_utils::ToArc;
16use shiny_common::clock::SystemClock;
17use shiny_configuration::{Configuration, GetConfigurationError};
18use shiny_configuration::configuration_builder::BuildConfigurationError;
19use shiny_configuration::configuration_provider::env_provider::EnvironmentVariablesProvider;
20use shiny_configuration::configuration_provider::json5_provider::{CreateJson5ProviderFromFileError, Json5Provider};
21use shiny_jobs::job_trigger::cron_trigger::{CronTrigger, NewCronTriggerError};
22use shiny_jobs::job_trigger::interval_trigger::{IntervalTriggerFromStrError, IntervalTrigger};
23use shiny_jobs::job_trigger::JobTrigger;
24use shiny_jobs::jobs_executor::JobsExecutor;
25use crate::context_factory::DefaultContextFactory;
26
27pub struct Application {
28 controller: Controllers,
29 cancellation_signal_factory: Arc<dyn CancellationSignalFactory>,
30 jobs_configuration_factory: Arc<dyn ConfigurationFactory>,
31 dynamic_configuration_factory: Arc<dyn ConfigurationFactory>,
32}
33
34#[async_trait::async_trait]
35pub trait CancellationSignalFactory {
36 async fn cancellation_signal(&self) -> io::Result<()>;
37}
38
39struct CtrlCCancellationSignalFactory;
40
41#[async_trait::async_trait]
42impl CancellationSignalFactory for CtrlCCancellationSignalFactory {
43 async fn cancellation_signal(&self) -> io::Result<()> {
44 tokio::signal::ctrl_c().await
45 }
46}
47
48impl Application {
49 pub fn new<TClients, TModule>(clients: TClients, module: TModule) -> Self
50 where
51 TModule: Module<Clients=TClients>,
52 {
53 Application {
54 controller: module.create(clients),
55 cancellation_signal_factory: CtrlCCancellationSignalFactory.arc(),
56 jobs_configuration_factory: DefaultJobsConfigurationFactory.arc(),
57 dynamic_configuration_factory: DefaultDynamicConfigurationFactory.arc(),
58 }
59 }
60
61 pub async fn run(self) -> ExitCode {
62 match self.run_internal().await {
63 Ok(()) => ExitCode::SUCCESS,
64 Err(error) => {
65 tracing::error!("Application failed: {}", error);
66
67 eprintln!("\n{}", MultilineDisplayAdapter(error));
68 ExitCode::FAILURE
69 }
70 }
71 }
72
73 async fn run_internal(self) -> Result<(), RunApplicationError> {
74 tracing::info!("Starting application");
75
76 let configuration = self.jobs_configuration_factory.configuration()?;
77 let jobs_configuration: HashMap<String, JobConfiguration> = configuration.get("jobs").map_err(RunApplicationError::FailedToGetJobsConfiguration)?;
78
79 let configuration = self.dynamic_configuration_factory.configuration()?;
80
81 let context_factory = DefaultContextFactory::new(configuration).arc();
82 let mut job_executor = JobsExecutor::new(context_factory);
83
84 let clock = SystemClock.arc();
85
86 for (name, job) in self.controller.job.take_jobs() {
87 if let Some(configuration) = jobs_configuration.get(&name) {
88 if configuration.enabled.unwrap_or(true) {
89 let trigger: Arc<dyn JobTrigger> = if configuration.schedule.starts_with("PT") {
90 let trigger = IntervalTrigger::from_str(&configuration.schedule)?;
91 tracing::info!("Registered job `{name}` with interval `{} seconds`", trigger.interval().as_secs());
92 trigger.arc()
93 } else {
94 let trigger = CronTrigger::new(&configuration.schedule, clock.clone())?;
95 tracing::info!("Registered job `{name}` with schedule `{}`", configuration.schedule);
96 trigger.arc()
97 };
98
99 job_executor.schedule(name, job, trigger);
100 } else {
101 tracing::info!("Job `{name}` disabled, skiping");
102 }
103 } else {
104 return Err(RunApplicationError::MissingConfigurationForJob(name));
105 }
106 }
107
108 let cancellation_signal = self.cancellation_signal_factory.cancellation_signal();
109 let cancellation_token = CancellationToken::new();
110 let mut future = pin!(job_executor.run(cancellation_token.child_token()));
111
112 tokio::select! {
113 _ = &mut future => {
114 return Err(RunApplicationError::JobExecutorStoppedUnexpectedly)
115 }
116 result = cancellation_signal => {
117 match result {
118 Ok(_) => {
119 tracing::info!("Cancellation signal received, starting graceful shutdown");
120 cancellation_token.cancel();
121 },
122 Err(err) => { return Err(RunApplicationError::FailedToRegisterCtrlCHandler(err)) }
123 }
124 }
125 }
126
127 let graceful_period = tokio::time::sleep(Duration::from_secs(10));
129 tokio::select! {
130 _ = &mut future => {
131 tracing::info!("Application successfully stopped");
132 Ok(())
133 }
134 _ = graceful_period => {
135 Err(RunApplicationError::JobExecutorFailedToStopDuringGracefulShutdownPeriod)
136 }
137 }
138 }
139
140 pub fn set_cancellation_signal_factory(&mut self, cancellation_signal_factory: Arc<dyn CancellationSignalFactory>) {
141 self.cancellation_signal_factory = cancellation_signal_factory;
142 }
143
144 pub fn set_infrastructure_configuration_factory(&mut self, configuration_factory: Arc<dyn ConfigurationFactory>) {
145 self.jobs_configuration_factory = configuration_factory;
146 }
147
148
149 pub fn set_dynamic_configuration_factory(&mut self, dynamic_configuration_factory: Arc<dyn ConfigurationFactory>) {
150 self.dynamic_configuration_factory = dynamic_configuration_factory;
151 }
152}
153
154#[derive(Debug, Error)]
155enum RunApplicationError {
156 #[error("Failed to register control-c handler")]
157 FailedToRegisterCtrlCHandler(#[source] io::Error),
158 #[error("Failed to create configuration")]
159 FailedToCreateConfiguration(#[from] CreateInfrastructureConfigurationError),
160 #[error("Failed to get jobs configuration")]
161 FailedToGetJobsConfiguration(#[from] GetConfigurationError),
162 #[error("Failed to create interval job trigger")]
163 FailedToCreateIntervalJobTrigger(#[from] IntervalTriggerFromStrError),
164 #[error("Failed to create cron job trigger")]
165 FailedToCreateCronJobTrigger(#[from] NewCronTriggerError),
166 #[error("Missing configuration for job {0}")]
167 MissingConfigurationForJob(String),
168 #[error("Job executor failed to stop during graceful shutdown")]
169 JobExecutorFailedToStopDuringGracefulShutdownPeriod,
170 #[error("Job executor stopped unexpectedly")]
171 JobExecutorStoppedUnexpectedly,
172}
173
174pub trait ConfigurationFactory {
175 fn configuration(&self) -> Result<Configuration, CreateInfrastructureConfigurationError>;
176}
177
178struct DefaultJobsConfigurationFactory;
179
180impl ConfigurationFactory for DefaultJobsConfigurationFactory {
181 fn configuration(&self) -> Result<Configuration, CreateInfrastructureConfigurationError> {
182 let root_provider = Json5Provider::from_path("./configuration/jobs.json5")?;
183 let env_provider = EnvironmentVariablesProvider::new();
184
185 Ok(
186 Configuration::builder(root_provider)
187 .with_provider("environment", env_provider)
188 .build()?
189 )
190 }
191}
192
193
194struct DefaultDynamicConfigurationFactory;
195
196impl ConfigurationFactory for DefaultDynamicConfigurationFactory {
197 fn configuration(&self) -> Result<Configuration, CreateInfrastructureConfigurationError> {
198 let root_provider = Json5Provider::from_path("./configuration/dynamic.json5")?;
199 let env_provider = EnvironmentVariablesProvider::new();
200
201 Ok(
202 Configuration::builder(root_provider)
203 .with_provider("environment", env_provider)
204 .build()?
205 )
206 }
207}
208
209#[derive(Debug, Error)]
210pub enum CreateInfrastructureConfigurationError {
211 #[error("Failed to create yaml provider")]
212 FailedToCreateJson5Provider(#[from] CreateJson5ProviderFromFileError),
213 #[error("Failed to create configuration")]
214 FailedToCreateConfiguration(#[from] BuildConfigurationError),
215}
216
217#[derive(Deserialize)]
218struct JobConfiguration {
219 schedule: String,
220 enabled: Option<bool>,
221}
222
223
224pub struct MultilineDisplayAdapter<E: Error>(pub E);
225
226impl<E: Error> Display for MultilineDisplayAdapter<E> {
227 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
228 writeln!(f, "Error: {}", self.0)?;
229
230 let mut cause = self.0.source();
231 if self.0.source().is_some() {
232 writeln!(f, "Caused by:")?;
233 }
234
235 while let Some(error) = cause {
236 writeln!(f, " {}", error)?;
237 cause = error.source();
238 }
239
240 Ok(())
241 }
242}