use std::marker::PhantomData;
use tokio::task::JoinSet;
use tracing::{debug, error};
use crate::registry::Registry;
use crate::state::SharedState;
pub struct Runtime<S> {
join_set: JoinSet<crate::registry::Result<()>>,
_state: PhantomData<fn() -> S>,
}
impl<S> Default for Runtime<S> {
fn default() -> Self {
Self { join_set: JoinSet::new(), _state: PhantomData }
}
}
impl<S> Runtime<S>
where
S: Clone + Send + 'static,
{
pub fn spawn_all(
&mut self,
registry: &Registry<S>,
state: S,
) -> usize {
registry.run_all(state, &mut self.join_set)
}
}
impl<S> Runtime<S>
where
S: SharedState,
{
pub async fn wait_until_shutdown(
&mut self,
state: &S,
) -> crate::registry::Result<()> {
let shutdown = state.shutdown_token();
loop {
tokio::select! {
_ = shutdown.cancelled() => {
debug!("runtime observed shutdown signal; leaving runnable wait loop");
return Ok(());
}
res = self.join_set.join_next() => {
let Some(res) = res else {
debug!("runtime join set is empty");
return Ok(());
};
match res {
Ok(Ok(())) => {
debug!("a runnable finished cleanly");
}
Ok(Err(crate::registry::Error::Recoverable { name, source })) => {
error!(provider = %name, "runnable failed (worker continuing): {}", source);
}
Ok(Err(e)) => {
error!("a runnable failed: {}", e);
return Err(e);
}
Err(join_err) => {
return Err(join_err.into());
}
}
}
}
}
}
}
impl<S> Runtime<S> {
pub async fn abort_and_drain(&mut self) {
self.join_set.abort_all();
while self.join_set.join_next().await.is_some() {}
debug!("runtime aborted and drained remaining runnable tasks");
}
pub async fn drain(&mut self) -> crate::registry::Result<()> {
while let Some(res) = self.join_set.join_next().await {
match res {
Ok(Ok(())) => {
debug!("a runnable finished cleanly during drain");
}
Ok(Err(crate::registry::Error::Recoverable { name, source })) => {
error!(provider = %name, "runnable failed during drain (continuing): {}", source);
}
Ok(Err(e)) => {
error!("a runnable failed during drain: {}", e);
return Err(e);
}
Err(join_err) => {
return Err(join_err.into());
}
}
}
debug!("runtime drained remaining runnable tasks");
Ok(())
}
}