use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};

use tokio::sync::{broadcast, RwLock};
use tokio::time::MissedTickBehavior;
use tokio_util::sync::CancellationToken;

use crate::error::{Error, Result};
use crate::node::binary::extract_version;
use crate::node::events::NodeEvent;
use crate::node::process::spawn::spawn_node;
use crate::node::registry::NodeRegistry;
use crate::node::types::{
    NodeConfig, NodeStarted, NodeStatus, NodeStopFailed, NodeStopped, StopNodeResult,
};

/// How often the upgrade monitor re-reads each node binary's version from disk.
pub const UPGRADE_POLL_INTERVAL: Duration = Duration::from_secs(60);

/// How often the liveness monitor checks that supervised PIDs are still alive.
pub const LIVENESS_POLL_INTERVAL: Duration = Duration::from_secs(5);

fn node_pid_file(data_dir: &Path) -> PathBuf {
    data_dir.join("node.pid")
}

fn write_node_pid(data_dir: &Path, pid: u32) {
    let path = node_pid_file(data_dir);
    if let Err(e) = std::fs::write(&path, pid.to_string()) {
        tracing::warn!(
            "Failed to write node pid file at {}: {e}. Node will still run, but a future \
             daemon restart will not be able to adopt it.",
            path.display()
        );
    }
}

fn remove_node_pid(data_dir: &Path) {
    let _ = std::fs::remove_file(node_pid_file(data_dir));
}

fn read_node_pid(data_dir: &Path) -> Option<u32> {
    std::fs::read_to_string(node_pid_file(data_dir))
        .ok()
        .and_then(|s| s.trim().parse().ok())
}

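// Fallback adoption path: scan the process table for a process whose executable
// matches this node's binary and whose command line points `--root-dir` (in
// either `--root-dir=<dir>` or `--root-dir <dir>` form) at this node's data
// directory. Thread entries are skipped, since sysinfo lists them alongside
// real processes.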
fn find_running_node_process(sys: &sysinfo::System, config: &NodeConfig) -> Option<u32> {
    let target_data_dir = config.data_dir.as_path();
    for (pid, process) in sys.processes() {
        if process.thread_kind().is_some() {
            continue;
        }
        let Some(exe) = process.exe() else {
            continue;
        };
        if exe != config.binary_path.as_path() {
            continue;
        }

        let cmd = process.cmd();
        let matches_root_dir = cmd.iter().enumerate().any(|(i, arg)| {
            let arg = arg.to_string_lossy();
            if let Some(value) = arg.strip_prefix("--root-dir=") {
                Path::new(value) == target_data_dir
            } else if arg == "--root-dir" {
                cmd.get(i + 1)
                    .map(|v| Path::new(&*v.to_string_lossy()) == target_data_dir)
                    .unwrap_or(false)
            } else {
                false
            }
        });

        if matches_root_dir {
            return Some(pid.as_u32());
        }
    }
    None
}

fn pid_is_live_process(pid: u32, sys: &sysinfo::System) -> bool {
    if !is_process_alive(pid) {
        return false;
    }
    match sys.process(sysinfo::Pid::from_u32(pid)) {
        // A thread entry is not a process we can adopt.
        Some(process) => process.thread_kind().is_none(),
        // Alive per the signal probe but missing from the (possibly stale)
        // snapshot: assume it is a real process.
        None => true,
    }
}

fn resolve_adopted_pid(config: &NodeConfig, sys: &sysinfo::System) -> Option<u32> {
    if let Some(pid) = read_node_pid(&config.data_dir) {
        if pid_is_live_process(pid, sys) {
            return Some(pid);
        }
        // Stale pid file; remove it and fall back to scanning.
        remove_node_pid(&config.data_dir);
    }

    let pid = find_running_node_process(sys, config)?;
    write_node_pid(&config.data_dir, pid);
    Some(pid)
}

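// `sysinfo` reports a process start time as seconds since the Unix epoch, while
// the supervisor tracks uptime with `Instant`. The helper below bridges the two
// by back-dating `Instant::now()` by the process age. Worked example (times are
// illustrative): wall clock at 1_700_000_100 s, process started at
// 1_700_000_040 s, so age = 60 s and `started_at` becomes `Instant::now() - 60 s`,
// making `node_uptime_secs` report ~60 immediately after adoption.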
fn process_started_at(sys: &sysinfo::System, pid: u32) -> Option<Instant> {
    let start_secs = sys.process(sysinfo::Pid::from_u32(pid))?.start_time();
    let now_secs = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .ok()?
        .as_secs();
    let age = now_secs.saturating_sub(start_secs);
    Instant::now().checked_sub(Duration::from_secs(age))
}

const MAX_CRASHES_BEFORE_ERRORED: u32 = 5;

const CRASH_WINDOW: Duration = Duration::from_secs(300);

const STABLE_DURATION: Duration = Duration::from_secs(300);

const MAX_BACKOFF: Duration = Duration::from_secs(60);

pub struct Supervisor {
    event_tx: broadcast::Sender<NodeEvent>,
    node_states: HashMap<u32, NodeRuntime>,
}

struct NodeRuntime {
    status: NodeStatus,
    pid: Option<u32>,
    started_at: Option<Instant>,
    restart_count: u32,
    first_crash_at: Option<Instant>,
    pending_version: Option<String>,
}

impl Supervisor {
    pub fn new(event_tx: broadcast::Sender<NodeEvent>) -> Self {
        Self {
            event_tx,
            node_states: HashMap::new(),
        }
    }

    pub async fn start_node(
        &mut self,
        config: &NodeConfig,
        supervisor_ref: Arc<RwLock<Supervisor>>,
        registry_ref: Arc<RwLock<NodeRegistry>>,
    ) -> Result<NodeStarted> {
        let node_id = config.id;

        if let Some(state) = self.node_states.get(&node_id) {
            if state.status == NodeStatus::Running {
                return Err(Error::NodeAlreadyRunning(node_id));
            }
        }

        let _ = self.event_tx.send(NodeEvent::NodeStarting { node_id });

        let mut child = spawn_node_from_config(config).await?;
        let pid = child
            .id()
            .ok_or_else(|| Error::ProcessSpawn("Failed to get PID from spawned process".into()))?;

        // Give the process a short grace period to catch configurations that
        // make it exit immediately.
        match tokio::time::timeout(Duration::from_secs(1), child.wait()).await {
            Ok(Ok(exit_status)) => {
                let spawn_log_dir = config.log_dir.as_deref().unwrap_or(&config.data_dir);
                let stderr_path = spawn_log_dir.join("stderr.log");
                let stderr_msg = std::fs::read_to_string(&stderr_path).unwrap_or_default();
                let detail = if stderr_msg.trim().is_empty() {
                    format!("exit code: {exit_status}")
                } else {
                    stderr_msg.trim().to_string()
                };
                self.node_states.insert(
                    node_id,
                    NodeRuntime {
                        status: NodeStatus::Errored,
                        pid: None,
                        started_at: None,
                        restart_count: 0,
                        first_crash_at: None,
                        pending_version: None,
                    },
                );
                return Err(Error::ProcessSpawn(format!(
                    "Node {node_id} exited immediately: {detail}"
                )));
            }
            Ok(Err(e)) => {
                return Err(Error::ProcessSpawn(format!(
                    "Failed to check node process status: {e}"
                )));
            }
            // Timeout: still running after the grace period, which is the
            // healthy case.
            Err(_) => {}
        }

        self.node_states.insert(
            node_id,
            NodeRuntime {
                status: NodeStatus::Running,
                pid: Some(pid),
                started_at: Some(Instant::now()),
                restart_count: 0,
                first_crash_at: None,
                pending_version: None,
            },
        );

        let _ = self.event_tx.send(NodeEvent::NodeStarted { node_id, pid });

        let result = NodeStarted {
            node_id,
            service_name: config.service_name.clone(),
            pid,
        };

        let event_tx = self.event_tx.clone();
        let config = config.clone();
        tokio::spawn(async move {
            monitor_node(child, config, supervisor_ref, registry_ref, event_tx).await;
        });

        Ok(result)
    }

    pub async fn stop_node(&mut self, node_id: u32) -> Result<()> {
        let state = self
            .node_states
            .get_mut(&node_id)
            .ok_or(Error::NodeNotFound(node_id))?;

        if state.status != NodeStatus::Running {
            return Err(Error::NodeNotRunning(node_id));
        }

        let pid = state.pid;

        let _ = self.event_tx.send(NodeEvent::NodeStopping { node_id });
        state.status = NodeStatus::Stopping;

        if let Some(pid) = pid {
            graceful_kill(pid).await;
        }

        // Re-borrow after the await; the entry was present above and `&mut self`
        // prevents it from being removed in the meantime.
        let state = self.node_states.get_mut(&node_id).unwrap();
        state.status = NodeStatus::Stopped;
        state.pid = None;
        state.started_at = None;

        let _ = self.event_tx.send(NodeEvent::NodeStopped { node_id });

        Ok(())
    }

    pub async fn stop_all_nodes(&mut self, configs: &[(u32, String)]) -> StopNodeResult {
        let mut stopped = Vec::new();
        let mut failed = Vec::new();
        let mut already_stopped = Vec::new();

        for (node_id, service_name) in configs {
            let node_id = *node_id;
            match self.node_status(node_id) {
                Ok(NodeStatus::Running) => {}
                Ok(_) | Err(_) => {
                    already_stopped.push(node_id);
                    continue;
                }
            }

            match self.stop_node(node_id).await {
                Ok(()) => {
                    stopped.push(NodeStopped {
                        node_id,
                        service_name: service_name.clone(),
                    });
                }
                Err(Error::NodeNotRunning(_)) => {
                    already_stopped.push(node_id);
                }
                Err(e) => {
                    failed.push(NodeStopFailed {
                        node_id,
                        service_name: service_name.clone(),
                        error: e.to_string(),
                    });
                }
            }
        }

        StopNodeResult {
            stopped,
            failed,
            already_stopped,
        }
    }

    pub fn node_status(&self, node_id: u32) -> Result<NodeStatus> {
        self.node_states
            .get(&node_id)
            .map(|s| s.status)
            .ok_or(Error::NodeNotFound(node_id))
    }

    pub fn node_pid(&self, node_id: u32) -> Option<u32> {
        self.node_states.get(&node_id).and_then(|s| s.pid)
    }

    pub fn node_uptime_secs(&self, node_id: u32) -> Option<u64> {
        self.node_states
            .get(&node_id)
            .and_then(|s| s.started_at.map(|t| t.elapsed().as_secs()))
    }

    pub fn node_pending_version(&self, node_id: u32) -> Option<String> {
        self.node_states
            .get(&node_id)
            .and_then(|s| s.pending_version.clone())
    }

    fn mark_upgrade_scheduled(&mut self, node_id: u32, pending_version: String) -> bool {
        let Some(state) = self.node_states.get_mut(&node_id) else {
            return false;
        };
        if state.status != NodeStatus::Running {
            return false;
        }
        state.status = NodeStatus::UpgradeScheduled;
        state.pending_version = Some(pending_version.clone());
        let _ = self.event_tx.send(NodeEvent::UpgradeScheduled {
            node_id,
            pending_version,
        });
        true
    }

    pub fn is_running(&self, node_id: u32) -> bool {
        self.node_states
            .get(&node_id)
            .is_some_and(|s| s.status == NodeStatus::Running)
    }

    pub fn node_counts(&self) -> (u32, u32, u32) {
        let mut running = 0u32;
        let mut stopped = 0u32;
        let mut errored = 0u32;
        for state in self.node_states.values() {
            match state.status {
                NodeStatus::Running | NodeStatus::Starting | NodeStatus::UpgradeScheduled => {
                    running += 1
                }
                NodeStatus::Stopped | NodeStatus::Stopping => stopped += 1,
                NodeStatus::Errored => errored += 1,
            }
        }
        (running, stopped, errored)
    }

    fn update_state(&mut self, node_id: u32, status: NodeStatus, pid: Option<u32>) {
        if let Some(state) = self.node_states.get_mut(&node_id) {
            state.status = status;
            state.pid = pid;
            if status == NodeStatus::Running {
                state.started_at = Some(Instant::now());
            } else {
                state.started_at = None;
            }
        }
    }

    /// Adopt node processes that are already running (typically after a daemon
    /// restart), using pid files first and a process-table scan as a fallback.
    /// Returns the IDs of the nodes that were adopted.
    pub fn adopt_from_registry(&mut self, registry: &NodeRegistry) -> Vec<u32> {
        let mut sys = sysinfo::System::new();
        sys.refresh_processes_specifics(
            sysinfo::ProcessesToUpdate::All,
            true,
            sysinfo::ProcessRefreshKind::everything(),
        );

        let mut adopted = Vec::new();
        for config in registry.list() {
            let Some(pid) = resolve_adopted_pid(config, &sys) else {
                continue;
            };
            self.node_states.insert(
                config.id,
                NodeRuntime {
                    status: NodeStatus::Running,
                    pid: Some(pid),
                    // Back-date started_at so uptime survives the daemon restart.
                    started_at: Some(process_started_at(&sys, pid).unwrap_or_else(Instant::now)),
                    restart_count: 0,
                    first_crash_at: None,
                    pending_version: None,
                },
            );
            let _ = self.event_tx.send(NodeEvent::NodeStarted {
                node_id: config.id,
                pid,
            });
            adopted.push(config.id);
        }
        adopted
    }

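    // Crash accounting, as implemented by `record_crash` below: a node that
    // stays up for STABLE_DURATION has its counters reset; otherwise each crash
    // doubles the restart backoff (1, 2, 4, 8, 16, 32 s, capped at MAX_BACKOFF),
    // and MAX_CRASHES_BEFORE_ERRORED crashes inside CRASH_WINDOW mark the node
    // Errored instead of restarting it.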
    fn record_crash(&mut self, node_id: u32) -> (bool, u32, Duration) {
        let state = match self.node_states.get_mut(&node_id) {
            Some(s) => s,
            None => return (false, 0, Duration::ZERO),
        };

        let now = Instant::now();

        // A run that stayed up long enough counts as stable: start over.
        if let Some(started_at) = state.started_at {
            if started_at.elapsed() >= STABLE_DURATION {
                state.restart_count = 0;
                state.first_crash_at = None;
            }
        }

        state.restart_count += 1;
        let attempt = state.restart_count;

        if state.first_crash_at.is_none() {
            state.first_crash_at = Some(now);
        }

        // Crash-looping: too many crashes in too short a window means give up.
        if let Some(first_crash) = state.first_crash_at {
            if attempt >= MAX_CRASHES_BEFORE_ERRORED
                && now.duration_since(first_crash) < CRASH_WINDOW
            {
                state.status = NodeStatus::Errored;
                state.pid = None;
                state.started_at = None;
                return (false, attempt, Duration::ZERO);
            }
        }

        // Exponential backoff; the shift is clamped so it cannot overflow.
        let backoff_secs = 1u64 << (attempt - 1).min(5);
        let backoff = Duration::from_secs(backoff_secs).min(MAX_BACKOFF);

        (true, attempt, backoff)
    }
}

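// Usage sketch for the monitor below (illustrative: the real wiring lives in
// the daemon entry point, and the variable names here are assumed):
//
//     let shutdown = CancellationToken::new();
//     spawn_upgrade_monitor(
//         registry.clone(),
//         supervisor.clone(),
//         UPGRADE_POLL_INTERVAL,
//         shutdown.clone(),
//     );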
pub fn spawn_upgrade_monitor(
    registry: Arc<RwLock<NodeRegistry>>,
    supervisor: Arc<RwLock<Supervisor>>,
    interval: Duration,
    shutdown: CancellationToken,
) {
    tokio::spawn(async move {
        let mut ticker = tokio::time::interval(interval);
        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
        // The first tick completes immediately; consume it so the first real
        // poll happens one full interval from now.
        ticker.tick().await;

        loop {
            tokio::select! {
                _ = shutdown.cancelled() => return,
                _ = ticker.tick() => {},
            }

            // Snapshot the candidates under read locks, then drop the locks
            // before doing any I/O.
            let candidates: Vec<(u32, PathBuf, String, Option<String>)> = {
                let reg = registry.read().await;
                let sup = supervisor.read().await;
                reg.list()
                    .into_iter()
                    .filter_map(|config| match sup.node_status(config.id) {
                        Ok(NodeStatus::Running) => Some((
                            config.id,
                            config.binary_path.clone(),
                            config.version.clone(),
                            sup.node_pending_version(config.id),
                        )),
                        _ => None,
                    })
                    .collect()
            };

            for (node_id, binary_path, recorded_version, current_pending) in candidates {
                let observed = match extract_version(&binary_path).await {
                    Ok(v) => v,
                    // Transient read failure; try again on the next tick.
                    Err(_) => continue,
                };
                if observed == recorded_version {
                    continue;
                }
                if current_pending.as_deref() == Some(observed.as_str()) {
                    continue;
                }
                supervisor
                    .write()
                    .await
                    .mark_upgrade_scheduled(node_id, observed);
            }
        }
    });
}

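// For example, a minimal config (no log dir, ports, or peers) with data dir
// `/data/node-1` produces:
//
//     --rewards-address <addr> --root-dir /data/node-1 --stop-on-upgrade
//
// where `<addr>` stands in for the configured rewards address.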
pub fn build_node_args(config: &NodeConfig) -> Vec<String> {
    let mut args = vec![
        "--rewards-address".to_string(),
        config.rewards_address.clone(),
        "--root-dir".to_string(),
        config.data_dir.display().to_string(),
    ];

    if let Some(ref log_dir) = config.log_dir {
        args.push("--enable-logging".to_string());
        args.push("--log-dir".to_string());
        args.push(log_dir.display().to_string());
    }

    if let Some(port) = config.node_port {
        args.push("--port".to_string());
        args.push(port.to_string());
    }

    if let Some(port) = config.metrics_port {
        args.push("--metrics-port".to_string());
        args.push(port.to_string());
    }

    for peer in &config.bootstrap_peers {
        args.push("--bootstrap".to_string());
        args.push(peer.clone());
    }

    args.push("--stop-on-upgrade".to_string());

    args
}

async fn spawn_node_from_config(config: &NodeConfig) -> Result<tokio::process::Child> {
    let args = build_node_args(config);
    let env_vars: Vec<(String, String)> = config.env_variables.clone().into_iter().collect();

    let log_dir = config
        .log_dir
        .as_deref()
        .unwrap_or(config.data_dir.as_path());

    let child = spawn_node(&config.binary_path, &args, &env_vars, log_dir).await?;
    if let Some(pid) = child.id() {
        write_node_pid(&config.data_dir, pid);
    }
    Ok(child)
}

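// Lifecycle of a supervised node: `monitor_node` wraps `monitor_node_inner`,
// which loops waiting for the child to exit and then classifies the exit as a
// deliberate stop (return), a scheduled upgrade (respawn with the new binary),
// or a crash (restart with backoff until the crash-loop limit trips). The pid
// file is removed once monitoring ends for good.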
async fn monitor_node(
    child: tokio::process::Child,
    mut config: NodeConfig,
    supervisor: Arc<RwLock<Supervisor>>,
    registry: Arc<RwLock<NodeRegistry>>,
    event_tx: broadcast::Sender<NodeEvent>,
) {
    monitor_node_inner(child, &mut config, supervisor, registry, event_tx).await;
    remove_node_pid(&config.data_dir);
}

async fn monitor_node_inner(
    mut child: tokio::process::Child,
    config: &mut NodeConfig,
    supervisor: Arc<RwLock<Supervisor>>,
    registry: Arc<RwLock<NodeRegistry>>,
    event_tx: broadcast::Sender<NodeEvent>,
) {
    let node_id = config.id;

    loop {
        let exit_status = child.wait().await;

        let status_at_exit = {
            let sup = supervisor.read().await;
            sup.node_status(node_id).ok()
        };

        match status_at_exit {
            // Deliberate stop; nothing to restart.
            Some(NodeStatus::Stopped) | Some(NodeStatus::Stopping) => return,
            Some(NodeStatus::UpgradeScheduled) => {
                match respawn_upgraded_node(config, &supervisor, &registry, &event_tx).await {
                    Ok(new_child) => {
                        child = new_child;
                        continue;
                    }
                    Err(e) => {
                        let _ = event_tx.send(NodeEvent::NodeErrored {
                            node_id,
                            message: format!("Failed to respawn after upgrade: {e}"),
                        });
                        let mut sup = supervisor.write().await;
                        sup.update_state(node_id, NodeStatus::Errored, None);
                        return;
                    }
                }
            }
            _ => {}
        }

        let exit_code = exit_status.ok().and_then(|s| s.code());

        // A clean exit with `--stop-on-upgrade` usually means the binary was
        // replaced on disk. If the on-disk version differs from the recorded
        // one, treat this as an upgrade and respawn rather than count a crash.
        if exit_code == Some(0) {
            if let Ok(disk_version) = extract_version(&config.binary_path).await {
                if disk_version != config.version {
                    {
                        let mut sup = supervisor.write().await;
                        sup.mark_upgrade_scheduled(node_id, disk_version.clone());
                    }
                    match respawn_upgraded_node(config, &supervisor, &registry, &event_tx).await {
                        Ok(new_child) => {
                            child = new_child;
                            continue;
                        }
                        Err(e) => {
                            let _ = event_tx.send(NodeEvent::NodeErrored {
                                node_id,
                                message: format!("Failed to respawn after upgrade: {e}"),
                            });
                            let mut sup = supervisor.write().await;
                            sup.update_state(node_id, NodeStatus::Errored, None);
                            return;
                        }
                    }
                }
            }
        }

        let _ = event_tx.send(NodeEvent::NodeCrashed { node_id, exit_code });

        let (should_restart, attempt, backoff) = {
            let mut sup = supervisor.write().await;
            sup.record_crash(node_id)
        };

        if !should_restart {
            let _ = event_tx.send(NodeEvent::NodeErrored {
                node_id,
                message: format!(
                    "Node crashed {} times within {} seconds, giving up",
                    MAX_CRASHES_BEFORE_ERRORED,
                    CRASH_WINDOW.as_secs()
                ),
            });
            return;
        }

        let _ = event_tx.send(NodeEvent::NodeRestarting { node_id, attempt });

        tokio::time::sleep(backoff).await;

        match spawn_node_from_config(&*config).await {
            Ok(new_child) => {
                let pid = match new_child.id() {
                    Some(pid) => pid,
                    None => {
                        let _ = event_tx.send(NodeEvent::NodeErrored {
                            node_id,
                            message: "Restarted process exited before PID could be read"
                                .to_string(),
                        });
                        let mut sup = supervisor.write().await;
                        sup.update_state(node_id, NodeStatus::Errored, None);
                        return;
                    }
                };
                {
                    let mut sup = supervisor.write().await;
                    sup.update_state(node_id, NodeStatus::Running, Some(pid));
                }
                let _ = event_tx.send(NodeEvent::NodeStarted { node_id, pid });
                child = new_child;
            }
            Err(e) => {
                let _ = event_tx.send(NodeEvent::NodeErrored {
                    node_id,
                    message: format!("Failed to restart node: {e}"),
                });
                let mut sup = supervisor.write().await;
                sup.update_state(node_id, NodeStatus::Errored, None);
                return;
            }
        }
    }
}

async fn respawn_upgraded_node(
    config: &mut NodeConfig,
    supervisor: &Arc<RwLock<Supervisor>>,
    registry: &Arc<RwLock<NodeRegistry>>,
    event_tx: &broadcast::Sender<NodeEvent>,
) -> Result<tokio::process::Child> {
    let node_id = config.id;
    let old_version = config.version.clone();

    let new_child = spawn_node_from_config(config).await?;
    let pid = new_child
        .id()
        .ok_or_else(|| Error::ProcessSpawn("Failed to get PID after upgrade respawn".into()))?;

    // Best effort: if the new version cannot be read, keep the old recorded one.
    let new_version = extract_version(&config.binary_path).await.ok();

    if let Some(ref version) = new_version {
        config.version = version.clone();
        let mut reg = registry.write().await;
        if let Ok(stored) = reg.get_mut(node_id) {
            stored.version = version.clone();
            let _ = reg.save();
        }
    }

    {
        let mut sup = supervisor.write().await;
        if let Some(state) = sup.node_states.get_mut(&node_id) {
            state.status = NodeStatus::Running;
            state.pid = Some(pid);
            state.started_at = Some(Instant::now());
            state.pending_version = None;
            // A fresh binary gets a clean crash history.
            state.restart_count = 0;
            state.first_crash_at = None;
        }
    }

    let _ = event_tx.send(NodeEvent::NodeStarted { node_id, pid });
    if let Some(version) = new_version {
        let _ = event_tx.send(NodeEvent::NodeUpgraded {
            node_id,
            old_version,
            new_version: version,
        });
    }

    Ok(new_child)
}

const GRACEFUL_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10);

async fn graceful_kill(pid: u32) {
    // Ask the process to shut down cleanly first.
    send_signal_term(pid);

    let start = Instant::now();
    loop {
        if !is_process_alive(pid) {
            return;
        }
        if start.elapsed() >= GRACEFUL_SHUTDOWN_TIMEOUT {
            break;
        }
        tokio::time::sleep(Duration::from_millis(100)).await;
    }

    // Still alive after the grace period: escalate to a forced kill.
    send_signal_kill(pid);

    // Give the OS a moment to reap it before giving up.
    for _ in 0..10 {
        if !is_process_alive(pid) {
            return;
        }
        tokio::time::sleep(Duration::from_millis(50)).await;
    }
}

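// The liveness monitor below catches nodes that die without the supervisor
// noticing (for example, adopted processes killed externally, which have no
// `monitor_node` task waiting on them): any node recorded as Running whose PID
// is no longer alive is marked Stopped and has its pid file removed.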
pub fn spawn_liveness_monitor(
    registry: Arc<RwLock<NodeRegistry>>,
    supervisor: Arc<RwLock<Supervisor>>,
    event_tx: broadcast::Sender<NodeEvent>,
    interval: Duration,
    shutdown: CancellationToken,
) {
    tokio::spawn(async move {
        let mut ticker = tokio::time::interval(interval);
        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
        loop {
            tokio::select! {
                _ = shutdown.cancelled() => return,
                _ = ticker.tick() => {}
            }

            let candidates: Vec<(u32, u32, PathBuf)> = {
                let sup = supervisor.read().await;
                let reg = registry.read().await;
                reg.list()
                    .into_iter()
                    .filter_map(|config| {
                        let pid = sup.node_pid(config.id)?;
                        matches!(sup.node_status(config.id), Ok(NodeStatus::Running))
                            .then_some((config.id, pid, config.data_dir.clone()))
                    })
                    .collect()
            };

            for (node_id, pid, data_dir) in candidates {
                if is_process_alive(pid) {
                    continue;
                }
                let mut sup = supervisor.write().await;
                // Re-check under the write lock: the node may have been stopped
                // or restarted since the snapshot above.
                if !matches!(sup.node_status(node_id), Ok(NodeStatus::Running)) {
                    continue;
                }
                sup.update_state(node_id, NodeStatus::Stopped, None);
                let _ = event_tx.send(NodeEvent::NodeStopped { node_id });
                remove_node_pid(&data_dir);
            }
        }
    });
}

#[cfg(unix)]
fn pid_to_i32(pid: u32) -> Option<i32> {
    i32::try_from(pid).ok().filter(|&p| p > 0)
}

#[cfg(unix)]
fn send_signal_term(pid: u32) {
    if let Some(pid) = pid_to_i32(pid) {
        unsafe {
            libc::kill(pid, libc::SIGTERM);
        }
    }
}

#[cfg(unix)]
fn send_signal_kill(pid: u32) {
    if let Some(pid) = pid_to_i32(pid) {
        unsafe {
            libc::kill(pid, libc::SIGKILL);
        }
    }
}

#[cfg(unix)]
fn is_process_alive(pid: u32) -> bool {
    let Some(pid) = pid_to_i32(pid) else {
        return false;
    };
    // Signal 0 performs error checking only; no signal is delivered.
    let ret = unsafe { libc::kill(pid, 0) };
    if ret == 0 {
        return true;
    }
    // EPERM means the process exists but belongs to another user.
    std::io::Error::last_os_error().raw_os_error() == Some(libc::EPERM)
}

#[cfg(windows)]
fn send_signal_term(pid: u32) {
    use windows_sys::Win32::System::Console::{
        AttachConsole, FreeConsole, GenerateConsoleCtrlEvent, SetConsoleCtrlHandler, CTRL_C_EVENT,
    };

    unsafe {
        // Detach from our own console so we can attach to the target's.
        FreeConsole();

        if AttachConsole(pid) != 0 {
            // Ignore Ctrl+C in this process while we raise it on the shared console.
            SetConsoleCtrlHandler(None, 1);
            GenerateConsoleCtrlEvent(CTRL_C_EVENT, 0);
            FreeConsole();
            // Give the event a moment to be delivered before restoring the handler.
            std::thread::sleep(std::time::Duration::from_millis(50));
            SetConsoleCtrlHandler(None, 0);
        }
    }
}

#[cfg(windows)]
fn send_signal_kill(pid: u32) {
    use windows_sys::Win32::Foundation::CloseHandle;
    use windows_sys::Win32::System::Threading::{OpenProcess, TerminateProcess, PROCESS_TERMINATE};

    unsafe {
        let handle = OpenProcess(PROCESS_TERMINATE, 0, pid);
        if !handle.is_null() {
            TerminateProcess(handle, 1);
            CloseHandle(handle);
        }
    }
}

#[cfg(windows)]
fn is_process_alive(pid: u32) -> bool {
    use windows_sys::Win32::Foundation::{CloseHandle, STILL_ACTIVE};
    use windows_sys::Win32::System::Threading::{
        GetExitCodeProcess, OpenProcess, PROCESS_QUERY_LIMITED_INFORMATION,
    };

    unsafe {
        let handle = OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, 0, pid);
        if handle.is_null() {
            return false;
        }
        let mut exit_code: u32 = 0;
        let success = GetExitCodeProcess(handle, &mut exit_code);
        CloseHandle(handle);
        success != 0 && exit_code == STILL_ACTIVE as u32
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn build_node_args_basic() {
        let config = NodeConfig {
            id: 1,
            service_name: "node1".to_string(),
            rewards_address: "0xabc123".to_string(),
            data_dir: "/data/node-1".into(),
            log_dir: Some("/logs/node-1".into()),
            node_port: Some(12000),
            metrics_port: Some(13000),
            network_id: Some(1),
            binary_path: "/bin/node".into(),
            version: "0.1.0".to_string(),
            env_variables: HashMap::new(),
            bootstrap_peers: vec!["peer1".to_string(), "peer2".to_string()],
        };

        let args = build_node_args(&config);

        assert!(args.contains(&"--rewards-address".to_string()));
        assert!(args.contains(&"0xabc123".to_string()));
        assert!(args.contains(&"--root-dir".to_string()));
        assert!(args.contains(&"/data/node-1".to_string()));
        assert!(args.contains(&"--enable-logging".to_string()));
        assert!(args.contains(&"--log-dir".to_string()));
        assert!(args.contains(&"/logs/node-1".to_string()));
        assert!(args.contains(&"--port".to_string()));
        assert!(args.contains(&"12000".to_string()));
        assert!(args.contains(&"--metrics-port".to_string()));
        assert!(args.contains(&"13000".to_string()));
        assert!(args.contains(&"--bootstrap".to_string()));
        assert!(args.contains(&"peer1".to_string()));
        assert!(args.contains(&"peer2".to_string()));
        assert!(args.contains(&"--stop-on-upgrade".to_string()));
    }

    #[test]
    fn build_node_args_minimal() {
        let config = NodeConfig {
            id: 1,
            service_name: "node1".to_string(),
            rewards_address: "0xabc".to_string(),
            data_dir: "/data/node-1".into(),
            log_dir: None,
            node_port: None,
            metrics_port: None,
            network_id: None,
            binary_path: "/bin/node".into(),
            version: "0.1.0".to_string(),
            env_variables: HashMap::new(),
            bootstrap_peers: vec![],
        };

        let args = build_node_args(&config);

        assert!(args.contains(&"--rewards-address".to_string()));
        assert!(args.contains(&"--root-dir".to_string()));
        assert!(!args.contains(&"--enable-logging".to_string()));
        assert!(!args.contains(&"--log-dir".to_string()));
        assert!(!args.contains(&"--port".to_string()));
        assert!(!args.contains(&"--metrics-port".to_string()));
        assert!(!args.contains(&"--bootstrap".to_string()));
        assert!(args.contains(&"--stop-on-upgrade".to_string()));
    }

    #[test]
    fn record_crash_backoff_increases() {
        let (tx, _rx) = broadcast::channel(16);
        let mut sup = Supervisor::new(tx);

        sup.node_states.insert(
            1,
            NodeRuntime {
                status: NodeStatus::Running,
                pid: Some(100),
                started_at: Some(Instant::now()),
                restart_count: 0,
                first_crash_at: None,
                pending_version: None,
            },
        );

        let (should_restart, attempt, backoff) = sup.record_crash(1);
        assert!(should_restart);
        assert_eq!(attempt, 1);
        assert_eq!(backoff, Duration::from_secs(1));

        let (should_restart, attempt, backoff) = sup.record_crash(1);
        assert!(should_restart);
        assert_eq!(attempt, 2);
        assert_eq!(backoff, Duration::from_secs(2));

        let (should_restart, attempt, backoff) = sup.record_crash(1);
        assert!(should_restart);
        assert_eq!(attempt, 3);
        assert_eq!(backoff, Duration::from_secs(4));

        let (should_restart, attempt, backoff) = sup.record_crash(1);
        assert!(should_restart);
        assert_eq!(attempt, 4);
        assert_eq!(backoff, Duration::from_secs(8));

        // The fifth crash inside the window trips the crash-loop limit.
        let (should_restart, attempt, _) = sup.record_crash(1);
        assert!(!should_restart);
        assert_eq!(attempt, 5);
        assert_eq!(sup.node_states[&1].status, NodeStatus::Errored);
    }

    #[test]
    fn node_counts_tracks_states() {
        let (tx, _rx) = broadcast::channel(16);
        let mut sup = Supervisor::new(tx);

        sup.node_states.insert(
            1,
            NodeRuntime {
                status: NodeStatus::Running,
                pid: Some(100),
                started_at: Some(Instant::now()),
                restart_count: 0,
                first_crash_at: None,
                pending_version: None,
            },
        );
        sup.node_states.insert(
            2,
            NodeRuntime {
                status: NodeStatus::Stopped,
                pid: None,
                started_at: None,
                restart_count: 0,
                first_crash_at: None,
                pending_version: None,
            },
        );
        sup.node_states.insert(
            3,
            NodeRuntime {
                status: NodeStatus::Errored,
                pid: None,
                started_at: None,
                restart_count: 5,
                first_crash_at: None,
                pending_version: None,
            },
        );

        let (running, stopped, errored) = sup.node_counts();
        assert_eq!(running, 1);
        assert_eq!(stopped, 1);
        assert_eq!(errored, 1);
    }

    #[test]
    fn mark_upgrade_scheduled_only_affects_running_nodes() {
        let (tx, mut rx) = broadcast::channel(16);
        let mut sup = Supervisor::new(tx);

        sup.node_states.insert(
            1,
            NodeRuntime {
                status: NodeStatus::Running,
                pid: Some(111),
                started_at: Some(Instant::now()),
                restart_count: 0,
                first_crash_at: None,
                pending_version: None,
            },
        );
        sup.node_states.insert(
            2,
            NodeRuntime {
                status: NodeStatus::Stopped,
                pid: None,
                started_at: None,
                restart_count: 0,
                first_crash_at: None,
                pending_version: None,
            },
        );

        let affected = sup.mark_upgrade_scheduled(1, "0.10.11-rc.1".to_string());
        assert!(affected);
        assert_eq!(sup.node_status(1).unwrap(), NodeStatus::UpgradeScheduled);
        assert_eq!(sup.node_pending_version(1).as_deref(), Some("0.10.11-rc.1"));
        match rx.try_recv() {
            Ok(NodeEvent::UpgradeScheduled {
                node_id,
                pending_version,
            }) => {
                assert_eq!(node_id, 1);
                assert_eq!(pending_version, "0.10.11-rc.1");
            }
            other => panic!("expected UpgradeScheduled event, got {other:?}"),
        }

        // Stopped nodes are not eligible for a scheduled upgrade.
        let affected = sup.mark_upgrade_scheduled(2, "0.10.11-rc.1".to_string());
        assert!(!affected);
        assert_eq!(sup.node_status(2).unwrap(), NodeStatus::Stopped);
        assert!(sup.node_pending_version(2).is_none());

        // Scheduling again while an upgrade is already pending is a no-op.
        let affected = sup.mark_upgrade_scheduled(1, "0.10.12".to_string());
        assert!(!affected);
        assert_eq!(sup.node_pending_version(1).as_deref(), Some("0.10.11-rc.1"));
    }

    #[test]
    fn node_counts_counts_upgrade_scheduled_as_running() {
        let (tx, _rx) = broadcast::channel(16);
        let mut sup = Supervisor::new(tx);

        sup.node_states.insert(
            1,
            NodeRuntime {
                status: NodeStatus::UpgradeScheduled,
                pid: Some(111),
                started_at: Some(Instant::now()),
                restart_count: 0,
                first_crash_at: None,
                pending_version: Some("0.10.11-rc.1".to_string()),
            },
        );

        let (running, stopped, errored) = sup.node_counts();
        assert_eq!(running, 1);
        assert_eq!(stopped, 0);
        assert_eq!(errored, 0);
    }

    #[tokio::test]
    async fn stop_node_not_found() {
        let (tx, _rx) = broadcast::channel(16);
        let mut sup = Supervisor::new(tx);

        let result = sup.stop_node(999).await;
        assert!(matches!(result, Err(Error::NodeNotFound(999))));
    }

    #[tokio::test]
    async fn stop_node_not_running() {
        let (tx, _rx) = broadcast::channel(16);
        let mut sup = Supervisor::new(tx);

        sup.node_states.insert(
            1,
            NodeRuntime {
                status: NodeStatus::Stopped,
                pid: None,
                started_at: None,
                restart_count: 0,
                first_crash_at: None,
                pending_version: None,
            },
        );

        let result = sup.stop_node(1).await;
        assert!(matches!(result, Err(Error::NodeNotRunning(1))));
    }

    #[tokio::test]
    async fn stop_all_nodes_mixed_states() {
        let (tx, _rx) = broadcast::channel(16);
        let mut sup = Supervisor::new(tx);

        // A PID this high is very unlikely to be a live process, so
        // graceful_kill should find it already dead and return quickly.
        sup.node_states.insert(
            1,
            NodeRuntime {
                status: NodeStatus::Running,
                pid: Some(999999),
                started_at: Some(Instant::now()),
                restart_count: 0,
                first_crash_at: None,
                pending_version: None,
            },
        );
        sup.node_states.insert(
            2,
            NodeRuntime {
                status: NodeStatus::Stopped,
                pid: None,
                started_at: None,
                restart_count: 0,
                first_crash_at: None,
                pending_version: None,
            },
        );

        let configs = vec![(1, "node1".to_string()), (2, "node2".to_string())];

        let result = sup.stop_all_nodes(&configs).await;

        assert_eq!(result.stopped.len(), 1);
        assert_eq!(result.stopped[0].node_id, 1);
        assert_eq!(result.stopped[0].service_name, "node1");
        assert_eq!(result.already_stopped, vec![2]);
        assert!(result.failed.is_empty());
    }
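
    // Sketch: round-trip of the pid-file helpers above, using a throwaway
    // directory under the system temp dir (the directory name is arbitrary
    // and not part of the supervisor API).
    #[test]
    fn pid_file_round_trip() {
        let dir =
            std::env::temp_dir().join(format!("supervisor-pid-test-{}", std::process::id()));
        std::fs::create_dir_all(&dir).unwrap();

        write_node_pid(&dir, 1234);
        assert_eq!(read_node_pid(&dir), Some(1234));

        remove_node_pid(&dir);
        assert_eq!(read_node_pid(&dir), None);

        let _ = std::fs::remove_dir_all(&dir);
    }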
}