1use std::{
18 any::{Any, TypeId},
19 sync::{Arc, atomic::AtomicU32},
20 thread
21};
22
23use hashbrown::HashMap;
24
25use tokio::{sync::broadcast, task};
26
27use killswitch::KillSwitch;
28
29use crate::{
30 err::{AppErrors, CbErr},
31 rt::{
32 Demise, InitCtx, RocketServiceHandler, RunEnv, ServiceReporter, SvcEvt,
33 TermCtx, signals
34 }
35};
36
37#[cfg(unix)]
38use crate::rt::UserSig;
39
40
41pub struct MainParams<ApEr>
42where
43 ApEr: Send
44{
45 pub(crate) re: RunEnv,
46 pub(crate) svcevt_handler: Box<dyn FnMut(SvcEvt) + Send>,
47 pub(crate) rt_handler: Box<dyn RocketServiceHandler<AppErr = ApEr> + Send>,
48 pub(crate) sr: ServiceReporter,
49 pub(crate) svcevt_ch:
50 Option<(broadcast::Sender<SvcEvt>, broadcast::Receiver<SvcEvt>)>,
51 pub(crate) passthrough_init: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
52 pub(crate) passthrough_term: HashMap<TypeId, Box<dyn Any + Send + Sync>>
53}
54
55pub fn main<ApEr>(params: MainParams<ApEr>) -> Result<(), CbErr<ApEr>>
58where
59 ApEr: Send
60{
61 rocket::execute(rocket_async_main(params))?;
62
63 Ok(())
64}
65
66
67#[expect(clippy::too_many_lines)]
68async fn rocket_async_main<ApEr>(
69 MainParams {
70 re,
71 svcevt_handler,
72 mut rt_handler,
73 sr,
74 svcevt_ch,
75 passthrough_init,
76 mut passthrough_term
77 }: MainParams<ApEr>
78) -> Result<(), CbErr<ApEr>>
79where
80 ApEr: Send
81{
82 let ks = KillSwitch::new();
83
84 let start_ts = jiff::Timestamp::now();
85
86 let (tx_svcevt, rx_svcevt) = if let Some((tx_svcevt, rx_svcevt)) = svcevt_ch
93 {
94 (tx_svcevt, rx_svcevt)
95 } else {
96 init_svc_channels(&ks)
97 };
98
99 sr.starting(0, Some(super::SVCAPP_INIT_MSG));
101
102 let mut ictx = InitCtx {
104 re: re.clone(),
105 sr: sr.clone(),
106 cnt: Arc::new(AtomicU32::new(1)), passthrough: passthrough_init,
108 passthrough_term: HashMap::new()
109 };
110 let (rockets, init_apperr) =
111 super::timeit_async("initialization", || async {
112 match rt_handler.init(&mut ictx).await {
113 Ok(rockets) => (rockets, None),
114 Err(e) => (Vec::new(), Some(e))
115 }
116 })
117 .await;
118
119 for (k, v) in ictx.passthrough_term.drain() {
122 passthrough_term.insert(k, v);
123 }
124
125 drop(ictx);
130
131 let mut ignited = vec![];
137 let mut rocket_shutdowns = vec![];
138 for rocket in rockets {
139 let rocket = rocket.ignite().await?;
140 rocket_shutdowns.push(rocket.shutdown());
141 ignited.push(rocket);
142 }
143
144 let run_apperr = if init_apperr.is_none() {
147 sr.started();
149
150 let mut rx_svcevt2 = rx_svcevt.resubscribe();
151
152 let jh_graceful_landing = task::spawn(async move {
158 loop {
159 let msg = match rx_svcevt2.recv().await {
160 Ok(msg) => msg,
161 Err(e) => {
162 log::error!("Unable to receive broadcast SvcEvt message, {e}");
163 break;
164 }
165 };
166 if let SvcEvt::Shutdown(_) = msg {
167 tracing::trace!("Ask rocket instances to shut down gracefully");
168 for shutdown in rocket_shutdowns {
169 shutdown.notify();
171 }
172 break;
173 }
174 tracing::trace!("Ignored message in task waiting for shutdown");
175 }
176 });
177
178 let jh = thread::Builder::new()
181 .name("svcevt".into())
182 .spawn(|| crate::rt::svcevt_thread(rx_svcevt, svcevt_handler))
183 .unwrap();
184
185 let res = super::timeit_async("main", || async {
189 rt_handler.run(ignited, &re).await.err()
190 })
191 .await;
192
193 let _ = tx_svcevt.send(SvcEvt::Shutdown(Demise::ReachedEnd));
199
200 if let Err(e) = jh_graceful_landing.await {
202 log::warn!("An error was returned from the graceful landing task; {e}");
203 }
204
205 let _ = task::spawn_blocking(|| jh.join()).await;
207
208 res
209 } else {
210 None
211 };
212
213 sr.stopping(0, Some(super::SVCAPP_TERM_MSG));
218
219 ks.trigger();
222 if (ks.finalize().await).is_err() {
223 log::warn!("Attempted to finalize KillSwitch that wasn't triggered yet");
224 }
225
226 let mut tctx = TermCtx {
228 re,
229 sr: sr.clone(),
230 cnt: Arc::new(AtomicU32::new(1)), passthrough: passthrough_term
232 };
233 let term_apperr = super::timeit_async("termination", || async {
234 rt_handler.shutdown(&mut tctx).await.err()
235 })
236 .await;
237
238 drop(tctx);
243
244 sr.stopped();
246
247 log::info!("Service ran for {:#}", super::rounded_elapse(start_ts));
248
249 if init_apperr.is_some() || run_apperr.is_some() || term_apperr.is_some() {
253 let apperrs = AppErrors {
254 init: init_apperr,
255 run: run_apperr,
256 shutdown: term_apperr
257 };
258 Err(CbErr::SrvApp(apperrs))?;
259 }
260
261 Ok(())
262}
263
264fn init_svc_channels(
265 ks: &KillSwitch
266) -> (broadcast::Sender<SvcEvt>, broadcast::Receiver<SvcEvt>) {
267 let (tx, rx) = broadcast::channel(16);
269
270 let ks2 = ks.clone();
272
273 let txc = tx.clone();
276 task::spawn(signals::wait_shutdown(
277 move || {
278 if let Err(e) = txc.send(SvcEvt::Shutdown(Demise::Interrupted)) {
279 log::error!("Unable to send SvcEvt::Shutdown event; {e}");
280 }
281 },
282 ks2
283 ));
284
285 let txc = tx.clone();
289 let ks2 = ks.clone();
291 task::spawn(signals::wait_term(
292 move || {
293 if let Err(e) = txc.send(SvcEvt::Shutdown(Demise::Terminated)) {
294 log::error!("Unable to send SvcEvt::Terminate event; {e}");
295 }
296 },
297 ks2
298 ));
299
300 #[cfg(unix)]
303 {
304 let ks2 = ks.clone();
306
307 let txc = tx.clone();
309 task::spawn(signals::wait_reload(
310 move || {
311 if let Err(e) = txc.send(SvcEvt::ReloadConf) {
312 log::error!("Unable to send SvcEvt::ReloadConf event; {e}");
313 }
314 },
315 ks2
316 ));
317 }
318
319 #[cfg(unix)]
320 {
321 let ks2 = ks.clone();
322
323 let txc = tx.clone();
324 task::spawn(signals::wait_user1(
325 move || {
326 if let Err(e) = txc.send(SvcEvt::User(UserSig::Sig1)) {
327 log::error!("Unable to send SvcEvt::User(Sig1) event; {e}");
328 }
329 },
330 ks2
331 ));
332 }
333
334 #[cfg(unix)]
335 {
336 let ks2 = ks.clone();
337
338 let txc = tx.clone();
339 task::spawn(signals::wait_user2(
340 move || {
341 if let Err(e) = txc.send(SvcEvt::User(UserSig::Sig2)) {
342 log::error!("Unable to send SvcEvt::User(Sig2) event; {e}");
343 }
344 },
345 ks2
346 ));
347 }
348
349 (tx, rx)
350}
351
352