drogue_bazaar/app/run/
main.rs1use crate::{
2 app::{health::HealthChecker, RuntimeConfig, Startup},
3 core::{config::ConfigFromEnv, Spawner},
4 health::HealthChecked,
5};
6use futures_core::future::LocalBoxFuture;
7use futures_util::future::FutureExt;
8use humantime::format_duration;
9use prometheus::{Encoder, TextEncoder};
10use std::future::Future;
11use std::ops::{Deref, DerefMut};
12use std::pin::Pin;
13
14#[cfg(feature = "actix")]
15use crate::app::health::HealthServer;
16
17pub struct Main<'m> {
25 sub: SubMain<'m>,
26}
27
28impl<'m> Default for Main<'m> {
29 fn default() -> Self {
30 Self::new(RuntimeConfig::default())
31 }
32}
33
34impl<'m> Deref for Main<'m> {
35 type Target = SubMain<'m>;
36
37 fn deref(&self) -> &Self::Target {
38 &self.sub
39 }
40}
41
42impl<'m> DerefMut for Main<'m> {
43 fn deref_mut(&mut self) -> &mut Self::Target {
44 &mut self.sub
45 }
46}
47
48impl<'m> Main<'m> {
49 pub fn new(config: RuntimeConfig) -> Self {
50 Self {
51 sub: SubMain::new(config, Default::default()),
52 }
53 }
54
55 pub fn from_env() -> anyhow::Result<Self> {
56 Ok(Self::new(RuntimeConfig::from_env_prefix("RUNTIME")?))
57 }
58
59 pub fn add_tasks<I>(mut self, tasks: I) -> Self
61 where
62 I: IntoIterator<Item = LocalBoxFuture<'m, anyhow::Result<()>>>,
63 {
64 self.extend(tasks);
65 self
66 }
67
68 pub fn add_checks<I>(&mut self, i: I)
69 where
70 I: IntoIterator<Item = Box<dyn HealthChecked>>,
71 {
72 self.sub.health.extend(i);
73 }
74
75 pub async fn run(mut self) -> anyhow::Result<()> {
76 log::info!("Starting main ...");
77 log::debug!("Runtime configuration: {:#?}", self.config);
78
79 self.run_console_metrics();
80 self.run_health_server();
81
82 self.sub.run().await
83 }
84
85 #[cfg(feature = "actix")]
86 fn run_health_server(&mut self) {
87 log::info!("Health server: {}", self.config.health.enabled);
88
89 if self.config.health.enabled {
90 let health = HealthServer::new(
91 self.config.health.clone(),
92 self.health.clone(),
93 Some(prometheus::default_registry().clone()),
94 );
95
96 self.tasks.push(health.run().boxed());
97 }
98 }
99
100 #[cfg(not(feature = "actix"))]
101 fn run_health_server(&self) {
102 log::info!(
103 "No health server implementation (required?: {})",
104 self.config.health.enabled
105 );
106
107 if self.config.health.enabled {
108 panic!("Unable to run health endpoint without 'actix' feature. Either enable 'actix' during compilation or disable the health server during runtime.");
109 }
110 }
111
112 fn run_console_metrics(&mut self) {
113 if self.config.console_metrics.enabled {
114 let period = self.config.console_metrics.period;
115
116 self.tasks.push(
117 async move {
118 log::info!(
119 "Starting console metrics loop ({})...",
120 format_duration(period)
121 );
122 let encoder = TextEncoder::new();
123 loop {
124 let metric_families = prometheus::gather();
125 {
126 let mut out = std::io::stdout().lock();
127 encoder.encode(&metric_families, &mut out).unwrap();
128 }
129 tokio::time::sleep(period).await;
130 }
131 }
132 .boxed(),
133 );
134 }
135 }
136}
137
138impl Spawner for Main<'_> {
139 fn spawn_boxed(&mut self, future: Pin<Box<dyn Future<Output = anyhow::Result<()>>>>) {
140 SubMain::spawn_boxed(self, future)
141 }
142}
143
144impl Startup for Main<'_> {
145 fn check_boxed(&mut self, check: Box<dyn HealthChecked>) {
146 SubMain::check_boxed(self, check)
147 }
148
149 fn use_tracing(&self) -> bool {
150 SubMain::use_tracing(self)
151 }
152
153 fn runtime_config(&self) -> &RuntimeConfig {
154 SubMain::runtime_config(self)
155 }
156}
157
158pub struct SubMain<'m> {
162 config: RuntimeConfig,
163 tasks: Vec<LocalBoxFuture<'m, anyhow::Result<()>>>,
164 health: HealthChecker,
165}
166
167impl SubMain<'_> {
168 pub(crate) fn new(config: RuntimeConfig, health: HealthChecker) -> Self {
169 Self {
170 config,
171 tasks: Default::default(),
172 health,
173 }
174 }
175
176 pub fn is_empty(&self) -> bool {
178 self.tasks.is_empty()
179 }
180
181 pub fn sub_main(&self) -> SubMain {
183 self.sub_main_seed().into()
184 }
185
186 pub fn sub_main_seed(&self) -> SubMainSeed {
188 SubMainSeed::new(self.config.clone(), self.health.clone())
189 }
190
191 pub async fn run(self) -> anyhow::Result<()> {
195 log::info!("Running {} tasks in this main instance", self.tasks.len());
196
197 let (result, _, _) = futures_util::future::select_all(self.tasks).await;
198
199 log::warn!("One of the main runners returned: {result:?}");
200 log::warn!("Exiting application...");
201
202 Ok(())
203 }
204}
205
206impl<'m> Extend<LocalBoxFuture<'m, Result<(), anyhow::Error>>> for SubMain<'m> {
207 fn extend<T: IntoIterator<Item = LocalBoxFuture<'m, anyhow::Result<()>>>>(&mut self, iter: T) {
208 self.tasks.extend(iter)
209 }
210}
211
212impl<'m> Spawner for SubMain<'m> {
213 fn spawn_boxed(&mut self, future: Pin<Box<dyn Future<Output = anyhow::Result<()>>>>) {
214 self.tasks.push(future);
215 }
216}
217
218impl<'m> Startup for SubMain<'m> {
219 fn check_boxed(&mut self, check: Box<dyn HealthChecked>) {
220 self.health.push(check);
221 }
222
223 fn use_tracing(&self) -> bool {
224 self.config.tracing.is_enabled()
225 }
226
227 fn runtime_config(&self) -> &RuntimeConfig {
228 &self.config
229 }
230}
231
232pub struct SubMainSeed {
238 config: RuntimeConfig,
239 health: HealthChecker,
240}
241
242impl SubMainSeed {
243 fn new(config: RuntimeConfig, health: HealthChecker) -> Self {
244 Self { config, health }
245 }
246}
247
248impl From<SubMainSeed> for SubMain<'_> {
249 fn from(seed: SubMainSeed) -> Self {
250 Self {
251 config: seed.config,
252 health: seed.health,
253 tasks: Default::default(),
254 }
255 }
256}