
ant_core/node/daemon/supervisor.rs

use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};

use tokio::sync::{broadcast, RwLock};
use tokio_util::sync::CancellationToken;

use crate::error::{Error, Result};
use crate::node::binary::extract_version;
use crate::node::events::NodeEvent;
use crate::node::process::spawn::spawn_node;
use crate::node::registry::NodeRegistry;
use crate::node::types::{
    NodeConfig, NodeStarted, NodeStatus, NodeStopFailed, NodeStopped, StopNodeResult,
};

/// How often the upgrade-detection task polls each running node's binary for a version change.
pub const UPGRADE_POLL_INTERVAL: Duration = Duration::from_secs(60);

/// How often the liveness poll verifies that each Running node's OS process still exists.
///
/// Nodes the current daemon spawned are watched via their owned `Child` handle in
/// `monitor_node`, so this poll exists purely to catch exits of nodes adopted across
/// a daemon restart (whose `Child` handle died with the previous daemon). Five seconds
/// is a rough trade-off: long enough that the syscall cost is negligible, short enough
/// that a crashed adopted node shows up as stopped within a few heartbeats.
pub const LIVENESS_POLL_INTERVAL: Duration = Duration::from_secs(5);

/// Path of the pid file a running node writes to so a future daemon instance can
/// adopt it across restarts. Lives alongside the node's other on-disk state.
fn node_pid_file(data_dir: &Path) -> PathBuf {
    data_dir.join("node.pid")
}

/// Persist the running node's PID to `<data_dir>/node.pid`. Best-effort: a failure
/// here only costs us the ability to adopt the node after a daemon restart, so we
/// warn and continue rather than aborting the start.
fn write_node_pid(data_dir: &Path, pid: u32) {
    let path = node_pid_file(data_dir);
    if let Err(e) = std::fs::write(&path, pid.to_string()) {
        tracing::warn!(
            "Failed to write node pid file at {}: {e}. Node will still run, but a future \
             daemon restart will not be able to adopt it.",
            path.display()
        );
    }
}

/// Remove the pid file. Called on every terminal-exit path in `monitor_node` so the
/// next daemon doesn't try to adopt a PID belonging to a process that's gone.
fn remove_node_pid(data_dir: &Path) {
    let _ = std::fs::remove_file(node_pid_file(data_dir));
}

/// Read the pid file without validating liveness. Returns `None` if the file is
/// missing or its contents can't be parsed as a u32.
fn read_node_pid(data_dir: &Path) -> Option<u32> {
    std::fs::read_to_string(node_pid_file(data_dir))
        .ok()
        .and_then(|s| s.trim().parse().ok())
}
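
// A minimal sketch of the round-trip the three pid-file helpers above implement,
// exercised as a unit test. It assumes only that the OS temp dir is writable; the
// directory name is arbitrary.
#[cfg(test)]
#[test]
fn pid_file_round_trip_sketch() {
    let dir = std::env::temp_dir().join("ant-supervisor-pid-file-sketch");
    std::fs::create_dir_all(&dir).unwrap();
    write_node_pid(&dir, 4242);
    assert_eq!(read_node_pid(&dir), Some(4242));
    remove_node_pid(&dir);
    assert_eq!(read_node_pid(&dir), None);
    let _ = std::fs::remove_dir_all(&dir);
}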

/// Scan the OS process table for a running node that matches `config`, as a
/// fallback for when `<data_dir>/node.pid` is missing or stale.
///
/// Nodes spawned by a pre-adoption daemon never had a pid file written, so
/// without this scan the first restart after installing the adoption fix
/// would leave every previously-running node classified as Stopped. The scan
/// matches on:
///
/// - executable path identical to `config.binary_path`, AND
/// - command line containing `--root-dir` (as a standalone arg or
///   `--root-dir=<path>`) whose value resolves to `config.data_dir`.
///
/// The double match keeps us safe when multiple nodes share the same binary
/// on disk (common on installs where one copy services several data dirs).
///
/// Returns `None` if no running process matches.
fn find_running_node_process(sys: &sysinfo::System, config: &NodeConfig) -> Option<u32> {
    let target_data_dir = config.data_dir.as_path();
    for (pid, process) in sys.processes() {
        // On Linux, `sys.processes()` enumerates /proc/<pid>/task/<tid> too, so
        // worker threads appear alongside their thread-group leader and share
        // the same exe + cmdline. Skip threads — we want the TGID (the real
        // process), which is the only PID safe to signal.
        if process.thread_kind().is_some() {
            continue;
        }
        let Some(exe) = process.exe() else {
            continue;
        };
        if exe != config.binary_path.as_path() {
            continue;
        }

        let cmd = process.cmd();
        let matches_root_dir = cmd.iter().enumerate().any(|(i, arg)| {
            let arg = arg.to_string_lossy();
            if let Some(value) = arg.strip_prefix("--root-dir=") {
                Path::new(value) == target_data_dir
            } else if arg == "--root-dir" {
                cmd.get(i + 1)
                    .map(|v| Path::new(&*v.to_string_lossy()) == target_data_dir)
                    .unwrap_or(false)
            } else {
                false
            }
        });

        if matches_root_dir {
            return Some(pid.as_u32());
        }
    }
    None
}
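
// Illustrative sketch of the two `--root-dir` spellings the scan accepts. This
// restates the matching closure above over plain `&str` slices purely for
// demonstration; the real matcher runs over `sysinfo`'s `OsString` cmdline.
#[cfg(test)]
#[test]
fn root_dir_arg_matching_sketch() {
    let target = Path::new("/data/node-1");
    let matches = |cmd: &[&str]| {
        cmd.iter().enumerate().any(|(i, arg)| {
            if let Some(value) = arg.strip_prefix("--root-dir=") {
                Path::new(value) == target
            } else if *arg == "--root-dir" {
                cmd.get(i + 1).map(|v| Path::new(*v) == target).unwrap_or(false)
            } else {
                false
            }
        })
    };
    // Standalone-argument and `=`-joined forms both match; other dirs don't.
    assert!(matches(&["antnode", "--root-dir", "/data/node-1"]));
    assert!(matches(&["antnode", "--root-dir=/data/node-1"]));
    assert!(!matches(&["antnode", "--root-dir", "/data/node-2"]));
}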

/// Check whether `pid` refers to a live, non-thread process. On Linux,
/// `kill(tid, 0)` returns success for any thread's TID, not just the
/// thread-group leader — so liveness alone is not enough to trust a PID
/// loaded from the pid file. Consulting sysinfo's `thread_kind()` tells us
/// whether the entry is a userland thread (TID) vs. the actual process
/// (TGID). A missing sysinfo entry with a live PID is still treated as a
/// process, since older daemons could have written the PID before sysinfo
/// saw it.
fn pid_is_live_process(pid: u32, sys: &sysinfo::System) -> bool {
    if !is_process_alive(pid) {
        return false;
    }
    match sys.process(sysinfo::Pid::from_u32(pid)) {
        Some(process) => process.thread_kind().is_none(),
        None => true,
    }
}

/// Determine the PID to adopt for a node, trying the pid file first and
/// falling back to a process-table scan. On successful scan, writes the pid
/// file so the next adoption takes the fast path.
///
/// Returns `None` if no live process can be attributed to this node.
fn resolve_adopted_pid(config: &NodeConfig, sys: &sysinfo::System) -> Option<u32> {
    if let Some(pid) = read_node_pid(&config.data_dir) {
        if pid_is_live_process(pid, sys) {
            return Some(pid);
        }
        // Pid file points at a dead process or a thread TID (legacy daemons
        // could record a TID because the fallback scan saw threads). Don't
        // leave it around to mislead the next adoption pass.
        remove_node_pid(&config.data_dir);
    }

    let pid = find_running_node_process(sys, config)?;
    write_node_pid(&config.data_dir, pid);
    Some(pid)
}
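
// Fast-path sketch for `resolve_adopted_pid`: a pid file pointing at a live
// process (here, the test process's own PID) is returned without a scan. The
// `NodeConfig` fields other than `data_dir` are placeholder values.
#[cfg(test)]
#[test]
fn resolve_adopted_pid_fast_path_sketch() {
    let dir = std::env::temp_dir().join("ant-supervisor-adopt-sketch");
    std::fs::create_dir_all(&dir).unwrap();
    write_node_pid(&dir, std::process::id());

    let config = NodeConfig {
        id: 1,
        service_name: "node1".to_string(),
        rewards_address: "0xabc".to_string(),
        data_dir: dir.clone(),
        log_dir: None,
        node_port: None,
        metrics_port: None,
        network_id: None,
        binary_path: "/bin/node".into(),
        version: "0.1.0".to_string(),
        env_variables: HashMap::new(),
        bootstrap_peers: vec![],
    };

    let mut sys = sysinfo::System::new();
    sys.refresh_processes_specifics(
        sysinfo::ProcessesToUpdate::All,
        true,
        sysinfo::ProcessRefreshKind::everything(),
    );
    assert_eq!(resolve_adopted_pid(&config, &sys), Some(std::process::id()));
    let _ = std::fs::remove_dir_all(&dir);
}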

/// Build an `Instant` that reports the real process start time when
/// `.elapsed()` is called on it — so uptime survives daemon restarts
/// accurately for adopted nodes.
///
/// `sysinfo::Process::start_time()` returns seconds since the UNIX epoch
/// (wall clock). `Instant` is monotonic and can't be constructed from a
/// wall-clock value directly, so we back-date `Instant::now()` by the
/// process's age. Returns `None` if the PID isn't in the snapshot (the
/// process exited between scan and this call), if the system clock looks
/// broken, or if subtraction would overflow (unrealistically-old process
/// start times).
fn process_started_at(sys: &sysinfo::System, pid: u32) -> Option<Instant> {
    let start_secs = sys.process(sysinfo::Pid::from_u32(pid))?.start_time();
    let now_secs = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .ok()?
        .as_secs();
    let age = now_secs.saturating_sub(start_secs);
    Instant::now().checked_sub(Duration::from_secs(age))
}
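
// Back-dating sketch: an `Instant` constructed `age` seconds in the past
// reports `.elapsed()` of roughly `age`, which is exactly the arithmetic
// `process_started_at` relies on. The 5s slack is an arbitrary test margin.
#[cfg(test)]
#[test]
fn backdated_instant_reports_age_sketch() {
    let age = Duration::from_secs(90);
    let started = Instant::now().checked_sub(age).expect("90s fits in Instant's range");
    let elapsed = started.elapsed();
    assert!(elapsed >= age);
    assert!(elapsed < age + Duration::from_secs(5));
}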

/// Maximum restart attempts before marking a node as errored.
const MAX_CRASHES_BEFORE_ERRORED: u32 = 5;

/// Window in which crashes are counted. If this many crashes happen within
/// this duration, the node is marked errored.
const CRASH_WINDOW: Duration = Duration::from_secs(300); // 5 minutes

/// If a node runs for this long without crashing, reset the crash counter.
const STABLE_DURATION: Duration = Duration::from_secs(300); // 5 minutes

/// Maximum backoff delay between restarts.
const MAX_BACKOFF: Duration = Duration::from_secs(60);
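
// Backoff schedule sketch implied by `record_crash` below (attempt number to
// delay): 1 -> 1s, 2 -> 2s, 3 -> 4s, 4 -> 8s, 5 -> 16s, 6+ -> 32s. The shift
// is clamped at 32s, so MAX_BACKOFF's 60s ceiling is a safety bound that the
// current formula never actually reaches.
#[cfg(test)]
#[test]
fn backoff_schedule_sketch() {
    let backoff = |attempt: u32| {
        let secs = 1u64 << (attempt - 1).min(5);
        Duration::from_secs(secs).min(MAX_BACKOFF)
    };
    assert_eq!(backoff(1), Duration::from_secs(1));
    assert_eq!(backoff(4), Duration::from_secs(8));
    assert_eq!(backoff(5), Duration::from_secs(16));
    assert_eq!(backoff(6), Duration::from_secs(32));
    assert_eq!(backoff(40), Duration::from_secs(32));
}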

/// Manages running node processes. Holds child process handles and runtime state.
pub struct Supervisor {
    event_tx: broadcast::Sender<NodeEvent>,
    /// Runtime status of each node, keyed by node ID.
    node_states: HashMap<u32, NodeRuntime>,
}

struct NodeRuntime {
    status: NodeStatus,
    pid: Option<u32>,
    started_at: Option<Instant>,
    restart_count: u32,
    first_crash_at: Option<Instant>,
    /// When `status == UpgradeScheduled`, the target version the on-disk binary now reports.
    pending_version: Option<String>,
}

impl Supervisor {
    pub fn new(event_tx: broadcast::Sender<NodeEvent>) -> Self {
        Self {
            event_tx,
            node_states: HashMap::new(),
        }
    }

    /// Start a node by spawning the actual process.
    ///
    /// Returns `NodeStarted` on success. Spawns a background monitoring task
    /// that watches the child process and handles restart logic.
    pub async fn start_node(
        &mut self,
        config: &NodeConfig,
        supervisor_ref: Arc<RwLock<Supervisor>>,
        registry_ref: Arc<RwLock<NodeRegistry>>,
    ) -> Result<NodeStarted> {
        let node_id = config.id;

        if let Some(state) = self.node_states.get(&node_id) {
            if state.status == NodeStatus::Running {
                return Err(Error::NodeAlreadyRunning(node_id));
            }
        }

        let _ = self.event_tx.send(NodeEvent::NodeStarting { node_id });

        let mut child = spawn_node_from_config(config).await?;
        let pid = child
            .id()
            .ok_or_else(|| Error::ProcessSpawn("Failed to get PID from spawned process".into()))?;

        // Brief health check: give the process a moment to start, then check if it
        // exited immediately. This catches errors like invalid CLI arguments or missing
        // shared libraries. We use timeout + wait() rather than try_wait() because
        // tokio's child reaper requires the wait future to be polled.
        match tokio::time::timeout(Duration::from_secs(1), child.wait()).await {
            Ok(Ok(exit_status)) => {
                // Process already exited — read stderr for details.
                // spawn_node always redirects stderr to a file in the log dir
                // (falling back to data_dir when no log dir is configured).
                let spawn_log_dir = config.log_dir.as_deref().unwrap_or(&config.data_dir);
                let stderr_path = spawn_log_dir.join("stderr.log");
                let stderr_msg = std::fs::read_to_string(&stderr_path).unwrap_or_default();
                let detail = if stderr_msg.trim().is_empty() {
                    format!("exit code: {exit_status}")
                } else {
                    stderr_msg.trim().to_string()
                };
                self.node_states.insert(
                    node_id,
                    NodeRuntime {
                        status: NodeStatus::Errored,
                        pid: None,
                        started_at: None,
                        restart_count: 0,
                        first_crash_at: None,
                        pending_version: None,
                    },
                );
                return Err(Error::ProcessSpawn(format!(
                    "Node {node_id} exited immediately: {detail}"
                )));
            }
            Ok(Err(e)) => {
                return Err(Error::ProcessSpawn(format!(
                    "Failed to check node process status: {e}"
                )));
            }
            Err(_) => {} // Timeout — process is still running after 1s, good
        }

        self.node_states.insert(
            node_id,
            NodeRuntime {
                status: NodeStatus::Running,
                pid: Some(pid),
                started_at: Some(Instant::now()),
                restart_count: 0,
                first_crash_at: None,
                pending_version: None,
            },
        );

        let _ = self.event_tx.send(NodeEvent::NodeStarted { node_id, pid });

        let result = NodeStarted {
            node_id,
            service_name: config.service_name.clone(),
            pid,
        };

        // Spawn monitoring task
        let event_tx = self.event_tx.clone();
        let config = config.clone();
        tokio::spawn(async move {
            monitor_node(child, config, supervisor_ref, registry_ref, event_tx).await;
        });

        Ok(result)
    }

    /// Stop a node by gracefully terminating its process.
    ///
    /// Sends SIGTERM (Unix) or a console Ctrl+C event (Windows), waits up to 10 seconds
    /// for exit, then force-kills if it still hasn't exited. The monitor task detects the
    /// Stopping status and exits cleanly without attempting a restart.
    pub async fn stop_node(&mut self, node_id: u32) -> Result<()> {
        let state = self
            .node_states
            .get_mut(&node_id)
            .ok_or(Error::NodeNotFound(node_id))?;

        if state.status != NodeStatus::Running {
            return Err(Error::NodeNotRunning(node_id));
        }

        let pid = state.pid;

        let _ = self.event_tx.send(NodeEvent::NodeStopping { node_id });
        state.status = NodeStatus::Stopping;

        if let Some(pid) = pid {
            graceful_kill(pid).await;
        }

        // Update state after kill
        let state = self.node_states.get_mut(&node_id).unwrap();
        state.status = NodeStatus::Stopped;
        state.pid = None;
        state.started_at = None;

        let _ = self.event_tx.send(NodeEvent::NodeStopped { node_id });

        Ok(())
    }

    /// Stop all running nodes, returning an aggregate result.
    pub async fn stop_all_nodes(&mut self, configs: &[(u32, String)]) -> StopNodeResult {
        let mut stopped = Vec::new();
        let mut failed = Vec::new();
        let mut already_stopped = Vec::new();

        for (node_id, service_name) in configs {
            let node_id = *node_id;
            match self.node_status(node_id) {
                Ok(NodeStatus::Running) => {}
                Ok(_) => {
                    already_stopped.push(node_id);
                    continue;
                }
                Err(_) => {
                    already_stopped.push(node_id);
                    continue;
                }
            }

            match self.stop_node(node_id).await {
                Ok(()) => {
                    stopped.push(NodeStopped {
                        node_id,
                        service_name: service_name.clone(),
                    });
                }
                Err(Error::NodeNotRunning(_)) => {
                    already_stopped.push(node_id);
                }
                Err(e) => {
                    failed.push(NodeStopFailed {
                        node_id,
                        service_name: service_name.clone(),
                        error: e.to_string(),
                    });
                }
            }
        }

        StopNodeResult {
            stopped,
            failed,
            already_stopped,
        }
    }

    /// Get the status of a node.
    pub fn node_status(&self, node_id: u32) -> Result<NodeStatus> {
        self.node_states
            .get(&node_id)
            .map(|s| s.status)
            .ok_or(Error::NodeNotFound(node_id))
    }

    /// Get the PID of a running node.
    pub fn node_pid(&self, node_id: u32) -> Option<u32> {
        self.node_states.get(&node_id).and_then(|s| s.pid)
    }

    /// Get the uptime of a running node in seconds.
    pub fn node_uptime_secs(&self, node_id: u32) -> Option<u64> {
        self.node_states
            .get(&node_id)
            .and_then(|s| s.started_at.map(|t| t.elapsed().as_secs()))
    }

    /// The target version when the node is in `UpgradeScheduled` state, otherwise `None`.
    pub fn node_pending_version(&self, node_id: u32) -> Option<String> {
        self.node_states
            .get(&node_id)
            .and_then(|s| s.pending_version.clone())
    }

    /// Transition a Running node into `UpgradeScheduled` with the target version.
    ///
    /// Only affects nodes currently in `Running`: any other state is left alone (a stopped
    /// node legitimately has an out-of-date binary; a node already in UpgradeScheduled has
    /// already been marked). Returns `true` if the transition happened.
    fn mark_upgrade_scheduled(&mut self, node_id: u32, pending_version: String) -> bool {
        let Some(state) = self.node_states.get_mut(&node_id) else {
            return false;
        };
        if state.status != NodeStatus::Running {
            return false;
        }
        state.status = NodeStatus::UpgradeScheduled;
        state.pending_version = Some(pending_version.clone());
        let _ = self.event_tx.send(NodeEvent::UpgradeScheduled {
            node_id,
            pending_version,
        });
        true
    }

    /// Check whether a node is running.
    pub fn is_running(&self, node_id: u32) -> bool {
        self.node_states
            .get(&node_id)
            .is_some_and(|s| s.status == NodeStatus::Running)
    }

    /// Get counts of nodes in each state: (running, stopped, errored).
    pub fn node_counts(&self) -> (u32, u32, u32) {
        let mut running = 0u32;
        let mut stopped = 0u32;
        let mut errored = 0u32;
        for state in self.node_states.values() {
            match state.status {
                // UpgradeScheduled means the process is still running; count it with running.
                NodeStatus::Running | NodeStatus::Starting | NodeStatus::UpgradeScheduled => {
                    running += 1
                }
                NodeStatus::Stopped | NodeStatus::Stopping => stopped += 1,
                NodeStatus::Errored => errored += 1,
            }
        }
        (running, stopped, errored)
    }

    /// Update the runtime state for a node (used by the monitor task).
    fn update_state(&mut self, node_id: u32, status: NodeStatus, pid: Option<u32>) {
        if let Some(state) = self.node_states.get_mut(&node_id) {
            state.status = status;
            state.pid = pid;
            if status == NodeStatus::Running {
                state.started_at = Some(Instant::now());
            } else {
                // Clear uptime tracking for non-running states so status
                // responses don't report a stale `uptime_secs` after the node
                // exits (e.g. liveness monitor detecting an external kill).
                state.started_at = None;
            }
        }
    }

    /// Restore running-node state from a previous daemon instance.
    ///
    /// For each registered node, determines the PID to adopt via
    /// `resolve_adopted_pid`: try `<data_dir>/node.pid` first, and if it's
    /// missing or stale, fall back to a process-table scan matching the
    /// node's binary path and `--root-dir` argument. Live matches are
    /// inserted into `node_states` as `Running`.
    ///
    /// The scan is what covers the upgrade path: nodes spawned by a
    /// pre-adoption daemon never had a pid file written, so without the
    /// fallback the first restart after installing this fix would still
    /// leave every previously-running node classified as Stopped.
    ///
    /// Must be called before the HTTP server starts accepting requests —
    /// the window between `Supervisor::new` and adoption is where the API
    /// would otherwise report live nodes as Stopped. Adopted nodes have no
    /// associated `monitor_node` task (the `tokio::process::Child` handle
    /// belonged to the previous daemon, and `tokio::process::Child::wait`
    /// only works for the process's actual parent). Their exits are
    /// detected instead by the `spawn_liveness_monitor` polling task.
    ///
    /// Returns the list of node IDs that were adopted.
    pub fn adopt_from_registry(&mut self, registry: &NodeRegistry) -> Vec<u32> {
        // Populated upfront so every adopted node gets its real start time via
        // `process_started_at`, not just those that went through the scan
        // fallback. The extra ~50 ms at daemon startup is a one-time cost
        // that's cheaper than users seeing uptime reset every time the daemon
        // restarts.
        let mut sys = sysinfo::System::new();
        sys.refresh_processes_specifics(
            sysinfo::ProcessesToUpdate::All,
            true,
            sysinfo::ProcessRefreshKind::everything(),
        );

        let mut adopted = Vec::new();
        for config in registry.list() {
            let Some(pid) = resolve_adopted_pid(config, &sys) else {
                continue;
            };
            self.node_states.insert(
                config.id,
                NodeRuntime {
                    status: NodeStatus::Running,
                    pid: Some(pid),
                    // Back-date to the real process start time so uptime
                    // reported to the API is wall-clock accurate across
                    // daemon restarts. Falls back to `Instant::now()` only
                    // if sysinfo can't report the start time (PID raced out
                    // of the snapshot, or a broken clock) — better to show
                    // uptime counting from adoption than to claim the node
                    // is Stopped.
                    started_at: Some(process_started_at(&sys, pid).unwrap_or_else(Instant::now)),
                    restart_count: 0,
                    first_crash_at: None,
                    pending_version: None,
                },
            );
            let _ = self.event_tx.send(NodeEvent::NodeStarted {
                node_id: config.id,
                pid,
            });
            adopted.push(config.id);
        }
        adopted
    }

    /// Record a crash and determine if the node should be restarted or marked errored.
    /// Returns (should_restart, attempt_number, backoff_duration).
    fn record_crash(&mut self, node_id: u32) -> (bool, u32, Duration) {
        let state = match self.node_states.get_mut(&node_id) {
            Some(s) => s,
            None => return (false, 0, Duration::ZERO),
        };

        let now = Instant::now();

        // Check if we were stable long enough to reset crash counter
        if let Some(started_at) = state.started_at {
            if started_at.elapsed() >= STABLE_DURATION {
                state.restart_count = 0;
                state.first_crash_at = None;
            }
        }

        state.restart_count += 1;
        let attempt = state.restart_count;

        if state.first_crash_at.is_none() {
            state.first_crash_at = Some(now);
        }

        // Check if too many crashes in the window
        if let Some(first_crash) = state.first_crash_at {
            if attempt >= MAX_CRASHES_BEFORE_ERRORED
                && now.duration_since(first_crash) < CRASH_WINDOW
            {
                state.status = NodeStatus::Errored;
                state.pid = None;
                state.started_at = None;
                return (false, attempt, Duration::ZERO);
            }
        }

        // Exponential backoff: 1s, 2s, 4s, 8s, 16s, then 32s; the shift is clamped
        // at 32s, so MAX_BACKOFF's 60s ceiling never binds with the current formula.
        let backoff_secs = 1u64 << (attempt - 1).min(5);
        let backoff = Duration::from_secs(backoff_secs).min(MAX_BACKOFF);

        (true, attempt, backoff)
    }
}

/// Periodically probe each Running node's on-disk binary for a version change.
///
/// When a node's binary-on-disk reports a different version than was recorded in the registry
/// at `ant node add` time, ant-node has replaced the binary in place as part of its auto-upgrade
/// flow and will restart the process shortly. We flip the node to `UpgradeScheduled` with the
/// target version, which lets `ant node status` render the in-between state and lets
/// `monitor_node` reclassify the upcoming clean exit as an expected restart rather than a crash.
///
/// The task exits when `shutdown` is cancelled.
pub fn spawn_upgrade_monitor(
    registry: Arc<RwLock<NodeRegistry>>,
    supervisor: Arc<RwLock<Supervisor>>,
    interval: Duration,
    shutdown: CancellationToken,
) {
    tokio::spawn(async move {
        let mut ticker = tokio::time::interval(interval);
        // Skip the immediate first tick — we don't want to probe while nodes are still in the
        // Starting -> Running transition.
        ticker.tick().await;

        loop {
            tokio::select! {
                _ = shutdown.cancelled() => return,
                _ = ticker.tick() => {},
            }

            // Collect a snapshot of (node_id, binary_path, recorded_version, current_pending)
            // to release the locks before running --version subprocesses (which take time).
            let candidates: Vec<(u32, std::path::PathBuf, String, Option<String>)> = {
                let reg = registry.read().await;
                let sup = supervisor.read().await;
                reg.list()
                    .into_iter()
                    .filter_map(|config| match sup.node_status(config.id) {
                        Ok(NodeStatus::Running) => Some((
                            config.id,
                            config.binary_path.clone(),
                            config.version.clone(),
                            sup.node_pending_version(config.id),
                        )),
                        _ => None,
                    })
                    .collect()
            };

            for (node_id, binary_path, recorded_version, current_pending) in candidates {
                let observed = match extract_version(&binary_path).await {
                    Ok(v) => v,
                    // Transient failures (e.g. binary mid-replacement) — skip this round.
                    Err(_) => continue,
                };
                if observed == recorded_version {
                    continue;
                }
                if current_pending.as_deref() == Some(observed.as_str()) {
                    continue;
                }
                supervisor
                    .write()
                    .await
                    .mark_upgrade_scheduled(node_id, observed);
            }
        }
    });
}

/// Build CLI arguments for the node binary from a NodeConfig.
pub fn build_node_args(config: &NodeConfig) -> Vec<String> {
    let mut args = vec![
        "--rewards-address".to_string(),
        config.rewards_address.clone(),
        "--root-dir".to_string(),
        config.data_dir.display().to_string(),
    ];

    if let Some(ref log_dir) = config.log_dir {
        args.push("--enable-logging".to_string());
        args.push("--log-dir".to_string());
        args.push(log_dir.display().to_string());
    }

    if let Some(port) = config.node_port {
        args.push("--port".to_string());
        args.push(port.to_string());
    }

    if let Some(port) = config.metrics_port {
        args.push("--metrics-port".to_string());
        args.push(port.to_string());
    }

    for peer in &config.bootstrap_peers {
        args.push("--bootstrap".to_string());
        args.push(peer.clone());
    }

    // The daemon's supervisor is the service manager. Tell ant-node not to spawn its own
    // replacement on auto-upgrade; instead, exit cleanly and let us respawn. Without this,
    // ant-node's default spawn-grandchild-then-exit flow races for the node's port during
    // the parent's graceful shutdown and the grandchild fails to bind.
    args.push("--stop-on-upgrade".to_string());

    args
}
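
// Rendered example for a fully-populated config (illustrative values only,
// matching the fixtures used in the tests below):
//
//   /bin/node --rewards-address 0xabc123 --root-dir /data/node-1 \
//     --enable-logging --log-dir /logs/node-1 --port 12000 --metrics-port 13000 \
//     --bootstrap peer1 --bootstrap peer2 --stop-on-upgrade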

/// Spawn a node process from a NodeConfig.
///
/// Writes `<data_dir>/node.pid` on successful spawn so that a future daemon instance
/// can adopt the running process via `Supervisor::adopt_from_registry`. The file is
/// cleaned up by `monitor_node` on the node's terminal exit.
async fn spawn_node_from_config(config: &NodeConfig) -> Result<tokio::process::Child> {
    let args = build_node_args(config);
    let env_vars: Vec<(String, String)> = config.env_variables.clone().into_iter().collect();

    let log_dir = config
        .log_dir
        .as_deref()
        .unwrap_or(config.data_dir.as_path());

    let child = spawn_node(&config.binary_path, &args, &env_vars, log_dir).await?;
    if let Some(pid) = child.id() {
        write_node_pid(&config.data_dir, pid);
    }
    Ok(child)
}

/// Monitor a node process. On exit, handle restart logic. On permanent exit
/// (user stop, crash limit, errored), cleans up the pid file so a subsequent
/// daemon restart doesn't try to adopt a dead process.
async fn monitor_node(
    child: tokio::process::Child,
    mut config: NodeConfig,
    supervisor: Arc<RwLock<Supervisor>>,
    registry: Arc<RwLock<NodeRegistry>>,
    event_tx: broadcast::Sender<NodeEvent>,
) {
    monitor_node_inner(child, &mut config, supervisor, registry, event_tx).await;
    remove_node_pid(&config.data_dir);
}

async fn monitor_node_inner(
    mut child: tokio::process::Child,
    config: &mut NodeConfig,
    supervisor: Arc<RwLock<Supervisor>>,
    registry: Arc<RwLock<NodeRegistry>>,
    event_tx: broadcast::Sender<NodeEvent>,
) {
    let node_id = config.id;

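    // Decision sketch for each exit observed below (a summary of the branches
    // that follow, not additional logic):
    //   Stopped/Stopping at exit -> user stop: the monitor task just exits.
    //   UpgradeScheduled at exit -> expected upgrade exit: respawn immediately.
    //   exit code 0 + on-disk version drift -> late-detected upgrade: mark
    //     scheduled, then respawn immediately.
    //   anything else -> crash: record it, back off, then restart (or mark
    //     Errored once the crash limit is hit).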
    loop {
        // Wait for the process to exit
        let exit_status = child.wait().await;

        // Check whether this is a scheduled upgrade restart or an intentional stop.
        let status_at_exit = {
            let sup = supervisor.read().await;
            sup.node_status(node_id).ok()
        };

        match status_at_exit {
            Some(NodeStatus::Stopped) | Some(NodeStatus::Stopping) => return,
            Some(NodeStatus::UpgradeScheduled) => {
                // ant-node cleanly exited after replacing its binary in place. Respawn
                // directly (no backoff, no crash counter) and refresh the recorded version.
                match respawn_upgraded_node(config, &supervisor, &registry, &event_tx).await {
                    Ok(new_child) => {
                        child = new_child;
                        continue;
                    }
                    Err(e) => {
                        let _ = event_tx.send(NodeEvent::NodeErrored {
                            node_id,
                            message: format!("Failed to respawn after upgrade: {e}"),
                        });
                        let mut sup = supervisor.write().await;
                        sup.update_state(node_id, NodeStatus::Errored, None);
                        return;
                    }
                }
            }
            _ => {}
        }

        let exit_code = exit_status.ok().and_then(|s| s.code());

        // A process-reported exit that wasn't user-initiated (Stopping was filtered above) is
        // either an auto-upgrade (exit 0 after ant-node replaced its binary) or a crash. In
        // neither case should the node be parked in `Stopped` — that state is reserved for
        // intentional user stops.
        //
        // Distinguish upgrade from crash by checking whether the on-disk binary's version
        // drifted from the registry. Between replacing its binary and actually exiting,
        // ant-node can hold the process open for anywhere from seconds to minutes, depending
        // on in-flight work and its own config. The periodic version poll will usually have
        // flipped the node to `UpgradeScheduled` well before the exit, but when the window is
        // short we cannot rely on that — hence this synchronous re-check here.
        if exit_code == Some(0) {
            if let Ok(disk_version) = extract_version(&config.binary_path).await {
                if disk_version != config.version {
                    {
                        let mut sup = supervisor.write().await;
                        sup.mark_upgrade_scheduled(node_id, disk_version.clone());
                    }
                    match respawn_upgraded_node(config, &supervisor, &registry, &event_tx).await {
                        Ok(new_child) => {
                            child = new_child;
                            continue;
                        }
                        Err(e) => {
                            let _ = event_tx.send(NodeEvent::NodeErrored {
                                node_id,
                                message: format!("Failed to respawn after upgrade: {e}"),
                            });
                            let mut sup = supervisor.write().await;
                            sup.update_state(node_id, NodeStatus::Errored, None);
                            return;
                        }
                    }
                }
            }
            // Exit 0 but the binary didn't change — fall through to the crash / restart path.
            // We report the crash with the exit code preserved; the crash counter guards
            // against infinite restart loops if the process keeps exiting immediately.
        }

        // Crash (or clean exit that wasn't an upgrade)
        let _ = event_tx.send(NodeEvent::NodeCrashed { node_id, exit_code });

        let (should_restart, attempt, backoff) = {
            let mut sup = supervisor.write().await;
            sup.record_crash(node_id)
        };

        if !should_restart {
            let _ = event_tx.send(NodeEvent::NodeErrored {
                node_id,
                message: format!(
                    "Node crashed {} times within {} seconds, giving up",
                    MAX_CRASHES_BEFORE_ERRORED,
                    CRASH_WINDOW.as_secs()
                ),
            });
            return;
        }

        let _ = event_tx.send(NodeEvent::NodeRestarting { node_id, attempt });

        tokio::time::sleep(backoff).await;

        // Try to restart
        match spawn_node_from_config(&*config).await {
            Ok(new_child) => {
                let pid = match new_child.id() {
                    Some(pid) => pid,
                    None => {
                        // Process exited before we could read its PID
                        let _ = event_tx.send(NodeEvent::NodeErrored {
                            node_id,
                            message: "Restarted process exited before PID could be read"
                                .to_string(),
                        });
                        let mut sup = supervisor.write().await;
                        sup.update_state(node_id, NodeStatus::Errored, None);
                        return;
                    }
                };
                {
                    let mut sup = supervisor.write().await;
                    sup.update_state(node_id, NodeStatus::Running, Some(pid));
                }
                let _ = event_tx.send(NodeEvent::NodeStarted { node_id, pid });
                child = new_child;
            }
            Err(e) => {
                let _ = event_tx.send(NodeEvent::NodeErrored {
                    node_id,
                    message: format!("Failed to restart node: {e}"),
                });
                let mut sup = supervisor.write().await;
                sup.update_state(node_id, NodeStatus::Errored, None);
                return;
            }
        }
    }
}

/// Respawn a node whose `UpgradeScheduled` status tells us the exit was expected.
///
/// On success: persists the new version to the registry, updates the in-memory config clone,
/// clears pending_version, sets status back to Running, and fires `NodeUpgraded`.
async fn respawn_upgraded_node(
    config: &mut NodeConfig,
    supervisor: &Arc<RwLock<Supervisor>>,
    registry: &Arc<RwLock<NodeRegistry>>,
    event_tx: &broadcast::Sender<NodeEvent>,
) -> Result<tokio::process::Child> {
    let node_id = config.id;
    let old_version = config.version.clone();

    let new_child = spawn_node_from_config(config).await?;
    let pid = new_child
        .id()
        .ok_or_else(|| Error::ProcessSpawn("Failed to get PID after upgrade respawn".into()))?;

    // Read the new version from the replaced binary. If this fails we still consider the respawn
    // successful; we just don't refresh the recorded version this round.
    let new_version = extract_version(&config.binary_path).await.ok();

    if let Some(ref version) = new_version {
        config.version = version.clone();
        let mut reg = registry.write().await;
        if let Ok(stored) = reg.get_mut(node_id) {
            stored.version = version.clone();
            let _ = reg.save();
        }
    }

    {
        let mut sup = supervisor.write().await;
        if let Some(state) = sup.node_states.get_mut(&node_id) {
            state.status = NodeStatus::Running;
            state.pid = Some(pid);
            state.started_at = Some(Instant::now());
            state.pending_version = None;
            state.restart_count = 0;
            state.first_crash_at = None;
        }
    }

    let _ = event_tx.send(NodeEvent::NodeStarted { node_id, pid });
    if let Some(version) = new_version {
        let _ = event_tx.send(NodeEvent::NodeUpgraded {
            node_id,
            old_version,
            new_version: version,
        });
    }

    Ok(new_child)
}

/// Timeout for graceful shutdown before force-killing.
const GRACEFUL_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10);

/// Send SIGTERM to a process, wait for it to exit, and SIGKILL if it doesn't.
async fn graceful_kill(pid: u32) {
    send_signal_term(pid);

    // Poll for process exit
    let start = Instant::now();
    loop {
        if !is_process_alive(pid) {
            return;
        }
        if start.elapsed() >= GRACEFUL_SHUTDOWN_TIMEOUT {
            break;
        }
        tokio::time::sleep(Duration::from_millis(100)).await;
    }

    // Force kill if still alive
    send_signal_kill(pid);

    // Brief wait for force kill to take effect
    for _ in 0..10 {
        if !is_process_alive(pid) {
            return;
        }
        tokio::time::sleep(Duration::from_millis(50)).await;
    }
}
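
// Behavioral sketch: when the target PID is already dead, `graceful_kill`
// returns after the first liveness check instead of waiting out the grace
// window. PID 999_999 is assumed unused, mirroring the fake PID the tests
// below already rely on.
#[cfg(test)]
#[tokio::test]
async fn graceful_kill_dead_pid_returns_quickly_sketch() {
    let start = Instant::now();
    graceful_kill(999_999).await;
    assert!(start.elapsed() < GRACEFUL_SHUTDOWN_TIMEOUT);
}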

/// Poll each Running node's PID for OS liveness every `LIVENESS_POLL_INTERVAL`,
/// flipping dead ones to `Stopped` and emitting `NodeStopped`.
///
/// Exists to detect exits of nodes adopted across a daemon restart
/// (`Supervisor::adopt_from_registry`). Daemon-spawned nodes have a
/// `monitor_node` task awaiting on the owned `Child` handle, which detects
/// exit immediately — the poll is redundant-but-harmless for them. Adopted
/// nodes don't have a `Child` (it died with the previous daemon), so the poll
/// is the only way the supervisor learns that one has exited.
///
/// The task terminates when `shutdown` is cancelled.
pub fn spawn_liveness_monitor(
    registry: Arc<RwLock<NodeRegistry>>,
    supervisor: Arc<RwLock<Supervisor>>,
    event_tx: broadcast::Sender<NodeEvent>,
    interval: Duration,
    shutdown: CancellationToken,
) {
    tokio::spawn(async move {
        let mut ticker = tokio::time::interval(interval);
        loop {
            tokio::select! {
                _ = shutdown.cancelled() => return,
                _ = ticker.tick() => {}
            }

            // Snapshot candidates to release locks before the per-process syscalls.
            let candidates: Vec<(u32, u32, PathBuf)> =
                {
                    let sup = supervisor.read().await;
                    let reg = registry.read().await;
                    reg.list()
                        .into_iter()
                        .filter_map(|config| {
                            let pid = sup.node_pid(config.id)?;
                            matches!(sup.node_status(config.id), Ok(NodeStatus::Running))
                                .then_some((config.id, pid, config.data_dir.clone()))
                        })
                        .collect()
                };

            for (node_id, pid, data_dir) in candidates {
                if is_process_alive(pid) {
                    continue;
                }
                let mut sup = supervisor.write().await;
                // Re-check under the write lock to avoid racing with a concurrent
                // start/stop that flipped the state between the snapshot and now.
                if !matches!(sup.node_status(node_id), Ok(NodeStatus::Running)) {
                    continue;
                }
                sup.update_state(node_id, NodeStatus::Stopped, None);
                let _ = event_tx.send(NodeEvent::NodeStopped { node_id });
                remove_node_pid(&data_dir);
            }
        }
    });
}
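
// Daemon-startup wiring sketch for the two background monitors (hypothetical
// call site; `registry`, `supervisor`, and `event_tx` are the shared handles
// the daemon already owns):
//
//     let shutdown = CancellationToken::new();
//     spawn_upgrade_monitor(
//         registry.clone(), supervisor.clone(), UPGRADE_POLL_INTERVAL, shutdown.clone());
//     spawn_liveness_monitor(
//         registry.clone(), supervisor.clone(), event_tx.clone(),
//         LIVENESS_POLL_INTERVAL, shutdown.clone());
//     // ...later, on daemon shutdown:
//     shutdown.cancel();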

#[cfg(unix)]
fn pid_to_i32(pid: u32) -> Option<i32> {
    i32::try_from(pid).ok().filter(|&p| p > 0)
}

#[cfg(unix)]
fn send_signal_term(pid: u32) {
    if let Some(pid) = pid_to_i32(pid) {
        unsafe {
            libc::kill(pid, libc::SIGTERM);
        }
    }
}

#[cfg(unix)]
fn send_signal_kill(pid: u32) {
    if let Some(pid) = pid_to_i32(pid) {
        unsafe {
            libc::kill(pid, libc::SIGKILL);
        }
    }
}

#[cfg(unix)]
fn is_process_alive(pid: u32) -> bool {
    let Some(pid) = pid_to_i32(pid) else {
        return false;
    };
    let ret = unsafe { libc::kill(pid, 0) };
    if ret == 0 {
        return true;
    }
    // EPERM means the process exists but we lack permission to signal it
    std::io::Error::last_os_error().raw_os_error() == Some(libc::EPERM)
}

#[cfg(windows)]
fn send_signal_term(pid: u32) {
    use windows_sys::Win32::System::Console::{
        AttachConsole, FreeConsole, GenerateConsoleCtrlEvent, SetConsoleCtrlHandler, CTRL_C_EVENT,
    };

    unsafe {
        // Detach from our own console (no-op if daemon has none, which is
        // typical since it's spawned with DETACHED_PROCESS).
        FreeConsole();

        // Attach to the target process's console and send Ctrl+C
        if AttachConsole(pid) != 0 {
            // Disable Ctrl+C handling so GenerateConsoleCtrlEvent doesn't
            // terminate us while we're attached to the node's console.
            SetConsoleCtrlHandler(None, 1);
            GenerateConsoleCtrlEvent(CTRL_C_EVENT, 0);
            // Detach from the node's console first — once detached, the
            // async Ctrl+C event can only reach the node, not us.
            FreeConsole();
            // Brief delay to let the event drain before re-enabling our
            // handler. Without this, the handler thread can process the
            // event between FreeConsole and SetConsoleCtrlHandler.
            std::thread::sleep(std::time::Duration::from_millis(50));
            // Restore Ctrl+C handling so `daemon run` (foreground mode)
            // can still be stopped via Ctrl+C / tokio::signal::ctrl_c().
            SetConsoleCtrlHandler(None, 0);
        }
    }
}

#[cfg(windows)]
fn send_signal_kill(pid: u32) {
    use windows_sys::Win32::Foundation::CloseHandle;
    use windows_sys::Win32::System::Threading::{OpenProcess, TerminateProcess, PROCESS_TERMINATE};

    unsafe {
        let handle = OpenProcess(PROCESS_TERMINATE, 0, pid);
        if !handle.is_null() {
            TerminateProcess(handle, 1);
            CloseHandle(handle);
        }
    }
}

#[cfg(windows)]
fn is_process_alive(pid: u32) -> bool {
    use windows_sys::Win32::Foundation::{CloseHandle, STILL_ACTIVE};
    use windows_sys::Win32::System::Threading::{
        GetExitCodeProcess, OpenProcess, PROCESS_QUERY_LIMITED_INFORMATION,
    };

    unsafe {
        let handle = OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, 0, pid);
        if handle.is_null() {
            return false;
        }
        let mut exit_code: u32 = 0;
        let success = GetExitCodeProcess(handle, &mut exit_code);
        CloseHandle(handle);
        success != 0 && exit_code == STILL_ACTIVE as u32
    }
}
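
// Cross-platform sanity sketch: the current process reports alive on both
// implementations, and PID 0 is rejected (filtered out by `pid_to_i32` on
// Unix; never a valid user-process handle on Windows).
#[cfg(test)]
#[test]
fn is_process_alive_sanity_sketch() {
    assert!(is_process_alive(std::process::id()));
    assert!(!is_process_alive(0));
}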

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn build_node_args_basic() {
        let config = NodeConfig {
            id: 1,
            service_name: "node1".to_string(),
            rewards_address: "0xabc123".to_string(),
            data_dir: "/data/node-1".into(),
            log_dir: Some("/logs/node-1".into()),
            node_port: Some(12000),
            metrics_port: Some(13000),
            network_id: Some(1),
            binary_path: "/bin/node".into(),
            version: "0.1.0".to_string(),
            env_variables: HashMap::new(),
            bootstrap_peers: vec!["peer1".to_string(), "peer2".to_string()],
        };

        let args = build_node_args(&config);

        assert!(args.contains(&"--rewards-address".to_string()));
        assert!(args.contains(&"0xabc123".to_string()));
        assert!(args.contains(&"--root-dir".to_string()));
        assert!(args.contains(&"/data/node-1".to_string()));
        assert!(args.contains(&"--enable-logging".to_string()));
        assert!(args.contains(&"--log-dir".to_string()));
        assert!(args.contains(&"/logs/node-1".to_string()));
        assert!(args.contains(&"--port".to_string()));
        assert!(args.contains(&"12000".to_string()));
        assert!(args.contains(&"--metrics-port".to_string()));
        assert!(args.contains(&"13000".to_string()));
        assert!(args.contains(&"--bootstrap".to_string()));
        assert!(args.contains(&"peer1".to_string()));
        assert!(args.contains(&"peer2".to_string()));
        assert!(args.contains(&"--stop-on-upgrade".to_string()));
    }

    #[test]
    fn build_node_args_minimal() {
        let config = NodeConfig {
            id: 1,
            service_name: "node1".to_string(),
            rewards_address: "0xabc".to_string(),
            data_dir: "/data/node-1".into(),
            log_dir: None,
            node_port: None,
            metrics_port: None,
            network_id: None,
            binary_path: "/bin/node".into(),
            version: "0.1.0".to_string(),
            env_variables: HashMap::new(),
            bootstrap_peers: vec![],
        };

        let args = build_node_args(&config);

        assert!(args.contains(&"--rewards-address".to_string()));
        assert!(args.contains(&"--root-dir".to_string()));
        assert!(!args.contains(&"--enable-logging".to_string()));
        assert!(!args.contains(&"--log-dir".to_string()));
        assert!(!args.contains(&"--port".to_string()));
        assert!(!args.contains(&"--metrics-port".to_string()));
        assert!(!args.contains(&"--bootstrap".to_string()));
        assert!(args.contains(&"--stop-on-upgrade".to_string()));
    }

    #[test]
    fn record_crash_backoff_increases() {
        let (tx, _rx) = broadcast::channel(16);
        let mut sup = Supervisor::new(tx);

        // Insert a running node
        sup.node_states.insert(
            1,
            NodeRuntime {
                status: NodeStatus::Running,
                pid: Some(100),
                started_at: Some(Instant::now()),
                restart_count: 0,
                first_crash_at: None,
                pending_version: None,
            },
        );

        let (should_restart, attempt, backoff) = sup.record_crash(1);
        assert!(should_restart);
        assert_eq!(attempt, 1);
        assert_eq!(backoff, Duration::from_secs(1));

        let (should_restart, attempt, backoff) = sup.record_crash(1);
        assert!(should_restart);
        assert_eq!(attempt, 2);
        assert_eq!(backoff, Duration::from_secs(2));

        let (should_restart, attempt, backoff) = sup.record_crash(1);
        assert!(should_restart);
        assert_eq!(attempt, 3);
        assert_eq!(backoff, Duration::from_secs(4));

        let (should_restart, attempt, backoff) = sup.record_crash(1);
        assert!(should_restart);
        assert_eq!(attempt, 4);
        assert_eq!(backoff, Duration::from_secs(8));

        // 5th crash within window → errored
        let (should_restart, attempt, _) = sup.record_crash(1);
        assert!(!should_restart);
        assert_eq!(attempt, 5);
        assert_eq!(sup.node_states[&1].status, NodeStatus::Errored);
    }

    #[test]
    fn node_counts_tracks_states() {
        let (tx, _rx) = broadcast::channel(16);
        let mut sup = Supervisor::new(tx);

        sup.node_states.insert(
            1,
            NodeRuntime {
                status: NodeStatus::Running,
                pid: Some(100),
                started_at: Some(Instant::now()),
                restart_count: 0,
                first_crash_at: None,
                pending_version: None,
            },
        );
        sup.node_states.insert(
            2,
            NodeRuntime {
                status: NodeStatus::Stopped,
                pid: None,
                started_at: None,
                restart_count: 0,
                first_crash_at: None,
                pending_version: None,
            },
        );
        sup.node_states.insert(
            3,
            NodeRuntime {
                status: NodeStatus::Errored,
                pid: None,
                started_at: None,
                restart_count: 5,
                first_crash_at: None,
                pending_version: None,
            },
        );

        let (running, stopped, errored) = sup.node_counts();
        assert_eq!(running, 1);
        assert_eq!(stopped, 1);
        assert_eq!(errored, 1);
    }

    #[test]
    fn mark_upgrade_scheduled_only_affects_running_nodes() {
        let (tx, mut rx) = broadcast::channel(16);
        let mut sup = Supervisor::new(tx);

        sup.node_states.insert(
            1,
            NodeRuntime {
                status: NodeStatus::Running,
                pid: Some(111),
                started_at: Some(Instant::now()),
                restart_count: 0,
                first_crash_at: None,
                pending_version: None,
            },
        );
        sup.node_states.insert(
            2,
            NodeRuntime {
                status: NodeStatus::Stopped,
                pid: None,
                started_at: None,
                restart_count: 0,
                first_crash_at: None,
                pending_version: None,
            },
        );

        // Running node: transitions to UpgradeScheduled with pending_version set and event fires.
        let affected = sup.mark_upgrade_scheduled(1, "0.10.11-rc.1".to_string());
        assert!(affected);
        assert_eq!(sup.node_status(1).unwrap(), NodeStatus::UpgradeScheduled);
        assert_eq!(sup.node_pending_version(1).as_deref(), Some("0.10.11-rc.1"));
        match rx.try_recv() {
            Ok(NodeEvent::UpgradeScheduled {
                node_id,
                pending_version,
            }) => {
                assert_eq!(node_id, 1);
                assert_eq!(pending_version, "0.10.11-rc.1");
            }
            other => panic!("expected UpgradeScheduled event, got {other:?}"),
        }

        // Stopped node: untouched, no event fired.
        let affected = sup.mark_upgrade_scheduled(2, "0.10.11-rc.1".to_string());
        assert!(!affected);
        assert_eq!(sup.node_status(2).unwrap(), NodeStatus::Stopped);
        assert!(sup.node_pending_version(2).is_none());

        // Already-UpgradeScheduled node: calling again is a no-op.
        let affected = sup.mark_upgrade_scheduled(1, "0.10.12".to_string());
        assert!(!affected);
        // Pending version is the original one set.
        assert_eq!(sup.node_pending_version(1).as_deref(), Some("0.10.11-rc.1"));
    }

    #[test]
    fn node_counts_counts_upgrade_scheduled_as_running() {
        let (tx, _rx) = broadcast::channel(16);
        let mut sup = Supervisor::new(tx);

        sup.node_states.insert(
            1,
            NodeRuntime {
                status: NodeStatus::UpgradeScheduled,
                pid: Some(111),
                started_at: Some(Instant::now()),
                restart_count: 0,
                first_crash_at: None,
                pending_version: Some("0.10.11-rc.1".to_string()),
            },
        );

        let (running, stopped, errored) = sup.node_counts();
        assert_eq!(running, 1);
        assert_eq!(stopped, 0);
        assert_eq!(errored, 0);
    }

    #[tokio::test]
    async fn stop_node_not_found() {
        let (tx, _rx) = broadcast::channel(16);
        let mut sup = Supervisor::new(tx);

        let result = sup.stop_node(999).await;
        assert!(matches!(result, Err(Error::NodeNotFound(999))));
    }

    #[tokio::test]
    async fn stop_node_not_running() {
        let (tx, _rx) = broadcast::channel(16);
        let mut sup = Supervisor::new(tx);

        sup.node_states.insert(
            1,
            NodeRuntime {
                status: NodeStatus::Stopped,
                pid: None,
                started_at: None,
                restart_count: 0,
                first_crash_at: None,
                pending_version: None,
            },
        );

        let result = sup.stop_node(1).await;
        assert!(matches!(result, Err(Error::NodeNotRunning(1))));
    }

    #[tokio::test]
    async fn stop_all_nodes_mixed_states() {
        let (tx, _rx) = broadcast::channel(16);
        let mut sup = Supervisor::new(tx);

        // Node 1: running (but with a fake PID that won't exist)
        sup.node_states.insert(
            1,
            NodeRuntime {
                status: NodeStatus::Running,
                pid: Some(999999),
                started_at: Some(Instant::now()),
                restart_count: 0,
                first_crash_at: None,
                pending_version: None,
            },
        );
        // Node 2: already stopped
        sup.node_states.insert(
            2,
            NodeRuntime {
                status: NodeStatus::Stopped,
                pid: None,
                started_at: None,
                restart_count: 0,
                first_crash_at: None,
                pending_version: None,
            },
        );

        let configs = vec![(1, "node1".to_string()), (2, "node2".to_string())];

        let result = sup.stop_all_nodes(&configs).await;

        assert_eq!(result.stopped.len(), 1);
        assert_eq!(result.stopped[0].node_id, 1);
        assert_eq!(result.stopped[0].service_name, "node1");
        assert_eq!(result.already_stopped, vec![2]);
        assert!(result.failed.is_empty());
    }
}