use std::{
any::{Any, TypeId},
sync::{Arc, atomic::AtomicU32},
thread
};
use hashbrown::HashMap;
use tokio::sync::broadcast;
use crate::{
err::{AppErrors, CbErr},
rt::{
Demise, InitCtx, RunEnv, ServiceHandler, ServiceReporter, SvcEvt, TermCtx,
signals
}
};
#[cfg(unix)]
use crate::rt::{UserSig, signals::SigType};
pub struct MainParams<ApEr> {
pub(crate) re: RunEnv,
pub(crate) svcevt_handler: Box<dyn FnMut(SvcEvt) + Send>,
pub(crate) rt_handler: Box<dyn ServiceHandler<AppErr = ApEr>>,
pub(crate) sr: ServiceReporter,
pub(crate) svcevt_ch:
Option<(broadcast::Sender<SvcEvt>, broadcast::Receiver<SvcEvt>)>,
pub(crate) passthrough_init: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
pub(crate) passthrough_term: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
pub(crate) test_mode: bool
}
pub fn main<ApEr>(
MainParams {
re,
svcevt_handler,
mut rt_handler,
sr,
svcevt_ch,
passthrough_init,
mut passthrough_term,
test_mode
}: MainParams<ApEr>
) -> Result<(), CbErr<ApEr>> {
#[cfg(unix)]
let _ = test_mode;
let start_ts = jiff::Timestamp::now();
let (tx_svcevt, rx_svcevt) = if let Some((tx_svcevt, rx_svcevt)) = svcevt_ch
{
(tx_svcevt, rx_svcevt)
} else {
let (tx, rx) = broadcast::channel(16);
let tx2 = tx.clone();
#[cfg(unix)]
signals::sync_sigmon(move |st| match st {
SigType::Usr1 => {
if let Err(e) = tx2.send(SvcEvt::User(UserSig::Sig1)) {
log::error!("Unable to send SvcEvt::Info event; {e}",);
}
}
SigType::Usr2 => {
if let Err(e) = tx2.send(SvcEvt::User(UserSig::Sig2)) {
log::error!("Unable to send SvcEvt::Info event; {e}");
}
}
SigType::Int => {
if let Err(e) = tx2.send(SvcEvt::Shutdown(Demise::Interrupted)) {
log::error!("Unable to send SvcEvt::Shutdown event; {e}");
}
}
SigType::Term => {
if let Err(e) = tx2.send(SvcEvt::Shutdown(Demise::Terminated)) {
log::error!("Unable to send SvcEvt::Terminate event; {e}");
}
}
SigType::Hup => {
if let Err(e) = tx2.send(SvcEvt::ReloadConf) {
log::error!("Unable to send SvcEvt::ReloadConf event; {e}");
}
}
})?;
#[cfg(windows)]
signals::sync_kill_to_event(tx2, test_mode)?;
(tx, rx)
};
sr.starting(0, Some(super::SVCAPP_INIT_MSG));
let mut ictx = InitCtx {
re: re.clone(),
sr: sr.clone(),
cnt: Arc::new(AtomicU32::new(1)), passthrough: passthrough_init,
passthrough_term: HashMap::new()
};
let init_apperr =
super::timeit("initialization", || rt_handler.init(&mut ictx).err());
for (k, v) in ictx.passthrough_term.drain() {
passthrough_term.insert(k, v);
}
drop(ictx);
let run_apperr = if init_apperr.is_none() {
sr.started();
let jh = thread::Builder::new()
.name("svcevt".into())
.spawn(|| crate::rt::svcevt_thread(rx_svcevt, svcevt_handler))
.unwrap();
let ret = super::timeit("main", || rt_handler.run(&re).err());
let _ = tx_svcevt.send(SvcEvt::Shutdown(Demise::ReachedEnd));
let _ = jh.join();
ret
} else {
None
};
sr.stopping(0, Some(super::SVCAPP_TERM_MSG));
let mut tctx = TermCtx {
re,
sr: sr.clone(),
cnt: Arc::new(AtomicU32::new(1)), passthrough: passthrough_term
};
let term_apperr =
super::timeit("termination", || rt_handler.shutdown(&mut tctx).err());
drop(tctx);
sr.stopped();
log::info!("Service ran for {:#}", super::rounded_elapse(start_ts));
if init_apperr.is_some() || run_apperr.is_some() || term_apperr.is_some() {
let apperrs = AppErrors {
init: init_apperr,
run: run_apperr,
shutdown: term_apperr
};
Err(CbErr::SrvApp(apperrs))?;
}
Ok(())
}