1use std::collections::{HashMap, HashSet};
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use tokio::sync::{broadcast, RwLock};
7use tokio::time::MissedTickBehavior;
8use tokio_util::sync::CancellationToken;
9
10use crate::error::{Error, Result};
11use crate::node::binary::extract_version;
12use crate::node::daemon::disk;
13use crate::node::daemon::health::{DiskThresholds, FleetHealth};
14use crate::node::events::NodeEvent;
15use crate::node::process::spawn::spawn_node;
16use crate::node::registry::NodeRegistry;
17use crate::node::types::{
18 EvictionRecord, NodeConfig, NodeStarted, NodeStatus, NodeStopFailed, NodeStopped,
19 StopNodeResult,
20};
21
/// Poll interval intended for `spawn_upgrade_monitor`, which compares each
/// running node's on-disk binary version with the version in the registry.
pub const UPGRADE_POLL_INTERVAL: Duration = Duration::from_secs(60);

/// Poll interval intended for `spawn_eviction_monitor`, which re-measures free
/// space per partition and evicts nodes when it runs low.
pub const EVICTION_POLL_INTERVAL: Duration = Duration::from_secs(30);

/// Upper bound on the number of nodes `run_eviction_cycle` evicts in one cycle,
/// bounding how much a single tick can remove.
const MAX_EVICTIONS_PER_CYCLE: usize = 4;

/// Poll interval intended for `spawn_liveness_monitor`, which checks that the
/// recorded PIDs of running nodes still refer to live processes.
pub const LIVENESS_POLL_INTERVAL: Duration = Duration::from_secs(5);
41
/// Path of the file that records the PID of the node process owning `data_dir`.
fn node_pid_file(data_dir: &Path) -> PathBuf {
    let mut pid_path = data_dir.to_path_buf();
    pid_path.push("node.pid");
    pid_path
}
47
48fn write_node_pid(data_dir: &Path, pid: u32) {
52 let path = node_pid_file(data_dir);
53 if let Err(e) = std::fs::write(&path, pid.to_string()) {
54 tracing::warn!(
55 "Failed to write node pid file at {}: {e}. Node will still run, but a future \
56 daemon restart will not be able to adopt it.",
57 path.display()
58 );
59 }
60}
61
62fn remove_node_pid(data_dir: &Path) {
65 let _ = std::fs::remove_file(node_pid_file(data_dir));
66}
67
68fn read_node_pid(data_dir: &Path) -> Option<u32> {
71 std::fs::read_to_string(node_pid_file(data_dir))
72 .ok()
73 .and_then(|s| s.trim().parse().ok())
74}
75
76fn find_running_node_process(sys: &sysinfo::System, config: &NodeConfig) -> Option<u32> {
93 let target_data_dir = config.data_dir.as_path();
94 for (pid, process) in sys.processes() {
95 if process.thread_kind().is_some() {
100 continue;
101 }
102 let Some(exe) = process.exe() else {
103 continue;
104 };
105 if exe != config.binary_path.as_path() {
106 continue;
107 }
108
109 let cmd = process.cmd();
110 let matches_root_dir = cmd.iter().enumerate().any(|(i, arg)| {
111 let arg = arg.to_string_lossy();
112 if let Some(value) = arg.strip_prefix("--root-dir=") {
113 Path::new(value) == target_data_dir
114 } else if arg == "--root-dir" {
115 cmd.get(i + 1)
116 .map(|v| Path::new(&*v.to_string_lossy()) == target_data_dir)
117 .unwrap_or(false)
118 } else {
119 false
120 }
121 });
122
123 if matches_root_dir {
124 return Some(pid.as_u32());
125 }
126 }
127 None
128}
129
130fn pid_is_live_process(pid: u32, sys: &sysinfo::System) -> bool {
139 if !is_process_alive(pid) {
140 return false;
141 }
142 match sys.process(sysinfo::Pid::from_u32(pid)) {
143 Some(process) => process.thread_kind().is_none(),
144 None => true,
145 }
146}
147
148fn resolve_adopted_pid(config: &NodeConfig, sys: &sysinfo::System) -> Option<u32> {
154 if let Some(pid) = read_node_pid(&config.data_dir) {
155 if pid_is_live_process(pid, sys) {
156 return Some(pid);
157 }
158 remove_node_pid(&config.data_dir);
162 }
163
164 let pid = find_running_node_process(sys, config)?;
165 write_node_pid(&config.data_dir, pid);
166 Some(pid)
167}
168
169fn process_started_at(sys: &sysinfo::System, pid: u32) -> Option<Instant> {
181 let start_secs = sys.process(sysinfo::Pid::from_u32(pid))?.start_time();
182 let now_secs = std::time::SystemTime::now()
183 .duration_since(std::time::UNIX_EPOCH)
184 .ok()?
185 .as_secs();
186 let age = now_secs.saturating_sub(start_secs);
187 Instant::now().checked_sub(Duration::from_secs(age))
188}
189
/// Crash count, within `CRASH_WINDOW` of the first crash, at which
/// `record_crash` stops restarting a node and marks it `Errored`.
const MAX_CRASHES_BEFORE_ERRORED: u32 = 5;

/// Window, measured from the first recorded crash, in which crashes count
/// toward `MAX_CRASHES_BEFORE_ERRORED`.
const CRASH_WINDOW: Duration = Duration::from_secs(300);
/// A run lasting at least this long counts as stable; `record_crash` then
/// resets the crash history.
const STABLE_DURATION: Duration = Duration::from_secs(300);
/// Cap on the restart backoff. `record_crash` also caps the exponent at 5,
/// so the computed backoff currently never exceeds 32 s.
const MAX_BACKOFF: Duration = Duration::from_secs(60);
202
/// Tracks the runtime state of every node the daemon manages and broadcasts
/// lifecycle events as that state changes.
pub struct Supervisor {
    /// Channel on which lifecycle `NodeEvent`s are published.
    event_tx: broadcast::Sender<NodeEvent>,
    /// Runtime state keyed by node id.
    node_states: HashMap<u32, NodeRuntime>,
    /// Nodes whose process was found already running (see `adopt_from_registry`)
    /// rather than spawned by this supervisor, so no child handle is held for them.
    adopted: HashSet<u32>,
    /// Nodes with an eviction in progress; `start_node` refuses to start them.
    evicting: HashSet<u32>,
}
218
/// Per-node mutable runtime state owned by the `Supervisor`.
struct NodeRuntime {
    /// Current lifecycle status.
    status: NodeStatus,
    /// OS process id of the node process, when one is known.
    pid: Option<u32>,
    /// When the current process started; cleared when the node is not running.
    started_at: Option<Instant>,
    /// Number of crashes in the current crash sequence (drives backoff and the
    /// errored threshold in `record_crash`).
    restart_count: u32,
    /// Time of the first crash in the current crash sequence.
    first_crash_at: Option<Instant>,
    /// Binary version detected on disk that the node has not picked up yet
    /// (set by `mark_upgrade_scheduled`).
    pending_version: Option<String>,
}
228
229impl Supervisor {
230 pub fn new(event_tx: broadcast::Sender<NodeEvent>) -> Self {
231 Self {
232 event_tx,
233 node_states: HashMap::new(),
234 adopted: HashSet::new(),
235 evicting: HashSet::new(),
236 }
237 }
238
239 pub fn is_adopted(&self, node_id: u32) -> bool {
242 self.adopted.contains(&node_id)
243 }
244
245 fn begin_evicting(&mut self, node_id: u32) {
249 self.evicting.insert(node_id);
250 }
251
252 fn finish_evicting(&mut self, node_id: u32) {
255 self.evicting.remove(&node_id);
256 }
257
258 fn mark_owned(&mut self, node_id: u32) {
261 self.adopted.remove(&node_id);
262 }
263
264 pub async fn start_node(
269 &mut self,
270 config: &NodeConfig,
271 supervisor_ref: Arc<RwLock<Supervisor>>,
272 registry_ref: Arc<RwLock<NodeRegistry>>,
273 ) -> Result<NodeStarted> {
274 let node_id = config.id;
275
276 if config.eviction.is_some() || self.evicting.contains(&node_id) {
281 return Err(Error::NodeEvicted(node_id));
282 }
283
284 if let Some(state) = self.node_states.get(&node_id) {
285 if state.status == NodeStatus::Running {
286 return Err(Error::NodeAlreadyRunning(node_id));
287 }
288 }
289
290 let _ = self.event_tx.send(NodeEvent::NodeStarting { node_id });
291
292 let mut child = spawn_node_from_config(config).await?;
293 let pid = child
294 .id()
295 .ok_or_else(|| Error::ProcessSpawn("Failed to get PID from spawned process".into()))?;
296
297 match tokio::time::timeout(Duration::from_secs(1), child.wait()).await {
302 Ok(Ok(exit_status)) => {
303 let spawn_log_dir = config.log_dir.as_deref().unwrap_or(&config.data_dir);
307 let stderr_path = spawn_log_dir.join("stderr.log");
308 let stderr_msg = std::fs::read_to_string(&stderr_path).unwrap_or_default();
309 let detail = if stderr_msg.trim().is_empty() {
310 format!("exit code: {exit_status}")
311 } else {
312 stderr_msg.trim().to_string()
313 };
314 self.node_states.insert(
315 node_id,
316 NodeRuntime {
317 status: NodeStatus::Errored,
318 pid: None,
319 started_at: None,
320 restart_count: 0,
321 first_crash_at: None,
322 pending_version: None,
323 },
324 );
325 return Err(Error::ProcessSpawn(format!(
326 "Node {node_id} exited immediately: {detail}"
327 )));
328 }
329 Ok(Err(e)) => {
330 return Err(Error::ProcessSpawn(format!(
331 "Failed to check node process status: {e}"
332 )));
333 }
334 Err(_) => {} }
336
337 self.node_states.insert(
338 node_id,
339 NodeRuntime {
340 status: NodeStatus::Running,
341 pid: Some(pid),
342 started_at: Some(Instant::now()),
343 restart_count: 0,
344 first_crash_at: None,
345 pending_version: None,
346 },
347 );
348 self.mark_owned(node_id);
351
352 let _ = self.event_tx.send(NodeEvent::NodeStarted { node_id, pid });
353
354 let result = NodeStarted {
355 node_id,
356 service_name: config.service_name.clone(),
357 pid,
358 };
359
360 let event_tx = self.event_tx.clone();
362 let config = config.clone();
363 tokio::spawn(async move {
364 monitor_node(child, config, supervisor_ref, registry_ref, event_tx).await;
365 });
366
367 Ok(result)
368 }
369
370 pub async fn stop_node(&mut self, node_id: u32) -> Result<()> {
376 let state = self
377 .node_states
378 .get_mut(&node_id)
379 .ok_or(Error::NodeNotFound(node_id))?;
380
381 if state.status != NodeStatus::Running {
382 return Err(Error::NodeNotRunning(node_id));
383 }
384
385 let pid = state.pid;
386
387 let _ = self.event_tx.send(NodeEvent::NodeStopping { node_id });
388 state.status = NodeStatus::Stopping;
389
390 if let Some(pid) = pid {
391 graceful_kill(pid).await;
392 }
393
394 let state = self.node_states.get_mut(&node_id).unwrap();
396 state.status = NodeStatus::Stopped;
397 state.pid = None;
398 state.started_at = None;
399
400 let _ = self.event_tx.send(NodeEvent::NodeStopped { node_id });
401
402 Ok(())
403 }
404
405 pub async fn stop_all_nodes(&mut self, configs: &[(u32, String)]) -> StopNodeResult {
407 let mut stopped = Vec::new();
408 let mut failed = Vec::new();
409 let mut already_stopped = Vec::new();
410
411 for (node_id, service_name) in configs {
412 let node_id = *node_id;
413 match self.node_status(node_id) {
414 Ok(NodeStatus::Running) => {}
415 Ok(_) => {
416 already_stopped.push(node_id);
417 continue;
418 }
419 Err(_) => {
420 already_stopped.push(node_id);
421 continue;
422 }
423 }
424
425 match self.stop_node(node_id).await {
426 Ok(()) => {
427 stopped.push(NodeStopped {
428 node_id,
429 service_name: service_name.clone(),
430 });
431 }
432 Err(Error::NodeNotRunning(_)) => {
433 already_stopped.push(node_id);
434 }
435 Err(e) => {
436 failed.push(NodeStopFailed {
437 node_id,
438 service_name: service_name.clone(),
439 error: e.to_string(),
440 });
441 }
442 }
443 }
444
445 StopNodeResult {
446 stopped,
447 failed,
448 already_stopped,
449 }
450 }
451
452 pub fn node_status(&self, node_id: u32) -> Result<NodeStatus> {
454 self.node_states
455 .get(&node_id)
456 .map(|s| s.status)
457 .ok_or(Error::NodeNotFound(node_id))
458 }
459
460 pub fn node_pid(&self, node_id: u32) -> Option<u32> {
462 self.node_states.get(&node_id).and_then(|s| s.pid)
463 }
464
465 pub fn node_uptime_secs(&self, node_id: u32) -> Option<u64> {
467 self.node_states
468 .get(&node_id)
469 .and_then(|s| s.started_at.map(|t| t.elapsed().as_secs()))
470 }
471
472 pub fn node_pending_version(&self, node_id: u32) -> Option<String> {
474 self.node_states
475 .get(&node_id)
476 .and_then(|s| s.pending_version.clone())
477 }
478
479 fn mark_upgrade_scheduled(&mut self, node_id: u32, pending_version: String) -> bool {
485 let Some(state) = self.node_states.get_mut(&node_id) else {
486 return false;
487 };
488 if state.status != NodeStatus::Running {
489 return false;
490 }
491 state.status = NodeStatus::UpgradeScheduled;
492 state.pending_version = Some(pending_version.clone());
493 let _ = self.event_tx.send(NodeEvent::UpgradeScheduled {
494 node_id,
495 pending_version,
496 });
497 true
498 }
499
500 pub fn is_running(&self, node_id: u32) -> bool {
502 self.node_states
503 .get(&node_id)
504 .is_some_and(|s| s.status == NodeStatus::Running)
505 }
506
507 pub fn node_counts(&self) -> (u32, u32, u32) {
509 let mut running = 0u32;
510 let mut stopped = 0u32;
511 let mut errored = 0u32;
512 for state in self.node_states.values() {
513 match state.status {
514 NodeStatus::Running | NodeStatus::Starting | NodeStatus::UpgradeScheduled => {
516 running += 1
517 }
518 NodeStatus::Stopped | NodeStatus::Stopping | NodeStatus::Evicted => stopped += 1,
520 NodeStatus::Errored => errored += 1,
521 }
522 }
523 (running, stopped, errored)
524 }
525
526 fn update_state(&mut self, node_id: u32, status: NodeStatus, pid: Option<u32>) {
528 if let Some(state) = self.node_states.get_mut(&node_id) {
529 state.status = status;
530 state.pid = pid;
531 if status == NodeStatus::Running {
532 state.started_at = Some(Instant::now());
533 } else {
534 state.started_at = None;
538 }
539 }
540 }
541
542 pub fn adopt_from_registry(&mut self, registry: &NodeRegistry) -> Vec<u32> {
565 let mut sys = sysinfo::System::new();
571 sys.refresh_processes_specifics(
572 sysinfo::ProcessesToUpdate::All,
573 true,
574 sysinfo::ProcessRefreshKind::everything(),
575 );
576
577 let mut adopted = Vec::new();
578 for config in registry.list() {
579 let Some(pid) = resolve_adopted_pid(config, &sys) else {
580 continue;
581 };
582 self.node_states.insert(
583 config.id,
584 NodeRuntime {
585 status: NodeStatus::Running,
586 pid: Some(pid),
587 started_at: Some(process_started_at(&sys, pid).unwrap_or_else(Instant::now)),
595 restart_count: 0,
596 first_crash_at: None,
597 pending_version: None,
598 },
599 );
600 self.adopted.insert(config.id);
603 let _ = self.event_tx.send(NodeEvent::NodeStarted {
604 node_id: config.id,
605 pid,
606 });
607 adopted.push(config.id);
608 }
609 adopted
610 }
611
612 fn record_crash(&mut self, node_id: u32) -> (bool, u32, Duration) {
615 let state = match self.node_states.get_mut(&node_id) {
616 Some(s) => s,
617 None => return (false, 0, Duration::ZERO),
618 };
619
620 let now = Instant::now();
621
622 if let Some(started_at) = state.started_at {
624 if started_at.elapsed() >= STABLE_DURATION {
625 state.restart_count = 0;
626 state.first_crash_at = None;
627 }
628 }
629
630 state.restart_count += 1;
631 let attempt = state.restart_count;
632
633 if state.first_crash_at.is_none() {
634 state.first_crash_at = Some(now);
635 }
636
637 if let Some(first_crash) = state.first_crash_at {
639 if attempt >= MAX_CRASHES_BEFORE_ERRORED
640 && now.duration_since(first_crash) < CRASH_WINDOW
641 {
642 state.status = NodeStatus::Errored;
643 state.pid = None;
644 state.started_at = None;
645 return (false, attempt, Duration::ZERO);
646 }
647 }
648
649 let backoff_secs = 1u64 << (attempt - 1).min(5);
651 let backoff = Duration::from_secs(backoff_secs).min(MAX_BACKOFF);
652
653 (true, attempt, backoff)
654 }
655}
656
657pub fn spawn_upgrade_monitor(
667 registry: Arc<RwLock<NodeRegistry>>,
668 supervisor: Arc<RwLock<Supervisor>>,
669 interval: Duration,
670 shutdown: CancellationToken,
671) {
672 tokio::spawn(async move {
673 let mut ticker = tokio::time::interval(interval);
674 ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
678 ticker.tick().await;
681
682 loop {
683 tokio::select! {
684 _ = shutdown.cancelled() => return,
685 _ = ticker.tick() => {},
686 }
687
688 let candidates: Vec<(u32, std::path::PathBuf, String, Option<String>)> = {
691 let reg = registry.read().await;
692 let sup = supervisor.read().await;
693 reg.list()
694 .into_iter()
695 .filter_map(|config| match sup.node_status(config.id) {
696 Ok(NodeStatus::Running) => Some((
697 config.id,
698 config.binary_path.clone(),
699 config.version.clone(),
700 sup.node_pending_version(config.id),
701 )),
702 _ => None,
703 })
704 .collect()
705 };
706
707 for (node_id, binary_path, recorded_version, current_pending) in candidates {
708 let observed = match extract_version(&binary_path).await {
709 Ok(v) => v,
710 Err(_) => continue,
712 };
713 if observed == recorded_version {
714 continue;
715 }
716 if current_pending.as_deref() == Some(observed.as_str()) {
717 continue;
718 }
719 supervisor
720 .write()
721 .await
722 .mark_upgrade_scheduled(node_id, observed);
723 }
724 }
725 });
726}
727
728pub fn spawn_eviction_monitor(
741 registry: Arc<RwLock<NodeRegistry>>,
742 supervisor: Arc<RwLock<Supervisor>>,
743 event_tx: broadcast::Sender<NodeEvent>,
744 health: Arc<RwLock<FleetHealth>>,
745 thresholds: DiskThresholds,
746 interval: Duration,
747 shutdown: CancellationToken,
748) {
749 tokio::spawn(async move {
750 let mut ticker = tokio::time::interval(interval);
751 ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
752 ticker.tick().await;
754
755 loop {
756 tokio::select! {
757 _ = shutdown.cancelled() => return,
758 _ = ticker.tick() => {},
759 }
760
761 run_eviction_cycle(®istry, &supervisor, &event_tx, &health, &thresholds).await;
762 }
763 });
764}
765
766async fn run_eviction_cycle(
768 registry: &Arc<RwLock<NodeRegistry>>,
769 supervisor: &Arc<RwLock<Supervisor>>,
770 event_tx: &broadcast::Sender<NodeEvent>,
771 health: &Arc<RwLock<FleetHealth>>,
772 thresholds: &DiskThresholds,
773) {
774 for _ in 0..MAX_EVICTIONS_PER_CYCLE {
775 let partitions = disk::partition_states(running_nodes(registry, supervisor).await);
776
777 let target = partitions
781 .iter()
782 .find(|p| p.available_bytes <= thresholds.eviction_bytes && p.nodes.len() >= 2);
783
784 let Some(partition) = target else {
785 publish_health(
787 health,
788 event_tx,
789 FleetHealth::from_partitions(&partitions, thresholds),
790 )
791 .await;
792 return;
793 };
794
795 let Some(candidate) = partition.eviction_candidate().cloned() else {
796 break;
797 };
798
799 evict_node(
800 registry,
801 supervisor,
802 event_tx,
803 &candidate,
804 partition.available_bytes,
805 )
806 .await;
807 }
808
809 let partitions = disk::partition_states(running_nodes(registry, supervisor).await);
812 publish_health(
813 health,
814 event_tx,
815 FleetHealth::from_partitions(&partitions, thresholds),
816 )
817 .await;
818}
819
820async fn running_nodes(
822 registry: &Arc<RwLock<NodeRegistry>>,
823 supervisor: &Arc<RwLock<Supervisor>>,
824) -> Vec<(u32, PathBuf)> {
825 let reg = registry.read().await;
826 let sup = supervisor.read().await;
827 reg.list()
828 .into_iter()
829 .filter(|config| config.eviction.is_none())
830 .filter(|config| matches!(sup.node_status(config.id), Ok(NodeStatus::Running)))
831 .map(|config| (config.id, config.data_dir.clone()))
832 .collect()
833}
834
835async fn remove_dir_all_with_retry(path: &Path) -> std::io::Result<()> {
843 const MAX_ATTEMPTS: u32 = 8;
844 let mut delay = Duration::from_millis(100);
845 for attempt in 1..=MAX_ATTEMPTS {
846 match std::fs::remove_dir_all(path) {
847 Ok(()) => return Ok(()),
848 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
850 Err(e) if attempt == MAX_ATTEMPTS => return Err(e),
851 Err(_) => {
852 tokio::time::sleep(delay).await;
853 delay = (delay * 2).min(Duration::from_secs(1));
854 }
855 }
856 }
857 Ok(())
858}
859
860async fn persist_eviction_marker(
862 registry: &Arc<RwLock<NodeRegistry>>,
863 node_id: u32,
864 reason: &str,
865 evicted_at: u64,
866 reclaimed_bytes: u64,
867) {
868 let mut reg = registry.write().await;
869 if let Ok(config) = reg.get_mut(node_id) {
870 config.eviction = Some(EvictionRecord {
871 reason: reason.to_string(),
872 evicted_at,
873 reclaimed_bytes,
874 });
875 }
876 if let Err(e) = reg.save() {
877 tracing::error!("Eviction: failed to persist registry for node {node_id}: {e}");
878 }
879}
880
881async fn evict_node(
890 registry: &Arc<RwLock<NodeRegistry>>,
891 supervisor: &Arc<RwLock<Supervisor>>,
892 event_tx: &broadcast::Sender<NodeEvent>,
893 candidate: &disk::NodeDiskUsage,
894 available_before: u64,
895) {
896 let node_id = candidate.node_id;
897 let reclaimable = candidate.size_bytes;
898 let evicted_at = now_unix_secs();
899
900 supervisor.write().await.begin_evicting(node_id);
902 let pending_reason = format!(
903 "Automatically evicted to reclaim disk space: only {} free on its partition. \
904 Deleting its data directory to recover ~{}.",
905 fmt_bytes(available_before),
906 fmt_bytes(reclaimable),
907 );
908 persist_eviction_marker(registry, node_id, &pending_reason, evicted_at, reclaimable).await;
909
910 if let Err(e) = supervisor.write().await.stop_node(node_id).await {
913 tracing::warn!("Eviction: failed to stop node {node_id} before deletion: {e}");
914 }
915
916 let deleted = match remove_dir_all_with_retry(&candidate.data_dir).await {
920 Ok(()) => true,
921 Err(e) => {
922 tracing::error!(
923 "Eviction: could not delete data dir {} for node {node_id} after retries: {e}. \
924 Disk space was NOT reclaimed; manual cleanup may be required.",
925 candidate.data_dir.display()
926 );
927 false
928 }
929 };
930
931 let (reclaimed_bytes, reason) = if deleted {
934 (
935 reclaimable,
936 format!(
937 "Automatically evicted to reclaim disk space: only {} free on its partition. \
938 Its data directory was deleted, recovering ~{}.",
939 fmt_bytes(available_before),
940 fmt_bytes(reclaimable),
941 ),
942 )
943 } else {
944 (
945 0,
946 format!(
947 "Automatically evicted due to low disk space (only {} free on its partition), but \
948 its data directory could not be deleted, so space was not reclaimed. Manual \
949 cleanup of {} may be needed.",
950 fmt_bytes(available_before),
951 candidate.data_dir.display(),
952 ),
953 )
954 };
955 persist_eviction_marker(registry, node_id, &reason, evicted_at, reclaimed_bytes).await;
956 {
957 let mut sup = supervisor.write().await;
958 sup.update_state(node_id, NodeStatus::Evicted, None);
959 sup.finish_evicting(node_id);
960 }
961
962 tracing::info!(
963 "Evicted node {node_id}, reclaimed ~{} ({reason})",
964 fmt_bytes(reclaimed_bytes)
965 );
966 let _ = event_tx.send(NodeEvent::NodeEvicted {
967 node_id,
968 reason,
969 reclaimed_bytes,
970 });
971}
972
973async fn publish_health(
975 health: &Arc<RwLock<FleetHealth>>,
976 event_tx: &broadcast::Sender<NodeEvent>,
977 next: FleetHealth,
978) {
979 let changed = {
980 let mut current = health.write().await;
981 let changed = current.overall != next.overall;
982 *current = next.clone();
983 changed
984 };
985 if changed {
986 let _ = event_tx.send(NodeEvent::FleetHealthChanged {
987 overall: serde_json::to_value(next.overall)
988 .ok()
989 .and_then(|v| v.as_str().map(str::to_owned))
990 .unwrap_or_default(),
991 });
992 }
993}
994
/// Current wall-clock time as whole seconds since the Unix epoch; `0` if the
/// system clock reads earlier than the epoch.
fn now_unix_secs() -> u64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
1002
/// Human-readable size: GiB with two decimals from 1 GiB upward, otherwise whole MiB.
fn fmt_bytes(bytes: u64) -> String {
    const MIB: f64 = 1024.0 * 1024.0;
    const GIB: f64 = 1024.0 * MIB;
    let value = bytes as f64;
    if value < GIB {
        return format!("{:.0} MiB", value / MIB);
    }
    format!("{:.2} GiB", value / GIB)
}
1014
1015pub fn build_node_args(config: &NodeConfig) -> Vec<String> {
1017 let mut args = vec![
1018 "--rewards-address".to_string(),
1019 config.rewards_address.clone(),
1020 "--root-dir".to_string(),
1021 config.data_dir.display().to_string(),
1022 ];
1023
1024 if let Some(ref log_dir) = config.log_dir {
1025 args.push("--enable-logging".to_string());
1026 args.push("--log-dir".to_string());
1027 args.push(log_dir.display().to_string());
1028 }
1029
1030 if let Some(port) = config.node_port {
1031 args.push("--port".to_string());
1032 args.push(port.to_string());
1033 }
1034
1035 for peer in &config.bootstrap_peers {
1036 args.push("--bootstrap".to_string());
1037 args.push(peer.clone());
1038 }
1039
1040 if let Some(channel) = config.upgrade_channel {
1041 args.push("--upgrade-channel".to_string());
1042 args.push(channel.to_string());
1043 }
1044
1045 args.push("--stop-on-upgrade".to_string());
1050
1051 args.push("--evm-network".to_string());
1054 args.push(config.evm_network.as_arg().to_string());
1055
1056 args
1057}
1058
1059async fn spawn_node_from_config(config: &NodeConfig) -> Result<tokio::process::Child> {
1065 let args = build_node_args(config);
1066 let env_vars: Vec<(String, String)> = config.env_variables.clone().into_iter().collect();
1067
1068 let log_dir = config
1069 .log_dir
1070 .as_deref()
1071 .unwrap_or(config.data_dir.as_path());
1072
1073 let child = spawn_node(&config.binary_path, &args, &env_vars, log_dir).await?;
1074 if let Some(pid) = child.id() {
1075 write_node_pid(&config.data_dir, pid);
1076 }
1077 Ok(child)
1078}
1079
1080async fn monitor_node(
1084 child: tokio::process::Child,
1085 mut config: NodeConfig,
1086 supervisor: Arc<RwLock<Supervisor>>,
1087 registry: Arc<RwLock<NodeRegistry>>,
1088 event_tx: broadcast::Sender<NodeEvent>,
1089) {
1090 monitor_node_inner(child, &mut config, supervisor, registry, event_tx).await;
1091 remove_node_pid(&config.data_dir);
1092}
1093
1094async fn monitor_node_inner(
1095 mut child: tokio::process::Child,
1096 config: &mut NodeConfig,
1097 supervisor: Arc<RwLock<Supervisor>>,
1098 registry: Arc<RwLock<NodeRegistry>>,
1099 event_tx: broadcast::Sender<NodeEvent>,
1100) {
1101 let node_id = config.id;
1102
1103 loop {
1104 let exit_status = child.wait().await;
1106
1107 let status_at_exit = {
1109 let sup = supervisor.read().await;
1110 sup.node_status(node_id).ok()
1111 };
1112
1113 match status_at_exit {
1114 Some(NodeStatus::Stopped) | Some(NodeStatus::Stopping) | Some(NodeStatus::Evicted) => {
1117 return
1118 }
1119 Some(NodeStatus::UpgradeScheduled) => {
1120 match respawn_upgraded_node(config, &supervisor, ®istry, &event_tx).await {
1123 Ok(new_child) => {
1124 child = new_child;
1125 continue;
1126 }
1127 Err(e) => {
1128 let _ = event_tx.send(NodeEvent::NodeErrored {
1129 node_id,
1130 message: format!("Failed to respawn after upgrade: {e}"),
1131 });
1132 let mut sup = supervisor.write().await;
1133 sup.update_state(node_id, NodeStatus::Errored, None);
1134 return;
1135 }
1136 }
1137 }
1138 _ => {}
1139 }
1140
1141 let exit_code = exit_status.ok().and_then(|s| s.code());
1142
1143 if exit_code == Some(0) {
1155 if let Ok(disk_version) = extract_version(&config.binary_path).await {
1156 if disk_version != config.version {
1157 {
1158 let mut sup = supervisor.write().await;
1159 sup.mark_upgrade_scheduled(node_id, disk_version.clone());
1160 }
1161 match respawn_upgraded_node(config, &supervisor, ®istry, &event_tx).await {
1162 Ok(new_child) => {
1163 child = new_child;
1164 continue;
1165 }
1166 Err(e) => {
1167 let _ = event_tx.send(NodeEvent::NodeErrored {
1168 node_id,
1169 message: format!("Failed to respawn after upgrade: {e}"),
1170 });
1171 let mut sup = supervisor.write().await;
1172 sup.update_state(node_id, NodeStatus::Errored, None);
1173 return;
1174 }
1175 }
1176 }
1177 }
1178 }
1182
1183 let _ = event_tx.send(NodeEvent::NodeCrashed { node_id, exit_code });
1185
1186 let (should_restart, attempt, backoff) = {
1187 let mut sup = supervisor.write().await;
1188 sup.record_crash(node_id)
1189 };
1190
1191 if !should_restart {
1192 let _ = event_tx.send(NodeEvent::NodeErrored {
1193 node_id,
1194 message: format!(
1195 "Node crashed {} times within {} seconds, giving up",
1196 MAX_CRASHES_BEFORE_ERRORED,
1197 CRASH_WINDOW.as_secs()
1198 ),
1199 });
1200 return;
1201 }
1202
1203 let _ = event_tx.send(NodeEvent::NodeRestarting { node_id, attempt });
1204
1205 tokio::time::sleep(backoff).await;
1206
1207 match spawn_node_from_config(&*config).await {
1209 Ok(new_child) => {
1210 let pid = match new_child.id() {
1211 Some(pid) => pid,
1212 None => {
1213 let _ = event_tx.send(NodeEvent::NodeErrored {
1215 node_id,
1216 message: "Restarted process exited before PID could be read"
1217 .to_string(),
1218 });
1219 let mut sup = supervisor.write().await;
1220 sup.update_state(node_id, NodeStatus::Errored, None);
1221 return;
1222 }
1223 };
1224 {
1225 let mut sup = supervisor.write().await;
1226 sup.update_state(node_id, NodeStatus::Running, Some(pid));
1227 }
1228 let _ = event_tx.send(NodeEvent::NodeStarted { node_id, pid });
1229 child = new_child;
1230 }
1231 Err(e) => {
1232 let _ = event_tx.send(NodeEvent::NodeErrored {
1233 node_id,
1234 message: format!("Failed to restart node: {e}"),
1235 });
1236 let mut sup = supervisor.write().await;
1237 sup.update_state(node_id, NodeStatus::Errored, None);
1238 return;
1239 }
1240 }
1241 }
1242}
1243
1244async fn respawn_upgraded_node(
1249 config: &mut NodeConfig,
1250 supervisor: &Arc<RwLock<Supervisor>>,
1251 registry: &Arc<RwLock<NodeRegistry>>,
1252 event_tx: &broadcast::Sender<NodeEvent>,
1253) -> Result<tokio::process::Child> {
1254 let node_id = config.id;
1255 let old_version = config.version.clone();
1256
1257 let new_child = spawn_node_from_config(config).await?;
1258 let pid = new_child
1259 .id()
1260 .ok_or_else(|| Error::ProcessSpawn("Failed to get PID after upgrade respawn".into()))?;
1261
1262 let new_version = extract_version(&config.binary_path).await.ok();
1265
1266 if let Some(ref version) = new_version {
1267 config.version = version.clone();
1268 let mut reg = registry.write().await;
1269 if let Ok(stored) = reg.get_mut(node_id) {
1270 stored.version = version.clone();
1271 let _ = reg.save();
1272 }
1273 }
1274
1275 {
1276 let mut sup = supervisor.write().await;
1277 if let Some(state) = sup.node_states.get_mut(&node_id) {
1278 state.status = NodeStatus::Running;
1279 state.pid = Some(pid);
1280 state.started_at = Some(Instant::now());
1281 state.pending_version = None;
1282 state.restart_count = 0;
1283 state.first_crash_at = None;
1284 }
1285 }
1286
1287 let _ = event_tx.send(NodeEvent::NodeStarted { node_id, pid });
1288 if let Some(version) = new_version {
1289 let _ = event_tx.send(NodeEvent::NodeUpgraded {
1290 node_id,
1291 old_version,
1292 new_version: version,
1293 });
1294 }
1295
1296 Ok(new_child)
1297}
1298
/// How long `graceful_kill` waits after the polite termination request before
/// force-killing the process.
const GRACEFUL_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10);
1302async fn graceful_kill(pid: u32) {
1304 send_signal_term(pid);
1305
1306 let start = Instant::now();
1308 loop {
1309 if !is_process_alive(pid) {
1310 return;
1311 }
1312 if start.elapsed() >= GRACEFUL_SHUTDOWN_TIMEOUT {
1313 break;
1314 }
1315 tokio::time::sleep(Duration::from_millis(100)).await;
1316 }
1317
1318 send_signal_kill(pid);
1320
1321 for _ in 0..10 {
1323 if !is_process_alive(pid) {
1324 return;
1325 }
1326 tokio::time::sleep(Duration::from_millis(50)).await;
1327 }
1328}
1329
1330fn liveness_should_stop(
1343 snapshot_pid: u32,
1344 current_pid: Option<u32>,
1345 current_status: Option<NodeStatus>,
1346) -> bool {
1347 current_status == Some(NodeStatus::Running) && current_pid == Some(snapshot_pid)
1348}
1349
1350pub fn spawn_liveness_monitor(
1362 registry: Arc<RwLock<NodeRegistry>>,
1363 supervisor: Arc<RwLock<Supervisor>>,
1364 event_tx: broadcast::Sender<NodeEvent>,
1365 interval: Duration,
1366 shutdown: CancellationToken,
1367) {
1368 tokio::spawn(async move {
1369 let mut ticker = tokio::time::interval(interval);
1370 ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
1374 loop {
1375 tokio::select! {
1376 _ = shutdown.cancelled() => return,
1377 _ = ticker.tick() => {}
1378 }
1379
1380 let candidates: Vec<(u32, u32, PathBuf)> =
1382 {
1383 let sup = supervisor.read().await;
1384 let reg = registry.read().await;
1385 reg.list()
1386 .into_iter()
1387 .filter_map(|config| {
1388 let pid = sup.node_pid(config.id)?;
1389 matches!(sup.node_status(config.id), Ok(NodeStatus::Running))
1390 .then_some((config.id, pid, config.data_dir.clone()))
1391 })
1392 .collect()
1393 };
1394
1395 for (node_id, pid, data_dir) in candidates {
1396 if is_process_alive(pid) {
1397 continue;
1398 }
1399
1400 if supervisor.read().await.is_adopted(node_id) {
1406 let config = {
1407 let reg = registry.read().await;
1408 reg.get(node_id).ok().cloned()
1409 };
1410 if let Some(mut config) = config {
1411 let drifted = matches!(
1412 extract_version(&config.binary_path).await,
1413 Ok(disk_version) if disk_version != config.version
1414 );
1415 if drifted {
1416 match respawn_upgraded_node(
1417 &mut config,
1418 &supervisor,
1419 ®istry,
1420 &event_tx,
1421 )
1422 .await
1423 {
1424 Ok(child) => {
1425 supervisor.write().await.mark_owned(node_id);
1428 let sup_ref = Arc::clone(&supervisor);
1429 let reg_ref = Arc::clone(®istry);
1430 let ev = event_tx.clone();
1431 tokio::spawn(async move {
1432 monitor_node(child, config, sup_ref, reg_ref, ev).await;
1433 });
1434 continue;
1435 }
1436 Err(e) => {
1437 let _ = event_tx.send(NodeEvent::NodeErrored {
1438 node_id,
1439 message: format!(
1440 "Failed to respawn adopted node after upgrade: {e}"
1441 ),
1442 });
1443 let mut sup = supervisor.write().await;
1444 sup.update_state(node_id, NodeStatus::Errored, None);
1445 sup.mark_owned(node_id);
1446 remove_node_pid(&data_dir);
1447 continue;
1448 }
1449 }
1450 }
1451 }
1452 }
1453
1454 let mut sup = supervisor.write().await;
1455 if !liveness_should_stop(pid, sup.node_pid(node_id), sup.node_status(node_id).ok())
1458 {
1459 continue;
1460 }
1461 sup.update_state(node_id, NodeStatus::Stopped, None);
1462 let _ = event_tx.send(NodeEvent::NodeStopped { node_id });
1463 remove_node_pid(&data_dir);
1464 }
1465 }
1466 });
1467}
1468
/// Converts a PID to the signed form `libc::kill` expects.
///
/// Returns `None` for 0 and for values that do not fit a positive `i32`.
/// Non-positive values would make `kill` target process groups instead of a
/// single process.
#[cfg(unix)]
fn pid_to_i32(pid: u32) -> Option<i32> {
    match i32::try_from(pid) {
        Ok(signed) if signed > 0 => Some(signed),
        _ => None,
    }
}
1473
1474#[cfg(unix)]
1475fn send_signal_term(pid: u32) {
1476 if let Some(pid) = pid_to_i32(pid) {
1477 unsafe {
1478 libc::kill(pid, libc::SIGTERM);
1479 }
1480 }
1481}
1482
1483#[cfg(unix)]
1484fn send_signal_kill(pid: u32) {
1485 if let Some(pid) = pid_to_i32(pid) {
1486 unsafe {
1487 libc::kill(pid, libc::SIGKILL);
1488 }
1489 }
1490}
1491
1492#[cfg(unix)]
1493fn is_process_alive(pid: u32) -> bool {
1494 let Some(pid) = pid_to_i32(pid) else {
1495 return false;
1496 };
1497 let ret = unsafe { libc::kill(pid, 0) };
1498 if ret == 0 {
1499 return true;
1500 }
1501 std::io::Error::last_os_error().raw_os_error() == Some(libc::EPERM)
1503}
1504
/// Asks the process `pid` to shut down gracefully by delivering a console
/// `CTRL_C_EVENT` to it. This is the closest Windows analogue of `SIGTERM`.
///
/// Best-effort: if this process cannot attach to `pid`'s console (for example
/// because `pid` has no console or does not exist), nothing is sent and no
/// error is reported.
///
/// NOTE(review): this blocks the calling thread for ~50 ms via
/// `std::thread::sleep`. If it is invoked from an async task, that runtime
/// worker thread stalls for the duration.
#[cfg(windows)]
fn send_signal_term(pid: u32) {
    use windows_sys::Win32::System::Console::{
        AttachConsole, FreeConsole, GenerateConsoleCtrlEvent, SetConsoleCtrlHandler, CTRL_C_EVENT,
    };

    // SAFETY: these console APIs take only integers or a null handler routine and
    // touch no memory owned by Rust. Their failures are deliberately ignored
    // (best-effort).
    unsafe {
        // A process can be attached to at most one console, so drop ours (if any)
        // before attaching to the target's. NOTE(review): the daemon is not
        // re-attached to any console it originally had.
        FreeConsole();

        if AttachConsole(pid) != 0 {
            // A null handler with TRUE makes *this* process ignore Ctrl+C, so the
            // event generated below does not terminate the daemon itself.
            SetConsoleCtrlHandler(None, 1);
            // Process group 0 targets every process sharing the now-attached console.
            GenerateConsoleCtrlEvent(CTRL_C_EVENT, 0);
            FreeConsole();
            // Presumably gives the asynchronously delivered event time to arrive
            // before Ctrl+C handling is restored below. TODO confirm 50 ms suffices.
            std::thread::sleep(std::time::Duration::from_millis(50));
            // Restore normal Ctrl+C processing for the daemon.
            SetConsoleCtrlHandler(None, 0);
        }
    }
}
1535
/// Forcibly terminates process `pid` with exit code 1 via `TerminateProcess`.
///
/// Best-effort: if the process cannot be opened with `PROCESS_TERMINATE`
/// rights (it does not exist, or access is denied), nothing happens. The
/// result of `TerminateProcess` is not checked.
#[cfg(windows)]
fn send_signal_kill(pid: u32) {
    use windows_sys::Win32::Foundation::CloseHandle;
    use windows_sys::Win32::System::Threading::{OpenProcess, TerminateProcess, PROCESS_TERMINATE};

    // SAFETY: OpenProcess takes only integer arguments; a null result is handled below.
    let process = unsafe { OpenProcess(PROCESS_TERMINATE, 0, pid) };
    if process.is_null() {
        return;
    }
    // SAFETY: `process` is a live handle opened above with PROCESS_TERMINATE
    // rights, and it is closed exactly once, after its last use.
    unsafe {
        TerminateProcess(process, 1);
        CloseHandle(process);
    }
}
1549
/// Reports whether a process with id `pid` currently exists.
///
/// This matches the Unix implementation, which treats `EPERM` as "exists". A
/// process this daemon is not allowed to open still counts as alive, because
/// `OpenProcess` failing with `ERROR_ACCESS_DENIED` shows the PID is in use.
/// Any other open failure (e.g. no such process) yields `false`.
///
/// Caveat: a process that has already exited with exit code 259
/// (`STILL_ACTIVE`) cannot be told apart from a running one through
/// `GetExitCodeProcess`, so it is reported as alive.
#[cfg(windows)]
fn is_process_alive(pid: u32) -> bool {
    use windows_sys::Win32::Foundation::{CloseHandle, GetLastError, ERROR_ACCESS_DENIED, STILL_ACTIVE};
    use windows_sys::Win32::System::Threading::{
        GetExitCodeProcess, OpenProcess, PROCESS_QUERY_LIMITED_INFORMATION,
    };

    // SAFETY: OpenProcess takes only integer arguments; a null result is handled below.
    let handle = unsafe { OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, 0, pid) };
    if handle.is_null() {
        // SAFETY: GetLastError only reads the calling thread's last-error code,
        // which the failed OpenProcess call just above has set.
        return unsafe { GetLastError() } == ERROR_ACCESS_DENIED;
    }

    let mut exit_code: u32 = 0;
    // SAFETY: `handle` is a live process handle opened with query rights, and
    // `exit_code` is a valid, writable u32 for the duration of the call.
    let queried = unsafe { GetExitCodeProcess(handle, &mut exit_code) };
    // SAFETY: `handle` came from a successful OpenProcess and is closed exactly once.
    unsafe {
        CloseHandle(handle);
    }
    queried != 0 && exit_code == STILL_ACTIVE as u32
}
1568
#[cfg(test)]
mod tests {
    use super::*;
    use crate::node::types::{EvmNetwork, UpgradeChannel};

    /// Baseline config for node 1 with every optional setting unset and the
    /// default EVM network. Tests override only the fields they exercise, using
    /// struct-update syntax (`NodeConfig { field: ..., ..minimal_config() }`).
    fn minimal_config() -> NodeConfig {
        NodeConfig {
            id: 1,
            service_name: "node1".to_string(),
            rewards_address: "0xabc".to_string(),
            data_dir: "/data/node-1".into(),
            log_dir: None,
            node_port: None,
            binary_path: "/bin/node".into(),
            version: "0.1.0".to_string(),
            env_variables: HashMap::new(),
            bootstrap_peers: vec![],
            upgrade_channel: None,
            evm_network: EvmNetwork::default(),
            eviction: None,
        }
    }

    /// Runtime entry in `status` with no PID, no start time, no crash history and
    /// no pending upgrade. Running-style entries layer `pid`/`started_at` on top
    /// with struct-update syntax.
    fn idle_runtime(status: NodeStatus) -> NodeRuntime {
        NodeRuntime {
            status,
            pid: None,
            started_at: None,
            restart_count: 0,
            first_crash_at: None,
            pending_version: None,
        }
    }

    /// Returns the value that follows `--evm-network` in `args`, if present.
    fn evm_network_arg(args: &[String]) -> Option<&str> {
        let idx = args.iter().position(|a| a == "--evm-network")?;
        args.get(idx + 1).map(String::as_str)
    }

    #[tokio::test]
    async fn remove_dir_all_with_retry_deletes_tree_and_tolerates_missing() {
        let tmp = tempfile::tempdir().unwrap();
        let dir = tmp.path().join("node-data");
        std::fs::create_dir_all(dir.join("sub")).unwrap();
        std::fs::write(dir.join("sub").join("data.mdb"), vec![0u8; 128]).unwrap();

        remove_dir_all_with_retry(&dir).await.unwrap();
        assert!(!dir.exists());

        // A second call on the now-missing directory must still succeed.
        remove_dir_all_with_retry(&dir).await.unwrap();
    }

    #[tokio::test]
    async fn start_node_rejects_a_node_being_evicted() {
        let (tx, _rx) = broadcast::channel(16);
        let sup = Arc::new(RwLock::new(Supervisor::new(tx)));

        let tmp = tempfile::tempdir().unwrap();
        let reg = Arc::new(RwLock::new(
            NodeRegistry::load(&tmp.path().join("reg.json")).unwrap(),
        ));

        let config = NodeConfig {
            id: 7,
            service_name: "node7".to_string(),
            data_dir: tmp.path().join("node-7"),
            ..minimal_config()
        };

        // While node 7 is marked as being evicted, starting it must be refused.
        sup.write().await.begin_evicting(7);
        let res = sup
            .write()
            .await
            .start_node(&config, sup.clone(), reg.clone())
            .await;
        assert!(matches!(res, Err(Error::NodeEvicted(7))));

        sup.write().await.finish_evicting(7);
        assert!(!sup.read().await.evicting.contains(&7));
    }

    #[test]
    fn adopted_flag_lifecycle() {
        let (tx, _rx) = broadcast::channel(16);
        let mut sup = Supervisor::new(tx);

        // Nodes are not adopted by default.
        assert!(!sup.is_adopted(1));

        sup.adopted.insert(1);
        assert!(sup.is_adopted(1));

        // Taking ownership (as the daemon does after respawning a node) clears the flag.
        sup.mark_owned(1);
        assert!(!sup.is_adopted(1));
    }

    #[test]
    fn liveness_does_not_stop_node_respawned_under_it() {
        // The liveness check saw snapshot PID 1000 as dead, but by the time it
        // consulted the supervisor the node was running again under PID 2000.
        let dead_snapshot_pid = 1000;
        let live_respawned_pid = Some(2000);
        assert!(
            !liveness_should_stop(
                dead_snapshot_pid,
                live_respawned_pid,
                Some(NodeStatus::Running)
            ),
            "liveness must not stop a node whose PID changed under it (respawned with a live PID)"
        );
    }

    #[test]
    fn build_node_args_basic() {
        let config = NodeConfig {
            rewards_address: "0xabc123".to_string(),
            log_dir: Some("/logs/node-1".into()),
            node_port: Some(12000),
            bootstrap_peers: vec!["peer1".to_string(), "peer2".to_string()],
            ..minimal_config()
        };

        let args = build_node_args(&config);

        for expected in [
            "--rewards-address",
            "0xabc123",
            "--root-dir",
            "/data/node-1",
            "--enable-logging",
            "--log-dir",
            "/logs/node-1",
            "--port",
            "12000",
            "--bootstrap",
            "peer1",
            "peer2",
            "--stop-on-upgrade",
        ] {
            assert!(
                args.contains(&expected.to_string()),
                "missing {expected:?} in {args:?}"
            );
        }
        assert!(!args.contains(&"--upgrade-channel".to_string()));
        // With no explicit network, the default EVM network is passed through.
        assert_eq!(evm_network_arg(&args), Some("arbitrum-one"));
    }

    #[test]
    fn build_node_args_emits_evm_network_flag() {
        let mut config = NodeConfig {
            evm_network: EvmNetwork::ArbitrumSepolia,
            ..minimal_config()
        };

        let args = build_node_args(&config);
        assert_eq!(evm_network_arg(&args), Some("arbitrum-sepolia"));

        config.evm_network = EvmNetwork::ArbitrumOne;
        let args = build_node_args(&config);
        assert_eq!(evm_network_arg(&args), Some("arbitrum-one"));
    }

    #[test]
    fn build_node_args_includes_upgrade_channel() {
        let mut config = NodeConfig {
            upgrade_channel: Some(UpgradeChannel::Beta),
            ..minimal_config()
        };

        let args = build_node_args(&config);
        let idx = args
            .iter()
            .position(|a| a == "--upgrade-channel")
            .expect("--upgrade-channel should be present");
        assert_eq!(args[idx + 1], "beta");

        config.upgrade_channel = Some(UpgradeChannel::Stable);
        let args = build_node_args(&config);
        let idx = args.iter().position(|a| a == "--upgrade-channel").unwrap();
        assert_eq!(args[idx + 1], "stable");
    }

    #[test]
    fn build_node_args_minimal() {
        let args = build_node_args(&minimal_config());

        assert!(args.contains(&"--rewards-address".to_string()));
        assert!(args.contains(&"--root-dir".to_string()));
        assert!(args.contains(&"--stop-on-upgrade".to_string()));
        // Optional settings that are unset must not produce flags.
        for absent in ["--enable-logging", "--log-dir", "--port", "--bootstrap"] {
            assert!(
                !args.contains(&absent.to_string()),
                "unexpected {absent:?} in {args:?}"
            );
        }
    }

    #[test]
    fn record_crash_backoff_increases() {
        let (tx, _rx) = broadcast::channel(16);
        let mut sup = Supervisor::new(tx);

        sup.node_states.insert(
            1,
            NodeRuntime {
                pid: Some(100),
                started_at: Some(Instant::now()),
                ..idle_runtime(NodeStatus::Running)
            },
        );

        // The first four crashes are restarted with doubling backoff: 1s, 2s, 4s, 8s.
        for (expected_attempt, backoff_secs) in [(1, 1), (2, 2), (3, 4), (4, 8)] {
            let (should_restart, attempt, backoff) = sup.record_crash(1);
            assert!(should_restart, "crash #{expected_attempt} should be restarted");
            assert_eq!(attempt, expected_attempt);
            assert_eq!(backoff, Duration::from_secs(backoff_secs));
        }

        // The fifth crash is not restarted and leaves the node Errored.
        let (should_restart, attempt, _) = sup.record_crash(1);
        assert!(!should_restart);
        assert_eq!(attempt, 5);
        assert_eq!(sup.node_states[&1].status, NodeStatus::Errored);
    }

    #[test]
    fn node_counts_tracks_states() {
        let (tx, _rx) = broadcast::channel(16);
        let mut sup = Supervisor::new(tx);

        sup.node_states.insert(
            1,
            NodeRuntime {
                pid: Some(100),
                started_at: Some(Instant::now()),
                ..idle_runtime(NodeStatus::Running)
            },
        );
        sup.node_states.insert(2, idle_runtime(NodeStatus::Stopped));
        sup.node_states.insert(
            3,
            NodeRuntime {
                restart_count: 5,
                ..idle_runtime(NodeStatus::Errored)
            },
        );

        let (running, stopped, errored) = sup.node_counts();
        assert_eq!(running, 1);
        assert_eq!(stopped, 1);
        assert_eq!(errored, 1);
    }

    #[test]
    fn mark_upgrade_scheduled_only_affects_running_nodes() {
        let (tx, mut rx) = broadcast::channel(16);
        let mut sup = Supervisor::new(tx);

        sup.node_states.insert(
            1,
            NodeRuntime {
                pid: Some(111),
                started_at: Some(Instant::now()),
                ..idle_runtime(NodeStatus::Running)
            },
        );
        sup.node_states.insert(2, idle_runtime(NodeStatus::Stopped));

        // A running node transitions and an UpgradeScheduled event is emitted.
        let affected = sup.mark_upgrade_scheduled(1, "0.10.11-rc.1".to_string());
        assert!(affected);
        assert_eq!(sup.node_status(1).unwrap(), NodeStatus::UpgradeScheduled);
        assert_eq!(sup.node_pending_version(1).as_deref(), Some("0.10.11-rc.1"));
        match rx.try_recv() {
            Ok(NodeEvent::UpgradeScheduled {
                node_id,
                pending_version,
            }) => {
                assert_eq!(node_id, 1);
                assert_eq!(pending_version, "0.10.11-rc.1");
            }
            other => panic!("expected UpgradeScheduled event, got {other:?}"),
        }

        // A stopped node is left untouched.
        let affected = sup.mark_upgrade_scheduled(2, "0.10.11-rc.1".to_string());
        assert!(!affected);
        assert_eq!(sup.node_status(2).unwrap(), NodeStatus::Stopped);
        assert!(sup.node_pending_version(2).is_none());

        // An already-scheduled node keeps its first pending version.
        let affected = sup.mark_upgrade_scheduled(1, "0.10.12".to_string());
        assert!(!affected);
        assert_eq!(sup.node_pending_version(1).as_deref(), Some("0.10.11-rc.1"));
    }

    #[test]
    fn node_counts_counts_upgrade_scheduled_as_running() {
        let (tx, _rx) = broadcast::channel(16);
        let mut sup = Supervisor::new(tx);

        sup.node_states.insert(
            1,
            NodeRuntime {
                pid: Some(111),
                started_at: Some(Instant::now()),
                pending_version: Some("0.10.11-rc.1".to_string()),
                ..idle_runtime(NodeStatus::UpgradeScheduled)
            },
        );

        let (running, stopped, errored) = sup.node_counts();
        assert_eq!(running, 1);
        assert_eq!(stopped, 0);
        assert_eq!(errored, 0);
    }

    #[tokio::test]
    async fn stop_node_not_found() {
        let (tx, _rx) = broadcast::channel(16);
        let mut sup = Supervisor::new(tx);

        let result = sup.stop_node(999).await;
        assert!(matches!(result, Err(Error::NodeNotFound(999))));
    }

    #[tokio::test]
    async fn stop_node_not_running() {
        let (tx, _rx) = broadcast::channel(16);
        let mut sup = Supervisor::new(tx);

        sup.node_states.insert(1, idle_runtime(NodeStatus::Stopped));

        let result = sup.stop_node(1).await;
        assert!(matches!(result, Err(Error::NodeNotRunning(1))));
    }

    #[tokio::test]
    async fn stop_all_nodes_mixed_states() {
        let (tx, _rx) = broadcast::channel(16);
        let mut sup = Supervisor::new(tx);

        // Node 1 claims to be running under a PID that can never be assigned. The
        // value is above Linux's PID_MAX_LIMIT (2^22) and macOS's 99_999 ceiling,
        // yet still fits in an i32, so the stop path targets a genuinely absent
        // process. A smaller PID such as 999999 could belong to an unrelated live
        // process and receive a real signal from this test.
        sup.node_states.insert(
            1,
            NodeRuntime {
                pid: Some(2_000_000_000),
                started_at: Some(Instant::now()),
                ..idle_runtime(NodeStatus::Running)
            },
        );
        // Node 2 is already stopped.
        sup.node_states.insert(2, idle_runtime(NodeStatus::Stopped));

        let configs = vec![(1, "node1".to_string()), (2, "node2".to_string())];

        let result = sup.stop_all_nodes(&configs).await;

        assert_eq!(result.stopped.len(), 1);
        assert_eq!(result.stopped[0].node_id, 1);
        assert_eq!(result.stopped[0].service_name, "node1");
        assert_eq!(result.already_stopped, vec![2]);
        assert!(result.failed.is_empty());
    }
}
2040}