use std::{
any::{Any, TypeId},
sync::{Arc, atomic::AtomicU32},
thread
};
use hashbrown::HashMap;
use tokio::{runtime, sync::broadcast, task};
use crate::{
err::{AppErrors, CbErr},
rt::{
Demise, InitCtx, RunEnv, ServiceReporter, SvcEvt, TermCtx,
TokioServiceHandler, signals
}
};
use killswitch::KillSwitch;
#[cfg(unix)]
use crate::rt::UserSig;
pub struct MainParams<ApEr> {
pub(crate) re: RunEnv,
pub(crate) svcevt_handler: Box<dyn FnMut(SvcEvt) + Send>,
pub(crate) rt_handler: Box<dyn TokioServiceHandler<AppErr = ApEr> + Send>,
pub(crate) sr: ServiceReporter,
pub(crate) svcevt_ch:
Option<(broadcast::Sender<SvcEvt>, broadcast::Receiver<SvcEvt>)>,
pub(crate) passthrough_init: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
pub(crate) passthrough_term: HashMap<TypeId, Box<dyn Any + Send + Sync>>
}
pub fn main<ApEr>(
rtbldr: Option<runtime::Builder>,
params: MainParams<ApEr>
) -> Result<(), CbErr<ApEr>>
where
ApEr: Send + std::fmt::Debug
{
let rt = if let Some(mut bldr) = rtbldr {
bldr.build()?
} else {
tokio::runtime::Runtime::new()?
};
rt.block_on(async_main(params))?;
Ok(())
}
async fn async_main<ApEr>(
MainParams {
re,
svcevt_handler,
mut rt_handler,
sr,
svcevt_ch,
passthrough_init,
mut passthrough_term
}: MainParams<ApEr>
) -> Result<(), CbErr<ApEr>>
where
ApEr: Send + std::fmt::Debug
{
let ks = KillSwitch::new();
let start_ts = jiff::Timestamp::now();
let (tx_svcevt, rx_svcevt) = if let Some((tx_svcevt, rx_svcevt)) = svcevt_ch
{
(tx_svcevt, rx_svcevt)
} else {
init_svc_channels(&ks)
};
sr.starting(0, Some(super::SVCAPP_INIT_MSG));
let mut ictx = InitCtx {
re: re.clone(),
sr: sr.clone(),
cnt: Arc::new(AtomicU32::new(1)), passthrough: passthrough_init,
passthrough_term: HashMap::new()
};
let init_apperr = super::timeit_async("initialization", || async {
rt_handler.init(&mut ictx).await.err()
})
.await;
for (k, v) in ictx.passthrough_term.drain() {
passthrough_term.insert(k, v);
}
drop(ictx);
if let Some(ref e) = init_apperr {
tracing::error!("Service handler init() failed; {e:?}");
}
let run_apperr = if init_apperr.is_none() {
sr.started();
let jh = thread::Builder::new()
.name("svcevt".into())
.spawn(|| crate::rt::svcevt_thread(rx_svcevt, svcevt_handler))
.unwrap();
let ret = super::timeit_async("main", || async {
rt_handler.run(&re).await.err()
})
.await;
if let Some(ref e) = ret {
tracing::error!("Service handler run() failed; {e:?}");
}
let _ = tx_svcevt.send(SvcEvt::Shutdown(Demise::ReachedEnd));
let _ = task::spawn_blocking(|| jh.join()).await;
ret
} else {
None
};
sr.stopping(0, Some(super::SVCAPP_TERM_MSG));
ks.trigger();
if (ks.finalize().await).is_err() {
log::warn!("Attempted to finalize KillSwitch that wasn't triggered yet");
}
let mut tctx = TermCtx {
re,
sr: sr.clone(),
cnt: Arc::new(AtomicU32::new(1)), passthrough: passthrough_term
};
let term_apperr = super::timeit_async("termination", || async {
rt_handler.shutdown(&mut tctx).await.err()
})
.await;
drop(tctx);
if let Some(ref e) = term_apperr {
tracing::error!("Service handler shutdown() failed; {e:?}");
}
sr.stopped();
log::info!("Service ran for {:#}", super::rounded_elapse(start_ts));
if init_apperr.is_some() || run_apperr.is_some() || term_apperr.is_some() {
let apperrs = AppErrors {
init: init_apperr,
run: run_apperr,
shutdown: term_apperr
};
Err(CbErr::SrvApp(apperrs))?;
}
Ok(())
}
fn init_svc_channels(
ks: &KillSwitch
) -> (broadcast::Sender<SvcEvt>, broadcast::Receiver<SvcEvt>) {
let (tx, rx) = broadcast::channel(16);
let ks2 = ks.clone();
let txc = tx.clone();
task::spawn(signals::wait_shutdown(
move || {
if let Err(e) = txc.send(SvcEvt::Shutdown(Demise::Interrupted)) {
log::error!("Unable to send SvcEvt::ReloadConf event; {e}");
}
},
ks2
));
let txc = tx.clone();
let ks2 = ks.clone();
task::spawn(signals::wait_term(
move || {
let svcevt = SvcEvt::Shutdown(Demise::Terminated);
if let Err(e) = txc.send(svcevt) {
log::error!("Unable to send SvcEvt::Terminate event; {e}");
}
},
ks2
));
#[cfg(unix)]
{
let ks2 = ks.clone();
let txc = tx.clone();
task::spawn(signals::wait_reload(
move || {
if let Err(e) = txc.send(SvcEvt::ReloadConf) {
log::error!("Unable to send SvcEvt::ReloadConf event; {e}");
}
},
ks2
));
}
#[cfg(unix)]
{
let ks2 = ks.clone();
let txc = tx.clone();
task::spawn(signals::wait_user1(
move || {
if let Err(e) = txc.send(SvcEvt::User(UserSig::Sig1)) {
log::error!("Unable to send SvcEvt::User(Sig1) event; {e}");
}
},
ks2
));
}
#[cfg(unix)]
{
let ks2 = ks.clone();
let txc = tx.clone();
task::spawn(signals::wait_user2(
move || {
if let Err(e) = txc.send(SvcEvt::User(UserSig::Sig2)) {
log::error!("Unable to send SvcEvt::User(Sig2) event; {e}");
}
},
ks2
));
}
(tx, rx)
}