1use std::collections::{HashMap, HashSet};
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use tokio::sync::{broadcast, RwLock};
7use tokio::time::MissedTickBehavior;
8use tokio_util::sync::CancellationToken;
9
10use crate::error::{Error, Result};
11use crate::node::binary::extract_version;
12use crate::node::events::NodeEvent;
13use crate::node::process::spawn::spawn_node;
14use crate::node::registry::NodeRegistry;
15use crate::node::types::{
16 NodeConfig, NodeStarted, NodeStatus, NodeStopFailed, NodeStopped, StopNodeResult,
17};
18
/// A 60-second polling period.
///
/// NOTE(review): presumably the default `interval` handed to
/// `spawn_upgrade_monitor` — confirm at the call site, which is not in this file.
pub const UPGRADE_POLL_INTERVAL: Duration = Duration::from_secs(60);

/// A 5-second polling period.
///
/// NOTE(review): presumably the default `interval` handed to
/// `spawn_liveness_monitor` — confirm at the call site, which is not in this file.
pub const LIVENESS_POLL_INTERVAL: Duration = Duration::from_secs(5);
30
/// Returns the path of a node's pid file: `node.pid` directly inside `data_dir`.
fn node_pid_file(data_dir: &Path) -> PathBuf {
    let mut pid_path = data_dir.to_path_buf();
    pid_path.push("node.pid");
    pid_path
}
36
37fn write_node_pid(data_dir: &Path, pid: u32) {
41 let path = node_pid_file(data_dir);
42 if let Err(e) = std::fs::write(&path, pid.to_string()) {
43 tracing::warn!(
44 "Failed to write node pid file at {}: {e}. Node will still run, but a future \
45 daemon restart will not be able to adopt it.",
46 path.display()
47 );
48 }
49}
50
51fn remove_node_pid(data_dir: &Path) {
54 let _ = std::fs::remove_file(node_pid_file(data_dir));
55}
56
57fn read_node_pid(data_dir: &Path) -> Option<u32> {
60 std::fs::read_to_string(node_pid_file(data_dir))
61 .ok()
62 .and_then(|s| s.trim().parse().ok())
63}
64
65fn find_running_node_process(sys: &sysinfo::System, config: &NodeConfig) -> Option<u32> {
82 let target_data_dir = config.data_dir.as_path();
83 for (pid, process) in sys.processes() {
84 if process.thread_kind().is_some() {
89 continue;
90 }
91 let Some(exe) = process.exe() else {
92 continue;
93 };
94 if exe != config.binary_path.as_path() {
95 continue;
96 }
97
98 let cmd = process.cmd();
99 let matches_root_dir = cmd.iter().enumerate().any(|(i, arg)| {
100 let arg = arg.to_string_lossy();
101 if let Some(value) = arg.strip_prefix("--root-dir=") {
102 Path::new(value) == target_data_dir
103 } else if arg == "--root-dir" {
104 cmd.get(i + 1)
105 .map(|v| Path::new(&*v.to_string_lossy()) == target_data_dir)
106 .unwrap_or(false)
107 } else {
108 false
109 }
110 });
111
112 if matches_root_dir {
113 return Some(pid.as_u32());
114 }
115 }
116 None
117}
118
119fn pid_is_live_process(pid: u32, sys: &sysinfo::System) -> bool {
128 if !is_process_alive(pid) {
129 return false;
130 }
131 match sys.process(sysinfo::Pid::from_u32(pid)) {
132 Some(process) => process.thread_kind().is_none(),
133 None => true,
134 }
135}
136
137fn resolve_adopted_pid(config: &NodeConfig, sys: &sysinfo::System) -> Option<u32> {
143 if let Some(pid) = read_node_pid(&config.data_dir) {
144 if pid_is_live_process(pid, sys) {
145 return Some(pid);
146 }
147 remove_node_pid(&config.data_dir);
151 }
152
153 let pid = find_running_node_process(sys, config)?;
154 write_node_pid(&config.data_dir, pid);
155 Some(pid)
156}
157
158fn process_started_at(sys: &sysinfo::System, pid: u32) -> Option<Instant> {
170 let start_secs = sys.process(sysinfo::Pid::from_u32(pid))?.start_time();
171 let now_secs = std::time::SystemTime::now()
172 .duration_since(std::time::UNIX_EPOCH)
173 .ok()?
174 .as_secs();
175 let age = now_secs.saturating_sub(start_secs);
176 Instant::now().checked_sub(Duration::from_secs(age))
177}
178
/// Number of recorded crashes at which `Supervisor::record_crash` gives up and
/// marks the node `Errored`, provided they fall within [`CRASH_WINDOW`].
const MAX_CRASHES_BEFORE_ERRORED: u32 = 5;

/// Time span, measured from the first recorded crash, within which
/// [`MAX_CRASHES_BEFORE_ERRORED`] crashes cause the node to be marked `Errored`.
const CRASH_WINDOW: Duration = Duration::from_secs(300);
/// Uptime after which a crash is treated as a fresh failure:
/// `Supervisor::record_crash` resets its crash counters first.
const STABLE_DURATION: Duration = Duration::from_secs(300);
/// Upper bound applied to the restart backoff computed by
/// `Supervisor::record_crash`.
const MAX_BACKOFF: Duration = Duration::from_secs(60);
191
/// Tracks the runtime state of node processes and publishes lifecycle events.
pub struct Supervisor {
    /// Broadcast channel on which `NodeEvent`s are published. Send errors are
    /// ignored throughout this file.
    event_tx: broadcast::Sender<NodeEvent>,
    /// Runtime state per node, keyed by node id.
    node_states: HashMap<u32, NodeRuntime>,
    /// Ids of nodes that were registered through `adopt_from_registry` and
    /// have not since been passed to `mark_owned`.
    adopted: HashSet<u32>,
}
203
/// Per-node runtime bookkeeping held by [`Supervisor`].
struct NodeRuntime {
    /// Current lifecycle status of the node.
    status: NodeStatus,
    /// Pid of the node's process, when one is known.
    pid: Option<u32>,
    /// When the current process started; `None` while the node is not running.
    started_at: Option<Instant>,
    /// Crash counter maintained by `Supervisor::record_crash`.
    restart_count: u32,
    /// Time of the first crash counted in `restart_count`.
    first_crash_at: Option<Instant>,
    /// Version recorded by `Supervisor::mark_upgrade_scheduled`; cleared again
    /// by `respawn_upgraded_node`.
    pending_version: Option<String>,
}
213
214impl Supervisor {
215 pub fn new(event_tx: broadcast::Sender<NodeEvent>) -> Self {
216 Self {
217 event_tx,
218 node_states: HashMap::new(),
219 adopted: HashSet::new(),
220 }
221 }
222
223 pub fn is_adopted(&self, node_id: u32) -> bool {
226 self.adopted.contains(&node_id)
227 }
228
229 fn mark_owned(&mut self, node_id: u32) {
232 self.adopted.remove(&node_id);
233 }
234
235 pub async fn start_node(
240 &mut self,
241 config: &NodeConfig,
242 supervisor_ref: Arc<RwLock<Supervisor>>,
243 registry_ref: Arc<RwLock<NodeRegistry>>,
244 ) -> Result<NodeStarted> {
245 let node_id = config.id;
246
247 if let Some(state) = self.node_states.get(&node_id) {
248 if state.status == NodeStatus::Running {
249 return Err(Error::NodeAlreadyRunning(node_id));
250 }
251 }
252
253 let _ = self.event_tx.send(NodeEvent::NodeStarting { node_id });
254
255 let mut child = spawn_node_from_config(config).await?;
256 let pid = child
257 .id()
258 .ok_or_else(|| Error::ProcessSpawn("Failed to get PID from spawned process".into()))?;
259
260 match tokio::time::timeout(Duration::from_secs(1), child.wait()).await {
265 Ok(Ok(exit_status)) => {
266 let spawn_log_dir = config.log_dir.as_deref().unwrap_or(&config.data_dir);
270 let stderr_path = spawn_log_dir.join("stderr.log");
271 let stderr_msg = std::fs::read_to_string(&stderr_path).unwrap_or_default();
272 let detail = if stderr_msg.trim().is_empty() {
273 format!("exit code: {exit_status}")
274 } else {
275 stderr_msg.trim().to_string()
276 };
277 self.node_states.insert(
278 node_id,
279 NodeRuntime {
280 status: NodeStatus::Errored,
281 pid: None,
282 started_at: None,
283 restart_count: 0,
284 first_crash_at: None,
285 pending_version: None,
286 },
287 );
288 return Err(Error::ProcessSpawn(format!(
289 "Node {node_id} exited immediately: {detail}"
290 )));
291 }
292 Ok(Err(e)) => {
293 return Err(Error::ProcessSpawn(format!(
294 "Failed to check node process status: {e}"
295 )));
296 }
297 Err(_) => {} }
299
300 self.node_states.insert(
301 node_id,
302 NodeRuntime {
303 status: NodeStatus::Running,
304 pid: Some(pid),
305 started_at: Some(Instant::now()),
306 restart_count: 0,
307 first_crash_at: None,
308 pending_version: None,
309 },
310 );
311 self.mark_owned(node_id);
314
315 let _ = self.event_tx.send(NodeEvent::NodeStarted { node_id, pid });
316
317 let result = NodeStarted {
318 node_id,
319 service_name: config.service_name.clone(),
320 pid,
321 };
322
323 let event_tx = self.event_tx.clone();
325 let config = config.clone();
326 tokio::spawn(async move {
327 monitor_node(child, config, supervisor_ref, registry_ref, event_tx).await;
328 });
329
330 Ok(result)
331 }
332
333 pub async fn stop_node(&mut self, node_id: u32) -> Result<()> {
339 let state = self
340 .node_states
341 .get_mut(&node_id)
342 .ok_or(Error::NodeNotFound(node_id))?;
343
344 if state.status != NodeStatus::Running {
345 return Err(Error::NodeNotRunning(node_id));
346 }
347
348 let pid = state.pid;
349
350 let _ = self.event_tx.send(NodeEvent::NodeStopping { node_id });
351 state.status = NodeStatus::Stopping;
352
353 if let Some(pid) = pid {
354 graceful_kill(pid).await;
355 }
356
357 let state = self.node_states.get_mut(&node_id).unwrap();
359 state.status = NodeStatus::Stopped;
360 state.pid = None;
361 state.started_at = None;
362
363 let _ = self.event_tx.send(NodeEvent::NodeStopped { node_id });
364
365 Ok(())
366 }
367
368 pub async fn stop_all_nodes(&mut self, configs: &[(u32, String)]) -> StopNodeResult {
370 let mut stopped = Vec::new();
371 let mut failed = Vec::new();
372 let mut already_stopped = Vec::new();
373
374 for (node_id, service_name) in configs {
375 let node_id = *node_id;
376 match self.node_status(node_id) {
377 Ok(NodeStatus::Running) => {}
378 Ok(_) => {
379 already_stopped.push(node_id);
380 continue;
381 }
382 Err(_) => {
383 already_stopped.push(node_id);
384 continue;
385 }
386 }
387
388 match self.stop_node(node_id).await {
389 Ok(()) => {
390 stopped.push(NodeStopped {
391 node_id,
392 service_name: service_name.clone(),
393 });
394 }
395 Err(Error::NodeNotRunning(_)) => {
396 already_stopped.push(node_id);
397 }
398 Err(e) => {
399 failed.push(NodeStopFailed {
400 node_id,
401 service_name: service_name.clone(),
402 error: e.to_string(),
403 });
404 }
405 }
406 }
407
408 StopNodeResult {
409 stopped,
410 failed,
411 already_stopped,
412 }
413 }
414
415 pub fn node_status(&self, node_id: u32) -> Result<NodeStatus> {
417 self.node_states
418 .get(&node_id)
419 .map(|s| s.status)
420 .ok_or(Error::NodeNotFound(node_id))
421 }
422
423 pub fn node_pid(&self, node_id: u32) -> Option<u32> {
425 self.node_states.get(&node_id).and_then(|s| s.pid)
426 }
427
428 pub fn node_uptime_secs(&self, node_id: u32) -> Option<u64> {
430 self.node_states
431 .get(&node_id)
432 .and_then(|s| s.started_at.map(|t| t.elapsed().as_secs()))
433 }
434
435 pub fn node_pending_version(&self, node_id: u32) -> Option<String> {
437 self.node_states
438 .get(&node_id)
439 .and_then(|s| s.pending_version.clone())
440 }
441
442 fn mark_upgrade_scheduled(&mut self, node_id: u32, pending_version: String) -> bool {
448 let Some(state) = self.node_states.get_mut(&node_id) else {
449 return false;
450 };
451 if state.status != NodeStatus::Running {
452 return false;
453 }
454 state.status = NodeStatus::UpgradeScheduled;
455 state.pending_version = Some(pending_version.clone());
456 let _ = self.event_tx.send(NodeEvent::UpgradeScheduled {
457 node_id,
458 pending_version,
459 });
460 true
461 }
462
463 pub fn is_running(&self, node_id: u32) -> bool {
465 self.node_states
466 .get(&node_id)
467 .is_some_and(|s| s.status == NodeStatus::Running)
468 }
469
470 pub fn node_counts(&self) -> (u32, u32, u32) {
472 let mut running = 0u32;
473 let mut stopped = 0u32;
474 let mut errored = 0u32;
475 for state in self.node_states.values() {
476 match state.status {
477 NodeStatus::Running | NodeStatus::Starting | NodeStatus::UpgradeScheduled => {
479 running += 1
480 }
481 NodeStatus::Stopped | NodeStatus::Stopping => stopped += 1,
482 NodeStatus::Errored => errored += 1,
483 }
484 }
485 (running, stopped, errored)
486 }
487
488 fn update_state(&mut self, node_id: u32, status: NodeStatus, pid: Option<u32>) {
490 if let Some(state) = self.node_states.get_mut(&node_id) {
491 state.status = status;
492 state.pid = pid;
493 if status == NodeStatus::Running {
494 state.started_at = Some(Instant::now());
495 } else {
496 state.started_at = None;
500 }
501 }
502 }
503
504 pub fn adopt_from_registry(&mut self, registry: &NodeRegistry) -> Vec<u32> {
527 let mut sys = sysinfo::System::new();
533 sys.refresh_processes_specifics(
534 sysinfo::ProcessesToUpdate::All,
535 true,
536 sysinfo::ProcessRefreshKind::everything(),
537 );
538
539 let mut adopted = Vec::new();
540 for config in registry.list() {
541 let Some(pid) = resolve_adopted_pid(config, &sys) else {
542 continue;
543 };
544 self.node_states.insert(
545 config.id,
546 NodeRuntime {
547 status: NodeStatus::Running,
548 pid: Some(pid),
549 started_at: Some(process_started_at(&sys, pid).unwrap_or_else(Instant::now)),
557 restart_count: 0,
558 first_crash_at: None,
559 pending_version: None,
560 },
561 );
562 self.adopted.insert(config.id);
565 let _ = self.event_tx.send(NodeEvent::NodeStarted {
566 node_id: config.id,
567 pid,
568 });
569 adopted.push(config.id);
570 }
571 adopted
572 }
573
574 fn record_crash(&mut self, node_id: u32) -> (bool, u32, Duration) {
577 let state = match self.node_states.get_mut(&node_id) {
578 Some(s) => s,
579 None => return (false, 0, Duration::ZERO),
580 };
581
582 let now = Instant::now();
583
584 if let Some(started_at) = state.started_at {
586 if started_at.elapsed() >= STABLE_DURATION {
587 state.restart_count = 0;
588 state.first_crash_at = None;
589 }
590 }
591
592 state.restart_count += 1;
593 let attempt = state.restart_count;
594
595 if state.first_crash_at.is_none() {
596 state.first_crash_at = Some(now);
597 }
598
599 if let Some(first_crash) = state.first_crash_at {
601 if attempt >= MAX_CRASHES_BEFORE_ERRORED
602 && now.duration_since(first_crash) < CRASH_WINDOW
603 {
604 state.status = NodeStatus::Errored;
605 state.pid = None;
606 state.started_at = None;
607 return (false, attempt, Duration::ZERO);
608 }
609 }
610
611 let backoff_secs = 1u64 << (attempt - 1).min(5);
613 let backoff = Duration::from_secs(backoff_secs).min(MAX_BACKOFF);
614
615 (true, attempt, backoff)
616 }
617}
618
619pub fn spawn_upgrade_monitor(
629 registry: Arc<RwLock<NodeRegistry>>,
630 supervisor: Arc<RwLock<Supervisor>>,
631 interval: Duration,
632 shutdown: CancellationToken,
633) {
634 tokio::spawn(async move {
635 let mut ticker = tokio::time::interval(interval);
636 ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
640 ticker.tick().await;
643
644 loop {
645 tokio::select! {
646 _ = shutdown.cancelled() => return,
647 _ = ticker.tick() => {},
648 }
649
650 let candidates: Vec<(u32, std::path::PathBuf, String, Option<String>)> = {
653 let reg = registry.read().await;
654 let sup = supervisor.read().await;
655 reg.list()
656 .into_iter()
657 .filter_map(|config| match sup.node_status(config.id) {
658 Ok(NodeStatus::Running) => Some((
659 config.id,
660 config.binary_path.clone(),
661 config.version.clone(),
662 sup.node_pending_version(config.id),
663 )),
664 _ => None,
665 })
666 .collect()
667 };
668
669 for (node_id, binary_path, recorded_version, current_pending) in candidates {
670 let observed = match extract_version(&binary_path).await {
671 Ok(v) => v,
672 Err(_) => continue,
674 };
675 if observed == recorded_version {
676 continue;
677 }
678 if current_pending.as_deref() == Some(observed.as_str()) {
679 continue;
680 }
681 supervisor
682 .write()
683 .await
684 .mark_upgrade_scheduled(node_id, observed);
685 }
686 }
687 });
688}
689
690pub fn build_node_args(config: &NodeConfig) -> Vec<String> {
692 let mut args = vec![
693 "--rewards-address".to_string(),
694 config.rewards_address.clone(),
695 "--root-dir".to_string(),
696 config.data_dir.display().to_string(),
697 ];
698
699 if let Some(ref log_dir) = config.log_dir {
700 args.push("--enable-logging".to_string());
701 args.push("--log-dir".to_string());
702 args.push(log_dir.display().to_string());
703 }
704
705 if let Some(port) = config.node_port {
706 args.push("--port".to_string());
707 args.push(port.to_string());
708 }
709
710 if let Some(port) = config.metrics_port {
711 args.push("--metrics-port".to_string());
712 args.push(port.to_string());
713 }
714
715 for peer in &config.bootstrap_peers {
716 args.push("--bootstrap".to_string());
717 args.push(peer.clone());
718 }
719
720 if let Some(channel) = config.upgrade_channel {
721 args.push("--upgrade-channel".to_string());
722 args.push(channel.to_string());
723 }
724
725 args.push("--stop-on-upgrade".to_string());
730
731 args
732}
733
734async fn spawn_node_from_config(config: &NodeConfig) -> Result<tokio::process::Child> {
740 let args = build_node_args(config);
741 let env_vars: Vec<(String, String)> = config.env_variables.clone().into_iter().collect();
742
743 let log_dir = config
744 .log_dir
745 .as_deref()
746 .unwrap_or(config.data_dir.as_path());
747
748 let child = spawn_node(&config.binary_path, &args, &env_vars, log_dir).await?;
749 if let Some(pid) = child.id() {
750 write_node_pid(&config.data_dir, pid);
751 }
752 Ok(child)
753}
754
755async fn monitor_node(
759 child: tokio::process::Child,
760 mut config: NodeConfig,
761 supervisor: Arc<RwLock<Supervisor>>,
762 registry: Arc<RwLock<NodeRegistry>>,
763 event_tx: broadcast::Sender<NodeEvent>,
764) {
765 monitor_node_inner(child, &mut config, supervisor, registry, event_tx).await;
766 remove_node_pid(&config.data_dir);
767}
768
769async fn monitor_node_inner(
770 mut child: tokio::process::Child,
771 config: &mut NodeConfig,
772 supervisor: Arc<RwLock<Supervisor>>,
773 registry: Arc<RwLock<NodeRegistry>>,
774 event_tx: broadcast::Sender<NodeEvent>,
775) {
776 let node_id = config.id;
777
778 loop {
779 let exit_status = child.wait().await;
781
782 let status_at_exit = {
784 let sup = supervisor.read().await;
785 sup.node_status(node_id).ok()
786 };
787
788 match status_at_exit {
789 Some(NodeStatus::Stopped) | Some(NodeStatus::Stopping) => return,
790 Some(NodeStatus::UpgradeScheduled) => {
791 match respawn_upgraded_node(config, &supervisor, ®istry, &event_tx).await {
794 Ok(new_child) => {
795 child = new_child;
796 continue;
797 }
798 Err(e) => {
799 let _ = event_tx.send(NodeEvent::NodeErrored {
800 node_id,
801 message: format!("Failed to respawn after upgrade: {e}"),
802 });
803 let mut sup = supervisor.write().await;
804 sup.update_state(node_id, NodeStatus::Errored, None);
805 return;
806 }
807 }
808 }
809 _ => {}
810 }
811
812 let exit_code = exit_status.ok().and_then(|s| s.code());
813
814 if exit_code == Some(0) {
826 if let Ok(disk_version) = extract_version(&config.binary_path).await {
827 if disk_version != config.version {
828 {
829 let mut sup = supervisor.write().await;
830 sup.mark_upgrade_scheduled(node_id, disk_version.clone());
831 }
832 match respawn_upgraded_node(config, &supervisor, ®istry, &event_tx).await {
833 Ok(new_child) => {
834 child = new_child;
835 continue;
836 }
837 Err(e) => {
838 let _ = event_tx.send(NodeEvent::NodeErrored {
839 node_id,
840 message: format!("Failed to respawn after upgrade: {e}"),
841 });
842 let mut sup = supervisor.write().await;
843 sup.update_state(node_id, NodeStatus::Errored, None);
844 return;
845 }
846 }
847 }
848 }
849 }
853
854 let _ = event_tx.send(NodeEvent::NodeCrashed { node_id, exit_code });
856
857 let (should_restart, attempt, backoff) = {
858 let mut sup = supervisor.write().await;
859 sup.record_crash(node_id)
860 };
861
862 if !should_restart {
863 let _ = event_tx.send(NodeEvent::NodeErrored {
864 node_id,
865 message: format!(
866 "Node crashed {} times within {} seconds, giving up",
867 MAX_CRASHES_BEFORE_ERRORED,
868 CRASH_WINDOW.as_secs()
869 ),
870 });
871 return;
872 }
873
874 let _ = event_tx.send(NodeEvent::NodeRestarting { node_id, attempt });
875
876 tokio::time::sleep(backoff).await;
877
878 match spawn_node_from_config(&*config).await {
880 Ok(new_child) => {
881 let pid = match new_child.id() {
882 Some(pid) => pid,
883 None => {
884 let _ = event_tx.send(NodeEvent::NodeErrored {
886 node_id,
887 message: "Restarted process exited before PID could be read"
888 .to_string(),
889 });
890 let mut sup = supervisor.write().await;
891 sup.update_state(node_id, NodeStatus::Errored, None);
892 return;
893 }
894 };
895 {
896 let mut sup = supervisor.write().await;
897 sup.update_state(node_id, NodeStatus::Running, Some(pid));
898 }
899 let _ = event_tx.send(NodeEvent::NodeStarted { node_id, pid });
900 child = new_child;
901 }
902 Err(e) => {
903 let _ = event_tx.send(NodeEvent::NodeErrored {
904 node_id,
905 message: format!("Failed to restart node: {e}"),
906 });
907 let mut sup = supervisor.write().await;
908 sup.update_state(node_id, NodeStatus::Errored, None);
909 return;
910 }
911 }
912 }
913}
914
915async fn respawn_upgraded_node(
920 config: &mut NodeConfig,
921 supervisor: &Arc<RwLock<Supervisor>>,
922 registry: &Arc<RwLock<NodeRegistry>>,
923 event_tx: &broadcast::Sender<NodeEvent>,
924) -> Result<tokio::process::Child> {
925 let node_id = config.id;
926 let old_version = config.version.clone();
927
928 let new_child = spawn_node_from_config(config).await?;
929 let pid = new_child
930 .id()
931 .ok_or_else(|| Error::ProcessSpawn("Failed to get PID after upgrade respawn".into()))?;
932
933 let new_version = extract_version(&config.binary_path).await.ok();
936
937 if let Some(ref version) = new_version {
938 config.version = version.clone();
939 let mut reg = registry.write().await;
940 if let Ok(stored) = reg.get_mut(node_id) {
941 stored.version = version.clone();
942 let _ = reg.save();
943 }
944 }
945
946 {
947 let mut sup = supervisor.write().await;
948 if let Some(state) = sup.node_states.get_mut(&node_id) {
949 state.status = NodeStatus::Running;
950 state.pid = Some(pid);
951 state.started_at = Some(Instant::now());
952 state.pending_version = None;
953 state.restart_count = 0;
954 state.first_crash_at = None;
955 }
956 }
957
958 let _ = event_tx.send(NodeEvent::NodeStarted { node_id, pid });
959 if let Some(version) = new_version {
960 let _ = event_tx.send(NodeEvent::NodeUpgraded {
961 node_id,
962 old_version,
963 new_version: version,
964 });
965 }
966
967 Ok(new_child)
968}
969
/// How long `graceful_kill` keeps polling after the termination request
/// before it escalates to `send_signal_kill`.
const GRACEFUL_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10);
972
973async fn graceful_kill(pid: u32) {
975 send_signal_term(pid);
976
977 let start = Instant::now();
979 loop {
980 if !is_process_alive(pid) {
981 return;
982 }
983 if start.elapsed() >= GRACEFUL_SHUTDOWN_TIMEOUT {
984 break;
985 }
986 tokio::time::sleep(Duration::from_millis(100)).await;
987 }
988
989 send_signal_kill(pid);
991
992 for _ in 0..10 {
994 if !is_process_alive(pid) {
995 return;
996 }
997 tokio::time::sleep(Duration::from_millis(50)).await;
998 }
999}
1000
1001fn liveness_should_stop(
1014 snapshot_pid: u32,
1015 current_pid: Option<u32>,
1016 current_status: Option<NodeStatus>,
1017) -> bool {
1018 current_status == Some(NodeStatus::Running) && current_pid == Some(snapshot_pid)
1019}
1020
1021pub fn spawn_liveness_monitor(
1033 registry: Arc<RwLock<NodeRegistry>>,
1034 supervisor: Arc<RwLock<Supervisor>>,
1035 event_tx: broadcast::Sender<NodeEvent>,
1036 interval: Duration,
1037 shutdown: CancellationToken,
1038) {
1039 tokio::spawn(async move {
1040 let mut ticker = tokio::time::interval(interval);
1041 ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
1045 loop {
1046 tokio::select! {
1047 _ = shutdown.cancelled() => return,
1048 _ = ticker.tick() => {}
1049 }
1050
1051 let candidates: Vec<(u32, u32, PathBuf)> =
1053 {
1054 let sup = supervisor.read().await;
1055 let reg = registry.read().await;
1056 reg.list()
1057 .into_iter()
1058 .filter_map(|config| {
1059 let pid = sup.node_pid(config.id)?;
1060 matches!(sup.node_status(config.id), Ok(NodeStatus::Running))
1061 .then_some((config.id, pid, config.data_dir.clone()))
1062 })
1063 .collect()
1064 };
1065
1066 for (node_id, pid, data_dir) in candidates {
1067 if is_process_alive(pid) {
1068 continue;
1069 }
1070
1071 if supervisor.read().await.is_adopted(node_id) {
1077 let config = {
1078 let reg = registry.read().await;
1079 reg.get(node_id).ok().cloned()
1080 };
1081 if let Some(mut config) = config {
1082 let drifted = matches!(
1083 extract_version(&config.binary_path).await,
1084 Ok(disk_version) if disk_version != config.version
1085 );
1086 if drifted {
1087 match respawn_upgraded_node(
1088 &mut config,
1089 &supervisor,
1090 ®istry,
1091 &event_tx,
1092 )
1093 .await
1094 {
1095 Ok(child) => {
1096 supervisor.write().await.mark_owned(node_id);
1099 let sup_ref = Arc::clone(&supervisor);
1100 let reg_ref = Arc::clone(®istry);
1101 let ev = event_tx.clone();
1102 tokio::spawn(async move {
1103 monitor_node(child, config, sup_ref, reg_ref, ev).await;
1104 });
1105 continue;
1106 }
1107 Err(e) => {
1108 let _ = event_tx.send(NodeEvent::NodeErrored {
1109 node_id,
1110 message: format!(
1111 "Failed to respawn adopted node after upgrade: {e}"
1112 ),
1113 });
1114 let mut sup = supervisor.write().await;
1115 sup.update_state(node_id, NodeStatus::Errored, None);
1116 sup.mark_owned(node_id);
1117 remove_node_pid(&data_dir);
1118 continue;
1119 }
1120 }
1121 }
1122 }
1123 }
1124
1125 let mut sup = supervisor.write().await;
1126 if !liveness_should_stop(pid, sup.node_pid(node_id), sup.node_status(node_id).ok())
1129 {
1130 continue;
1131 }
1132 sup.update_state(node_id, NodeStatus::Stopped, None);
1133 let _ = event_tx.send(NodeEvent::NodeStopped { node_id });
1134 remove_node_pid(&data_dir);
1135 }
1136 }
1137 });
1138}
1139
/// Converts a pid to `i32`, returning `None` when the value does not fit or
/// is not strictly positive.
#[cfg(unix)]
fn pid_to_i32(pid: u32) -> Option<i32> {
    match i32::try_from(pid) {
        Ok(signed) if signed > 0 => Some(signed),
        _ => None,
    }
}
1144
1145#[cfg(unix)]
1146fn send_signal_term(pid: u32) {
1147 if let Some(pid) = pid_to_i32(pid) {
1148 unsafe {
1149 libc::kill(pid, libc::SIGTERM);
1150 }
1151 }
1152}
1153
1154#[cfg(unix)]
1155fn send_signal_kill(pid: u32) {
1156 if let Some(pid) = pid_to_i32(pid) {
1157 unsafe {
1158 libc::kill(pid, libc::SIGKILL);
1159 }
1160 }
1161}
1162
1163#[cfg(unix)]
1164fn is_process_alive(pid: u32) -> bool {
1165 let Some(pid) = pid_to_i32(pid) else {
1166 return false;
1167 };
1168 let ret = unsafe { libc::kill(pid, 0) };
1169 if ret == 0 {
1170 return true;
1171 }
1172 std::io::Error::last_os_error().raw_os_error() == Some(libc::EPERM)
1174}
1175
/// Windows counterpart of the Unix `send_signal_term`: raises a `CTRL_C_EVENT`
/// on the console of process `pid`.
///
/// Nothing is sent when attaching to that process's console fails. The
/// return values of the other console calls are ignored.
#[cfg(windows)]
fn send_signal_term(pid: u32) {
    use windows_sys::Win32::System::Console::{
        AttachConsole, FreeConsole, GenerateConsoleCtrlEvent, SetConsoleCtrlHandler, CTRL_C_EVENT,
    };

    // SAFETY: these Win32 console calls receive only integers and a null
    // handler; no pointers into memory owned by this program are passed.
    unsafe {
        // Detach from our own console first.
        // NOTE(review): presumably required because a process can be attached
        // to only one console at a time — confirm against the Win32 docs.
        FreeConsole();

        if AttachConsole(pid) != 0 {
            // NOTE(review): with a null handler, the second argument appears
            // to toggle whether this process ignores Ctrl+C; it is switched on
            // here so the event raised below does not hit us — confirm.
            SetConsoleCtrlHandler(None, 1);
            GenerateConsoleCtrlEvent(CTRL_C_EVENT, 0);
            FreeConsole();
            // NOTE(review): presumably gives the event time to be delivered
            // before Ctrl+C handling is restored. This is a blocking sleep and
            // the function is reached from async code (`graceful_kill`).
            std::thread::sleep(std::time::Duration::from_millis(50));
            SetConsoleCtrlHandler(None, 0);
        }
    }
}
1206
/// Windows counterpart of the Unix `send_signal_kill`: terminates process
/// `pid` with exit code 1.
///
/// Does nothing when the process cannot be opened with terminate rights.
#[cfg(windows)]
fn send_signal_kill(pid: u32) {
    use windows_sys::Win32::Foundation::CloseHandle;
    use windows_sys::Win32::System::Threading::{OpenProcess, TerminateProcess, PROCESS_TERMINATE};

    // SAFETY: the handle is only used after the null check and is closed
    // exactly once before leaving the block.
    unsafe {
        let handle = OpenProcess(PROCESS_TERMINATE, 0, pid);
        if handle.is_null() {
            return;
        }
        TerminateProcess(handle, 1);
        CloseHandle(handle);
    }
}
1220
/// Windows counterpart of the Unix `is_process_alive`.
///
/// Returns `true` only when the process can be opened, its exit code can be
/// queried, and that exit code is `STILL_ACTIVE`.
#[cfg(windows)]
fn is_process_alive(pid: u32) -> bool {
    use windows_sys::Win32::Foundation::{CloseHandle, STILL_ACTIVE};
    use windows_sys::Win32::System::Threading::{
        GetExitCodeProcess, OpenProcess, PROCESS_QUERY_LIMITED_INFORMATION,
    };

    // SAFETY: the handle is only used after the null check and is closed
    // exactly once; `exit_code` is a valid, live `u32` for the out-parameter.
    unsafe {
        let handle = OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, 0, pid);
        if handle.is_null() {
            return false;
        }

        let mut exit_code: u32 = 0;
        let queried = GetExitCodeProcess(handle, &mut exit_code) != 0;
        CloseHandle(handle);

        queried && exit_code == STILL_ACTIVE as u32
    }
}
1239
1240#[cfg(test)]
1241mod tests {
1242 use super::*;
1243 use crate::node::types::UpgradeChannel;
1244
1245 #[test]
1246 fn adopted_flag_lifecycle() {
1247 let (tx, _rx) = broadcast::channel(16);
1248 let mut sup = Supervisor::new(tx);
1249
1250 assert!(!sup.is_adopted(1));
1252
1253 sup.adopted.insert(1);
1255 assert!(sup.is_adopted(1));
1256
1257 sup.mark_owned(1);
1260 assert!(!sup.is_adopted(1));
1261 }
1262
1263 #[test]
1271 fn liveness_does_not_stop_node_respawned_under_it() {
1272 let dead_snapshot_pid = 1000; let live_respawned_pid = Some(2000); assert!(
1275 !liveness_should_stop(
1276 dead_snapshot_pid,
1277 live_respawned_pid,
1278 Some(NodeStatus::Running)
1279 ),
1280 "liveness must not stop a node whose PID changed under it (respawned with a live PID)"
1281 );
1282 }
1283
1284 #[test]
1285 fn build_node_args_basic() {
1286 let config = NodeConfig {
1287 id: 1,
1288 service_name: "node1".to_string(),
1289 rewards_address: "0xabc123".to_string(),
1290 data_dir: "/data/node-1".into(),
1291 log_dir: Some("/logs/node-1".into()),
1292 node_port: Some(12000),
1293 metrics_port: Some(13000),
1294 network_id: Some(1),
1295 binary_path: "/bin/node".into(),
1296 version: "0.1.0".to_string(),
1297 env_variables: HashMap::new(),
1298 bootstrap_peers: vec!["peer1".to_string(), "peer2".to_string()],
1299 upgrade_channel: None,
1300 };
1301
1302 let args = build_node_args(&config);
1303
1304 assert!(args.contains(&"--rewards-address".to_string()));
1305 assert!(args.contains(&"0xabc123".to_string()));
1306 assert!(args.contains(&"--root-dir".to_string()));
1307 assert!(args.contains(&"/data/node-1".to_string()));
1308 assert!(args.contains(&"--enable-logging".to_string()));
1309 assert!(args.contains(&"--log-dir".to_string()));
1310 assert!(args.contains(&"/logs/node-1".to_string()));
1311 assert!(args.contains(&"--port".to_string()));
1312 assert!(args.contains(&"12000".to_string()));
1313 assert!(args.contains(&"--metrics-port".to_string()));
1314 assert!(args.contains(&"13000".to_string()));
1315 assert!(args.contains(&"--bootstrap".to_string()));
1316 assert!(args.contains(&"peer1".to_string()));
1317 assert!(args.contains(&"peer2".to_string()));
1318 assert!(args.contains(&"--stop-on-upgrade".to_string()));
1319 assert!(!args.contains(&"--upgrade-channel".to_string()));
1321 }
1322
1323 #[test]
1324 fn build_node_args_includes_upgrade_channel() {
1325 let mut config = NodeConfig {
1326 id: 1,
1327 service_name: "node1".to_string(),
1328 rewards_address: "0xabc".to_string(),
1329 data_dir: "/data/node-1".into(),
1330 log_dir: None,
1331 node_port: None,
1332 metrics_port: None,
1333 network_id: None,
1334 binary_path: "/bin/node".into(),
1335 version: "0.1.0".to_string(),
1336 env_variables: HashMap::new(),
1337 bootstrap_peers: vec![],
1338 upgrade_channel: Some(UpgradeChannel::Beta),
1339 };
1340
1341 let args = build_node_args(&config);
1342 let idx = args
1343 .iter()
1344 .position(|a| a == "--upgrade-channel")
1345 .expect("--upgrade-channel should be present");
1346 assert_eq!(args[idx + 1], "beta");
1347
1348 config.upgrade_channel = Some(UpgradeChannel::Stable);
1349 let args = build_node_args(&config);
1350 let idx = args.iter().position(|a| a == "--upgrade-channel").unwrap();
1351 assert_eq!(args[idx + 1], "stable");
1352 }
1353
1354 #[test]
1355 fn build_node_args_minimal() {
1356 let config = NodeConfig {
1357 id: 1,
1358 service_name: "node1".to_string(),
1359 rewards_address: "0xabc".to_string(),
1360 data_dir: "/data/node-1".into(),
1361 log_dir: None,
1362 node_port: None,
1363 metrics_port: None,
1364 network_id: None,
1365 binary_path: "/bin/node".into(),
1366 version: "0.1.0".to_string(),
1367 env_variables: HashMap::new(),
1368 bootstrap_peers: vec![],
1369 upgrade_channel: None,
1370 };
1371
1372 let args = build_node_args(&config);
1373
1374 assert!(args.contains(&"--rewards-address".to_string()));
1375 assert!(args.contains(&"--root-dir".to_string()));
1376 assert!(!args.contains(&"--enable-logging".to_string()));
1377 assert!(!args.contains(&"--log-dir".to_string()));
1378 assert!(!args.contains(&"--port".to_string()));
1379 assert!(!args.contains(&"--metrics-port".to_string()));
1380 assert!(!args.contains(&"--bootstrap".to_string()));
1381 assert!(args.contains(&"--stop-on-upgrade".to_string()));
1382 }
1383
1384 #[test]
1385 fn record_crash_backoff_increases() {
1386 let (tx, _rx) = broadcast::channel(16);
1387 let mut sup = Supervisor::new(tx);
1388
1389 sup.node_states.insert(
1391 1,
1392 NodeRuntime {
1393 status: NodeStatus::Running,
1394 pid: Some(100),
1395 started_at: Some(Instant::now()),
1396 restart_count: 0,
1397 first_crash_at: None,
1398 pending_version: None,
1399 },
1400 );
1401
1402 let (should_restart, attempt, backoff) = sup.record_crash(1);
1403 assert!(should_restart);
1404 assert_eq!(attempt, 1);
1405 assert_eq!(backoff, Duration::from_secs(1));
1406
1407 let (should_restart, attempt, backoff) = sup.record_crash(1);
1408 assert!(should_restart);
1409 assert_eq!(attempt, 2);
1410 assert_eq!(backoff, Duration::from_secs(2));
1411
1412 let (should_restart, attempt, backoff) = sup.record_crash(1);
1413 assert!(should_restart);
1414 assert_eq!(attempt, 3);
1415 assert_eq!(backoff, Duration::from_secs(4));
1416
1417 let (should_restart, attempt, backoff) = sup.record_crash(1);
1418 assert!(should_restart);
1419 assert_eq!(attempt, 4);
1420 assert_eq!(backoff, Duration::from_secs(8));
1421
1422 let (should_restart, attempt, _) = sup.record_crash(1);
1424 assert!(!should_restart);
1425 assert_eq!(attempt, 5);
1426 assert_eq!(sup.node_states[&1].status, NodeStatus::Errored);
1427 }
1428
/// With one `Running`, one `Stopped` and one `Errored` node registered,
/// `Supervisor::node_counts` must report exactly one node in each bucket.
#[test]
fn node_counts_tracks_states() {
    let (events_tx, _events_rx) = broadcast::channel(16);
    let mut supervisor = Supervisor::new(events_tx);

    // (node id, status, pid, restart count) — one node per counted state.
    let fixtures = [
        (1, NodeStatus::Running, Some(100), 0),
        (2, NodeStatus::Stopped, None, 0),
        (3, NodeStatus::Errored, None, 5),
    ];
    for (node_id, status, pid, restart_count) in fixtures {
        let runtime = NodeRuntime {
            status,
            pid,
            // Only the node that has a pid also carries a start timestamp.
            started_at: pid.map(|_| Instant::now()),
            restart_count,
            first_crash_at: None,
            pending_version: None,
        };
        supervisor.node_states.insert(node_id, runtime);
    }

    let (running, stopped, errored) = supervisor.node_counts();
    assert_eq!(running, 1);
    assert_eq!(stopped, 1);
    assert_eq!(errored, 1);
}
1473
/// Exercises `Supervisor::mark_upgrade_scheduled` against three situations:
/// a `Running` node (transition happens and an event is broadcast), a
/// `Stopped` node (nothing changes), and a node that is already
/// `UpgradeScheduled` (the first pending version is kept).
#[test]
fn mark_upgrade_scheduled_only_affects_running_nodes() {
    let (events_tx, mut events_rx) = broadcast::channel(16);
    let mut supervisor = Supervisor::new(events_tx);

    // Version used for the first (successful) scheduling attempt.
    let first_version = "0.10.11-rc.1";

    let running = NodeRuntime {
        status: NodeStatus::Running,
        pid: Some(111),
        started_at: Some(Instant::now()),
        restart_count: 0,
        first_crash_at: None,
        pending_version: None,
    };
    let stopped = NodeRuntime {
        status: NodeStatus::Stopped,
        pid: None,
        started_at: None,
        restart_count: 0,
        first_crash_at: None,
        pending_version: None,
    };
    supervisor.node_states.insert(1, running);
    supervisor.node_states.insert(2, stopped);

    // Running node: the call reports a change, the status and pending
    // version are updated, and subscribers receive a matching event.
    let affected = supervisor.mark_upgrade_scheduled(1, first_version.to_string());
    assert!(affected);
    assert_eq!(
        supervisor.node_status(1).unwrap(),
        NodeStatus::UpgradeScheduled
    );
    assert_eq!(
        supervisor.node_pending_version(1).as_deref(),
        Some(first_version)
    );
    match events_rx.try_recv() {
        Ok(NodeEvent::UpgradeScheduled {
            node_id: event_node,
            pending_version: event_version,
        }) => {
            assert_eq!(event_node, 1);
            assert_eq!(event_version, first_version);
        }
        other => panic!("expected UpgradeScheduled event, got {other:?}"),
    }

    // Stopped node: the call is a no-op.
    let affected = supervisor.mark_upgrade_scheduled(2, first_version.to_string());
    assert!(!affected);
    assert_eq!(supervisor.node_status(2).unwrap(), NodeStatus::Stopped);
    assert!(supervisor.node_pending_version(2).is_none());

    // Already-scheduled node: a second request with a different version is
    // rejected and the originally recorded version stays in place.
    let affected = supervisor.mark_upgrade_scheduled(1, "0.10.12".to_string());
    assert!(!affected);
    assert_eq!(
        supervisor.node_pending_version(1).as_deref(),
        Some(first_version)
    );
}
1530
/// A node in `UpgradeScheduled` state must be reported by
/// `Supervisor::node_counts` in the running bucket, not as stopped or errored.
#[test]
fn node_counts_counts_upgrade_scheduled_as_running() {
    let (events_tx, _events_rx) = broadcast::channel(16);
    let mut supervisor = Supervisor::new(events_tx);

    // Single node that has a pid and an upgrade pending.
    let awaiting_upgrade = NodeRuntime {
        status: NodeStatus::UpgradeScheduled,
        pid: Some(111),
        started_at: Some(Instant::now()),
        restart_count: 0,
        first_crash_at: None,
        pending_version: Some("0.10.11-rc.1".to_string()),
    };
    supervisor.node_states.insert(1, awaiting_upgrade);

    // Counts come back as (running, stopped, errored).
    assert_eq!(supervisor.node_counts(), (1, 0, 0));
}
1553
1554 #[tokio::test]
1555 async fn stop_node_not_found() {
1556 let (tx, _rx) = broadcast::channel(16);
1557 let mut sup = Supervisor::new(tx);
1558
1559 let result = sup.stop_node(999).await;
1560 assert!(matches!(result, Err(Error::NodeNotFound(999))));
1561 }
1562
1563 #[tokio::test]
1564 async fn stop_node_not_running() {
1565 let (tx, _rx) = broadcast::channel(16);
1566 let mut sup = Supervisor::new(tx);
1567
1568 sup.node_states.insert(
1569 1,
1570 NodeRuntime {
1571 status: NodeStatus::Stopped,
1572 pid: None,
1573 started_at: None,
1574 restart_count: 0,
1575 first_crash_at: None,
1576 pending_version: None,
1577 },
1578 );
1579
1580 let result = sup.stop_node(1).await;
1581 assert!(matches!(result, Err(Error::NodeNotRunning(1))));
1582 }
1583
1584 #[tokio::test]
1585 async fn stop_all_nodes_mixed_states() {
1586 let (tx, _rx) = broadcast::channel(16);
1587 let mut sup = Supervisor::new(tx);
1588
1589 sup.node_states.insert(
1591 1,
1592 NodeRuntime {
1593 status: NodeStatus::Running,
1594 pid: Some(999999),
1595 started_at: Some(Instant::now()),
1596 restart_count: 0,
1597 first_crash_at: None,
1598 pending_version: None,
1599 },
1600 );
1601 sup.node_states.insert(
1603 2,
1604 NodeRuntime {
1605 status: NodeStatus::Stopped,
1606 pid: None,
1607 started_at: None,
1608 restart_count: 0,
1609 first_crash_at: None,
1610 pending_version: None,
1611 },
1612 );
1613
1614 let configs = vec![(1, "node1".to_string()), (2, "node2".to_string())];
1615
1616 let result = sup.stop_all_nodes(&configs).await;
1617
1618 assert_eq!(result.stopped.len(), 1);
1619 assert_eq!(result.stopped[0].node_id, 1);
1620 assert_eq!(result.stopped[0].service_name, "node1");
1621 assert_eq!(result.already_stopped, vec![2]);
1622 assert!(result.failed.is_empty());
1623 }
1624}