Skip to main content

runtime_rs/
runtime.rs

1use std::marker::PhantomData;
2
3use tokio::task::JoinSet;
4use tracing::{debug, error};
5
6use crate::registry::Registry;
7use crate::state::SharedState;
8
9/// Runtime task manager for runnable providers.
10///
11/// This keeps JoinSet orchestration out of bootstrap and ensures shutdown
12/// is observed immediately via the shared shutdown token.
13pub struct Runtime<S> {
14    join_set: JoinSet<crate::registry::Result<()>>,
15    _state: PhantomData<fn() -> S>,
16}
17
18impl<S> Default for Runtime<S> {
19    fn default() -> Self {
20        Self { join_set: JoinSet::new(), _state: PhantomData }
21    }
22}
23
24impl<S> Runtime<S>
25where
26    S: Clone + Send + 'static,
27{
28    /// Spawn all runnable providers from registry.
29    pub fn spawn_all(
30        &mut self,
31        registry: &Registry<S>,
32        state: S,
33    ) -> usize {
34        registry.run_all(state, &mut self.join_set)
35    }
36}
37
38impl<S> Runtime<S>
39where
40    S: SharedState,
41{
42    /// Run until shutdown is initiated or a critical runnable failure occurs.
43    ///
44    /// Returns:
45    /// - `Ok(())` when the shutdown token is cancelled or all runnables finished.
46    /// - `Err(_)` on critical startup/join failures.
47    pub async fn wait_until_shutdown(
48        &mut self,
49        state: &S,
50    ) -> crate::registry::Result<()> {
51        let shutdown = state.shutdown_token();
52
53        loop {
54            tokio::select! {
55                _ = shutdown.cancelled() => {
56                    debug!("runtime observed shutdown signal; leaving runnable wait loop");
57                    return Ok(());
58                }
59                res = self.join_set.join_next() => {
60                    let Some(res) = res else {
61                        // No runnable tasks left.
62                        debug!("runtime join set is empty");
63                        return Ok(());
64                    };
65
66                    match res {
67                        Ok(Ok(())) => {
68                            debug!("a runnable finished cleanly");
69                        }
70                        // Recoverable failures (best-effort tasks)
71                        // dependency shouldn't tear the worker down): the
72                        // runnable opted in by returning
73                        // `Error::run_continue(...)`. Log and keep serving.
74                        Ok(Err(crate::registry::Error::Recoverable { name, source })) => {
75                            error!(provider = %name, "runnable failed (worker continuing): {}", source);
76                        }
77                        // Default policy is fatal: bring the worker down so
78                        // the supervisor can respawn deterministically. Any
79                        // runnable that wants log+continue must explicitly
80                        // opt in via `Error::run_continue`.
81                        Ok(Err(e)) => {
82                            error!("a runnable failed: {}", e);
83                            return Err(e);
84                        }
85                        Err(join_err) => {
86                            return Err(join_err.into());
87                        }
88                    }
89                }
90            }
91        }
92    }
93}
94
95impl<S> Runtime<S> {
96    /// Abort and drain all remaining runnable tasks.
97    pub async fn abort_and_drain(&mut self) {
98        self.join_set.abort_all();
99        while self.join_set.join_next().await.is_some() {}
100        debug!("runtime aborted and drained remaining runnable tasks");
101    }
102
103    /// Wait for all remaining runnable tasks to finish on their own.
104    ///
105    /// Shutdown-aware listeners do their protocol-level graceful drain inside
106    /// their runnable future. The bootstrap layer must give those futures a
107    /// chance to complete before falling back to `abort_and_drain`.
108    pub async fn drain(&mut self) -> crate::registry::Result<()> {
109        while let Some(res) = self.join_set.join_next().await {
110            match res {
111                Ok(Ok(())) => {
112                    debug!("a runnable finished cleanly during drain");
113                }
114                Ok(Err(crate::registry::Error::Recoverable { name, source })) => {
115                    error!(provider = %name, "runnable failed during drain (continuing): {}", source);
116                }
117                Ok(Err(e)) => {
118                    error!("a runnable failed during drain: {}", e);
119                    return Err(e);
120                }
121                Err(join_err) => {
122                    return Err(join_err.into());
123                }
124            }
125        }
126        debug!("runtime drained remaining runnable tasks");
127        Ok(())
128    }
129}