runtime_rs/runtime.rs
1use std::marker::PhantomData;
2
3use tokio::task::JoinSet;
4use tracing::{debug, error};
5
6use crate::registry::Registry;
7use crate::state::SharedState;
8
9/// Runtime task manager for runnable providers.
10///
11/// This keeps JoinSet orchestration out of bootstrap and ensures shutdown
12/// is observed immediately via the shared shutdown token.
13pub struct Runtime<S> {
14 join_set: JoinSet<crate::registry::Result<()>>,
15 _state: PhantomData<fn() -> S>,
16}
17
18impl<S> Default for Runtime<S> {
19 fn default() -> Self {
20 Self { join_set: JoinSet::new(), _state: PhantomData }
21 }
22}
23
24impl<S> Runtime<S>
25where
26 S: Clone + Send + 'static,
27{
28 /// Spawn all runnable providers from registry.
29 pub fn spawn_all(
30 &mut self,
31 registry: &Registry<S>,
32 state: S,
33 ) -> usize {
34 registry.run_all(state, &mut self.join_set)
35 }
36}
37
38impl<S> Runtime<S>
39where
40 S: SharedState,
41{
42 /// Run until shutdown is initiated or a critical runnable failure occurs.
43 ///
44 /// Returns:
45 /// - `Ok(())` when the shutdown token is cancelled or all runnables finished.
46 /// - `Err(_)` on critical startup/join failures.
47 pub async fn wait_until_shutdown(
48 &mut self,
49 state: &S,
50 ) -> crate::registry::Result<()> {
51 let shutdown = state.shutdown_token();
52
53 loop {
54 tokio::select! {
55 _ = shutdown.cancelled() => {
56 debug!("runtime observed shutdown signal; leaving runnable wait loop");
57 return Ok(());
58 }
59 res = self.join_set.join_next() => {
60 let Some(res) = res else {
61 // No runnable tasks left.
62 debug!("runtime join set is empty");
63 return Ok(());
64 };
65
66 match res {
67 Ok(Ok(())) => {
68 debug!("a runnable finished cleanly");
69 }
70 // Recoverable failures (best-effort tasks)
71 // dependency shouldn't tear the worker down): the
72 // runnable opted in by returning
73 // `Error::run_continue(...)`. Log and keep serving.
74 Ok(Err(crate::registry::Error::Recoverable { name, source })) => {
75 error!(provider = %name, "runnable failed (worker continuing): {}", source);
76 }
77 // Default policy is fatal: bring the worker down so
78 // the supervisor can respawn deterministically. Any
79 // runnable that wants log+continue must explicitly
80 // opt in via `Error::run_continue`.
81 Ok(Err(e)) => {
82 error!("a runnable failed: {}", e);
83 return Err(e);
84 }
85 Err(join_err) => {
86 return Err(join_err.into());
87 }
88 }
89 }
90 }
91 }
92 }
93}
94
95impl<S> Runtime<S> {
96 /// Abort and drain all remaining runnable tasks.
97 pub async fn abort_and_drain(&mut self) {
98 self.join_set.abort_all();
99 while self.join_set.join_next().await.is_some() {}
100 debug!("runtime aborted and drained remaining runnable tasks");
101 }
102
103 /// Wait for all remaining runnable tasks to finish on their own.
104 ///
105 /// Shutdown-aware listeners do their protocol-level graceful drain inside
106 /// their runnable future. The bootstrap layer must give those futures a
107 /// chance to complete before falling back to `abort_and_drain`.
108 pub async fn drain(&mut self) -> crate::registry::Result<()> {
109 while let Some(res) = self.join_set.join_next().await {
110 match res {
111 Ok(Ok(())) => {
112 debug!("a runnable finished cleanly during drain");
113 }
114 Ok(Err(crate::registry::Error::Recoverable { name, source })) => {
115 error!(provider = %name, "runnable failed during drain (continuing): {}", source);
116 }
117 Ok(Err(e)) => {
118 error!("a runnable failed during drain: {}", e);
119 return Err(e);
120 }
121 Err(join_err) => {
122 return Err(join_err.into());
123 }
124 }
125 }
126 debug!("runtime drained remaining runnable tasks");
127 Ok(())
128 }
129}