nodedb_cluster/subsystem/
registry.rs1use std::sync::Arc;
7use std::time::{Duration, Instant};
8
9use super::context::BootstrapCtx;
10use super::errors::{BootstrapError, ShutdownError};
11use super::health::ClusterHealth;
12use super::topo_sort::topo_sort;
13use super::r#trait::{ClusterSubsystem, SubsystemHandle};
14
15#[derive(Default)]
17pub struct SubsystemRegistry {
18 subsystems: Vec<Arc<dyn ClusterSubsystem>>,
19}
20
21impl SubsystemRegistry {
22 pub fn new() -> Self {
24 Self::default()
25 }
26
27 pub fn is_empty(&self) -> bool {
29 self.subsystems.is_empty()
30 }
31
32 pub fn register(&mut self, subsystem: Arc<dyn ClusterSubsystem>) {
37 self.subsystems.push(subsystem);
38 }
39
40 pub async fn start_all(&self, ctx: &BootstrapCtx) -> Result<RunningCluster, BootstrapError> {
48 let order = topo_sort(&self.subsystems)?;
49
50 let mut handles: Vec<SubsystemHandle> = Vec::with_capacity(order.len());
51 let mut started_names: Vec<&'static str> = Vec::with_capacity(order.len());
52
53 for idx in &order {
54 let subsystem = &self.subsystems[*idx];
55 let name = subsystem.name();
56
57 ctx.health
58 .set(name, crate::subsystem::health::SubsystemHealth::Starting)
59 .await;
60
61 match subsystem.start(ctx).await {
62 Ok(handle) => {
63 ctx.health
64 .set(name, crate::subsystem::health::SubsystemHealth::Running)
65 .await;
66 handles.push(handle);
67 started_names.push(name);
68 }
69 Err(start_err) => {
70 ctx.health
71 .set(
72 name,
73 crate::subsystem::health::SubsystemHealth::Failed {
74 reason: start_err.to_string(),
75 },
76 )
77 .await;
78
79 let shutdown_deadline = Instant::now() + SHUTDOWN_CLEANUP_DEADLINE;
81 let mut shutdown_errors = Vec::new();
82
83 for handle in handles.drain(..).rev() {
85 if let Err(e) = handle.shutdown_and_wait(shutdown_deadline).await {
86 shutdown_errors.push(e);
87 }
88 }
89
90 if shutdown_errors.is_empty() {
91 return Err(start_err);
92 } else {
93 return Err(BootstrapError::StartAndShutdownFailure {
94 name,
95 shutdown_errors,
96 });
97 }
98 }
99 }
100 }
101
102 Ok(RunningCluster {
103 handles,
104 health: ctx.health.clone(),
105 })
106 }
107}
108
/// Total time budget for the rollback that `SubsystemRegistry::start_all`
/// performs after a start failure. It is added to `Instant::now()` once, so
/// all already-started subsystems share the same absolute deadline.
const SHUTDOWN_CLEANUP_DEADLINE: Duration = Duration::from_secs(10);
111
112pub struct RunningCluster {
115 pub handles: Vec<SubsystemHandle>,
117 pub health: ClusterHealth,
119}
120
121impl RunningCluster {
122 pub async fn shutdown_all(self, per_subsystem_deadline: Duration) -> Vec<ShutdownError> {
127 let mut errors = Vec::new();
128 for handle in self.handles.into_iter().rev() {
129 let deadline = Instant::now() + per_subsystem_deadline;
130 if let Err(e) = handle.shutdown_and_wait(deadline).await {
131 errors.push(e);
132 }
133 }
134 errors
135 }
136}
137
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Instant;

    use async_trait::async_trait;
    use tokio::sync::watch;

    use super::*;
    use crate::subsystem::context::BootstrapCtx;
    use crate::subsystem::errors::{BootstrapError, ShutdownError};
    use crate::subsystem::health::SubsystemHealth;
    use crate::subsystem::r#trait::{ClusterSubsystem, SubsystemHandle};

    /// Test subsystem with a configurable name and dependency list whose
    /// `start` always succeeds with a no-op task.
    struct NamedSubsystem {
        name: &'static str,
        deps: &'static [&'static str],
    }

    #[async_trait]
    impl ClusterSubsystem for NamedSubsystem {
        fn name(&self) -> &'static str {
            self.name
        }
        fn dependencies(&self) -> &'static [&'static str] {
            self.deps
        }
        async fn start(&self, _ctx: &BootstrapCtx) -> Result<SubsystemHandle, BootstrapError> {
            // The receiver is dropped immediately; only the sender is handed
            // to the handle.
            let (tx, _rx) = watch::channel(false);
            let handle = tokio::spawn(async {});
            Ok(SubsystemHandle::new(self.name, handle, tx))
        }
        async fn shutdown(&self, _deadline: Instant) -> Result<(), ShutdownError> {
            Ok(())
        }
        fn health(&self) -> SubsystemHealth {
            SubsystemHealth::Running
        }
    }

    /// Test subsystem without dependencies whose `start` always fails.
    struct FailingSubsystem {
        name: &'static str,
    }

    #[async_trait]
    impl ClusterSubsystem for FailingSubsystem {
        fn name(&self) -> &'static str {
            self.name
        }
        fn dependencies(&self) -> &'static [&'static str] {
            &[]
        }
        async fn start(&self, _ctx: &BootstrapCtx) -> Result<SubsystemHandle, BootstrapError> {
            Err(BootstrapError::SubsystemStart {
                name: self.name,
                cause: "intentional test failure".into(),
            })
        }
        async fn shutdown(&self, _deadline: Instant) -> Result<(), ShutdownError> {
            Ok(())
        }
        fn health(&self) -> SubsystemHealth {
            SubsystemHealth::Failed {
                reason: "intentional".into(),
            }
        }
    }

    /// An empty registry reports itself empty and sorts to an empty order.
    #[test]
    fn empty_registry_topo_sorts_cleanly() {
        let registry = SubsystemRegistry::new();
        assert!(registry.is_empty());
        let order = topo_sort(&registry.subsystems).unwrap();
        assert!(order.is_empty());
    }

    /// A dependency registered after its dependent must still sort first.
    #[test]
    fn registry_topo_sorts_dependencies_correctly() {
        let mut registry = SubsystemRegistry::new();
        registry.register(Arc::new(NamedSubsystem {
            name: "beta",
            deps: &["alpha"],
        }));
        registry.register(Arc::new(NamedSubsystem {
            name: "alpha",
            deps: &[],
        }));

        let order = topo_sort(&registry.subsystems).unwrap();
        let alpha_pos = order
            .iter()
            .position(|&i| registry.subsystems[i].name() == "alpha")
            .unwrap();
        let beta_pos = order
            .iter()
            .position(|&i| registry.subsystems[i].name() == "beta")
            .unwrap();
        assert!(alpha_pos < beta_pos, "alpha must precede beta");
    }

    // NOTE(review): despite its name, this test never calls `start_all` or
    // `start`; it only checks that a failing subsystem can be registered and
    // sorted. Presumably a `BootstrapCtx` is not constructible here (confirm).
    #[tokio::test]
    async fn failing_subsystem_start_returns_bootstrap_error() {
        let mut registry = SubsystemRegistry::new();
        registry.register(Arc::new(FailingSubsystem { name: "broken" }));
        assert!(!registry.is_empty());

        let order = topo_sort(&registry.subsystems).unwrap();
        assert_eq!(order.len(), 1, "one subsystem registered");
    }
}