mecha10_runtime/
supervisor.rs1use crate::config::RestartPolicy;
4use crate::health::{HealthChecker, HealthStatus};
5use crate::node::NodeRunner;
6use crate::shutdown::ShutdownHandle;
7use anyhow::Result;
8use std::sync::Arc;
9use tokio::task::JoinHandle;
10
/// Lifecycle states a supervised node can be in.
///
/// Every variant carries only owned, totally comparable data, so the enum
/// derives `Eq` alongside `PartialEq`.
///
/// NOTE(review): nothing else in this file constructs or reads this enum;
/// presumably it is consumed by other modules — confirm.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum NodeStatus {
    /// The node is starting.
    Starting,

    /// The node is running.
    Running,

    /// The node is shutting down.
    ShuttingDown,

    /// The node has stopped.
    Stopped,

    /// The node failed; `reason` carries a textual description.
    Failed { reason: String },

    /// The node is being restarted; `attempt` carries the attempt counter.
    Restarting { attempt: usize },
}
32
33pub struct NodeHandle {
35 name: String,
36 task: JoinHandle<()>,
37}
38
39impl NodeHandle {
40 pub fn name(&self) -> &str {
42 &self.name
43 }
44
45 pub fn is_finished(&self) -> bool {
47 self.task.is_finished()
48 }
49
50 pub fn abort(&self) {
52 self.task.abort();
53 }
54
55 pub async fn wait(self) -> Result<()> {
57 self.task.await?;
58 Ok(())
59 }
60}
61
62pub struct Supervisor {
64 health_checker: Arc<HealthChecker>,
65 shutdown: ShutdownHandle,
66 restart_policy: RestartPolicy,
67 nodes: Arc<tokio::sync::RwLock<std::collections::HashMap<String, NodeHandle>>>,
69}
70
71impl Supervisor {
72 pub fn new(health_checker: Arc<HealthChecker>, shutdown: ShutdownHandle, restart_policy: RestartPolicy) -> Self {
74 Self {
75 health_checker,
76 shutdown,
77 restart_policy,
78 nodes: Arc::new(tokio::sync::RwLock::new(std::collections::HashMap::new())),
79 }
80 }
81
82 pub fn launch_node(&self, mut node: Box<dyn NodeRunner>) -> NodeHandle {
84 let name = node.name().to_string();
85 let health_checker = self.health_checker.clone();
86 let shutdown = self.shutdown.clone();
87 let restart_policy = self.restart_policy.clone();
88
89 tracing::info!("Launching node: {}", name);
90
91 let task = tokio::spawn(async move {
92 let node_name = node.name().to_string();
93 let mut attempt = 0;
94
95 health_checker.register(node_name.clone(), HealthStatus::Healthy).await;
97
98 loop {
99 tracing::debug!("Starting node: {} (attempt {})", node_name, attempt);
100
101 let mut shutdown_rx = shutdown.subscribe();
103 let run_future = node.run();
104
105 tokio::select! {
106 result = run_future => {
107 match result {
108 Ok(()) => {
109 tracing::info!("Node {} completed successfully", node_name);
110 health_checker
111 .update(&node_name, HealthStatus::Healthy)
112 .await;
113 break;
114 }
115 Err(e) => {
116 tracing::error!("Node {} failed: {}", node_name, e);
117 health_checker
118 .update(
119 &node_name,
120 HealthStatus::Unhealthy {
121 reason: e.to_string(),
122 },
123 )
124 .await;
125
126 if restart_policy.should_restart(attempt) {
128 let backoff = restart_policy.backoff_duration(attempt);
129 tracing::info!(
130 "Restarting node {} in {:?} (attempt {})",
131 node_name,
132 backoff,
133 attempt + 1
134 );
135 tokio::time::sleep(backoff).await;
136 attempt += 1;
137 continue;
138 } else {
139 tracing::error!(
140 "Node {} exhausted restart attempts, giving up",
141 node_name
142 );
143 break;
144 }
145 }
146 }
147 }
148 _ = shutdown_rx.recv() => {
149 tracing::info!("Shutdown signal received, stopping node: {}", node_name);
150 if let Err(e) = node.shutdown().await {
151 tracing::error!("Error during node {} shutdown: {}", node_name, e);
152 }
153 health_checker
154 .update(&node_name, HealthStatus::Healthy)
155 .await;
156 break;
157 }
158 }
159 }
160
161 health_checker.unregister(&node_name).await;
163 tracing::debug!("Node {} supervision ended", node_name);
164 });
165
166 NodeHandle { name, task }
167 }
168
169 pub fn launch_nodes(&self, nodes: Vec<Box<dyn NodeRunner>>) -> Vec<NodeHandle> {
171 nodes.into_iter().map(|node| self.launch_node(node)).collect()
172 }
173
174 pub fn health_checker(&self) -> &Arc<HealthChecker> {
176 &self.health_checker
177 }
178
179 pub fn shutdown(&self) -> &ShutdownHandle {
181 &self.shutdown
182 }
183
184 #[allow(dead_code)] pub async fn start_node(&self, name: &str) -> Result<()> {
198 tracing::info!("Starting node: {}", name);
199
200 self.health_checker
206 .register(name.to_string(), HealthStatus::Healthy)
207 .await;
208
209 tracing::debug!("Node {} registered (stub)", name);
210 Ok(())
211 }
212
213 #[allow(dead_code)] pub async fn stop_node(&self, name: &str) -> Result<()> {
224 tracing::info!("Stopping node: {}", name);
225
226 let mut nodes = self.nodes.write().await;
227
228 if let Some(handle) = nodes.remove(name) {
229 handle.abort();
231
232 self.health_checker.unregister(name).await;
234
235 tracing::info!("Node {} stopped", name);
236 Ok(())
237 } else {
238 self.health_checker.unregister(name).await;
240 tracing::warn!(
241 "Node {} not found in tracked nodes, unregistered from health checker",
242 name
243 );
244 Ok(())
245 }
246 }
247
248 #[allow(dead_code)] pub async fn get_running_nodes(&self) -> Vec<String> {
253 let nodes = self.nodes.read().await;
254 nodes.keys().cloned().collect()
255 }
256}
257
258#[async_trait::async_trait]
260impl crate::lifecycle::SupervisorTrait for Supervisor {
261 async fn start_node(&self, name: &str) -> Result<()> {
262 self.start_node(name).await
263 }
264
265 async fn stop_node(&self, name: &str) -> Result<()> {
266 self.stop_node(name).await
267 }
268
269 fn get_running_nodes(&self) -> Vec<String> {
270 vec![]
274 }
275}