// stryke/cluster.rs
1//! Persistent SSH worker pool dispatcher for `pmap_on`.
2//!
3//! ## Architecture
4//!
5//! ```text
6//! ┌── slot 0 (ssh host1) ────┐
7//! │ worker thread + ssh proc │
8//! │ HELLO + SESSION_INIT │
9//! │ loop: take JOB from work │
10//! │ send + read │
11//! │ push to results │
12//! └────────────────────────────┘
13//! ┌── slot 1 (ssh host1) ────┐
14//! │ worker thread + ssh proc │
15//! main thread │ ... │
16//! ┌─────────────────┐ └────────────────────────────┘
17//! │ enqueue all jobs├──► work_tx ─►┌── slot 2 (ssh host2) ────┐
18//! │ collect results │ │ ... │
19//! └─────────────────┘ └────────────────────────────┘
20//! ▲ │
21//! │ ▼
22//! └────────── result_rx ────────────────┘
23//! ```
24//!
25//! Each slot is one persistent `ssh HOST PE_PATH --remote-worker` process. The HELLO and
26//! SESSION_INIT handshakes happen once per slot lifetime, then the slot pulls JOB messages
27//! from a shared crossbeam channel and pushes responses to a result channel. Work-stealing
28//! emerges naturally: fast slots drain the queue faster, slow slots take fewer jobs.
29//!
30//! ## Fault tolerance
31//!
32//! When a slot's read or write fails (ssh died, network blip, remote crash), the worker
33//! thread re-enqueues the in-flight job to the shared queue with `attempts++` and exits.
34//! Other living slots pick the job up. A job is permanently failed when its attempt count
35//! reaches `cluster.max_attempts`. The whole map fails only when **every** slot is dead or
36//! every queued job has exhausted its retry budget.
37//!
38//! ## Per-job timeout
39//!
40//! Each `recv` from a slot's stdout uses a per-slot helper thread + bounded channel so the
41//! main wait is `crossbeam::channel::recv_timeout(cluster.job_timeout_ms)`. On timeout the
42//! ssh child is killed (SIGKILL), the slot is marked dead, and the in-flight job is
43//! re-enqueued just like any other slot failure.
44
45use std::io::Read;
46use std::process::{Child, Command, Stdio};
47use std::sync::Arc;
48use std::thread;
49use std::time::Duration;
50
51use crossbeam::channel::{bounded, select, unbounded, Receiver, RecvTimeoutError, Sender};
52
53use crate::remote_wire::{
54 frame_kind, perl_to_json_value, read_typed_frame, send_msg, HelloAck, HelloMsg, JobMsg,
55 JobRespMsg, SessionAck, SessionInit, PROTO_VERSION,
56};
57use crate::value::{PerlValue, RemoteCluster, RemoteSlot};
58
/// One unit of work tracked by the dispatcher. Carries the original sequence number for
/// order-preserving result collection plus an attempt counter for retry accounting.
#[derive(Debug, Clone)]
pub struct DispatchJob {
    /// Index of this item in the original input list; results are stitched back by `seq`.
    pub seq: u64,
    /// The JSON-marshalled work item shipped to the remote worker in a JOB frame.
    pub item: serde_json::Value,
    /// Number of failed deliveries so far; incremented on re-enqueue and compared
    /// against `cluster.max_attempts` to decide between retry and permanent failure.
    pub attempts: u32,
}
67
/// One result reported back to the main thread. `seq` matches the originating
/// [`DispatchJob::seq`] so the dispatcher can stitch results back into source order.
#[derive(Debug)]
pub struct DispatchResult {
    /// Sequence number copied from the job this result answers.
    pub seq: u64,
    /// `Ok` with the decoded value, or `Err` with a human-readable failure reason
    /// (in-script error, decode error, or exhausted retry budget).
    pub outcome: Result<PerlValue, String>,
}
75
76/// Run a `pmap_on` against a [`RemoteCluster`]. Blocks until every job has either succeeded
77/// or exhausted its retry budget. Returns the per-item results in the original list order
78/// or the first permanent failure.
79///
80/// `subs_prelude` and `block_src` are sent **once** per slot at session init.
81/// `capture` is the captured-lexical snapshot from the calling scope.
82/// `items` is the list of work items (already JSON-marshalled).
83pub fn run_cluster(
84 cluster: &RemoteCluster,
85 subs_prelude: String,
86 block_src: String,
87 capture: Vec<(String, serde_json::Value)>,
88 items: Vec<serde_json::Value>,
89) -> Result<Vec<PerlValue>, String> {
90 if items.is_empty() {
91 return Ok(Vec::new());
92 }
93 if cluster.slots.is_empty() {
94 return Err("cluster: no slots".to_string());
95 }
96
97 // Shared work queue: every slot pulls from here, and slot threads re-enqueue on failure.
98 // Bounded so a misbehaving producer can't memory-blow; size is `slot_count * 2` to give
99 // each slot something to grab on the next iteration without blocking.
100 let work_capacity = (cluster.slots.len() * 2).max(8);
101 let (work_tx, work_rx) = bounded::<DispatchJob>(work_capacity);
102 let (result_tx, result_rx) = unbounded::<DispatchResult>();
103 // Shutdown signal: slot workers hold their own `work_tx` clones for re-enqueue, so the
104 // work channel never closes on its own once every initial job is sent. When all results
105 // have been collected the main thread drops `shutdown_tx`, which closes `shutdown_rx`
106 // and breaks the slot workers out of their blocking `recv` in `select!`.
107 let (shutdown_tx, shutdown_rx) = bounded::<()>(0);
108
109 // Spawn one worker thread per slot.
110 let mut handles = Vec::with_capacity(cluster.slots.len());
111 let session_init = Arc::new(SessionInit {
112 subs_prelude,
113 block_src,
114 capture,
115 });
116 let cluster_arc = Arc::new(cluster.clone());
117
118 for (slot_idx, slot) in cluster.slots.iter().enumerate() {
119 let slot = slot.clone();
120 let work_rx = work_rx.clone();
121 let work_tx = work_tx.clone();
122 let result_tx = result_tx.clone();
123 let shutdown_rx = shutdown_rx.clone();
124 let init = Arc::clone(&session_init);
125 let cluster = Arc::clone(&cluster_arc);
126 handles.push(thread::spawn(move || {
127 slot_worker_loop(
128 slot_idx,
129 slot,
130 init,
131 cluster,
132 work_rx,
133 work_tx,
134 result_tx,
135 shutdown_rx,
136 );
137 }));
138 }
139
140 // Drop the dispatcher-side handles so closing all slot copies signals queue shutdown.
141 drop(work_rx);
142 drop(result_tx);
143 drop(shutdown_rx);
144
145 // Seed the queue with the initial work.
146 for (i, item) in items.iter().enumerate() {
147 let job = DispatchJob {
148 seq: i as u64,
149 item: item.clone(),
150 attempts: 0,
151 };
152 if work_tx.send(job).is_err() {
153 return Err("cluster: all worker slots died before any work was sent".to_string());
154 }
155 }
156 drop(work_tx); // close once initial enqueue is done; slot threads keep their own clones
157
158 // Collect results in seq order. We allocate the full vector up-front and assign by
159 // index so we don't depend on receive order — slot threads complete jobs in any order.
160 let mut results: Vec<Option<Result<PerlValue, String>>> =
161 (0..items.len()).map(|_| None).collect();
162 let mut received = 0usize;
163 while received < items.len() {
164 match result_rx.recv() {
165 Ok(r) => {
166 let idx = r.seq as usize;
167 if idx < results.len() && results[idx].is_none() {
168 results[idx] = Some(r.outcome);
169 received += 1;
170 }
171 }
172 Err(_) => {
173 // All slot threads dropped their senders before we got every result.
174 break;
175 }
176 }
177 }
178
179 // All results (or terminal slot-death) are in. Signal slots to stop pulling new work
180 // from the queue so they can run their SHUTDOWN handshake and exit cleanly. Without
181 // this drop the slot `select!` below would park forever on `work_rx.recv()` because
182 // every slot still holds its own `work_tx` clone for re-enqueue.
183 drop(shutdown_tx);
184
185 // Wait for slot threads to wind down.
186 for h in handles {
187 let _ = h.join();
188 }
189
190 // Stitch results back together; surface the first permanent failure if any.
191 let mut out = Vec::with_capacity(items.len());
192 for (i, slot_result) in results.into_iter().enumerate() {
193 match slot_result {
194 Some(Ok(v)) => out.push(v),
195 Some(Err(e)) => {
196 return Err(format!("cluster: job {i} failed permanently: {e}"));
197 }
198 None => {
199 return Err(format!(
200 "cluster: job {i} never completed (all slots died?)"
201 ));
202 }
203 }
204 }
205 Ok(out)
206}
207
/// Per-slot worker thread: spawn ssh, do HELLO + SESSION_INIT, then loop pulling JOBs from
/// the shared queue. On any I/O failure the in-flight job is re-enqueued (or permanently
/// failed if it has exhausted its retry budget) and the slot exits.
///
/// All `result_tx.send` results are deliberately ignored (`let _ =`): the dispatcher may
/// have already torn down `result_rx`, and a dying result channel is not this slot's
/// problem to handle.
#[allow(clippy::too_many_arguments)]
fn slot_worker_loop(
    slot_idx: usize,
    slot: RemoteSlot,
    init: Arc<SessionInit>,
    cluster: Arc<RemoteCluster>,
    work_rx: Receiver<DispatchJob>,
    work_tx: Sender<DispatchJob>,
    result_tx: Sender<DispatchResult>,
    shutdown_rx: Receiver<()>,
) {
    // Spawn the ssh child + initial handshake. Failures here mean this slot never makes
    // any progress; we exit and let other slots drain the queue.
    let mut session = match SlotSession::open(&slot, &init, &cluster) {
        Ok(s) => s,
        Err(e) => {
            eprintln!(
                "cluster: slot {slot_idx} ({}) failed to start: {e}",
                slot.host
            );
            return;
        }
    };

    loop {
        // Take one job, or bail out if the dispatcher has signalled shutdown. We can't rely
        // on `work_rx` closing by itself because every slot holds its own `work_tx` clone
        // for re-enqueue on transport failure — so the channel would stay open forever once
        // all initial jobs are drained. The shutdown channel is the explicit wakeup.
        let job = select! {
            recv(work_rx) -> r => match r {
                Ok(j) => j,
                Err(_) => {
                    // Queue fully closed (e.g. every slot dropped its `work_tx`) — done.
                    let _ = session.shutdown();
                    return;
                }
            },
            // `-> _` matches Ok and Err alike. Nothing is ever sent on the zero-capacity
            // shutdown channel, so this arm fires when the dispatcher drops `shutdown_tx`
            // and the channel disconnects.
            recv(shutdown_rx) -> _ => {
                // Dispatcher collected every result — clean SHUTDOWN frame + child wait.
                let _ = session.shutdown();
                return;
            },
        };

        match session.run_job(&job, cluster.job_timeout_ms) {
            Ok(resp) => {
                if resp.ok {
                    let pv = match crate::remote_wire::json_to_perl(&resp.result) {
                        Ok(v) => v,
                        Err(e) => {
                            // The wire is healthy but the payload is undecodable: report
                            // the decode error for this seq and keep the slot alive.
                            let _ = result_tx.send(DispatchResult {
                                seq: job.seq,
                                outcome: Err(format!("decode result: {e}")),
                            });
                            continue;
                        }
                    };
                    let _ = result_tx.send(DispatchResult {
                        seq: job.seq,
                        outcome: Ok(pv),
                    });
                } else {
                    // Permanent in-script failure — no point retrying, the body is the
                    // same on every slot. Surface immediately.
                    let _ = result_tx.send(DispatchResult {
                        seq: job.seq,
                        outcome: Err(resp.err_msg),
                    });
                }
            }
            Err(SlotError::Transport(e)) => {
                // Wire-level failure — retry on a different slot if budget allows.
                // The slot itself is done: kill the child and exit the thread.
                eprintln!(
                    "cluster: slot {slot_idx} ({}) transport error: {e}; retrying job {}",
                    slot.host, job.seq
                );
                requeue_or_fail(&work_tx, &result_tx, &cluster, job);
                let _ = session.kill();
                return;
            }
            Err(SlotError::Timeout) => {
                // Per-job timeout expired (see module docs): kill the wedged child so it
                // can't hold the slot hostage, re-enqueue the job, and exit.
                eprintln!(
                    "cluster: slot {slot_idx} ({}) timed out on job {}; retrying",
                    slot.host, job.seq
                );
                requeue_or_fail(&work_tx, &result_tx, &cluster, job);
                let _ = session.kill();
                return;
            }
        }
    }
}
304
305fn requeue_or_fail(
306 work_tx: &Sender<DispatchJob>,
307 result_tx: &Sender<DispatchResult>,
308 cluster: &RemoteCluster,
309 mut job: DispatchJob,
310) {
311 job.attempts += 1;
312 if job.attempts >= cluster.max_attempts {
313 let _ = result_tx.send(DispatchResult {
314 seq: job.seq,
315 outcome: Err(format!(
316 "job exhausted retry budget after {} attempts",
317 job.attempts
318 )),
319 });
320 return;
321 }
322 if work_tx.send(job).is_err() {
323 // No live slots left to take the work — the dispatcher will detect this when
324 // result_rx closes with missing entries.
325 }
326}
327
/// One persistent ssh child + the framed I/O handles to talk to it. Holds a stderr
/// drainer thread so a verbose remote `stryke` doesn't fill its pipe and deadlock.
struct SlotSession {
    /// The ssh child process; killed (`kill`) or waited on (`shutdown`) at teardown.
    child: Child,
    /// Write side of the child's stdin; JOB and SHUTDOWN frames go out here.
    stdin: std::process::ChildStdin,
    /// Channel that receives one `JobRespMsg` per JOB, with a per-job timeout. Backed by a
    /// helper thread that loops on `read_typed_frame(stdout)` and forwards results.
    resp_rx: Receiver<Result<JobRespMsg, String>>,
}
337
/// Why a slot failed to complete a job. Both variants are terminal for the slot:
/// the worker re-enqueues the in-flight job and exits (see `slot_worker_loop`).
#[derive(Debug)]
enum SlotError {
    /// Wire-level failure: send error, read/decode error, or response channel closed.
    Transport(String),
    /// No response arrived within the per-job timeout; the caller kills the ssh child.
    Timeout,
}
343
impl SlotSession {
    /// Spawn the persistent ssh child for `slot` and run the one-time handshake:
    /// HELLO / HELLO_ACK (protocol + version exchange) followed by
    /// SESSION_INIT / SESSION_ACK (ships the prelude, block source, and captures).
    ///
    /// Also starts two detached helper threads: a stderr drainer (so a chatty remote
    /// can't fill its pipe and stall) and a response reader that forwards one parsed
    /// `JobRespMsg` — or an error string — per frame into `resp_rx`, which is what
    /// gives `run_job` its timeout-able `recv`.
    fn open(
        slot: &RemoteSlot,
        init: &SessionInit,
        cluster: &RemoteCluster,
    ) -> Result<Self, String> {
        // ssh -o ConnectTimeout=N HOST PE_PATH --remote-worker
        // ConnectTimeout takes whole seconds; round down but never below 1.
        // BatchMode=yes disables interactive prompts so a bad key fails fast.
        let connect_timeout = (cluster.connect_timeout_ms / 1000).max(1);
        let mut child = Command::new("ssh")
            .arg("-o")
            .arg(format!("ConnectTimeout={connect_timeout}"))
            .arg("-o")
            .arg("BatchMode=yes")
            .arg(&slot.host)
            .arg(&slot.pe_path)
            .arg("--remote-worker")
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .spawn()
            .map_err(|e| format!("spawn ssh: {e}"))?;
        let mut stdin = child
            .stdin
            .take()
            .ok_or_else(|| "ssh stdin missing".to_string())?;
        let mut stdout = child
            .stdout
            .take()
            .ok_or_else(|| "ssh stdout missing".to_string())?;
        let mut stderr = child
            .stderr
            .take()
            .ok_or_else(|| "ssh stderr missing".to_string())?;

        // Drain stderr in the background so a verbose worker can't deadlock its pipe.
        // NOTE: `read_to_string` buffers until the child closes stderr, so the output
        // is forwarded once, at child exit, rather than streamed live.
        thread::spawn(move || {
            let mut buf = String::new();
            let _ = stderr.read_to_string(&mut buf);
            // Forward to our own stderr prefixed for visibility — operators want to see
            // remote crashes when debugging cluster runs.
            if !buf.trim().is_empty() {
                eprintln!("[remote-worker] {}", buf.trim());
            }
        });

        // 1. HELLO. Direct stdin write (the helper-thread response loop hasn't started yet).
        let hello = HelloMsg {
            proto_version: PROTO_VERSION,
            pe_version: env!("CARGO_PKG_VERSION").to_string(),
        };
        send_msg(&mut stdin, frame_kind::HELLO, &hello).map_err(|e| format!("send HELLO: {e}"))?;
        let (kind, body) =
            read_typed_frame(&mut stdout).map_err(|e| format!("read HELLO_ACK: {e}"))?;
        if kind != frame_kind::HELLO_ACK {
            return Err(format!("expected HELLO_ACK, got frame kind {kind:#04x}"));
        }
        // Ack payload is currently unused; decoding it still validates the frame body.
        let _: HelloAck =
            bincode::deserialize(&body).map_err(|e| format!("decode HELLO_ACK: {e}"))?;

        // 2. SESSION_INIT (`init` is `&SessionInit` via deref coercion from `&Arc<SessionInit>`).
        send_msg(&mut stdin, frame_kind::SESSION_INIT, init)
            .map_err(|e| format!("send SESSION_INIT: {e}"))?;
        let (kind, body) =
            read_typed_frame(&mut stdout).map_err(|e| format!("read SESSION_ACK: {e}"))?;
        if kind != frame_kind::SESSION_ACK {
            return Err(format!("expected SESSION_ACK, got frame kind {kind:#04x}"));
        }
        let ack: SessionAck =
            bincode::deserialize(&body).map_err(|e| format!("decode SESSION_ACK: {e}"))?;
        if !ack.ok {
            return Err(format!("worker rejected session: {}", ack.err_msg));
        }

        // 3. Spin up the response helper thread. Each iteration reads one frame and
        // forwards either the parsed JobRespMsg or an error string. Any non-JOB_RESP
        // frame or read/decode failure terminates the loop after reporting once;
        // a send failure means the session was dropped, so exit silently.
        let (resp_tx, resp_rx) = bounded::<Result<JobRespMsg, String>>(1);
        thread::spawn(move || loop {
            match read_typed_frame(&mut stdout) {
                Ok((kind, body)) if kind == frame_kind::JOB_RESP => {
                    match bincode::deserialize::<JobRespMsg>(&body) {
                        Ok(r) => {
                            if resp_tx.send(Ok(r)).is_err() {
                                return;
                            }
                        }
                        Err(e) => {
                            let _ = resp_tx.send(Err(format!("decode JOB_RESP: {e}")));
                            return;
                        }
                    }
                }
                Ok((other, _)) => {
                    let _ = resp_tx.send(Err(format!(
                        "unexpected frame kind {other:#04x} in resp loop"
                    )));
                    return;
                }
                Err(e) => {
                    let _ = resp_tx.send(Err(format!("read frame: {e}")));
                    return;
                }
            }
        });

        Ok(Self {
            child,
            stdin,
            resp_rx,
        })
    }

    /// Send one JOB frame and wait up to `timeout_ms` for its response.
    ///
    /// Errors map onto the two slot-failure modes: `Transport` for wire problems
    /// (send failure, decode failure, reader thread gone) and `Timeout` when no
    /// response arrives in time. The caller treats either as slot death.
    fn run_job(&mut self, job: &DispatchJob, timeout_ms: u64) -> Result<JobRespMsg, SlotError> {
        let msg = JobMsg {
            seq: job.seq,
            item: job.item.clone(),
        };
        send_msg(&mut self.stdin, frame_kind::JOB, &msg)
            .map_err(|e| SlotError::Transport(format!("send JOB: {e}")))?;
        match self.resp_rx.recv_timeout(Duration::from_millis(timeout_ms)) {
            Ok(Ok(r)) => Ok(r),
            Ok(Err(e)) => Err(SlotError::Transport(e)),
            Err(RecvTimeoutError::Timeout) => Err(SlotError::Timeout),
            Err(RecvTimeoutError::Disconnected) => {
                Err(SlotError::Transport("response channel closed".to_string()))
            }
        }
    }

    /// Graceful teardown: best-effort SHUTDOWN frame, then wait for the child to exit.
    /// Infallible in practice; the `Result` return keeps the call sites uniform.
    fn shutdown(&mut self) -> Result<(), String> {
        // Best-effort SHUTDOWN frame; ignore errors because we're tearing down anyway.
        let _ = send_msg::<_, ()>(&mut self.stdin, frame_kind::SHUTDOWN, &());
        let _ = self.child.wait();
        Ok(())
    }

    /// Forcible teardown: kill the ssh child outright (no SHUTDOWN frame) and reap it.
    /// Used on transport errors and per-job timeouts.
    fn kill(&mut self) -> Result<(), String> {
        let _ = self.child.kill();
        let _ = self.child.wait();
        Ok(())
    }
}
485
486/// Convenience: marshal a `Vec<PerlValue>` into the JSON values the dispatcher needs.
487pub fn perl_items_to_json(items: &[PerlValue]) -> Result<Vec<serde_json::Value>, String> {
488 items.iter().map(perl_to_json_value).collect()
489}