use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{broadcast, RwLock};
use tokio::time::MissedTickBehavior;
use tokio_util::sync::CancellationToken;
use crate::error::{Error, Result};
use crate::node::binary::extract_version;
use crate::node::daemon::disk;
use crate::node::daemon::health::{DiskThresholds, FleetHealth};
use crate::node::events::NodeEvent;
use crate::node::process::spawn::spawn_node;
use crate::node::registry::NodeRegistry;
use crate::node::types::{
EvictionRecord, NodeConfig, NodeStarted, NodeStatus, NodeStopFailed, NodeStopped,
StopNodeResult,
};
/// Poll period for the upgrade monitor (presumably passed as `interval` to
/// `spawn_upgrade_monitor`; confirm at the call site).
pub const UPGRADE_POLL_INTERVAL: Duration = Duration::from_secs(60);
/// Poll period for the liveness monitor (presumably passed as `interval` to
/// `spawn_liveness_monitor`; confirm at the call site).
pub const LIVENESS_POLL_INTERVAL: Duration = Duration::from_secs(5);
/// Poll period for the eviction monitor (presumably passed as `interval` to
/// `spawn_eviction_monitor`; confirm at the call site).
pub const EVICTION_POLL_INTERVAL: Duration = Duration::from_secs(30);
/// Maximum number of nodes a single eviction cycle may evict before re-publishing health.
const MAX_EVICTIONS_PER_CYCLE: usize = 4;
/// Location of the PID file recorded for a node: `<data_dir>/node.pid`.
fn node_pid_file(data_dir: &Path) -> PathBuf {
    let mut pid_path = data_dir.to_path_buf();
    pid_path.push("node.pid");
    pid_path
}
/// Records `pid` in the node's PID file so a restarted daemon can find and adopt the process.
///
/// Failure is logged as a warning and otherwise ignored: the node keeps running, it just
/// cannot be adopted by a future daemon instance.
fn write_node_pid(data_dir: &Path, pid: u32) {
    let pid_path = node_pid_file(data_dir);
    match std::fs::write(&pid_path, pid.to_string()) {
        Ok(()) => {}
        Err(e) => tracing::warn!(
            "Failed to write node pid file at {}: {e}. Node will still run, but a future \
             daemon restart will not be able to adopt it.",
            pid_path.display()
        ),
    }
}
/// Deletes the node's PID file. This is best effort: a missing file or any other removal
/// error is ignored.
fn remove_node_pid(data_dir: &Path) {
    let pid_path = node_pid_file(data_dir);
    let _ignored = std::fs::remove_file(pid_path);
}
/// Reads the PID recorded in the node's PID file.
///
/// Returns `None` if the file cannot be read or its trimmed contents are not a valid `u32`.
fn read_node_pid(data_dir: &Path) -> Option<u32> {
    let contents = std::fs::read_to_string(node_pid_file(data_dir)).ok()?;
    contents.trim().parse::<u32>().ok()
}
/// Searches `sys` for a process that looks like this node.
///
/// A process matches when it is not a thread entry, its executable equals
/// `config.binary_path`, and its command line passes `config.data_dir` as the root dir
/// (`--root-dir=DIR` or `--root-dir DIR`). Returns the PID of the first match, or `None`.
fn find_running_node_process(sys: &sysinfo::System, config: &NodeConfig) -> Option<u32> {
    let wanted_exe = config.binary_path.as_path();
    let wanted_root = config.data_dir.as_path();
    sys.processes()
        .iter()
        .filter(|(_, proc_)| proc_.thread_kind().is_none())
        .filter(|(_, proc_)| proc_.exe() == Some(wanted_exe))
        .find(|(_, proc_)| cmd_names_root_dir(proc_.cmd(), wanted_root))
        .map(|(pid, _)| pid.as_u32())
}

/// Returns `true` if the argument vector `cmd` names `root` as the node root dir, using
/// either the `--root-dir=DIR` form or `--root-dir` followed by `DIR`.
fn cmd_names_root_dir<S: AsRef<std::ffi::OsStr>>(cmd: &[S], root: &Path) -> bool {
    cmd.iter().enumerate().any(|(idx, raw)| {
        let arg = raw.as_ref().to_string_lossy();
        match arg.strip_prefix("--root-dir=") {
            Some(value) => Path::new(value) == root,
            None if arg == "--root-dir" => cmd
                .get(idx + 1)
                .is_some_and(|next| Path::new(&*next.as_ref().to_string_lossy()) == root),
            None => false,
        }
    })
}
/// Returns `true` if `pid` is alive and, when `sys` knows about it, is a real process
/// rather than a thread entry.
///
/// A PID that `is_process_alive` reports as alive but that is absent from the `sys`
/// snapshot still counts as live.
fn pid_is_live_process(pid: u32, sys: &sysinfo::System) -> bool {
    is_process_alive(pid)
        && sys
            .process(sysinfo::Pid::from_u32(pid))
            .map_or(true, |entry| entry.thread_kind().is_none())
}
/// Determines which running process, if any, belongs to the node in `config`.
///
/// It first trusts the PID file when that PID is still a live process. A stale PID file is
/// deleted. Otherwise it scans running processes, and on a hit it records the discovered
/// PID in a fresh PID file.
fn resolve_adopted_pid(config: &NodeConfig, sys: &sysinfo::System) -> Option<u32> {
    let data_dir = &config.data_dir;
    match read_node_pid(data_dir) {
        Some(recorded) if pid_is_live_process(recorded, sys) => return Some(recorded),
        Some(_stale) => remove_node_pid(data_dir),
        None => {}
    }
    find_running_node_process(sys, config).map(|found| {
        write_node_pid(data_dir, found);
        found
    })
}
/// Converts the wall-clock start time that `sys` reports for `pid` into an `Instant`.
///
/// Returns `None` if `sys` has no entry for `pid`, the system clock is before the Unix epoch,
/// or the computed age does not fit before `Instant::now()`. A start time in the future
/// yields an age of zero.
fn process_started_at(sys: &sysinfo::System, pid: u32) -> Option<Instant> {
    let process = sys.process(sysinfo::Pid::from_u32(pid))?;
    let started_secs = process.start_time();
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .ok()?;
    let age_secs = since_epoch.as_secs().saturating_sub(started_secs);
    Instant::now().checked_sub(Duration::from_secs(age_secs))
}
/// Length of the window in which crashes are counted toward `MAX_CRASHES_BEFORE_ERRORED`.
const CRASH_WINDOW: Duration = Duration::from_secs(300);
/// Number of crashes inside `CRASH_WINDOW` after which a node is marked `Errored`.
const MAX_CRASHES_BEFORE_ERRORED: u32 = 5;
/// A process that ran at least this long before crashing resets the crash counter.
const STABLE_DURATION: Duration = Duration::from_secs(300);
/// Ceiling applied to restart backoff. The exponential schedule in `record_crash`
/// (`1 << min(attempt - 1, 5)` seconds) tops out at 32s, so this cap does not currently bind.
const MAX_BACKOFF: Duration = Duration::from_secs(60);
/// Tracks the runtime state of every node this daemon knows about and broadcasts
/// lifecycle events for them.
pub struct Supervisor {
    /// Per-node runtime state, keyed by node id.
    node_states: HashMap<u32, NodeRuntime>,
    /// Nodes whose process was found already running and adopted, rather than spawned by
    /// this supervisor. An id leaves this set once the supervisor spawns the node itself.
    adopted: HashSet<u32>,
    /// Nodes whose eviction is in progress. `start_node` refuses to spawn these.
    evicting: HashSet<u32>,
    /// Sender on which `NodeEvent`s are published.
    event_tx: broadcast::Sender<NodeEvent>,
}
/// Supervisor-side bookkeeping for a single node.
struct NodeRuntime {
    /// Current lifecycle status.
    status: NodeStatus,
    /// PID of the tracked process, or `None` when no process is tracked.
    pid: Option<u32>,
    /// When the tracked process started. Used for uptime and for the crash-stability check.
    started_at: Option<Instant>,
    /// On-disk version the node is expected to move to, set when an upgrade is scheduled.
    pending_version: Option<String>,
    /// Crashes counted since the counter was last reset.
    restart_count: u32,
    /// Time of the first crash in the current crash window.
    first_crash_at: Option<Instant>,
}
impl Supervisor {
    /// Creates a supervisor that tracks no nodes yet and publishes lifecycle events on
    /// `event_tx`.
    pub fn new(event_tx: broadcast::Sender<NodeEvent>) -> Self {
        Self {
            event_tx,
            node_states: HashMap::new(),
            adopted: HashSet::new(),
            evicting: HashSet::new(),
        }
    }

    /// Returns `true` if `node_id` was adopted by `adopt_from_registry` and this supervisor
    /// has not since spawned it itself.
    pub fn is_adopted(&self, node_id: u32) -> bool {
        self.adopted.contains(&node_id)
    }

    /// Marks `node_id` as being evicted, so `start_node` refuses to spawn it.
    fn begin_evicting(&mut self, node_id: u32) {
        self.evicting.insert(node_id);
    }

    /// Clears the mark set by `begin_evicting`.
    fn finish_evicting(&mut self, node_id: u32) {
        self.evicting.remove(&node_id);
    }

    /// Records that the current process for `node_id` was spawned by this supervisor.
    fn mark_owned(&mut self, node_id: u32) {
        self.adopted.remove(&node_id);
    }

    /// Spawns the node described by `config` and starts a background monitor for it.
    ///
    /// The process is given one second to fail fast. If it exits in that time, the node is
    /// recorded as `Errored` and its stderr (or exit code) is returned in the error.
    ///
    /// # Errors
    /// - `Error::NodeEvicted` if the node is evicted or is currently being evicted.
    /// - `Error::NodeAlreadyRunning` if a process for the node is still tracked (`Running`,
    ///   `Starting`, `UpgradeScheduled` or `Stopping`).
    /// - `Error::ProcessSpawn` if spawning fails, no PID is available, the process exits
    ///   immediately, or its status cannot be checked.
    pub async fn start_node(
        &mut self,
        config: &NodeConfig,
        supervisor_ref: Arc<RwLock<Supervisor>>,
        registry_ref: Arc<RwLock<NodeRegistry>>,
    ) -> Result<NodeStarted> {
        let node_id = config.id;
        if config.eviction.is_some() || self.evicting.contains(&node_id) {
            return Err(Error::NodeEvicted(node_id));
        }
        // Every status whose process may still be alive must be rejected, not just
        // `Running`. An `UpgradeScheduled` node keeps running until it exits for its
        // upgrade, and spawning it again would start a second process on the same data dir.
        let has_live_process = self.node_states.get(&node_id).is_some_and(|state| {
            matches!(
                state.status,
                NodeStatus::Running
                    | NodeStatus::Starting
                    | NodeStatus::UpgradeScheduled
                    | NodeStatus::Stopping
            )
        });
        if has_live_process {
            return Err(Error::NodeAlreadyRunning(node_id));
        }
        let _ = self.event_tx.send(NodeEvent::NodeStarting { node_id });
        let mut child = spawn_node_from_config(config).await?;
        let pid = child
            .id()
            .ok_or_else(|| Error::ProcessSpawn("Failed to get PID from spawned process".into()))?;
        match tokio::time::timeout(Duration::from_secs(1), child.wait()).await {
            Ok(Ok(exit_status)) => {
                // The process is already gone. Drop the PID file written at spawn so a later
                // adoption pass cannot match it against a recycled PID.
                remove_node_pid(&config.data_dir);
                let spawn_log_dir = config.log_dir.as_deref().unwrap_or(&config.data_dir);
                let stderr_path = spawn_log_dir.join("stderr.log");
                let stderr_msg = std::fs::read_to_string(&stderr_path).unwrap_or_default();
                let detail = if stderr_msg.trim().is_empty() {
                    format!("exit code: {exit_status}")
                } else {
                    stderr_msg.trim().to_string()
                };
                self.node_states.insert(
                    node_id,
                    NodeRuntime {
                        status: NodeStatus::Errored,
                        pid: None,
                        started_at: None,
                        restart_count: 0,
                        first_crash_at: None,
                        pending_version: None,
                    },
                );
                return Err(Error::ProcessSpawn(format!(
                    "Node {node_id} exited immediately: {detail}"
                )));
            }
            Ok(Err(e)) => {
                // No state is recorded on this path. Don't leave a process we could not
                // inspect running unsupervised against the node's data dir.
                let _ = child.start_kill();
                remove_node_pid(&config.data_dir);
                return Err(Error::ProcessSpawn(format!(
                    "Failed to check node process status: {e}"
                )));
            }
            // Still running after the grace period: treat the start as successful.
            Err(_) => {}
        }
        self.node_states.insert(
            node_id,
            NodeRuntime {
                status: NodeStatus::Running,
                pid: Some(pid),
                started_at: Some(Instant::now()),
                restart_count: 0,
                first_crash_at: None,
                pending_version: None,
            },
        );
        self.mark_owned(node_id);
        let _ = self.event_tx.send(NodeEvent::NodeStarted { node_id, pid });
        let result = NodeStarted {
            node_id,
            service_name: config.service_name.clone(),
            pid,
        };
        let event_tx = self.event_tx.clone();
        let config = config.clone();
        tokio::spawn(async move {
            monitor_node(child, config, supervisor_ref, registry_ref, event_tx).await;
        });
        Ok(result)
    }

    /// Stops the node's process via `graceful_kill` and records it as `Stopped`.
    ///
    /// Both `Running` and `UpgradeScheduled` nodes are accepted, because an
    /// `UpgradeScheduled` node still has a live process. Rejecting it would leave that
    /// process running while reporting it as not running. Any pending upgrade version is
    /// cleared.
    ///
    /// # Errors
    /// - `Error::NodeNotFound` if the node is unknown.
    /// - `Error::NodeNotRunning` if the node has no live process to stop.
    pub async fn stop_node(&mut self, node_id: u32) -> Result<()> {
        let state = self
            .node_states
            .get_mut(&node_id)
            .ok_or(Error::NodeNotFound(node_id))?;
        if !matches!(
            state.status,
            NodeStatus::Running | NodeStatus::UpgradeScheduled
        ) {
            return Err(Error::NodeNotRunning(node_id));
        }
        let pid = state.pid;
        let _ = self.event_tx.send(NodeEvent::NodeStopping { node_id });
        state.status = NodeStatus::Stopping;
        if let Some(pid) = pid {
            graceful_kill(pid).await;
        }
        let state = self
            .node_states
            .get_mut(&node_id)
            .expect("node state cannot be removed while stop_node holds &mut self");
        state.status = NodeStatus::Stopped;
        state.pid = None;
        state.started_at = None;
        state.pending_version = None;
        let _ = self.event_tx.send(NodeEvent::NodeStopped { node_id });
        Ok(())
    }

    /// Stops every listed node that still has a live process (`Running` or
    /// `UpgradeScheduled`).
    ///
    /// Unknown nodes and nodes in any other status are reported as already stopped.
    pub async fn stop_all_nodes(&mut self, configs: &[(u32, String)]) -> StopNodeResult {
        let mut stopped = Vec::new();
        let mut failed = Vec::new();
        let mut already_stopped = Vec::new();
        for (node_id, service_name) in configs {
            let node_id = *node_id;
            if !matches!(
                self.node_status(node_id),
                Ok(NodeStatus::Running | NodeStatus::UpgradeScheduled)
            ) {
                already_stopped.push(node_id);
                continue;
            }
            match self.stop_node(node_id).await {
                Ok(()) => {
                    stopped.push(NodeStopped {
                        node_id,
                        service_name: service_name.clone(),
                    });
                }
                Err(Error::NodeNotRunning(_)) => {
                    already_stopped.push(node_id);
                }
                Err(e) => {
                    failed.push(NodeStopFailed {
                        node_id,
                        service_name: service_name.clone(),
                        error: e.to_string(),
                    });
                }
            }
        }
        StopNodeResult {
            stopped,
            failed,
            already_stopped,
        }
    }

    /// Current status of `node_id`, or `Error::NodeNotFound` if it is not tracked.
    pub fn node_status(&self, node_id: u32) -> Result<NodeStatus> {
        self.node_states
            .get(&node_id)
            .map(|s| s.status)
            .ok_or(Error::NodeNotFound(node_id))
    }

    /// PID of the node's tracked process, if any.
    pub fn node_pid(&self, node_id: u32) -> Option<u32> {
        self.node_states.get(&node_id).and_then(|s| s.pid)
    }

    /// Whole seconds since the node's tracked process started, if a start time is known.
    pub fn node_uptime_secs(&self, node_id: u32) -> Option<u64> {
        self.node_states
            .get(&node_id)
            .and_then(|s| s.started_at.map(|t| t.elapsed().as_secs()))
    }

    /// Version the node is scheduled to move to, if an upgrade is pending.
    pub fn node_pending_version(&self, node_id: u32) -> Option<String> {
        self.node_states
            .get(&node_id)
            .and_then(|s| s.pending_version.clone())
    }

    /// Moves a `Running` node to `UpgradeScheduled` with `pending_version` and emits
    /// `UpgradeScheduled`.
    ///
    /// Returns `false` without changing anything if the node is unknown or not `Running`.
    fn mark_upgrade_scheduled(&mut self, node_id: u32, pending_version: String) -> bool {
        let Some(state) = self.node_states.get_mut(&node_id) else {
            return false;
        };
        if state.status != NodeStatus::Running {
            return false;
        }
        state.status = NodeStatus::UpgradeScheduled;
        state.pending_version = Some(pending_version.clone());
        let _ = self.event_tx.send(NodeEvent::UpgradeScheduled {
            node_id,
            pending_version,
        });
        true
    }

    /// Returns `true` only if the node's status is exactly `Running`.
    pub fn is_running(&self, node_id: u32) -> bool {
        self.node_states
            .get(&node_id)
            .is_some_and(|s| s.status == NodeStatus::Running)
    }

    /// Counts tracked nodes as `(running, stopped, errored)`.
    ///
    /// `Starting` and `UpgradeScheduled` count as running. `Stopping` and `Evicted` count as
    /// stopped.
    pub fn node_counts(&self) -> (u32, u32, u32) {
        let mut running = 0u32;
        let mut stopped = 0u32;
        let mut errored = 0u32;
        for state in self.node_states.values() {
            match state.status {
                NodeStatus::Running | NodeStatus::Starting | NodeStatus::UpgradeScheduled => {
                    running += 1
                }
                NodeStatus::Stopped | NodeStatus::Stopping | NodeStatus::Evicted => stopped += 1,
                NodeStatus::Errored => errored += 1,
            }
        }
        (running, stopped, errored)
    }

    /// Sets status and PID for a tracked node (no-op for unknown nodes).
    ///
    /// `started_at` is set to now when the new status is `Running`, and cleared otherwise.
    fn update_state(&mut self, node_id: u32, status: NodeStatus, pid: Option<u32>) {
        if let Some(state) = self.node_states.get_mut(&node_id) {
            state.status = status;
            state.pid = pid;
            if status == NodeStatus::Running {
                state.started_at = Some(Instant::now());
            } else {
                state.started_at = None;
            }
        }
    }

    /// Adopts already-running node processes listed in `registry`.
    ///
    /// Each node whose process can be located (PID file or process scan) is recorded as
    /// `Running` with the process's real start time when known, marked as adopted, and
    /// announced with `NodeStarted`. Returns the ids of the adopted nodes.
    pub fn adopt_from_registry(&mut self, registry: &NodeRegistry) -> Vec<u32> {
        let mut sys = sysinfo::System::new();
        sys.refresh_processes_specifics(
            sysinfo::ProcessesToUpdate::All,
            true,
            sysinfo::ProcessRefreshKind::everything(),
        );
        let mut adopted = Vec::new();
        for config in registry.list() {
            let Some(pid) = resolve_adopted_pid(config, &sys) else {
                continue;
            };
            self.node_states.insert(
                config.id,
                NodeRuntime {
                    status: NodeStatus::Running,
                    pid: Some(pid),
                    started_at: Some(process_started_at(&sys, pid).unwrap_or_else(Instant::now)),
                    restart_count: 0,
                    first_crash_at: None,
                    pending_version: None,
                },
            );
            self.adopted.insert(config.id);
            let _ = self.event_tx.send(NodeEvent::NodeStarted {
                node_id: config.id,
                pid,
            });
            adopted.push(config.id);
        }
        adopted
    }

    /// Records a crash and decides whether to restart.
    ///
    /// Returns `(should_restart, attempt, backoff)`. A node is marked `Errored` (and
    /// `should_restart` is `false`) once it reaches `MAX_CRASHES_BEFORE_ERRORED` crashes
    /// within one `CRASH_WINDOW`. The counter resets when the crashed process had run for
    /// `STABLE_DURATION`, or when the current window has elapsed. The window reset is what
    /// stops a node whose first crash was long ago from crash-looping forever without ever
    /// being marked errored. The backoff doubles per attempt from 1s, capped at 32s and at
    /// `MAX_BACKOFF`.
    fn record_crash(&mut self, node_id: u32) -> (bool, u32, Duration) {
        let Some(state) = self.node_states.get_mut(&node_id) else {
            return (false, 0, Duration::ZERO);
        };
        let now = Instant::now();
        let ran_stably = state
            .started_at
            .is_some_and(|started| started.elapsed() >= STABLE_DURATION);
        let window_expired = state
            .first_crash_at
            .is_some_and(|first| now.duration_since(first) >= CRASH_WINDOW);
        if ran_stably || window_expired {
            state.restart_count = 0;
            state.first_crash_at = None;
        }
        state.restart_count = state.restart_count.saturating_add(1);
        let attempt = state.restart_count;
        let first_crash = *state.first_crash_at.get_or_insert(now);
        if attempt >= MAX_CRASHES_BEFORE_ERRORED && now.duration_since(first_crash) < CRASH_WINDOW
        {
            state.status = NodeStatus::Errored;
            state.pid = None;
            state.started_at = None;
            return (false, attempt, Duration::ZERO);
        }
        // `attempt` is at least 1 here, so the subtraction cannot underflow.
        let backoff_secs = 1u64 << (attempt - 1).min(5);
        let backoff = Duration::from_secs(backoff_secs).min(MAX_BACKOFF);
        (true, attempt, backoff)
    }
}
pub fn spawn_upgrade_monitor(
registry: Arc<RwLock<NodeRegistry>>,
supervisor: Arc<RwLock<Supervisor>>,
interval: Duration,
shutdown: CancellationToken,
) {
tokio::spawn(async move {
let mut ticker = tokio::time::interval(interval);
ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
ticker.tick().await;
loop {
tokio::select! {
_ = shutdown.cancelled() => return,
_ = ticker.tick() => {},
}
let candidates: Vec<(u32, std::path::PathBuf, String, Option<String>)> = {
let reg = registry.read().await;
let sup = supervisor.read().await;
reg.list()
.into_iter()
.filter_map(|config| match sup.node_status(config.id) {
Ok(NodeStatus::Running) => Some((
config.id,
config.binary_path.clone(),
config.version.clone(),
sup.node_pending_version(config.id),
)),
_ => None,
})
.collect()
};
for (node_id, binary_path, recorded_version, current_pending) in candidates {
let observed = match extract_version(&binary_path).await {
Ok(v) => v,
Err(_) => continue,
};
if observed == recorded_version {
continue;
}
if current_pending.as_deref() == Some(observed.as_str()) {
continue;
}
supervisor
.write()
.await
.mark_upgrade_scheduled(node_id, observed);
}
}
});
}
/// Spawns a background task that runs `run_eviction_cycle` every `interval`.
///
/// The first cycle runs one full interval after start-up. The task exits when `shutdown`
/// is cancelled.
pub fn spawn_eviction_monitor(
    registry: Arc<RwLock<NodeRegistry>>,
    supervisor: Arc<RwLock<Supervisor>>,
    event_tx: broadcast::Sender<NodeEvent>,
    health: Arc<RwLock<FleetHealth>>,
    thresholds: DiskThresholds,
    interval: Duration,
    shutdown: CancellationToken,
) {
    tokio::spawn(async move {
        let mut ticker = tokio::time::interval(interval);
        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
        // The first tick completes immediately; consume it so cycles start after `interval`.
        ticker.tick().await;
        loop {
            tokio::select! {
                _ = shutdown.cancelled() => return,
                _ = ticker.tick() => {},
            }
            // Pass references, not the garbled `®istry` that was here and did not compile.
            run_eviction_cycle(&registry, &supervisor, &event_tx, &health, &thresholds).await;
        }
    });
}
/// Runs one eviction pass and publishes the resulting fleet health.
///
/// It repeatedly (at most `MAX_EVICTIONS_PER_CYCLE` times) finds the first partition whose
/// free space is at or below `thresholds.eviction_bytes` and that hosts at least two running
/// nodes, then evicts that partition's eviction candidate. The pass ends early when no
/// partition qualifies or the qualifying partition offers no candidate.
async fn run_eviction_cycle(
    registry: &Arc<RwLock<NodeRegistry>>,
    supervisor: &Arc<RwLock<Supervisor>>,
    event_tx: &broadcast::Sender<NodeEvent>,
    health: &Arc<RwLock<FleetHealth>>,
    thresholds: &DiskThresholds,
) {
    for _round in 0..MAX_EVICTIONS_PER_CYCLE {
        let partitions = disk::partition_states(running_nodes(registry, supervisor).await);
        let Some(pressured) = partitions
            .iter()
            .find(|p| p.available_bytes <= thresholds.eviction_bytes && p.nodes.len() >= 2)
        else {
            // Nothing left to evict: report the state that was just measured.
            let snapshot = FleetHealth::from_partitions(&partitions, thresholds);
            publish_health(health, event_tx, snapshot).await;
            return;
        };
        match pressured.eviction_candidate().cloned() {
            Some(victim) => {
                evict_node(
                    registry,
                    supervisor,
                    event_tx,
                    &victim,
                    pressured.available_bytes,
                )
                .await
            }
            None => break,
        }
    }
    // Re-measure after the evictions performed above before reporting.
    let partitions = disk::partition_states(running_nodes(registry, supervisor).await);
    let snapshot = FleetHealth::from_partitions(&partitions, thresholds);
    publish_health(health, event_tx, snapshot).await;
}
/// Lists `(node_id, data_dir)` for every registry node that is not evicted and whose
/// supervisor status is `Running`.
///
/// Locks are taken registry first, then supervisor.
async fn running_nodes(
    registry: &Arc<RwLock<NodeRegistry>>,
    supervisor: &Arc<RwLock<Supervisor>>,
) -> Vec<(u32, PathBuf)> {
    let reg = registry.read().await;
    let sup = supervisor.read().await;
    let mut live = Vec::new();
    for config in reg.list() {
        if config.eviction.is_some() {
            continue;
        }
        if !matches!(sup.node_status(config.id), Ok(NodeStatus::Running)) {
            continue;
        }
        live.push((config.id, config.data_dir.clone()));
    }
    live
}
async fn remove_dir_all_with_retry(path: &Path) -> std::io::Result<()> {
const MAX_ATTEMPTS: u32 = 8;
let mut delay = Duration::from_millis(100);
for attempt in 1..=MAX_ATTEMPTS {
match std::fs::remove_dir_all(path) {
Ok(()) => return Ok(()),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
Err(e) if attempt == MAX_ATTEMPTS => return Err(e),
Err(_) => {
tokio::time::sleep(delay).await;
delay = (delay * 2).min(Duration::from_secs(1));
}
}
}
Ok(())
}
/// Stores an `EvictionRecord` on the node's registry entry (if the node exists) and saves
/// the registry.
///
/// A save failure is logged as an error and not returned.
async fn persist_eviction_marker(
    registry: &Arc<RwLock<NodeRegistry>>,
    node_id: u32,
    reason: &str,
    evicted_at: u64,
    reclaimed_bytes: u64,
) {
    let mut reg = registry.write().await;
    if let Ok(entry) = reg.get_mut(node_id) {
        let record = EvictionRecord {
            reason: reason.to_owned(),
            evicted_at,
            reclaimed_bytes,
        };
        entry.eviction = Some(record);
    }
    // Saved even when the node was not found, matching the unconditional save semantics.
    if let Err(e) = reg.save() {
        tracing::error!("Eviction: failed to persist registry for node {node_id}: {e}");
    }
}
/// Evicts one node to reclaim disk space.
///
/// Steps:
/// 1. Block restarts of the node.
/// 2. Persist a provisional eviction marker, so the intent survives a crash mid-eviction.
/// 3. Stop the node; a stop failure is only logged.
/// 4. Delete its data directory with retries.
/// 5. Persist the final marker, with the reclaimed size (0 if the deletion failed).
/// 6. Mark the node `Evicted`, lift the restart block, log, and emit `NodeEvicted`.
async fn evict_node(
    registry: &Arc<RwLock<NodeRegistry>>,
    supervisor: &Arc<RwLock<Supervisor>>,
    event_tx: &broadcast::Sender<NodeEvent>,
    candidate: &disk::NodeDiskUsage,
    available_before: u64,
) {
    let node_id = candidate.node_id;
    let reclaimable = candidate.size_bytes;
    let evicted_at = now_unix_secs();
    let free_before = fmt_bytes(available_before);

    supervisor.write().await.begin_evicting(node_id);

    let pending_reason = format!(
        "Automatically evicted to reclaim disk space: only {} free on its partition. \
         Deleting its data directory to recover ~{}.",
        free_before,
        fmt_bytes(reclaimable),
    );
    persist_eviction_marker(registry, node_id, &pending_reason, evicted_at, reclaimable).await;

    if let Err(e) = supervisor.write().await.stop_node(node_id).await {
        tracing::warn!("Eviction: failed to stop node {node_id} before deletion: {e}");
    }

    let (reclaimed_bytes, reason) = match remove_dir_all_with_retry(&candidate.data_dir).await {
        Ok(()) => (
            reclaimable,
            format!(
                "Automatically evicted to reclaim disk space: only {} free on its partition. \
                 Its data directory was deleted, recovering ~{}.",
                free_before,
                fmt_bytes(reclaimable),
            ),
        ),
        Err(e) => {
            tracing::error!(
                "Eviction: could not delete data dir {} for node {node_id} after retries: {e}. \
                 Disk space was NOT reclaimed; manual cleanup may be required.",
                candidate.data_dir.display()
            );
            (
                0,
                format!(
                    "Automatically evicted due to low disk space (only {} free on its partition), but \
                     its data directory could not be deleted, so space was not reclaimed. Manual \
                     cleanup of {} may be needed.",
                    free_before,
                    candidate.data_dir.display(),
                ),
            )
        }
    };

    persist_eviction_marker(registry, node_id, &reason, evicted_at, reclaimed_bytes).await;
    {
        let mut sup = supervisor.write().await;
        sup.update_state(node_id, NodeStatus::Evicted, None);
        sup.finish_evicting(node_id);
    }
    tracing::info!(
        "Evicted node {node_id}, reclaimed ~{} ({reason})",
        fmt_bytes(reclaimed_bytes)
    );
    let _ = event_tx.send(NodeEvent::NodeEvicted {
        node_id,
        reason,
        reclaimed_bytes,
    });
}
/// Replaces the stored fleet health with `next`.
///
/// Emits `FleetHealthChanged` only when the overall level changed. The level is sent as its
/// serde string form, or an empty string if it does not serialize to a JSON string.
async fn publish_health(
    health: &Arc<RwLock<FleetHealth>>,
    event_tx: &broadcast::Sender<NodeEvent>,
    next: FleetHealth,
) {
    let overall_changed = {
        let mut guard = health.write().await;
        let previous = std::mem::replace(&mut *guard, next.clone());
        previous.overall != next.overall
    };
    if !overall_changed {
        return;
    }
    let overall = serde_json::to_value(next.overall)
        .ok()
        .and_then(|v| v.as_str().map(str::to_owned))
        .unwrap_or_default();
    let _ = event_tx.send(NodeEvent::FleetHealthChanged { overall });
}
/// Current wall-clock time as whole seconds since the Unix epoch, or 0 if the system clock
/// is set before the epoch.
fn now_unix_secs() -> u64 {
    match std::time::UNIX_EPOCH.elapsed() {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
/// Formats a byte count for humans.
///
/// Sizes of at least 1 GiB get two decimals in GiB (e.g. `1.50 GiB`). Smaller sizes are
/// rounded to whole MiB (e.g. `3 MiB`; anything under half a MiB shows as `0 MiB`).
fn fmt_bytes(bytes: u64) -> String {
    const MIB: f64 = 1_048_576.0;
    const GIB: f64 = 1_073_741_824.0;
    match bytes as f64 {
        v if v >= GIB => format!("{:.2} GiB", v / GIB),
        v => format!("{:.0} MiB", v / MIB),
    }
}
/// Builds the node's command-line arguments from `config`.
///
/// The arguments always include `--rewards-address`, `--root-dir`, `--stop-on-upgrade` and
/// `--evm-network`. `--enable-logging`/`--log-dir`, `--port`, one `--bootstrap` per peer, and
/// `--upgrade-channel` are added only when configured. Argument order is fixed.
pub fn build_node_args(config: &NodeConfig) -> Vec<String> {
    let mut args = Vec::new();
    // Appends `name`, followed by `value` when one is given.
    let mut flag = |name: &str, value: Option<String>| {
        args.push(name.to_owned());
        if let Some(v) = value {
            args.push(v);
        }
    };
    flag("--rewards-address", Some(config.rewards_address.clone()));
    flag("--root-dir", Some(config.data_dir.display().to_string()));
    if let Some(ref log_dir) = config.log_dir {
        flag("--enable-logging", None);
        flag("--log-dir", Some(log_dir.display().to_string()));
    }
    if let Some(port) = config.node_port {
        flag("--port", Some(port.to_string()));
    }
    for peer in &config.bootstrap_peers {
        flag("--bootstrap", Some(peer.clone()));
    }
    if let Some(channel) = config.upgrade_channel {
        flag("--upgrade-channel", Some(channel.to_string()));
    }
    flag("--stop-on-upgrade", None);
    flag("--evm-network", Some(config.evm_network.as_arg().to_string()));
    args
}
async fn spawn_node_from_config(config: &NodeConfig) -> Result<tokio::process::Child> {
let args = build_node_args(config);
let env_vars: Vec<(String, String)> = config.env_variables.clone().into_iter().collect();
let log_dir = config
.log_dir
.as_deref()
.unwrap_or(config.data_dir.as_path());
let child = spawn_node(&config.binary_path, &args, &env_vars, log_dir).await?;
if let Some(pid) = child.id() {
write_node_pid(&config.data_dir, pid);
}
Ok(child)
}
/// Runs the supervision loop for a node spawned by this daemon, then removes the node's PID
/// file once the loop has finished.
async fn monitor_node(
    child: tokio::process::Child,
    config: NodeConfig,
    supervisor: Arc<RwLock<Supervisor>>,
    registry: Arc<RwLock<NodeRegistry>>,
    event_tx: broadcast::Sender<NodeEvent>,
) {
    // The loop may rewrite parts of the config (e.g. its version after an upgrade), so it
    // gets a mutable copy.
    let mut working_config = config;
    monitor_node_inner(child, &mut working_config, supervisor, registry, event_tx).await;
    remove_node_pid(&working_config.data_dir);
}
async fn monitor_node_inner(
mut child: tokio::process::Child,
config: &mut NodeConfig,
supervisor: Arc<RwLock<Supervisor>>,
registry: Arc<RwLock<NodeRegistry>>,
event_tx: broadcast::Sender<NodeEvent>,
) {
let node_id = config.id;
loop {
let exit_status = child.wait().await;
let status_at_exit = {
let sup = supervisor.read().await;
sup.node_status(node_id).ok()
};
match status_at_exit {
Some(NodeStatus::Stopped) | Some(NodeStatus::Stopping) | Some(NodeStatus::Evicted) => {
return
}
Some(NodeStatus::UpgradeScheduled) => {
match respawn_upgraded_node(config, &supervisor, ®istry, &event_tx).await {
Ok(new_child) => {
child = new_child;
continue;
}
Err(e) => {
let _ = event_tx.send(NodeEvent::NodeErrored {
node_id,
message: format!("Failed to respawn after upgrade: {e}"),
});
let mut sup = supervisor.write().await;
sup.update_state(node_id, NodeStatus::Errored, None);
return;
}
}
}
_ => {}
}
let exit_code = exit_status.ok().and_then(|s| s.code());
if exit_code == Some(0) {
if let Ok(disk_version) = extract_version(&config.binary_path).await {
if disk_version != config.version {
{
let mut sup = supervisor.write().await;
sup.mark_upgrade_scheduled(node_id, disk_version.clone());
}
match respawn_upgraded_node(config, &supervisor, ®istry, &event_tx).await {
Ok(new_child) => {
child = new_child;
continue;
}
Err(e) => {
let _ = event_tx.send(NodeEvent::NodeErrored {
node_id,
message: format!("Failed to respawn after upgrade: {e}"),
});
let mut sup = supervisor.write().await;
sup.update_state(node_id, NodeStatus::Errored, None);
return;
}
}
}
}
}
let _ = event_tx.send(NodeEvent::NodeCrashed { node_id, exit_code });
let (should_restart, attempt, backoff) = {
let mut sup = supervisor.write().await;
sup.record_crash(node_id)
};
if !should_restart {
let _ = event_tx.send(NodeEvent::NodeErrored {
node_id,
message: format!(
"Node crashed {} times within {} seconds, giving up",
MAX_CRASHES_BEFORE_ERRORED,
CRASH_WINDOW.as_secs()
),
});
return;
}
let _ = event_tx.send(NodeEvent::NodeRestarting { node_id, attempt });
tokio::time::sleep(backoff).await;
match spawn_node_from_config(&*config).await {
Ok(new_child) => {
let pid = match new_child.id() {
Some(pid) => pid,
None => {
let _ = event_tx.send(NodeEvent::NodeErrored {
node_id,
message: "Restarted process exited before PID could be read"
.to_string(),
});
let mut sup = supervisor.write().await;
sup.update_state(node_id, NodeStatus::Errored, None);
return;
}
};
{
let mut sup = supervisor.write().await;
sup.update_state(node_id, NodeStatus::Running, Some(pid));
}
let _ = event_tx.send(NodeEvent::NodeStarted { node_id, pid });
child = new_child;
}
Err(e) => {
let _ = event_tx.send(NodeEvent::NodeErrored {
node_id,
message: format!("Failed to restart node: {e}"),
});
let mut sup = supervisor.write().await;
sup.update_state(node_id, NodeStatus::Errored, None);
return;
}
}
}
}
async fn respawn_upgraded_node(
config: &mut NodeConfig,
supervisor: &Arc<RwLock<Supervisor>>,
registry: &Arc<RwLock<NodeRegistry>>,
event_tx: &broadcast::Sender<NodeEvent>,
) -> Result<tokio::process::Child> {
let node_id = config.id;
let old_version = config.version.clone();
let new_child = spawn_node_from_config(config).await?;
let pid = new_child
.id()
.ok_or_else(|| Error::ProcessSpawn("Failed to get PID after upgrade respawn".into()))?;
let new_version = extract_version(&config.binary_path).await.ok();
if let Some(ref version) = new_version {
config.version = version.clone();
let mut reg = registry.write().await;
if let Ok(stored) = reg.get_mut(node_id) {
stored.version = version.clone();
let _ = reg.save();
}
}
{
let mut sup = supervisor.write().await;
if let Some(state) = sup.node_states.get_mut(&node_id) {
state.status = NodeStatus::Running;
state.pid = Some(pid);
state.started_at = Some(Instant::now());
state.pending_version = None;
state.restart_count = 0;
state.first_crash_at = None;
}
}
let _ = event_tx.send(NodeEvent::NodeStarted { node_id, pid });
if let Some(version) = new_version {
let _ = event_tx.send(NodeEvent::NodeUpgraded {
node_id,
old_version,
new_version: version,
});
}
Ok(new_child)
}
/// How long `graceful_kill` waits after the polite stop signal before forcing termination.
const GRACEFUL_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10);

/// Stops process `pid`: sends `send_signal_term`, then polls every 100ms for up to
/// `GRACEFUL_SHUTDOWN_TIMEOUT`.
///
/// If the process is still alive after that, it sends `send_signal_kill` and polls up to 10
/// more times at 50ms intervals. It returns without error whether or not the process
/// finally exits.
async fn graceful_kill(pid: u32) {
    send_signal_term(pid);
    let term_sent_at = Instant::now();
    while is_process_alive(pid) {
        if term_sent_at.elapsed() >= GRACEFUL_SHUTDOWN_TIMEOUT {
            // Grace period exhausted: force it, then give it a short moment to disappear.
            send_signal_kill(pid);
            let mut polls = 0;
            while polls < 10 && is_process_alive(pid) {
                tokio::time::sleep(Duration::from_millis(50)).await;
                polls += 1;
            }
            return;
        }
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
}
/// Decides whether the liveness monitor may mark a node `Stopped` after seeing
/// `snapshot_pid` dead.
///
/// This holds only if the node is still `Running` with that same PID. A changed PID means
/// the node was respawned in the meantime and must be left alone.
fn liveness_should_stop(
    snapshot_pid: u32,
    current_pid: Option<u32>,
    current_status: Option<NodeStatus>,
) -> bool {
    let still_running = matches!(current_status, Some(NodeStatus::Running));
    let same_process = current_pid.is_some_and(|pid| pid == snapshot_pid);
    still_running && same_process
}
/// Spawns a background task that notices node processes which died without the supervisor
/// being told (e.g. adopted processes that have no `monitor_node` task).
///
/// Every `interval` (the first check runs immediately), each `Running` node whose tracked
/// PID is no longer alive is handled as follows:
/// - Adopted node whose on-disk binary version differs from the registry's: treated as a
///   self-upgrade. It is respawned via `respawn_upgraded_node` and handed to a new
///   `monitor_node` task; if the respawn fails, it is marked `Errored`.
/// - Otherwise, if the node is still `Running` with the same PID, it is marked `Stopped`,
///   `NodeStopped` is emitted, and its PID file is removed.
///
/// The task exits when `shutdown` is cancelled.
pub fn spawn_liveness_monitor(
    registry: Arc<RwLock<NodeRegistry>>,
    supervisor: Arc<RwLock<Supervisor>>,
    event_tx: broadcast::Sender<NodeEvent>,
    interval: Duration,
    shutdown: CancellationToken,
) {
    tokio::spawn(async move {
        let mut ticker = tokio::time::interval(interval);
        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
        loop {
            tokio::select! {
                _ = shutdown.cancelled() => return,
                _ = ticker.tick() => {}
            }
            // Lock order: registry first, then supervisor. This is the same order the upgrade
            // and eviction monitors use. Tokio's RwLock is fair (queued writers block new
            // readers), so two tasks taking this pair of read locks in opposite orders can
            // deadlock once writers are waiting on both.
            let candidates: Vec<(u32, u32, PathBuf)> = {
                let reg = registry.read().await;
                let sup = supervisor.read().await;
                reg.list()
                    .into_iter()
                    .filter_map(|config| {
                        let pid = sup.node_pid(config.id)?;
                        matches!(sup.node_status(config.id), Ok(NodeStatus::Running))
                            .then_some((config.id, pid, config.data_dir.clone()))
                    })
                    .collect()
            };
            for (node_id, pid, data_dir) in candidates {
                if is_process_alive(pid) {
                    continue;
                }
                if supervisor.read().await.is_adopted(node_id) {
                    let config = {
                        let reg = registry.read().await;
                        reg.get(node_id).ok().cloned()
                    };
                    if let Some(mut config) = config {
                        // An adopted node that exited with a different binary on disk
                        // most likely stopped to upgrade itself.
                        let drifted = matches!(
                            extract_version(&config.binary_path).await,
                            Ok(disk_version) if disk_version != config.version
                        );
                        if drifted {
                            match respawn_upgraded_node(
                                &mut config,
                                &supervisor,
                                &registry,
                                &event_tx,
                            )
                            .await
                            {
                                Ok(child) => {
                                    supervisor.write().await.mark_owned(node_id);
                                    let sup_ref = Arc::clone(&supervisor);
                                    let reg_ref = Arc::clone(&registry);
                                    let ev = event_tx.clone();
                                    tokio::spawn(async move {
                                        monitor_node(child, config, sup_ref, reg_ref, ev).await;
                                    });
                                    continue;
                                }
                                Err(e) => {
                                    let _ = event_tx.send(NodeEvent::NodeErrored {
                                        node_id,
                                        message: format!(
                                            "Failed to respawn adopted node after upgrade: {e}"
                                        ),
                                    });
                                    let mut sup = supervisor.write().await;
                                    sup.update_state(node_id, NodeStatus::Errored, None);
                                    sup.mark_owned(node_id);
                                    remove_node_pid(&data_dir);
                                    continue;
                                }
                            }
                        }
                    }
                }
                let mut sup = supervisor.write().await;
                // Re-check under the write lock: the node may have been respawned (new PID)
                // or stopped since the snapshot was taken.
                if !liveness_should_stop(pid, sup.node_pid(node_id), sup.node_status(node_id).ok())
                {
                    continue;
                }
                sup.update_state(node_id, NodeStatus::Stopped, None);
                let _ = event_tx.send(NodeEvent::NodeStopped { node_id });
                remove_node_pid(&data_dir);
            }
        }
    });
}
/// Converts a PID to the `pid_t`-compatible `i32` that `libc::kill` expects.
///
/// Returns `None` for 0 and for values above `i32::MAX`. For `kill`, a PID of 0 means "the
/// caller's process group" and negative values address process groups, so neither may be
/// produced by accident.
#[cfg(unix)]
fn pid_to_i32(pid: u32) -> Option<i32> {
    match i32::try_from(pid) {
        Ok(raw) if raw > 0 => Some(raw),
        _ => None,
    }
}
/// Sends `SIGTERM` to `pid`. This is best effort: invalid PIDs are skipped and the result
/// of `kill` is ignored.
#[cfg(unix)]
fn send_signal_term(pid: u32) {
    let Some(raw_pid) = pid_to_i32(pid) else {
        return;
    };
    // SAFETY: `kill` takes plain integers and touches no memory owned by this process;
    // `raw_pid` is strictly positive, so it targets exactly one process.
    unsafe {
        libc::kill(raw_pid, libc::SIGTERM);
    }
}
/// Sends `SIGKILL` to `pid`. This is best effort: invalid PIDs are skipped and the result
/// of `kill` is ignored.
#[cfg(unix)]
fn send_signal_kill(pid: u32) {
    let Some(raw_pid) = pid_to_i32(pid) else {
        return;
    };
    // SAFETY: `kill` takes plain integers and touches no memory owned by this process;
    // `raw_pid` is strictly positive, so it targets exactly one process.
    unsafe {
        libc::kill(raw_pid, libc::SIGKILL);
    }
}
#[cfg(unix)]
fn is_process_alive(pid: u32) -> bool {
let Some(pid) = pid_to_i32(pid) else {
return false;
};
let ret = unsafe { libc::kill(pid, 0) };
if ret == 0 {
return true;
}
std::io::Error::last_os_error().raw_os_error() == Some(libc::EPERM)
}
/// Windows stand-in for SIGTERM: delivers a Ctrl-C console event to the process `pid`.
///
/// Sequence:
/// 1. Detach this process from any console.
/// 2. Attach to `pid`'s console.
/// 3. Ignore Ctrl-C in this process while the event is generated.
/// 4. Detach again, then restore Ctrl-C handling after a short pause.
///
/// If attaching fails, nothing is sent, and `graceful_kill` will fall back to
/// `send_signal_kill` after its timeout.
///
/// NOTE(review): `FreeConsole()` at the start detaches the daemon from its own console, and
/// it is never re-attached. Confirm the daemon never needs one.
/// NOTE(review): `GenerateConsoleCtrlEvent(CTRL_C_EVENT, 0)` targets process group 0, i.e.
/// every process attached to that console. If several nodes share one console (depends on
/// how `spawn_node` creates them — verify), they may all receive the Ctrl-C.
/// NOTE(review): `std::thread::sleep` blocks the calling thread for 50ms, and this is
/// called from the async `graceful_kill`, so it stalls a runtime worker thread briefly.
#[cfg(windows)]
fn send_signal_term(pid: u32) {
    use windows_sys::Win32::System::Console::{
        AttachConsole, FreeConsole, GenerateConsoleCtrlEvent, SetConsoleCtrlHandler, CTRL_C_EVENT,
    };
    // SAFETY: these console APIs take plain values (no pointers from this process besides
    // the `None` handler) and are called in an order that always restores Ctrl-C handling.
    unsafe {
        FreeConsole();
        if AttachConsole(pid) != 0 {
            // Ignore Ctrl-C in this process so the event we generate doesn't stop us.
            SetConsoleCtrlHandler(None, 1);
            GenerateConsoleCtrlEvent(CTRL_C_EVENT, 0);
            FreeConsole();
            // Give the event time to be dispatched before restoring our own handling.
            std::thread::sleep(std::time::Duration::from_millis(50));
            SetConsoleCtrlHandler(None, 0);
        }
    }
}
/// Forcefully terminates `pid` with `TerminateProcess` (exit code 1). This is best effort:
/// if the process cannot be opened, nothing happens.
#[cfg(windows)]
fn send_signal_kill(pid: u32) {
    use windows_sys::Win32::Foundation::CloseHandle;
    use windows_sys::Win32::System::Threading::{OpenProcess, TerminateProcess, PROCESS_TERMINATE};
    // SAFETY: `OpenProcess` takes plain values; a null return is checked before use.
    let handle = unsafe { OpenProcess(PROCESS_TERMINATE, 0, pid) };
    if handle.is_null() {
        return;
    }
    // SAFETY: `handle` is a valid, non-null process handle opened above and is closed
    // exactly once.
    unsafe {
        TerminateProcess(handle, 1);
        CloseHandle(handle);
    }
}
/// Reports whether `pid` is still running, by querying its exit code.
///
/// A process that cannot be opened (including access denied) counts as dead. A process that
/// actually exited with code 259 (`STILL_ACTIVE`) is indistinguishable from a running one.
#[cfg(windows)]
fn is_process_alive(pid: u32) -> bool {
    use windows_sys::Win32::Foundation::{CloseHandle, STILL_ACTIVE};
    use windows_sys::Win32::System::Threading::{
        GetExitCodeProcess, OpenProcess, PROCESS_QUERY_LIMITED_INFORMATION,
    };
    // SAFETY: `OpenProcess` takes plain values; a null return is checked before use.
    let handle = unsafe { OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, 0, pid) };
    if handle.is_null() {
        return false;
    }
    let mut exit_code: u32 = 0;
    // SAFETY: `handle` is valid and non-null, `exit_code` outlives the call, and the handle
    // is closed exactly once.
    let queried = unsafe {
        let ok = GetExitCodeProcess(handle, &mut exit_code);
        CloseHandle(handle);
        ok
    };
    queried != 0 && exit_code == STILL_ACTIVE as u32
}
#[cfg(test)]
mod tests {
use super::*;
use crate::node::types::{EvmNetwork, UpgradeChannel};
#[tokio::test]
async fn remove_dir_all_with_retry_deletes_tree_and_tolerates_missing() {
let tmp = tempfile::tempdir().unwrap();
let dir = tmp.path().join("node-data");
std::fs::create_dir_all(dir.join("sub")).unwrap();
std::fs::write(dir.join("sub").join("data.mdb"), vec![0u8; 128]).unwrap();
remove_dir_all_with_retry(&dir).await.unwrap();
assert!(!dir.exists());
remove_dir_all_with_retry(&dir).await.unwrap();
}
#[tokio::test]
async fn start_node_rejects_a_node_being_evicted() {
let (tx, _rx) = broadcast::channel(16);
let sup = Arc::new(RwLock::new(Supervisor::new(tx)));
let tmp = tempfile::tempdir().unwrap();
let reg = Arc::new(RwLock::new(
NodeRegistry::load(&tmp.path().join("reg.json")).unwrap(),
));
let config = NodeConfig {
id: 7,
service_name: "node7".to_string(),
rewards_address: "0xabc".to_string(),
data_dir: tmp.path().join("node-7"),
log_dir: None,
node_port: None,
binary_path: "/bin/node".into(),
version: "0.1.0".to_string(),
env_variables: HashMap::new(),
bootstrap_peers: vec![],
upgrade_channel: None,
evm_network: EvmNetwork::default(),
eviction: None,
};
sup.write().await.begin_evicting(7);
let res = sup
.write()
.await
.start_node(&config, sup.clone(), reg.clone())
.await;
assert!(matches!(res, Err(Error::NodeEvicted(7))));
sup.write().await.finish_evicting(7);
assert!(!sup.read().await.evicting.contains(&7));
}
#[test]
fn adopted_flag_lifecycle() {
let (tx, _rx) = broadcast::channel(16);
let mut sup = Supervisor::new(tx);
assert!(!sup.is_adopted(1));
sup.adopted.insert(1);
assert!(sup.is_adopted(1));
sup.mark_owned(1);
assert!(!sup.is_adopted(1));
}
#[test]
fn liveness_does_not_stop_node_respawned_under_it() {
let dead_snapshot_pid = 1000; let live_respawned_pid = Some(2000); assert!(
!liveness_should_stop(
dead_snapshot_pid,
live_respawned_pid,
Some(NodeStatus::Running)
),
"liveness must not stop a node whose PID changed under it (respawned with a live PID)"
);
}
#[test]
fn build_node_args_basic() {
let config = NodeConfig {
id: 1,
service_name: "node1".to_string(),
rewards_address: "0xabc123".to_string(),
data_dir: "/data/node-1".into(),
log_dir: Some("/logs/node-1".into()),
node_port: Some(12000),
binary_path: "/bin/node".into(),
version: "0.1.0".to_string(),
env_variables: HashMap::new(),
bootstrap_peers: vec!["peer1".to_string(), "peer2".to_string()],
upgrade_channel: None,
evm_network: EvmNetwork::default(),
eviction: None,
};
let args = build_node_args(&config);
assert!(args.contains(&"--rewards-address".to_string()));
assert!(args.contains(&"0xabc123".to_string()));
assert!(args.contains(&"--root-dir".to_string()));
assert!(args.contains(&"/data/node-1".to_string()));
assert!(args.contains(&"--enable-logging".to_string()));
assert!(args.contains(&"--log-dir".to_string()));
assert!(args.contains(&"/logs/node-1".to_string()));
assert!(args.contains(&"--port".to_string()));
assert!(args.contains(&"12000".to_string()));
assert!(args.contains(&"--bootstrap".to_string()));
assert!(args.contains(&"peer1".to_string()));
assert!(args.contains(&"peer2".to_string()));
assert!(args.contains(&"--stop-on-upgrade".to_string()));
assert!(!args.contains(&"--upgrade-channel".to_string()));
assert_eq!(evm_network_arg(&args), Some("arbitrum-one"));
}
fn evm_network_arg(args: &[String]) -> Option<&str> {
let idx = args.iter().position(|a| a == "--evm-network")?;
args.get(idx + 1).map(String::as_str)
}
/// `--evm-network` must carry the kebab-case name of whichever network is configured.
#[test]
fn build_node_args_emits_evm_network_flag() {
    let mut config = NodeConfig {
        id: 1,
        service_name: "node1".to_string(),
        rewards_address: "0xabc".to_string(),
        data_dir: "/data/node-1".into(),
        log_dir: None,
        node_port: None,
        binary_path: "/bin/node".into(),
        version: "0.1.0".to_string(),
        env_variables: HashMap::new(),
        bootstrap_peers: vec![],
        upgrade_channel: None,
        evm_network: EvmNetwork::ArbitrumSepolia,
        eviction: None,
    };

    // (network to configure, expected value after `--evm-network`)
    let cases = [
        (EvmNetwork::ArbitrumSepolia, "arbitrum-sepolia"),
        (EvmNetwork::ArbitrumOne, "arbitrum-one"),
    ];
    for (network, expected) in cases {
        config.evm_network = network;
        let argv = build_node_args(&config);
        assert_eq!(evm_network_arg(&argv), Some(expected));
    }
}
/// `--upgrade-channel` must be emitted with the configured channel's name as its value.
#[test]
fn build_node_args_includes_upgrade_channel() {
    /// Value following the first `--upgrade-channel`, or `None` if the flag or its
    /// value is missing. Using `get` instead of `args[idx + 1]` turns a missing value
    /// into a readable assertion failure rather than an index-out-of-bounds panic.
    fn upgrade_channel_arg(args: &[String]) -> Option<&str> {
        let idx = args.iter().position(|a| a == "--upgrade-channel")?;
        args.get(idx + 1).map(String::as_str)
    }

    let mut config = NodeConfig {
        id: 1,
        service_name: "node1".to_string(),
        rewards_address: "0xabc".to_string(),
        data_dir: "/data/node-1".into(),
        log_dir: None,
        node_port: None,
        binary_path: "/bin/node".into(),
        version: "0.1.0".to_string(),
        env_variables: HashMap::new(),
        bootstrap_peers: vec![],
        upgrade_channel: Some(UpgradeChannel::Beta),
        evm_network: EvmNetwork::default(),
        eviction: None,
    };
    let args = build_node_args(&config);
    assert_eq!(
        upgrade_channel_arg(&args),
        Some("beta"),
        "--upgrade-channel should be present and followed by `beta`"
    );

    config.upgrade_channel = Some(UpgradeChannel::Stable);
    let args = build_node_args(&config);
    assert_eq!(
        upgrade_channel_arg(&args),
        Some("stable"),
        "--upgrade-channel should be present and followed by `stable`"
    );
}
/// With every optional field unset, only the mandatory flags may appear.
#[test]
fn build_node_args_minimal() {
    let config = NodeConfig {
        id: 1,
        service_name: "node1".to_string(),
        rewards_address: "0xabc".to_string(),
        data_dir: "/data/node-1".into(),
        log_dir: None,
        node_port: None,
        binary_path: "/bin/node".into(),
        version: "0.1.0".to_string(),
        env_variables: HashMap::new(),
        bootstrap_peers: vec![],
        upgrade_channel: None,
        evm_network: EvmNetwork::default(),
        eviction: None,
    };
    let argv = build_node_args(&config);
    let has = |needle: &str| argv.iter().any(|arg| arg == needle);

    // Always emitted.
    assert!(has("--rewards-address"));
    assert!(has("--root-dir"));

    // Omitted because log_dir, node_port and bootstrap_peers are empty.
    assert!(!has("--enable-logging"));
    assert!(!has("--log-dir"));
    assert!(!has("--port"));
    assert!(!has("--bootstrap"));

    // Emitted regardless of configuration.
    assert!(has("--stop-on-upgrade"));
}
/// Crash backoff doubles from 1s to 8s over the first four crashes.
///
/// The fifth crash refuses a restart and leaves the node `Errored`.
#[test]
fn record_crash_backoff_increases() {
    let (events_tx, _events_rx) = broadcast::channel(16);
    let mut supervisor = Supervisor::new(events_tx);
    supervisor.node_states.insert(
        1,
        NodeRuntime {
            status: NodeStatus::Running,
            pid: Some(100),
            started_at: Some(Instant::now()),
            restart_count: 0,
            first_crash_at: None,
            pending_version: None,
        },
    );

    // (expected attempt number, expected backoff in seconds)
    for (expected_attempt, expected_secs) in [(1, 1), (2, 2), (3, 4), (4, 8)] {
        let (should_restart, attempt, backoff) = supervisor.record_crash(1);
        assert!(should_restart);
        assert_eq!(attempt, expected_attempt);
        assert_eq!(backoff, Duration::from_secs(expected_secs));
    }

    // Budget exhausted: no restart, and the node is parked in Errored.
    let (should_restart, attempt, _) = supervisor.record_crash(1);
    assert!(!should_restart);
    assert_eq!(attempt, 5);
    assert_eq!(supervisor.node_states[&1].status, NodeStatus::Errored);
}
/// `node_counts` returns `(running, stopped, errored)` with one node in each state.
#[test]
fn node_counts_tracks_states() {
    let (events_tx, _events_rx) = broadcast::channel(16);
    let mut supervisor = Supervisor::new(events_tx);

    // Only the fields that differ between the fixtures are parameters.
    let runtime = |status, pid, started_at, restart_count| NodeRuntime {
        status,
        pid,
        started_at,
        restart_count,
        first_crash_at: None,
        pending_version: None,
    };
    let fleet = [
        (1, runtime(NodeStatus::Running, Some(100), Some(Instant::now()), 0)),
        (2, runtime(NodeStatus::Stopped, None, None, 0)),
        (3, runtime(NodeStatus::Errored, None, None, 5)),
    ];
    for (id, state) in fleet {
        supervisor.node_states.insert(id, state);
    }

    assert_eq!(supervisor.node_counts(), (1, 1, 1));
}
/// `mark_upgrade_scheduled` transitions a Running node and emits `UpgradeScheduled`.
///
/// A Stopped node is left untouched. A node that is already scheduled is not
/// re-scheduled, and its pending version is not overwritten.
#[test]
fn mark_upgrade_scheduled_only_affects_running_nodes() {
    const FIRST_VERSION: &str = "0.10.11-rc.1";
    const LATER_VERSION: &str = "0.10.12";

    let (events_tx, mut events_rx) = broadcast::channel(16);
    let mut supervisor = Supervisor::new(events_tx);
    let runtime = |status, pid, started_at| NodeRuntime {
        status,
        pid,
        started_at,
        restart_count: 0,
        first_crash_at: None,
        pending_version: None,
    };
    supervisor
        .node_states
        .insert(1, runtime(NodeStatus::Running, Some(111), Some(Instant::now())));
    supervisor
        .node_states
        .insert(2, runtime(NodeStatus::Stopped, None, None));

    // Running node: transitions and announces the pending version.
    assert!(supervisor.mark_upgrade_scheduled(1, FIRST_VERSION.to_string()));
    assert_eq!(supervisor.node_status(1).unwrap(), NodeStatus::UpgradeScheduled);
    assert_eq!(supervisor.node_pending_version(1).as_deref(), Some(FIRST_VERSION));
    let (event_node, event_version) = match events_rx.try_recv() {
        Ok(NodeEvent::UpgradeScheduled {
            node_id,
            pending_version,
        }) => (node_id, pending_version),
        other => panic!("expected UpgradeScheduled event, got {other:?}"),
    };
    assert_eq!(event_node, 1);
    assert_eq!(event_version, FIRST_VERSION);

    // Stopped node: rejected, state unchanged.
    assert!(!supervisor.mark_upgrade_scheduled(2, FIRST_VERSION.to_string()));
    assert_eq!(supervisor.node_status(2).unwrap(), NodeStatus::Stopped);
    assert!(supervisor.node_pending_version(2).is_none());

    // Already scheduled: rejected, original pending version kept.
    assert!(!supervisor.mark_upgrade_scheduled(1, LATER_VERSION.to_string()));
    assert_eq!(supervisor.node_pending_version(1).as_deref(), Some(FIRST_VERSION));
}
/// A node in `UpgradeScheduled` is still running its old binary.
///
/// `node_counts` must therefore report it in the `running` bucket.
#[test]
fn node_counts_counts_upgrade_scheduled_as_running() {
    let (events_tx, _events_rx) = broadcast::channel(16);
    let mut supervisor = Supervisor::new(events_tx);
    let scheduled = NodeRuntime {
        status: NodeStatus::UpgradeScheduled,
        pid: Some(111),
        started_at: Some(Instant::now()),
        restart_count: 0,
        first_crash_at: None,
        pending_version: Some("0.10.11-rc.1".to_string()),
    };
    supervisor.node_states.insert(1, scheduled);

    // (running, stopped, errored)
    assert_eq!(supervisor.node_counts(), (1, 0, 0));
}
#[tokio::test]
async fn stop_node_not_found() {
let (tx, _rx) = broadcast::channel(16);
let mut sup = Supervisor::new(tx);
let result = sup.stop_node(999).await;
assert!(matches!(result, Err(Error::NodeNotFound(999))));
}
#[tokio::test]
async fn stop_node_not_running() {
let (tx, _rx) = broadcast::channel(16);
let mut sup = Supervisor::new(tx);
sup.node_states.insert(
1,
NodeRuntime {
status: NodeStatus::Stopped,
pid: None,
started_at: None,
restart_count: 0,
first_crash_at: None,
pending_version: None,
},
);
let result = sup.stop_node(1).await;
assert!(matches!(result, Err(Error::NodeNotRunning(1))));
}
/// `stop_all_nodes` sorts nodes by their prior state.
///
/// A Running node is reported under `stopped`, tagged with its service name. A node
/// that was already Stopped is reported under `already_stopped`. Nothing fails.
#[tokio::test]
async fn stop_all_nodes_mixed_states() {
    // The "running" node needs a PID that cannot belong to any live process, because
    // the stop path presumably signals it. 999_999 is an allocatable PID on Linux hosts
    // whose kernel.pid_max is raised to 4_194_304 (PID_MAX_LIMIT on 64-bit, commonly set
    // by systemd), so the test could hit an unrelated process. 2_000_000_001 is above
    // that limit and above the macOS/BSD PID ceiling. It still fits in both i32 and u32,
    // and it is odd, so it cannot be a Windows PID (those are multiples of 4).
    let unallocatable_pid = 2_000_000_001;

    let (tx, _rx) = broadcast::channel(16);
    let mut sup = Supervisor::new(tx);
    sup.node_states.insert(
        1,
        NodeRuntime {
            status: NodeStatus::Running,
            pid: Some(unallocatable_pid),
            started_at: Some(Instant::now()),
            restart_count: 0,
            first_crash_at: None,
            pending_version: None,
        },
    );
    sup.node_states.insert(
        2,
        NodeRuntime {
            status: NodeStatus::Stopped,
            pid: None,
            started_at: None,
            restart_count: 0,
            first_crash_at: None,
            pending_version: None,
        },
    );
    let configs = vec![(1, "node1".to_string()), (2, "node2".to_string())];
    let result = sup.stop_all_nodes(&configs).await;
    assert_eq!(result.stopped.len(), 1);
    assert_eq!(result.stopped[0].node_id, 1);
    assert_eq!(result.stopped[0].service_name, "node1");
    assert_eq!(result.already_stopped, vec![2]);
    assert!(result.failed.is_empty());
}
}