drogue_bazaar/app/run/
main.rs

1use crate::{
2    app::{health::HealthChecker, RuntimeConfig, Startup},
3    core::{config::ConfigFromEnv, Spawner},
4    health::HealthChecked,
5};
6use futures_core::future::LocalBoxFuture;
7use futures_util::future::FutureExt;
8use humantime::format_duration;
9use prometheus::{Encoder, TextEncoder};
10use std::future::Future;
11use std::ops::{Deref, DerefMut};
12use std::pin::Pin;
13
14#[cfg(feature = "actix")]
15use crate::app::health::HealthServer;
16
17/// A main runner.
18///
19/// The idea of the main runner is to perform all setup steps, gathering all tasks (futures) to be
20/// executed, and then initialize the stack and drive the tasks, until one of them completes.
21///
22/// In some cases it might be necessary to run a set of tasks on a different context (like actix, or
23/// ntex). In this case it is possible to create a [`SubMain`] instance using [`SubMain::sub_main`].
24pub struct Main<'m> {
25    sub: SubMain<'m>,
26}
27
28impl<'m> Default for Main<'m> {
29    fn default() -> Self {
30        Self::new(RuntimeConfig::default())
31    }
32}
33
34impl<'m> Deref for Main<'m> {
35    type Target = SubMain<'m>;
36
37    fn deref(&self) -> &Self::Target {
38        &self.sub
39    }
40}
41
42impl<'m> DerefMut for Main<'m> {
43    fn deref_mut(&mut self) -> &mut Self::Target {
44        &mut self.sub
45    }
46}
47
48impl<'m> Main<'m> {
49    pub fn new(config: RuntimeConfig) -> Self {
50        Self {
51            sub: SubMain::new(config, Default::default()),
52        }
53    }
54
55    pub fn from_env() -> anyhow::Result<Self> {
56        Ok(Self::new(RuntimeConfig::from_env_prefix("RUNTIME")?))
57    }
58
59    /// Add tasks to run.
60    pub fn add_tasks<I>(mut self, tasks: I) -> Self
61    where
62        I: IntoIterator<Item = LocalBoxFuture<'m, anyhow::Result<()>>>,
63    {
64        self.extend(tasks);
65        self
66    }
67
68    pub fn add_checks<I>(&mut self, i: I)
69    where
70        I: IntoIterator<Item = Box<dyn HealthChecked>>,
71    {
72        self.sub.health.extend(i);
73    }
74
75    pub async fn run(mut self) -> anyhow::Result<()> {
76        log::info!("Starting main ...");
77        log::debug!("Runtime configuration: {:#?}", self.config);
78
79        self.run_console_metrics();
80        self.run_health_server();
81
82        self.sub.run().await
83    }
84
85    #[cfg(feature = "actix")]
86    fn run_health_server(&mut self) {
87        log::info!("Health server: {}", self.config.health.enabled);
88
89        if self.config.health.enabled {
90            let health = HealthServer::new(
91                self.config.health.clone(),
92                self.health.clone(),
93                Some(prometheus::default_registry().clone()),
94            );
95
96            self.tasks.push(health.run().boxed());
97        }
98    }
99
100    #[cfg(not(feature = "actix"))]
101    fn run_health_server(&self) {
102        log::info!(
103            "No health server implementation (required?: {})",
104            self.config.health.enabled
105        );
106
107        if self.config.health.enabled {
108            panic!("Unable to run health endpoint without 'actix' feature. Either enable 'actix' during compilation or disable the health server during runtime.");
109        }
110    }
111
112    fn run_console_metrics(&mut self) {
113        if self.config.console_metrics.enabled {
114            let period = self.config.console_metrics.period;
115
116            self.tasks.push(
117                async move {
118                    log::info!(
119                        "Starting console metrics loop ({})...",
120                        format_duration(period)
121                    );
122                    let encoder = TextEncoder::new();
123                    loop {
124                        let metric_families = prometheus::gather();
125                        {
126                            let mut out = std::io::stdout().lock();
127                            encoder.encode(&metric_families, &mut out).unwrap();
128                        }
129                        tokio::time::sleep(period).await;
130                    }
131                }
132                .boxed(),
133            );
134        }
135    }
136}
137
138impl Spawner for Main<'_> {
139    fn spawn_boxed(&mut self, future: Pin<Box<dyn Future<Output = anyhow::Result<()>>>>) {
140        SubMain::spawn_boxed(self, future)
141    }
142}
143
144impl Startup for Main<'_> {
145    fn check_boxed(&mut self, check: Box<dyn HealthChecked>) {
146        SubMain::check_boxed(self, check)
147    }
148
149    fn use_tracing(&self) -> bool {
150        SubMain::use_tracing(self)
151    }
152
153    fn runtime_config(&self) -> &RuntimeConfig {
154        SubMain::runtime_config(self)
155    }
156}
157
158/// A sub-main instance, which can be used to contribute global tasks to the main instance which
159/// created this sub instance, but gather own tasks, which can be run independently by calling
160/// the [`SubMain::run`] function.
161pub struct SubMain<'m> {
162    config: RuntimeConfig,
163    tasks: Vec<LocalBoxFuture<'m, anyhow::Result<()>>>,
164    health: HealthChecker,
165}
166
167impl SubMain<'_> {
168    pub(crate) fn new(config: RuntimeConfig, health: HealthChecker) -> Self {
169        Self {
170            config,
171            tasks: Default::default(),
172            health,
173        }
174    }
175
176    /// Returns `true` is there are no tasks scheduled so far.
177    pub fn is_empty(&self) -> bool {
178        self.tasks.is_empty()
179    }
180
181    /// Create a new sub-main instance.
182    pub fn sub_main(&self) -> SubMain {
183        self.sub_main_seed().into()
184    }
185
186    /// Create a seed or a sub-main instance, which can be sent.
187    pub fn sub_main_seed(&self) -> SubMainSeed {
188        SubMainSeed::new(self.config.clone(), self.health.clone())
189    }
190
191    /// Run the recorded tasks.
192    ///
193    /// **NOTE:** This does not run any health checks, these must be run by the main instance.
194    pub async fn run(self) -> anyhow::Result<()> {
195        log::info!("Running {} tasks in this main instance", self.tasks.len());
196
197        let (result, _, _) = futures_util::future::select_all(self.tasks).await;
198
199        log::warn!("One of the main runners returned: {result:?}");
200        log::warn!("Exiting application...");
201
202        Ok(())
203    }
204}
205
206impl<'m> Extend<LocalBoxFuture<'m, Result<(), anyhow::Error>>> for SubMain<'m> {
207    fn extend<T: IntoIterator<Item = LocalBoxFuture<'m, anyhow::Result<()>>>>(&mut self, iter: T) {
208        self.tasks.extend(iter)
209    }
210}
211
212impl<'m> Spawner for SubMain<'m> {
213    fn spawn_boxed(&mut self, future: Pin<Box<dyn Future<Output = anyhow::Result<()>>>>) {
214        self.tasks.push(future);
215    }
216}
217
218impl<'m> Startup for SubMain<'m> {
219    fn check_boxed(&mut self, check: Box<dyn HealthChecked>) {
220        self.health.push(check);
221    }
222
223    fn use_tracing(&self) -> bool {
224        self.config.tracing.is_enabled()
225    }
226
227    fn runtime_config(&self) -> &RuntimeConfig {
228        &self.config
229    }
230}
231
232/// A seed for a [`SubMain`] instance.
233///
234/// As the tasks in a `SubMain` are not [`Send`], it is no possible to send the sub instance. Which
235/// may be required it is should be executed on another scheduler. For this it is possible to
236/// create a "seed", which can later (after sending) be turned into a proper instance.
237pub struct SubMainSeed {
238    config: RuntimeConfig,
239    health: HealthChecker,
240}
241
242impl SubMainSeed {
243    fn new(config: RuntimeConfig, health: HealthChecker) -> Self {
244        Self { config, health }
245    }
246}
247
248impl From<SubMainSeed> for SubMain<'_> {
249    fn from(seed: SubMainSeed) -> Self {
250        Self {
251            config: seed.config,
252            health: seed.health,
253            tasks: Default::default(),
254        }
255    }
256}