use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{broadcast, RwLock};
use tokio::time::MissedTickBehavior;
use tokio_util::sync::CancellationToken;
use crate::error::{Error, Result};
use crate::node::binary::extract_version;
use crate::node::events::NodeEvent;
use crate::node::process::spawn::spawn_node;
use crate::node::registry::NodeRegistry;
use crate::node::types::{
NodeConfig, NodeStarted, NodeStatus, NodeStopFailed, NodeStopped, StopNodeResult,
};
/// How often the upgrade monitor re-reads each node binary's on-disk version.
pub const UPGRADE_POLL_INTERVAL: Duration = Duration::from_secs(60);
/// How often the liveness monitor verifies that supervised PIDs are still alive.
pub const LIVENESS_POLL_INTERVAL: Duration = Duration::from_secs(5);
/// Path of the pid file (`node.pid`) inside a node's data directory.
fn node_pid_file(data_dir: &Path) -> PathBuf {
    let mut path = data_dir.to_path_buf();
    path.push("node.pid");
    path
}
/// Persists `pid` to the node's pid file so a restarted daemon can adopt the
/// process. Failure is logged but not fatal: the node keeps running.
fn write_node_pid(data_dir: &Path, pid: u32) {
    let path = node_pid_file(data_dir);
    match std::fs::write(&path, pid.to_string()) {
        Ok(()) => {}
        Err(e) => {
            tracing::warn!(
                "Failed to write node pid file at {}: {e}. Node will still run, but a future \
                 daemon restart will not be able to adopt it.",
                path.display()
            );
        }
    }
}
/// Best-effort removal of the node's pid file; a missing file is not an error.
fn remove_node_pid(data_dir: &Path) {
    let pid_path = node_pid_file(data_dir);
    if let Err(_e) = std::fs::remove_file(&pid_path) {
        // Intentionally ignored: the file may simply not exist.
    }
}
/// Reads the recorded PID from the node's pid file.
///
/// Returns `None` when the file is missing, unreadable, or does not contain a
/// parseable `u32`.
fn read_node_pid(data_dir: &Path) -> Option<u32> {
    let contents = std::fs::read_to_string(node_pid_file(data_dir)).ok()?;
    contents.trim().parse::<u32>().ok()
}
/// Scans the process table for a node launched from this config's binary and
/// pointed at this config's data dir (via `--root-dir`).
///
/// Returns the PID of the first match, or `None` when no such process exists.
fn find_running_node_process(sys: &sysinfo::System, config: &NodeConfig) -> Option<u32> {
    let target_data_dir = config.data_dir.as_path();
    for (pid, process) in sys.processes() {
        // Skip thread entries so only real processes are considered.
        if process.thread_kind().is_some() {
            continue;
        }
        let Some(exe) = process.exe() else {
            continue;
        };
        // Must have been spawned from the exact same binary path.
        // NOTE(review): paths are compared verbatim (no canonicalization), so a
        // symlinked or renamed binary would not match — confirm this is intended.
        if exe != config.binary_path.as_path() {
            continue;
        }
        let cmd = process.cmd();
        // Accept both `--root-dir=<path>` and `--root-dir <path>` argument forms.
        let matches_root_dir = cmd.iter().enumerate().any(|(i, arg)| {
            let arg = arg.to_string_lossy();
            if let Some(value) = arg.strip_prefix("--root-dir=") {
                Path::new(value) == target_data_dir
            } else if arg == "--root-dir" {
                // Split form: the path is the following argument, if present.
                cmd.get(i + 1)
                    .map(|v| Path::new(&*v.to_string_lossy()) == target_data_dir)
                    .unwrap_or(false)
            } else {
                false
            }
        });
        if matches_root_dir {
            return Some(pid.as_u32());
        }
    }
    None
}
/// Returns true when `pid` refers to a live, non-thread process.
fn pid_is_live_process(pid: u32, sys: &sysinfo::System) -> bool {
    if !is_process_alive(pid) {
        return false;
    }
    match sys.process(sysinfo::Pid::from_u32(pid)) {
        // Present in the snapshot: reject entries that are actually threads.
        Some(process) => process.thread_kind().is_none(),
        // Alive per the OS but missing from the (possibly stale) snapshot:
        // assume it is a real process.
        None => true,
    }
}
/// Resolves the PID of an already-running node for `config`, if any.
///
/// The recorded `node.pid` file is tried first; a stale file (dead PID) is
/// deleted. Otherwise the process table is scanned, and any PID found that way
/// is persisted so the next daemon restart can adopt it directly.
fn resolve_adopted_pid(config: &NodeConfig, sys: &sysinfo::System) -> Option<u32> {
    if let Some(pid) = read_node_pid(&config.data_dir) {
        if pid_is_live_process(pid, sys) {
            return Some(pid);
        }
        // The pid file points at a dead process; clean it up before scanning.
        remove_node_pid(&config.data_dir);
    }
    let pid = find_running_node_process(sys, config)?;
    write_node_pid(&config.data_dir, pid);
    Some(pid)
}
/// Approximates the `Instant` at which process `pid` started.
///
/// `sysinfo` reports the start time as seconds since the Unix epoch, so the
/// process age is computed against the wall clock and then subtracted from
/// `Instant::now()`. Returns `None` when the process is unknown or the
/// subtraction is not representable.
fn process_started_at(sys: &sysinfo::System, pid: u32) -> Option<Instant> {
    let process = sys.process(sysinfo::Pid::from_u32(pid))?;
    let started_secs = process.start_time();
    let now_secs = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .ok()?
        .as_secs();
    // saturating_sub guards against clock skew making the process "younger"
    // than zero seconds.
    let age_secs = now_secs.saturating_sub(started_secs);
    Instant::now().checked_sub(Duration::from_secs(age_secs))
}
/// Crashes tolerated inside `CRASH_WINDOW` before a node is marked `Errored`.
const MAX_CRASHES_BEFORE_ERRORED: u32 = 5;
/// Window over which crashes are counted toward the give-up limit.
const CRASH_WINDOW: Duration = Duration::from_secs(300);
/// Uptime after which a node is considered stable and its crash count resets.
const STABLE_DURATION: Duration = Duration::from_secs(300);
/// Upper bound on the exponential restart backoff.
const MAX_BACKOFF: Duration = Duration::from_secs(60);
/// Tracks the runtime state of every managed node and broadcasts lifecycle
/// events (started, stopped, crashed, upgraded, ...) to subscribers.
pub struct Supervisor {
    // Lifecycle event channel; send errors are ignored throughout because
    // having zero subscribers is a normal condition.
    event_tx: broadcast::Sender<NodeEvent>,
    // Per-node runtime bookkeeping, keyed by node id.
    node_states: HashMap<u32, NodeRuntime>,
}
/// In-memory runtime bookkeeping for a single node.
struct NodeRuntime {
    // Current lifecycle status.
    status: NodeStatus,
    // OS process id while the node is running; `None` otherwise.
    pid: Option<u32>,
    // When the current process was (re)started; drives uptime reporting and
    // the crash-stability reset in `record_crash`.
    started_at: Option<Instant>,
    // Consecutive crash count feeding the exponential restart backoff.
    restart_count: u32,
    // Start of the current crash window, if any crash has been recorded.
    first_crash_at: Option<Instant>,
    // Target version while status is `UpgradeScheduled`.
    pending_version: Option<String>,
}
impl Supervisor {
    /// Creates a supervisor that reports lifecycle changes on `event_tx`.
    pub fn new(event_tx: broadcast::Sender<NodeEvent>) -> Self {
        Self {
            event_tx,
            node_states: HashMap::new(),
        }
    }

    /// Spawns the node described by `config` and begins monitoring it.
    ///
    /// Waits up to one second for an immediate exit so obvious spawn failures
    /// (bad flags, broken binary) surface as an error instead of entering a
    /// crash/restart loop. On success a detached task is spawned that watches
    /// the child and drives restarts and upgrade respawns.
    ///
    /// # Errors
    /// - [`Error::NodeAlreadyRunning`] if the node is already `Running`.
    /// - [`Error::ProcessSpawn`] if the process fails to start, exits
    ///   immediately, or its status cannot be checked.
    pub async fn start_node(
        &mut self,
        config: &NodeConfig,
        supervisor_ref: Arc<RwLock<Supervisor>>,
        registry_ref: Arc<RwLock<NodeRegistry>>,
    ) -> Result<NodeStarted> {
        let node_id = config.id;
        if let Some(state) = self.node_states.get(&node_id) {
            if state.status == NodeStatus::Running {
                return Err(Error::NodeAlreadyRunning(node_id));
            }
        }
        let _ = self.event_tx.send(NodeEvent::NodeStarting { node_id });
        let mut child = spawn_node_from_config(config).await?;
        let pid = child
            .id()
            .ok_or_else(|| Error::ProcessSpawn("Failed to get PID from spawned process".into()))?;
        // Fail-fast probe: a timeout here means the child is still running,
        // which is the success path.
        match tokio::time::timeout(Duration::from_secs(1), child.wait()).await {
            Ok(Ok(exit_status)) => {
                // Prefer the node's stderr output over the bare exit status
                // when explaining an immediate exit.
                let spawn_log_dir = config.log_dir.as_deref().unwrap_or(&config.data_dir);
                let stderr_path = spawn_log_dir.join("stderr.log");
                let stderr_msg = std::fs::read_to_string(&stderr_path).unwrap_or_default();
                let detail = if stderr_msg.trim().is_empty() {
                    format!("exit code: {exit_status}")
                } else {
                    stderr_msg.trim().to_string()
                };
                // Record the failure so status queries report `Errored`.
                self.node_states.insert(
                    node_id,
                    NodeRuntime {
                        status: NodeStatus::Errored,
                        pid: None,
                        started_at: None,
                        restart_count: 0,
                        first_crash_at: None,
                        pending_version: None,
                    },
                );
                return Err(Error::ProcessSpawn(format!(
                    "Node {node_id} exited immediately: {detail}"
                )));
            }
            Ok(Err(e)) => {
                return Err(Error::ProcessSpawn(format!(
                    "Failed to check node process status: {e}"
                )));
            }
            // Timeout elapsed: the child survived the probe window.
            Err(_) => {}
        }
        self.node_states.insert(
            node_id,
            NodeRuntime {
                status: NodeStatus::Running,
                pid: Some(pid),
                started_at: Some(Instant::now()),
                restart_count: 0,
                first_crash_at: None,
                pending_version: None,
            },
        );
        let _ = self.event_tx.send(NodeEvent::NodeStarted { node_id, pid });
        let result = NodeStarted {
            node_id,
            service_name: config.service_name.clone(),
            pid,
        };
        let event_tx = self.event_tx.clone();
        let config = config.clone();
        // Detached monitor task: owns the child and handles crash restarts
        // and upgrade respawns until the node stops for good.
        tokio::spawn(async move {
            monitor_node(child, config, supervisor_ref, registry_ref, event_tx).await;
        });
        Ok(result)
    }

    /// Gracefully stops a running node (SIGTERM, then SIGKILL on timeout).
    ///
    /// The status is set to `Stopping` before the kill so the monitor task
    /// treats the subsequent process exit as intentional and does not restart.
    ///
    /// # Errors
    /// - [`Error::NodeNotFound`] if the node id is unknown.
    /// - [`Error::NodeNotRunning`] if the node is not currently running.
    pub async fn stop_node(&mut self, node_id: u32) -> Result<()> {
        let state = self
            .node_states
            .get_mut(&node_id)
            .ok_or(Error::NodeNotFound(node_id))?;
        if state.status != NodeStatus::Running {
            return Err(Error::NodeNotRunning(node_id));
        }
        let pid = state.pid;
        let _ = self.event_tx.send(NodeEvent::NodeStopping { node_id });
        state.status = NodeStatus::Stopping;
        if let Some(pid) = pid {
            graceful_kill(pid).await;
        }
        // Re-borrow after the await; the entry is known to exist.
        let state = self.node_states.get_mut(&node_id).unwrap();
        state.status = NodeStatus::Stopped;
        state.pid = None;
        state.started_at = None;
        let _ = self.event_tx.send(NodeEvent::NodeStopped { node_id });
        Ok(())
    }

    /// Stops every running node in `configs` (pairs of node id and service
    /// name), partitioning the outcome into stopped / failed / already-stopped.
    pub async fn stop_all_nodes(&mut self, configs: &[(u32, String)]) -> StopNodeResult {
        let mut stopped = Vec::new();
        let mut failed = Vec::new();
        let mut already_stopped = Vec::new();
        for (node_id, service_name) in configs {
            let node_id = *node_id;
            // Unknown or non-running nodes count as already stopped.
            match self.node_status(node_id) {
                Ok(NodeStatus::Running) => {}
                Ok(_) => {
                    already_stopped.push(node_id);
                    continue;
                }
                Err(_) => {
                    already_stopped.push(node_id);
                    continue;
                }
            }
            match self.stop_node(node_id).await {
                Ok(()) => {
                    stopped.push(NodeStopped {
                        node_id,
                        service_name: service_name.clone(),
                    });
                }
                // Lost a race with another stopper; not an error.
                Err(Error::NodeNotRunning(_)) => {
                    already_stopped.push(node_id);
                }
                Err(e) => {
                    failed.push(NodeStopFailed {
                        node_id,
                        service_name: service_name.clone(),
                        error: e.to_string(),
                    });
                }
            }
        }
        StopNodeResult {
            stopped,
            failed,
            already_stopped,
        }
    }

    /// Returns the node's current status, or `Error::NodeNotFound`.
    pub fn node_status(&self, node_id: u32) -> Result<NodeStatus> {
        self.node_states
            .get(&node_id)
            .map(|s| s.status)
            .ok_or(Error::NodeNotFound(node_id))
    }

    /// Returns the node's PID, if it is currently tracked as running.
    pub fn node_pid(&self, node_id: u32) -> Option<u32> {
        self.node_states.get(&node_id).and_then(|s| s.pid)
    }

    /// Seconds since the current process of this node was (re)started.
    pub fn node_uptime_secs(&self, node_id: u32) -> Option<u64> {
        self.node_states
            .get(&node_id)
            .and_then(|s| s.started_at.map(|t| t.elapsed().as_secs()))
    }

    /// Version a pending upgrade targets, while one is scheduled.
    pub fn node_pending_version(&self, node_id: u32) -> Option<String> {
        self.node_states
            .get(&node_id)
            .and_then(|s| s.pending_version.clone())
    }

    /// Transitions a `Running` node to `UpgradeScheduled` and records the
    /// target version, emitting an `UpgradeScheduled` event.
    ///
    /// Returns false (and changes nothing) when the node is unknown or not
    /// currently `Running` — including when an upgrade is already scheduled.
    fn mark_upgrade_scheduled(&mut self, node_id: u32, pending_version: String) -> bool {
        let Some(state) = self.node_states.get_mut(&node_id) else {
            return false;
        };
        if state.status != NodeStatus::Running {
            return false;
        }
        state.status = NodeStatus::UpgradeScheduled;
        state.pending_version = Some(pending_version.clone());
        let _ = self.event_tx.send(NodeEvent::UpgradeScheduled {
            node_id,
            pending_version,
        });
        true
    }

    /// True when the node is tracked and its status is exactly `Running`.
    pub fn is_running(&self, node_id: u32) -> bool {
        self.node_states
            .get(&node_id)
            .is_some_and(|s| s.status == NodeStatus::Running)
    }

    /// Counts nodes as `(running, stopped, errored)`.
    ///
    /// `Starting` and `UpgradeScheduled` count as running; `Stopping` counts
    /// as stopped.
    pub fn node_counts(&self) -> (u32, u32, u32) {
        let mut running = 0u32;
        let mut stopped = 0u32;
        let mut errored = 0u32;
        for state in self.node_states.values() {
            match state.status {
                NodeStatus::Running | NodeStatus::Starting | NodeStatus::UpgradeScheduled => {
                    running += 1
                }
                NodeStatus::Stopped | NodeStatus::Stopping => stopped += 1,
                NodeStatus::Errored => errored += 1,
            }
        }
        (running, stopped, errored)
    }

    /// Updates status/PID for a tracked node; unknown ids are ignored.
    ///
    /// `started_at` is refreshed when transitioning to `Running` and cleared
    /// for every other status.
    fn update_state(&mut self, node_id: u32, status: NodeStatus, pid: Option<u32>) {
        if let Some(state) = self.node_states.get_mut(&node_id) {
            state.status = status;
            state.pid = pid;
            if status == NodeStatus::Running {
                state.started_at = Some(Instant::now());
            } else {
                state.started_at = None;
            }
        }
    }

    /// Adopts nodes left running by a previous daemon instance.
    ///
    /// Takes a fresh process-table snapshot, resolves a live PID for each
    /// registered node (pid file first, then process scan), registers each hit
    /// as `Running`, and emits `NodeStarted`. Returns the adopted node ids.
    pub fn adopt_from_registry(&mut self, registry: &NodeRegistry) -> Vec<u32> {
        let mut sys = sysinfo::System::new();
        sys.refresh_processes_specifics(
            sysinfo::ProcessesToUpdate::All,
            true,
            sysinfo::ProcessRefreshKind::everything(),
        );
        let mut adopted = Vec::new();
        for config in registry.list() {
            let Some(pid) = resolve_adopted_pid(config, &sys) else {
                continue;
            };
            self.node_states.insert(
                config.id,
                NodeRuntime {
                    status: NodeStatus::Running,
                    pid: Some(pid),
                    // Best-effort uptime reconstruction from the OS start time.
                    started_at: Some(process_started_at(&sys, pid).unwrap_or_else(Instant::now)),
                    restart_count: 0,
                    first_crash_at: None,
                    pending_version: None,
                },
            );
            let _ = self.event_tx.send(NodeEvent::NodeStarted {
                node_id: config.id,
                pid,
            });
            adopted.push(config.id);
        }
        adopted
    }

    /// Records a crash for `node_id` and decides whether to restart.
    ///
    /// Returns `(should_restart, attempt_number, backoff)`. A node that had
    /// been up for at least `STABLE_DURATION` has its crash history reset
    /// before this crash is counted. Once `MAX_CRASHES_BEFORE_ERRORED`
    /// crashes accumulate inside `CRASH_WINDOW` the node is marked `Errored`
    /// and `should_restart` is false. The backoff doubles per attempt
    /// (1s, 2s, 4s, ...) and is capped at `MAX_BACKOFF`.
    fn record_crash(&mut self, node_id: u32) -> (bool, u32, Duration) {
        let state = match self.node_states.get_mut(&node_id) {
            Some(s) => s,
            None => return (false, 0, Duration::ZERO),
        };
        let now = Instant::now();
        // A long stable run wipes previous crash history.
        if let Some(started_at) = state.started_at {
            if started_at.elapsed() >= STABLE_DURATION {
                state.restart_count = 0;
                state.first_crash_at = None;
            }
        }
        state.restart_count += 1;
        let attempt = state.restart_count;
        if state.first_crash_at.is_none() {
            state.first_crash_at = Some(now);
        }
        // Give up when too many crashes land inside the window.
        // NOTE(review): first_crash_at is never re-anchored once the window
        // expires, so widely spaced crashes can accumulate without erroring —
        // confirm this is the intended policy.
        if let Some(first_crash) = state.first_crash_at {
            if attempt >= MAX_CRASHES_BEFORE_ERRORED
                && now.duration_since(first_crash) < CRASH_WINDOW
            {
                state.status = NodeStatus::Errored;
                state.pid = None;
                state.started_at = None;
                return (false, attempt, Duration::ZERO);
            }
        }
        // Exponential backoff: shift clamped to keep the exponent bounded.
        let backoff_secs = 1u64 << (attempt - 1).min(5);
        let backoff = Duration::from_secs(backoff_secs).min(MAX_BACKOFF);
        (true, attempt, backoff)
    }
}
/// Spawns a background task that periodically compares each running node's
/// on-disk binary version against the version recorded in the registry, and
/// schedules an upgrade when they differ.
///
/// The task exits when `shutdown` is cancelled. Lock scopes are kept tight:
/// candidates are snapshotted under read locks, then `extract_version` (which
/// runs the binary) happens with no locks held.
pub fn spawn_upgrade_monitor(
    registry: Arc<RwLock<NodeRegistry>>,
    supervisor: Arc<RwLock<Supervisor>>,
    interval: Duration,
    shutdown: CancellationToken,
) {
    tokio::spawn(async move {
        let mut ticker = tokio::time::interval(interval);
        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
        // Consume the immediate first tick so the first real check happens
        // after one full interval.
        ticker.tick().await;
        loop {
            tokio::select! {
                _ = shutdown.cancelled() => return,
                _ = ticker.tick() => {},
            }
            // Snapshot running nodes: (id, binary path, recorded version,
            // currently pending version).
            let candidates: Vec<(u32, std::path::PathBuf, String, Option<String>)> = {
                let reg = registry.read().await;
                let sup = supervisor.read().await;
                reg.list()
                    .into_iter()
                    .filter_map(|config| match sup.node_status(config.id) {
                        Ok(NodeStatus::Running) => Some((
                            config.id,
                            config.binary_path.clone(),
                            config.version.clone(),
                            sup.node_pending_version(config.id),
                        )),
                        _ => None,
                    })
                    .collect()
            };
            for (node_id, binary_path, recorded_version, current_pending) in candidates {
                // Unreadable binary (e.g. mid-replacement): try again next tick.
                let observed = match extract_version(&binary_path).await {
                    Ok(v) => v,
                    Err(_) => continue,
                };
                if observed == recorded_version {
                    continue;
                }
                // Already scheduled for exactly this version; nothing to do.
                if current_pending.as_deref() == Some(observed.as_str()) {
                    continue;
                }
                supervisor
                    .write()
                    .await
                    .mark_upgrade_scheduled(node_id, observed);
            }
        }
    });
}
pub fn build_node_args(config: &NodeConfig) -> Vec<String> {
let mut args = vec![
"--rewards-address".to_string(),
config.rewards_address.clone(),
"--root-dir".to_string(),
config.data_dir.display().to_string(),
];
if let Some(ref log_dir) = config.log_dir {
args.push("--enable-logging".to_string());
args.push("--log-dir".to_string());
args.push(log_dir.display().to_string());
}
if let Some(port) = config.node_port {
args.push("--port".to_string());
args.push(port.to_string());
}
if let Some(port) = config.metrics_port {
args.push("--metrics-port".to_string());
args.push(port.to_string());
}
for peer in &config.bootstrap_peers {
args.push("--bootstrap".to_string());
args.push(peer.clone());
}
args.push("--stop-on-upgrade".to_string());
args
}
/// Spawns the node process described by `config` and records its PID in the
/// data dir's pid file (best-effort) so a restarted daemon can adopt it.
///
/// Logs (stdout/stderr) go to `log_dir` when configured, otherwise to the
/// data dir.
async fn spawn_node_from_config(config: &NodeConfig) -> Result<tokio::process::Child> {
    let args = build_node_args(config);
    let env_vars: Vec<(String, String)> = config.env_variables.clone().into_iter().collect();
    let log_dir = config
        .log_dir
        .as_deref()
        .unwrap_or(config.data_dir.as_path());
    let child = spawn_node(&config.binary_path, &args, &env_vars, log_dir).await?;
    if let Some(pid) = child.id() {
        write_node_pid(&config.data_dir, pid);
    }
    Ok(child)
}
/// Entry point of the per-node monitor task: runs the monitor loop to
/// completion, then removes the pid file since the node is no longer running
/// (or is now tracked as errored).
async fn monitor_node(
    child: tokio::process::Child,
    mut config: NodeConfig,
    supervisor: Arc<RwLock<Supervisor>>,
    registry: Arc<RwLock<NodeRegistry>>,
    event_tx: broadcast::Sender<NodeEvent>,
) {
    monitor_node_inner(child, &mut config, supervisor, registry, event_tx).await;
    remove_node_pid(&config.data_dir);
}
/// Core monitor loop for one node: waits for the child to exit, then decides
/// between (a) returning silently for intentional stops, (b) respawning on
/// the new binary for upgrades, or (c) restarting with exponential backoff
/// for crashes — giving up after too many crashes in a window.
///
/// `config` is mutable because an upgrade respawn updates `config.version`.
async fn monitor_node_inner(
    mut child: tokio::process::Child,
    config: &mut NodeConfig,
    supervisor: Arc<RwLock<Supervisor>>,
    registry: Arc<RwLock<NodeRegistry>>,
    event_tx: broadcast::Sender<NodeEvent>,
) {
    let node_id = config.id;
    loop {
        let exit_status = child.wait().await;
        // The supervisor status at exit time tells us why the process died.
        let status_at_exit = {
            let sup = supervisor.read().await;
            sup.node_status(node_id).ok()
        };
        match status_at_exit {
            // Deliberate stop (stop_node set the status first): just exit.
            Some(NodeStatus::Stopped) | Some(NodeStatus::Stopping) => return,
            // An upgrade was scheduled; the exit is the node honoring
            // --stop-on-upgrade, so relaunch it on the new binary.
            Some(NodeStatus::UpgradeScheduled) => {
                match respawn_upgraded_node(config, &supervisor, &registry, &event_tx).await {
                    Ok(new_child) => {
                        child = new_child;
                        continue;
                    }
                    Err(e) => {
                        let _ = event_tx.send(NodeEvent::NodeErrored {
                            node_id,
                            message: format!("Failed to respawn after upgrade: {e}"),
                        });
                        let mut sup = supervisor.write().await;
                        sup.update_state(node_id, NodeStatus::Errored, None);
                        return;
                    }
                }
            }
            _ => {}
        }
        let exit_code = exit_status.ok().and_then(|s| s.code());
        // A clean exit with a changed on-disk version is an upgrade the
        // monitor did not get to schedule in time: treat it as one anyway.
        if exit_code == Some(0) {
            if let Ok(disk_version) = extract_version(&config.binary_path).await {
                if disk_version != config.version {
                    {
                        let mut sup = supervisor.write().await;
                        sup.mark_upgrade_scheduled(node_id, disk_version.clone());
                    }
                    match respawn_upgraded_node(config, &supervisor, &registry, &event_tx).await {
                        Ok(new_child) => {
                            child = new_child;
                            continue;
                        }
                        Err(e) => {
                            let _ = event_tx.send(NodeEvent::NodeErrored {
                                node_id,
                                message: format!("Failed to respawn after upgrade: {e}"),
                            });
                            let mut sup = supervisor.write().await;
                            sup.update_state(node_id, NodeStatus::Errored, None);
                            return;
                        }
                    }
                }
            }
        }
        // Anything else is a crash; decide restart vs. give-up.
        let _ = event_tx.send(NodeEvent::NodeCrashed { node_id, exit_code });
        let (should_restart, attempt, backoff) = {
            let mut sup = supervisor.write().await;
            sup.record_crash(node_id)
        };
        if !should_restart {
            let _ = event_tx.send(NodeEvent::NodeErrored {
                node_id,
                message: format!(
                    "Node crashed {} times within {} seconds, giving up",
                    MAX_CRASHES_BEFORE_ERRORED,
                    CRASH_WINDOW.as_secs()
                ),
            });
            return;
        }
        let _ = event_tx.send(NodeEvent::NodeRestarting { node_id, attempt });
        tokio::time::sleep(backoff).await;
        match spawn_node_from_config(&*config).await {
            Ok(new_child) => {
                let pid = match new_child.id() {
                    Some(pid) => pid,
                    // The process died before we could read its PID.
                    None => {
                        let _ = event_tx.send(NodeEvent::NodeErrored {
                            node_id,
                            message: "Restarted process exited before PID could be read"
                                .to_string(),
                        });
                        let mut sup = supervisor.write().await;
                        sup.update_state(node_id, NodeStatus::Errored, None);
                        return;
                    }
                };
                {
                    let mut sup = supervisor.write().await;
                    sup.update_state(node_id, NodeStatus::Running, Some(pid));
                }
                let _ = event_tx.send(NodeEvent::NodeStarted { node_id, pid });
                child = new_child;
            }
            Err(e) => {
                let _ = event_tx.send(NodeEvent::NodeErrored {
                    node_id,
                    message: format!("Failed to restart node: {e}"),
                });
                let mut sup = supervisor.write().await;
                sup.update_state(node_id, NodeStatus::Errored, None);
                return;
            }
        }
    }
}
/// Relaunches a node after its binary was upgraded and synchronizes all
/// bookkeeping: the in-memory config version, the persisted registry entry,
/// the supervisor state (back to `Running`, crash history cleared), and the
/// `NodeStarted` / `NodeUpgraded` events.
///
/// # Errors
/// Returns [`Error::ProcessSpawn`] (via `spawn_node_from_config`) when the new
/// process cannot be started or its PID cannot be read.
async fn respawn_upgraded_node(
    config: &mut NodeConfig,
    supervisor: &Arc<RwLock<Supervisor>>,
    registry: &Arc<RwLock<NodeRegistry>>,
    event_tx: &broadcast::Sender<NodeEvent>,
) -> Result<tokio::process::Child> {
    let node_id = config.id;
    let old_version = config.version.clone();
    let new_child = spawn_node_from_config(config).await?;
    let pid = new_child
        .id()
        .ok_or_else(|| Error::ProcessSpawn("Failed to get PID after upgrade respawn".into()))?;
    // Best-effort version read; `None` means we keep the old recorded version.
    let new_version = extract_version(&config.binary_path).await.ok();
    if let Some(ref version) = new_version {
        config.version = version.clone();
        // Persist the new version so a daemon restart sees it.
        let mut reg = registry.write().await;
        if let Ok(stored) = reg.get_mut(node_id) {
            stored.version = version.clone();
            let _ = reg.save();
        }
    }
    {
        let mut sup = supervisor.write().await;
        if let Some(state) = sup.node_states.get_mut(&node_id) {
            state.status = NodeStatus::Running;
            state.pid = Some(pid);
            state.started_at = Some(Instant::now());
            state.pending_version = None;
            // An upgrade restart is not a crash: reset the backoff history.
            state.restart_count = 0;
            state.first_crash_at = None;
        }
    }
    let _ = event_tx.send(NodeEvent::NodeStarted { node_id, pid });
    if let Some(version) = new_version {
        let _ = event_tx.send(NodeEvent::NodeUpgraded {
            node_id,
            old_version,
            new_version: version,
        });
    }
    Ok(new_child)
}
/// How long to wait after SIGTERM before escalating to SIGKILL.
const GRACEFUL_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10);
/// Terminates `pid` gracefully: SIGTERM first, polling for exit; after
/// `GRACEFUL_SHUTDOWN_TIMEOUT` escalates to SIGKILL and polls briefly again.
///
/// Best-effort: returns even if the process is still alive after the final
/// poll (e.g. stuck in uninterruptible sleep).
async fn graceful_kill(pid: u32) {
    send_signal_term(pid);
    let start = Instant::now();
    loop {
        if !is_process_alive(pid) {
            return;
        }
        if start.elapsed() >= GRACEFUL_SHUTDOWN_TIMEOUT {
            break;
        }
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
    // The process ignored SIGTERM (or is slow); force it.
    send_signal_kill(pid);
    for _ in 0..10 {
        if !is_process_alive(pid) {
            return;
        }
        tokio::time::sleep(Duration::from_millis(50)).await;
    }
}
/// Spawns a background task that periodically detects nodes whose process
/// died without the monitor task noticing (e.g. adopted nodes, which have no
/// monitor), marks them `Stopped`, emits `NodeStopped`, and removes their
/// pid files. Exits when `shutdown` is cancelled.
pub fn spawn_liveness_monitor(
    registry: Arc<RwLock<NodeRegistry>>,
    supervisor: Arc<RwLock<Supervisor>>,
    event_tx: broadcast::Sender<NodeEvent>,
    interval: Duration,
    shutdown: CancellationToken,
) {
    tokio::spawn(async move {
        let mut ticker = tokio::time::interval(interval);
        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
        loop {
            tokio::select! {
                _ = shutdown.cancelled() => return,
                _ = ticker.tick() => {}
            }
            // Snapshot (id, pid, data_dir) of running nodes under read locks,
            // then do the liveness probes with no locks held.
            let candidates: Vec<(u32, u32, PathBuf)> =
                {
                    let sup = supervisor.read().await;
                    let reg = registry.read().await;
                    reg.list()
                        .into_iter()
                        .filter_map(|config| {
                            let pid = sup.node_pid(config.id)?;
                            matches!(sup.node_status(config.id), Ok(NodeStatus::Running))
                                .then_some((config.id, pid, config.data_dir.clone()))
                        })
                        .collect()
                };
            for (node_id, pid, data_dir) in candidates {
                if is_process_alive(pid) {
                    continue;
                }
                let mut sup = supervisor.write().await;
                // Re-check under the write lock: status may have changed
                // since the snapshot (e.g. a concurrent stop).
                if !matches!(sup.node_status(node_id), Ok(NodeStatus::Running)) {
                    continue;
                }
                sup.update_state(node_id, NodeStatus::Stopped, None);
                let _ = event_tx.send(NodeEvent::NodeStopped { node_id });
                remove_node_pid(&data_dir);
            }
        }
    });
}
/// Converts a raw PID to the positive `i32` form `libc::kill` expects.
///
/// Returns `None` for 0 (would signal the whole process group) and for values
/// that do not fit in `i32`.
#[cfg(unix)]
fn pid_to_i32(pid: u32) -> Option<i32> {
    match i32::try_from(pid) {
        Ok(signed) if signed > 0 => Some(signed),
        _ => None,
    }
}
/// Sends SIGTERM to `pid` (best-effort; errors and invalid PIDs are ignored).
#[cfg(unix)]
fn send_signal_term(pid: u32) {
    let Some(raw) = pid_to_i32(pid) else {
        return;
    };
    // SAFETY: raw is a validated positive pid; kill(2) is safe to call with
    // any such value — a nonexistent pid just returns an error we ignore.
    unsafe {
        libc::kill(raw, libc::SIGTERM);
    }
}
/// Sends SIGKILL to `pid` (best-effort; errors and invalid PIDs are ignored).
#[cfg(unix)]
fn send_signal_kill(pid: u32) {
    let Some(raw) = pid_to_i32(pid) else {
        return;
    };
    // SAFETY: raw is a validated positive pid; kill(2) is safe to call with
    // any such value — a nonexistent pid just returns an error we ignore.
    unsafe {
        libc::kill(raw, libc::SIGKILL);
    }
}
/// Returns true when a process with `pid` exists (possibly owned by another
/// user).
#[cfg(unix)]
fn is_process_alive(pid: u32) -> bool {
    let Some(pid) = pid_to_i32(pid) else {
        return false;
    };
    // SAFETY: signal 0 performs existence/permission checking only; no signal
    // is actually delivered.
    let ret = unsafe { libc::kill(pid, 0) };
    if ret == 0 {
        return true;
    }
    // EPERM means the process exists but we may not signal it — still alive.
    std::io::Error::last_os_error().raw_os_error() == Some(libc::EPERM)
}
/// Windows analogue of SIGTERM: attaches to the target's console and sends a
/// Ctrl-C event (best-effort; does nothing if the process has no console).
#[cfg(windows)]
fn send_signal_term(pid: u32) {
    use windows_sys::Win32::System::Console::{
        AttachConsole, FreeConsole, GenerateConsoleCtrlEvent, SetConsoleCtrlHandler, CTRL_C_EVENT,
    };
    unsafe {
        // Detach from our own console so AttachConsole can succeed.
        FreeConsole();
        if AttachConsole(pid) != 0 {
            // Temporarily ignore Ctrl-C in this process: the event generated
            // below is delivered to every process on the attached console,
            // including us.
            SetConsoleCtrlHandler(None, 1);
            GenerateConsoleCtrlEvent(CTRL_C_EVENT, 0);
            FreeConsole();
            // Give the event time to be dispatched before re-enabling our own
            // Ctrl-C handling.
            std::thread::sleep(std::time::Duration::from_millis(50));
            SetConsoleCtrlHandler(None, 0);
        }
    }
}
/// Windows analogue of SIGKILL: forcibly terminates the process.
#[cfg(windows)]
fn send_signal_kill(pid: u32) {
    use windows_sys::Win32::Foundation::CloseHandle;
    use windows_sys::Win32::System::Threading::{OpenProcess, TerminateProcess, PROCESS_TERMINATE};
    unsafe {
        let handle = OpenProcess(PROCESS_TERMINATE, 0, pid);
        if !handle.is_null() {
            TerminateProcess(handle, 1);
            CloseHandle(handle);
        }
    }
}
/// Returns true when a process with `pid` exists and has not exited.
///
/// NOTE(review): a process that exits with code 259 (`STILL_ACTIVE`) would be
/// misreported as alive — known Win32 ambiguity, assumed acceptable here.
#[cfg(windows)]
fn is_process_alive(pid: u32) -> bool {
    use windows_sys::Win32::Foundation::{CloseHandle, STILL_ACTIVE};
    use windows_sys::Win32::System::Threading::{
        GetExitCodeProcess, OpenProcess, PROCESS_QUERY_LIMITED_INFORMATION,
    };
    unsafe {
        let handle = OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, 0, pid);
        // OpenProcess fails for nonexistent PIDs (and for access denial; the
        // latter is treated as "not alive" here).
        if handle.is_null() {
            return false;
        }
        let mut exit_code: u32 = 0;
        let success = GetExitCodeProcess(handle, &mut exit_code);
        CloseHandle(handle);
        success != 0 && exit_code == STILL_ACTIVE as u32
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // All configured flags appear in the generated argument list.
    #[test]
    fn build_node_args_basic() {
        let config = NodeConfig {
            id: 1,
            service_name: "node1".to_string(),
            rewards_address: "0xabc123".to_string(),
            data_dir: "/data/node-1".into(),
            log_dir: Some("/logs/node-1".into()),
            node_port: Some(12000),
            metrics_port: Some(13000),
            network_id: Some(1),
            binary_path: "/bin/node".into(),
            version: "0.1.0".to_string(),
            env_variables: HashMap::new(),
            bootstrap_peers: vec!["peer1".to_string(), "peer2".to_string()],
        };
        let args = build_node_args(&config);
        assert!(args.contains(&"--rewards-address".to_string()));
        assert!(args.contains(&"0xabc123".to_string()));
        assert!(args.contains(&"--root-dir".to_string()));
        assert!(args.contains(&"/data/node-1".to_string()));
        assert!(args.contains(&"--enable-logging".to_string()));
        assert!(args.contains(&"--log-dir".to_string()));
        assert!(args.contains(&"/logs/node-1".to_string()));
        assert!(args.contains(&"--port".to_string()));
        assert!(args.contains(&"12000".to_string()));
        assert!(args.contains(&"--metrics-port".to_string()));
        assert!(args.contains(&"13000".to_string()));
        assert!(args.contains(&"--bootstrap".to_string()));
        assert!(args.contains(&"peer1".to_string()));
        assert!(args.contains(&"peer2".to_string()));
        assert!(args.contains(&"--stop-on-upgrade".to_string()));
    }

    // Optional flags are omitted when their config fields are unset.
    #[test]
    fn build_node_args_minimal() {
        let config = NodeConfig {
            id: 1,
            service_name: "node1".to_string(),
            rewards_address: "0xabc".to_string(),
            data_dir: "/data/node-1".into(),
            log_dir: None,
            node_port: None,
            metrics_port: None,
            network_id: None,
            binary_path: "/bin/node".into(),
            version: "0.1.0".to_string(),
            env_variables: HashMap::new(),
            bootstrap_peers: vec![],
        };
        let args = build_node_args(&config);
        assert!(args.contains(&"--rewards-address".to_string()));
        assert!(args.contains(&"--root-dir".to_string()));
        assert!(!args.contains(&"--enable-logging".to_string()));
        assert!(!args.contains(&"--log-dir".to_string()));
        assert!(!args.contains(&"--port".to_string()));
        assert!(!args.contains(&"--metrics-port".to_string()));
        assert!(!args.contains(&"--bootstrap".to_string()));
        assert!(args.contains(&"--stop-on-upgrade".to_string()));
    }

    // Backoff doubles per crash (1s, 2s, 4s, 8s) and the fifth crash within
    // the window flips the node to Errored with no restart.
    #[test]
    fn record_crash_backoff_increases() {
        let (tx, _rx) = broadcast::channel(16);
        let mut sup = Supervisor::new(tx);
        sup.node_states.insert(
            1,
            NodeRuntime {
                status: NodeStatus::Running,
                pid: Some(100),
                started_at: Some(Instant::now()),
                restart_count: 0,
                first_crash_at: None,
                pending_version: None,
            },
        );
        let (should_restart, attempt, backoff) = sup.record_crash(1);
        assert!(should_restart);
        assert_eq!(attempt, 1);
        assert_eq!(backoff, Duration::from_secs(1));
        let (should_restart, attempt, backoff) = sup.record_crash(1);
        assert!(should_restart);
        assert_eq!(attempt, 2);
        assert_eq!(backoff, Duration::from_secs(2));
        let (should_restart, attempt, backoff) = sup.record_crash(1);
        assert!(should_restart);
        assert_eq!(attempt, 3);
        assert_eq!(backoff, Duration::from_secs(4));
        let (should_restart, attempt, backoff) = sup.record_crash(1);
        assert!(should_restart);
        assert_eq!(attempt, 4);
        assert_eq!(backoff, Duration::from_secs(8));
        let (should_restart, attempt, _) = sup.record_crash(1);
        assert!(!should_restart);
        assert_eq!(attempt, 5);
        assert_eq!(sup.node_states[&1].status, NodeStatus::Errored);
    }

    // node_counts buckets Running / Stopped / Errored correctly.
    #[test]
    fn node_counts_tracks_states() {
        let (tx, _rx) = broadcast::channel(16);
        let mut sup = Supervisor::new(tx);
        sup.node_states.insert(
            1,
            NodeRuntime {
                status: NodeStatus::Running,
                pid: Some(100),
                started_at: Some(Instant::now()),
                restart_count: 0,
                first_crash_at: None,
                pending_version: None,
            },
        );
        sup.node_states.insert(
            2,
            NodeRuntime {
                status: NodeStatus::Stopped,
                pid: None,
                started_at: None,
                restart_count: 0,
                first_crash_at: None,
                pending_version: None,
            },
        );
        sup.node_states.insert(
            3,
            NodeRuntime {
                status: NodeStatus::Errored,
                pid: None,
                started_at: None,
                restart_count: 5,
                first_crash_at: None,
                pending_version: None,
            },
        );
        let (running, stopped, errored) = sup.node_counts();
        assert_eq!(running, 1);
        assert_eq!(stopped, 1);
        assert_eq!(errored, 1);
    }

    // Upgrade scheduling only applies to Running nodes, emits the event once,
    // and rejects re-scheduling while a node is already UpgradeScheduled.
    #[test]
    fn mark_upgrade_scheduled_only_affects_running_nodes() {
        let (tx, mut rx) = broadcast::channel(16);
        let mut sup = Supervisor::new(tx);
        sup.node_states.insert(
            1,
            NodeRuntime {
                status: NodeStatus::Running,
                pid: Some(111),
                started_at: Some(Instant::now()),
                restart_count: 0,
                first_crash_at: None,
                pending_version: None,
            },
        );
        sup.node_states.insert(
            2,
            NodeRuntime {
                status: NodeStatus::Stopped,
                pid: None,
                started_at: None,
                restart_count: 0,
                first_crash_at: None,
                pending_version: None,
            },
        );
        let affected = sup.mark_upgrade_scheduled(1, "0.10.11-rc.1".to_string());
        assert!(affected);
        assert_eq!(sup.node_status(1).unwrap(), NodeStatus::UpgradeScheduled);
        assert_eq!(sup.node_pending_version(1).as_deref(), Some("0.10.11-rc.1"));
        match rx.try_recv() {
            Ok(NodeEvent::UpgradeScheduled {
                node_id,
                pending_version,
            }) => {
                assert_eq!(node_id, 1);
                assert_eq!(pending_version, "0.10.11-rc.1");
            }
            other => panic!("expected UpgradeScheduled event, got {other:?}"),
        }
        let affected = sup.mark_upgrade_scheduled(2, "0.10.11-rc.1".to_string());
        assert!(!affected);
        assert_eq!(sup.node_status(2).unwrap(), NodeStatus::Stopped);
        assert!(sup.node_pending_version(2).is_none());
        let affected = sup.mark_upgrade_scheduled(1, "0.10.12".to_string());
        assert!(!affected);
        assert_eq!(sup.node_pending_version(1).as_deref(), Some("0.10.11-rc.1"));
    }

    // A node waiting on an upgrade still counts toward the running total.
    #[test]
    fn node_counts_counts_upgrade_scheduled_as_running() {
        let (tx, _rx) = broadcast::channel(16);
        let mut sup = Supervisor::new(tx);
        sup.node_states.insert(
            1,
            NodeRuntime {
                status: NodeStatus::UpgradeScheduled,
                pid: Some(111),
                started_at: Some(Instant::now()),
                restart_count: 0,
                first_crash_at: None,
                pending_version: Some("0.10.11-rc.1".to_string()),
            },
        );
        let (running, stopped, errored) = sup.node_counts();
        assert_eq!(running, 1);
        assert_eq!(stopped, 0);
        assert_eq!(errored, 0);
    }

    // Stopping an untracked node id returns NodeNotFound.
    #[tokio::test]
    async fn stop_node_not_found() {
        let (tx, _rx) = broadcast::channel(16);
        let mut sup = Supervisor::new(tx);
        let result = sup.stop_node(999).await;
        assert!(matches!(result, Err(Error::NodeNotFound(999))));
    }

    // Stopping a tracked-but-stopped node returns NodeNotRunning.
    #[tokio::test]
    async fn stop_node_not_running() {
        let (tx, _rx) = broadcast::channel(16);
        let mut sup = Supervisor::new(tx);
        sup.node_states.insert(
            1,
            NodeRuntime {
                status: NodeStatus::Stopped,
                pid: None,
                started_at: None,
                restart_count: 0,
                first_crash_at: None,
                pending_version: None,
            },
        );
        let result = sup.stop_node(1).await;
        assert!(matches!(result, Err(Error::NodeNotRunning(1))));
    }

    // stop_all_nodes partitions running vs already-stopped nodes. PID 999999
    // is assumed dead, so graceful_kill returns immediately.
    #[tokio::test]
    async fn stop_all_nodes_mixed_states() {
        let (tx, _rx) = broadcast::channel(16);
        let mut sup = Supervisor::new(tx);
        sup.node_states.insert(
            1,
            NodeRuntime {
                status: NodeStatus::Running,
                pid: Some(999999),
                started_at: Some(Instant::now()),
                restart_count: 0,
                first_crash_at: None,
                pending_version: None,
            },
        );
        sup.node_states.insert(
            2,
            NodeRuntime {
                status: NodeStatus::Stopped,
                pid: None,
                started_at: None,
                restart_count: 0,
                first_crash_at: None,
                pending_version: None,
            },
        );
        let configs = vec![(1, "node1".to_string()), (2, "node2".to_string())];
        let result = sup.stop_all_nodes(&configs).await;
        assert_eq!(result.stopped.len(), 1);
        assert_eq!(result.stopped[0].node_id, 1);
        assert_eq!(result.stopped[0].service_name, "node1");
        assert_eq!(result.already_stopped, vec![2]);
        assert!(result.failed.is_empty());
    }
}