use anyhow::Result;
use ractor::{concurrency::Duration, Actor, ActorProcessingErr, ActorRef};
use crate::config::ValorConfig;
use crate::service::ValorServiceId;
use crate::{
master::{TaskStatusUpdate, ValorMasterMessage},
types::{ValorID, ValorIdExt},
worker::{
messages::ValorWorkerMessage,
status::{CapacitySnapshot, ValorWorkerCapacity, ValorWorkerEvent, WorkerHeartbeat},
ValorWorkerServiceRegistry,
},
};
/// Actor implementation for a Valor worker node.
///
/// Stateless marker type: all runtime data lives in [`ValorWorkerState`],
/// which is the actor's `State`.
pub struct Worker;
/// Runtime state owned by a [`Worker`] actor.
pub struct ValorWorkerState {
    /// This worker's identity; sent along with heartbeats, capacity snapshots,
    /// services reports and task-status updates.
    id: ValorID,
    /// Service IDs loaded from the default service registry at startup (empty if
    /// loading failed); reported to the master once registration is confirmed.
    service_ids: Vec<ValorServiceId>,
    /// Handle of the background heartbeat/capacity reporter task; becomes `None`
    /// once the handle has been taken and the task aborted.
    reporter_handle: Option<tokio::task::JoinHandle<()>>,
}
impl Actor for Worker {
type Msg = ValorWorkerMessage;
type State = ValorWorkerState;
type Arguments = ValorID;
async fn pre_start(
&self,
myself: ActorRef<Self::Msg>,
arg: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
let workers_pg = ValorID::workers_pg_name();
let actor_cell = myself.get_cell();
ractor::pg::join(workers_pg, vec![actor_cell]);
tracing::info!(
"Worker {} joined process group '{}' and is ready to receive messages",
arg,
ValorID::workers_pg_name()
);
let service_ids = match ValorWorkerServiceRegistry::load_default() {
Ok(registry) => registry.service_ids(),
Err(e) => {
tracing::warn!(
"Worker: failed to load service registry: {} (using empty)",
e
);
Vec::new()
}
};
let my_id = arg.clone();
let myself_ref = myself.clone();
let reporter_handle = tokio::spawn(async move {
let base_ms: u64 = crate::config::ValorApplicationConfig::load()
.ok()
.map(|c| c.tuning().heartbeat_interval_ms)
.unwrap_or(5_000);
let missed_threshold: u32 = crate::config::ValorApplicationConfig::load()
.ok()
.map(|c| c.tuning().missed_before_unreachable)
.unwrap_or(2);
let mut seq_no: u64 = 0;
let mut missed_master_intervals: u32 = 0;
loop {
let jitter: u64 = (rand::random::<u16>() as u64) % ((base_ms / 10).max(1));
let sleep_dur = Duration::from_millis(base_ms + jitter);
tokio::select! {
_ = ractor::concurrency::sleep(sleep_dur) => {
let masters_pg = ValorID::masters_pg_name();
if ractor::pg::get_members(&masters_pg).is_empty() {
missed_master_intervals = missed_master_intervals.saturating_add(1);
} else {
missed_master_intervals = 0;
}
if missed_master_intervals >= missed_threshold {
tracing::warn!("Worker {}: master not found for {} intervals, initiating graceful leave", my_id, missed_master_intervals);
let _ = myself_ref.cast(ValorWorkerMessage::Shutdown);
break;
}
let heartbeat = ValorWorkerEvent::Heartbeat(WorkerHeartbeat {
id: my_id.clone(),
ts_mono_ms: current_millis(),
seq_no,
});
send_to_master(heartbeat);
let cap = system_capacity();
let capacity = ValorWorkerEvent::CapacityReport(CapacitySnapshot {
id: my_id.clone(),
ts_mono_ms: current_millis(),
capacity: cap,
});
send_to_master(capacity);
seq_no = seq_no.wrapping_add(1);
}
}
}
});
Ok(ValorWorkerState {
id: arg,
service_ids,
reporter_handle: Some(reporter_handle),
})
}
async fn handle(
&self,
_myself: ActorRef<Self::Msg>,
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
ValorWorkerMessage::Shutdown => {
tracing::warn!("Worker {}: received shutdown", state.id);
let workers_pg = ValorID::workers_pg_name();
let cell = _myself.get_cell();
ractor::pg::leave(workers_pg, vec![cell]);
if let Some(handle) = state.reporter_handle.take() {
handle.abort();
}
}
ValorWorkerMessage::NorthboundRegisterMasterConfirmed => {
let wspan = tracing::info_span!(
"flow.worker.registered",
worker_id = %state.id
);
tracing::info!(parent: &wspan, "Worker: master confirmed registration");
let service_ids: Vec<ValorServiceId> = state.service_ids.clone();
let report =
ValorWorkerEvent::ServicesReport(crate::worker::status::ServicesSnapshot {
id: state.id.clone(),
ts_mono_ms: current_millis(),
services: service_ids,
version: 1,
});
send_to_master(report);
}
ValorWorkerMessage::NorthboundRegisterMasterRejected(reason) => {
tracing::warn!("Worker {}: registration rejected: {:?}", state.id, reason);
}
ValorWorkerMessage::NorthboundMasterTask(task) => {
let tspan = tracing::info_span!(
"flow.worker.task",
worker_id = %state.id,
task_id = %task.task_id
);
tracing::info!(parent: &tspan, "Worker: received task (type={:?})", task.task_type);
let service_id = match &task.task_type {
crate::common::task::ValorTaskType::ExecuteService { service_id, .. } => {
service_id
}
};
if service_id.0 == "common.cmd" {
tracing::info!(parent: &tspan, "Worker: executing common.cmd");
let (status, output_opt, error_opt) = execute_cmd_task(&task).await;
tracing::info!(parent: &tspan, "Worker: task finished with status {:?}", status);
let update = TaskStatusUpdate {
task_id: task.task_id.to_string(),
worker_id: state.id.clone(),
status,
output: output_opt,
error: error_opt.map(crate::common::task::ValorTaskError::from),
};
send_master_message(ValorMasterMessage::UpdateTaskStatus(update));
}
}
ValorWorkerMessage::NorthboundMasterServiceManagements(cmds) => {
tracing::info!(
"Worker {}: received {} service cmd(s)",
state.id,
cmds.len()
);
}
ValorWorkerMessage::NorthboundUnregisterConfirmed => {
tracing::info!("Worker {}: unregister confirmed by master", state.id);
if let Some(handle) = state.reporter_handle.take() {
handle.abort();
}
}
}
Ok(())
}
}
/// Spawns `command` directly (no shell) with `args` and waits for it to exit.
///
/// Stdout and stderr are captured and decoded as UTF-8 lossily. The returned exit code is
/// `-1` when the process did not report one (on Unix, e.g. when it was killed by a signal).
///
/// # Errors
/// Returns an error if the process cannot be spawned or its output cannot be collected.
async fn execute_cmd(command: &str, args: &[String]) -> anyhow::Result<(i32, String, String)> {
    use tokio::process::Command;

    let mut cmd = Command::new(command);
    cmd.args(args)
        // Kill the child if this future is dropped before completion (e.g. the calling
        // actor is torn down mid-task) instead of leaving it running unsupervised.
        .kill_on_drop(true);
    let output = cmd.output().await?;

    let code = output.status.code().unwrap_or(-1);
    let stdout = String::from_utf8_lossy(&output.stdout).into_owned();
    let stderr = String::from_utf8_lossy(&output.stderr).into_owned();
    Ok((code, stdout, stderr))
}
/// Runs a `common.cmd` task and converts the outcome into `(status, output, error)`.
///
/// Accepted inputs (see [`parse_cmd_input`]):
/// - `ValorTaskInput::Json` with a string `"command"` and an optional `"args"` array whose
///   elements must all be strings;
/// - `ValorTaskInput::Text`, whose entire text is used as the program name (no splitting).
///
/// Results:
/// - `Completed` with a `{code, stdout, stderr}` JSON output when the exit code is 0;
/// - `Failed` with the same JSON output and an error message on any other exit code;
/// - `Failed` with no output when the input is invalid or the process cannot be run.
async fn execute_cmd_task(
    task: &crate::common::task::ValorMasterTask,
) -> (
    crate::common::task::ValorTaskStatus,
    Option<crate::common::task::ValorTaskOutput>,
    Option<String>,
) {
    use crate::common::task::{ValorTaskOutput, ValorTaskStatus};

    let (cmd, args) = match parse_cmd_input(&task.input) {
        Ok(parsed) => parsed,
        Err(reason) => return (ValorTaskStatus::Failed, None, Some(reason)),
    };

    match execute_cmd(&cmd, &args).await {
        Ok((code, stdout, stderr)) => {
            let output = Some(ValorTaskOutput::Json(serde_json::json!({
                "code": code,
                "stdout": stdout,
                "stderr": stderr,
            })));
            if code == 0 {
                (ValorTaskStatus::Completed, output, None)
            } else {
                (
                    ValorTaskStatus::Failed,
                    output,
                    Some(format!("command exited with code {code}")),
                )
            }
        }
        Err(e) => (
            ValorTaskStatus::Failed,
            None,
            Some(format!("exec error: {e}")),
        ),
    }
}

/// Extracts `(command, args)` from a task input, or returns a human-readable reason why the
/// input cannot be executed.
///
/// Non-string argument values are rejected rather than skipped, so the process never runs
/// with a different argument list than the one requested.
fn parse_cmd_input(
    input: &crate::common::task::ValorTaskInput,
) -> Result<(String, Vec<String>), String> {
    use crate::common::task::ValorTaskInput;

    let (cmd, args) = match input {
        ValorTaskInput::Json(v) => {
            let command = v
                .get("command")
                .and_then(|c| c.as_str())
                .unwrap_or("")
                .to_string();
            let args = match v.get("args") {
                None | Some(serde_json::Value::Null) => Vec::new(),
                Some(serde_json::Value::Array(items)) => items
                    .iter()
                    .map(|x| x.as_str().map(str::to_string))
                    .collect::<Option<Vec<String>>>()
                    .ok_or_else(|| "invalid args: every element must be a string".to_string())?,
                Some(_) => return Err("invalid args: expected an array of strings".to_string()),
            };
            (command, args)
        }
        ValorTaskInput::Text(t) => (t.clone(), Vec::new()),
        _ => (String::new(), Vec::new()),
    };

    if cmd.is_empty() {
        return Err("missing command".to_string());
    }
    Ok((cmd, args))
}
/// Casts `msg` to the first member of the masters process group.
///
/// When no master is registered the message is dropped with a warning; cast errors are
/// ignored.
fn send_master_message(msg: ValorMasterMessage) {
    let masters_pg = crate::types::ValorID::masters_pg_name();
    let Some(first_master) = ractor::pg::get_members(&masters_pg).into_iter().next() else {
        tracing::warn!("No master found to send task update");
        return;
    };
    let master_ref: ActorRef<ValorMasterMessage> = first_master.into();
    let _ = master_ref.cast(msg);
}
/// Starts this worker's cluster node server on `port` and spawns the [`Worker`] actor,
/// registered under the string form of the worker's `ValorID`.
///
/// # Panics
/// Panics if the `NodeServer` or the `Worker` actor fails to start.
pub async fn startup_worker_node(port: u16, config: &ValorConfig) {
    let id = &config.cli.id;
    let server = ractor_cluster::NodeServer::new(
        port,
        config.app.cluster_cookie(),
        format!("Worker-NodeServer-{id}"),
        config.app.hostname(),
        None,
        None,
    );
    let (_server_actor, _server_handle) = Actor::spawn(None, server, ())
        .await
        .expect("Failed to start Worker's NodeServer");

    let worker_id = ValorID::new_worker(id);
    let worker_actor_name = worker_id.to_string();
    let (_worker_actor, _worker_handle) = Actor::spawn(Some(worker_actor_name), Worker, worker_id)
        .await
        .expect("Worker actor failed to start up!");

    // The cluster cookie is the shared secret used to authenticate cluster nodes, so it
    // must never be written to logs.
    tracing::info!("Worker {} started on port {}", id, port);

    // NOTE(review): fixed 1s pause before returning; the reason is not visible here
    // (presumably to let the node server settle) — confirm.
    ractor::concurrency::sleep(Duration::from_millis(1000)).await;
}
/// Samples the host's current CPU and memory availability.
///
/// CPU usage is the average across logical CPUs, measured over a window of
/// `sysinfo::MINIMUM_CPU_UPDATE_INTERVAL`. This function blocks the calling thread for that
/// long, so async callers should run it on a blocking thread (e.g. `spawn_blocking`).
///
/// Reports one free CPU when no CPUs are detected. Never reports more free memory than total
/// memory unless the total is unknown (0). Megabyte values saturate at `u32::MAX`.
fn system_capacity() -> ValorWorkerCapacity {
    use sysinfo::{
        CpuRefreshKind, MemoryRefreshKind, RefreshKind, System, MINIMUM_CPU_UPDATE_INTERVAL,
    };

    const BYTES_PER_MB: u64 = 1024 * 1024;
    let to_mb = |bytes: u64| u32::try_from(bytes / BYTES_PER_MB).unwrap_or(u32::MAX);

    // `new_with_specifics` takes the first CPU sample. sysinfo documents
    // MINIMUM_CPU_UPDATE_INTERVAL as the minimum spacing between CPU refreshes needed for a
    // meaningful usage value, so wait at least that long before the second sample.
    let mut sys = System::new_with_specifics(
        RefreshKind::nothing()
            .with_memory(MemoryRefreshKind::everything())
            .with_cpu(CpuRefreshKind::everything()),
    );
    std::thread::sleep(MINIMUM_CPU_UPDATE_INTERVAL);
    sys.refresh_cpu_all();
    sys.refresh_memory();

    let cpus = sys.cpus();
    let detected_cpus = u32::try_from(cpus.len()).unwrap_or(u32::MAX);
    let avg_cpu_usage = if cpus.is_empty() {
        0.0f32
    } else {
        cpus.iter().map(|c| c.cpu_usage()).sum::<f32>() / detected_cpus as f32
    };
    let used_ratio = (avg_cpu_usage / 100.0).clamp(0.0, 1.0);
    let used_cpus = (used_ratio * detected_cpus as f32).round() as u32;
    // Never advertise zero CPUs: treat an undetectable CPU count as one idle CPU.
    let (total_cpus, free_cpus) = if detected_cpus == 0 {
        (1, 1)
    } else {
        (detected_cpus, detected_cpus.saturating_sub(used_cpus))
    };

    let total_bytes = sys.total_memory();
    let used_bytes = sys.used_memory();
    let available_bytes = sys.available_memory();
    let total_mem_mb = to_mb(total_bytes);
    // Prefer the OS-reported available figure; fall back to total - used when it is 0.
    let mut avail_mem_mb = if available_bytes > 0 {
        to_mb(available_bytes)
    } else {
        to_mb(total_bytes.saturating_sub(used_bytes))
    };
    if total_mem_mb > 0 {
        avail_mem_mb = avail_mem_mb.min(total_mem_mb);
    }

    ValorWorkerCapacity {
        total_cpu: total_cpus,
        total_mem_mb,
        free_cpu: free_cpus,
        free_mem_mb: avail_mem_mb,
        cpu_usage_pct: avg_cpu_usage,
    }
}
/// Delivers a worker state event to the master, if one is registered.
///
/// Looks up the masters process group. With no members the event is dropped (debug log).
/// With several members a warning is logged and the event goes to the first one. Cast
/// errors are ignored.
fn send_to_master(event: ValorWorkerEvent) {
    let masters_pg = ValorID::masters_pg_name();
    let members = ractor::pg::get_members(&masters_pg);
    let master_count = members.len();

    if master_count == 0 {
        tracing::debug!("No master found in PG '{}' while sending event", masters_pg);
        return;
    }
    if master_count > 1 {
        tracing::warn!(
            "Found {} masters in PG '{}' (expected 1); sending to the first",
            master_count,
            masters_pg
        );
    }
    if let Some(first) = members.into_iter().next() {
        let master: ActorRef<ValorMasterMessage> = first.into();
        let _ = master.cast(ValorMasterMessage::SouthboundWorkerStateUpdate(event));
    }
}
/// Returns the current wall-clock time in milliseconds since the Unix epoch.
///
/// Yields 0 if the system clock reads earlier than the epoch.
/// NOTE(review): this is wall-clock (non-monotonic) time, although callers store it in
/// `ts_mono_ms` fields — confirm the intended clock.
fn current_millis() -> u64 {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();
    since_epoch.as_millis() as u64
}