1use std::{
2 any::{Any, TypeId},
3 sync::{Arc, atomic::AtomicU32},
4 thread
5};
6
7use hashbrown::HashMap;
8
9use tokio::{runtime, sync::broadcast, task};
10
11use crate::{
12 err::{AppErrors, CbErr},
13 rt::{
14 Demise, InitCtx, RunEnv, ServiceReporter, SvcEvt, TermCtx,
15 TokioServiceHandler, signals
16 }
17};
18
19use killswitch::KillSwitch;
20
21#[cfg(unix)]
22use crate::rt::UserSig;
23
24
/// Everything the Tokio service runtime needs in order to drive a service
/// handler through its init/run/shutdown life cycle.
///
/// `ApEr` is the application-defined error type returned by the
/// [`TokioServiceHandler`] callbacks.
pub struct MainParams<ApEr> {
  /// Run environment; a clone is placed in the init context, a reference is
  /// handed to the handler's `run()`, and the value itself ends up in the
  /// termination context.
  pub(crate) re: RunEnv,

  /// Callback handed, together with the event receiver, to the `svcevt`
  /// thread that is started once initialization has succeeded.
  pub(crate) svcevt_handler: Box<dyn FnMut(SvcEvt) + Send>,

  /// The application's service handler whose `init()`, `run()` and
  /// `shutdown()` methods are invoked by the runtime.
  pub(crate) rt_handler: Box<dyn TokioServiceHandler<AppErr = ApEr> + Send>,

  /// Reporter used to signal the starting/started/stopping/stopped state
  /// transitions.
  pub(crate) sr: ServiceReporter,

  /// Optional pre-built service event channel.  When `None`, the runtime
  /// creates its own channel and spawns signal listener tasks that feed it.
  pub(crate) svcevt_ch:
    Option<(broadcast::Sender<SvcEvt>, broadcast::Receiver<SvcEvt>)>,

  /// Type-keyed values made available to the handler's `init()` through the
  /// init context.
  pub(crate) passthrough_init: HashMap<TypeId, Box<dyn Any + Send + Sync>>,

  /// Type-keyed values made available to the handler's `shutdown()` through
  /// the termination context.  Entries left in the init context's
  /// `passthrough_term` map after `init()` are merged into this map
  /// (overwriting entries with the same key).
  pub(crate) passthrough_term: HashMap<TypeId, Box<dyn Any + Send + Sync>>
}
35
36
37pub fn main<ApEr>(
40 rtbldr: Option<runtime::Builder>,
41 params: MainParams<ApEr>
42) -> Result<(), CbErr<ApEr>>
43where
44 ApEr: Send + std::fmt::Debug
45{
46 let rt = if let Some(mut bldr) = rtbldr {
47 bldr.build()?
48 } else {
49 tokio::runtime::Runtime::new()?
50 };
51 rt.block_on(async_main(params))?;
52
53 Ok(())
54}
55
56async fn async_main<ApEr>(
62 MainParams {
63 re,
64 svcevt_handler,
65 mut rt_handler,
66 sr,
67 svcevt_ch,
68 passthrough_init,
69 mut passthrough_term
70 }: MainParams<ApEr>
71) -> Result<(), CbErr<ApEr>>
72where
73 ApEr: Send + std::fmt::Debug
74{
75 let ks = KillSwitch::new();
76
77 let start_ts = jiff::Timestamp::now();
78
79 let (tx_svcevt, rx_svcevt) = if let Some((tx_svcevt, rx_svcevt)) = svcevt_ch
83 {
84 (tx_svcevt, rx_svcevt)
85 } else {
86 init_svc_channels(&ks)
87 };
88
89 sr.starting(0, Some(super::SVCAPP_INIT_MSG));
91
92 let mut ictx = InitCtx {
94 re: re.clone(),
95 sr: sr.clone(),
96 cnt: Arc::new(AtomicU32::new(1)), passthrough: passthrough_init,
98 passthrough_term: HashMap::new()
99 };
100 let init_apperr = super::timeit_async("initialization", || async {
101 rt_handler.init(&mut ictx).await.err()
102 })
103 .await;
104
105 for (k, v) in ictx.passthrough_term.drain() {
108 passthrough_term.insert(k, v);
109 }
110
111 drop(ictx);
116
117 if let Some(ref e) = init_apperr {
118 tracing::error!("Service handler init() failed; {e:?}");
119 }
120
121 let run_apperr = if init_apperr.is_none() {
122 sr.started();
123
124 let jh = thread::Builder::new()
127 .name("svcevt".into())
128 .spawn(|| crate::rt::svcevt_thread(rx_svcevt, svcevt_handler))
129 .unwrap();
130
131 let ret = super::timeit_async("main", || async {
135 rt_handler.run(&re).await.err()
136 })
137 .await;
138
139 if let Some(ref e) = ret {
140 tracing::error!("Service handler run() failed; {e:?}");
141 }
142
143 let _ = tx_svcevt.send(SvcEvt::Shutdown(Demise::ReachedEnd));
149
150 let _ = task::spawn_blocking(|| jh.join()).await;
152
153 ret
154 } else {
155 None
156 };
157
158 sr.stopping(0, Some(super::SVCAPP_TERM_MSG));
163
164 ks.trigger();
167
168 if (ks.finalize().await).is_err() {
169 log::warn!("Attempted to finalize KillSwitch that wasn't triggered yet");
170 }
171
172 let mut tctx = TermCtx {
174 re,
175 sr: sr.clone(),
176 cnt: Arc::new(AtomicU32::new(1)), passthrough: passthrough_term
178 };
179 let term_apperr = super::timeit_async("termination", || async {
180 rt_handler.shutdown(&mut tctx).await.err()
181 })
182 .await;
183
184 drop(tctx);
189
190 if let Some(ref e) = term_apperr {
191 tracing::error!("Service handler shutdown() failed; {e:?}");
192 }
193
194 sr.stopped();
196
197 log::info!("Service ran for {:#}", super::rounded_elapse(start_ts));
198
199 if init_apperr.is_some() || run_apperr.is_some() || term_apperr.is_some() {
203 let apperrs = AppErrors {
204 init: init_apperr,
205 run: run_apperr,
206 shutdown: term_apperr
207 };
208 Err(CbErr::SrvApp(apperrs))?;
209 }
210
211 Ok(())
212}
213
214fn init_svc_channels(
215 ks: &KillSwitch
216) -> (broadcast::Sender<SvcEvt>, broadcast::Receiver<SvcEvt>) {
217 let (tx, rx) = broadcast::channel(16);
219
220 let ks2 = ks.clone();
221
222 let txc = tx.clone();
224 task::spawn(signals::wait_shutdown(
225 move || {
226 if let Err(e) = txc.send(SvcEvt::Shutdown(Demise::Interrupted)) {
227 log::error!("Unable to send SvcEvt::ReloadConf event; {e}");
228 }
229 },
230 ks2
231 ));
232
233 let txc = tx.clone();
236 let ks2 = ks.clone();
237 task::spawn(signals::wait_term(
238 move || {
239 let svcevt = SvcEvt::Shutdown(Demise::Terminated);
240 if let Err(e) = txc.send(svcevt) {
241 log::error!("Unable to send SvcEvt::Terminate event; {e}");
242 }
243 },
244 ks2
245 ));
246
247 #[cfg(unix)]
250 {
251 let ks2 = ks.clone();
252
253 let txc = tx.clone();
254 task::spawn(signals::wait_reload(
255 move || {
256 if let Err(e) = txc.send(SvcEvt::ReloadConf) {
257 log::error!("Unable to send SvcEvt::ReloadConf event; {e}");
258 }
259 },
260 ks2
261 ));
262 }
263
264 #[cfg(unix)]
265 {
266 let ks2 = ks.clone();
267
268 let txc = tx.clone();
269 task::spawn(signals::wait_user1(
270 move || {
271 if let Err(e) = txc.send(SvcEvt::User(UserSig::Sig1)) {
272 log::error!("Unable to send SvcEvt::User(Sig1) event; {e}");
273 }
274 },
275 ks2
276 ));
277 }
278
279 #[cfg(unix)]
280 {
281 let ks2 = ks.clone();
282
283 let txc = tx.clone();
284 task::spawn(signals::wait_user2(
285 move || {
286 if let Err(e) = txc.send(SvcEvt::User(UserSig::Sig2)) {
287 log::error!("Unable to send SvcEvt::User(Sig2) event; {e}");
288 }
289 },
290 ks2
291 ));
292 }
293
294 (tx, rx)
295}
296
297