Skip to main content

ant_core/node/daemon/
supervisor.rs

1use std::collections::{HashMap, HashSet};
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use tokio::sync::{broadcast, RwLock};
7use tokio::time::MissedTickBehavior;
8use tokio_util::sync::CancellationToken;
9
10use crate::error::{Error, Result};
11use crate::node::binary::extract_version;
12use crate::node::events::NodeEvent;
13use crate::node::process::spawn::spawn_node;
14use crate::node::registry::NodeRegistry;
15use crate::node::types::{
16    NodeConfig, NodeStarted, NodeStatus, NodeStopFailed, NodeStopped, StopNodeResult,
17};
18
/// Polling period of the upgrade-detection task, which asks every Running node's binary
/// for its version so an in-place binary replacement is noticed.
pub const UPGRADE_POLL_INTERVAL: Duration = Duration::from_secs(60);

/// Polling period of the liveness check that confirms each Running node's OS process
/// still exists.
///
/// Processes spawned by the current daemon are already watched through the `Child`
/// handle owned by `monitor_node`. This poll only exists for nodes adopted across a
/// daemon restart, whose `Child` handle died with the previous daemon. Five seconds is
/// a rough balance: the syscall cost stays negligible, yet a crashed adopted node shows
/// up as broken within a few heartbeats.
pub const LIVENESS_POLL_INTERVAL: Duration = Duration::from_secs(5);
30
/// Location of `node.pid` inside a node's data directory.
///
/// A running node's PID is recorded there so that a later daemon instance can adopt
/// the process after a restart.
fn node_pid_file(data_dir: &Path) -> PathBuf {
    let mut pid_path = data_dir.to_path_buf();
    pid_path.push("node.pid");
    pid_path
}
36
37/// Persist the running node's PID to `<data_dir>/node.pid`. Best-effort: a failure
38/// here only costs us the ability to adopt the node after a daemon restart, so we
39/// warn and continue rather than aborting the start.
40fn write_node_pid(data_dir: &Path, pid: u32) {
41    let path = node_pid_file(data_dir);
42    if let Err(e) = std::fs::write(&path, pid.to_string()) {
43        tracing::warn!(
44            "Failed to write node pid file at {}: {e}. Node will still run, but a future \
45             daemon restart will not be able to adopt it.",
46            path.display()
47        );
48    }
49}
50
51/// Remove the pid file. Called on every terminal-exit path in `monitor_node` so the
52/// next daemon doesn't try to adopt a PID belonging to a process that's gone.
53fn remove_node_pid(data_dir: &Path) {
54    let _ = std::fs::remove_file(node_pid_file(data_dir));
55}
56
57/// Read the pid file without validating liveness. Returns `None` if the file is
58/// missing or its contents can't be parsed as a u32.
59fn read_node_pid(data_dir: &Path) -> Option<u32> {
60    std::fs::read_to_string(node_pid_file(data_dir))
61        .ok()
62        .and_then(|s| s.trim().parse().ok())
63}
64
65/// Scan the OS process table for a running node that matches `config`, as a
66/// fallback for when `<data_dir>/node.pid` is missing or stale.
67///
68/// Nodes spawned by a pre-adoption daemon never had a pid file written, so
69/// without this scan the first restart after installing the adoption fix
70/// would leave every previously-running node classified as Stopped. The scan
71/// matches on:
72///
73/// - executable path identical to `config.binary_path`, AND
74/// - command line containing `--root-dir` (as a standalone arg or
75///   `--root-dir=<path>`) whose value resolves to `config.data_dir`.
76///
77/// The double match keeps us safe when multiple nodes share the same binary
78/// on disk (common on installs where one copy services several data dirs).
79///
80/// Returns `None` if no running process matches.
81fn find_running_node_process(sys: &sysinfo::System, config: &NodeConfig) -> Option<u32> {
82    let target_data_dir = config.data_dir.as_path();
83    for (pid, process) in sys.processes() {
84        // On Linux, `sys.processes()` enumerates /proc/<pid>/task/<tid> too, so
85        // worker threads appear alongside their thread-group leader and share
86        // the same exe + cmdline. Skip threads — we want the TGID (the real
87        // process), which is the only PID safe to signal.
88        if process.thread_kind().is_some() {
89            continue;
90        }
91        let Some(exe) = process.exe() else {
92            continue;
93        };
94        if exe != config.binary_path.as_path() {
95            continue;
96        }
97
98        let cmd = process.cmd();
99        let matches_root_dir = cmd.iter().enumerate().any(|(i, arg)| {
100            let arg = arg.to_string_lossy();
101            if let Some(value) = arg.strip_prefix("--root-dir=") {
102                Path::new(value) == target_data_dir
103            } else if arg == "--root-dir" {
104                cmd.get(i + 1)
105                    .map(|v| Path::new(&*v.to_string_lossy()) == target_data_dir)
106                    .unwrap_or(false)
107            } else {
108                false
109            }
110        });
111
112        if matches_root_dir {
113            return Some(pid.as_u32());
114        }
115    }
116    None
117}
118
119/// Check whether `pid` refers to a live, non-thread process. On Linux,
120/// `kill(tid, 0)` returns success for any thread's TID, not just the
121/// thread-group leader — so liveness alone is not enough to trust a PID
122/// loaded from the pid file. Consulting sysinfo's `thread_kind()` tells us
123/// whether the entry is a userland thread (TID) vs. the actual process
124/// (TGID). A missing sysinfo entry with a live PID is still treated as a
125/// process, since older daemons could have written the PID before sysinfo
126/// saw it.
127fn pid_is_live_process(pid: u32, sys: &sysinfo::System) -> bool {
128    if !is_process_alive(pid) {
129        return false;
130    }
131    match sys.process(sysinfo::Pid::from_u32(pid)) {
132        Some(process) => process.thread_kind().is_none(),
133        None => true,
134    }
135}
136
137/// Determine the PID to adopt for a node, trying the pid file first and
138/// falling back to a process-table scan. On successful scan, writes the pid
139/// file so the next adoption takes the fast path.
140///
141/// Returns `None` if no live process can be attributed to this node.
142fn resolve_adopted_pid(config: &NodeConfig, sys: &sysinfo::System) -> Option<u32> {
143    if let Some(pid) = read_node_pid(&config.data_dir) {
144        if pid_is_live_process(pid, sys) {
145            return Some(pid);
146        }
147        // Pid file points at a dead process or a thread TID (legacy daemons
148        // could record a TID because the fallback scan saw threads). Don't
149        // leave it around to mislead the next adoption pass.
150        remove_node_pid(&config.data_dir);
151    }
152
153    let pid = find_running_node_process(sys, config)?;
154    write_node_pid(&config.data_dir, pid);
155    Some(pid)
156}
157
158/// Build an `Instant` that reports the real process start time when
159/// `.elapsed()` is called on it — so uptime survives daemon restarts
160/// accurately for adopted nodes.
161///
162/// `sysinfo::Process::start_time()` returns seconds since the UNIX epoch
163/// (wall clock). `Instant` is monotonic and can't be constructed from a
164/// wall-clock value directly, so we back-date `Instant::now()` by the
165/// process's age. Returns `None` if the PID isn't in the snapshot (the
166/// process exited between scan and this call), if the system clock looks
167/// broken, or if subtraction would overflow (unrealistically-old process
168/// start times).
169fn process_started_at(sys: &sysinfo::System, pid: u32) -> Option<Instant> {
170    let start_secs = sys.process(sysinfo::Pid::from_u32(pid))?.start_time();
171    let now_secs = std::time::SystemTime::now()
172        .duration_since(std::time::UNIX_EPOCH)
173        .ok()?
174        .as_secs();
175    let age = now_secs.saturating_sub(start_secs);
176    Instant::now().checked_sub(Duration::from_secs(age))
177}
178
/// Number of crashes inside `CRASH_WINDOW` after which a node is marked errored instead
/// of being restarted again.
const MAX_CRASHES_BEFORE_ERRORED: u32 = 5;

/// Span of time over which crashes are counted toward `MAX_CRASHES_BEFORE_ERRORED`
/// (five minutes).
const CRASH_WINDOW: Duration = Duration::from_secs(5 * 60);

/// A node that stays up at least this long (five minutes) has its crash counter reset.
const STABLE_DURATION: Duration = Duration::from_secs(5 * 60);

/// Upper bound on the delay inserted between restart attempts.
const MAX_BACKOFF: Duration = Duration::from_secs(60);
191
192/// Manages running node processes. Holds child process handles and runtime state.
193pub struct Supervisor {
194    event_tx: broadcast::Sender<NodeEvent>,
195    /// Runtime status of each node, keyed by node ID.
196    node_states: HashMap<u32, NodeRuntime>,
197    /// Nodes adopted from a previous daemon instance, which have no owning `monitor_node`
198    /// task (their `Child` handle died with the previous daemon). Exit detection and, on
199    /// auto-upgrade, respawn for these nodes happen in the liveness monitor instead. A node
200    /// leaves this set once this daemon (re)spawns it and owns a `monitor_node` for it.
201    adopted: HashSet<u32>,
202}
203
204struct NodeRuntime {
205    status: NodeStatus,
206    pid: Option<u32>,
207    started_at: Option<Instant>,
208    restart_count: u32,
209    first_crash_at: Option<Instant>,
210    /// When `status == UpgradeScheduled`, the target version the on-disk binary now reports.
211    pending_version: Option<String>,
212}
213
214impl Supervisor {
215    pub fn new(event_tx: broadcast::Sender<NodeEvent>) -> Self {
216        Self {
217            event_tx,
218            node_states: HashMap::new(),
219            adopted: HashSet::new(),
220        }
221    }
222
223    /// Whether `node_id` was adopted from a previous daemon instance and is therefore not
224    /// backed by an owning `monitor_node` task in this daemon.
225    pub fn is_adopted(&self, node_id: u32) -> bool {
226        self.adopted.contains(&node_id)
227    }
228
229    /// Mark a node as owned by this daemon (i.e. it now has a `monitor_node` task). Clears
230    /// any adopted flag so the liveness monitor leaves its exit handling to `monitor_node`.
231    fn mark_owned(&mut self, node_id: u32) {
232        self.adopted.remove(&node_id);
233    }
234
235    /// Start a node by spawning the actual process.
236    ///
237    /// Returns `NodeStarted` on success. Spawns a background monitoring task
238    /// that watches the child process and handles restart logic.
239    pub async fn start_node(
240        &mut self,
241        config: &NodeConfig,
242        supervisor_ref: Arc<RwLock<Supervisor>>,
243        registry_ref: Arc<RwLock<NodeRegistry>>,
244    ) -> Result<NodeStarted> {
245        let node_id = config.id;
246
247        if let Some(state) = self.node_states.get(&node_id) {
248            if state.status == NodeStatus::Running {
249                return Err(Error::NodeAlreadyRunning(node_id));
250            }
251        }
252
253        let _ = self.event_tx.send(NodeEvent::NodeStarting { node_id });
254
255        let mut child = spawn_node_from_config(config).await?;
256        let pid = child
257            .id()
258            .ok_or_else(|| Error::ProcessSpawn("Failed to get PID from spawned process".into()))?;
259
260        // Brief health check: give the process a moment to start, then check if it
261        // exited immediately. This catches errors like invalid CLI arguments or missing
262        // shared libraries. We use timeout + wait() rather than try_wait() because
263        // tokio's child reaper requires the wait future to be polled.
264        match tokio::time::timeout(Duration::from_secs(1), child.wait()).await {
265            Ok(Ok(exit_status)) => {
266                // Process already exited — read stderr for details.
267                // spawn_node always redirects stderr to a file in the log dir
268                // (falling back to data_dir when no log dir is configured).
269                let spawn_log_dir = config.log_dir.as_deref().unwrap_or(&config.data_dir);
270                let stderr_path = spawn_log_dir.join("stderr.log");
271                let stderr_msg = std::fs::read_to_string(&stderr_path).unwrap_or_default();
272                let detail = if stderr_msg.trim().is_empty() {
273                    format!("exit code: {exit_status}")
274                } else {
275                    stderr_msg.trim().to_string()
276                };
277                self.node_states.insert(
278                    node_id,
279                    NodeRuntime {
280                        status: NodeStatus::Errored,
281                        pid: None,
282                        started_at: None,
283                        restart_count: 0,
284                        first_crash_at: None,
285                        pending_version: None,
286                    },
287                );
288                return Err(Error::ProcessSpawn(format!(
289                    "Node {node_id} exited immediately: {detail}"
290                )));
291            }
292            Ok(Err(e)) => {
293                return Err(Error::ProcessSpawn(format!(
294                    "Failed to check node process status: {e}"
295                )));
296            }
297            Err(_) => {} // Timeout — process is still running after 1s, good
298        }
299
300        self.node_states.insert(
301            node_id,
302            NodeRuntime {
303                status: NodeStatus::Running,
304                pid: Some(pid),
305                started_at: Some(Instant::now()),
306                restart_count: 0,
307                first_crash_at: None,
308                pending_version: None,
309            },
310        );
311        // This daemon now owns the process and spawns a `monitor_node` for it below, so it is
312        // no longer (or never was) an adopted node the liveness monitor must respawn.
313        self.mark_owned(node_id);
314
315        let _ = self.event_tx.send(NodeEvent::NodeStarted { node_id, pid });
316
317        let result = NodeStarted {
318            node_id,
319            service_name: config.service_name.clone(),
320            pid,
321        };
322
323        // Spawn monitoring task
324        let event_tx = self.event_tx.clone();
325        let config = config.clone();
326        tokio::spawn(async move {
327            monitor_node(child, config, supervisor_ref, registry_ref, event_tx).await;
328        });
329
330        Ok(result)
331    }
332
333    /// Stop a node by gracefully terminating its process.
334    ///
335    /// Sends SIGTERM (Unix) or kills (Windows), waits up to 10 seconds for exit,
336    /// then sends SIGKILL if needed. The monitor task detects the Stopping status
337    /// and exits cleanly without attempting a restart.
338    pub async fn stop_node(&mut self, node_id: u32) -> Result<()> {
339        let state = self
340            .node_states
341            .get_mut(&node_id)
342            .ok_or(Error::NodeNotFound(node_id))?;
343
344        if state.status != NodeStatus::Running {
345            return Err(Error::NodeNotRunning(node_id));
346        }
347
348        let pid = state.pid;
349
350        let _ = self.event_tx.send(NodeEvent::NodeStopping { node_id });
351        state.status = NodeStatus::Stopping;
352
353        if let Some(pid) = pid {
354            graceful_kill(pid).await;
355        }
356
357        // Update state after kill
358        let state = self.node_states.get_mut(&node_id).unwrap();
359        state.status = NodeStatus::Stopped;
360        state.pid = None;
361        state.started_at = None;
362
363        let _ = self.event_tx.send(NodeEvent::NodeStopped { node_id });
364
365        Ok(())
366    }
367
368    /// Stop all running nodes, returning an aggregate result.
369    pub async fn stop_all_nodes(&mut self, configs: &[(u32, String)]) -> StopNodeResult {
370        let mut stopped = Vec::new();
371        let mut failed = Vec::new();
372        let mut already_stopped = Vec::new();
373
374        for (node_id, service_name) in configs {
375            let node_id = *node_id;
376            match self.node_status(node_id) {
377                Ok(NodeStatus::Running) => {}
378                Ok(_) => {
379                    already_stopped.push(node_id);
380                    continue;
381                }
382                Err(_) => {
383                    already_stopped.push(node_id);
384                    continue;
385                }
386            }
387
388            match self.stop_node(node_id).await {
389                Ok(()) => {
390                    stopped.push(NodeStopped {
391                        node_id,
392                        service_name: service_name.clone(),
393                    });
394                }
395                Err(Error::NodeNotRunning(_)) => {
396                    already_stopped.push(node_id);
397                }
398                Err(e) => {
399                    failed.push(NodeStopFailed {
400                        node_id,
401                        service_name: service_name.clone(),
402                        error: e.to_string(),
403                    });
404                }
405            }
406        }
407
408        StopNodeResult {
409            stopped,
410            failed,
411            already_stopped,
412        }
413    }
414
415    /// Get the status of a node.
416    pub fn node_status(&self, node_id: u32) -> Result<NodeStatus> {
417        self.node_states
418            .get(&node_id)
419            .map(|s| s.status)
420            .ok_or(Error::NodeNotFound(node_id))
421    }
422
423    /// Get the PID of a running node.
424    pub fn node_pid(&self, node_id: u32) -> Option<u32> {
425        self.node_states.get(&node_id).and_then(|s| s.pid)
426    }
427
428    /// Get the uptime of a running node in seconds.
429    pub fn node_uptime_secs(&self, node_id: u32) -> Option<u64> {
430        self.node_states
431            .get(&node_id)
432            .and_then(|s| s.started_at.map(|t| t.elapsed().as_secs()))
433    }
434
435    /// The target version when the node is in `UpgradeScheduled` state, otherwise `None`.
436    pub fn node_pending_version(&self, node_id: u32) -> Option<String> {
437        self.node_states
438            .get(&node_id)
439            .and_then(|s| s.pending_version.clone())
440    }
441
442    /// Transition a Running node into `UpgradeScheduled` with the target version.
443    ///
444    /// Only affects nodes currently in `Running`: any other state is left alone (a stopped
445    /// node legitimately has an out-of-date binary; a node already in UpgradeScheduled has
446    /// already been marked). Returns `true` if the transition happened.
447    fn mark_upgrade_scheduled(&mut self, node_id: u32, pending_version: String) -> bool {
448        let Some(state) = self.node_states.get_mut(&node_id) else {
449            return false;
450        };
451        if state.status != NodeStatus::Running {
452            return false;
453        }
454        state.status = NodeStatus::UpgradeScheduled;
455        state.pending_version = Some(pending_version.clone());
456        let _ = self.event_tx.send(NodeEvent::UpgradeScheduled {
457            node_id,
458            pending_version,
459        });
460        true
461    }
462
463    /// Check whether a node is running.
464    pub fn is_running(&self, node_id: u32) -> bool {
465        self.node_states
466            .get(&node_id)
467            .is_some_and(|s| s.status == NodeStatus::Running)
468    }
469
470    /// Get counts of nodes in each state: (running, stopped, errored).
471    pub fn node_counts(&self) -> (u32, u32, u32) {
472        let mut running = 0u32;
473        let mut stopped = 0u32;
474        let mut errored = 0u32;
475        for state in self.node_states.values() {
476            match state.status {
477                // UpgradeScheduled means the process is still running; count it with running.
478                NodeStatus::Running | NodeStatus::Starting | NodeStatus::UpgradeScheduled => {
479                    running += 1
480                }
481                NodeStatus::Stopped | NodeStatus::Stopping => stopped += 1,
482                NodeStatus::Errored => errored += 1,
483            }
484        }
485        (running, stopped, errored)
486    }
487
488    /// Update the runtime state for a node (used by the monitor task).
489    fn update_state(&mut self, node_id: u32, status: NodeStatus, pid: Option<u32>) {
490        if let Some(state) = self.node_states.get_mut(&node_id) {
491            state.status = status;
492            state.pid = pid;
493            if status == NodeStatus::Running {
494                state.started_at = Some(Instant::now());
495            } else {
496                // Clear uptime tracking for non-running states so status
497                // responses don't report a stale `uptime_secs` after the node
498                // exits (e.g. liveness monitor detecting an external kill).
499                state.started_at = None;
500            }
501        }
502    }
503
504    /// Restore running-node state from a previous daemon instance.
505    ///
506    /// For each registered node, determines the PID to adopt via
507    /// `resolve_adopted_pid`: try `<data_dir>/node.pid` first, and if it's
508    /// missing or stale, fall back to a process-table scan matching the
509    /// node's binary path and `--root-dir` argument. Live matches are
510    /// inserted into `node_states` as `Running`.
511    ///
512    /// The scan is what covers the upgrade path: nodes spawned by a
513    /// pre-adoption daemon never had a pid file written, so without the
514    /// fallback the first restart after installing this fix would still
515    /// leave every previously-running node classified as Stopped.
516    ///
517    /// Must be called before the HTTP server starts accepting requests —
518    /// the window between `Supervisor::new` and adoption is where the API
519    /// would otherwise report live nodes as Stopped. Adopted nodes have no
520    /// associated `monitor_node` task (the `tokio::process::Child` handle
521    /// belonged to the previous daemon, and `tokio::process::Child::wait`
522    /// only works for the process's actual parent). Their exits are
523    /// detected instead by the `spawn_liveness_monitor` polling task.
524    ///
525    /// Returns the list of node IDs that were adopted.
526    pub fn adopt_from_registry(&mut self, registry: &NodeRegistry) -> Vec<u32> {
527        // Populated upfront so every adopted node gets its real start time via
528        // `process_started_at`, not just those that went through the scan
529        // fallback. The extra ~50 ms at daemon startup is a one-time cost
530        // that's cheaper than users seeing uptime reset every time the daemon
531        // restarts.
532        let mut sys = sysinfo::System::new();
533        sys.refresh_processes_specifics(
534            sysinfo::ProcessesToUpdate::All,
535            true,
536            sysinfo::ProcessRefreshKind::everything(),
537        );
538
539        let mut adopted = Vec::new();
540        for config in registry.list() {
541            let Some(pid) = resolve_adopted_pid(config, &sys) else {
542                continue;
543            };
544            self.node_states.insert(
545                config.id,
546                NodeRuntime {
547                    status: NodeStatus::Running,
548                    pid: Some(pid),
549                    // Back-date to the real process start time so uptime
550                    // reported to the API is wall-clock accurate across
551                    // daemon restarts. Falls back to `Instant::now()` only
552                    // if sysinfo can't report the start time (PID raced out
553                    // of the snapshot, or a broken clock) — better to show
554                    // uptime counting from adoption than to claim the node
555                    // is Stopped.
556                    started_at: Some(process_started_at(&sys, pid).unwrap_or_else(Instant::now)),
557                    restart_count: 0,
558                    first_crash_at: None,
559                    pending_version: None,
560                },
561            );
562            // No owning `monitor_node` exists for an adopted process (its `Child` died with the
563            // previous daemon), so flag it for the liveness monitor to handle its exit/respawn.
564            self.adopted.insert(config.id);
565            let _ = self.event_tx.send(NodeEvent::NodeStarted {
566                node_id: config.id,
567                pid,
568            });
569            adopted.push(config.id);
570        }
571        adopted
572    }
573
574    /// Record a crash and determine if the node should be restarted or marked errored.
575    /// Returns (should_restart, attempt_number, backoff_duration).
576    fn record_crash(&mut self, node_id: u32) -> (bool, u32, Duration) {
577        let state = match self.node_states.get_mut(&node_id) {
578            Some(s) => s,
579            None => return (false, 0, Duration::ZERO),
580        };
581
582        let now = Instant::now();
583
584        // Check if we were stable long enough to reset crash counter
585        if let Some(started_at) = state.started_at {
586            if started_at.elapsed() >= STABLE_DURATION {
587                state.restart_count = 0;
588                state.first_crash_at = None;
589            }
590        }
591
592        state.restart_count += 1;
593        let attempt = state.restart_count;
594
595        if state.first_crash_at.is_none() {
596            state.first_crash_at = Some(now);
597        }
598
599        // Check if too many crashes in the window
600        if let Some(first_crash) = state.first_crash_at {
601            if attempt >= MAX_CRASHES_BEFORE_ERRORED
602                && now.duration_since(first_crash) < CRASH_WINDOW
603            {
604                state.status = NodeStatus::Errored;
605                state.pid = None;
606                state.started_at = None;
607                return (false, attempt, Duration::ZERO);
608            }
609        }
610
611        // Exponential backoff: 1s, 2s, 4s, 8s, 16s, 32s, 60s cap
612        let backoff_secs = 1u64 << (attempt - 1).min(5);
613        let backoff = Duration::from_secs(backoff_secs).min(MAX_BACKOFF);
614
615        (true, attempt, backoff)
616    }
617}
618
619/// Periodically probe each Running node's on-disk binary for a version change.
620///
621/// When a node's binary-on-disk reports a different version than was recorded in the registry
622/// at `ant node add` time, ant-node has replaced the binary in place as part of its auto-upgrade
623/// flow and will restart the process shortly. We flip the node to `UpgradeScheduled` with the
624/// target version, which lets `ant node status` render the in-between state and lets
625/// `monitor_node` reclassify the upcoming clean exit as an expected restart rather than a crash.
626///
627/// The task exits when `shutdown` is cancelled.
628pub fn spawn_upgrade_monitor(
629    registry: Arc<RwLock<NodeRegistry>>,
630    supervisor: Arc<RwLock<Supervisor>>,
631    interval: Duration,
632    shutdown: CancellationToken,
633) {
634    tokio::spawn(async move {
635        let mut ticker = tokio::time::interval(interval);
636        // After a Windows sleep/hibernate the default `Burst` catch-up would fire one
637        // tick per missed interval back-to-back, producing a flood of `extract_version`
638        // subprocess spawns. `Skip` resumes on the next aligned tick instead.
639        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
640        // Skip the immediate first tick — we don't want to probe while nodes are still in the
641        // Starting -> Running transition.
642        ticker.tick().await;
643
644        loop {
645            tokio::select! {
646                _ = shutdown.cancelled() => return,
647                _ = ticker.tick() => {},
648            }
649
650            // Collect a snapshot of (node_id, binary_path, recorded_version, current_pending)
651            // to release the locks before running --version subprocesses (which take time).
652            let candidates: Vec<(u32, std::path::PathBuf, String, Option<String>)> = {
653                let reg = registry.read().await;
654                let sup = supervisor.read().await;
655                reg.list()
656                    .into_iter()
657                    .filter_map(|config| match sup.node_status(config.id) {
658                        Ok(NodeStatus::Running) => Some((
659                            config.id,
660                            config.binary_path.clone(),
661                            config.version.clone(),
662                            sup.node_pending_version(config.id),
663                        )),
664                        _ => None,
665                    })
666                    .collect()
667            };
668
669            for (node_id, binary_path, recorded_version, current_pending) in candidates {
670                let observed = match extract_version(&binary_path).await {
671                    Ok(v) => v,
672                    // Transient failures (e.g. binary mid-replacement) — skip this round.
673                    Err(_) => continue,
674                };
675                if observed == recorded_version {
676                    continue;
677                }
678                if current_pending.as_deref() == Some(observed.as_str()) {
679                    continue;
680                }
681                supervisor
682                    .write()
683                    .await
684                    .mark_upgrade_scheduled(node_id, observed);
685            }
686        }
687    });
688}
689
690/// Build CLI arguments for the node binary from a NodeConfig.
691pub fn build_node_args(config: &NodeConfig) -> Vec<String> {
692    let mut args = vec![
693        "--rewards-address".to_string(),
694        config.rewards_address.clone(),
695        "--root-dir".to_string(),
696        config.data_dir.display().to_string(),
697    ];
698
699    if let Some(ref log_dir) = config.log_dir {
700        args.push("--enable-logging".to_string());
701        args.push("--log-dir".to_string());
702        args.push(log_dir.display().to_string());
703    }
704
705    if let Some(port) = config.node_port {
706        args.push("--port".to_string());
707        args.push(port.to_string());
708    }
709
710    if let Some(port) = config.metrics_port {
711        args.push("--metrics-port".to_string());
712        args.push(port.to_string());
713    }
714
715    for peer in &config.bootstrap_peers {
716        args.push("--bootstrap".to_string());
717        args.push(peer.clone());
718    }
719
720    if let Some(channel) = config.upgrade_channel {
721        args.push("--upgrade-channel".to_string());
722        args.push(channel.to_string());
723    }
724
725    // The daemon's supervisor is the service manager. Tell ant-node not to spawn its own
726    // replacement on auto-upgrade; instead, exit cleanly and let us respawn. Without this,
727    // ant-node's default spawn-grandchild-then-exit flow races for the node's port during
728    // the parent's graceful shutdown and the grandchild fails to bind.
729    args.push("--stop-on-upgrade".to_string());
730
731    args
732}
733
734/// Spawn a node process from a NodeConfig.
735///
736/// Writes `<data_dir>/node.pid` on successful spawn so that a future daemon instance
737/// can adopt the running process via `Supervisor::adopt_from_registry`. The file is
738/// cleaned up by `monitor_node` on the node's terminal exit.
739async fn spawn_node_from_config(config: &NodeConfig) -> Result<tokio::process::Child> {
740    let args = build_node_args(config);
741    let env_vars: Vec<(String, String)> = config.env_variables.clone().into_iter().collect();
742
743    let log_dir = config
744        .log_dir
745        .as_deref()
746        .unwrap_or(config.data_dir.as_path());
747
748    let child = spawn_node(&config.binary_path, &args, &env_vars, log_dir).await?;
749    if let Some(pid) = child.id() {
750        write_node_pid(&config.data_dir, pid);
751    }
752    Ok(child)
753}
754
755/// Monitor a node process. On exit, handle restart logic. On permanent exit
756/// (user stop, crash limit, errored), cleans up the pid file so a subsequent
757/// daemon restart doesn't try to adopt a dead process.
758async fn monitor_node(
759    child: tokio::process::Child,
760    mut config: NodeConfig,
761    supervisor: Arc<RwLock<Supervisor>>,
762    registry: Arc<RwLock<NodeRegistry>>,
763    event_tx: broadcast::Sender<NodeEvent>,
764) {
765    monitor_node_inner(child, &mut config, supervisor, registry, event_tx).await;
766    remove_node_pid(&config.data_dir);
767}
768
769async fn monitor_node_inner(
770    mut child: tokio::process::Child,
771    config: &mut NodeConfig,
772    supervisor: Arc<RwLock<Supervisor>>,
773    registry: Arc<RwLock<NodeRegistry>>,
774    event_tx: broadcast::Sender<NodeEvent>,
775) {
776    let node_id = config.id;
777
778    loop {
779        // Wait for the process to exit
780        let exit_status = child.wait().await;
781
782        // Check whether this is a scheduled upgrade restart or an intentional stop.
783        let status_at_exit = {
784            let sup = supervisor.read().await;
785            sup.node_status(node_id).ok()
786        };
787
788        match status_at_exit {
789            Some(NodeStatus::Stopped) | Some(NodeStatus::Stopping) => return,
790            Some(NodeStatus::UpgradeScheduled) => {
791                // ant-node cleanly exited after replacing its binary in place. Respawn
792                // directly (no backoff, no crash counter) and refresh the recorded version.
793                match respawn_upgraded_node(config, &supervisor, &registry, &event_tx).await {
794                    Ok(new_child) => {
795                        child = new_child;
796                        continue;
797                    }
798                    Err(e) => {
799                        let _ = event_tx.send(NodeEvent::NodeErrored {
800                            node_id,
801                            message: format!("Failed to respawn after upgrade: {e}"),
802                        });
803                        let mut sup = supervisor.write().await;
804                        sup.update_state(node_id, NodeStatus::Errored, None);
805                        return;
806                    }
807                }
808            }
809            _ => {}
810        }
811
812        let exit_code = exit_status.ok().and_then(|s| s.code());
813
814        // A process-reported exit that wasn't user-initiated (Stopping was filtered above) is
815        // either an auto-upgrade (exit 0 after ant-node replaced its binary) or a crash. In
816        // neither case should the node be parked in `Stopped` — that state is reserved for
817        // intentional user stops.
818        //
819        // Distinguish upgrade from crash by checking whether the on-disk binary's version
820        // drifted from the registry. Between replacing its binary and actually exiting,
821        // ant-node can hold the process open for anywhere from seconds to minutes, depending
822        // on in-flight work and its own config. The periodic version poll will usually have
823        // flipped the node to `UpgradeScheduled` well before the exit, but when the window is
824        // short we cannot rely on that — hence this synchronous re-check here.
825        if exit_code == Some(0) {
826            if let Ok(disk_version) = extract_version(&config.binary_path).await {
827                if disk_version != config.version {
828                    {
829                        let mut sup = supervisor.write().await;
830                        sup.mark_upgrade_scheduled(node_id, disk_version.clone());
831                    }
832                    match respawn_upgraded_node(config, &supervisor, &registry, &event_tx).await {
833                        Ok(new_child) => {
834                            child = new_child;
835                            continue;
836                        }
837                        Err(e) => {
838                            let _ = event_tx.send(NodeEvent::NodeErrored {
839                                node_id,
840                                message: format!("Failed to respawn after upgrade: {e}"),
841                            });
842                            let mut sup = supervisor.write().await;
843                            sup.update_state(node_id, NodeStatus::Errored, None);
844                            return;
845                        }
846                    }
847                }
848            }
849            // Exit 0 but the binary didn't change — fall through to the crash / restart path.
850            // We report the crash with the exit code preserved; the crash counter guards
851            // against infinite restart loops if the process keeps exiting immediately.
852        }
853
854        // Crash (or clean exit that wasn't an upgrade)
855        let _ = event_tx.send(NodeEvent::NodeCrashed { node_id, exit_code });
856
857        let (should_restart, attempt, backoff) = {
858            let mut sup = supervisor.write().await;
859            sup.record_crash(node_id)
860        };
861
862        if !should_restart {
863            let _ = event_tx.send(NodeEvent::NodeErrored {
864                node_id,
865                message: format!(
866                    "Node crashed {} times within {} seconds, giving up",
867                    MAX_CRASHES_BEFORE_ERRORED,
868                    CRASH_WINDOW.as_secs()
869                ),
870            });
871            return;
872        }
873
874        let _ = event_tx.send(NodeEvent::NodeRestarting { node_id, attempt });
875
876        tokio::time::sleep(backoff).await;
877
878        // Try to restart
879        match spawn_node_from_config(&*config).await {
880            Ok(new_child) => {
881                let pid = match new_child.id() {
882                    Some(pid) => pid,
883                    None => {
884                        // Process exited before we could read its PID
885                        let _ = event_tx.send(NodeEvent::NodeErrored {
886                            node_id,
887                            message: "Restarted process exited before PID could be read"
888                                .to_string(),
889                        });
890                        let mut sup = supervisor.write().await;
891                        sup.update_state(node_id, NodeStatus::Errored, None);
892                        return;
893                    }
894                };
895                {
896                    let mut sup = supervisor.write().await;
897                    sup.update_state(node_id, NodeStatus::Running, Some(pid));
898                }
899                let _ = event_tx.send(NodeEvent::NodeStarted { node_id, pid });
900                child = new_child;
901            }
902            Err(e) => {
903                let _ = event_tx.send(NodeEvent::NodeErrored {
904                    node_id,
905                    message: format!("Failed to restart node: {e}"),
906                });
907                let mut sup = supervisor.write().await;
908                sup.update_state(node_id, NodeStatus::Errored, None);
909                return;
910            }
911        }
912    }
913}
914
915/// Respawn a node whose `UpgradeScheduled` status tells us the exit was expected.
916///
917/// On success: persists the new version to the registry, updates the in-memory config clone,
918/// clears pending_version, sets status back to Running, and fires `NodeUpgraded`.
919async fn respawn_upgraded_node(
920    config: &mut NodeConfig,
921    supervisor: &Arc<RwLock<Supervisor>>,
922    registry: &Arc<RwLock<NodeRegistry>>,
923    event_tx: &broadcast::Sender<NodeEvent>,
924) -> Result<tokio::process::Child> {
925    let node_id = config.id;
926    let old_version = config.version.clone();
927
928    let new_child = spawn_node_from_config(config).await?;
929    let pid = new_child
930        .id()
931        .ok_or_else(|| Error::ProcessSpawn("Failed to get PID after upgrade respawn".into()))?;
932
933    // Read the new version from the replaced binary. If this fails we still consider the respawn
934    // successful; we just don't refresh the recorded version this round.
935    let new_version = extract_version(&config.binary_path).await.ok();
936
937    if let Some(ref version) = new_version {
938        config.version = version.clone();
939        let mut reg = registry.write().await;
940        if let Ok(stored) = reg.get_mut(node_id) {
941            stored.version = version.clone();
942            let _ = reg.save();
943        }
944    }
945
946    {
947        let mut sup = supervisor.write().await;
948        if let Some(state) = sup.node_states.get_mut(&node_id) {
949            state.status = NodeStatus::Running;
950            state.pid = Some(pid);
951            state.started_at = Some(Instant::now());
952            state.pending_version = None;
953            state.restart_count = 0;
954            state.first_crash_at = None;
955        }
956    }
957
958    let _ = event_tx.send(NodeEvent::NodeStarted { node_id, pid });
959    if let Some(version) = new_version {
960        let _ = event_tx.send(NodeEvent::NodeUpgraded {
961            node_id,
962            old_version,
963            new_version: version,
964        });
965    }
966
967    Ok(new_child)
968}
969
970/// Timeout for graceful shutdown before force-killing.
971const GRACEFUL_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10);
972
973/// Send SIGTERM to a process, wait for it to exit, and SIGKILL if it doesn't.
974async fn graceful_kill(pid: u32) {
975    send_signal_term(pid);
976
977    // Poll for process exit
978    let start = Instant::now();
979    loop {
980        if !is_process_alive(pid) {
981            return;
982        }
983        if start.elapsed() >= GRACEFUL_SHUTDOWN_TIMEOUT {
984            break;
985        }
986        tokio::time::sleep(Duration::from_millis(100)).await;
987    }
988
989    // Force kill if still alive
990    send_signal_kill(pid);
991
992    // Brief wait for force kill to take effect
993    for _ in 0..10 {
994        if !is_process_alive(pid) {
995            return;
996        }
997        tokio::time::sleep(Duration::from_millis(50)).await;
998    }
999}
1000
1001/// Decide whether the liveness monitor should flip a node it found dead to `Stopped`.
1002///
1003/// `snapshot_pid` is the PID the sweep captured and then observed to be dead. `current_pid`
1004/// and `current_status` are the node's recorded state at the moment of the decision — which
1005/// may differ from the snapshot (e.g. an upgrade respawn replaced the PID with a live one
1006/// while leaving the status `Running`).
1007///
1008/// We only stop the node if it is still `Running` AND the recorded PID is still the one we
1009/// observed dead. The PID check is essential: between the snapshot and now, an upgrade (or
1010/// crash) respawn can have replaced the dead `snapshot_pid` with a live `current_pid` while
1011/// keeping the status `Running`. Stopping in that case would clobber a healthy, freshly
1012/// respawned process (the "running node reported as stopped after an upgrade" bug).
1013fn liveness_should_stop(
1014    snapshot_pid: u32,
1015    current_pid: Option<u32>,
1016    current_status: Option<NodeStatus>,
1017) -> bool {
1018    current_status == Some(NodeStatus::Running) && current_pid == Some(snapshot_pid)
1019}
1020
1021/// Poll each Running node's PID for OS liveness every `LIVENESS_POLL_INTERVAL`,
1022/// flipping dead ones to `Stopped` and emitting `NodeStopped`.
1023///
1024/// Exists to detect exits of nodes adopted across a daemon restart
1025/// (`Supervisor::adopt_from_registry`). Daemon-spawned nodes have a
1026/// `monitor_node` task awaiting on the owned `Child` handle, which detects
1027/// exit immediately — the poll is redundant-but-harmless for them. Adopted
1028/// nodes don't have a `Child` (it died with the previous daemon), so the poll
1029/// is the only way the supervisor learns that one has exited.
1030///
1031/// The task terminates when `shutdown` is cancelled.
1032pub fn spawn_liveness_monitor(
1033    registry: Arc<RwLock<NodeRegistry>>,
1034    supervisor: Arc<RwLock<Supervisor>>,
1035    event_tx: broadcast::Sender<NodeEvent>,
1036    interval: Duration,
1037    shutdown: CancellationToken,
1038) {
1039    tokio::spawn(async move {
1040        let mut ticker = tokio::time::interval(interval);
1041        // Don't burst-catchup after a Windows sleep/hibernate: a flood of liveness
1042        // probes serves no purpose, and uniform `Skip` policy across supervisor
1043        // monitors keeps post-wake behaviour predictable.
1044        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
1045        loop {
1046            tokio::select! {
1047                _ = shutdown.cancelled() => return,
1048                _ = ticker.tick() => {}
1049            }
1050
1051            // Snapshot candidates to release locks before the per-process syscalls.
1052            let candidates: Vec<(u32, u32, PathBuf)> =
1053                {
1054                    let sup = supervisor.read().await;
1055                    let reg = registry.read().await;
1056                    reg.list()
1057                        .into_iter()
1058                        .filter_map(|config| {
1059                            let pid = sup.node_pid(config.id)?;
1060                            matches!(sup.node_status(config.id), Ok(NodeStatus::Running))
1061                                .then_some((config.id, pid, config.data_dir.clone()))
1062                        })
1063                        .collect()
1064                };
1065
1066            for (node_id, pid, data_dir) in candidates {
1067                if is_process_alive(pid) {
1068                    continue;
1069                }
1070
1071                // Adopted nodes have no owning `monitor_node`, so this poll is their only
1072                // supervisor. If such a node's process died and the on-disk binary version has
1073                // drifted from the registry, the exit was an auto-upgrade — `--stop-on-upgrade`
1074                // expects the service manager (us) to restart it. Respawn it on the new binary
1075                // and hand it a `monitor_node`, rather than leaving it dead and flagged Stopped.
1076                if supervisor.read().await.is_adopted(node_id) {
1077                    let config = {
1078                        let reg = registry.read().await;
1079                        reg.get(node_id).ok().cloned()
1080                    };
1081                    if let Some(mut config) = config {
1082                        let drifted = matches!(
1083                            extract_version(&config.binary_path).await,
1084                            Ok(disk_version) if disk_version != config.version
1085                        );
1086                        if drifted {
1087                            match respawn_upgraded_node(
1088                                &mut config,
1089                                &supervisor,
1090                                &registry,
1091                                &event_tx,
1092                            )
1093                            .await
1094                            {
1095                                Ok(child) => {
1096                                    // Now owned by this daemon: clear the adopted flag and give
1097                                    // it a monitor_node so future exits are handled there.
1098                                    supervisor.write().await.mark_owned(node_id);
1099                                    let sup_ref = Arc::clone(&supervisor);
1100                                    let reg_ref = Arc::clone(&registry);
1101                                    let ev = event_tx.clone();
1102                                    tokio::spawn(async move {
1103                                        monitor_node(child, config, sup_ref, reg_ref, ev).await;
1104                                    });
1105                                    continue;
1106                                }
1107                                Err(e) => {
1108                                    let _ = event_tx.send(NodeEvent::NodeErrored {
1109                                        node_id,
1110                                        message: format!(
1111                                            "Failed to respawn adopted node after upgrade: {e}"
1112                                        ),
1113                                    });
1114                                    let mut sup = supervisor.write().await;
1115                                    sup.update_state(node_id, NodeStatus::Errored, None);
1116                                    sup.mark_owned(node_id);
1117                                    remove_node_pid(&data_dir);
1118                                    continue;
1119                                }
1120                            }
1121                        }
1122                    }
1123                }
1124
1125                let mut sup = supervisor.write().await;
1126                // Re-check under the write lock to avoid racing with a concurrent
1127                // start/stop that flipped the state between the snapshot and now.
1128                if !liveness_should_stop(pid, sup.node_pid(node_id), sup.node_status(node_id).ok())
1129                {
1130                    continue;
1131                }
1132                sup.update_state(node_id, NodeStatus::Stopped, None);
1133                let _ = event_tx.send(NodeEvent::NodeStopped { node_id });
1134                remove_node_pid(&data_dir);
1135            }
1136        }
1137    });
1138}
1139
/// Convert a PID to the signed form `libc::kill` takes, refusing anything that would not
/// name exactly one process.
///
/// Returns `None` for 0 and for values above `i32::MAX` (which would wrap negative). In
/// `kill(2)` a PID of 0 or a negative value addresses a process group, which must never
/// happen here.
#[cfg(unix)]
fn pid_to_i32(pid: u32) -> Option<i32> {
    match i32::try_from(pid) {
        Ok(signed) if signed > 0 => Some(signed),
        _ => None,
    }
}
1144
1145#[cfg(unix)]
1146fn send_signal_term(pid: u32) {
1147    if let Some(pid) = pid_to_i32(pid) {
1148        unsafe {
1149            libc::kill(pid, libc::SIGTERM);
1150        }
1151    }
1152}
1153
1154#[cfg(unix)]
1155fn send_signal_kill(pid: u32) {
1156    if let Some(pid) = pid_to_i32(pid) {
1157        unsafe {
1158            libc::kill(pid, libc::SIGKILL);
1159        }
1160    }
1161}
1162
1163#[cfg(unix)]
1164fn is_process_alive(pid: u32) -> bool {
1165    let Some(pid) = pid_to_i32(pid) else {
1166        return false;
1167    };
1168    let ret = unsafe { libc::kill(pid, 0) };
1169    if ret == 0 {
1170        return true;
1171    }
1172    // EPERM means the process exists but we lack permission to signal it
1173    std::io::Error::last_os_error().raw_os_error() == Some(libc::EPERM)
1174}
1175
/// Windows counterpart of SIGTERM: deliver a console Ctrl+C to process `pid`.
///
/// Windows has no SIGTERM, so the daemon temporarily attaches to the target's console,
/// raises a Ctrl+C event there with its own Ctrl+C handling disabled, detaches again, and
/// restores its handler. Nothing happens if `AttachConsole(pid)` fails; no result is
/// reported either way.
///
/// NOTE(review): the first `FreeConsole()` is never undone. If the daemon started with its
/// own console (foreground `daemon run`), it appears to stay detached from that console
/// after the first call — confirm Ctrl+C from the original terminal still reaches it.
/// NOTE(review): `std::thread::sleep` blocks the calling thread for 50 ms; this is reached
/// from the async `graceful_kill`, so it blocks a runtime worker for that long.
#[cfg(windows)]
fn send_signal_term(pid: u32) {
    use windows_sys::Win32::System::Console::{
        AttachConsole, FreeConsole, GenerateConsoleCtrlEvent, SetConsoleCtrlHandler, CTRL_C_EVENT,
    };

    // SAFETY: all calls are plain Win32 console APIs taking integers / a null handler; no
    // pointers to our memory are passed and return values are only compared, not dereferenced.
    unsafe {
        // Detach from our own console (no-op if daemon has none, which is
        // typical since it's spawned with DETACHED_PROCESS).
        FreeConsole();

        // Attach to the target process's console and send Ctrl+C
        if AttachConsole(pid) != 0 {
            // Disable Ctrl+C handling so GenerateConsoleCtrlEvent doesn't
            // terminate us while we're attached to the node's console.
            SetConsoleCtrlHandler(None, 1);
            // Process group 0: presumably every process attached to this console receives
            // the event (per Win32 docs) — confirm nothing else shares the node's console.
            GenerateConsoleCtrlEvent(CTRL_C_EVENT, 0);
            // Detach from the node's console first — once detached, the
            // async Ctrl+C event can only reach the node, not us.
            FreeConsole();
            // Brief delay to let the event drain before re-enabling our
            // handler. Without this, the handler thread can process the
            // event between FreeConsole and SetConsoleCtrlHandler.
            std::thread::sleep(std::time::Duration::from_millis(50));
            // Restore Ctrl+C handling so `daemon run` (foreground mode)
            // can still be stopped via Ctrl+C / tokio::signal::ctrl_c().
            SetConsoleCtrlHandler(None, 0);
        }
    }
}
1206
/// Windows counterpart of SIGKILL: terminate process `pid` outright with exit code 1.
///
/// Silently does nothing if the process cannot be opened with `PROCESS_TERMINATE` access
/// (e.g. it no longer exists); the result of `TerminateProcess` is not checked.
#[cfg(windows)]
fn send_signal_kill(pid: u32) {
    use windows_sys::Win32::Foundation::CloseHandle;
    use windows_sys::Win32::System::Threading::{OpenProcess, TerminateProcess, PROCESS_TERMINATE};

    // SAFETY: the handle is only used after the null check and is closed exactly once.
    unsafe {
        let process = OpenProcess(PROCESS_TERMINATE, 0, pid);
        if process.is_null() {
            return;
        }
        TerminateProcess(process, 1);
        CloseHandle(process);
    }
}
1220
/// Report whether process `pid` is still running.
///
/// Opens the process with `PROCESS_QUERY_LIMITED_INFORMATION` and checks whether its exit
/// code is still `STILL_ACTIVE`.
///
/// Unlike the Unix variant, a process we are not allowed to open is reported as *not*
/// alive. A process that genuinely exited with code 259 (`STILL_ACTIVE`) would be
/// misreported as alive.
#[cfg(windows)]
fn is_process_alive(pid: u32) -> bool {
    use windows_sys::Win32::Foundation::{CloseHandle, STILL_ACTIVE};
    use windows_sys::Win32::System::Threading::{
        GetExitCodeProcess, OpenProcess, PROCESS_QUERY_LIMITED_INFORMATION,
    };

    // SAFETY: the handle is only used after the null check and is closed exactly once;
    // `exit_code` is a live local the API writes into.
    unsafe {
        let process = OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, 0, pid);
        if process.is_null() {
            return false;
        }
        let mut exit_code: u32 = 0;
        let queried = GetExitCodeProcess(process, &mut exit_code) != 0;
        CloseHandle(process);
        queried && exit_code == STILL_ACTIVE as u32
    }
}
1239
1240#[cfg(test)]
1241mod tests {
1242    use super::*;
1243    use crate::node::types::UpgradeChannel;
1244
1245    #[test]
1246    fn adopted_flag_lifecycle() {
1247        let (tx, _rx) = broadcast::channel(16);
1248        let mut sup = Supervisor::new(tx);
1249
1250        // Nodes are not adopted by default.
1251        assert!(!sup.is_adopted(1));
1252
1253        // adopt_from_registry flags nodes carried over from a previous daemon.
1254        sup.adopted.insert(1);
1255        assert!(sup.is_adopted(1));
1256
1257        // Once this daemon (re)spawns the node and owns a monitor_node for it, the flag
1258        // clears so the liveness monitor stops treating its exit as needing a respawn.
1259        sup.mark_owned(1);
1260        assert!(!sup.is_adopted(1));
1261    }
1262
1263    // Regression test for the "running node reported as stopped after an upgrade" bug.
1264    //
1265    // A daemon-spawned node was respawned by monitor_node after an upgrade, so the recorded
1266    // state is now Running with a live PID_new. A liveness sweep that snapshotted the old,
1267    // now-dead PID then acts: it must NOT mark the node Stopped, because the running process
1268    // is the new one. `liveness_should_stop` guards against this by also requiring the recorded
1269    // PID to still match the one the sweep observed dead.
1270    #[test]
1271    fn liveness_does_not_stop_node_respawned_under_it() {
1272        let dead_snapshot_pid = 1000; // PID the sweep captured and found dead
1273        let live_respawned_pid = Some(2000); // PID_new from the upgrade respawn (alive)
1274        assert!(
1275            !liveness_should_stop(
1276                dead_snapshot_pid,
1277                live_respawned_pid,
1278                Some(NodeStatus::Running)
1279            ),
1280            "liveness must not stop a node whose PID changed under it (respawned with a live PID)"
1281        );
1282    }
1283
1284    #[test]
1285    fn build_node_args_basic() {
1286        let config = NodeConfig {
1287            id: 1,
1288            service_name: "node1".to_string(),
1289            rewards_address: "0xabc123".to_string(),
1290            data_dir: "/data/node-1".into(),
1291            log_dir: Some("/logs/node-1".into()),
1292            node_port: Some(12000),
1293            metrics_port: Some(13000),
1294            network_id: Some(1),
1295            binary_path: "/bin/node".into(),
1296            version: "0.1.0".to_string(),
1297            env_variables: HashMap::new(),
1298            bootstrap_peers: vec!["peer1".to_string(), "peer2".to_string()],
1299            upgrade_channel: None,
1300        };
1301
1302        let args = build_node_args(&config);
1303
1304        assert!(args.contains(&"--rewards-address".to_string()));
1305        assert!(args.contains(&"0xabc123".to_string()));
1306        assert!(args.contains(&"--root-dir".to_string()));
1307        assert!(args.contains(&"/data/node-1".to_string()));
1308        assert!(args.contains(&"--enable-logging".to_string()));
1309        assert!(args.contains(&"--log-dir".to_string()));
1310        assert!(args.contains(&"/logs/node-1".to_string()));
1311        assert!(args.contains(&"--port".to_string()));
1312        assert!(args.contains(&"12000".to_string()));
1313        assert!(args.contains(&"--metrics-port".to_string()));
1314        assert!(args.contains(&"13000".to_string()));
1315        assert!(args.contains(&"--bootstrap".to_string()));
1316        assert!(args.contains(&"peer1".to_string()));
1317        assert!(args.contains(&"peer2".to_string()));
1318        assert!(args.contains(&"--stop-on-upgrade".to_string()));
1319        // No upgrade channel configured -> no --upgrade-channel argument.
1320        assert!(!args.contains(&"--upgrade-channel".to_string()));
1321    }
1322
1323    #[test]
1324    fn build_node_args_includes_upgrade_channel() {
1325        let mut config = NodeConfig {
1326            id: 1,
1327            service_name: "node1".to_string(),
1328            rewards_address: "0xabc".to_string(),
1329            data_dir: "/data/node-1".into(),
1330            log_dir: None,
1331            node_port: None,
1332            metrics_port: None,
1333            network_id: None,
1334            binary_path: "/bin/node".into(),
1335            version: "0.1.0".to_string(),
1336            env_variables: HashMap::new(),
1337            bootstrap_peers: vec![],
1338            upgrade_channel: Some(UpgradeChannel::Beta),
1339        };
1340
1341        let args = build_node_args(&config);
1342        let idx = args
1343            .iter()
1344            .position(|a| a == "--upgrade-channel")
1345            .expect("--upgrade-channel should be present");
1346        assert_eq!(args[idx + 1], "beta");
1347
1348        config.upgrade_channel = Some(UpgradeChannel::Stable);
1349        let args = build_node_args(&config);
1350        let idx = args.iter().position(|a| a == "--upgrade-channel").unwrap();
1351        assert_eq!(args[idx + 1], "stable");
1352    }
1353
1354    #[test]
1355    fn build_node_args_minimal() {
1356        let config = NodeConfig {
1357            id: 1,
1358            service_name: "node1".to_string(),
1359            rewards_address: "0xabc".to_string(),
1360            data_dir: "/data/node-1".into(),
1361            log_dir: None,
1362            node_port: None,
1363            metrics_port: None,
1364            network_id: None,
1365            binary_path: "/bin/node".into(),
1366            version: "0.1.0".to_string(),
1367            env_variables: HashMap::new(),
1368            bootstrap_peers: vec![],
1369            upgrade_channel: None,
1370        };
1371
1372        let args = build_node_args(&config);
1373
1374        assert!(args.contains(&"--rewards-address".to_string()));
1375        assert!(args.contains(&"--root-dir".to_string()));
1376        assert!(!args.contains(&"--enable-logging".to_string()));
1377        assert!(!args.contains(&"--log-dir".to_string()));
1378        assert!(!args.contains(&"--port".to_string()));
1379        assert!(!args.contains(&"--metrics-port".to_string()));
1380        assert!(!args.contains(&"--bootstrap".to_string()));
1381        assert!(args.contains(&"--stop-on-upgrade".to_string()));
1382    }
1383
1384    #[test]
1385    fn record_crash_backoff_increases() {
1386        let (tx, _rx) = broadcast::channel(16);
1387        let mut sup = Supervisor::new(tx);
1388
1389        // Insert a running node
1390        sup.node_states.insert(
1391            1,
1392            NodeRuntime {
1393                status: NodeStatus::Running,
1394                pid: Some(100),
1395                started_at: Some(Instant::now()),
1396                restart_count: 0,
1397                first_crash_at: None,
1398                pending_version: None,
1399            },
1400        );
1401
1402        let (should_restart, attempt, backoff) = sup.record_crash(1);
1403        assert!(should_restart);
1404        assert_eq!(attempt, 1);
1405        assert_eq!(backoff, Duration::from_secs(1));
1406
1407        let (should_restart, attempt, backoff) = sup.record_crash(1);
1408        assert!(should_restart);
1409        assert_eq!(attempt, 2);
1410        assert_eq!(backoff, Duration::from_secs(2));
1411
1412        let (should_restart, attempt, backoff) = sup.record_crash(1);
1413        assert!(should_restart);
1414        assert_eq!(attempt, 3);
1415        assert_eq!(backoff, Duration::from_secs(4));
1416
1417        let (should_restart, attempt, backoff) = sup.record_crash(1);
1418        assert!(should_restart);
1419        assert_eq!(attempt, 4);
1420        assert_eq!(backoff, Duration::from_secs(8));
1421
1422        // 5th crash within window → errored
1423        let (should_restart, attempt, _) = sup.record_crash(1);
1424        assert!(!should_restart);
1425        assert_eq!(attempt, 5);
1426        assert_eq!(sup.node_states[&1].status, NodeStatus::Errored);
1427    }
1428
1429    #[test]
1430    fn node_counts_tracks_states() {
1431        let (tx, _rx) = broadcast::channel(16);
1432        let mut sup = Supervisor::new(tx);
1433
1434        sup.node_states.insert(
1435            1,
1436            NodeRuntime {
1437                status: NodeStatus::Running,
1438                pid: Some(100),
1439                started_at: Some(Instant::now()),
1440                restart_count: 0,
1441                first_crash_at: None,
1442                pending_version: None,
1443            },
1444        );
1445        sup.node_states.insert(
1446            2,
1447            NodeRuntime {
1448                status: NodeStatus::Stopped,
1449                pid: None,
1450                started_at: None,
1451                restart_count: 0,
1452                first_crash_at: None,
1453                pending_version: None,
1454            },
1455        );
1456        sup.node_states.insert(
1457            3,
1458            NodeRuntime {
1459                status: NodeStatus::Errored,
1460                pid: None,
1461                started_at: None,
1462                restart_count: 5,
1463                first_crash_at: None,
1464                pending_version: None,
1465            },
1466        );
1467
1468        let (running, stopped, errored) = sup.node_counts();
1469        assert_eq!(running, 1);
1470        assert_eq!(stopped, 1);
1471        assert_eq!(errored, 1);
1472    }
1473
1474    #[test]
1475    fn mark_upgrade_scheduled_only_affects_running_nodes() {
1476        let (tx, mut rx) = broadcast::channel(16);
1477        let mut sup = Supervisor::new(tx);
1478
1479        sup.node_states.insert(
1480            1,
1481            NodeRuntime {
1482                status: NodeStatus::Running,
1483                pid: Some(111),
1484                started_at: Some(Instant::now()),
1485                restart_count: 0,
1486                first_crash_at: None,
1487                pending_version: None,
1488            },
1489        );
1490        sup.node_states.insert(
1491            2,
1492            NodeRuntime {
1493                status: NodeStatus::Stopped,
1494                pid: None,
1495                started_at: None,
1496                restart_count: 0,
1497                first_crash_at: None,
1498                pending_version: None,
1499            },
1500        );
1501
1502        // Running node: transitions to UpgradeScheduled with pending_version set and event fires.
1503        let affected = sup.mark_upgrade_scheduled(1, "0.10.11-rc.1".to_string());
1504        assert!(affected);
1505        assert_eq!(sup.node_status(1).unwrap(), NodeStatus::UpgradeScheduled);
1506        assert_eq!(sup.node_pending_version(1).as_deref(), Some("0.10.11-rc.1"));
1507        match rx.try_recv() {
1508            Ok(NodeEvent::UpgradeScheduled {
1509                node_id,
1510                pending_version,
1511            }) => {
1512                assert_eq!(node_id, 1);
1513                assert_eq!(pending_version, "0.10.11-rc.1");
1514            }
1515            other => panic!("expected UpgradeScheduled event, got {other:?}"),
1516        }
1517
1518        // Stopped node: untouched, no event fired.
1519        let affected = sup.mark_upgrade_scheduled(2, "0.10.11-rc.1".to_string());
1520        assert!(!affected);
1521        assert_eq!(sup.node_status(2).unwrap(), NodeStatus::Stopped);
1522        assert!(sup.node_pending_version(2).is_none());
1523
1524        // Already-UpgradeScheduled node: calling again is a no-op.
1525        let affected = sup.mark_upgrade_scheduled(1, "0.10.12".to_string());
1526        assert!(!affected);
1527        // Pending version is the original one set.
1528        assert_eq!(sup.node_pending_version(1).as_deref(), Some("0.10.11-rc.1"));
1529    }
1530
1531    #[test]
1532    fn node_counts_counts_upgrade_scheduled_as_running() {
1533        let (tx, _rx) = broadcast::channel(16);
1534        let mut sup = Supervisor::new(tx);
1535
1536        sup.node_states.insert(
1537            1,
1538            NodeRuntime {
1539                status: NodeStatus::UpgradeScheduled,
1540                pid: Some(111),
1541                started_at: Some(Instant::now()),
1542                restart_count: 0,
1543                first_crash_at: None,
1544                pending_version: Some("0.10.11-rc.1".to_string()),
1545            },
1546        );
1547
1548        let (running, stopped, errored) = sup.node_counts();
1549        assert_eq!(running, 1);
1550        assert_eq!(stopped, 0);
1551        assert_eq!(errored, 0);
1552    }
1553
1554    #[tokio::test]
1555    async fn stop_node_not_found() {
1556        let (tx, _rx) = broadcast::channel(16);
1557        let mut sup = Supervisor::new(tx);
1558
1559        let result = sup.stop_node(999).await;
1560        assert!(matches!(result, Err(Error::NodeNotFound(999))));
1561    }
1562
1563    #[tokio::test]
1564    async fn stop_node_not_running() {
1565        let (tx, _rx) = broadcast::channel(16);
1566        let mut sup = Supervisor::new(tx);
1567
1568        sup.node_states.insert(
1569            1,
1570            NodeRuntime {
1571                status: NodeStatus::Stopped,
1572                pid: None,
1573                started_at: None,
1574                restart_count: 0,
1575                first_crash_at: None,
1576                pending_version: None,
1577            },
1578        );
1579
1580        let result = sup.stop_node(1).await;
1581        assert!(matches!(result, Err(Error::NodeNotRunning(1))));
1582    }
1583
1584    #[tokio::test]
1585    async fn stop_all_nodes_mixed_states() {
1586        let (tx, _rx) = broadcast::channel(16);
1587        let mut sup = Supervisor::new(tx);
1588
1589        // Node 1: running (but with a fake PID that won't exist)
1590        sup.node_states.insert(
1591            1,
1592            NodeRuntime {
1593                status: NodeStatus::Running,
1594                pid: Some(999999),
1595                started_at: Some(Instant::now()),
1596                restart_count: 0,
1597                first_crash_at: None,
1598                pending_version: None,
1599            },
1600        );
1601        // Node 2: already stopped
1602        sup.node_states.insert(
1603            2,
1604            NodeRuntime {
1605                status: NodeStatus::Stopped,
1606                pid: None,
1607                started_at: None,
1608                restart_count: 0,
1609                first_crash_at: None,
1610                pending_version: None,
1611            },
1612        );
1613
1614        let configs = vec![(1, "node1".to_string()), (2, "node2".to_string())];
1615
1616        let result = sup.stop_all_nodes(&configs).await;
1617
1618        assert_eq!(result.stopped.len(), 1);
1619        assert_eq!(result.stopped[0].node_id, 1);
1620        assert_eq!(result.stopped[0].service_name, "node1");
1621        assert_eq!(result.already_stopped, vec![2]);
1622        assert!(result.failed.is_empty());
1623    }
1624}