use std::{
any::{Any, TypeId},
sync::{Arc, atomic::AtomicU32},
thread
};
use hashbrown::HashMap;
use tokio::{sync::broadcast, task};
use killswitch::KillSwitch;
use crate::{
err::{AppErrors, CbErr},
rt::{
Demise, InitCtx, RocketServiceHandler, RunEnv, ServiceReporter, SvcEvt,
TermCtx, signals
}
};
#[cfg(unix)]
use crate::rt::UserSig;
/// Bundle of everything [`main()`] needs to run a Rocket-based service
/// application.
///
/// `ApEr` is the application-defined error type used by the service handler
/// (`RocketServiceHandler<AppErr = ApEr>`).
pub struct MainParams<ApEr>
where
ApEr: Send
{
/// Run environment.  A clone is placed in the initialization context, a
/// reference is passed to the handler's `run()`, and the value itself ends
/// up in the termination context.
pub(crate) re: RunEnv,
/// Callback handed, together with the service event receiver, to
/// `crate::rt::svcevt_thread` on the dedicated `svcevt` thread.
pub(crate) svcevt_handler: Box<dyn FnMut(SvcEvt) + Send>,
/// Application service handler whose `init()`, `run()` and `shutdown()`
/// callbacks are driven by the runtime.
pub(crate) rt_handler: Box<dyn RocketServiceHandler<AppErr = ApEr> + Send>,
/// Reporter used to signal the starting/started/stopping/stopped
/// transitions.
pub(crate) sr: ServiceReporter,
/// Optional pre-created service event channel.  When `None`, the runtime
/// creates its own channel and spawns signal waiter tasks to feed it.
pub(crate) svcevt_ch:
Option<(broadcast::Sender<SvcEvt>, broadcast::Receiver<SvcEvt>)>,
/// Type-keyed values moved into the initialization context's `passthrough`
/// map.
pub(crate) passthrough_init: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
/// Type-keyed values destined for the termination context's `passthrough`
/// map.  Entries left in the initialization context's `passthrough_term`
/// map are merged into this one (replacing entries with the same key)
/// before termination.
pub(crate) passthrough_term: HashMap<TypeId, Box<dyn Any + Send + Sync>>
}
/// Synchronous entry point for running a Rocket-based service application.
///
/// Builds the async service future from `params` and hands it to
/// `rocket::execute()`, returning whatever that future resolves to.
///
/// # Errors
/// Returns any `CbErr` produced by the async service runtime, which includes
/// application errors reported by the service handler's callbacks.
pub fn main<ApEr>(params: MainParams<ApEr>) -> Result<(), CbErr<ApEr>>
where
  ApEr: Send
{
  // The future already resolves to this function's return type, so its
  // outcome is passed straight through.
  let service = rocket_async_main(params);
  rocket::execute(service)
}
#[expect(clippy::too_many_lines)]
async fn rocket_async_main<ApEr>(
MainParams {
re,
svcevt_handler,
mut rt_handler,
sr,
svcevt_ch,
passthrough_init,
mut passthrough_term
}: MainParams<ApEr>
) -> Result<(), CbErr<ApEr>>
where
ApEr: Send
{
let ks = KillSwitch::new();
let start_ts = jiff::Timestamp::now();
let (tx_svcevt, rx_svcevt) = if let Some((tx_svcevt, rx_svcevt)) = svcevt_ch
{
(tx_svcevt, rx_svcevt)
} else {
init_svc_channels(&ks)
};
sr.starting(0, Some(super::SVCAPP_INIT_MSG));
let mut ictx = InitCtx {
re: re.clone(),
sr: sr.clone(),
cnt: Arc::new(AtomicU32::new(1)), passthrough: passthrough_init,
passthrough_term: HashMap::new()
};
let (rockets, init_apperr) =
super::timeit_async("initialization", || async {
match rt_handler.init(&mut ictx).await {
Ok(rockets) => (rockets, None),
Err(e) => (Vec::new(), Some(e))
}
})
.await;
for (k, v) in ictx.passthrough_term.drain() {
passthrough_term.insert(k, v);
}
drop(ictx);
let mut ignited = vec![];
let mut rocket_shutdowns = vec![];
for rocket in rockets {
let rocket = rocket.ignite().await?;
rocket_shutdowns.push(rocket.shutdown());
ignited.push(rocket);
}
let run_apperr = if init_apperr.is_none() {
sr.started();
let mut rx_svcevt2 = rx_svcevt.resubscribe();
let jh_graceful_landing = task::spawn(async move {
loop {
let msg = match rx_svcevt2.recv().await {
Ok(msg) => msg,
Err(e) => {
log::error!("Unable to receive broadcast SvcEvt message, {e}");
break;
}
};
if let SvcEvt::Shutdown(_) = msg {
tracing::trace!("Ask rocket instances to shut down gracefully");
for shutdown in rocket_shutdowns {
shutdown.notify();
}
break;
}
tracing::trace!("Ignored message in task waiting for shutdown");
}
});
let jh = thread::Builder::new()
.name("svcevt".into())
.spawn(|| crate::rt::svcevt_thread(rx_svcevt, svcevt_handler))
.unwrap();
let res = super::timeit_async("main", || async {
rt_handler.run(ignited, &re).await.err()
})
.await;
let _ = tx_svcevt.send(SvcEvt::Shutdown(Demise::ReachedEnd));
if let Err(e) = jh_graceful_landing.await {
log::warn!("An error was returned from the graceful landing task; {e}");
}
let _ = task::spawn_blocking(|| jh.join()).await;
res
} else {
None
};
sr.stopping(0, Some(super::SVCAPP_TERM_MSG));
ks.trigger();
if (ks.finalize().await).is_err() {
log::warn!("Attempted to finalize KillSwitch that wasn't triggered yet");
}
let mut tctx = TermCtx {
re,
sr: sr.clone(),
cnt: Arc::new(AtomicU32::new(1)), passthrough: passthrough_term
};
let term_apperr = super::timeit_async("termination", || async {
rt_handler.shutdown(&mut tctx).await.err()
})
.await;
drop(tctx);
sr.stopped();
log::info!("Service ran for {:#}", super::rounded_elapse(start_ts));
if init_apperr.is_some() || run_apperr.is_some() || term_apperr.is_some() {
let apperrs = AppErrors {
init: init_apperr,
run: run_apperr,
shutdown: term_apperr
};
Err(CbErr::SrvApp(apperrs))?;
}
Ok(())
}
fn init_svc_channels(
ks: &KillSwitch
) -> (broadcast::Sender<SvcEvt>, broadcast::Receiver<SvcEvt>) {
let (tx, rx) = broadcast::channel(16);
let ks2 = ks.clone();
let txc = tx.clone();
task::spawn(signals::wait_shutdown(
move || {
if let Err(e) = txc.send(SvcEvt::Shutdown(Demise::Interrupted)) {
log::error!("Unable to send SvcEvt::Shutdown event; {e}");
}
},
ks2
));
let txc = tx.clone();
let ks2 = ks.clone();
task::spawn(signals::wait_term(
move || {
if let Err(e) = txc.send(SvcEvt::Shutdown(Demise::Terminated)) {
log::error!("Unable to send SvcEvt::Terminate event; {e}");
}
},
ks2
));
#[cfg(unix)]
{
let ks2 = ks.clone();
let txc = tx.clone();
task::spawn(signals::wait_reload(
move || {
if let Err(e) = txc.send(SvcEvt::ReloadConf) {
log::error!("Unable to send SvcEvt::ReloadConf event; {e}");
}
},
ks2
));
}
#[cfg(unix)]
{
let ks2 = ks.clone();
let txc = tx.clone();
task::spawn(signals::wait_user1(
move || {
if let Err(e) = txc.send(SvcEvt::User(UserSig::Sig1)) {
log::error!("Unable to send SvcEvt::User(Sig1) event; {e}");
}
},
ks2
));
}
#[cfg(unix)]
{
let ks2 = ks.clone();
let txc = tx.clone();
task::spawn(signals::wait_user2(
move || {
if let Err(e) = txc.send(SvcEvt::User(UserSig::Sig2)) {
log::error!("Unable to send SvcEvt::User(Sig2) event; {e}");
}
},
ks2
));
}
(tx, rx)
}