mecha10_runtime/supervisor.rs

//! Node supervision and lifecycle management

use crate::config::RestartPolicy;
use crate::health::{HealthChecker, HealthStatus};
use crate::node::NodeRunner;
use crate::shutdown::ShutdownHandle;
use anyhow::Result;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio::task::JoinHandle;

/// Status of a supervised node
#[derive(Clone, Debug, PartialEq)]
pub enum NodeStatus {
    /// Node is starting up
    Starting,

    /// Node is running normally
    Running,

    /// Node is shutting down
    ShuttingDown,

    /// Node has stopped
    Stopped,

    /// Node failed with error
    Failed { reason: String },

    /// Node is restarting
    Restarting { attempt: usize },
}

/// Handle to a supervised node
pub struct NodeHandle {
    name: String,
    task: JoinHandle<()>,
}

impl NodeHandle {
    /// Get the node name
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Check if the node task is finished
    pub fn is_finished(&self) -> bool {
        self.task.is_finished()
    }

    /// Abort the node task
    pub fn abort(&self) {
        self.task.abort();
    }

    /// Wait for the node to finish
    pub async fn wait(self) -> Result<()> {
        self.task.await?;
        Ok(())
    }
}

/// Supervisor for managing node lifecycle
pub struct Supervisor {
    health_checker: Arc<HealthChecker>,
    shutdown: ShutdownHandle,
    restart_policy: RestartPolicy,
    /// Tracked node handles by name
    nodes: Arc<RwLock<HashMap<String, NodeHandle>>>,
}

impl Supervisor {
    /// Create a new supervisor
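    ///
    /// # Example
    ///
    /// A minimal construction sketch. The `HealthChecker::new()` and
    /// `ShutdownHandle::new()` constructors and `RestartPolicy::default()`
    /// shown here are assumptions about sibling modules, not confirmed APIs.
    ///
    /// ```ignore
    /// use std::sync::Arc;
    ///
    /// let health_checker = Arc::new(HealthChecker::new()); // hypothetical constructor
    /// let shutdown = ShutdownHandle::new();                // hypothetical constructor
    /// let supervisor = Supervisor::new(health_checker, shutdown, RestartPolicy::default());
    /// ```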
    pub fn new(
        health_checker: Arc<HealthChecker>,
        shutdown: ShutdownHandle,
        restart_policy: RestartPolicy,
    ) -> Self {
        Self {
            health_checker,
            shutdown,
            restart_policy,
            nodes: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Launch and supervise a node
    ///
    /// Spawns a background task that runs the node, reports its health, and
    /// restarts it according to the configured [`RestartPolicy`] until the
    /// policy is exhausted or a shutdown signal is received. Note that the
    /// returned [`NodeHandle`] is not inserted into the supervisor's internal
    /// `nodes` map; by-name tracking is only used by the Phase 3 lifecycle stubs.
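    ///
    /// # Example
    ///
    /// A usage sketch; `MySensorNode` is a hypothetical type implementing
    /// [`NodeRunner`], and `"my_sensor"` a hypothetical node name.
    ///
    /// ```ignore
    /// let handle = supervisor.launch_node(Box::new(MySensorNode::default()));
    /// assert_eq!(handle.name(), "my_sensor");
    /// handle.wait().await?; // resolves once the node (and any restarts) finishes
    /// ```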
    pub fn launch_node(&self, mut node: Box<dyn NodeRunner>) -> NodeHandle {
        let name = node.name().to_string();
        let health_checker = self.health_checker.clone();
        let shutdown = self.shutdown.clone();
        let restart_policy = self.restart_policy.clone();

        tracing::info!("Launching node: {}", name);

        let task = tokio::spawn(async move {
            let node_name = node.name().to_string();
            let mut attempt = 0;

            // Register with health checker
            health_checker.register(node_name.clone(), HealthStatus::Healthy).await;

            loop {
                tracing::debug!("Starting node: {} (attempt {})", node_name, attempt);

                // Run the node with shutdown signal
                let mut shutdown_rx = shutdown.subscribe();
                let run_future = node.run();

                tokio::select! {
                    result = run_future => {
                        match result {
                            Ok(()) => {
                                tracing::info!("Node {} completed successfully", node_name);
                                health_checker
                                    .update(&node_name, HealthStatus::Healthy)
                                    .await;
                                break;
                            }
                            Err(e) => {
                                tracing::error!("Node {} failed: {}", node_name, e);
                                health_checker
                                    .update(
                                        &node_name,
                                        HealthStatus::Unhealthy {
                                            reason: e.to_string(),
                                        },
                                    )
                                    .await;

                                // Check if should restart
                                if restart_policy.should_restart(attempt) {
                                    let backoff = restart_policy.backoff_duration(attempt);
                                    tracing::info!(
                                        "Restarting node {} in {:?} (attempt {})",
                                        node_name,
                                        backoff,
                                        attempt + 1
                                    );
                                    tokio::time::sleep(backoff).await;
                                    attempt += 1;
                                    continue;
                                } else {
                                    tracing::error!(
                                        "Node {} exhausted restart attempts, giving up",
                                        node_name
                                    );
                                    break;
                                }
                            }
                        }
                    }
                    _ = shutdown_rx.recv() => {
                        tracing::info!("Shutdown signal received, stopping node: {}", node_name);
                        if let Err(e) = node.shutdown().await {
                            tracing::error!("Error during node {} shutdown: {}", node_name, e);
                        }
                        health_checker
                            .update(&node_name, HealthStatus::Healthy)
                            .await;
                        break;
                    }
                }
            }

            // Unregister from health checker
            health_checker.unregister(&node_name).await;
            tracing::debug!("Node {} supervision ended", node_name);
        });

        NodeHandle { name, task }
    }

    /// Launch multiple nodes
    pub fn launch_nodes(&self, nodes: Vec<Box<dyn NodeRunner>>) -> Vec<NodeHandle> {
        nodes.into_iter().map(|node| self.launch_node(node)).collect()
    }

    /// Get the health checker
    pub fn health_checker(&self) -> &Arc<HealthChecker> {
        &self.health_checker
    }

    /// Get the shutdown handle
    pub fn shutdown(&self) -> &ShutdownHandle {
        &self.shutdown
    }

    /// Start a node by name (for lifecycle management)
    ///
    /// Note: this is a stub implementation for the lifecycle system. A full
    /// implementation requires a node registry or factory that can construct
    /// nodes by name.
    ///
    /// # Arguments
    ///
    /// * `name` - Name of the node to start
    ///
    /// # Errors
    ///
    /// Intended to return an error if the node cannot be found or started;
    /// the current stub only registers the name with the health checker and
    /// always returns `Ok`.
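    ///
    /// # Example
    ///
    /// A sketch; `"camera"` is a hypothetical node name.
    ///
    /// ```ignore
    /// supervisor.start_node("camera").await?;
    /// ```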
    #[allow(dead_code)] // Will be used in Phase 3
    pub async fn start_node(&self, name: &str) -> Result<()> {
        tracing::info!("Starting node: {}", name);

        // TODO: Implement node registry/factory to create nodes by name
        // For now, this is a placeholder that will be implemented when
        // we integrate with the full runtime system

        // Register with health checker as a placeholder
        self.health_checker
            .register(name.to_string(), HealthStatus::Healthy)
            .await;

        tracing::debug!("Node {} registered (stub)", name);
        Ok(())
    }

    /// Stop a node by name (for lifecycle management)
    ///
    /// # Arguments
    ///
    /// * `name` - Name of the node to stop
    ///
    /// # Errors
    ///
    /// Intended to return an error if the node cannot be stopped; the current
    /// implementation always returns `Ok`, and a node that is not tracked is
    /// simply unregistered from the health checker.
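    ///
    /// # Example
    ///
    /// A sketch; `"camera"` is a hypothetical node name.
    ///
    /// ```ignore
    /// supervisor.stop_node("camera").await?;
    /// ```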
    #[allow(dead_code)] // Will be used in Phase 3
    pub async fn stop_node(&self, name: &str) -> Result<()> {
        tracing::info!("Stopping node: {}", name);

        let mut nodes = self.nodes.write().await;

        if let Some(handle) = nodes.remove(name) {
            // Abort the node task
            handle.abort();

            // Unregister from health checker
            self.health_checker.unregister(name).await;

            tracing::info!("Node {} stopped", name);
            Ok(())
        } else {
            // Node not found, but unregister anyway in case it was tracked elsewhere
            self.health_checker.unregister(name).await;
            tracing::warn!(
                "Node {} not found in tracked nodes, unregistered from health checker",
                name
            );
            Ok(())
        }
    }

    /// Get list of currently running nodes
    ///
    /// Returns the names of all nodes currently tracked in the supervisor's
    /// `nodes` map
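    ///
    /// # Example
    ///
    /// A sketch of listing tracked nodes.
    ///
    /// ```ignore
    /// for name in supervisor.get_running_nodes().await {
    ///     tracing::info!("running node: {}", name);
    /// }
    /// ```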
    #[allow(dead_code)] // Will be used in Phase 3
    pub async fn get_running_nodes(&self) -> Vec<String> {
        let nodes = self.nodes.read().await;
        nodes.keys().cloned().collect()
    }
}

/// Implement the SupervisorTrait for Supervisor
#[async_trait::async_trait]
impl crate::lifecycle::SupervisorTrait for Supervisor {
    async fn start_node(&self, name: &str) -> Result<()> {
        self.start_node(name).await
    }

    async fn stop_node(&self, name: &str) -> Result<()> {
        self.stop_node(name).await
    }

    fn get_running_nodes(&self) -> Vec<String> {
        // Note: Supervisor::get_running_nodes is async, but the trait requires
        // a synchronous method. Until the lifecycle integration provides a
        // sync-friendly view of the node map, return an empty vec as a placeholder.
        vec![]
    }
}