ant_core/node/daemon/supervisor.rs

use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};

use tokio::sync::{broadcast, RwLock};
use tokio::time::MissedTickBehavior;
use tokio_util::sync::CancellationToken;

use crate::error::{Error, Result};
use crate::node::binary::extract_version;
use crate::node::events::NodeEvent;
use crate::node::process::spawn::spawn_node;
use crate::node::registry::NodeRegistry;
use crate::node::types::{
    NodeConfig, NodeStarted, NodeStatus, NodeStopFailed, NodeStopped, StopNodeResult,
};

/// How often the upgrade-detection task polls each running node's binary for a version change.
pub const UPGRADE_POLL_INTERVAL: Duration = Duration::from_secs(60);

/// How often the liveness poll verifies that each Running node's OS process still exists.
///
/// Nodes the current daemon spawned are watched via their owned `Child` handle in
/// `monitor_node`, so this poll exists purely to catch exits of nodes adopted across
/// a daemon restart (whose `Child` handle died with the previous daemon). Five seconds
/// is a rough trade-off: long enough that the syscall cost is negligible, short enough
/// that a crashed adopted node still looks broken to the user within a few heartbeats.
pub const LIVENESS_POLL_INTERVAL: Duration = Duration::from_secs(5);

/// Path of the pid file a running node writes to so a future daemon instance can
/// adopt it across restarts. Lives alongside the node's other on-disk state.
fn node_pid_file(data_dir: &Path) -> PathBuf {
    data_dir.join("node.pid")
}

/// Persist the running node's PID to `<data_dir>/node.pid`. Best-effort: a failure
/// here only costs us the ability to adopt the node after a daemon restart, so we
/// warn and continue rather than aborting the start.
fn write_node_pid(data_dir: &Path, pid: u32) {
    let path = node_pid_file(data_dir);
    if let Err(e) = std::fs::write(&path, pid.to_string()) {
        tracing::warn!(
            "Failed to write node pid file at {}: {e}. Node will still run, but a future \
             daemon restart will not be able to adopt it.",
            path.display()
        );
    }
}

/// Remove the pid file. Called on every terminal-exit path in `monitor_node` so the
/// next daemon doesn't try to adopt a PID belonging to a process that's gone.
fn remove_node_pid(data_dir: &Path) {
    let _ = std::fs::remove_file(node_pid_file(data_dir));
}

/// Read the pid file without validating liveness. Returns `None` if the file is
/// missing or its contents can't be parsed as a u32.
fn read_node_pid(data_dir: &Path) -> Option<u32> {
    std::fs::read_to_string(node_pid_file(data_dir))
        .ok()
        .and_then(|s| s.trim().parse().ok())
}
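
// A minimal sketch of the pid-file round trip the three helpers above provide
// (write, read back, remove). The temp-dir naming here is illustrative, not
// part of the daemon's real on-disk layout.
#[cfg(test)]
mod pid_file_round_trip_example {
    use super::{read_node_pid, remove_node_pid, write_node_pid};

    #[test]
    fn pid_file_round_trip() {
        let dir = std::env::temp_dir().join(format!("pid-file-example-{}", std::process::id()));
        std::fs::create_dir_all(&dir).unwrap();
        write_node_pid(&dir, 4242);
        assert_eq!(read_node_pid(&dir), Some(4242));
        remove_node_pid(&dir);
        assert_eq!(read_node_pid(&dir), None);
        let _ = std::fs::remove_dir_all(&dir);
    }
}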

/// Scan the OS process table for a running node that matches `config`, as a
/// fallback for when `<data_dir>/node.pid` is missing or stale.
///
/// Nodes spawned by a pre-adoption daemon never had a pid file written, so
/// without this scan the first restart after installing the adoption fix
/// would leave every previously-running node classified as Stopped. The scan
/// matches on:
///
/// - executable path identical to `config.binary_path`, AND
/// - command line containing `--root-dir` (as a standalone arg or
///   `--root-dir=<path>`) whose value resolves to `config.data_dir`.
///
/// The double match keeps us safe when multiple nodes share the same binary
/// on disk (common on installs where one copy services several data dirs).
///
/// Returns `None` if no running process matches.
fn find_running_node_process(sys: &sysinfo::System, config: &NodeConfig) -> Option<u32> {
    let target_data_dir = config.data_dir.as_path();
    for (pid, process) in sys.processes() {
        // On Linux, `sys.processes()` enumerates /proc/<pid>/task/<tid> too, so
        // worker threads appear alongside their thread-group leader and share
        // the same exe + cmdline. Skip threads — we want the TGID (the real
        // process), which is the only PID safe to signal.
        if process.thread_kind().is_some() {
            continue;
        }
        let Some(exe) = process.exe() else {
            continue;
        };
        if exe != config.binary_path.as_path() {
            continue;
        }

        let cmd = process.cmd();
        let matches_root_dir = cmd.iter().enumerate().any(|(i, arg)| {
            let arg = arg.to_string_lossy();
            if let Some(value) = arg.strip_prefix("--root-dir=") {
                Path::new(value) == target_data_dir
            } else if arg == "--root-dir" {
                cmd.get(i + 1)
                    .map(|v| Path::new(&*v.to_string_lossy()) == target_data_dir)
                    .unwrap_or(false)
            } else {
                false
            }
        });

        if matches_root_dir {
            return Some(pid.as_u32());
        }
    }
    None
}
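
// A standalone restatement of the `--root-dir` matching rule above, pinning
// down the two accepted command-line shapes. `cmdline_matches_root_dir` and
// the "antnode" argv[0] are illustrative only; the real scan walks
// `sysinfo::Process` values, not string slices.
#[cfg(test)]
mod root_dir_match_example {
    use std::path::Path;

    fn cmdline_matches_root_dir(cmd: &[&str], target: &Path) -> bool {
        cmd.iter().enumerate().any(|(i, arg)| {
            if let Some(value) = arg.strip_prefix("--root-dir=") {
                Path::new(value) == target
            } else if *arg == "--root-dir" {
                cmd.get(i + 1).is_some_and(|v| Path::new(v) == target)
            } else {
                false
            }
        })
    }

    #[test]
    fn both_arg_shapes_match() {
        let target = Path::new("/data/node-1");
        assert!(cmdline_matches_root_dir(&["antnode", "--root-dir", "/data/node-1"], target));
        assert!(cmdline_matches_root_dir(&["antnode", "--root-dir=/data/node-1"], target));
        assert!(!cmdline_matches_root_dir(&["antnode", "--root-dir", "/data/node-2"], target));
        assert!(!cmdline_matches_root_dir(&["antnode"], target));
    }
}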

/// Check whether `pid` refers to a live, non-thread process. On Linux,
/// `kill(tid, 0)` returns success for any thread's TID, not just the
/// thread-group leader — so liveness alone is not enough to trust a PID
/// loaded from the pid file. Consulting sysinfo's `thread_kind()` tells us
/// whether the entry is a userland thread (TID) vs. the actual process
/// (TGID). A missing sysinfo entry with a live PID is still treated as a
/// process, since older daemons could have written the PID before sysinfo
/// saw it.
fn pid_is_live_process(pid: u32, sys: &sysinfo::System) -> bool {
    if !is_process_alive(pid) {
        return false;
    }
    match sys.process(sysinfo::Pid::from_u32(pid)) {
        Some(process) => process.thread_kind().is_none(),
        None => true,
    }
}
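
// Sanity check for the helper above: given a fresh snapshot (taken the same
// way `adopt_from_registry` takes one), the test process's own PID counts as
// a live, non-thread process.
#[cfg(test)]
mod live_process_example {
    use super::pid_is_live_process;

    #[test]
    fn own_pid_is_a_live_process() {
        let mut sys = sysinfo::System::new();
        sys.refresh_processes_specifics(
            sysinfo::ProcessesToUpdate::All,
            true,
            sysinfo::ProcessRefreshKind::everything(),
        );
        assert!(pid_is_live_process(std::process::id(), &sys));
    }
}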

/// Determine the PID to adopt for a node, trying the pid file first and
/// falling back to a process-table scan. On successful scan, writes the pid
/// file so the next adoption takes the fast path.
///
/// Returns `None` if no live process can be attributed to this node.
fn resolve_adopted_pid(config: &NodeConfig, sys: &sysinfo::System) -> Option<u32> {
    if let Some(pid) = read_node_pid(&config.data_dir) {
        if pid_is_live_process(pid, sys) {
            return Some(pid);
        }
        // Pid file points at a dead process or a thread TID (legacy daemons
        // could record a TID because the fallback scan saw threads). Don't
        // leave it around to mislead the next adoption pass.
        remove_node_pid(&config.data_dir);
    }

    let pid = find_running_node_process(sys, config)?;
    write_node_pid(&config.data_dir, pid);
    Some(pid)
}

/// Build an `Instant` back-dated to the real process start time, so that
/// calling `.elapsed()` on it yields the process's true age and uptime
/// survives daemon restarts accurately for adopted nodes.
///
/// `sysinfo::Process::start_time()` returns seconds since the UNIX epoch
/// (wall clock). `Instant` is monotonic and can't be constructed from a
/// wall-clock value directly, so we back-date `Instant::now()` by the
/// process's age. Returns `None` if the PID isn't in the snapshot (the
/// process exited between scan and this call), if the system clock looks
/// broken, or if subtraction would overflow (unrealistically-old process
/// start times).
fn process_started_at(sys: &sysinfo::System, pid: u32) -> Option<Instant> {
    let start_secs = sys.process(sysinfo::Pid::from_u32(pid))?.start_time();
    let now_secs = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .ok()?
        .as_secs();
    let age = now_secs.saturating_sub(start_secs);
    Instant::now().checked_sub(Duration::from_secs(age))
}
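
// A minimal sketch of the back-dating trick above: an `Instant` built by
// subtracting an age from `now` reports roughly that age from `.elapsed()`.
// The 90-second figure is arbitrary; `checked_sub` can still fail on
// platforms whose monotonic clock starts near zero, which is exactly why
// `process_started_at` returns an `Option`.
#[cfg(test)]
mod backdate_example {
    use std::time::{Duration, Instant};

    #[test]
    fn backdated_instant_reports_age_as_elapsed() {
        let age = Duration::from_secs(90);
        let Some(started_at) = Instant::now().checked_sub(age) else {
            return; // Monotonic clock too young to back-date; nothing to assert.
        };
        let elapsed = started_at.elapsed();
        // At least the back-dated age, plus however long this test took.
        assert!(elapsed >= age);
        assert!(elapsed < age + Duration::from_secs(5));
    }
}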

/// Maximum restart attempts before marking a node as errored.
const MAX_CRASHES_BEFORE_ERRORED: u32 = 5;

/// Window in which crashes are counted. If this many crashes happen within
/// this duration, the node is marked errored.
const CRASH_WINDOW: Duration = Duration::from_secs(300); // 5 minutes

/// If a node runs for this long without crashing, reset the crash counter.
const STABLE_DURATION: Duration = Duration::from_secs(300); // 5 minutes

/// Maximum backoff delay between restarts.
const MAX_BACKOFF: Duration = Duration::from_secs(60);

/// Manages running node processes. Holds child process handles and runtime state.
pub struct Supervisor {
    event_tx: broadcast::Sender<NodeEvent>,
    /// Runtime status of each node, keyed by node ID.
    node_states: HashMap<u32, NodeRuntime>,
}

struct NodeRuntime {
    status: NodeStatus,
    pid: Option<u32>,
    started_at: Option<Instant>,
    restart_count: u32,
    first_crash_at: Option<Instant>,
    /// When `status == UpgradeScheduled`, the target version the on-disk binary now reports.
    pending_version: Option<String>,
}

impl Supervisor {
    pub fn new(event_tx: broadcast::Sender<NodeEvent>) -> Self {
        Self {
            event_tx,
            node_states: HashMap::new(),
        }
    }

    /// Start a node by spawning the actual process.
    ///
    /// Returns `NodeStarted` on success. Spawns a background monitoring task
    /// that watches the child process and handles restart logic.
    pub async fn start_node(
        &mut self,
        config: &NodeConfig,
        supervisor_ref: Arc<RwLock<Supervisor>>,
        registry_ref: Arc<RwLock<NodeRegistry>>,
    ) -> Result<NodeStarted> {
        let node_id = config.id;

        if let Some(state) = self.node_states.get(&node_id) {
            if state.status == NodeStatus::Running {
                return Err(Error::NodeAlreadyRunning(node_id));
            }
        }

        let _ = self.event_tx.send(NodeEvent::NodeStarting { node_id });

        let mut child = spawn_node_from_config(config).await?;
        let pid = child
            .id()
            .ok_or_else(|| Error::ProcessSpawn("Failed to get PID from spawned process".into()))?;

        // Brief health check: give the process a moment to start, then check if it
        // exited immediately. This catches errors like invalid CLI arguments or missing
        // shared libraries. We use timeout + wait() rather than try_wait() because
        // tokio's child reaper requires the wait future to be polled.
        match tokio::time::timeout(Duration::from_secs(1), child.wait()).await {
            Ok(Ok(exit_status)) => {
                // Process already exited — read stderr for details.
                // spawn_node always redirects stderr to a file in the log dir
                // (falling back to data_dir when no log dir is configured).
                let spawn_log_dir = config.log_dir.as_deref().unwrap_or(&config.data_dir);
                let stderr_path = spawn_log_dir.join("stderr.log");
                let stderr_msg = std::fs::read_to_string(&stderr_path).unwrap_or_default();
                let detail = if stderr_msg.trim().is_empty() {
                    format!("exit code: {exit_status}")
                } else {
                    stderr_msg.trim().to_string()
                };
                self.node_states.insert(
                    node_id,
                    NodeRuntime {
                        status: NodeStatus::Errored,
                        pid: None,
                        started_at: None,
                        restart_count: 0,
                        first_crash_at: None,
                        pending_version: None,
                    },
                );
                return Err(Error::ProcessSpawn(format!(
                    "Node {node_id} exited immediately: {detail}"
                )));
            }
            Ok(Err(e)) => {
                return Err(Error::ProcessSpawn(format!(
                    "Failed to check node process status: {e}"
                )));
            }
            Err(_) => {} // Timeout — process is still running after 1s, good
        }

        self.node_states.insert(
            node_id,
            NodeRuntime {
                status: NodeStatus::Running,
                pid: Some(pid),
                started_at: Some(Instant::now()),
                restart_count: 0,
                first_crash_at: None,
                pending_version: None,
            },
        );

        let _ = self.event_tx.send(NodeEvent::NodeStarted { node_id, pid });

        let result = NodeStarted {
            node_id,
            service_name: config.service_name.clone(),
            pid,
        };

        // Spawn monitoring task
        let event_tx = self.event_tx.clone();
        let config = config.clone();
        tokio::spawn(async move {
            monitor_node(child, config, supervisor_ref, registry_ref, event_tx).await;
        });

        Ok(result)
    }

    /// Stop a node by gracefully terminating its process.
    ///
    /// Sends SIGTERM (Unix) or kills (Windows), waits up to 10 seconds for exit,
    /// then sends SIGKILL if needed. The monitor task detects the Stopping status
    /// and exits cleanly without attempting a restart.
    pub async fn stop_node(&mut self, node_id: u32) -> Result<()> {
        let state = self
            .node_states
            .get_mut(&node_id)
            .ok_or(Error::NodeNotFound(node_id))?;

        if state.status != NodeStatus::Running {
            return Err(Error::NodeNotRunning(node_id));
        }

        let pid = state.pid;

        let _ = self.event_tx.send(NodeEvent::NodeStopping { node_id });
        state.status = NodeStatus::Stopping;

        if let Some(pid) = pid {
            graceful_kill(pid).await;
        }

        // Update state after kill
        let state = self.node_states.get_mut(&node_id).unwrap();
        state.status = NodeStatus::Stopped;
        state.pid = None;
        state.started_at = None;

        let _ = self.event_tx.send(NodeEvent::NodeStopped { node_id });

        Ok(())
    }

    /// Stop all running nodes, returning an aggregate result.
    pub async fn stop_all_nodes(&mut self, configs: &[(u32, String)]) -> StopNodeResult {
        let mut stopped = Vec::new();
        let mut failed = Vec::new();
        let mut already_stopped = Vec::new();

        for (node_id, service_name) in configs {
            let node_id = *node_id;
            match self.node_status(node_id) {
                Ok(NodeStatus::Running) => {}
                Ok(_) => {
                    already_stopped.push(node_id);
                    continue;
                }
                Err(_) => {
                    already_stopped.push(node_id);
                    continue;
                }
            }

            match self.stop_node(node_id).await {
                Ok(()) => {
                    stopped.push(NodeStopped {
                        node_id,
                        service_name: service_name.clone(),
                    });
                }
                Err(Error::NodeNotRunning(_)) => {
                    already_stopped.push(node_id);
                }
                Err(e) => {
                    failed.push(NodeStopFailed {
                        node_id,
                        service_name: service_name.clone(),
                        error: e.to_string(),
                    });
                }
            }
        }

        StopNodeResult {
            stopped,
            failed,
            already_stopped,
        }
    }

    /// Get the status of a node.
    pub fn node_status(&self, node_id: u32) -> Result<NodeStatus> {
        self.node_states
            .get(&node_id)
            .map(|s| s.status)
            .ok_or(Error::NodeNotFound(node_id))
    }

    /// Get the PID of a running node.
    pub fn node_pid(&self, node_id: u32) -> Option<u32> {
        self.node_states.get(&node_id).and_then(|s| s.pid)
    }

    /// Get the uptime of a running node in seconds.
    pub fn node_uptime_secs(&self, node_id: u32) -> Option<u64> {
        self.node_states
            .get(&node_id)
            .and_then(|s| s.started_at.map(|t| t.elapsed().as_secs()))
    }

    /// The target version when the node is in `UpgradeScheduled` state, otherwise `None`.
    pub fn node_pending_version(&self, node_id: u32) -> Option<String> {
        self.node_states
            .get(&node_id)
            .and_then(|s| s.pending_version.clone())
    }

    /// Transition a Running node into `UpgradeScheduled` with the target version.
    ///
    /// Only affects nodes currently in `Running`: any other state is left alone (a stopped
    /// node legitimately has an out-of-date binary; a node already in UpgradeScheduled has
    /// already been marked). Returns `true` if the transition happened.
    fn mark_upgrade_scheduled(&mut self, node_id: u32, pending_version: String) -> bool {
        let Some(state) = self.node_states.get_mut(&node_id) else {
            return false;
        };
        if state.status != NodeStatus::Running {
            return false;
        }
        state.status = NodeStatus::UpgradeScheduled;
        state.pending_version = Some(pending_version.clone());
        let _ = self.event_tx.send(NodeEvent::UpgradeScheduled {
            node_id,
            pending_version,
        });
        true
    }

    /// Check whether a node is running.
    pub fn is_running(&self, node_id: u32) -> bool {
        self.node_states
            .get(&node_id)
            .is_some_and(|s| s.status == NodeStatus::Running)
    }

    /// Get counts of nodes in each state: (running, stopped, errored).
    pub fn node_counts(&self) -> (u32, u32, u32) {
        let mut running = 0u32;
        let mut stopped = 0u32;
        let mut errored = 0u32;
        for state in self.node_states.values() {
            match state.status {
                // UpgradeScheduled means the process is still running; count it with running.
                NodeStatus::Running | NodeStatus::Starting | NodeStatus::UpgradeScheduled => {
                    running += 1
                }
                NodeStatus::Stopped | NodeStatus::Stopping => stopped += 1,
                NodeStatus::Errored => errored += 1,
            }
        }
        (running, stopped, errored)
    }

    /// Update the runtime state for a node (used by the monitor task).
    fn update_state(&mut self, node_id: u32, status: NodeStatus, pid: Option<u32>) {
        if let Some(state) = self.node_states.get_mut(&node_id) {
            state.status = status;
            state.pid = pid;
            if status == NodeStatus::Running {
                state.started_at = Some(Instant::now());
            } else {
                // Clear uptime tracking for non-running states so status
                // responses don't report a stale `uptime_secs` after the node
                // exits (e.g. liveness monitor detecting an external kill).
                state.started_at = None;
            }
        }
    }

    /// Restore running-node state from a previous daemon instance.
    ///
    /// For each registered node, determines the PID to adopt via
    /// `resolve_adopted_pid`: try `<data_dir>/node.pid` first, and if it's
    /// missing or stale, fall back to a process-table scan matching the
    /// node's binary path and `--root-dir` argument. Live matches are
    /// inserted into `node_states` as `Running`.
    ///
    /// The scan is what covers the upgrade path: nodes spawned by a
    /// pre-adoption daemon never had a pid file written, so without the
    /// fallback the first restart after installing this fix would still
    /// leave every previously-running node classified as Stopped.
    ///
    /// Must be called before the HTTP server starts accepting requests —
    /// the window between `Supervisor::new` and adoption is where the API
    /// would otherwise report live nodes as Stopped. Adopted nodes have no
    /// associated `monitor_node` task (the `tokio::process::Child` handle
    /// belonged to the previous daemon, and `tokio::process::Child::wait`
    /// only works for the process's actual parent). Their exits are
    /// detected instead by the `spawn_liveness_monitor` polling task.
    ///
    /// Returns the list of node IDs that were adopted.
    pub fn adopt_from_registry(&mut self, registry: &NodeRegistry) -> Vec<u32> {
        // Populated upfront so every adopted node gets its real start time via
        // `process_started_at`, not just those that went through the scan
        // fallback. The extra ~50 ms at daemon startup is a one-time cost
        // that's cheaper than users seeing uptime reset every time the daemon
        // restarts.
        let mut sys = sysinfo::System::new();
        sys.refresh_processes_specifics(
            sysinfo::ProcessesToUpdate::All,
            true,
            sysinfo::ProcessRefreshKind::everything(),
        );

        let mut adopted = Vec::new();
        for config in registry.list() {
            let Some(pid) = resolve_adopted_pid(config, &sys) else {
                continue;
            };
            self.node_states.insert(
                config.id,
                NodeRuntime {
                    status: NodeStatus::Running,
                    pid: Some(pid),
                    // Back-date to the real process start time so uptime
                    // reported to the API is wall-clock accurate across
                    // daemon restarts. Falls back to `Instant::now()` only
                    // if sysinfo can't report the start time (PID raced out
                    // of the snapshot, or a broken clock) — better to show
                    // uptime counting from adoption than to claim the node
                    // is Stopped.
                    started_at: Some(process_started_at(&sys, pid).unwrap_or_else(Instant::now)),
                    restart_count: 0,
                    first_crash_at: None,
                    pending_version: None,
                },
            );
            let _ = self.event_tx.send(NodeEvent::NodeStarted {
                node_id: config.id,
                pid,
            });
            adopted.push(config.id);
        }
        adopted
    }

    /// Record a crash and determine if the node should be restarted or marked errored.
    /// Returns (should_restart, attempt_number, backoff_duration).
    fn record_crash(&mut self, node_id: u32) -> (bool, u32, Duration) {
        let state = match self.node_states.get_mut(&node_id) {
            Some(s) => s,
            None => return (false, 0, Duration::ZERO),
        };

        let now = Instant::now();

        // Check if we were stable long enough to reset crash counter
        if let Some(started_at) = state.started_at {
            if started_at.elapsed() >= STABLE_DURATION {
                state.restart_count = 0;
                state.first_crash_at = None;
            }
        }

        state.restart_count += 1;
        let attempt = state.restart_count;

        if state.first_crash_at.is_none() {
            state.first_crash_at = Some(now);
        }

        // Check if too many crashes in the window
        if let Some(first_crash) = state.first_crash_at {
            if attempt >= MAX_CRASHES_BEFORE_ERRORED
                && now.duration_since(first_crash) < CRASH_WINDOW
            {
                state.status = NodeStatus::Errored;
                state.pid = None;
                state.started_at = None;
                return (false, attempt, Duration::ZERO);
            }
        }

        // Exponential backoff: 1s, 2s, 4s, 8s, 16s, then 32s. The shift is
        // clamped at 5, so MAX_BACKOFF (60s) is a safety cap the current
        // formula never actually reaches.
        let backoff_secs = 1u64 << (attempt - 1).min(5);
        let backoff = Duration::from_secs(backoff_secs).min(MAX_BACKOFF);

        (true, attempt, backoff)
    }
}
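
// A standalone restatement of the backoff formula in `record_crash`, pinning
// down that the shift clamp makes 32s the effective ceiling. `backoff_for`
// is illustrative only, not part of the supervisor's API.
#[cfg(test)]
mod backoff_formula_example {
    use super::MAX_BACKOFF;
    use std::time::Duration;

    fn backoff_for(attempt: u32) -> Duration {
        Duration::from_secs(1u64 << (attempt - 1).min(5)).min(MAX_BACKOFF)
    }

    #[test]
    fn backoff_doubles_then_caps_at_32s() {
        let secs: Vec<u64> = (1..=7).map(|a| backoff_for(a).as_secs()).collect();
        assert_eq!(secs, vec![1, 2, 4, 8, 16, 32, 32]);
    }
}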

/// Periodically probe each Running node's on-disk binary for a version change.
///
/// When a node's binary-on-disk reports a different version than was recorded in the registry
/// at `ant node add` time, ant-node has replaced the binary in place as part of its auto-upgrade
/// flow and will restart the process shortly. We flip the node to `UpgradeScheduled` with the
/// target version, which lets `ant node status` render the in-between state and lets
/// `monitor_node` reclassify the upcoming clean exit as an expected restart rather than a crash.
///
/// The task exits when `shutdown` is cancelled.
pub fn spawn_upgrade_monitor(
    registry: Arc<RwLock<NodeRegistry>>,
    supervisor: Arc<RwLock<Supervisor>>,
    interval: Duration,
    shutdown: CancellationToken,
) {
    tokio::spawn(async move {
        let mut ticker = tokio::time::interval(interval);
        // After a Windows sleep/hibernate the default `Burst` catch-up would fire one
        // tick per missed interval back-to-back, producing a flood of `extract_version`
        // subprocess spawns. `Skip` resumes on the next aligned tick instead.
        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
        // Skip the immediate first tick — we don't want to probe while nodes are still in the
        // Starting -> Running transition.
        ticker.tick().await;

        loop {
            tokio::select! {
                _ = shutdown.cancelled() => return,
                _ = ticker.tick() => {},
            }

            // Collect a snapshot of (node_id, binary_path, recorded_version, current_pending)
            // to release the locks before running --version subprocesses (which take time).
            let candidates: Vec<(u32, PathBuf, String, Option<String>)> = {
                let reg = registry.read().await;
                let sup = supervisor.read().await;
                reg.list()
                    .into_iter()
                    .filter_map(|config| match sup.node_status(config.id) {
                        Ok(NodeStatus::Running) => Some((
                            config.id,
                            config.binary_path.clone(),
                            config.version.clone(),
                            sup.node_pending_version(config.id),
                        )),
                        _ => None,
                    })
                    .collect()
            };

            for (node_id, binary_path, recorded_version, current_pending) in candidates {
                let observed = match extract_version(&binary_path).await {
                    Ok(v) => v,
                    // Transient failures (e.g. binary mid-replacement) — skip this round.
                    Err(_) => continue,
                };
                if observed == recorded_version {
                    continue;
                }
                if current_pending.as_deref() == Some(observed.as_str()) {
                    continue;
                }
                supervisor
                    .write()
                    .await
                    .mark_upgrade_scheduled(node_id, observed);
            }
        }
    });
}

/// Build CLI arguments for the node binary from a NodeConfig.
pub fn build_node_args(config: &NodeConfig) -> Vec<String> {
    let mut args = vec![
        "--rewards-address".to_string(),
        config.rewards_address.clone(),
        "--root-dir".to_string(),
        config.data_dir.display().to_string(),
    ];

    if let Some(ref log_dir) = config.log_dir {
        args.push("--enable-logging".to_string());
        args.push("--log-dir".to_string());
        args.push(log_dir.display().to_string());
    }

    if let Some(port) = config.node_port {
        args.push("--port".to_string());
        args.push(port.to_string());
    }

    if let Some(port) = config.metrics_port {
        args.push("--metrics-port".to_string());
        args.push(port.to_string());
    }

    for peer in &config.bootstrap_peers {
        args.push("--bootstrap".to_string());
        args.push(peer.clone());
    }

    // The daemon's supervisor is the service manager. Tell ant-node not to spawn its own
    // replacement on auto-upgrade; instead, exit cleanly and let us respawn. Without this,
    // ant-node's default spawn-grandchild-then-exit flow races for the node's port during
    // the parent's graceful shutdown and the grandchild fails to bind.
    args.push("--stop-on-upgrade".to_string());

    args
}

/// Spawn a node process from a NodeConfig.
///
/// Writes `<data_dir>/node.pid` on successful spawn so that a future daemon instance
/// can adopt the running process via `Supervisor::adopt_from_registry`. The file is
/// cleaned up by `monitor_node` on the node's terminal exit.
async fn spawn_node_from_config(config: &NodeConfig) -> Result<tokio::process::Child> {
    let args = build_node_args(config);
    let env_vars: Vec<(String, String)> = config.env_variables.clone().into_iter().collect();

    let log_dir = config
        .log_dir
        .as_deref()
        .unwrap_or(config.data_dir.as_path());

    let child = spawn_node(&config.binary_path, &args, &env_vars, log_dir).await?;
    if let Some(pid) = child.id() {
        write_node_pid(&config.data_dir, pid);
    }
    Ok(child)
}

/// Monitor a node process. On exit, handle restart logic. On permanent exit
/// (user stop, crash limit, errored), clean up the pid file so a subsequent
/// daemon restart doesn't try to adopt a dead process.
async fn monitor_node(
    child: tokio::process::Child,
    mut config: NodeConfig,
    supervisor: Arc<RwLock<Supervisor>>,
    registry: Arc<RwLock<NodeRegistry>>,
    event_tx: broadcast::Sender<NodeEvent>,
) {
    monitor_node_inner(child, &mut config, supervisor, registry, event_tx).await;
    remove_node_pid(&config.data_dir);
}

async fn monitor_node_inner(
    mut child: tokio::process::Child,
    config: &mut NodeConfig,
    supervisor: Arc<RwLock<Supervisor>>,
    registry: Arc<RwLock<NodeRegistry>>,
    event_tx: broadcast::Sender<NodeEvent>,
) {
    let node_id = config.id;

    loop {
        // Wait for the process to exit
        let exit_status = child.wait().await;

        // Check whether this is a scheduled upgrade restart or an intentional stop.
        let status_at_exit = {
            let sup = supervisor.read().await;
            sup.node_status(node_id).ok()
        };

        match status_at_exit {
            Some(NodeStatus::Stopped) | Some(NodeStatus::Stopping) => return,
            Some(NodeStatus::UpgradeScheduled) => {
                // ant-node cleanly exited after replacing its binary in place. Respawn
                // directly (no backoff, no crash counter) and refresh the recorded version.
                match respawn_upgraded_node(config, &supervisor, &registry, &event_tx).await {
                    Ok(new_child) => {
                        child = new_child;
                        continue;
                    }
                    Err(e) => {
                        let _ = event_tx.send(NodeEvent::NodeErrored {
                            node_id,
                            message: format!("Failed to respawn after upgrade: {e}"),
                        });
                        let mut sup = supervisor.write().await;
                        sup.update_state(node_id, NodeStatus::Errored, None);
                        return;
                    }
                }
            }
            _ => {}
        }

        let exit_code = exit_status.ok().and_then(|s| s.code());

        // A process-reported exit that wasn't user-initiated (Stopping was filtered above) is
        // either an auto-upgrade (exit 0 after ant-node replaced its binary) or a crash. In
        // neither case should the node be parked in `Stopped` — that state is reserved for
        // intentional user stops.
        //
        // Distinguish upgrade from crash by checking whether the on-disk binary's version
        // drifted from the registry. Between replacing its binary and actually exiting,
        // ant-node can hold the process open for anywhere from seconds to minutes, depending
        // on in-flight work and its own config. The periodic version poll will usually have
        // flipped the node to `UpgradeScheduled` well before the exit, but when the window is
        // short we cannot rely on that — hence this synchronous re-check here.
        if exit_code == Some(0) {
            if let Ok(disk_version) = extract_version(&config.binary_path).await {
                if disk_version != config.version {
                    {
                        let mut sup = supervisor.write().await;
                        sup.mark_upgrade_scheduled(node_id, disk_version.clone());
                    }
                    match respawn_upgraded_node(config, &supervisor, &registry, &event_tx).await {
                        Ok(new_child) => {
                            child = new_child;
                            continue;
                        }
                        Err(e) => {
                            let _ = event_tx.send(NodeEvent::NodeErrored {
                                node_id,
                                message: format!("Failed to respawn after upgrade: {e}"),
                            });
                            let mut sup = supervisor.write().await;
                            sup.update_state(node_id, NodeStatus::Errored, None);
                            return;
                        }
                    }
                }
            }
            // Exit 0 but the binary didn't change — fall through to the crash / restart path.
            // We report the crash with the exit code preserved; the crash counter guards
            // against infinite restart loops if the process keeps exiting immediately.
        }

        // Crash (or clean exit that wasn't an upgrade)
        let _ = event_tx.send(NodeEvent::NodeCrashed { node_id, exit_code });

        let (should_restart, attempt, backoff) = {
            let mut sup = supervisor.write().await;
            sup.record_crash(node_id)
        };

        if !should_restart {
            let _ = event_tx.send(NodeEvent::NodeErrored {
                node_id,
                message: format!(
                    "Node crashed {} times within {} seconds, giving up",
                    MAX_CRASHES_BEFORE_ERRORED,
                    CRASH_WINDOW.as_secs()
                ),
            });
            return;
        }

        let _ = event_tx.send(NodeEvent::NodeRestarting { node_id, attempt });

        tokio::time::sleep(backoff).await;

        // Try to restart
        match spawn_node_from_config(&*config).await {
            Ok(new_child) => {
                let pid = match new_child.id() {
                    Some(pid) => pid,
                    None => {
                        // Process exited before we could read its PID
                        let _ = event_tx.send(NodeEvent::NodeErrored {
                            node_id,
                            message: "Restarted process exited before PID could be read"
                                .to_string(),
                        });
                        let mut sup = supervisor.write().await;
                        sup.update_state(node_id, NodeStatus::Errored, None);
                        return;
                    }
                };
                {
                    let mut sup = supervisor.write().await;
                    sup.update_state(node_id, NodeStatus::Running, Some(pid));
                }
                let _ = event_tx.send(NodeEvent::NodeStarted { node_id, pid });
                child = new_child;
            }
            Err(e) => {
                let _ = event_tx.send(NodeEvent::NodeErrored {
                    node_id,
                    message: format!("Failed to restart node: {e}"),
                });
                let mut sup = supervisor.write().await;
                sup.update_state(node_id, NodeStatus::Errored, None);
                return;
            }
        }
    }
}

/// Respawn a node whose `UpgradeScheduled` status tells us the exit was expected.
///
/// On success: persists the new version to the registry, updates the in-memory config clone,
/// clears pending_version, sets status back to Running, and fires `NodeUpgraded`.
async fn respawn_upgraded_node(
    config: &mut NodeConfig,
    supervisor: &Arc<RwLock<Supervisor>>,
    registry: &Arc<RwLock<NodeRegistry>>,
    event_tx: &broadcast::Sender<NodeEvent>,
) -> Result<tokio::process::Child> {
    let node_id = config.id;
    let old_version = config.version.clone();

    let new_child = spawn_node_from_config(config).await?;
    let pid = new_child
        .id()
        .ok_or_else(|| Error::ProcessSpawn("Failed to get PID after upgrade respawn".into()))?;

    // Read the new version from the replaced binary. If this fails we still consider the respawn
    // successful; we just don't refresh the recorded version this round.
    let new_version = extract_version(&config.binary_path).await.ok();

    if let Some(ref version) = new_version {
        config.version = version.clone();
        let mut reg = registry.write().await;
        if let Ok(stored) = reg.get_mut(node_id) {
            stored.version = version.clone();
            let _ = reg.save();
        }
    }

    {
        let mut sup = supervisor.write().await;
        if let Some(state) = sup.node_states.get_mut(&node_id) {
            state.status = NodeStatus::Running;
            state.pid = Some(pid);
            state.started_at = Some(Instant::now());
            state.pending_version = None;
            state.restart_count = 0;
            state.first_crash_at = None;
        }
    }

    let _ = event_tx.send(NodeEvent::NodeStarted { node_id, pid });
    if let Some(version) = new_version {
        let _ = event_tx.send(NodeEvent::NodeUpgraded {
            node_id,
            old_version,
            new_version: version,
        });
    }

    Ok(new_child)
}

/// Timeout for graceful shutdown before force-killing.
const GRACEFUL_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10);

/// Send SIGTERM to a process, wait for it to exit, and SIGKILL if it doesn't.
async fn graceful_kill(pid: u32) {
    send_signal_term(pid);

    // Poll for process exit
    let start = Instant::now();
    loop {
        if !is_process_alive(pid) {
            return;
        }
        if start.elapsed() >= GRACEFUL_SHUTDOWN_TIMEOUT {
            break;
        }
        tokio::time::sleep(Duration::from_millis(100)).await;
    }

    // Force kill if still alive
    send_signal_kill(pid);

    // Brief wait for force kill to take effect
    for _ in 0..10 {
        if !is_process_alive(pid) {
            return;
        }
        tokio::time::sleep(Duration::from_millis(50)).await;
    }
}

/// Poll each Running node's PID for OS liveness every `LIVENESS_POLL_INTERVAL`,
/// flipping dead ones to `Stopped` and emitting `NodeStopped`.
///
/// Exists to detect exits of nodes adopted across a daemon restart
/// (`Supervisor::adopt_from_registry`). Daemon-spawned nodes have a
/// `monitor_node` task awaiting on the owned `Child` handle, which detects
/// exit immediately — the poll is redundant-but-harmless for them. Adopted
/// nodes don't have a `Child` (it died with the previous daemon), so the poll
/// is the only way the supervisor learns that one has exited.
///
/// The task terminates when `shutdown` is cancelled.
pub fn spawn_liveness_monitor(
    registry: Arc<RwLock<NodeRegistry>>,
    supervisor: Arc<RwLock<Supervisor>>,
    event_tx: broadcast::Sender<NodeEvent>,
    interval: Duration,
    shutdown: CancellationToken,
) {
    tokio::spawn(async move {
        let mut ticker = tokio::time::interval(interval);
        // Don't catch up in a burst after a Windows sleep/hibernate: a flood of
        // liveness probes serves no purpose, and a uniform `Skip` policy across
        // supervisor monitors keeps post-wake behaviour predictable.
        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
        loop {
            tokio::select! {
                _ = shutdown.cancelled() => return,
                _ = ticker.tick() => {}
            }

            // Snapshot candidates to release locks before the per-process syscalls.
            let candidates: Vec<(u32, u32, PathBuf)> = {
                let sup = supervisor.read().await;
                let reg = registry.read().await;
                reg.list()
                    .into_iter()
                    .filter_map(|config| {
                        let pid = sup.node_pid(config.id)?;
                        matches!(sup.node_status(config.id), Ok(NodeStatus::Running))
                            .then_some((config.id, pid, config.data_dir.clone()))
                    })
                    .collect()
            };

            for (node_id, pid, data_dir) in candidates {
                if is_process_alive(pid) {
                    continue;
                }
                let mut sup = supervisor.write().await;
                // Re-check under the write lock to avoid racing with a concurrent
                // start/stop that flipped the state between the snapshot and now.
                if !matches!(sup.node_status(node_id), Ok(NodeStatus::Running)) {
                    continue;
                }
                sup.update_state(node_id, NodeStatus::Stopped, None);
                let _ = event_tx.send(NodeEvent::NodeStopped { node_id });
                remove_node_pid(&data_dir);
            }
        }
    });
}

#[cfg(unix)]
fn pid_to_i32(pid: u32) -> Option<i32> {
    i32::try_from(pid).ok().filter(|&p| p > 0)
}

#[cfg(unix)]
fn send_signal_term(pid: u32) {
    if let Some(pid) = pid_to_i32(pid) {
        unsafe {
            libc::kill(pid, libc::SIGTERM);
        }
    }
}

#[cfg(unix)]
fn send_signal_kill(pid: u32) {
    if let Some(pid) = pid_to_i32(pid) {
        unsafe {
            libc::kill(pid, libc::SIGKILL);
        }
    }
}

#[cfg(unix)]
fn is_process_alive(pid: u32) -> bool {
    let Some(pid) = pid_to_i32(pid) else {
        return false;
    };
    let ret = unsafe { libc::kill(pid, 0) };
    if ret == 0 {
        return true;
    }
    // EPERM means the process exists but we lack permission to signal it
    std::io::Error::last_os_error().raw_os_error() == Some(libc::EPERM)
}
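
// A quick sanity check of the probe above, assuming the test process's own
// PID answers `kill(pid, 0)` and that PID 0 is rejected by `pid_to_i32`.
#[cfg(all(test, unix))]
mod liveness_probe_example {
    use super::is_process_alive;

    #[test]
    fn own_pid_is_alive_and_pid_zero_is_not() {
        assert!(is_process_alive(std::process::id()));
        assert!(!is_process_alive(0));
    }
}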

#[cfg(windows)]
fn send_signal_term(pid: u32) {
    use windows_sys::Win32::System::Console::{
        AttachConsole, FreeConsole, GenerateConsoleCtrlEvent, SetConsoleCtrlHandler, CTRL_C_EVENT,
    };

    unsafe {
        // Detach from our own console (no-op if daemon has none, which is
        // typical since it's spawned with DETACHED_PROCESS).
        FreeConsole();

        // Attach to the target process's console and send Ctrl+C
        if AttachConsole(pid) != 0 {
            // Disable Ctrl+C handling so GenerateConsoleCtrlEvent doesn't
            // terminate us while we're attached to the node's console.
            SetConsoleCtrlHandler(None, 1);
            GenerateConsoleCtrlEvent(CTRL_C_EVENT, 0);
            // Detach from the node's console first — once detached, the
            // async Ctrl+C event can only reach the node, not us.
            FreeConsole();
            // Brief delay to let the event drain before re-enabling our
            // handler. Without this, the handler thread can process the
            // event between FreeConsole and SetConsoleCtrlHandler.
            std::thread::sleep(std::time::Duration::from_millis(50));
            // Restore Ctrl+C handling so `daemon run` (foreground mode)
            // can still be stopped via Ctrl+C / tokio::signal::ctrl_c().
            SetConsoleCtrlHandler(None, 0);
        }
    }
}

#[cfg(windows)]
fn send_signal_kill(pid: u32) {
    use windows_sys::Win32::Foundation::CloseHandle;
    use windows_sys::Win32::System::Threading::{OpenProcess, TerminateProcess, PROCESS_TERMINATE};

    unsafe {
        let handle = OpenProcess(PROCESS_TERMINATE, 0, pid);
        if !handle.is_null() {
            TerminateProcess(handle, 1);
            CloseHandle(handle);
        }
    }
}

#[cfg(windows)]
fn is_process_alive(pid: u32) -> bool {
    use windows_sys::Win32::Foundation::{CloseHandle, STILL_ACTIVE};
    use windows_sys::Win32::System::Threading::{
        GetExitCodeProcess, OpenProcess, PROCESS_QUERY_LIMITED_INFORMATION,
    };

    unsafe {
        let handle = OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, 0, pid);
        if handle.is_null() {
            return false;
        }
        let mut exit_code: u32 = 0;
        let success = GetExitCodeProcess(handle, &mut exit_code);
        CloseHandle(handle);
        success != 0 && exit_code == STILL_ACTIVE as u32
    }
}
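
// Windows counterpart of the Unix sanity check above: the test process can
// always open a limited-information handle to itself, so its own PID reads
// as alive.
#[cfg(all(test, windows))]
mod liveness_probe_example {
    use super::is_process_alive;

    #[test]
    fn own_pid_is_alive() {
        assert!(is_process_alive(std::process::id()));
    }
}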

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn build_node_args_basic() {
        let config = NodeConfig {
            id: 1,
            service_name: "node1".to_string(),
            rewards_address: "0xabc123".to_string(),
            data_dir: "/data/node-1".into(),
            log_dir: Some("/logs/node-1".into()),
            node_port: Some(12000),
            metrics_port: Some(13000),
            network_id: Some(1),
            binary_path: "/bin/node".into(),
            version: "0.1.0".to_string(),
            env_variables: HashMap::new(),
            bootstrap_peers: vec!["peer1".to_string(), "peer2".to_string()],
        };

        let args = build_node_args(&config);

        assert!(args.contains(&"--rewards-address".to_string()));
        assert!(args.contains(&"0xabc123".to_string()));
        assert!(args.contains(&"--root-dir".to_string()));
        assert!(args.contains(&"/data/node-1".to_string()));
        assert!(args.contains(&"--enable-logging".to_string()));
        assert!(args.contains(&"--log-dir".to_string()));
        assert!(args.contains(&"/logs/node-1".to_string()));
        assert!(args.contains(&"--port".to_string()));
        assert!(args.contains(&"12000".to_string()));
        assert!(args.contains(&"--metrics-port".to_string()));
        assert!(args.contains(&"13000".to_string()));
        assert!(args.contains(&"--bootstrap".to_string()));
        assert!(args.contains(&"peer1".to_string()));
        assert!(args.contains(&"peer2".to_string()));
        assert!(args.contains(&"--stop-on-upgrade".to_string()));
    }

    #[test]
    fn build_node_args_minimal() {
        let config = NodeConfig {
            id: 1,
            service_name: "node1".to_string(),
            rewards_address: "0xabc".to_string(),
            data_dir: "/data/node-1".into(),
            log_dir: None,
            node_port: None,
            metrics_port: None,
            network_id: None,
            binary_path: "/bin/node".into(),
            version: "0.1.0".to_string(),
            env_variables: HashMap::new(),
            bootstrap_peers: vec![],
        };

        let args = build_node_args(&config);

        assert!(args.contains(&"--rewards-address".to_string()));
        assert!(args.contains(&"--root-dir".to_string()));
        assert!(!args.contains(&"--enable-logging".to_string()));
        assert!(!args.contains(&"--log-dir".to_string()));
        assert!(!args.contains(&"--port".to_string()));
        assert!(!args.contains(&"--metrics-port".to_string()));
        assert!(!args.contains(&"--bootstrap".to_string()));
        assert!(args.contains(&"--stop-on-upgrade".to_string()));
    }

    #[test]
    fn record_crash_backoff_increases() {
        let (tx, _rx) = broadcast::channel(16);
        let mut sup = Supervisor::new(tx);

        // Insert a running node
        sup.node_states.insert(
            1,
            NodeRuntime {
                status: NodeStatus::Running,
                pid: Some(100),
                started_at: Some(Instant::now()),
                restart_count: 0,
                first_crash_at: None,
                pending_version: None,
            },
        );

        let (should_restart, attempt, backoff) = sup.record_crash(1);
        assert!(should_restart);
        assert_eq!(attempt, 1);
        assert_eq!(backoff, Duration::from_secs(1));

        let (should_restart, attempt, backoff) = sup.record_crash(1);
        assert!(should_restart);
        assert_eq!(attempt, 2);
        assert_eq!(backoff, Duration::from_secs(2));

        let (should_restart, attempt, backoff) = sup.record_crash(1);
        assert!(should_restart);
        assert_eq!(attempt, 3);
        assert_eq!(backoff, Duration::from_secs(4));

        let (should_restart, attempt, backoff) = sup.record_crash(1);
        assert!(should_restart);
        assert_eq!(attempt, 4);
        assert_eq!(backoff, Duration::from_secs(8));

        // 5th crash within window → errored
        let (should_restart, attempt, _) = sup.record_crash(1);
        assert!(!should_restart);
        assert_eq!(attempt, 5);
        assert_eq!(sup.node_states[&1].status, NodeStatus::Errored);
    }

    #[test]
    fn node_counts_tracks_states() {
        let (tx, _rx) = broadcast::channel(16);
        let mut sup = Supervisor::new(tx);

        sup.node_states.insert(
            1,
            NodeRuntime {
                status: NodeStatus::Running,
                pid: Some(100),
                started_at: Some(Instant::now()),
                restart_count: 0,
                first_crash_at: None,
                pending_version: None,
            },
        );
        sup.node_states.insert(
            2,
            NodeRuntime {
                status: NodeStatus::Stopped,
                pid: None,
                started_at: None,
                restart_count: 0,
                first_crash_at: None,
                pending_version: None,
            },
        );
        sup.node_states.insert(
            3,
            NodeRuntime {
                status: NodeStatus::Errored,
                pid: None,
                started_at: None,
                restart_count: 5,
                first_crash_at: None,
                pending_version: None,
            },
        );

        let (running, stopped, errored) = sup.node_counts();
        assert_eq!(running, 1);
        assert_eq!(stopped, 1);
        assert_eq!(errored, 1);
    }

    #[test]
    fn mark_upgrade_scheduled_only_affects_running_nodes() {
        let (tx, mut rx) = broadcast::channel(16);
        let mut sup = Supervisor::new(tx);

        sup.node_states.insert(
            1,
            NodeRuntime {
                status: NodeStatus::Running,
                pid: Some(111),
                started_at: Some(Instant::now()),
                restart_count: 0,
                first_crash_at: None,
                pending_version: None,
            },
        );
        sup.node_states.insert(
            2,
            NodeRuntime {
                status: NodeStatus::Stopped,
                pid: None,
                started_at: None,
                restart_count: 0,
                first_crash_at: None,
                pending_version: None,
            },
        );

        // Running node: transitions to UpgradeScheduled with pending_version set and event fires.
        let affected = sup.mark_upgrade_scheduled(1, "0.10.11-rc.1".to_string());
        assert!(affected);
        assert_eq!(sup.node_status(1).unwrap(), NodeStatus::UpgradeScheduled);
        assert_eq!(sup.node_pending_version(1).as_deref(), Some("0.10.11-rc.1"));
        match rx.try_recv() {
            Ok(NodeEvent::UpgradeScheduled {
                node_id,
                pending_version,
            }) => {
                assert_eq!(node_id, 1);
                assert_eq!(pending_version, "0.10.11-rc.1");
            }
            other => panic!("expected UpgradeScheduled event, got {other:?}"),
        }

        // Stopped node: untouched, no event fired.
        let affected = sup.mark_upgrade_scheduled(2, "0.10.11-rc.1".to_string());
        assert!(!affected);
        assert_eq!(sup.node_status(2).unwrap(), NodeStatus::Stopped);
        assert!(sup.node_pending_version(2).is_none());

        // Already-UpgradeScheduled node: calling again is a no-op.
        let affected = sup.mark_upgrade_scheduled(1, "0.10.12".to_string());
        assert!(!affected);
        // Pending version is the original one set.
        assert_eq!(sup.node_pending_version(1).as_deref(), Some("0.10.11-rc.1"));
    }

    #[test]
    fn node_counts_counts_upgrade_scheduled_as_running() {
        let (tx, _rx) = broadcast::channel(16);
        let mut sup = Supervisor::new(tx);

        sup.node_states.insert(
            1,
            NodeRuntime {
                status: NodeStatus::UpgradeScheduled,
                pid: Some(111),
                started_at: Some(Instant::now()),
                restart_count: 0,
                first_crash_at: None,
                pending_version: Some("0.10.11-rc.1".to_string()),
            },
        );

        let (running, stopped, errored) = sup.node_counts();
        assert_eq!(running, 1);
        assert_eq!(stopped, 0);
        assert_eq!(errored, 0);
    }

    #[tokio::test]
    async fn stop_node_not_found() {
        let (tx, _rx) = broadcast::channel(16);
        let mut sup = Supervisor::new(tx);

        let result = sup.stop_node(999).await;
        assert!(matches!(result, Err(Error::NodeNotFound(999))));
    }

    #[tokio::test]
    async fn stop_node_not_running() {
        let (tx, _rx) = broadcast::channel(16);
        let mut sup = Supervisor::new(tx);

        sup.node_states.insert(
            1,
            NodeRuntime {
                status: NodeStatus::Stopped,
                pid: None,
                started_at: None,
                restart_count: 0,
                first_crash_at: None,
                pending_version: None,
            },
        );

        let result = sup.stop_node(1).await;
        assert!(matches!(result, Err(Error::NodeNotRunning(1))));
    }

    #[tokio::test]
    async fn stop_all_nodes_mixed_states() {
        let (tx, _rx) = broadcast::channel(16);
        let mut sup = Supervisor::new(tx);

        // Node 1: running (but with a fake PID that won't exist)
        sup.node_states.insert(
            1,
            NodeRuntime {
                status: NodeStatus::Running,
                pid: Some(999999),
                started_at: Some(Instant::now()),
                restart_count: 0,
                first_crash_at: None,
                pending_version: None,
            },
        );
        // Node 2: already stopped
        sup.node_states.insert(
            2,
            NodeRuntime {
                status: NodeStatus::Stopped,
                pid: None,
                started_at: None,
                restart_count: 0,
                first_crash_at: None,
                pending_version: None,
            },
        );

        let configs = vec![(1, "node1".to_string()), (2, "node2".to_string())];

        let result = sup.stop_all_nodes(&configs).await;

        assert_eq!(result.stopped.len(), 1);
        assert_eq!(result.stopped[0].node_id, 1);
        assert_eq!(result.stopped[0].service_name, "node1");
        assert_eq!(result.already_stopped, vec![2]);
        assert!(result.failed.is_empty());
    }
}
1444}