// stryke/cluster.rs
//! Persistent SSH worker pool dispatcher for `pmap_on`.
//!
//! ## Architecture
//!
//! ```text
//!                                   ┌── slot 0 (ssh host1) ────┐
//!                                   │  worker thread + ssh proc │
//!                                   │  HELLO + SESSION_INIT     │
//!                                   │  loop: take JOB from work │
//!                                   │        send + read        │
//!                                   │        push to results    │
//!                                   └────────────────────────────┘
//!                                   ┌── slot 1 (ssh host1) ────┐
//!                                   │  worker thread + ssh proc │
//!  main thread                      │  ...                      │
//!  ┌─────────────────┐              └────────────────────────────┘
//!  │ enqueue all jobs├──► work_tx ─►┌── slot 2 (ssh host2) ────┐
//!  │ collect results │              │  ...                      │
//!  └─────────────────┘              └────────────────────────────┘
//!         ▲                                    │
//!         │                                    ▼
//!         └────────── result_rx ────────────────┘
//! ```
//!
//! Each slot is one persistent `ssh HOST PE_PATH --remote-worker` process. The HELLO and
//! SESSION_INIT handshakes happen once per slot lifetime, then the slot pulls JOB messages
//! from a shared crossbeam channel and pushes responses to a result channel. Work-stealing
//! emerges naturally: fast slots drain the queue faster, slow slots take fewer jobs.
//!
//! ## Fault tolerance
//!
//! When a slot's read or write fails (ssh died, network blip, remote crash), the worker
//! thread re-enqueues the in-flight job to the shared queue with `attempts++` and exits.
//! Other living slots pick the job up. A job is permanently failed when its attempt count
//! reaches `cluster.max_attempts`. The whole map fails only when **every** slot is dead or
//! every queued job has exhausted its retry budget.
//!
//! ## Per-job timeout
//!
//! Each `recv` from a slot's stdout uses a per-slot helper thread + bounded channel so the
//! main wait is `crossbeam::channel::recv_timeout(cluster.job_timeout_ms)`. On timeout the
//! ssh child is killed (SIGKILL), the slot is marked dead, and the in-flight job is
//! re-enqueued just like any other slot failure.

use std::io::Read;
use std::process::{Child, Command, Stdio};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

use crossbeam::channel::{bounded, select, unbounded, Receiver, RecvTimeoutError, Sender};

use crate::remote_wire::{
    frame_kind, perl_to_json_value, read_typed_frame, send_msg, HelloAck, HelloMsg, JobMsg,
    JobRespMsg, SessionAck, SessionInit, PROTO_VERSION,
};
use crate::value::{PerlValue, RemoteCluster, RemoteSlot};

/// One unit of work tracked by the dispatcher. Carries the original sequence number for
/// order-preserving result collection plus an attempt counter for retry accounting.
#[derive(Debug, Clone)]
pub struct DispatchJob {
    /// Zero-based position of the item in the caller's input list; the dispatcher uses
    /// it to stitch results back into source order.
    pub seq: u64,
    /// The JSON-marshalled work item sent to the remote worker inside a JOB frame.
    pub item: serde_json::Value,
    /// How many times this job has already been attempted; bumped by `requeue_or_fail`
    /// on every transport failure or timeout re-enqueue.
    pub attempts: u32,
}

/// One result reported back to the main thread. `seq` matches the originating
/// [`DispatchJob::seq`] so the dispatcher can stitch results back into source order.
#[derive(Debug)]
pub struct DispatchResult {
    /// Sequence number of the job this outcome belongs to.
    pub seq: u64,
    /// `Ok` with the decoded value on success; `Err` with a message on permanent failure
    /// (in-script error, result decode error, or exhausted retry budget).
    pub outcome: Result<PerlValue, String>,
}

/// Run a `pmap_on` against a [`RemoteCluster`]. Blocks until every job has either succeeded
/// or exhausted its retry budget. Returns the per-item results in the original list order
/// or the first permanent failure.
///
/// `subs_prelude` and `block_src` are sent **once** per slot at session init.
/// `capture` is the captured-lexical snapshot from the calling scope.
/// `items` is the list of work items (already JSON-marshalled).
pub fn run_cluster(
    cluster: &RemoteCluster,
    subs_prelude: String,
    block_src: String,
    capture: Vec<(String, serde_json::Value)>,
    items: Vec<serde_json::Value>,
) -> Result<Vec<PerlValue>, String> {
    // Trivial cases first: nothing to do, or nowhere to do it.
    if items.is_empty() {
        return Ok(Vec::new());
    }
    if cluster.slots.is_empty() {
        return Err("cluster: no slots".to_string());
    }

    // Shared work queue: every slot pulls from here, and slot threads re-enqueue on failure.
    // Bounded so a misbehaving producer can't memory-blow; size is `slot_count * 2` to give
    // each slot something to grab on the next iteration without blocking.
    let work_capacity = (cluster.slots.len() * 2).max(8);
    let (work_tx, work_rx) = bounded::<DispatchJob>(work_capacity);
    let (result_tx, result_rx) = unbounded::<DispatchResult>();
    // Shutdown signal: slot workers hold their own `work_tx` clones for re-enqueue, so the
    // work channel never closes on its own once every initial job is sent. When all results
    // have been collected the main thread drops `shutdown_tx`, which closes `shutdown_rx`
    // and breaks the slot workers out of their blocking `recv` in `select!`.
    let (shutdown_tx, shutdown_rx) = bounded::<()>(0);

    // Spawn one worker thread per slot. The session payload is shared via `Arc` so every
    // slot reads the same prelude/block/capture without per-slot string clones.
    let mut handles = Vec::with_capacity(cluster.slots.len());
    let session_init = Arc::new(SessionInit {
        subs_prelude,
        block_src,
        capture,
    });
    let cluster_arc = Arc::new(cluster.clone());

    for (slot_idx, slot) in cluster.slots.iter().enumerate() {
        let slot = slot.clone();
        let work_rx = work_rx.clone();
        let work_tx = work_tx.clone();
        let result_tx = result_tx.clone();
        let shutdown_rx = shutdown_rx.clone();
        let init = Arc::clone(&session_init);
        let cluster = Arc::clone(&cluster_arc);
        handles.push(thread::spawn(move || {
            slot_worker_loop(
                slot_idx,
                slot,
                init,
                cluster,
                work_rx,
                work_tx,
                result_tx,
                shutdown_rx,
            );
        }));
    }

    // Drop the dispatcher-side handles so closing all slot copies signals queue shutdown.
    drop(work_rx);
    drop(result_tx);
    drop(shutdown_rx);

    // Seed the queue with the initial work. `send` blocks while the bounded queue is full
    // and only errors once every slot thread has exited (dropping its `work_rx` clone).
    // NOTE(review): a slot blocked inside `requeue_or_fail`'s `send` still holds its own
    // `work_rx` clone, so if every remaining slot were simultaneously blocked re-enqueueing
    // into a full queue this send could park indefinitely — confirm the capacity math
    // (one in-flight job per slot vs. `slot_count * 2` capacity) rules that out.
    for (i, item) in items.iter().enumerate() {
        let job = DispatchJob {
            seq: i as u64,
            item: item.clone(),
            attempts: 0,
        };
        if work_tx.send(job).is_err() {
            return Err("cluster: all worker slots died before any work was sent".to_string());
        }
    }
    drop(work_tx); // close once initial enqueue is done; slot threads keep their own clones

    // Collect results in seq order. We allocate the full vector up-front and assign by
    // index so we don't depend on receive order — slot threads complete jobs in any order.
    let mut results: Vec<Option<Result<PerlValue, String>>> =
        (0..items.len()).map(|_| None).collect();
    let mut received = 0usize;
    while received < items.len() {
        match result_rx.recv() {
            Ok(r) => {
                let idx = r.seq as usize;
                // First writer wins: a duplicate or out-of-range seq is silently ignored
                // rather than clobbering an already-recorded outcome.
                if idx < results.len() && results[idx].is_none() {
                    results[idx] = Some(r.outcome);
                    received += 1;
                }
            }
            Err(_) => {
                // All slot threads dropped their senders before we got every result.
                break;
            }
        }
    }

    // All results (or terminal slot-death) are in. Signal slots to stop pulling new work
    // from the queue so they can run their SHUTDOWN handshake and exit cleanly. Without
    // this drop the slot `select!` below would park forever on `work_rx.recv()` because
    // every slot still holds its own `work_tx` clone for re-enqueue.
    drop(shutdown_tx);

    // Wait for slot threads to wind down.
    for h in handles {
        let _ = h.join();
    }

    // Stitch results back together; surface the first permanent failure if any.
    let mut out = Vec::with_capacity(items.len());
    for (i, slot_result) in results.into_iter().enumerate() {
        match slot_result {
            Some(Ok(v)) => out.push(v),
            Some(Err(e)) => {
                return Err(format!("cluster: job {i} failed permanently: {e}"));
            }
            None => {
                // A gap means the collection loop broke early (all senders dropped).
                return Err(format!(
                    "cluster: job {i} never completed (all slots died?)"
                ));
            }
        }
    }
    Ok(out)
}

208/// Per-slot worker thread: spawn ssh, do HELLO + SESSION_INIT, then loop pulling JOBs from
209/// the shared queue. On any I/O failure the in-flight job is re-enqueued (or permanently
210/// failed if it has exhausted its retry budget) and the slot exits.
211#[allow(clippy::too_many_arguments)]
212fn slot_worker_loop(
213    slot_idx: usize,
214    slot: RemoteSlot,
215    init: Arc<SessionInit>,
216    cluster: Arc<RemoteCluster>,
217    work_rx: Receiver<DispatchJob>,
218    work_tx: Sender<DispatchJob>,
219    result_tx: Sender<DispatchResult>,
220    shutdown_rx: Receiver<()>,
221) {
222    // Spawn the ssh child + initial handshake. Failures here mean this slot never makes
223    // any progress; we exit and let other slots drain the queue.
224    let mut session = match SlotSession::open(&slot, &init, &cluster) {
225        Ok(s) => s,
226        Err(e) => {
227            eprintln!(
228                "cluster: slot {slot_idx} ({}) failed to start: {e}",
229                slot.host
230            );
231            return;
232        }
233    };
234
235    loop {
236        // Take one job, or bail out if the dispatcher has signalled shutdown. We can't rely
237        // on `work_rx` closing by itself because every slot holds its own `work_tx` clone
238        // for re-enqueue on transport failure — so the channel would stay open forever once
239        // all initial jobs are drained. The shutdown channel is the explicit wakeup.
240        let job = select! {
241            recv(work_rx) -> r => match r {
242                Ok(j) => j,
243                Err(_) => {
244                    // Queue fully closed (e.g. every slot dropped its `work_tx`) — done.
245                    let _ = session.shutdown();
246                    return;
247                }
248            },
249            recv(shutdown_rx) -> _ => {
250                // Dispatcher collected every result — clean SHUTDOWN frame + child wait.
251                let _ = session.shutdown();
252                return;
253            },
254        };
255
256        match session.run_job(&job, cluster.job_timeout_ms) {
257            Ok(resp) => {
258                if resp.ok {
259                    let pv = match crate::remote_wire::json_to_perl(&resp.result) {
260                        Ok(v) => v,
261                        Err(e) => {
262                            let _ = result_tx.send(DispatchResult {
263                                seq: job.seq,
264                                outcome: Err(format!("decode result: {e}")),
265                            });
266                            continue;
267                        }
268                    };
269                    let _ = result_tx.send(DispatchResult {
270                        seq: job.seq,
271                        outcome: Ok(pv),
272                    });
273                } else {
274                    // Permanent in-script failure — no point retrying, the body is the
275                    // same on every slot. Surface immediately.
276                    let _ = result_tx.send(DispatchResult {
277                        seq: job.seq,
278                        outcome: Err(resp.err_msg),
279                    });
280                }
281            }
282            Err(SlotError::Transport(e)) => {
283                // Wire-level failure — retry on a different slot if budget allows.
284                eprintln!(
285                    "cluster: slot {slot_idx} ({}) transport error: {e}; retrying job {}",
286                    slot.host, job.seq
287                );
288                requeue_or_fail(&work_tx, &result_tx, &cluster, job);
289                let _ = session.kill();
290                return;
291            }
292            Err(SlotError::Timeout) => {
293                eprintln!(
294                    "cluster: slot {slot_idx} ({}) timed out on job {}; retrying",
295                    slot.host, job.seq
296                );
297                requeue_or_fail(&work_tx, &result_tx, &cluster, job);
298                let _ = session.kill();
299                return;
300            }
301        }
302    }
303}
304
305fn requeue_or_fail(
306    work_tx: &Sender<DispatchJob>,
307    result_tx: &Sender<DispatchResult>,
308    cluster: &RemoteCluster,
309    mut job: DispatchJob,
310) {
311    job.attempts += 1;
312    if job.attempts >= cluster.max_attempts {
313        let _ = result_tx.send(DispatchResult {
314            seq: job.seq,
315            outcome: Err(format!(
316                "job exhausted retry budget after {} attempts",
317                job.attempts
318            )),
319        });
320        return;
321    }
322    if work_tx.send(job).is_err() {
323        // No live slots left to take the work — the dispatcher will detect this when
324        // result_rx closes with missing entries.
325    }
326}
327
/// One persistent ssh child + the framed I/O handles to talk to it. Holds a stderr
/// drainer thread so a verbose remote `stryke` doesn't fill its pipe and deadlock.
struct SlotSession {
    /// The ssh process; reaped via `wait` in `shutdown`, killed + reaped in `kill`.
    child: Child,
    /// Write side of the wire protocol: JOB and SHUTDOWN frames go here.
    stdin: std::process::ChildStdin,
    /// Channel that receives one `JobRespMsg` per JOB, with a per-job timeout. Backed by a
    /// helper thread that loops on `read_typed_frame(stdout)` and forwards results.
    resp_rx: Receiver<Result<JobRespMsg, String>>,
}

/// How a `SlotSession::run_job` call fails at the slot level. In-script failures are
/// NOT represented here — those arrive as a successful `JobRespMsg` with `ok == false`.
#[derive(Debug)]
enum SlotError {
    /// Wire-level failure: send failed, a frame could not be read/decoded, or the
    /// response channel closed. The in-flight job is re-enqueued (retry budget allowing).
    Transport(String),
    /// No response arrived within `cluster.job_timeout_ms`; the caller kills the slot.
    Timeout,
}

344impl SlotSession {
345    fn open(
346        slot: &RemoteSlot,
347        init: &SessionInit,
348        cluster: &RemoteCluster,
349    ) -> Result<Self, String> {
350        // ssh -o ConnectTimeout=N HOST PE_PATH --remote-worker
351        let connect_timeout = (cluster.connect_timeout_ms / 1000).max(1);
352        let mut child = Command::new("ssh")
353            .arg("-o")
354            .arg(format!("ConnectTimeout={connect_timeout}"))
355            .arg("-o")
356            .arg("BatchMode=yes")
357            .arg(&slot.host)
358            .arg(&slot.pe_path)
359            .arg("--remote-worker")
360            .stdin(Stdio::piped())
361            .stdout(Stdio::piped())
362            .stderr(Stdio::piped())
363            .spawn()
364            .map_err(|e| format!("spawn ssh: {e}"))?;
365        let mut stdin = child
366            .stdin
367            .take()
368            .ok_or_else(|| "ssh stdin missing".to_string())?;
369        let mut stdout = child
370            .stdout
371            .take()
372            .ok_or_else(|| "ssh stdout missing".to_string())?;
373        let mut stderr = child
374            .stderr
375            .take()
376            .ok_or_else(|| "ssh stderr missing".to_string())?;
377
378        // Drain stderr in the background so a verbose worker can't deadlock its pipe.
379        thread::spawn(move || {
380            let mut buf = String::new();
381            let _ = stderr.read_to_string(&mut buf);
382            // Forward to our own stderr prefixed for visibility — operators want to see
383            // remote crashes when debugging cluster runs.
384            if !buf.trim().is_empty() {
385                eprintln!("[remote-worker] {}", buf.trim());
386            }
387        });
388
389        // 1. HELLO. Direct stdin write (the helper-thread response loop hasn't started yet).
390        let hello = HelloMsg {
391            proto_version: PROTO_VERSION,
392            pe_version: env!("CARGO_PKG_VERSION").to_string(),
393        };
394        send_msg(&mut stdin, frame_kind::HELLO, &hello).map_err(|e| format!("send HELLO: {e}"))?;
395        let (kind, body) =
396            read_typed_frame(&mut stdout).map_err(|e| format!("read HELLO_ACK: {e}"))?;
397        if kind != frame_kind::HELLO_ACK {
398            return Err(format!("expected HELLO_ACK, got frame kind {kind:#04x}"));
399        }
400        let _: HelloAck =
401            bincode::deserialize(&body).map_err(|e| format!("decode HELLO_ACK: {e}"))?;
402
403        // 2. SESSION_INIT (`init` is `&SessionInit` via deref coercion from `&Arc<SessionInit>`).
404        send_msg(&mut stdin, frame_kind::SESSION_INIT, init)
405            .map_err(|e| format!("send SESSION_INIT: {e}"))?;
406        let (kind, body) =
407            read_typed_frame(&mut stdout).map_err(|e| format!("read SESSION_ACK: {e}"))?;
408        if kind != frame_kind::SESSION_ACK {
409            return Err(format!("expected SESSION_ACK, got frame kind {kind:#04x}"));
410        }
411        let ack: SessionAck =
412            bincode::deserialize(&body).map_err(|e| format!("decode SESSION_ACK: {e}"))?;
413        if !ack.ok {
414            return Err(format!("worker rejected session: {}", ack.err_msg));
415        }
416
417        // 3. Spin up the response helper thread. Each iteration reads one frame and
418        // forwards either the parsed JobRespMsg or an error string.
419        let (resp_tx, resp_rx) = bounded::<Result<JobRespMsg, String>>(1);
420        thread::spawn(move || loop {
421            match read_typed_frame(&mut stdout) {
422                Ok((kind, body)) if kind == frame_kind::JOB_RESP => {
423                    match bincode::deserialize::<JobRespMsg>(&body) {
424                        Ok(r) => {
425                            if resp_tx.send(Ok(r)).is_err() {
426                                return;
427                            }
428                        }
429                        Err(e) => {
430                            let _ = resp_tx.send(Err(format!("decode JOB_RESP: {e}")));
431                            return;
432                        }
433                    }
434                }
435                Ok((other, _)) => {
436                    let _ = resp_tx.send(Err(format!(
437                        "unexpected frame kind {other:#04x} in resp loop"
438                    )));
439                    return;
440                }
441                Err(e) => {
442                    let _ = resp_tx.send(Err(format!("read frame: {e}")));
443                    return;
444                }
445            }
446        });
447
448        Ok(Self {
449            child,
450            stdin,
451            resp_rx,
452        })
453    }
454
455    fn run_job(&mut self, job: &DispatchJob, timeout_ms: u64) -> Result<JobRespMsg, SlotError> {
456        let msg = JobMsg {
457            seq: job.seq,
458            item: job.item.clone(),
459        };
460        send_msg(&mut self.stdin, frame_kind::JOB, &msg)
461            .map_err(|e| SlotError::Transport(format!("send JOB: {e}")))?;
462        match self.resp_rx.recv_timeout(Duration::from_millis(timeout_ms)) {
463            Ok(Ok(r)) => Ok(r),
464            Ok(Err(e)) => Err(SlotError::Transport(e)),
465            Err(RecvTimeoutError::Timeout) => Err(SlotError::Timeout),
466            Err(RecvTimeoutError::Disconnected) => {
467                Err(SlotError::Transport("response channel closed".to_string()))
468            }
469        }
470    }
471
472    fn shutdown(&mut self) -> Result<(), String> {
473        // Best-effort SHUTDOWN frame; ignore errors because we're tearing down anyway.
474        let _ = send_msg::<_, ()>(&mut self.stdin, frame_kind::SHUTDOWN, &());
475        let _ = self.child.wait();
476        Ok(())
477    }
478
479    fn kill(&mut self) -> Result<(), String> {
480        let _ = self.child.kill();
481        let _ = self.child.wait();
482        Ok(())
483    }
484}
485
486/// Convenience: marshal a `Vec<PerlValue>` into the JSON values the dispatcher needs.
487pub fn perl_items_to_json(items: &[PerlValue]) -> Result<Vec<serde_json::Value>, String> {
488    items.iter().map(perl_to_json_value).collect()
489}