Skip to main content

ant_core/node/daemon/
supervisor.rs

1use std::collections::{HashMap, HashSet};
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use tokio::sync::{broadcast, RwLock};
7use tokio::time::MissedTickBehavior;
8use tokio_util::sync::CancellationToken;
9
10use crate::error::{Error, Result};
11use crate::node::binary::extract_version;
12use crate::node::daemon::disk;
13use crate::node::daemon::health::{DiskThresholds, FleetHealth};
14use crate::node::events::NodeEvent;
15use crate::node::process::spawn::spawn_node;
16use crate::node::registry::NodeRegistry;
17use crate::node::types::{
18    EvictionRecord, NodeConfig, NodeStarted, NodeStatus, NodeStopFailed, NodeStopped,
19    StopNodeResult,
20};
21
/// Interval at which the upgrade-detection task asks every running node's on-disk binary for
/// its version, looking for an in-place replacement.
pub const UPGRADE_POLL_INTERVAL: Duration = Duration::from_secs(60);

/// Interval at which the low-disk monitor measures free space at node data directories and,
/// if a partition has dropped to its eviction threshold, evicts a node.
pub const EVICTION_POLL_INTERVAL: Duration = Duration::from_secs(30);

/// Upper bound on evictions performed in a single monitor check. It guards against a bad
/// threshold or a bogus disk measurement removing the entire fleet in one tick.
const MAX_EVICTIONS_PER_CYCLE: usize = 4;

/// Interval at which the liveness poll confirms each Running node's OS process still exists.
///
/// Processes this daemon spawned are already watched through the `Child` handle owned by
/// `monitor_node`. The poll exists only for nodes adopted across a daemon restart, whose
/// `Child` handle went away with the previous daemon. Five seconds keeps the syscall cost
/// negligible while still surfacing a crashed adopted node to the user quickly.
pub const LIVENESS_POLL_INTERVAL: Duration = Duration::from_secs(5);
41
/// Location of the pid file a running node leaves behind, so that a later daemon instance can
/// adopt the process after a restart. It sits inside the node's data directory.
fn node_pid_file(data_dir: &Path) -> PathBuf {
    const PID_FILE_NAME: &str = "node.pid";
    data_dir.join(PID_FILE_NAME)
}
47
48/// Persist the running node's PID to `<data_dir>/node.pid`. Best-effort: a failure
49/// here only costs us the ability to adopt the node after a daemon restart, so we
50/// warn and continue rather than aborting the start.
51fn write_node_pid(data_dir: &Path, pid: u32) {
52    let path = node_pid_file(data_dir);
53    if let Err(e) = std::fs::write(&path, pid.to_string()) {
54        tracing::warn!(
55            "Failed to write node pid file at {}: {e}. Node will still run, but a future \
56             daemon restart will not be able to adopt it.",
57            path.display()
58        );
59    }
60}
61
62/// Remove the pid file. Called on every terminal-exit path in `monitor_node` so the
63/// next daemon doesn't try to adopt a PID belonging to a process that's gone.
64fn remove_node_pid(data_dir: &Path) {
65    let _ = std::fs::remove_file(node_pid_file(data_dir));
66}
67
68/// Read the pid file without validating liveness. Returns `None` if the file is
69/// missing or its contents can't be parsed as a u32.
70fn read_node_pid(data_dir: &Path) -> Option<u32> {
71    std::fs::read_to_string(node_pid_file(data_dir))
72        .ok()
73        .and_then(|s| s.trim().parse().ok())
74}
75
76/// Scan the OS process table for a running node that matches `config`, as a
77/// fallback for when `<data_dir>/node.pid` is missing or stale.
78///
79/// Nodes spawned by a pre-adoption daemon never had a pid file written, so
80/// without this scan the first restart after installing the adoption fix
81/// would leave every previously-running node classified as Stopped. The scan
82/// matches on:
83///
84/// - executable path identical to `config.binary_path`, AND
85/// - command line containing `--root-dir` (as a standalone arg or
86///   `--root-dir=<path>`) whose value resolves to `config.data_dir`.
87///
88/// The double match keeps us safe when multiple nodes share the same binary
89/// on disk (common on installs where one copy services several data dirs).
90///
91/// Returns `None` if no running process matches.
92fn find_running_node_process(sys: &sysinfo::System, config: &NodeConfig) -> Option<u32> {
93    let target_data_dir = config.data_dir.as_path();
94    for (pid, process) in sys.processes() {
95        // On Linux, `sys.processes()` enumerates /proc/<pid>/task/<tid> too, so
96        // worker threads appear alongside their thread-group leader and share
97        // the same exe + cmdline. Skip threads — we want the TGID (the real
98        // process), which is the only PID safe to signal.
99        if process.thread_kind().is_some() {
100            continue;
101        }
102        let Some(exe) = process.exe() else {
103            continue;
104        };
105        if exe != config.binary_path.as_path() {
106            continue;
107        }
108
109        let cmd = process.cmd();
110        let matches_root_dir = cmd.iter().enumerate().any(|(i, arg)| {
111            let arg = arg.to_string_lossy();
112            if let Some(value) = arg.strip_prefix("--root-dir=") {
113                Path::new(value) == target_data_dir
114            } else if arg == "--root-dir" {
115                cmd.get(i + 1)
116                    .map(|v| Path::new(&*v.to_string_lossy()) == target_data_dir)
117                    .unwrap_or(false)
118            } else {
119                false
120            }
121        });
122
123        if matches_root_dir {
124            return Some(pid.as_u32());
125        }
126    }
127    None
128}
129
130/// Check whether `pid` refers to a live, non-thread process. On Linux,
131/// `kill(tid, 0)` returns success for any thread's TID, not just the
132/// thread-group leader — so liveness alone is not enough to trust a PID
133/// loaded from the pid file. Consulting sysinfo's `thread_kind()` tells us
134/// whether the entry is a userland thread (TID) vs. the actual process
135/// (TGID). A missing sysinfo entry with a live PID is still treated as a
136/// process, since older daemons could have written the PID before sysinfo
137/// saw it.
138fn pid_is_live_process(pid: u32, sys: &sysinfo::System) -> bool {
139    if !is_process_alive(pid) {
140        return false;
141    }
142    match sys.process(sysinfo::Pid::from_u32(pid)) {
143        Some(process) => process.thread_kind().is_none(),
144        None => true,
145    }
146}
147
148/// Determine the PID to adopt for a node, trying the pid file first and
149/// falling back to a process-table scan. On successful scan, writes the pid
150/// file so the next adoption takes the fast path.
151///
152/// Returns `None` if no live process can be attributed to this node.
153fn resolve_adopted_pid(config: &NodeConfig, sys: &sysinfo::System) -> Option<u32> {
154    if let Some(pid) = read_node_pid(&config.data_dir) {
155        if pid_is_live_process(pid, sys) {
156            return Some(pid);
157        }
158        // Pid file points at a dead process or a thread TID (legacy daemons
159        // could record a TID because the fallback scan saw threads). Don't
160        // leave it around to mislead the next adoption pass.
161        remove_node_pid(&config.data_dir);
162    }
163
164    let pid = find_running_node_process(sys, config)?;
165    write_node_pid(&config.data_dir, pid);
166    Some(pid)
167}
168
169/// Build an `Instant` that reports the real process start time when
170/// `.elapsed()` is called on it — so uptime survives daemon restarts
171/// accurately for adopted nodes.
172///
173/// `sysinfo::Process::start_time()` returns seconds since the UNIX epoch
174/// (wall clock). `Instant` is monotonic and can't be constructed from a
175/// wall-clock value directly, so we back-date `Instant::now()` by the
176/// process's age. Returns `None` if the PID isn't in the snapshot (the
177/// process exited between scan and this call), if the system clock looks
178/// broken, or if subtraction would overflow (unrealistically-old process
179/// start times).
180fn process_started_at(sys: &sysinfo::System, pid: u32) -> Option<Instant> {
181    let start_secs = sys.process(sysinfo::Pid::from_u32(pid))?.start_time();
182    let now_secs = std::time::SystemTime::now()
183        .duration_since(std::time::UNIX_EPOCH)
184        .ok()?
185        .as_secs();
186    let age = now_secs.saturating_sub(start_secs);
187    Instant::now().checked_sub(Duration::from_secs(age))
188}
189
/// Crash count at which a node stops being restarted and is marked errored, provided the
/// crashes fall inside `CRASH_WINDOW`.
const MAX_CRASHES_BEFORE_ERRORED: u32 = 5;

/// Time span over which crashes are counted toward `MAX_CRASHES_BEFORE_ERRORED` (five minutes).
const CRASH_WINDOW: Duration = Duration::from_secs(5 * 60);

/// Uninterrupted run time after which a node's crash counter is reset (five minutes).
const STABLE_DURATION: Duration = Duration::from_secs(5 * 60);

/// Ceiling on the delay between consecutive restart attempts.
const MAX_BACKOFF: Duration = Duration::from_secs(60);
202
203/// Manages running node processes. Holds child process handles and runtime state.
204pub struct Supervisor {
205    event_tx: broadcast::Sender<NodeEvent>,
206    /// Runtime status of each node, keyed by node ID.
207    node_states: HashMap<u32, NodeRuntime>,
208    /// Nodes adopted from a previous daemon instance, which have no owning `monitor_node`
209    /// task (their `Child` handle died with the previous daemon). Exit detection and, on
210    /// auto-upgrade, respawn for these nodes happen in the liveness monitor instead. A node
211    /// leaves this set once this daemon (re)spawns it and owns a `monitor_node` for it.
212    adopted: HashSet<u32>,
213    /// Nodes currently being evicted (stop + data-dir delete in progress). Checked and set under
214    /// the supervisor write lock, so it atomically excludes a concurrent `start_node` for the whole
215    /// stop/delete window — before which the persisted eviction marker is not yet visible.
216    evicting: HashSet<u32>,
217}
218
219struct NodeRuntime {
220    status: NodeStatus,
221    pid: Option<u32>,
222    started_at: Option<Instant>,
223    restart_count: u32,
224    first_crash_at: Option<Instant>,
225    /// When `status == UpgradeScheduled`, the target version the on-disk binary now reports.
226    pending_version: Option<String>,
227}
228
229impl Supervisor {
230    pub fn new(event_tx: broadcast::Sender<NodeEvent>) -> Self {
231        Self {
232            event_tx,
233            node_states: HashMap::new(),
234            adopted: HashSet::new(),
235            evicting: HashSet::new(),
236        }
237    }
238
239    /// Whether `node_id` was adopted from a previous daemon instance and is therefore not
240    /// backed by an owning `monitor_node` task in this daemon.
241    pub fn is_adopted(&self, node_id: u32) -> bool {
242        self.adopted.contains(&node_id)
243    }
244
245    /// Mark a node as under eviction so `start_node` refuses it for the whole stop/delete window.
246    /// Both this and the `start_node` check run under the supervisor write lock, so a concurrent
247    /// start can never slip in and spawn the node while its data directory is being deleted.
248    fn begin_evicting(&mut self, node_id: u32) {
249        self.evicting.insert(node_id);
250    }
251
252    /// Clear the eviction-in-progress flag (the persisted `eviction` marker keeps the node
253    /// unstartable afterwards).
254    fn finish_evicting(&mut self, node_id: u32) {
255        self.evicting.remove(&node_id);
256    }
257
258    /// Mark a node as owned by this daemon (i.e. it now has a `monitor_node` task). Clears
259    /// any adopted flag so the liveness monitor leaves its exit handling to `monitor_node`.
260    fn mark_owned(&mut self, node_id: u32) {
261        self.adopted.remove(&node_id);
262    }
263
264    /// Start a node by spawning the actual process.
265    ///
266    /// Returns `NodeStarted` on success. Spawns a background monitoring task
267    /// that watches the child process and handles restart logic.
268    pub async fn start_node(
269        &mut self,
270        config: &NodeConfig,
271        supervisor_ref: Arc<RwLock<Supervisor>>,
272        registry_ref: Arc<RwLock<NodeRegistry>>,
273    ) -> Result<NodeStarted> {
274        let node_id = config.id;
275
276        // An evicted node's data directory has been deleted; it must not be restarted. Recovery is
277        // to dismiss it (remove from the registry) and add a fresh node. The persisted marker covers
278        // the settled case; the in-progress `evicting` set (set under this same lock) covers the
279        // window where the delete is still running and the marker's config may not be visible yet.
280        if config.eviction.is_some() || self.evicting.contains(&node_id) {
281            return Err(Error::NodeEvicted(node_id));
282        }
283
284        if let Some(state) = self.node_states.get(&node_id) {
285            if state.status == NodeStatus::Running {
286                return Err(Error::NodeAlreadyRunning(node_id));
287            }
288        }
289
290        let _ = self.event_tx.send(NodeEvent::NodeStarting { node_id });
291
292        let mut child = spawn_node_from_config(config).await?;
293        let pid = child
294            .id()
295            .ok_or_else(|| Error::ProcessSpawn("Failed to get PID from spawned process".into()))?;
296
297        // Brief health check: give the process a moment to start, then check if it
298        // exited immediately. This catches errors like invalid CLI arguments or missing
299        // shared libraries. We use timeout + wait() rather than try_wait() because
300        // tokio's child reaper requires the wait future to be polled.
301        match tokio::time::timeout(Duration::from_secs(1), child.wait()).await {
302            Ok(Ok(exit_status)) => {
303                // Process already exited — read stderr for details.
304                // spawn_node always redirects stderr to a file in the log dir
305                // (falling back to data_dir when no log dir is configured).
306                let spawn_log_dir = config.log_dir.as_deref().unwrap_or(&config.data_dir);
307                let stderr_path = spawn_log_dir.join("stderr.log");
308                let stderr_msg = std::fs::read_to_string(&stderr_path).unwrap_or_default();
309                let detail = if stderr_msg.trim().is_empty() {
310                    format!("exit code: {exit_status}")
311                } else {
312                    stderr_msg.trim().to_string()
313                };
314                self.node_states.insert(
315                    node_id,
316                    NodeRuntime {
317                        status: NodeStatus::Errored,
318                        pid: None,
319                        started_at: None,
320                        restart_count: 0,
321                        first_crash_at: None,
322                        pending_version: None,
323                    },
324                );
325                return Err(Error::ProcessSpawn(format!(
326                    "Node {node_id} exited immediately: {detail}"
327                )));
328            }
329            Ok(Err(e)) => {
330                return Err(Error::ProcessSpawn(format!(
331                    "Failed to check node process status: {e}"
332                )));
333            }
334            Err(_) => {} // Timeout — process is still running after 1s, good
335        }
336
337        self.node_states.insert(
338            node_id,
339            NodeRuntime {
340                status: NodeStatus::Running,
341                pid: Some(pid),
342                started_at: Some(Instant::now()),
343                restart_count: 0,
344                first_crash_at: None,
345                pending_version: None,
346            },
347        );
348        // This daemon now owns the process and spawns a `monitor_node` for it below, so it is
349        // no longer (or never was) an adopted node the liveness monitor must respawn.
350        self.mark_owned(node_id);
351
352        let _ = self.event_tx.send(NodeEvent::NodeStarted { node_id, pid });
353
354        let result = NodeStarted {
355            node_id,
356            service_name: config.service_name.clone(),
357            pid,
358        };
359
360        // Spawn monitoring task
361        let event_tx = self.event_tx.clone();
362        let config = config.clone();
363        tokio::spawn(async move {
364            monitor_node(child, config, supervisor_ref, registry_ref, event_tx).await;
365        });
366
367        Ok(result)
368    }
369
370    /// Stop a node by gracefully terminating its process.
371    ///
372    /// Sends SIGTERM (Unix) or kills (Windows), waits up to 10 seconds for exit,
373    /// then sends SIGKILL if needed. The monitor task detects the Stopping status
374    /// and exits cleanly without attempting a restart.
375    pub async fn stop_node(&mut self, node_id: u32) -> Result<()> {
376        let state = self
377            .node_states
378            .get_mut(&node_id)
379            .ok_or(Error::NodeNotFound(node_id))?;
380
381        if state.status != NodeStatus::Running {
382            return Err(Error::NodeNotRunning(node_id));
383        }
384
385        let pid = state.pid;
386
387        let _ = self.event_tx.send(NodeEvent::NodeStopping { node_id });
388        state.status = NodeStatus::Stopping;
389
390        if let Some(pid) = pid {
391            graceful_kill(pid).await;
392        }
393
394        // Update state after kill
395        let state = self.node_states.get_mut(&node_id).unwrap();
396        state.status = NodeStatus::Stopped;
397        state.pid = None;
398        state.started_at = None;
399
400        let _ = self.event_tx.send(NodeEvent::NodeStopped { node_id });
401
402        Ok(())
403    }
404
405    /// Stop all running nodes, returning an aggregate result.
406    pub async fn stop_all_nodes(&mut self, configs: &[(u32, String)]) -> StopNodeResult {
407        let mut stopped = Vec::new();
408        let mut failed = Vec::new();
409        let mut already_stopped = Vec::new();
410
411        for (node_id, service_name) in configs {
412            let node_id = *node_id;
413            match self.node_status(node_id) {
414                Ok(NodeStatus::Running) => {}
415                Ok(_) => {
416                    already_stopped.push(node_id);
417                    continue;
418                }
419                Err(_) => {
420                    already_stopped.push(node_id);
421                    continue;
422                }
423            }
424
425            match self.stop_node(node_id).await {
426                Ok(()) => {
427                    stopped.push(NodeStopped {
428                        node_id,
429                        service_name: service_name.clone(),
430                    });
431                }
432                Err(Error::NodeNotRunning(_)) => {
433                    already_stopped.push(node_id);
434                }
435                Err(e) => {
436                    failed.push(NodeStopFailed {
437                        node_id,
438                        service_name: service_name.clone(),
439                        error: e.to_string(),
440                    });
441                }
442            }
443        }
444
445        StopNodeResult {
446            stopped,
447            failed,
448            already_stopped,
449        }
450    }
451
452    /// Get the status of a node.
453    pub fn node_status(&self, node_id: u32) -> Result<NodeStatus> {
454        self.node_states
455            .get(&node_id)
456            .map(|s| s.status)
457            .ok_or(Error::NodeNotFound(node_id))
458    }
459
460    /// Get the PID of a running node.
461    pub fn node_pid(&self, node_id: u32) -> Option<u32> {
462        self.node_states.get(&node_id).and_then(|s| s.pid)
463    }
464
465    /// Get the uptime of a running node in seconds.
466    pub fn node_uptime_secs(&self, node_id: u32) -> Option<u64> {
467        self.node_states
468            .get(&node_id)
469            .and_then(|s| s.started_at.map(|t| t.elapsed().as_secs()))
470    }
471
472    /// The target version when the node is in `UpgradeScheduled` state, otherwise `None`.
473    pub fn node_pending_version(&self, node_id: u32) -> Option<String> {
474        self.node_states
475            .get(&node_id)
476            .and_then(|s| s.pending_version.clone())
477    }
478
479    /// Transition a Running node into `UpgradeScheduled` with the target version.
480    ///
481    /// Only affects nodes currently in `Running`: any other state is left alone (a stopped
482    /// node legitimately has an out-of-date binary; a node already in UpgradeScheduled has
483    /// already been marked). Returns `true` if the transition happened.
484    fn mark_upgrade_scheduled(&mut self, node_id: u32, pending_version: String) -> bool {
485        let Some(state) = self.node_states.get_mut(&node_id) else {
486            return false;
487        };
488        if state.status != NodeStatus::Running {
489            return false;
490        }
491        state.status = NodeStatus::UpgradeScheduled;
492        state.pending_version = Some(pending_version.clone());
493        let _ = self.event_tx.send(NodeEvent::UpgradeScheduled {
494            node_id,
495            pending_version,
496        });
497        true
498    }
499
500    /// Check whether a node is running.
501    pub fn is_running(&self, node_id: u32) -> bool {
502        self.node_states
503            .get(&node_id)
504            .is_some_and(|s| s.status == NodeStatus::Running)
505    }
506
507    /// Get counts of nodes in each state: (running, stopped, errored).
508    pub fn node_counts(&self) -> (u32, u32, u32) {
509        let mut running = 0u32;
510        let mut stopped = 0u32;
511        let mut errored = 0u32;
512        for state in self.node_states.values() {
513            match state.status {
514                // UpgradeScheduled means the process is still running; count it with running.
515                NodeStatus::Running | NodeStatus::Starting | NodeStatus::UpgradeScheduled => {
516                    running += 1
517                }
518                // An evicted node is not running; count it alongside stopped for these totals.
519                NodeStatus::Stopped | NodeStatus::Stopping | NodeStatus::Evicted => stopped += 1,
520                NodeStatus::Errored => errored += 1,
521            }
522        }
523        (running, stopped, errored)
524    }
525
526    /// Update the runtime state for a node (used by the monitor task).
527    fn update_state(&mut self, node_id: u32, status: NodeStatus, pid: Option<u32>) {
528        if let Some(state) = self.node_states.get_mut(&node_id) {
529            state.status = status;
530            state.pid = pid;
531            if status == NodeStatus::Running {
532                state.started_at = Some(Instant::now());
533            } else {
534                // Clear uptime tracking for non-running states so status
535                // responses don't report a stale `uptime_secs` after the node
536                // exits (e.g. liveness monitor detecting an external kill).
537                state.started_at = None;
538            }
539        }
540    }
541
542    /// Restore running-node state from a previous daemon instance.
543    ///
544    /// For each registered node, determines the PID to adopt via
545    /// `resolve_adopted_pid`: try `<data_dir>/node.pid` first, and if it's
546    /// missing or stale, fall back to a process-table scan matching the
547    /// node's binary path and `--root-dir` argument. Live matches are
548    /// inserted into `node_states` as `Running`.
549    ///
550    /// The scan is what covers the upgrade path: nodes spawned by a
551    /// pre-adoption daemon never had a pid file written, so without the
552    /// fallback the first restart after installing this fix would still
553    /// leave every previously-running node classified as Stopped.
554    ///
555    /// Must be called before the HTTP server starts accepting requests —
556    /// the window between `Supervisor::new` and adoption is where the API
557    /// would otherwise report live nodes as Stopped. Adopted nodes have no
558    /// associated `monitor_node` task (the `tokio::process::Child` handle
559    /// belonged to the previous daemon, and `tokio::process::Child::wait`
560    /// only works for the process's actual parent). Their exits are
561    /// detected instead by the `spawn_liveness_monitor` polling task.
562    ///
563    /// Returns the list of node IDs that were adopted.
564    pub fn adopt_from_registry(&mut self, registry: &NodeRegistry) -> Vec<u32> {
565        // Populated upfront so every adopted node gets its real start time via
566        // `process_started_at`, not just those that went through the scan
567        // fallback. The extra ~50 ms at daemon startup is a one-time cost
568        // that's cheaper than users seeing uptime reset every time the daemon
569        // restarts.
570        let mut sys = sysinfo::System::new();
571        sys.refresh_processes_specifics(
572            sysinfo::ProcessesToUpdate::All,
573            true,
574            sysinfo::ProcessRefreshKind::everything(),
575        );
576
577        let mut adopted = Vec::new();
578        for config in registry.list() {
579            let Some(pid) = resolve_adopted_pid(config, &sys) else {
580                continue;
581            };
582            self.node_states.insert(
583                config.id,
584                NodeRuntime {
585                    status: NodeStatus::Running,
586                    pid: Some(pid),
587                    // Back-date to the real process start time so uptime
588                    // reported to the API is wall-clock accurate across
589                    // daemon restarts. Falls back to `Instant::now()` only
590                    // if sysinfo can't report the start time (PID raced out
591                    // of the snapshot, or a broken clock) — better to show
592                    // uptime counting from adoption than to claim the node
593                    // is Stopped.
594                    started_at: Some(process_started_at(&sys, pid).unwrap_or_else(Instant::now)),
595                    restart_count: 0,
596                    first_crash_at: None,
597                    pending_version: None,
598                },
599            );
600            // No owning `monitor_node` exists for an adopted process (its `Child` died with the
601            // previous daemon), so flag it for the liveness monitor to handle its exit/respawn.
602            self.adopted.insert(config.id);
603            let _ = self.event_tx.send(NodeEvent::NodeStarted {
604                node_id: config.id,
605                pid,
606            });
607            adopted.push(config.id);
608        }
609        adopted
610    }
611
612    /// Record a crash and determine if the node should be restarted or marked errored.
613    /// Returns (should_restart, attempt_number, backoff_duration).
614    fn record_crash(&mut self, node_id: u32) -> (bool, u32, Duration) {
615        let state = match self.node_states.get_mut(&node_id) {
616            Some(s) => s,
617            None => return (false, 0, Duration::ZERO),
618        };
619
620        let now = Instant::now();
621
622        // Check if we were stable long enough to reset crash counter
623        if let Some(started_at) = state.started_at {
624            if started_at.elapsed() >= STABLE_DURATION {
625                state.restart_count = 0;
626                state.first_crash_at = None;
627            }
628        }
629
630        state.restart_count += 1;
631        let attempt = state.restart_count;
632
633        if state.first_crash_at.is_none() {
634            state.first_crash_at = Some(now);
635        }
636
637        // Check if too many crashes in the window
638        if let Some(first_crash) = state.first_crash_at {
639            if attempt >= MAX_CRASHES_BEFORE_ERRORED
640                && now.duration_since(first_crash) < CRASH_WINDOW
641            {
642                state.status = NodeStatus::Errored;
643                state.pid = None;
644                state.started_at = None;
645                return (false, attempt, Duration::ZERO);
646            }
647        }
648
649        // Exponential backoff: 1s, 2s, 4s, 8s, 16s, 32s, 60s cap
650        let backoff_secs = 1u64 << (attempt - 1).min(5);
651        let backoff = Duration::from_secs(backoff_secs).min(MAX_BACKOFF);
652
653        (true, attempt, backoff)
654    }
655}
656
657/// Periodically probe each Running node's on-disk binary for a version change.
658///
659/// When a node's binary-on-disk reports a different version than was recorded in the registry
660/// at `ant node add` time, ant-node has replaced the binary in place as part of its auto-upgrade
661/// flow and will restart the process shortly. We flip the node to `UpgradeScheduled` with the
662/// target version, which lets `ant node status` render the in-between state and lets
663/// `monitor_node` reclassify the upcoming clean exit as an expected restart rather than a crash.
664///
665/// The task exits when `shutdown` is cancelled.
666pub fn spawn_upgrade_monitor(
667    registry: Arc<RwLock<NodeRegistry>>,
668    supervisor: Arc<RwLock<Supervisor>>,
669    interval: Duration,
670    shutdown: CancellationToken,
671) {
672    tokio::spawn(async move {
673        let mut ticker = tokio::time::interval(interval);
674        // After a Windows sleep/hibernate the default `Burst` catch-up would fire one
675        // tick per missed interval back-to-back, producing a flood of `extract_version`
676        // subprocess spawns. `Skip` resumes on the next aligned tick instead.
677        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
678        // Skip the immediate first tick — we don't want to probe while nodes are still in the
679        // Starting -> Running transition.
680        ticker.tick().await;
681
682        loop {
683            tokio::select! {
684                _ = shutdown.cancelled() => return,
685                _ = ticker.tick() => {},
686            }
687
688            // Collect a snapshot of (node_id, binary_path, recorded_version, current_pending)
689            // to release the locks before running --version subprocesses (which take time).
690            let candidates: Vec<(u32, std::path::PathBuf, String, Option<String>)> = {
691                let reg = registry.read().await;
692                let sup = supervisor.read().await;
693                reg.list()
694                    .into_iter()
695                    .filter_map(|config| match sup.node_status(config.id) {
696                        Ok(NodeStatus::Running) => Some((
697                            config.id,
698                            config.binary_path.clone(),
699                            config.version.clone(),
700                            sup.node_pending_version(config.id),
701                        )),
702                        _ => None,
703                    })
704                    .collect()
705            };
706
707            for (node_id, binary_path, recorded_version, current_pending) in candidates {
708                let observed = match extract_version(&binary_path).await {
709                    Ok(v) => v,
710                    // Transient failures (e.g. binary mid-replacement) — skip this round.
711                    Err(_) => continue,
712                };
713                if observed == recorded_version {
714                    continue;
715                }
716                if current_pending.as_deref() == Some(observed.as_str()) {
717                    continue;
718                }
719                supervisor
720                    .write()
721                    .await
722                    .mark_upgrade_scheduled(node_id, observed);
723            }
724        }
725    });
726}
727
728/// Background task: monitor free disk space at each node's data directory and, when a partition
729/// falls to its eviction threshold, automatically evict a node to reclaim space.
730///
731/// Each tick it (1) measures every running node's data directory grouped by partition, (2) refreshes
732/// the shared [`FleetHealth`] snapshot so the CLI/GUI can show how close the fleet is to an
733/// eviction, and (3) evicts the selected candidate on any partition that is at/below the threshold
734/// *and* still has at least two nodes (so a node remains to benefit). Eviction stops the process,
735/// deletes the data directory, records a persisted [`EvictionRecord`], and emits an event. It
736/// re-measures and may evict again — bounded by `MAX_EVICTIONS_PER_CYCLE` — because a single
737/// eviction may not free enough on a heavily over-provisioned partition.
738///
739/// The task exits when `shutdown` is cancelled.
740pub fn spawn_eviction_monitor(
741    registry: Arc<RwLock<NodeRegistry>>,
742    supervisor: Arc<RwLock<Supervisor>>,
743    event_tx: broadcast::Sender<NodeEvent>,
744    health: Arc<RwLock<FleetHealth>>,
745    thresholds: DiskThresholds,
746    interval: Duration,
747    shutdown: CancellationToken,
748) {
749    tokio::spawn(async move {
750        let mut ticker = tokio::time::interval(interval);
751        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
752        // Skip the immediate first tick so we don't evict while nodes are still starting up.
753        ticker.tick().await;
754
755        loop {
756            tokio::select! {
757                _ = shutdown.cancelled() => return,
758                _ = ticker.tick() => {},
759            }
760
761            run_eviction_cycle(&registry, &supervisor, &event_tx, &health, &thresholds).await;
762        }
763    });
764}
765
766/// Run one disk-pressure check: evict as needed (bounded), then refresh the health snapshot.
767async fn run_eviction_cycle(
768    registry: &Arc<RwLock<NodeRegistry>>,
769    supervisor: &Arc<RwLock<Supervisor>>,
770    event_tx: &broadcast::Sender<NodeEvent>,
771    health: &Arc<RwLock<FleetHealth>>,
772    thresholds: &DiskThresholds,
773) {
774    for _ in 0..MAX_EVICTIONS_PER_CYCLE {
775        let partitions = disk::partition_states(running_nodes(registry, supervisor).await);
776
777        // A partition needs an eviction when it is at/below the threshold and has a spare node to
778        // sacrifice (≥2 nodes, so one remains). The sole-node case is deliberately left for the
779        // health layer to surface as Critical rather than auto-evicting the only node.
780        let target = partitions
781            .iter()
782            .find(|p| p.available_bytes <= thresholds.eviction_bytes && p.nodes.len() >= 2);
783
784        let Some(partition) = target else {
785            // Nothing more to evict: publish the current health and finish this cycle.
786            publish_health(
787                health,
788                event_tx,
789                FleetHealth::from_partitions(&partitions, thresholds),
790            )
791            .await;
792            return;
793        };
794
795        let Some(candidate) = partition.eviction_candidate().cloned() else {
796            break;
797        };
798
799        evict_node(
800            registry,
801            supervisor,
802            event_tx,
803            &candidate,
804            partition.available_bytes,
805        )
806        .await;
807    }
808
809    // Reached the per-cycle eviction cap (or hit a candidate-less partition): refresh health so the
810    // snapshot reflects reality before the next tick.
811    let partitions = disk::partition_states(running_nodes(registry, supervisor).await);
812    publish_health(
813        health,
814        event_tx,
815        FleetHealth::from_partitions(&partitions, thresholds),
816    )
817    .await;
818}
819
820/// Snapshot of currently-running, non-evicted nodes as `(id, data_dir)` pairs.
821async fn running_nodes(
822    registry: &Arc<RwLock<NodeRegistry>>,
823    supervisor: &Arc<RwLock<Supervisor>>,
824) -> Vec<(u32, PathBuf)> {
825    let reg = registry.read().await;
826    let sup = supervisor.read().await;
827    reg.list()
828        .into_iter()
829        .filter(|config| config.eviction.is_none())
830        .filter(|config| matches!(sup.node_status(config.id), Ok(NodeStatus::Running)))
831        .map(|config| (config.id, config.data_dir.clone()))
832        .collect()
833}
834
835/// Delete a directory tree, retrying briefly to tolerate transient locks.
836///
837/// A node we just killed can hold its data files open for a short moment after exit — on Windows
838/// especially (its LMDB memory map and its own copied `ant-node` binary), and antivirus/indexers can
839/// grab transient handles — so `remove_dir_all` fails with "access denied / in use" until the OS
840/// releases them. A bounded exponential backoff gives it time; on Unix the first attempt almost
841/// always succeeds. Returns `Ok` on success or if the directory is already gone.
842async fn remove_dir_all_with_retry(path: &Path) -> std::io::Result<()> {
843    const MAX_ATTEMPTS: u32 = 8;
844    let mut delay = Duration::from_millis(100);
845    for attempt in 1..=MAX_ATTEMPTS {
846        match std::fs::remove_dir_all(path) {
847            Ok(()) => return Ok(()),
848            // Already gone — nothing left to reclaim; treat as success.
849            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
850            Err(e) if attempt == MAX_ATTEMPTS => return Err(e),
851            Err(_) => {
852                tokio::time::sleep(delay).await;
853                delay = (delay * 2).min(Duration::from_secs(1));
854            }
855        }
856    }
857    Ok(())
858}
859
860/// Write (or overwrite) a node's persisted eviction marker and save the registry.
861async fn persist_eviction_marker(
862    registry: &Arc<RwLock<NodeRegistry>>,
863    node_id: u32,
864    reason: &str,
865    evicted_at: u64,
866    reclaimed_bytes: u64,
867) {
868    let mut reg = registry.write().await;
869    if let Ok(config) = reg.get_mut(node_id) {
870        config.eviction = Some(EvictionRecord {
871            reason: reason.to_string(),
872            evicted_at,
873            reclaimed_bytes,
874        });
875    }
876    if let Err(e) = reg.save() {
877        tracing::error!("Eviction: failed to persist registry for node {node_id}: {e}");
878    }
879}
880
/// Evict a single node: make it unstartable, stop it, delete its data directory, finalise the
/// persisted marker, and emit an event.
///
/// Ordering matters for safety. The node is made unstartable *before* the stop/delete window — via
/// the in-memory `evicting` flag (set under the supervisor lock, so it atomically excludes a
/// concurrent `start_node`) and a persisted eviction marker (which also survives a daemon crash
/// mid-delete). Only then do we stop and delete. Rollback is deliberately not attempted: once the
/// process is stopped the node stays `Evicted` (terminal) whether or not the delete succeeds.
///
/// `candidate` supplies the node id, its data directory and its measured size; `available_before`
/// is the partition's free space when the node was selected and is used only in the human-readable
/// reason. The emitted `NodeEvicted` event carries the final reason and the bytes reclaimed: the
/// measured size if the delete succeeded, 0 if it did not. Failures are logged, never returned.
///
/// NOTE(review): if `stop_node` fails, the code still deletes the data directory and marks the node
/// `Evicted` with no PID; whether a live process can be left behind depends on why `stop_node`
/// failed — confirm this is acceptable.
///
/// NOTE(review): the supervisor write lock is held across `stop_node(..).await`; if stopping can
/// take a while, every other task that needs the supervisor waits for it — confirm intended.
async fn evict_node(
    registry: &Arc<RwLock<NodeRegistry>>,
    supervisor: &Arc<RwLock<Supervisor>>,
    event_tx: &broadcast::Sender<NodeEvent>,
    candidate: &disk::NodeDiskUsage,
    available_before: u64,
) {
    let node_id = candidate.node_id;
    // The measured data-dir size: the best available estimate of what deleting it frees.
    let reclaimable = candidate.size_bytes;
    // Captured once and reused for both marker writes, so the record keeps the eviction start time.
    let evicted_at = now_unix_secs();

    // 1. Make the node unstartable up front (before we touch the process or its files).
    supervisor.write().await.begin_evicting(node_id);
    // Provisional marker text; overwritten in step 4 once the delete outcome is known.
    let pending_reason = format!(
        "Automatically evicted to reclaim disk space: only {} free on its partition. \
         Deleting its data directory to recover ~{}.",
        fmt_bytes(available_before),
        fmt_bytes(reclaimable),
    );
    persist_eviction_marker(registry, node_id, &pending_reason, evicted_at, reclaimable).await;

    // 2. Stop the process (still marked Running, so this actually kills it). The monitor_node task
    //    sees the Stopping/Stopped transition and will not respawn it.
    if let Err(e) = supervisor.write().await.stop_node(node_id).await {
        tracing::warn!("Eviction: failed to stop node {node_id} before deletion: {e}");
    }

    // 3. Delete the data directory — this reclaims the disk space. A just-killed node can briefly
    //    hold its files open (LMDB memory map, its own copied binary); on Windows `remove_dir_all`
    //    then fails until the OS releases the handles, so retry with backoff.
    let deleted = match remove_dir_all_with_retry(&candidate.data_dir).await {
        Ok(()) => true,
        Err(e) => {
            tracing::error!(
                "Eviction: could not delete data dir {} for node {node_id} after retries: {e}. \
                 Disk space was NOT reclaimed; manual cleanup may be required.",
                candidate.data_dir.display()
            );
            false
        }
    };

    // 4. Finalise the marker to reflect what actually happened, flip runtime state to Evicted, and
    //    clear the in-progress flag (the persisted marker keeps the node unstartable from here).
    let (reclaimed_bytes, reason) = if deleted {
        (
            reclaimable,
            format!(
                "Automatically evicted to reclaim disk space: only {} free on its partition. \
                 Its data directory was deleted, recovering ~{}.",
                fmt_bytes(available_before),
                fmt_bytes(reclaimable),
            ),
        )
    } else {
        // Nothing was freed, so report 0 reclaimed bytes and point at the leftover directory.
        (
            0,
            format!(
                "Automatically evicted due to low disk space (only {} free on its partition), but \
                 its data directory could not be deleted, so space was not reclaimed. Manual \
                 cleanup of {} may be needed.",
                fmt_bytes(available_before),
                candidate.data_dir.display(),
            ),
        )
    };
    persist_eviction_marker(registry, node_id, &reason, evicted_at, reclaimed_bytes).await;
    {
        // Both updates happen under a single write lock so no observer sees Evicted without the
        // in-progress flag being cleared, or vice versa.
        let mut sup = supervisor.write().await;
        sup.update_state(node_id, NodeStatus::Evicted, None);
        sup.finish_evicting(node_id);
    }

    tracing::info!(
        "Evicted node {node_id}, reclaimed ~{} ({reason})",
        fmt_bytes(reclaimed_bytes)
    );
    // A send error only means there are currently no subscribers; ignore it.
    let _ = event_tx.send(NodeEvent::NodeEvicted {
        node_id,
        reason,
        reclaimed_bytes,
    });
}
972
973/// Store the new health snapshot, emitting a `FleetHealthChanged` event if the overall level moved.
974async fn publish_health(
975    health: &Arc<RwLock<FleetHealth>>,
976    event_tx: &broadcast::Sender<NodeEvent>,
977    next: FleetHealth,
978) {
979    let changed = {
980        let mut current = health.write().await;
981        let changed = current.overall != next.overall;
982        *current = next.clone();
983        changed
984    };
985    if changed {
986        let _ = event_tx.send(NodeEvent::FleetHealthChanged {
987            overall: serde_json::to_value(next.overall)
988                .ok()
989                .and_then(|v| v.as_str().map(str::to_owned))
990                .unwrap_or_default(),
991        });
992    }
993}
994
/// Seconds elapsed since the Unix epoch, truncated to a whole number.
///
/// Returns 0 when the system clock reads earlier than the epoch.
fn now_unix_secs() -> u64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
1002
/// Render a byte count for humans, matching the health layer's style: two-decimal GiB from
/// 1 GiB upwards, otherwise a whole (rounded) number of MiB.
fn fmt_bytes(bytes: u64) -> String {
    const MIB: f64 = (1u64 << 20) as f64;
    const GIB: f64 = (1u64 << 30) as f64;

    let value = bytes as f64;
    if value < GIB {
        return format!("{:.0} MiB", value / MIB);
    }
    format!("{:.2} GiB", value / GIB)
}
1014
1015/// Build CLI arguments for the node binary from a NodeConfig.
1016pub fn build_node_args(config: &NodeConfig) -> Vec<String> {
1017    let mut args = vec![
1018        "--rewards-address".to_string(),
1019        config.rewards_address.clone(),
1020        "--root-dir".to_string(),
1021        config.data_dir.display().to_string(),
1022    ];
1023
1024    if let Some(ref log_dir) = config.log_dir {
1025        args.push("--enable-logging".to_string());
1026        args.push("--log-dir".to_string());
1027        args.push(log_dir.display().to_string());
1028    }
1029
1030    if let Some(port) = config.node_port {
1031        args.push("--port".to_string());
1032        args.push(port.to_string());
1033    }
1034
1035    for peer in &config.bootstrap_peers {
1036        args.push("--bootstrap".to_string());
1037        args.push(peer.clone());
1038    }
1039
1040    if let Some(channel) = config.upgrade_channel {
1041        args.push("--upgrade-channel".to_string());
1042        args.push(channel.to_string());
1043    }
1044
1045    // The daemon's supervisor is the service manager. Tell ant-node not to spawn its own
1046    // replacement on auto-upgrade; instead, exit cleanly and let us respawn. Without this,
1047    // ant-node's default spawn-grandchild-then-exit flow races for the node's port during
1048    // the parent's graceful shutdown and the grandchild fails to bind.
1049    args.push("--stop-on-upgrade".to_string());
1050
1051    // Always emit the EVM network so the node's payment network is explicit rather than relying on
1052    // the binary's built-in default.
1053    args.push("--evm-network".to_string());
1054    args.push(config.evm_network.as_arg().to_string());
1055
1056    args
1057}
1058
1059/// Spawn a node process from a NodeConfig.
1060///
1061/// Writes `<data_dir>/node.pid` on successful spawn so that a future daemon instance
1062/// can adopt the running process via `Supervisor::adopt_from_registry`. The file is
1063/// cleaned up by `monitor_node` on the node's terminal exit.
1064async fn spawn_node_from_config(config: &NodeConfig) -> Result<tokio::process::Child> {
1065    let args = build_node_args(config);
1066    let env_vars: Vec<(String, String)> = config.env_variables.clone().into_iter().collect();
1067
1068    let log_dir = config
1069        .log_dir
1070        .as_deref()
1071        .unwrap_or(config.data_dir.as_path());
1072
1073    let child = spawn_node(&config.binary_path, &args, &env_vars, log_dir).await?;
1074    if let Some(pid) = child.id() {
1075        write_node_pid(&config.data_dir, pid);
1076    }
1077    Ok(child)
1078}
1079
1080/// Monitor a node process. On exit, handle restart logic. On permanent exit
1081/// (user stop, crash limit, errored), cleans up the pid file so a subsequent
1082/// daemon restart doesn't try to adopt a dead process.
1083async fn monitor_node(
1084    child: tokio::process::Child,
1085    mut config: NodeConfig,
1086    supervisor: Arc<RwLock<Supervisor>>,
1087    registry: Arc<RwLock<NodeRegistry>>,
1088    event_tx: broadcast::Sender<NodeEvent>,
1089) {
1090    monitor_node_inner(child, &mut config, supervisor, registry, event_tx).await;
1091    remove_node_pid(&config.data_dir);
1092}
1093
/// Watch one daemon-spawned node process and keep it running across upgrades and crashes.
///
/// Loops on `child.wait()`. After each exit it reads the node's supervisor status to decide what
/// the exit meant:
/// - `Stopped`, `Stopping` or `Evicted`: the exit was intended; return without respawning.
/// - `UpgradeScheduled`: respawn immediately via `respawn_upgraded_node` (no backoff, no crash
///   accounting).
/// - anything else: if the exit code is 0 and the on-disk binary's version differs from
///   `config.version`, treat it as an auto-upgrade and respawn the same way. Otherwise emit
///   `NodeCrashed`, consult `record_crash`, back off, and restart from `config`.
///
/// `config` is updated in place by `respawn_upgraded_node` (its recorded version), so later
/// iterations compare against the version actually running. Any failed respawn emits
/// `NodeErrored`, marks the node `Errored`, and returns. The caller (`monitor_node`) removes the
/// pid file once this returns.
async fn monitor_node_inner(
    mut child: tokio::process::Child,
    config: &mut NodeConfig,
    supervisor: Arc<RwLock<Supervisor>>,
    registry: Arc<RwLock<NodeRegistry>>,
    event_tx: broadcast::Sender<NodeEvent>,
) {
    let node_id = config.id;

    loop {
        // Wait for the process to exit
        // (a `wait()` error is folded into `exit_code == None` further down).
        let exit_status = child.wait().await;

        // Check whether this is a scheduled upgrade restart or an intentional stop.
        let status_at_exit = {
            let sup = supervisor.read().await;
            sup.node_status(node_id).ok()
        };

        match status_at_exit {
            // Stopped/Stopping are intentional; Evicted means the daemon deleted the data dir to
            // reclaim space. In all three cases the node must not be respawned.
            Some(NodeStatus::Stopped) | Some(NodeStatus::Stopping) | Some(NodeStatus::Evicted) => {
                return
            }
            Some(NodeStatus::UpgradeScheduled) => {
                // ant-node cleanly exited after replacing its binary in place. Respawn
                // directly (no backoff, no crash counter) and refresh the recorded version.
                match respawn_upgraded_node(config, &supervisor, &registry, &event_tx).await {
                    Ok(new_child) => {
                        child = new_child;
                        continue;
                    }
                    Err(e) => {
                        let _ = event_tx.send(NodeEvent::NodeErrored {
                            node_id,
                            message: format!("Failed to respawn after upgrade: {e}"),
                        });
                        let mut sup = supervisor.write().await;
                        sup.update_state(node_id, NodeStatus::Errored, None);
                        return;
                    }
                }
            }
            _ => {}
        }

        // `None` when `wait()` failed or the process has no exit code (e.g. killed by a signal).
        let exit_code = exit_status.ok().and_then(|s| s.code());

        // A process-reported exit that wasn't user-initiated (Stopping was filtered above) is
        // either an auto-upgrade (exit 0 after ant-node replaced its binary) or a crash. In
        // neither case should the node be parked in `Stopped` — that state is reserved for
        // intentional user stops.
        //
        // Distinguish upgrade from crash by checking whether the on-disk binary's version
        // drifted from the registry. Between replacing its binary and actually exiting,
        // ant-node can hold the process open for anywhere from seconds to minutes, depending
        // on in-flight work and its own config. The periodic version poll will usually have
        // flipped the node to `UpgradeScheduled` well before the exit, but when the window is
        // short we cannot rely on that — hence this synchronous re-check here.
        if exit_code == Some(0) {
            if let Ok(disk_version) = extract_version(&config.binary_path).await {
                if disk_version != config.version {
                    {
                        let mut sup = supervisor.write().await;
                        sup.mark_upgrade_scheduled(node_id, disk_version.clone());
                    }
                    match respawn_upgraded_node(config, &supervisor, &registry, &event_tx).await {
                        Ok(new_child) => {
                            child = new_child;
                            continue;
                        }
                        Err(e) => {
                            let _ = event_tx.send(NodeEvent::NodeErrored {
                                node_id,
                                message: format!("Failed to respawn after upgrade: {e}"),
                            });
                            let mut sup = supervisor.write().await;
                            sup.update_state(node_id, NodeStatus::Errored, None);
                            return;
                        }
                    }
                }
            }
            // Exit 0 but the binary didn't change — fall through to the crash / restart path.
            // We report the crash with the exit code preserved; the crash counter guards
            // against infinite restart loops if the process keeps exiting immediately.
        }

        // Crash (or clean exit that wasn't an upgrade)
        let _ = event_tx.send(NodeEvent::NodeCrashed { node_id, exit_code });

        // `record_crash` decides whether the crash budget still allows a restart, and returns the
        // attempt number and backoff delay to use.
        let (should_restart, attempt, backoff) = {
            let mut sup = supervisor.write().await;
            sup.record_crash(node_id)
        };

        if !should_restart {
            // NOTE(review): no status is set here; presumably `record_crash` already marked the
            // node Errored — confirm.
            let _ = event_tx.send(NodeEvent::NodeErrored {
                node_id,
                message: format!(
                    "Node crashed {} times within {} seconds, giving up",
                    MAX_CRASHES_BEFORE_ERRORED,
                    CRASH_WINDOW.as_secs()
                ),
            });
            return;
        }

        let _ = event_tx.send(NodeEvent::NodeRestarting { node_id, attempt });

        // NOTE(review): status is not re-read after this sleep, so a stop issued during the
        // backoff appears to be overridden by the restart below — confirm `stop_node` guards it.
        tokio::time::sleep(backoff).await;

        // Try to restart
        match spawn_node_from_config(&*config).await {
            Ok(new_child) => {
                let pid = match new_child.id() {
                    Some(pid) => pid,
                    None => {
                        // Process exited before we could read its PID
                        let _ = event_tx.send(NodeEvent::NodeErrored {
                            node_id,
                            message: "Restarted process exited before PID could be read"
                                .to_string(),
                        });
                        let mut sup = supervisor.write().await;
                        sup.update_state(node_id, NodeStatus::Errored, None);
                        return;
                    }
                };
                {
                    let mut sup = supervisor.write().await;
                    sup.update_state(node_id, NodeStatus::Running, Some(pid));
                }
                let _ = event_tx.send(NodeEvent::NodeStarted { node_id, pid });
                // Keep monitoring: the next loop iteration waits on the restarted process.
                child = new_child;
            }
            Err(e) => {
                let _ = event_tx.send(NodeEvent::NodeErrored {
                    node_id,
                    message: format!("Failed to restart node: {e}"),
                });
                let mut sup = supervisor.write().await;
                sup.update_state(node_id, NodeStatus::Errored, None);
                return;
            }
        }
    }
}
1243
1244/// Respawn a node whose `UpgradeScheduled` status tells us the exit was expected.
1245///
1246/// On success: persists the new version to the registry, updates the in-memory config clone,
1247/// clears pending_version, sets status back to Running, and fires `NodeUpgraded`.
1248async fn respawn_upgraded_node(
1249    config: &mut NodeConfig,
1250    supervisor: &Arc<RwLock<Supervisor>>,
1251    registry: &Arc<RwLock<NodeRegistry>>,
1252    event_tx: &broadcast::Sender<NodeEvent>,
1253) -> Result<tokio::process::Child> {
1254    let node_id = config.id;
1255    let old_version = config.version.clone();
1256
1257    let new_child = spawn_node_from_config(config).await?;
1258    let pid = new_child
1259        .id()
1260        .ok_or_else(|| Error::ProcessSpawn("Failed to get PID after upgrade respawn".into()))?;
1261
1262    // Read the new version from the replaced binary. If this fails we still consider the respawn
1263    // successful; we just don't refresh the recorded version this round.
1264    let new_version = extract_version(&config.binary_path).await.ok();
1265
1266    if let Some(ref version) = new_version {
1267        config.version = version.clone();
1268        let mut reg = registry.write().await;
1269        if let Ok(stored) = reg.get_mut(node_id) {
1270            stored.version = version.clone();
1271            let _ = reg.save();
1272        }
1273    }
1274
1275    {
1276        let mut sup = supervisor.write().await;
1277        if let Some(state) = sup.node_states.get_mut(&node_id) {
1278            state.status = NodeStatus::Running;
1279            state.pid = Some(pid);
1280            state.started_at = Some(Instant::now());
1281            state.pending_version = None;
1282            state.restart_count = 0;
1283            state.first_crash_at = None;
1284        }
1285    }
1286
1287    let _ = event_tx.send(NodeEvent::NodeStarted { node_id, pid });
1288    if let Some(version) = new_version {
1289        let _ = event_tx.send(NodeEvent::NodeUpgraded {
1290            node_id,
1291            old_version,
1292            new_version: version,
1293        });
1294    }
1295
1296    Ok(new_child)
1297}
1298
1299/// Timeout for graceful shutdown before force-killing.
1300const GRACEFUL_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10);
1301
1302/// Send SIGTERM to a process, wait for it to exit, and SIGKILL if it doesn't.
1303async fn graceful_kill(pid: u32) {
1304    send_signal_term(pid);
1305
1306    // Poll for process exit
1307    let start = Instant::now();
1308    loop {
1309        if !is_process_alive(pid) {
1310            return;
1311        }
1312        if start.elapsed() >= GRACEFUL_SHUTDOWN_TIMEOUT {
1313            break;
1314        }
1315        tokio::time::sleep(Duration::from_millis(100)).await;
1316    }
1317
1318    // Force kill if still alive
1319    send_signal_kill(pid);
1320
1321    // Brief wait for force kill to take effect
1322    for _ in 0..10 {
1323        if !is_process_alive(pid) {
1324            return;
1325        }
1326        tokio::time::sleep(Duration::from_millis(50)).await;
1327    }
1328}
1329
1330/// Decide whether the liveness monitor should flip a node it found dead to `Stopped`.
1331///
1332/// `snapshot_pid` is the PID the sweep captured and then observed to be dead. `current_pid`
1333/// and `current_status` are the node's recorded state at the moment of the decision — which
1334/// may differ from the snapshot (e.g. an upgrade respawn replaced the PID with a live one
1335/// while leaving the status `Running`).
1336///
1337/// We only stop the node if it is still `Running` AND the recorded PID is still the one we
1338/// observed dead. The PID check is essential: between the snapshot and now, an upgrade (or
1339/// crash) respawn can have replaced the dead `snapshot_pid` with a live `current_pid` while
1340/// keeping the status `Running`. Stopping in that case would clobber a healthy, freshly
1341/// respawned process (the "running node reported as stopped after an upgrade" bug).
1342fn liveness_should_stop(
1343    snapshot_pid: u32,
1344    current_pid: Option<u32>,
1345    current_status: Option<NodeStatus>,
1346) -> bool {
1347    current_status == Some(NodeStatus::Running) && current_pid == Some(snapshot_pid)
1348}
1349
1350/// Poll each Running node's PID for OS liveness every `LIVENESS_POLL_INTERVAL`,
1351/// flipping dead ones to `Stopped` and emitting `NodeStopped`.
1352///
1353/// Exists to detect exits of nodes adopted across a daemon restart
1354/// (`Supervisor::adopt_from_registry`). Daemon-spawned nodes have a
1355/// `monitor_node` task awaiting on the owned `Child` handle, which detects
1356/// exit immediately — the poll is redundant-but-harmless for them. Adopted
1357/// nodes don't have a `Child` (it died with the previous daemon), so the poll
1358/// is the only way the supervisor learns that one has exited.
1359///
1360/// The task terminates when `shutdown` is cancelled.
1361pub fn spawn_liveness_monitor(
1362    registry: Arc<RwLock<NodeRegistry>>,
1363    supervisor: Arc<RwLock<Supervisor>>,
1364    event_tx: broadcast::Sender<NodeEvent>,
1365    interval: Duration,
1366    shutdown: CancellationToken,
1367) {
1368    tokio::spawn(async move {
1369        let mut ticker = tokio::time::interval(interval);
1370        // Don't burst-catchup after a Windows sleep/hibernate: a flood of liveness
1371        // probes serves no purpose, and uniform `Skip` policy across supervisor
1372        // monitors keeps post-wake behaviour predictable.
1373        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
1374        loop {
1375            tokio::select! {
1376                _ = shutdown.cancelled() => return,
1377                _ = ticker.tick() => {}
1378            }
1379
1380            // Snapshot candidates to release locks before the per-process syscalls.
1381            let candidates: Vec<(u32, u32, PathBuf)> =
1382                {
1383                    let sup = supervisor.read().await;
1384                    let reg = registry.read().await;
1385                    reg.list()
1386                        .into_iter()
1387                        .filter_map(|config| {
1388                            let pid = sup.node_pid(config.id)?;
1389                            matches!(sup.node_status(config.id), Ok(NodeStatus::Running))
1390                                .then_some((config.id, pid, config.data_dir.clone()))
1391                        })
1392                        .collect()
1393                };
1394
1395            for (node_id, pid, data_dir) in candidates {
1396                if is_process_alive(pid) {
1397                    continue;
1398                }
1399
1400                // Adopted nodes have no owning `monitor_node`, so this poll is their only
1401                // supervisor. If such a node's process died and the on-disk binary version has
1402                // drifted from the registry, the exit was an auto-upgrade — `--stop-on-upgrade`
1403                // expects the service manager (us) to restart it. Respawn it on the new binary
1404                // and hand it a `monitor_node`, rather than leaving it dead and flagged Stopped.
1405                if supervisor.read().await.is_adopted(node_id) {
1406                    let config = {
1407                        let reg = registry.read().await;
1408                        reg.get(node_id).ok().cloned()
1409                    };
1410                    if let Some(mut config) = config {
1411                        let drifted = matches!(
1412                            extract_version(&config.binary_path).await,
1413                            Ok(disk_version) if disk_version != config.version
1414                        );
1415                        if drifted {
1416                            match respawn_upgraded_node(
1417                                &mut config,
1418                                &supervisor,
1419                                &registry,
1420                                &event_tx,
1421                            )
1422                            .await
1423                            {
1424                                Ok(child) => {
1425                                    // Now owned by this daemon: clear the adopted flag and give
1426                                    // it a monitor_node so future exits are handled there.
1427                                    supervisor.write().await.mark_owned(node_id);
1428                                    let sup_ref = Arc::clone(&supervisor);
1429                                    let reg_ref = Arc::clone(&registry);
1430                                    let ev = event_tx.clone();
1431                                    tokio::spawn(async move {
1432                                        monitor_node(child, config, sup_ref, reg_ref, ev).await;
1433                                    });
1434                                    continue;
1435                                }
1436                                Err(e) => {
1437                                    let _ = event_tx.send(NodeEvent::NodeErrored {
1438                                        node_id,
1439                                        message: format!(
1440                                            "Failed to respawn adopted node after upgrade: {e}"
1441                                        ),
1442                                    });
1443                                    let mut sup = supervisor.write().await;
1444                                    sup.update_state(node_id, NodeStatus::Errored, None);
1445                                    sup.mark_owned(node_id);
1446                                    remove_node_pid(&data_dir);
1447                                    continue;
1448                                }
1449                            }
1450                        }
1451                    }
1452                }
1453
1454                let mut sup = supervisor.write().await;
1455                // Re-check under the write lock to avoid racing with a concurrent
1456                // start/stop that flipped the state between the snapshot and now.
1457                if !liveness_should_stop(pid, sup.node_pid(node_id), sup.node_status(node_id).ok())
1458                {
1459                    continue;
1460                }
1461                sup.update_state(node_id, NodeStatus::Stopped, None);
1462                let _ = event_tx.send(NodeEvent::NodeStopped { node_id });
1463                remove_node_pid(&data_dir);
1464            }
1465        }
1466    });
1467}
1468
/// Narrow a `u32` PID to the `i32` that `libc::kill` accepts, keeping only strictly positive
/// values.
///
/// PID 0 is rejected because `kill(0, sig)` addresses the caller's whole process group. Values
/// above `i32::MAX` are rejected because a wrapping cast would make them negative, which
/// `kill(2)` interprets as a process-group (or, for -1, broadcast) target.
#[cfg(unix)]
fn pid_to_i32(pid: u32) -> Option<i32> {
    match i32::try_from(pid) {
        Ok(positive) if positive > 0 => Some(positive),
        _ => None,
    }
}
1473
1474#[cfg(unix)]
1475fn send_signal_term(pid: u32) {
1476    if let Some(pid) = pid_to_i32(pid) {
1477        unsafe {
1478            libc::kill(pid, libc::SIGTERM);
1479        }
1480    }
1481}
1482
1483#[cfg(unix)]
1484fn send_signal_kill(pid: u32) {
1485    if let Some(pid) = pid_to_i32(pid) {
1486        unsafe {
1487            libc::kill(pid, libc::SIGKILL);
1488        }
1489    }
1490}
1491
1492#[cfg(unix)]
1493fn is_process_alive(pid: u32) -> bool {
1494    let Some(pid) = pid_to_i32(pid) else {
1495        return false;
1496    };
1497    let ret = unsafe { libc::kill(pid, 0) };
1498    if ret == 0 {
1499        return true;
1500    }
1501    // EPERM means the process exists but we lack permission to signal it
1502    std::io::Error::last_os_error().raw_os_error() == Some(libc::EPERM)
1503}
1504
/// Ask node process `pid` to shut down gracefully by raising a Ctrl+C event on its console.
/// This is the Windows counterpart of the Unix `SIGTERM` variant.
///
/// Best-effort: if the daemon cannot attach to the target's console, nothing is sent and no
/// error is reported.
///
/// NOTE(review): the first `FreeConsole()` detaches the daemon from whatever console it had, and
/// nothing in this function re-attaches it afterwards. Confirm that foreground `daemon run` still
/// receives Ctrl+C after a node has been stopped.
///
/// NOTE(review): the 50 ms `std::thread::sleep` blocks the calling thread. If this is invoked
/// from an async task, it stalls that runtime worker for the duration.
#[cfg(windows)]
fn send_signal_term(pid: u32) {
    use windows_sys::Win32::System::Console::{
        AttachConsole, FreeConsole, GenerateConsoleCtrlEvent, SetConsoleCtrlHandler, CTRL_C_EVENT,
    };

    // SAFETY: every call below takes only integers or a null handler routine, and none of them
    // hands out pointers into memory we own. The order of the calls is what keeps the Ctrl+C
    // event away from the daemon itself; see the step-by-step comments.
    unsafe {
        // Detach from our own console (no-op if daemon has none, which is
        // typical since it's spawned with DETACHED_PROCESS).
        FreeConsole();

        // Attach to the target process's console and send Ctrl+C
        if AttachConsole(pid) != 0 {
            // Disable Ctrl+C handling so GenerateConsoleCtrlEvent doesn't
            // terminate us while we're attached to the node's console.
            SetConsoleCtrlHandler(None, 1);
            // Process-group id 0 targets every process attached to the console we just joined.
            GenerateConsoleCtrlEvent(CTRL_C_EVENT, 0);
            // Detach from the node's console first — once detached, the
            // async Ctrl+C event can only reach the node, not us.
            FreeConsole();
            // Brief delay to let the event drain before re-enabling our
            // handler. Without this, the handler thread can process the
            // event between FreeConsole and SetConsoleCtrlHandler.
            std::thread::sleep(std::time::Duration::from_millis(50));
            // Restore Ctrl+C handling so `daemon run` (foreground mode)
            // can still be stopped via Ctrl+C / tokio::signal::ctrl_c().
            SetConsoleCtrlHandler(None, 0);
        }
    }
}
1535
/// Forcibly terminate process `pid` via `TerminateProcess`, using exit code 1.
///
/// Best-effort: if the process cannot be opened with terminate rights (for example because it
/// no longer exists), nothing happens and no error is reported.
#[cfg(windows)]
fn send_signal_kill(pid: u32) {
    use windows_sys::Win32::Foundation::CloseHandle;
    use windows_sys::Win32::System::Threading::{OpenProcess, TerminateProcess, PROCESS_TERMINATE};

    // SAFETY: OpenProcess takes only integers; a null return signals failure and is checked below.
    let process = unsafe { OpenProcess(PROCESS_TERMINATE, 0, pid) };
    if process.is_null() {
        return;
    }
    // SAFETY: `process` is the non-null handle opened above; it is used once and closed once.
    unsafe {
        TerminateProcess(process, 1);
        CloseHandle(process);
    }
}
1549
/// Report whether a process with id `pid` currently exists and has not exited.
///
/// A process whose handle cannot be opened because access is denied is reported as alive. The
/// PID names a real process we merely may not query, which mirrors how the Unix variant treats
/// `EPERM`. Reporting it as dead would let the liveness poll mark a running node as Stopped.
/// Any other `OpenProcess` failure (e.g. no such PID) means not alive.
///
/// Caveat: `GetExitCodeProcess` reports `STILL_ACTIVE` (259) for a running process. A process
/// that actually exited with code 259 is therefore indistinguishable from a live one here.
#[cfg(windows)]
fn is_process_alive(pid: u32) -> bool {
    use windows_sys::Win32::Foundation::{
        CloseHandle, GetLastError, ERROR_ACCESS_DENIED, STILL_ACTIVE,
    };
    use windows_sys::Win32::System::Threading::{
        GetExitCodeProcess, OpenProcess, PROCESS_QUERY_LIMITED_INFORMATION,
    };

    // SAFETY: OpenProcess takes only integers; a null return signals failure and is checked below.
    let handle = unsafe { OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, 0, pid) };
    if handle.is_null() {
        // SAFETY: GetLastError only reads this thread's last-error value. It is read before any
        // other Win32 call can overwrite the value set by OpenProcess.
        return unsafe { GetLastError() } == ERROR_ACCESS_DENIED;
    }

    let mut exit_code: u32 = 0;
    // SAFETY: `handle` is the non-null handle opened above; `exit_code` is a valid, writable u32
    // that outlives the call. The handle is closed exactly once.
    let queried = unsafe {
        let ok = GetExitCodeProcess(handle, &mut exit_code);
        CloseHandle(handle);
        ok
    };
    queried != 0 && exit_code == STILL_ACTIVE as u32
}
1568
#[cfg(test)]
mod tests {
    use super::*;
    use crate::node::types::{EvmNetwork, UpgradeChannel};

    /// A PID that no real process can hold on the hosts we run tests on. It is above Linux's
    /// maximum configurable PID (`PID_MAX_LIMIT`, 2^22) and macOS's (99_999), yet is still a
    /// positive `i32`, so the signalling helpers accept it as a target that simply does not
    /// exist. Stop-path tests must use this rather than an arbitrary "probably unused" number.
    /// Otherwise they could send real signals to an unrelated process on the machine.
    /// `is_process_alive_distinguishes_self_from_missing_pid` checks this assumption at runtime.
    const NONEXISTENT_PID: u32 = i32::MAX as u32; // lossless: i32::MAX is positive

    /// Minimal `NodeConfig` for node 1. Only required fields are set; every optional feature
    /// (logging, fixed port, bootstrap peers, upgrade channel, eviction) is left unset. Tests
    /// override just the fields they care about with struct-update syntax.
    fn test_config() -> NodeConfig {
        NodeConfig {
            id: 1,
            service_name: "node1".to_string(),
            rewards_address: "0xabc".to_string(),
            data_dir: "/data/node-1".into(),
            log_dir: None,
            node_port: None,
            binary_path: "/bin/node".into(),
            version: "0.1.0".to_string(),
            env_variables: HashMap::new(),
            bootstrap_peers: vec![],
            upgrade_channel: None,
            evm_network: EvmNetwork::default(),
            eviction: None,
        }
    }

    /// `NodeRuntime` in `status` with a clean crash history. A node given a PID is treated as
    /// started now; a node without a PID has no start time.
    fn node_runtime(status: NodeStatus, pid: Option<u32>) -> NodeRuntime {
        NodeRuntime {
            status,
            pid,
            started_at: pid.map(|_| Instant::now()),
            restart_count: 0,
            first_crash_at: None,
            pending_version: None,
        }
    }

    /// Whether `arg` appears anywhere in a built arg list.
    fn has_arg(args: &[String], arg: &str) -> bool {
        args.iter().any(|a| a == arg)
    }

    /// Return the value following `flag` in a built arg list, if present.
    fn flag_value<'a>(args: &'a [String], flag: &str) -> Option<&'a str> {
        let idx = args.iter().position(|a| a == flag)?;
        args.get(idx + 1).map(String::as_str)
    }

    /// Return the value following `--evm-network` in a built arg list, if present.
    fn evm_network_arg(args: &[String]) -> Option<&str> {
        flag_value(args, "--evm-network")
    }

    #[tokio::test]
    async fn remove_dir_all_with_retry_deletes_tree_and_tolerates_missing() {
        let tmp = tempfile::tempdir().unwrap();
        let dir = tmp.path().join("node-data");
        std::fs::create_dir_all(dir.join("sub")).unwrap();
        std::fs::write(dir.join("sub").join("data.mdb"), vec![0u8; 128]).unwrap();

        // Deletes an existing tree.
        remove_dir_all_with_retry(&dir).await.unwrap();
        assert!(!dir.exists());

        // Idempotent: an already-gone path is treated as success (no error).
        remove_dir_all_with_retry(&dir).await.unwrap();
    }

    #[tokio::test]
    async fn start_node_rejects_a_node_being_evicted() {
        let (tx, _rx) = broadcast::channel(16);
        let sup = Arc::new(RwLock::new(Supervisor::new(tx)));

        let tmp = tempfile::tempdir().unwrap();
        let reg = Arc::new(RwLock::new(
            NodeRegistry::load(&tmp.path().join("reg.json")).unwrap(),
        ));

        let config = NodeConfig {
            id: 7,
            service_name: "node7".to_string(),
            data_dir: tmp.path().join("node-7"),
            ..test_config()
        };

        // Flagged as evicting -> start is refused before any spawn attempt, closing the race with
        // the concurrent stop/delete in evict_node.
        sup.write().await.begin_evicting(7);
        let res = sup
            .write()
            .await
            .start_node(&config, sup.clone(), reg.clone())
            .await;
        assert!(matches!(res, Err(Error::NodeEvicted(7))));

        // Clearing the flag removes the in-progress guard (the persisted marker takes over).
        sup.write().await.finish_evicting(7);
        assert!(!sup.read().await.evicting.contains(&7));
    }

    #[test]
    fn adopted_flag_lifecycle() {
        let (tx, _rx) = broadcast::channel(16);
        let mut sup = Supervisor::new(tx);

        // Nodes are not adopted by default.
        assert!(!sup.is_adopted(1));

        // adopt_from_registry flags nodes carried over from a previous daemon.
        sup.adopted.insert(1);
        assert!(sup.is_adopted(1));

        // Once this daemon (re)spawns the node and owns a monitor_node for it, the flag
        // clears so the liveness monitor stops treating its exit as needing a respawn.
        sup.mark_owned(1);
        assert!(!sup.is_adopted(1));
    }

    // Regression test for the "running node reported as stopped after an upgrade" bug.
    //
    // A daemon-spawned node was respawned by monitor_node after an upgrade, so the recorded
    // state is now Running with a live PID_new. A liveness sweep that snapshotted the old,
    // now-dead PID then acts: it must NOT mark the node Stopped, because the running process
    // is the new one. `liveness_should_stop` guards against this by also requiring the recorded
    // PID to still match the one the sweep observed dead.
    #[test]
    fn liveness_does_not_stop_node_respawned_under_it() {
        let dead_snapshot_pid = 1000; // PID the sweep captured and found dead
        let live_respawned_pid = Some(2000); // PID_new from the upgrade respawn (alive)
        assert!(
            !liveness_should_stop(
                dead_snapshot_pid,
                live_respawned_pid,
                Some(NodeStatus::Running)
            ),
            "liveness must not stop a node whose PID changed under it (respawned with a live PID)"
        );
    }

    #[cfg(unix)]
    #[test]
    fn pid_to_i32_rejects_pids_kill_would_misinterpret() {
        // 0 would address our own process group; values above i32::MAX would wrap negative.
        assert_eq!(pid_to_i32(0), None);
        assert_eq!(pid_to_i32(1), Some(1));
        assert_eq!(pid_to_i32(NONEXISTENT_PID), Some(i32::MAX));
        assert_eq!(pid_to_i32(u32::MAX), None);
    }

    #[test]
    fn is_process_alive_distinguishes_self_from_missing_pid() {
        assert!(is_process_alive(std::process::id()));
        assert!(!is_process_alive(NONEXISTENT_PID));
    }

    #[test]
    fn build_node_args_basic() {
        let config = NodeConfig {
            rewards_address: "0xabc123".to_string(),
            log_dir: Some("/logs/node-1".into()),
            node_port: Some(12000),
            bootstrap_peers: vec!["peer1".to_string(), "peer2".to_string()],
            ..test_config()
        };

        let args = build_node_args(&config);

        for expected in [
            "--rewards-address",
            "0xabc123",
            "--root-dir",
            "/data/node-1",
            "--enable-logging",
            "--log-dir",
            "/logs/node-1",
            "--port",
            "12000",
            "--bootstrap",
            "peer1",
            "peer2",
            "--stop-on-upgrade",
        ] {
            assert!(has_arg(&args, expected), "missing {expected:?} in {args:?}");
        }
        // No upgrade channel configured -> no --upgrade-channel argument.
        assert!(!has_arg(&args, "--upgrade-channel"));
        // EVM network defaults to Arbitrum One, emitted as a --evm-network flag.
        assert_eq!(evm_network_arg(&args), Some("arbitrum-one"));
    }

    #[test]
    fn build_node_args_emits_evm_network_flag() {
        let mut config = NodeConfig {
            evm_network: EvmNetwork::ArbitrumSepolia,
            ..test_config()
        };

        let args = build_node_args(&config);
        assert_eq!(evm_network_arg(&args), Some("arbitrum-sepolia"));

        config.evm_network = EvmNetwork::ArbitrumOne;
        let args = build_node_args(&config);
        assert_eq!(evm_network_arg(&args), Some("arbitrum-one"));
    }

    #[test]
    fn build_node_args_includes_upgrade_channel() {
        let mut config = NodeConfig {
            upgrade_channel: Some(UpgradeChannel::Beta),
            ..test_config()
        };

        let args = build_node_args(&config);
        assert_eq!(flag_value(&args, "--upgrade-channel"), Some("beta"));

        config.upgrade_channel = Some(UpgradeChannel::Stable);
        let args = build_node_args(&config);
        assert_eq!(flag_value(&args, "--upgrade-channel"), Some("stable"));
    }

    #[test]
    fn build_node_args_minimal() {
        let args = build_node_args(&test_config());

        assert!(has_arg(&args, "--rewards-address"));
        assert!(has_arg(&args, "--root-dir"));
        assert!(!has_arg(&args, "--enable-logging"));
        assert!(!has_arg(&args, "--log-dir"));
        assert!(!has_arg(&args, "--port"));
        assert!(!has_arg(&args, "--bootstrap"));
        assert!(has_arg(&args, "--stop-on-upgrade"));
    }

    #[test]
    fn record_crash_backoff_increases() {
        let (tx, _rx) = broadcast::channel(16);
        let mut sup = Supervisor::new(tx);

        sup.node_states
            .insert(1, node_runtime(NodeStatus::Running, Some(100)));

        // Backoff doubles on each crash within the window: 1s, 2s, 4s, 8s.
        for (expected_attempt, expected_secs) in [(1, 1), (2, 2), (3, 4), (4, 8)] {
            let (should_restart, attempt, backoff) = sup.record_crash(1);
            assert!(should_restart);
            assert_eq!(attempt, expected_attempt);
            assert_eq!(backoff, Duration::from_secs(expected_secs));
        }

        // 5th crash within window → errored
        let (should_restart, attempt, _) = sup.record_crash(1);
        assert!(!should_restart);
        assert_eq!(attempt, 5);
        assert_eq!(sup.node_states[&1].status, NodeStatus::Errored);
    }

    #[test]
    fn node_counts_tracks_states() {
        let (tx, _rx) = broadcast::channel(16);
        let mut sup = Supervisor::new(tx);

        sup.node_states
            .insert(1, node_runtime(NodeStatus::Running, Some(100)));
        sup.node_states
            .insert(2, node_runtime(NodeStatus::Stopped, None));
        sup.node_states.insert(
            3,
            NodeRuntime {
                restart_count: 5,
                ..node_runtime(NodeStatus::Errored, None)
            },
        );

        let (running, stopped, errored) = sup.node_counts();
        assert_eq!(running, 1);
        assert_eq!(stopped, 1);
        assert_eq!(errored, 1);
    }

    #[test]
    fn mark_upgrade_scheduled_only_affects_running_nodes() {
        let (tx, mut rx) = broadcast::channel(16);
        let mut sup = Supervisor::new(tx);

        sup.node_states
            .insert(1, node_runtime(NodeStatus::Running, Some(111)));
        sup.node_states
            .insert(2, node_runtime(NodeStatus::Stopped, None));

        // Running node: transitions to UpgradeScheduled with pending_version set and event fires.
        let affected = sup.mark_upgrade_scheduled(1, "0.10.11-rc.1".to_string());
        assert!(affected);
        assert_eq!(sup.node_status(1).unwrap(), NodeStatus::UpgradeScheduled);
        assert_eq!(sup.node_pending_version(1).as_deref(), Some("0.10.11-rc.1"));
        match rx.try_recv() {
            Ok(NodeEvent::UpgradeScheduled {
                node_id,
                pending_version,
            }) => {
                assert_eq!(node_id, 1);
                assert_eq!(pending_version, "0.10.11-rc.1");
            }
            other => panic!("expected UpgradeScheduled event, got {other:?}"),
        }

        // Stopped node: untouched, no event fired.
        let affected = sup.mark_upgrade_scheduled(2, "0.10.11-rc.1".to_string());
        assert!(!affected);
        assert_eq!(sup.node_status(2).unwrap(), NodeStatus::Stopped);
        assert!(sup.node_pending_version(2).is_none());

        // Already-UpgradeScheduled node: calling again is a no-op.
        let affected = sup.mark_upgrade_scheduled(1, "0.10.12".to_string());
        assert!(!affected);
        // Pending version is the original one set.
        assert_eq!(sup.node_pending_version(1).as_deref(), Some("0.10.11-rc.1"));
    }

    #[test]
    fn node_counts_counts_upgrade_scheduled_as_running() {
        let (tx, _rx) = broadcast::channel(16);
        let mut sup = Supervisor::new(tx);

        sup.node_states.insert(
            1,
            NodeRuntime {
                pending_version: Some("0.10.11-rc.1".to_string()),
                ..node_runtime(NodeStatus::UpgradeScheduled, Some(111))
            },
        );

        let (running, stopped, errored) = sup.node_counts();
        assert_eq!(running, 1);
        assert_eq!(stopped, 0);
        assert_eq!(errored, 0);
    }

    #[tokio::test]
    async fn stop_node_not_found() {
        let (tx, _rx) = broadcast::channel(16);
        let mut sup = Supervisor::new(tx);

        let result = sup.stop_node(999).await;
        assert!(matches!(result, Err(Error::NodeNotFound(999))));
    }

    #[tokio::test]
    async fn stop_node_not_running() {
        let (tx, _rx) = broadcast::channel(16);
        let mut sup = Supervisor::new(tx);

        sup.node_states
            .insert(1, node_runtime(NodeStatus::Stopped, None));

        let result = sup.stop_node(1).await;
        assert!(matches!(result, Err(Error::NodeNotRunning(1))));
    }

    #[tokio::test]
    async fn stop_all_nodes_mixed_states() {
        let (tx, _rx) = broadcast::channel(16);
        let mut sup = Supervisor::new(tx);

        // Node 1: "running", but under a PID that cannot belong to any real process, so the stop
        // path never signals something else on the host.
        sup.node_states
            .insert(1, node_runtime(NodeStatus::Running, Some(NONEXISTENT_PID)));
        // Node 2: already stopped
        sup.node_states
            .insert(2, node_runtime(NodeStatus::Stopped, None));

        let configs = vec![(1, "node1".to_string()), (2, "node2".to_string())];

        let result = sup.stop_all_nodes(&configs).await;

        assert_eq!(result.stopped.len(), 1);
        assert_eq!(result.stopped[0].node_id, 1);
        assert_eq!(result.stopped[0].service_name, "node1");
        assert_eq!(result.already_stopped, vec![2]);
        assert!(result.failed.is_empty());
    }
}