use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};

use tokio::sync::{broadcast, RwLock};
use tokio_util::sync::CancellationToken;

use crate::error::{Error, Result};
use crate::node::binary::extract_version;
use crate::node::events::NodeEvent;
use crate::node::process::spawn::spawn_node;
use crate::node::registry::NodeRegistry;
use crate::node::types::{
    NodeConfig, NodeStarted, NodeStatus, NodeStopFailed, NodeStopped, StopNodeResult,
};

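/// Suggested polling interval for the upgrade monitor; `spawn_upgrade_monitor`
/// takes the interval as a parameter.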
pub const UPGRADE_POLL_INTERVAL: Duration = Duration::from_secs(60);

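/// Suggested polling interval for the liveness monitor; `spawn_liveness_monitor`
/// takes the interval as a parameter.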
pub const LIVENESS_POLL_INTERVAL: Duration = Duration::from_secs(5);

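/// Location of the pid file that records the node process last spawned or
/// adopted for a given data directory.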
fn node_pid_file(data_dir: &Path) -> PathBuf {
    data_dir.join("node.pid")
}

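/// Best-effort write of the pid file; a failure is logged but non-fatal, it
/// only prevents adoption of the node after a daemon restart.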
fn write_node_pid(data_dir: &Path, pid: u32) {
    let path = node_pid_file(data_dir);
    if let Err(e) = std::fs::write(&path, pid.to_string()) {
        tracing::warn!(
            "Failed to write node pid file at {}: {e}. Node will still run, but a future \
             daemon restart will not be able to adopt it.",
            path.display()
        );
    }
}

fn remove_node_pid(data_dir: &Path) {
    let _ = std::fs::remove_file(node_pid_file(data_dir));
}

fn read_node_pid(data_dir: &Path) -> Option<u32> {
    std::fs::read_to_string(node_pid_file(data_dir))
        .ok()
        .and_then(|s| s.trim().parse().ok())
}

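/// Scans the process table for a node process that was launched from this
/// config's binary and points at its data directory via `--root-dir`.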
fn find_running_node_process(sys: &sysinfo::System, config: &NodeConfig) -> Option<u32> {
    let target_data_dir = config.data_dir.as_path();
    for (pid, process) in sys.processes() {
        // Skip per-thread entries; only whole processes are candidates.
        if process.thread_kind().is_some() {
            continue;
        }
        let Some(exe) = process.exe() else {
            continue;
        };
        if exe != config.binary_path.as_path() {
            continue;
        }

        // Accept both `--root-dir=<path>` and the two-argument
        // `--root-dir <path>` form.
        let cmd = process.cmd();
        let matches_root_dir = cmd.iter().enumerate().any(|(i, arg)| {
            let arg = arg.to_string_lossy();
            if let Some(value) = arg.strip_prefix("--root-dir=") {
                Path::new(value) == target_data_dir
            } else if arg == "--root-dir" {
                cmd.get(i + 1)
                    .map(|v| Path::new(&*v.to_string_lossy()) == target_data_dir)
                    .unwrap_or(false)
            } else {
                false
            }
        });

        if matches_root_dir {
            return Some(pid.as_u32());
        }
    }
    None
}

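/// Returns true if `pid` refers to a live process (not a thread entry).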
fn pid_is_live_process(pid: u32, sys: &sysinfo::System) -> bool {
    if !is_process_alive(pid) {
        return false;
    }
    match sys.process(sysinfo::Pid::from_u32(pid)) {
        Some(process) => process.thread_kind().is_none(),
        // The signal probe says the pid is alive even though sysinfo has no
        // entry for it; trust the probe.
        None => true,
    }
}

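/// Resolves the pid of an already-running node: prefer the pid file, fall
/// back to scanning the process table, and rewrite the pid file on success.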
fn resolve_adopted_pid(config: &NodeConfig, sys: &sysinfo::System) -> Option<u32> {
    if let Some(pid) = read_node_pid(&config.data_dir) {
        if pid_is_live_process(pid, sys) {
            return Some(pid);
        }
        // The recorded pid is stale; drop it before falling back to a scan.
        remove_node_pid(&config.data_dir);
    }

    let pid = find_running_node_process(sys, config)?;
    write_node_pid(&config.data_dir, pid);
    Some(pid)
}

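/// Approximates the `Instant` at which `pid` started from its wall-clock
/// start time, so adopted nodes report a sensible uptime.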
fn process_started_at(sys: &sysinfo::System, pid: u32) -> Option<Instant> {
    let start_secs = sys.process(sysinfo::Pid::from_u32(pid))?.start_time();
    let now_secs = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .ok()?
        .as_secs();
    let age = now_secs.saturating_sub(start_secs);
    Instant::now().checked_sub(Duration::from_secs(age))
}

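/// A node that crashes this many times within `CRASH_WINDOW` is marked
/// `Errored` and no longer restarted.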
const MAX_CRASHES_BEFORE_ERRORED: u32 = 5;

/// Window within which repeated crashes count toward `MAX_CRASHES_BEFORE_ERRORED`.
const CRASH_WINDOW: Duration = Duration::from_secs(300);

/// A node that stays up this long has its crash counters reset.
const STABLE_DURATION: Duration = Duration::from_secs(300);

/// Upper bound on the exponential restart backoff.
const MAX_BACKOFF: Duration = Duration::from_secs(60);

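/// Tracks the runtime state of every managed node and emits `NodeEvent`s as
/// nodes start, stop, crash, restart, and upgrade.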
pub struct Supervisor {
    event_tx: broadcast::Sender<NodeEvent>,
    node_states: HashMap<u32, NodeRuntime>,
}

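/// Per-node runtime bookkeeping kept by the `Supervisor`.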
struct NodeRuntime {
    status: NodeStatus,
    pid: Option<u32>,
    started_at: Option<Instant>,
    restart_count: u32,
    first_crash_at: Option<Instant>,
    pending_version: Option<String>,
}

impl Supervisor {
    pub fn new(event_tx: broadcast::Sender<NodeEvent>) -> Self {
        Self {
            event_tx,
            node_states: HashMap::new(),
        }
    }

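    /// Spawns a node process, verifies it survives its first second, records
    /// it as `Running`, and hands the child off to a background monitor task.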
    pub async fn start_node(
        &mut self,
        config: &NodeConfig,
        supervisor_ref: Arc<RwLock<Supervisor>>,
        registry_ref: Arc<RwLock<NodeRegistry>>,
    ) -> Result<NodeStarted> {
        let node_id = config.id;

        if let Some(state) = self.node_states.get(&node_id) {
            if state.status == NodeStatus::Running {
                return Err(Error::NodeAlreadyRunning(node_id));
            }
        }

        let _ = self.event_tx.send(NodeEvent::NodeStarting { node_id });

        let mut child = spawn_node_from_config(config).await?;
        let pid = child
            .id()
            .ok_or_else(|| Error::ProcessSpawn("Failed to get PID from spawned process".into()))?;

        // Give the process one second to fail fast; a healthy spawn times out here.
        match tokio::time::timeout(Duration::from_secs(1), child.wait()).await {
            Ok(Ok(exit_status)) => {
                // The node exited immediately; surface its stderr if it left any.
                let spawn_log_dir = config.log_dir.as_deref().unwrap_or(&config.data_dir);
                let stderr_path = spawn_log_dir.join("stderr.log");
                let stderr_msg = std::fs::read_to_string(&stderr_path).unwrap_or_default();
                let detail = if stderr_msg.trim().is_empty() {
                    format!("exit code: {exit_status}")
                } else {
                    stderr_msg.trim().to_string()
                };
                self.node_states.insert(
                    node_id,
                    NodeRuntime {
                        status: NodeStatus::Errored,
                        pid: None,
                        started_at: None,
                        restart_count: 0,
                        first_crash_at: None,
                        pending_version: None,
                    },
                );
                return Err(Error::ProcessSpawn(format!(
                    "Node {node_id} exited immediately: {detail}"
                )));
            }
            Ok(Err(e)) => {
                return Err(Error::ProcessSpawn(format!(
                    "Failed to check node process status: {e}"
                )));
            }
            // Timeout: the process is still running, which is the happy path.
            Err(_) => {}
        }

        self.node_states.insert(
            node_id,
            NodeRuntime {
                status: NodeStatus::Running,
                pid: Some(pid),
                started_at: Some(Instant::now()),
                restart_count: 0,
                first_crash_at: None,
                pending_version: None,
            },
        );

        let _ = self.event_tx.send(NodeEvent::NodeStarted { node_id, pid });

        let result = NodeStarted {
            node_id,
            service_name: config.service_name.clone(),
            pid,
        };

        let event_tx = self.event_tx.clone();
        let config = config.clone();
        tokio::spawn(async move {
            monitor_node(child, config, supervisor_ref, registry_ref, event_tx).await;
        });

        Ok(result)
    }

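    /// Gracefully stops a running node: sends a termination signal, waits up
    /// to `GRACEFUL_SHUTDOWN_TIMEOUT`, then force-kills if needed.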
    pub async fn stop_node(&mut self, node_id: u32) -> Result<()> {
        let state = self
            .node_states
            .get_mut(&node_id)
            .ok_or(Error::NodeNotFound(node_id))?;

        if state.status != NodeStatus::Running {
            return Err(Error::NodeNotRunning(node_id));
        }

        let pid = state.pid;

        let _ = self.event_tx.send(NodeEvent::NodeStopping { node_id });
        state.status = NodeStatus::Stopping;

        if let Some(pid) = pid {
            graceful_kill(pid).await;
        }

        // Re-borrow after the await; the entry is known to exist.
        let state = self.node_states.get_mut(&node_id).unwrap();
        state.status = NodeStatus::Stopped;
        state.pid = None;
        state.started_at = None;

        let _ = self.event_tx.send(NodeEvent::NodeStopped { node_id });

        Ok(())
    }

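    /// Stops every node in `configs`, partitioning the outcome into stopped,
    /// failed, and already-stopped nodes.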
    pub async fn stop_all_nodes(&mut self, configs: &[(u32, String)]) -> StopNodeResult {
        let mut stopped = Vec::new();
        let mut failed = Vec::new();
        let mut already_stopped = Vec::new();

        for (node_id, service_name) in configs {
            let node_id = *node_id;
            match self.node_status(node_id) {
                Ok(NodeStatus::Running) => {}
                // Unknown nodes and nodes in any non-running state count as
                // already stopped.
                _ => {
                    already_stopped.push(node_id);
                    continue;
                }
            }

            match self.stop_node(node_id).await {
                Ok(()) => {
                    stopped.push(NodeStopped {
                        node_id,
                        service_name: service_name.clone(),
                    });
                }
                Err(Error::NodeNotRunning(_)) => {
                    already_stopped.push(node_id);
                }
                Err(e) => {
                    failed.push(NodeStopFailed {
                        node_id,
                        service_name: service_name.clone(),
                        error: e.to_string(),
                    });
                }
            }
        }

        StopNodeResult {
            stopped,
            failed,
            already_stopped,
        }
    }

    pub fn node_status(&self, node_id: u32) -> Result<NodeStatus> {
        self.node_states
            .get(&node_id)
            .map(|s| s.status)
            .ok_or(Error::NodeNotFound(node_id))
    }

    pub fn node_pid(&self, node_id: u32) -> Option<u32> {
        self.node_states.get(&node_id).and_then(|s| s.pid)
    }

    pub fn node_uptime_secs(&self, node_id: u32) -> Option<u64> {
        self.node_states
            .get(&node_id)
            .and_then(|s| s.started_at.map(|t| t.elapsed().as_secs()))
    }

    pub fn node_pending_version(&self, node_id: u32) -> Option<String> {
        self.node_states
            .get(&node_id)
            .and_then(|s| s.pending_version.clone())
    }

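    /// Marks a running node as `UpgradeScheduled` with the version observed on
    /// disk; returns false if the node is missing or not currently running.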
    fn mark_upgrade_scheduled(&mut self, node_id: u32, pending_version: String) -> bool {
        let Some(state) = self.node_states.get_mut(&node_id) else {
            return false;
        };
        if state.status != NodeStatus::Running {
            return false;
        }
        state.status = NodeStatus::UpgradeScheduled;
        state.pending_version = Some(pending_version.clone());
        let _ = self.event_tx.send(NodeEvent::UpgradeScheduled {
            node_id,
            pending_version,
        });
        true
    }

    pub fn is_running(&self, node_id: u32) -> bool {
        self.node_states
            .get(&node_id)
            .is_some_and(|s| s.status == NodeStatus::Running)
    }

    pub fn node_counts(&self) -> (u32, u32, u32) {
        let mut running = 0u32;
        let mut stopped = 0u32;
        let mut errored = 0u32;
        for state in self.node_states.values() {
            match state.status {
                NodeStatus::Running | NodeStatus::Starting | NodeStatus::UpgradeScheduled => {
                    running += 1
                }
                NodeStatus::Stopped | NodeStatus::Stopping => stopped += 1,
                NodeStatus::Errored => errored += 1,
            }
        }
        (running, stopped, errored)
    }

    fn update_state(&mut self, node_id: u32, status: NodeStatus, pid: Option<u32>) {
        if let Some(state) = self.node_states.get_mut(&node_id) {
            state.status = status;
            state.pid = pid;
            if status == NodeStatus::Running {
                state.started_at = Some(Instant::now());
            } else {
                state.started_at = None;
            }
        }
    }

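    /// Adopts nodes that are already running (e.g. after a daemon restart):
    /// scans the process table once, records each match as `Running` with an
    /// approximated start time, and returns the adopted node ids.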
    pub fn adopt_from_registry(&mut self, registry: &NodeRegistry) -> Vec<u32> {
        let mut sys = sysinfo::System::new();
        sys.refresh_processes_specifics(
            sysinfo::ProcessesToUpdate::All,
            true,
            sysinfo::ProcessRefreshKind::everything(),
        );

        let mut adopted = Vec::new();
        for config in registry.list() {
            let Some(pid) = resolve_adopted_pid(config, &sys) else {
                continue;
            };
            self.node_states.insert(
                config.id,
                NodeRuntime {
                    status: NodeStatus::Running,
                    pid: Some(pid),
                    started_at: Some(process_started_at(&sys, pid).unwrap_or_else(Instant::now)),
                    restart_count: 0,
                    first_crash_at: None,
                    pending_version: None,
                },
            );
            let _ = self.event_tx.send(NodeEvent::NodeStarted {
                node_id: config.id,
                pid,
            });
            adopted.push(config.id);
        }
        adopted
    }

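    /// Records a crash and decides what happens next, returning
    /// `(should_restart, attempt, backoff)`. Counters reset after a stable
    /// run; too many crashes inside `CRASH_WINDOW` mark the node `Errored`.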
    fn record_crash(&mut self, node_id: u32) -> (bool, u32, Duration) {
        let state = match self.node_states.get_mut(&node_id) {
            Some(s) => s,
            None => return (false, 0, Duration::ZERO),
        };

        let now = Instant::now();

        // A sufficiently long stable run wipes the slate clean.
        if let Some(started_at) = state.started_at {
            if started_at.elapsed() >= STABLE_DURATION {
                state.restart_count = 0;
                state.first_crash_at = None;
            }
        }

        state.restart_count += 1;
        let attempt = state.restart_count;

        if state.first_crash_at.is_none() {
            state.first_crash_at = Some(now);
        }

        if let Some(first_crash) = state.first_crash_at {
            if attempt >= MAX_CRASHES_BEFORE_ERRORED
                && now.duration_since(first_crash) < CRASH_WINDOW
            {
                state.status = NodeStatus::Errored;
                state.pid = None;
                state.started_at = None;
                return (false, attempt, Duration::ZERO);
            }
        }

        // Exponential backoff: 1s, 2s, 4s, ... capped at MAX_BACKOFF.
        let backoff_secs = 1u64 << (attempt - 1).min(5);
        let backoff = Duration::from_secs(backoff_secs).min(MAX_BACKOFF);

        (true, attempt, backoff)
    }
}

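/// Spawns a background task that periodically compares each running node's
/// on-disk binary version with the recorded one and schedules an upgrade when
/// they differ.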
pub fn spawn_upgrade_monitor(
    registry: Arc<RwLock<NodeRegistry>>,
    supervisor: Arc<RwLock<Supervisor>>,
    interval: Duration,
    shutdown: CancellationToken,
) {
    tokio::spawn(async move {
        let mut ticker = tokio::time::interval(interval);
        // Consume the immediate first tick so the first real check happens
        // after one full interval.
        ticker.tick().await;

        loop {
            tokio::select! {
                _ = shutdown.cancelled() => return,
                _ = ticker.tick() => {},
            }

            // Snapshot the running nodes so no locks are held across the
            // version extraction below.
            let candidates: Vec<(u32, std::path::PathBuf, String, Option<String>)> = {
                let reg = registry.read().await;
                let sup = supervisor.read().await;
                reg.list()
                    .into_iter()
                    .filter_map(|config| match sup.node_status(config.id) {
                        Ok(NodeStatus::Running) => Some((
                            config.id,
                            config.binary_path.clone(),
                            config.version.clone(),
                            sup.node_pending_version(config.id),
                        )),
                        _ => None,
                    })
                    .collect()
            };

            for (node_id, binary_path, recorded_version, current_pending) in candidates {
                let observed = match extract_version(&binary_path).await {
                    Ok(v) => v,
                    Err(_) => continue,
                };
                if observed == recorded_version {
                    continue;
                }
                if current_pending.as_deref() == Some(observed.as_str()) {
                    continue;
                }
                supervisor
                    .write()
                    .await
                    .mark_upgrade_scheduled(node_id, observed);
            }
        }
    });
}

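/// Builds the command-line arguments for a node process from its config.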
pub fn build_node_args(config: &NodeConfig) -> Vec<String> {
    let mut args = vec![
        "--rewards-address".to_string(),
        config.rewards_address.clone(),
        "--root-dir".to_string(),
        config.data_dir.display().to_string(),
    ];

    if let Some(ref log_dir) = config.log_dir {
        args.push("--enable-logging".to_string());
        args.push("--log-dir".to_string());
        args.push(log_dir.display().to_string());
    }

    if let Some(port) = config.node_port {
        args.push("--port".to_string());
        args.push(port.to_string());
    }

    if let Some(port) = config.metrics_port {
        args.push("--metrics-port".to_string());
        args.push(port.to_string());
    }

    for peer in &config.bootstrap_peers {
        args.push("--bootstrap".to_string());
        args.push(peer.clone());
    }

    // Ask the node to exit on upgrade so the monitor can respawn it with the
    // new binary.
    args.push("--stop-on-upgrade".to_string());

    args
}

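/// Spawns a node process from its config and records its pid on disk.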
async fn spawn_node_from_config(config: &NodeConfig) -> Result<tokio::process::Child> {
    let args = build_node_args(config);
    let env_vars: Vec<(String, String)> = config.env_variables.clone().into_iter().collect();

    let log_dir = config
        .log_dir
        .as_deref()
        .unwrap_or(config.data_dir.as_path());

    let child = spawn_node(&config.binary_path, &args, &env_vars, log_dir).await?;
    if let Some(pid) = child.id() {
        write_node_pid(&config.data_dir, pid);
    }
    Ok(child)
}

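/// Watches a node process until it stops for good, restarting it after
/// crashes and upgrades, and removes the pid file on the way out.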
async fn monitor_node(
    child: tokio::process::Child,
    mut config: NodeConfig,
    supervisor: Arc<RwLock<Supervisor>>,
    registry: Arc<RwLock<NodeRegistry>>,
    event_tx: broadcast::Sender<NodeEvent>,
) {
    monitor_node_inner(child, &mut config, supervisor, registry, event_tx).await;
    remove_node_pid(&config.data_dir);
}

734
735async fn monitor_node_inner(
736 mut child: tokio::process::Child,
737 config: &mut NodeConfig,
738 supervisor: Arc<RwLock<Supervisor>>,
739 registry: Arc<RwLock<NodeRegistry>>,
740 event_tx: broadcast::Sender<NodeEvent>,
741) {
742 let node_id = config.id;
743
744 loop {
745 let exit_status = child.wait().await;
747
748 let status_at_exit = {
750 let sup = supervisor.read().await;
751 sup.node_status(node_id).ok()
752 };
753
754 match status_at_exit {
755 Some(NodeStatus::Stopped) | Some(NodeStatus::Stopping) => return,
756 Some(NodeStatus::UpgradeScheduled) => {
757 match respawn_upgraded_node(config, &supervisor, ®istry, &event_tx).await {
760 Ok(new_child) => {
761 child = new_child;
762 continue;
763 }
764 Err(e) => {
765 let _ = event_tx.send(NodeEvent::NodeErrored {
766 node_id,
767 message: format!("Failed to respawn after upgrade: {e}"),
768 });
769 let mut sup = supervisor.write().await;
770 sup.update_state(node_id, NodeStatus::Errored, None);
771 return;
772 }
773 }
774 }
775 _ => {}
776 }
777
778 let exit_code = exit_status.ok().and_then(|s| s.code());
779
780 if exit_code == Some(0) {
792 if let Ok(disk_version) = extract_version(&config.binary_path).await {
793 if disk_version != config.version {
794 {
795 let mut sup = supervisor.write().await;
796 sup.mark_upgrade_scheduled(node_id, disk_version.clone());
797 }
798 match respawn_upgraded_node(config, &supervisor, ®istry, &event_tx).await {
799 Ok(new_child) => {
800 child = new_child;
801 continue;
802 }
803 Err(e) => {
804 let _ = event_tx.send(NodeEvent::NodeErrored {
805 node_id,
806 message: format!("Failed to respawn after upgrade: {e}"),
807 });
808 let mut sup = supervisor.write().await;
809 sup.update_state(node_id, NodeStatus::Errored, None);
810 return;
811 }
812 }
813 }
814 }
815 }
819
820 let _ = event_tx.send(NodeEvent::NodeCrashed { node_id, exit_code });
822
823 let (should_restart, attempt, backoff) = {
824 let mut sup = supervisor.write().await;
825 sup.record_crash(node_id)
826 };
827
828 if !should_restart {
829 let _ = event_tx.send(NodeEvent::NodeErrored {
830 node_id,
831 message: format!(
832 "Node crashed {} times within {} seconds, giving up",
833 MAX_CRASHES_BEFORE_ERRORED,
834 CRASH_WINDOW.as_secs()
835 ),
836 });
837 return;
838 }
839
840 let _ = event_tx.send(NodeEvent::NodeRestarting { node_id, attempt });
841
842 tokio::time::sleep(backoff).await;
843
844 match spawn_node_from_config(&*config).await {
846 Ok(new_child) => {
847 let pid = match new_child.id() {
848 Some(pid) => pid,
849 None => {
850 let _ = event_tx.send(NodeEvent::NodeErrored {
852 node_id,
853 message: "Restarted process exited before PID could be read"
854 .to_string(),
855 });
856 let mut sup = supervisor.write().await;
857 sup.update_state(node_id, NodeStatus::Errored, None);
858 return;
859 }
860 };
861 {
862 let mut sup = supervisor.write().await;
863 sup.update_state(node_id, NodeStatus::Running, Some(pid));
864 }
865 let _ = event_tx.send(NodeEvent::NodeStarted { node_id, pid });
866 child = new_child;
867 }
868 Err(e) => {
869 let _ = event_tx.send(NodeEvent::NodeErrored {
870 node_id,
871 message: format!("Failed to restart node: {e}"),
872 });
873 let mut sup = supervisor.write().await;
874 sup.update_state(node_id, NodeStatus::Errored, None);
875 return;
876 }
877 }
878 }
879}
880
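/// Respawns a node after an upgrade: starts the new binary, persists the new
/// version to the registry, resets crash counters, and emits the upgrade
/// events.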
async fn respawn_upgraded_node(
    config: &mut NodeConfig,
    supervisor: &Arc<RwLock<Supervisor>>,
    registry: &Arc<RwLock<NodeRegistry>>,
    event_tx: &broadcast::Sender<NodeEvent>,
) -> Result<tokio::process::Child> {
    let node_id = config.id;
    let old_version = config.version.clone();

    let new_child = spawn_node_from_config(config).await?;
    let pid = new_child
        .id()
        .ok_or_else(|| Error::ProcessSpawn("Failed to get PID after upgrade respawn".into()))?;

    // Read the version of the binary that was just launched; failure here is
    // tolerated and simply skips the version bookkeeping.
    let new_version = extract_version(&config.binary_path).await.ok();

    if let Some(ref version) = new_version {
        config.version = version.clone();
        let mut reg = registry.write().await;
        if let Ok(stored) = reg.get_mut(node_id) {
            stored.version = version.clone();
            let _ = reg.save();
        }
    }

    {
        let mut sup = supervisor.write().await;
        if let Some(state) = sup.node_states.get_mut(&node_id) {
            state.status = NodeStatus::Running;
            state.pid = Some(pid);
            state.started_at = Some(Instant::now());
            state.pending_version = None;
            state.restart_count = 0;
            state.first_crash_at = None;
        }
    }

    let _ = event_tx.send(NodeEvent::NodeStarted { node_id, pid });
    if let Some(version) = new_version {
        let _ = event_tx.send(NodeEvent::NodeUpgraded {
            node_id,
            old_version,
            new_version: version,
        });
    }

    Ok(new_child)
}

const GRACEFUL_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10);

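/// Sends a termination signal, polls for exit up to
/// `GRACEFUL_SHUTDOWN_TIMEOUT`, then falls back to a hard kill.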
async fn graceful_kill(pid: u32) {
    send_signal_term(pid);

    let start = Instant::now();
    loop {
        if !is_process_alive(pid) {
            return;
        }
        if start.elapsed() >= GRACEFUL_SHUTDOWN_TIMEOUT {
            break;
        }
        tokio::time::sleep(Duration::from_millis(100)).await;
    }

    // The process ignored the graceful signal; force-kill and give it a short
    // moment to disappear.
    send_signal_kill(pid);

    for _ in 0..10 {
        if !is_process_alive(pid) {
            return;
        }
        tokio::time::sleep(Duration::from_millis(50)).await;
    }
}

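/// Spawns a background task that polls every node marked `Running` and, when
/// its pid has vanished, marks it `Stopped`, emits `NodeStopped`, and removes
/// the pid file.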
pub fn spawn_liveness_monitor(
    registry: Arc<RwLock<NodeRegistry>>,
    supervisor: Arc<RwLock<Supervisor>>,
    event_tx: broadcast::Sender<NodeEvent>,
    interval: Duration,
    shutdown: CancellationToken,
) {
    tokio::spawn(async move {
        let mut ticker = tokio::time::interval(interval);
        loop {
            tokio::select! {
                _ = shutdown.cancelled() => return,
                _ = ticker.tick() => {}
            }

            let candidates: Vec<(u32, u32, PathBuf)> = {
                let sup = supervisor.read().await;
                let reg = registry.read().await;
                reg.list()
                    .into_iter()
                    .filter_map(|config| {
                        let pid = sup.node_pid(config.id)?;
                        matches!(sup.node_status(config.id), Ok(NodeStatus::Running))
                            .then_some((config.id, pid, config.data_dir.clone()))
                    })
                    .collect()
            };

            for (node_id, pid, data_dir) in candidates {
                if is_process_alive(pid) {
                    continue;
                }
                let mut sup = supervisor.write().await;
                // Re-check under the write lock: the status may have changed
                // since the snapshot above.
                if !matches!(sup.node_status(node_id), Ok(NodeStatus::Running)) {
                    continue;
                }
                sup.update_state(node_id, NodeStatus::Stopped, None);
                let _ = event_tx.send(NodeEvent::NodeStopped { node_id });
                remove_node_pid(&data_dir);
            }
        }
    });
}

#[cfg(unix)]
fn pid_to_i32(pid: u32) -> Option<i32> {
    i32::try_from(pid).ok().filter(|&p| p > 0)
}

#[cfg(unix)]
fn send_signal_term(pid: u32) {
    if let Some(pid) = pid_to_i32(pid) {
        unsafe {
            libc::kill(pid, libc::SIGTERM);
        }
    }
}

#[cfg(unix)]
fn send_signal_kill(pid: u32) {
    if let Some(pid) = pid_to_i32(pid) {
        unsafe {
            libc::kill(pid, libc::SIGKILL);
        }
    }
}

#[cfg(unix)]
fn is_process_alive(pid: u32) -> bool {
    let Some(pid) = pid_to_i32(pid) else {
        return false;
    };
    let ret = unsafe { libc::kill(pid, 0) };
    if ret == 0 {
        return true;
    }
    // EPERM means the process exists but we lack permission to signal it.
    std::io::Error::last_os_error().raw_os_error() == Some(libc::EPERM)
}

#[cfg(windows)]
fn send_signal_term(pid: u32) {
    use windows_sys::Win32::System::Console::{
        AttachConsole, FreeConsole, GenerateConsoleCtrlEvent, SetConsoleCtrlHandler, CTRL_C_EVENT,
    };

    unsafe {
        // Detach from our own console so we can attach to the target's.
        FreeConsole();

        if AttachConsole(pid) != 0 {
            // Ignore Ctrl-C in this process while the event is delivered.
            SetConsoleCtrlHandler(None, 1);
            GenerateConsoleCtrlEvent(CTRL_C_EVENT, 0);
            FreeConsole();
            // Give the event a moment to propagate before restoring the
            // default handler.
            std::thread::sleep(std::time::Duration::from_millis(50));
            SetConsoleCtrlHandler(None, 0);
        }
    }
}

#[cfg(windows)]
fn send_signal_kill(pid: u32) {
    use windows_sys::Win32::Foundation::CloseHandle;
    use windows_sys::Win32::System::Threading::{OpenProcess, TerminateProcess, PROCESS_TERMINATE};

    unsafe {
        let handle = OpenProcess(PROCESS_TERMINATE, 0, pid);
        if !handle.is_null() {
            TerminateProcess(handle, 1);
            CloseHandle(handle);
        }
    }
}

#[cfg(windows)]
fn is_process_alive(pid: u32) -> bool {
    use windows_sys::Win32::Foundation::{CloseHandle, STILL_ACTIVE};
    use windows_sys::Win32::System::Threading::{
        GetExitCodeProcess, OpenProcess, PROCESS_QUERY_LIMITED_INFORMATION,
    };

    unsafe {
        let handle = OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, 0, pid);
        if handle.is_null() {
            return false;
        }
        let mut exit_code: u32 = 0;
        let success = GetExitCodeProcess(handle, &mut exit_code);
        CloseHandle(handle);
        success != 0 && exit_code == STILL_ACTIVE as u32
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn build_node_args_basic() {
        let config = NodeConfig {
            id: 1,
            service_name: "node1".to_string(),
            rewards_address: "0xabc123".to_string(),
            data_dir: "/data/node-1".into(),
            log_dir: Some("/logs/node-1".into()),
            node_port: Some(12000),
            metrics_port: Some(13000),
            network_id: Some(1),
            binary_path: "/bin/node".into(),
            version: "0.1.0".to_string(),
            env_variables: HashMap::new(),
            bootstrap_peers: vec!["peer1".to_string(), "peer2".to_string()],
        };

        let args = build_node_args(&config);

        assert!(args.contains(&"--rewards-address".to_string()));
        assert!(args.contains(&"0xabc123".to_string()));
        assert!(args.contains(&"--root-dir".to_string()));
        assert!(args.contains(&"/data/node-1".to_string()));
        assert!(args.contains(&"--enable-logging".to_string()));
        assert!(args.contains(&"--log-dir".to_string()));
        assert!(args.contains(&"/logs/node-1".to_string()));
        assert!(args.contains(&"--port".to_string()));
        assert!(args.contains(&"12000".to_string()));
        assert!(args.contains(&"--metrics-port".to_string()));
        assert!(args.contains(&"13000".to_string()));
        assert!(args.contains(&"--bootstrap".to_string()));
        assert!(args.contains(&"peer1".to_string()));
        assert!(args.contains(&"peer2".to_string()));
        assert!(args.contains(&"--stop-on-upgrade".to_string()));
    }

    #[test]
    fn build_node_args_minimal() {
        let config = NodeConfig {
            id: 1,
            service_name: "node1".to_string(),
            rewards_address: "0xabc".to_string(),
            data_dir: "/data/node-1".into(),
            log_dir: None,
            node_port: None,
            metrics_port: None,
            network_id: None,
            binary_path: "/bin/node".into(),
            version: "0.1.0".to_string(),
            env_variables: HashMap::new(),
            bootstrap_peers: vec![],
        };

        let args = build_node_args(&config);

        assert!(args.contains(&"--rewards-address".to_string()));
        assert!(args.contains(&"--root-dir".to_string()));
        assert!(!args.contains(&"--enable-logging".to_string()));
        assert!(!args.contains(&"--log-dir".to_string()));
        assert!(!args.contains(&"--port".to_string()));
        assert!(!args.contains(&"--metrics-port".to_string()));
        assert!(!args.contains(&"--bootstrap".to_string()));
        assert!(args.contains(&"--stop-on-upgrade".to_string()));
    }

1194
1195 #[test]
1196 fn record_crash_backoff_increases() {
1197 let (tx, _rx) = broadcast::channel(16);
1198 let mut sup = Supervisor::new(tx);
1199
1200 sup.node_states.insert(
1202 1,
1203 NodeRuntime {
1204 status: NodeStatus::Running,
1205 pid: Some(100),
1206 started_at: Some(Instant::now()),
1207 restart_count: 0,
1208 first_crash_at: None,
1209 pending_version: None,
1210 },
1211 );
1212
1213 let (should_restart, attempt, backoff) = sup.record_crash(1);
1214 assert!(should_restart);
1215 assert_eq!(attempt, 1);
1216 assert_eq!(backoff, Duration::from_secs(1));
1217
1218 let (should_restart, attempt, backoff) = sup.record_crash(1);
1219 assert!(should_restart);
1220 assert_eq!(attempt, 2);
1221 assert_eq!(backoff, Duration::from_secs(2));
1222
1223 let (should_restart, attempt, backoff) = sup.record_crash(1);
1224 assert!(should_restart);
1225 assert_eq!(attempt, 3);
1226 assert_eq!(backoff, Duration::from_secs(4));
1227
1228 let (should_restart, attempt, backoff) = sup.record_crash(1);
1229 assert!(should_restart);
1230 assert_eq!(attempt, 4);
1231 assert_eq!(backoff, Duration::from_secs(8));
1232
1233 let (should_restart, attempt, _) = sup.record_crash(1);
1235 assert!(!should_restart);
1236 assert_eq!(attempt, 5);
1237 assert_eq!(sup.node_states[&1].status, NodeStatus::Errored);
1238 }
1239
1240 #[test]
1241 fn node_counts_tracks_states() {
1242 let (tx, _rx) = broadcast::channel(16);
1243 let mut sup = Supervisor::new(tx);
1244
1245 sup.node_states.insert(
1246 1,
1247 NodeRuntime {
1248 status: NodeStatus::Running,
1249 pid: Some(100),
1250 started_at: Some(Instant::now()),
1251 restart_count: 0,
1252 first_crash_at: None,
1253 pending_version: None,
1254 },
1255 );
1256 sup.node_states.insert(
1257 2,
1258 NodeRuntime {
1259 status: NodeStatus::Stopped,
1260 pid: None,
1261 started_at: None,
1262 restart_count: 0,
1263 first_crash_at: None,
1264 pending_version: None,
1265 },
1266 );
1267 sup.node_states.insert(
1268 3,
1269 NodeRuntime {
1270 status: NodeStatus::Errored,
1271 pid: None,
1272 started_at: None,
1273 restart_count: 5,
1274 first_crash_at: None,
1275 pending_version: None,
1276 },
1277 );
1278
1279 let (running, stopped, errored) = sup.node_counts();
1280 assert_eq!(running, 1);
1281 assert_eq!(stopped, 1);
1282 assert_eq!(errored, 1);
1283 }
1284
1285 #[test]
1286 fn mark_upgrade_scheduled_only_affects_running_nodes() {
1287 let (tx, mut rx) = broadcast::channel(16);
1288 let mut sup = Supervisor::new(tx);
1289
1290 sup.node_states.insert(
1291 1,
1292 NodeRuntime {
1293 status: NodeStatus::Running,
1294 pid: Some(111),
1295 started_at: Some(Instant::now()),
1296 restart_count: 0,
1297 first_crash_at: None,
1298 pending_version: None,
1299 },
1300 );
1301 sup.node_states.insert(
1302 2,
1303 NodeRuntime {
1304 status: NodeStatus::Stopped,
1305 pid: None,
1306 started_at: None,
1307 restart_count: 0,
1308 first_crash_at: None,
1309 pending_version: None,
1310 },
1311 );
1312
1313 let affected = sup.mark_upgrade_scheduled(1, "0.10.11-rc.1".to_string());
1315 assert!(affected);
1316 assert_eq!(sup.node_status(1).unwrap(), NodeStatus::UpgradeScheduled);
1317 assert_eq!(sup.node_pending_version(1).as_deref(), Some("0.10.11-rc.1"));
1318 match rx.try_recv() {
1319 Ok(NodeEvent::UpgradeScheduled {
1320 node_id,
1321 pending_version,
1322 }) => {
1323 assert_eq!(node_id, 1);
1324 assert_eq!(pending_version, "0.10.11-rc.1");
1325 }
1326 other => panic!("expected UpgradeScheduled event, got {other:?}"),
1327 }
1328
1329 let affected = sup.mark_upgrade_scheduled(2, "0.10.11-rc.1".to_string());
1331 assert!(!affected);
1332 assert_eq!(sup.node_status(2).unwrap(), NodeStatus::Stopped);
1333 assert!(sup.node_pending_version(2).is_none());
1334
1335 let affected = sup.mark_upgrade_scheduled(1, "0.10.12".to_string());
1337 assert!(!affected);
1338 assert_eq!(sup.node_pending_version(1).as_deref(), Some("0.10.11-rc.1"));
1340 }
1341
1342 #[test]
1343 fn node_counts_counts_upgrade_scheduled_as_running() {
1344 let (tx, _rx) = broadcast::channel(16);
1345 let mut sup = Supervisor::new(tx);
1346
1347 sup.node_states.insert(
1348 1,
1349 NodeRuntime {
1350 status: NodeStatus::UpgradeScheduled,
1351 pid: Some(111),
1352 started_at: Some(Instant::now()),
1353 restart_count: 0,
1354 first_crash_at: None,
1355 pending_version: Some("0.10.11-rc.1".to_string()),
1356 },
1357 );
1358
1359 let (running, stopped, errored) = sup.node_counts();
1360 assert_eq!(running, 1);
1361 assert_eq!(stopped, 0);
1362 assert_eq!(errored, 0);
1363 }
1364
1365 #[tokio::test]
1366 async fn stop_node_not_found() {
1367 let (tx, _rx) = broadcast::channel(16);
1368 let mut sup = Supervisor::new(tx);
1369
1370 let result = sup.stop_node(999).await;
1371 assert!(matches!(result, Err(Error::NodeNotFound(999))));
1372 }
1373
1374 #[tokio::test]
1375 async fn stop_node_not_running() {
1376 let (tx, _rx) = broadcast::channel(16);
1377 let mut sup = Supervisor::new(tx);
1378
1379 sup.node_states.insert(
1380 1,
1381 NodeRuntime {
1382 status: NodeStatus::Stopped,
1383 pid: None,
1384 started_at: None,
1385 restart_count: 0,
1386 first_crash_at: None,
1387 pending_version: None,
1388 },
1389 );
1390
1391 let result = sup.stop_node(1).await;
1392 assert!(matches!(result, Err(Error::NodeNotRunning(1))));
1393 }
1394
1395 #[tokio::test]
1396 async fn stop_all_nodes_mixed_states() {
1397 let (tx, _rx) = broadcast::channel(16);
1398 let mut sup = Supervisor::new(tx);
1399
1400 sup.node_states.insert(
1402 1,
1403 NodeRuntime {
1404 status: NodeStatus::Running,
1405 pid: Some(999999),
1406 started_at: Some(Instant::now()),
1407 restart_count: 0,
1408 first_crash_at: None,
1409 pending_version: None,
1410 },
1411 );
1412 sup.node_states.insert(
1414 2,
1415 NodeRuntime {
1416 status: NodeStatus::Stopped,
1417 pid: None,
1418 started_at: None,
1419 restart_count: 0,
1420 first_crash_at: None,
1421 pending_version: None,
1422 },
1423 );
1424
1425 let configs = vec![(1, "node1".to_string()), (2, "node2".to_string())];
1426
1427 let result = sup.stop_all_nodes(&configs).await;
1428
1429 assert_eq!(result.stopped.len(), 1);
1430 assert_eq!(result.stopped[0].node_id, 1);
1431 assert_eq!(result.stopped[0].service_name, "node1");
1432 assert_eq!(result.already_stopped, vec![2]);
1433 assert!(result.failed.is_empty());
1434 }
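
    // Two extra checks, sketched under mild assumptions: the OS temp dir is
    // writable, and the platform's monotonic clock allows subtracting
    // STABLE_DURATION (so `Instant::checked_sub` succeeds).
    #[test]
    fn pid_file_roundtrip() {
        // Exercise write/read/remove of the pid file against a throwaway
        // directory under the OS temp dir.
        let dir =
            std::env::temp_dir().join(format!("supervisor-pid-test-{}", std::process::id()));
        std::fs::create_dir_all(&dir).unwrap();

        assert!(read_node_pid(&dir).is_none());
        write_node_pid(&dir, 4242);
        assert_eq!(read_node_pid(&dir), Some(4242));
        remove_node_pid(&dir);
        assert!(read_node_pid(&dir).is_none());

        let _ = std::fs::remove_dir_all(&dir);
    }

    #[test]
    fn record_crash_resets_after_stable_run() {
        let (tx, _rx) = broadcast::channel(16);
        let mut sup = Supervisor::new(tx);

        // Backdate started_at so the node looks like it ran stably before
        // this crash; the backoff sequence should restart at attempt 1.
        let long_ago = Instant::now().checked_sub(STABLE_DURATION);
        sup.node_states.insert(
            1,
            NodeRuntime {
                status: NodeStatus::Running,
                pid: Some(100),
                started_at: long_ago,
                restart_count: 3,
                first_crash_at: long_ago,
                pending_version: None,
            },
        );

        let (should_restart, attempt, backoff) = sup.record_crash(1);
        assert!(should_restart);
        assert_eq!(attempt, 1);
        assert_eq!(backoff, Duration::from_secs(1));
    }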
}