use std::io::Read;
use std::process::{Child, Command, Stdio};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use crossbeam::channel::{bounded, select, unbounded, Receiver, RecvTimeoutError, Sender};
use crate::remote_wire::{
frame_kind, perl_to_json_value, read_typed_frame, send_msg, HelloAck, HelloMsg, JobMsg,
JobRespMsg, SessionAck, SessionInit, PROTO_VERSION,
};
use crate::value::{PerlValue, RemoteCluster, RemoteSlot};
/// One unit of work handed to a slot worker through the shared work queue.
#[derive(Debug, Clone)]
pub struct DispatchJob {
    /// Index of the item in the caller's input; the result is stored at this index.
    pub seq: u64,
    /// The input item as JSON; forwarded to the remote worker inside a `JobMsg`.
    pub item: serde_json::Value,
    /// Number of times this job has already failed with a transport error or timeout.
    pub attempts: u32,
}
/// Final outcome of one job, sent from a slot worker back to `run_cluster`.
#[derive(Debug)]
pub struct DispatchResult {
    /// Copied from `DispatchJob::seq`; identifies which input item this result is for.
    pub seq: u64,
    /// `Ok` with the decoded value, or `Err` with a message (e.g. the remote error,
    /// a result-decode failure, or an exhausted retry budget).
    pub outcome: Result<PerlValue, String>,
}
/// Runs `block_src` over every element of `items` on the remote slots of `cluster`
/// and returns the results in input order.
///
/// One worker thread is started per slot. Each worker:
/// - opens an ssh session;
/// - sends `subs_prelude`, `block_src` and `capture` once as a `SessionInit`;
/// - then pulls jobs from a shared queue.
///
/// Jobs that hit a transport error or timeout are put back on the queue by
/// `requeue_or_fail` until `cluster.max_attempts` is reached.
///
/// # Errors
/// Returns `Err` in any of these cases:
/// - the cluster has no slots;
/// - every slot died before all jobs were queued;
/// - any job failed (remote error, undecodable result, exhausted retries);
/// - some job never produced a result because all slots died.
pub fn run_cluster(
    cluster: &RemoteCluster,
    subs_prelude: String,
    block_src: String,
    capture: Vec<(String, serde_json::Value)>,
    items: Vec<serde_json::Value>,
) -> Result<Vec<PerlValue>, String> {
    if items.is_empty() {
        return Ok(Vec::new());
    }
    if cluster.slots.is_empty() {
        return Err("cluster: no slots".to_string());
    }
    let total = items.len();
    // The work queue must be unbounded. Workers push failed jobs back onto it
    // while they are also its only consumers. With a bounded queue, a worker
    // requeueing into a full queue blocks. If it is the last live worker, nothing
    // drains the queue, and this thread then blocks forever trying to fill it.
    let (work_tx, work_rx) = unbounded::<DispatchJob>();
    let (result_tx, result_rx) = unbounded::<DispatchResult>();
    // Nothing is ever sent on this channel. Dropping `shutdown_tx` disconnects it,
    // which wakes every idle worker's `select!` and makes it close its session.
    let (shutdown_tx, shutdown_rx) = bounded::<()>(0);
    let session_init = Arc::new(SessionInit {
        subs_prelude,
        block_src,
        capture,
    });
    let cluster_arc = Arc::new(cluster.clone());
    let mut handles = Vec::with_capacity(cluster.slots.len());
    for (slot_idx, slot) in cluster.slots.iter().enumerate() {
        let slot = slot.clone();
        let work_rx = work_rx.clone();
        let work_tx = work_tx.clone();
        let result_tx = result_tx.clone();
        let shutdown_rx = shutdown_rx.clone();
        let init = Arc::clone(&session_init);
        let cluster = Arc::clone(&cluster_arc);
        handles.push(thread::spawn(move || {
            slot_worker_loop(
                slot_idx,
                slot,
                init,
                cluster,
                work_rx,
                work_tx,
                result_tx,
                shutdown_rx,
            );
        }));
    }
    // Only the workers keep these ends, so the channels disconnect once they all exit.
    drop(work_rx);
    drop(result_tx);
    drop(shutdown_rx);
    for (i, item) in items.into_iter().enumerate() {
        let job = DispatchJob {
            seq: i as u64,
            item,
            attempts: 0,
        };
        // Sending fails only when every worker (and thus every receiver) is gone.
        if work_tx.send(job).is_err() {
            for h in handles {
                let _ = h.join();
            }
            return Err(format!(
                "cluster: all worker slots died before job {i} could be dispatched"
            ));
        }
    }
    // Workers keep their own clones for requeueing, so this does not close the
    // queue; idle workers are released through `shutdown_tx` below instead.
    drop(work_tx);
    let mut results: Vec<Option<Result<PerlValue, String>>> =
        (0..total).map(|_| None).collect();
    let mut received = 0usize;
    while received < total {
        match result_rx.recv() {
            Ok(r) => {
                let idx = r.seq as usize;
                // Ignore out-of-range and duplicate results; the first one for a job wins.
                if idx < total && results[idx].is_none() {
                    results[idx] = Some(r.outcome);
                    received += 1;
                }
            }
            // Every worker has exited; still-missing jobs are reported below.
            Err(_) => break,
        }
    }
    drop(shutdown_tx);
    for h in handles {
        let _ = h.join();
    }
    let mut out = Vec::with_capacity(total);
    for (i, slot_result) in results.into_iter().enumerate() {
        match slot_result {
            Some(Ok(v)) => out.push(v),
            Some(Err(e)) => {
                return Err(format!("cluster: job {i} failed permanently: {e}"));
            }
            None => {
                return Err(format!(
                    "cluster: job {i} never completed (all slots died?)"
                ));
            }
        }
    }
    Ok(out)
}
/// Thread body for one cluster slot.
///
/// Opens a `SlotSession`; if that fails, the error is logged and the thread exits.
/// Otherwise it repeatedly takes a job from `work_rx`, runs it, and sends the
/// outcome on `result_tx`. A remote-side failure (`resp.ok == false`) or an
/// undecodable result is reported as an `Err` outcome and is not retried.
///
/// The loop ends with a graceful `shutdown` when the work queue disconnects or
/// `shutdown_rx` becomes ready (including when it disconnects). On a transport
/// error or timeout, the job is handed to `requeue_or_fail`, then the session is
/// killed and the thread exits; the slot is not reopened.
// Each channel end and shared handle is passed individually to the thread.
#[allow(clippy::too_many_arguments)]
fn slot_worker_loop(
    slot_idx: usize,
    slot: RemoteSlot,
    init: Arc<SessionInit>,
    cluster: Arc<RemoteCluster>,
    work_rx: Receiver<DispatchJob>,
    work_tx: Sender<DispatchJob>,
    result_tx: Sender<DispatchResult>,
    shutdown_rx: Receiver<()>,
) {
    let mut session = match SlotSession::open(&slot, &init, &cluster) {
        Ok(opened) => opened,
        Err(e) => {
            eprintln!(
                "cluster: slot {slot_idx} ({}) failed to start: {e}",
                slot.host
            );
            return;
        }
    };
    loop {
        // `None` means the work queue disconnected or shutdown was signalled.
        let next = select! {
            recv(work_rx) -> msg => msg.ok(),
            recv(shutdown_rx) -> _ => None,
        };
        let job = match next {
            Some(job) => job,
            None => {
                let _ = session.shutdown();
                return;
            }
        };
        let outcome = match session.run_job(&job, cluster.job_timeout_ms) {
            Ok(resp) if resp.ok => crate::remote_wire::json_to_perl(&resp.result)
                .map_err(|e| format!("decode result: {e}")),
            Ok(resp) => Err(resp.err_msg),
            Err(failure) => {
                match failure {
                    SlotError::Transport(e) => eprintln!(
                        "cluster: slot {slot_idx} ({}) transport error: {e}; retrying job {}",
                        slot.host, job.seq
                    ),
                    SlotError::Timeout => eprintln!(
                        "cluster: slot {slot_idx} ({}) timed out on job {}; retrying",
                        slot.host, job.seq
                    ),
                }
                requeue_or_fail(&work_tx, &result_tx, &cluster, job);
                let _ = session.kill();
                return;
            }
        };
        let _ = result_tx.send(DispatchResult {
            seq: job.seq,
            outcome,
        });
    }
}
/// Records one more failed attempt for `job`. It then either puts the job back on
/// the work queue or, once `cluster.max_attempts` is reached, reports it as a
/// permanent failure on `result_tx`.
///
/// If the job cannot be requeued because the work queue has no receivers left, it
/// is reported as failed instead of being dropped. Without that report, the
/// collector would only see the job as "never completed".
fn requeue_or_fail(
    work_tx: &Sender<DispatchJob>,
    result_tx: &Sender<DispatchResult>,
    cluster: &RemoteCluster,
    mut job: DispatchJob,
) {
    job.attempts += 1;
    if job.attempts >= cluster.max_attempts {
        let _ = result_tx.send(DispatchResult {
            seq: job.seq,
            outcome: Err(format!(
                "job exhausted retry budget after {} attempts",
                job.attempts
            )),
        });
        return;
    }
    // `send` consumes the job, so keep what the failure report needs.
    let seq = job.seq;
    let attempts = job.attempts;
    if work_tx.send(job).is_err() {
        let _ = result_tx.send(DispatchResult {
            seq,
            outcome: Err(format!(
                "job could not be requeued after {attempts} failed attempts: no workers left"
            )),
        });
    }
}
/// A live connection to one remote worker, running as an `ssh` child process.
struct SlotSession {
    /// The `ssh` process; killed or waited on by `kill` / `shutdown`.
    child: Child,
    /// The child's stdin, used to send JOB and SHUTDOWN frames.
    stdin: std::process::ChildStdin,
    /// JOB_RESP messages (or read/decode errors) forwarded by the background
    /// thread that reads the child's stdout.
    resp_rx: Receiver<Result<JobRespMsg, String>>,
}
/// Why `SlotSession::run_job` failed to produce a response.
#[derive(Debug)]
enum SlotError {
    /// Sending the job failed, the response reader reported an error, or its channel closed.
    Transport(String),
    /// No response arrived within the job timeout.
    Timeout,
}
impl SlotSession {
fn open(
slot: &RemoteSlot,
init: &SessionInit,
cluster: &RemoteCluster,
) -> Result<Self, String> {
let connect_timeout = (cluster.connect_timeout_ms / 1000).max(1);
let mut child = Command::new("ssh")
.arg("-o")
.arg(format!("ConnectTimeout={connect_timeout}"))
.arg("-o")
.arg("BatchMode=yes")
.arg(&slot.host)
.arg(&slot.pe_path)
.arg("--remote-worker")
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.map_err(|e| format!("spawn ssh: {e}"))?;
let mut stdin = child
.stdin
.take()
.ok_or_else(|| "ssh stdin missing".to_string())?;
let mut stdout = child
.stdout
.take()
.ok_or_else(|| "ssh stdout missing".to_string())?;
let mut stderr = child
.stderr
.take()
.ok_or_else(|| "ssh stderr missing".to_string())?;
thread::spawn(move || {
let mut buf = String::new();
let _ = stderr.read_to_string(&mut buf);
if !buf.trim().is_empty() {
eprintln!("[remote-worker] {}", buf.trim());
}
});
let hello = HelloMsg {
proto_version: PROTO_VERSION,
pe_version: env!("CARGO_PKG_VERSION").to_string(),
};
send_msg(&mut stdin, frame_kind::HELLO, &hello).map_err(|e| format!("send HELLO: {e}"))?;
let (kind, body) =
read_typed_frame(&mut stdout).map_err(|e| format!("read HELLO_ACK: {e}"))?;
if kind != frame_kind::HELLO_ACK {
return Err(format!("expected HELLO_ACK, got frame kind {kind:#04x}"));
}
let _: HelloAck =
bincode::deserialize(&body).map_err(|e| format!("decode HELLO_ACK: {e}"))?;
send_msg(&mut stdin, frame_kind::SESSION_INIT, init)
.map_err(|e| format!("send SESSION_INIT: {e}"))?;
let (kind, body) =
read_typed_frame(&mut stdout).map_err(|e| format!("read SESSION_ACK: {e}"))?;
if kind != frame_kind::SESSION_ACK {
return Err(format!("expected SESSION_ACK, got frame kind {kind:#04x}"));
}
let ack: SessionAck =
bincode::deserialize(&body).map_err(|e| format!("decode SESSION_ACK: {e}"))?;
if !ack.ok {
return Err(format!("worker rejected session: {}", ack.err_msg));
}
let (resp_tx, resp_rx) = bounded::<Result<JobRespMsg, String>>(1);
thread::spawn(move || loop {
match read_typed_frame(&mut stdout) {
Ok((kind, body)) if kind == frame_kind::JOB_RESP => {
match bincode::deserialize::<JobRespMsg>(&body) {
Ok(r) => {
if resp_tx.send(Ok(r)).is_err() {
return;
}
}
Err(e) => {
let _ = resp_tx.send(Err(format!("decode JOB_RESP: {e}")));
return;
}
}
}
Ok((other, _)) => {
let _ = resp_tx.send(Err(format!(
"unexpected frame kind {other:#04x} in resp loop"
)));
return;
}
Err(e) => {
let _ = resp_tx.send(Err(format!("read frame: {e}")));
return;
}
}
});
Ok(Self {
child,
stdin,
resp_rx,
})
}
fn run_job(&mut self, job: &DispatchJob, timeout_ms: u64) -> Result<JobRespMsg, SlotError> {
let msg = JobMsg {
seq: job.seq,
item: job.item.clone(),
};
send_msg(&mut self.stdin, frame_kind::JOB, &msg)
.map_err(|e| SlotError::Transport(format!("send JOB: {e}")))?;
match self.resp_rx.recv_timeout(Duration::from_millis(timeout_ms)) {
Ok(Ok(r)) => Ok(r),
Ok(Err(e)) => Err(SlotError::Transport(e)),
Err(RecvTimeoutError::Timeout) => Err(SlotError::Timeout),
Err(RecvTimeoutError::Disconnected) => {
Err(SlotError::Transport("response channel closed".to_string()))
}
}
}
fn shutdown(&mut self) -> Result<(), String> {
let _ = send_msg::<_, ()>(&mut self.stdin, frame_kind::SHUTDOWN, &());
let _ = self.child.wait();
Ok(())
}
fn kill(&mut self) -> Result<(), String> {
let _ = self.child.kill();
let _ = self.child.wait();
Ok(())
}
}
pub fn perl_items_to_json(items: &[PerlValue]) -> Result<Vec<serde_json::Value>, String> {
items.iter().map(perl_to_json_value).collect()
}