Skip to main content

stryke/
remote_wire.rs

1//! Framed bincode over stdin/stdout for `stryke --remote-worker` (distributed `pmap_on`).
2//!
3//! ## Wire protocol
4//!
5//! Every message is a length-prefixed frame: `[u64 LE length][u8 kind][bincode payload]`.
6//! The single-byte `kind` discriminator lets future revisions add message types without
7//! breaking older workers — an unknown kind is a hard error so version skew is loud.
8//!
9//! ### Message flow (v3 — persistent session)
10//!
11//! ```text
12//! dispatcher                    worker
13//!     │                            │
14//!     │── HELLO ─────────────────►│   (proto version, build id)
15//!     │◄───────────── HELLO_ACK ──│   (worker stryke version, hostname)
16//!     │── SESSION_INIT ──────────►│   (subs prelude, block source, captured lexicals)
17//!     │◄────────── SESSION_ACK ───│   (or ERROR)
18//!     │── JOB(seq=0) ────────────►│   (item)
19//!     │◄────────── JOB_RESP(0) ───│
20//!     │── JOB(seq=1) ────────────►│
21//!     │◄────────── JOB_RESP(1) ───│
22//!     │           ...             │
23//!     │── SHUTDOWN ──────────────►│
24//!     │                            └─ exit 0
25//! ```
26//!
27//! Why this beats the basic v1 protocol: subs prelude + block source ship **once** per
28//! session instead of once per item, the parser+compiler runs once per worker instead of
29//! once per job, and one ssh handshake amortizes across the whole map.
30//!
31//! Dynamic [`serde_json::Value`] fields are embedded as JSON UTF-8 bytes inside the bincode
32//! envelope (v3+). Bincode cannot deserialize `Value` directly (`deserialize_any`); nested
33//! JSON keeps the on-wire type self-describing.
34
35use std::collections::HashMap;
36use std::io::{Read, Write};
37use std::process::{Command, Stdio};
38use std::sync::Arc;
39
40use serde::{Deserialize, Serialize};
41
42use crate::ast::Block;
43use crate::interpreter::{FlowOrError, Interpreter};
44use crate::value::{PerlSub, PerlValue};
45
/// Frame-kind discriminator. Stored as the first byte of every wire payload after the
/// length prefix. Byte values outside the set listed here are reserved: a receiver that
/// sees one rejects the frame with a clean error rather than silently misparsing it.
// NOTE(review): `ERROR` is not referenced anywhere in this file, which is presumably why
// the dead-code lint is silenced for the whole module — confirm against other callers.
#[allow(dead_code)]
pub mod frame_kind {
    /// Dispatcher → worker: opens the session (carries a `HelloMsg`).
    pub const HELLO: u8 = 0x01;
    /// Worker → dispatcher: reply to `HELLO` (carries a `HelloAck`).
    pub const HELLO_ACK: u8 = 0x02;
    /// Dispatcher → worker: per-session setup (carries a `SessionInit`).
    pub const SESSION_INIT: u8 = 0x03;
    /// Worker → dispatcher: outcome of `SESSION_INIT` (carries a `SessionAck`).
    pub const SESSION_ACK: u8 = 0x04;
    /// Dispatcher → worker: one item to process (carries a `JobMsg`).
    pub const JOB: u8 = 0x05;
    /// Worker → dispatcher: result of one `JOB` (carries a `JobRespMsg`).
    pub const JOB_RESP: u8 = 0x06;
    /// Dispatcher → worker: end the session; the worker loop returns exit code 0.
    pub const SHUTDOWN: u8 = 0x07;
    /// Error frame. Listed in the protocol diagram as the alternative to `SESSION_ACK`;
    /// nothing in this file sends or handles it.
    pub const ERROR: u8 = 0xFF;
}
60
/// Wire protocol version. Bumped whenever the layout of an existing message changes in a
/// backwards-incompatible way. The HELLO handshake fails fast on version mismatch so
/// dispatcher and worker never silently disagree on layout.
///
/// The worker side of that check lives in `run_remote_worker_session`, which compares the
/// dispatcher's `HelloMsg::proto_version` against this constant and exits with code 1 when
/// they differ.
pub const PROTO_VERSION: u32 = 3;
65
66mod json_value_bincode {
67    use serde::{Deserialize, Deserializer, Serialize, Serializer};
68
69    pub fn serialize<S>(value: &serde_json::Value, serializer: S) -> Result<S::Ok, S::Error>
70    where
71        S: Serializer,
72    {
73        let buf = serde_json::to_vec(value).map_err(serde::ser::Error::custom)?;
74        buf.serialize(serializer)
75    }
76
77    pub fn deserialize<'de, D>(deserializer: D) -> Result<serde_json::Value, D::Error>
78    where
79        D: Deserializer<'de>,
80    {
81        let buf: Vec<u8> = Vec::deserialize(deserializer)?;
82        serde_json::from_slice(&buf).map_err(serde::de::Error::custom)
83    }
84}
85
86mod capture_json_bincode {
87    use serde::{de::Deserializer, ser::SerializeSeq, Deserialize, Serializer};
88
89    pub fn serialize<S>(v: &[(String, serde_json::Value)], serializer: S) -> Result<S::Ok, S::Error>
90    where
91        S: Serializer,
92    {
93        let mut seq = serializer.serialize_seq(Some(v.len()))?;
94        for (k, val) in v {
95            let enc = serde_json::to_vec(val).map_err(serde::ser::Error::custom)?;
96            seq.serialize_element(&(k, enc))?;
97        }
98        seq.end()
99    }
100
101    pub fn deserialize<'de, D>(
102        deserializer: D,
103    ) -> Result<Vec<(String, serde_json::Value)>, D::Error>
104    where
105        D: Deserializer<'de>,
106    {
107        let raw: Vec<(String, Vec<u8>)> = Vec::deserialize(deserializer)?;
108        let mut out = Vec::with_capacity(raw.len());
109        for (k, enc) in raw {
110            let val = serde_json::from_slice(&enc).map_err(serde::de::Error::custom)?;
111            out.push((k, val));
112        }
113        Ok(out)
114    }
115}
116
/// Payload of a `HELLO` frame — the first message of a v3 session, sent by the dispatcher.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HelloMsg {
    /// Sender's wire-protocol version. `run_remote_worker_session` refuses the session
    /// when this differs from `PROTO_VERSION`.
    pub proto_version: u32,
    /// Sender's version string. The worker loop in this file receives it but never
    /// inspects it.
    pub pe_version: String,
}
122
/// Payload of a `HELLO_ACK` frame — the worker's reply to `HELLO`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HelloAck {
    /// Worker's wire-protocol version (the worker fills in `PROTO_VERSION`).
    pub proto_version: u32,
    /// Worker's version string (the worker fills in its `CARGO_PKG_VERSION`).
    pub pe_version: String,
    /// Worker's host name as reported by `hostname_or_unknown`.
    pub hostname: String,
}
129
/// Sent **once** per worker session. Carries everything that doesn't change between jobs:
/// the user's named subs, the `pmap_on` block source, and the captured-lexical snapshot.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionInit {
    /// Source text of the sub declarations; the worker parses and executes it once.
    pub subs_prelude: String,
    /// Source text of the block; the worker parses it once and evaluates it per job.
    pub block_src: String,
    /// `(name, value)` pairs of captured lexicals. Values travel as JSON (see
    /// `capture_json_bincode`) and are converted back with `json_to_perl` on the worker.
    #[serde(with = "capture_json_bincode")]
    pub capture: Vec<(String, serde_json::Value)>,
}
139
/// Payload of a `SESSION_ACK` frame — the worker's verdict on `SESSION_INIT`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionAck {
    /// `true` when session setup succeeded and the worker is ready for `JOB` frames.
    pub ok: bool,
    /// Failure description when `ok` is `false`; the worker sends an empty string on
    /// success.
    pub err_msg: String,
}
145
/// Payload of a `JOB` frame — one item to run through the session's block.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobMsg {
    /// Sequence number; copied into the matching `JobRespMsg::seq`.
    pub seq: u64,
    /// The item, carried as JSON bytes inside the bincode envelope. The worker converts
    /// it with `json_to_perl` and installs it as the topic before evaluating the block.
    #[serde(with = "json_value_bincode")]
    pub item: serde_json::Value,
}
152
/// Payload of a `JOB_RESP` frame — the outcome of one `JOB`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobRespMsg {
    /// Sequence number of the job this answers. The worker loop uses `0` when the `JOB`
    /// payload itself could not be decoded (the real sequence number is unknown then).
    pub seq: u64,
    /// `true` when the block evaluated and its result could be converted to JSON.
    pub ok: bool,
    /// Block result as JSON; the worker sends `Null` whenever `ok` is `false`.
    #[serde(with = "json_value_bincode")]
    pub result: serde_json::Value,
    /// Failure description when `ok` is `false`; empty on success.
    pub err_msg: String,
}
161
162/// Read a typed frame: returns `(kind, body)` where `body` is the bincode payload after
163/// the kind byte. Caller decides how to interpret based on `kind`.
164pub fn read_typed_frame<R: Read>(r: &mut R) -> std::io::Result<(u8, Vec<u8>)> {
165    let raw = read_framed(r)?;
166    if raw.is_empty() {
167        return Err(std::io::Error::new(
168            std::io::ErrorKind::InvalidData,
169            "remote frame: empty payload (missing kind byte)",
170        ));
171    }
172    let kind = raw[0];
173    Ok((kind, raw[1..].to_vec()))
174}
175
176/// Write a typed frame: prepends the `kind` byte to `payload` and writes one length-prefixed
177/// frame.
178pub fn write_typed_frame<W: Write>(w: &mut W, kind: u8, payload: &[u8]) -> std::io::Result<()> {
179    let mut framed = Vec::with_capacity(payload.len() + 1);
180    framed.push(kind);
181    framed.extend_from_slice(payload);
182    write_framed(w, &framed)
183}
184
185/// Bincode + write helper. The two-step `bincode::serialize` + `write_typed_frame` pattern
186/// is the same in every send site so it lives here once.
187pub fn send_msg<W: Write, T: Serialize>(w: &mut W, kind: u8, msg: &T) -> Result<(), String> {
188    let payload = bincode::serialize(msg).map_err(|e| format!("bincode encode: {e}"))?;
189    write_typed_frame(w, kind, &payload).map_err(|e| format!("write frame: {e}"))
190}
191
192/// Bincode + read helper. Returns the deserialized message and verifies the kind matches.
193pub fn recv_msg<R: Read, T: for<'de> Deserialize<'de>>(
194    r: &mut R,
195    expected_kind: u8,
196) -> Result<T, String> {
197    let (kind, body) = read_typed_frame(r).map_err(|e| format!("read frame: {e}"))?;
198    if kind != expected_kind {
199        return Err(format!(
200            "wire: expected frame kind {:#04x}, got {:#04x}",
201            expected_kind, kind
202        ));
203    }
204    bincode::deserialize(&body).map_err(|e| format!("bincode decode: {e}"))
205}
206
/// One unit of work executed on a remote `stryke --remote-worker`.
///
/// This is the v1 request shape: unlike the v3 `SessionInit` + `JobMsg` split, every job
/// carries the prelude, the block source and the captures alongside the item.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RemoteJobV1 {
    /// Sequence number; copied into `RemoteRespV1::seq`.
    pub seq: u64,
    /// Source text of the sub declarations, prepended to `block_src` before parsing.
    pub subs_prelude: String,
    /// Source text of the block to evaluate.
    pub block_src: String,
    /// `(name, value)` pairs of captured lexicals, values carried as JSON bytes.
    #[serde(with = "capture_json_bincode")]
    pub capture: Vec<(String, serde_json::Value)>,
    /// The item to process, carried as JSON bytes; becomes the topic for the block.
    #[serde(with = "json_value_bincode")]
    pub item: serde_json::Value,
}
218
/// Response to one `RemoteJobV1`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RemoteRespV1 {
    /// Sequence number of the job this answers.
    pub seq: u64,
    /// `true` when the block evaluated and its result could be converted to JSON.
    pub ok: bool,
    /// Block result as JSON; `run_job_local` sets `Null` whenever `ok` is `false`.
    #[serde(with = "json_value_bincode")]
    pub result: serde_json::Value,
    /// Failure description when `ok` is `false`; empty on success.
    pub err_msg: String,
}
227
/// Largest frame body (in bytes) that `read_framed` will accept: 256 MiB.
const MAX_FRAME: usize = 256 * 1024 * 1024;

/// Write one frame: an 8-byte little-endian length followed by `payload`, then flush.
///
/// # Errors
///
/// Propagates any I/O error from the writer.
pub fn write_framed<W: Write>(w: &mut W, payload: &[u8]) -> std::io::Result<()> {
    // usize → u64 never loses bits on supported targets.
    w.write_all(&(payload.len() as u64).to_le_bytes())?;
    w.write_all(payload)?;
    w.flush()?;
    Ok(())
}

/// Read one frame written by [`write_framed`] and return its body.
///
/// # Errors
///
/// * `UnexpectedEof` (from `read_exact`) when the stream ends before the 8-byte header or
///   the declared body has been read in full.
/// * `InvalidData` when the declared length exceeds [`MAX_FRAME`].
/// * Any other I/O error from the reader.
pub fn read_framed<R: Read>(r: &mut R) -> std::io::Result<Vec<u8>> {
    let mut header = [0u8; 8];
    r.read_exact(&mut header)?;
    let declared = u64::from_le_bytes(header);
    // Compare in u64 *before* narrowing. Casting first would truncate on 32-bit targets,
    // letting e.g. a declared length of 2^32 + 5 pass the limit check as 5 and
    // desynchronize the stream.
    if declared > MAX_FRAME as u64 {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            format!("remote frame too large: {declared}"),
        ));
    }
    // Lossless: `declared` is at most MAX_FRAME, which is itself a usize.
    let mut body = vec![0u8; declared as usize];
    r.read_exact(&mut body)?;
    Ok(body)
}
251
252pub fn encode_job(job: &RemoteJobV1) -> Result<Vec<u8>, String> {
253    bincode::serialize(job).map_err(|e| e.to_string())
254}
255
256pub fn decode_job(bytes: &[u8]) -> Result<RemoteJobV1, String> {
257    bincode::deserialize(bytes).map_err(|e| e.to_string())
258}
259
260pub fn encode_resp(resp: &RemoteRespV1) -> Result<Vec<u8>, String> {
261    bincode::serialize(resp).map_err(|e| e.to_string())
262}
263
264pub fn decode_resp(bytes: &[u8]) -> Result<RemoteRespV1, String> {
265    bincode::deserialize(bytes).map_err(|e| e.to_string())
266}
267
268pub fn perl_to_json_value(v: &PerlValue) -> Result<serde_json::Value, String> {
269    if v.is_undef() {
270        return Ok(serde_json::Value::Null);
271    }
272    if let Some(i) = v.as_integer() {
273        return Ok(serde_json::json!(i));
274    }
275    if let Some(f) = v.as_float() {
276        return Ok(serde_json::json!(f));
277    }
278    if v.is_string_like() {
279        return Ok(serde_json::Value::String(v.to_string()));
280    }
281    if let Some(a) = v.as_array_vec() {
282        let mut out = Vec::with_capacity(a.len());
283        for x in a {
284            out.push(perl_to_json_value(&x)?);
285        }
286        return Ok(serde_json::Value::Array(out));
287    }
288    if let Some(h) = v.as_hash_map() {
289        let mut m = serde_json::Map::new();
290        for (k, val) in h {
291            m.insert(k.clone(), perl_to_json_value(&val)?);
292        }
293        return Ok(serde_json::Value::Object(m));
294    }
295    Err(format!(
296        "value not supported for remote pmap (need null, bool/int/float/string/array/hash): {}",
297        v.type_name()
298    ))
299}
300
301pub fn json_to_perl(v: &serde_json::Value) -> Result<PerlValue, String> {
302    Ok(match v {
303        serde_json::Value::Null => PerlValue::UNDEF,
304        serde_json::Value::Bool(b) => PerlValue::integer(if *b { 1 } else { 0 }),
305        serde_json::Value::Number(n) => {
306            if let Some(i) = n.as_i64() {
307                PerlValue::integer(i)
308            } else if let Some(u) = n.as_u64() {
309                PerlValue::integer(u as i64)
310            } else {
311                PerlValue::float(n.as_f64().unwrap_or(0.0))
312            }
313        }
314        serde_json::Value::String(s) => PerlValue::string(s.clone()),
315        serde_json::Value::Array(a) => {
316            let mut items = Vec::with_capacity(a.len());
317            for x in a {
318                items.push(json_to_perl(x)?);
319            }
320            PerlValue::array(items)
321        }
322        serde_json::Value::Object(o) => {
323            let mut map = indexmap::IndexMap::new();
324            for (k, val) in o {
325                map.insert(k.clone(), json_to_perl(val)?);
326            }
327            PerlValue::hash(map)
328        }
329    })
330}
331
332pub fn capture_entries_to_json(
333    entries: &[(String, PerlValue)],
334) -> Result<Vec<(String, serde_json::Value)>, String> {
335    let mut out = Vec::with_capacity(entries.len());
336    for (k, v) in entries {
337        out.push((k.clone(), perl_to_json_value(v)?));
338    }
339    Ok(out)
340}
341
342pub fn build_subs_prelude(subs: &HashMap<String, Arc<PerlSub>>) -> String {
343    let mut names: Vec<_> = subs.keys().cloned().collect();
344    names.sort();
345    let mut s = String::new();
346    for name in names {
347        let sub = &subs[&name];
348        if sub.closure_env.is_some() {
349            continue;
350        }
351        let sig = if !sub.params.is_empty() {
352            format!(
353                " ({})",
354                sub.params
355                    .iter()
356                    .map(crate::fmt::format_sub_sig_param)
357                    .collect::<Vec<_>>()
358                    .join(", ")
359            )
360        } else if let Some(ref p) = sub.prototype {
361            format!(" ({})", p)
362        } else {
363            String::new()
364        };
365        let body = crate::fmt::format_block(&sub.body);
366        s.push_str(&format!("fn {}{} {{\n{}\n}}\n", name, sig, body));
367    }
368    s
369}
370
371/// Run one job in-process (for tests / local debugging).
372pub fn run_job_local(job: &RemoteJobV1) -> RemoteRespV1 {
373    let mut interp = Interpreter::new();
374    let cap: Vec<(String, PerlValue)> = match job
375        .capture
376        .iter()
377        .map(|(k, v)| json_to_perl(v).map(|pv| (k.clone(), pv)))
378        .collect()
379    {
380        Ok(c) => c,
381        Err(e) => {
382            return RemoteRespV1 {
383                seq: job.seq,
384                ok: false,
385                result: serde_json::Value::Null,
386                err_msg: e,
387            };
388        }
389    };
390    interp.scope_push_hook();
391    interp.scope.restore_capture(&cap);
392    let item_pv = match json_to_perl(&job.item) {
393        Ok(v) => v,
394        Err(e) => {
395            interp.scope_pop_hook();
396            return RemoteRespV1 {
397                seq: job.seq,
398                ok: false,
399                result: serde_json::Value::Null,
400                err_msg: e,
401            };
402        }
403    };
404    interp.scope.set_topic(item_pv);
405    let full_src = format!("{}\n{}", job.subs_prelude, job.block_src);
406    let prog = match crate::parse(&full_src) {
407        Ok(p) => p,
408        Err(e) => {
409            interp.scope_pop_hook();
410            return RemoteRespV1 {
411                seq: job.seq,
412                ok: false,
413                result: serde_json::Value::Null,
414                err_msg: e.message,
415            };
416        }
417    };
418    let block: Block = prog.statements;
419    let r = match interp.exec_block_smart(&block) {
420        Ok(v) => v,
421        Err(e) => {
422            interp.scope_pop_hook();
423            let msg = match e {
424                FlowOrError::Error(stryke) => stryke.to_string(),
425                FlowOrError::Flow(f) => format!("unexpected control flow: {:?}", f),
426            };
427            return RemoteRespV1 {
428                seq: job.seq,
429                ok: false,
430                result: serde_json::Value::Null,
431                err_msg: msg,
432            };
433        }
434    };
435    interp.scope_pop_hook();
436    match perl_to_json_value(&r) {
437        Ok(j) => RemoteRespV1 {
438            seq: job.seq,
439            ok: true,
440            result: j,
441            err_msg: String::new(),
442        },
443        Err(e) => RemoteRespV1 {
444            seq: job.seq,
445            ok: false,
446            result: serde_json::Value::Null,
447            err_msg: e,
448        },
449    }
450}
451
452/// Persistent v3 worker session: handles many jobs over a single stdin/stdout pair, with
453/// one Interpreter and one parsed block shared across the whole session.
454///
455/// Protocol order: HELLO → HELLO_ACK → SESSION_INIT → SESSION_ACK → JOB / JOB_RESP loop
456/// → SHUTDOWN → exit. Any wire error or unknown frame kind causes a clean non-zero exit so
457/// the dispatcher can re-route in-flight jobs to a different slot.
458///
459/// Why this beats the basic v1 [`run_remote_worker_stdio`]: subs prelude + block source
460/// ship **once** per session instead of per-item, parser+compiler runs once per worker,
461/// and one ssh handshake amortizes across the whole map.
462pub fn run_remote_worker_session() -> i32 {
463    let stdin = std::io::stdin();
464    let mut stdin = stdin.lock();
465    let mut stdout = std::io::stdout();
466
467    // 1. HELLO handshake. Dispatcher sends first; we reply with our build info.
468    let hello: HelloMsg = match recv_msg(&mut stdin, frame_kind::HELLO) {
469        Ok(h) => h,
470        Err(e) => {
471            let _ = writeln!(std::io::stderr(), "remote-worker: hello: {e}");
472            return 1;
473        }
474    };
475    if hello.proto_version != PROTO_VERSION {
476        let _ = writeln!(
477            std::io::stderr(),
478            "remote-worker: proto version mismatch (dispatcher {} vs worker {})",
479            hello.proto_version,
480            PROTO_VERSION
481        );
482        return 1;
483    }
484    let ack = HelloAck {
485        proto_version: PROTO_VERSION,
486        pe_version: env!("CARGO_PKG_VERSION").to_string(),
487        hostname: hostname_or_unknown(),
488    };
489    if let Err(e) = send_msg(&mut stdout, frame_kind::HELLO_ACK, &ack) {
490        let _ = writeln!(std::io::stderr(), "remote-worker: hello ack: {e}");
491        return 1;
492    }
493
494    // 2. SESSION_INIT: subs prelude + block source + captured lexicals.
495    let init: SessionInit = match recv_msg(&mut stdin, frame_kind::SESSION_INIT) {
496        Ok(i) => i,
497        Err(e) => {
498            let _ = writeln!(std::io::stderr(), "remote-worker: session init: {e}");
499            return 1;
500        }
501    };
502
503    // Parse subs prelude ONCE so they're registered for every JOB; parse block ONCE so we
504    // can hand the same `Block` to `exec_block_smart` per item without re-parsing.
505    let mut interp = Interpreter::new();
506    let prelude_program = match crate::parse(&init.subs_prelude) {
507        Ok(p) => p,
508        Err(e) => {
509            let nack = SessionAck {
510                ok: false,
511                err_msg: format!("parse subs prelude: {}", e.message),
512            };
513            let _ = send_msg(&mut stdout, frame_kind::SESSION_ACK, &nack);
514            return 2;
515        }
516    };
517    let block_program = match crate::parse(&init.block_src) {
518        Ok(p) => p,
519        Err(e) => {
520            let nack = SessionAck {
521                ok: false,
522                err_msg: format!("parse block: {}", e.message),
523            };
524            let _ = send_msg(&mut stdout, frame_kind::SESSION_ACK, &nack);
525            return 2;
526        }
527    };
528
529    // Restore captured lexicals once per session — they don't change across jobs.
530    let cap_pv: Vec<(String, PerlValue)> = match init
531        .capture
532        .iter()
533        .map(|(k, v)| json_to_perl(v).map(|pv| (k.clone(), pv)))
534        .collect()
535    {
536        Ok(c) => c,
537        Err(e) => {
538            let nack = SessionAck {
539                ok: false,
540                err_msg: format!("decode capture: {e}"),
541            };
542            let _ = send_msg(&mut stdout, frame_kind::SESSION_ACK, &nack);
543            return 2;
544        }
545    };
546    interp.scope_push_hook();
547    interp.scope.restore_capture(&cap_pv);
548
549    // Run the prelude (sub decls) once. After this every JOB has the user's named subs in
550    // scope without re-parsing or re-executing the prelude per item.
551    if let Err(e) = interp.execute(&prelude_program) {
552        let nack = SessionAck {
553            ok: false,
554            err_msg: format!("session prelude: {e}"),
555        };
556        let _ = send_msg(&mut stdout, frame_kind::SESSION_ACK, &nack);
557        return 2;
558    }
559
560    let ack = SessionAck {
561        ok: true,
562        err_msg: String::new(),
563    };
564    if let Err(e) = send_msg(&mut stdout, frame_kind::SESSION_ACK, &ack) {
565        let _ = writeln!(std::io::stderr(), "remote-worker: session ack: {e}");
566        return 1;
567    }
568
569    let block: Block = block_program.statements;
570
571    // 3. JOB loop. Each iteration sets `$_ = item`, re-evaluates the cached block, and
572    // sends back the result. The Interpreter is reused — sub registrations, package state,
573    // anything mutated by SESSION_INIT persists across jobs.
574    loop {
575        let (kind, body) = match read_typed_frame(&mut stdin) {
576            Ok(p) => p,
577            Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return 0,
578            Err(e) => {
579                let _ = writeln!(std::io::stderr(), "remote-worker: read job: {e}");
580                return 1;
581            }
582        };
583        match kind {
584            frame_kind::JOB => {
585                let job: JobMsg = match bincode::deserialize(&body) {
586                    Ok(j) => j,
587                    Err(e) => {
588                        let resp = JobRespMsg {
589                            seq: 0,
590                            ok: false,
591                            result: serde_json::Value::Null,
592                            err_msg: format!("decode job: {e}"),
593                        };
594                        let _ = send_msg(&mut stdout, frame_kind::JOB_RESP, &resp);
595                        continue;
596                    }
597                };
598                let resp = run_one_session_job(&mut interp, &block, &job);
599                if let Err(e) = send_msg(&mut stdout, frame_kind::JOB_RESP, &resp) {
600                    let _ = writeln!(std::io::stderr(), "remote-worker: write resp: {e}");
601                    return 1;
602                }
603            }
604            frame_kind::SHUTDOWN => return 0,
605            other => {
606                let _ = writeln!(
607                    std::io::stderr(),
608                    "remote-worker: unexpected frame kind {:#04x} in JOB loop",
609                    other
610                );
611                return 1;
612            }
613        }
614    }
615}
616
617/// Run one JOB inside an active session. Sets `$_` to the item, evaluates the cached block,
618/// returns the JSON-marshalled result. Preserves Interpreter state across jobs so anything
619/// the prelude installed (named subs, package vars) stays live.
620fn run_one_session_job(interp: &mut Interpreter, block: &Block, job: &JobMsg) -> JobRespMsg {
621    let item_pv = match json_to_perl(&job.item) {
622        Ok(v) => v,
623        Err(e) => {
624            return JobRespMsg {
625                seq: job.seq,
626                ok: false,
627                result: serde_json::Value::Null,
628                err_msg: e,
629            };
630        }
631    };
632    interp.scope.set_topic(item_pv);
633    let r = match interp.exec_block_smart(block) {
634        Ok(v) => v,
635        Err(FlowOrError::Error(stryke)) => {
636            return JobRespMsg {
637                seq: job.seq,
638                ok: false,
639                result: serde_json::Value::Null,
640                err_msg: stryke.to_string(),
641            };
642        }
643        Err(FlowOrError::Flow(f)) => {
644            return JobRespMsg {
645                seq: job.seq,
646                ok: false,
647                result: serde_json::Value::Null,
648                err_msg: format!("unexpected control flow: {:?}", f),
649            };
650        }
651    };
652    match perl_to_json_value(&r) {
653        Ok(j) => JobRespMsg {
654            seq: job.seq,
655            ok: true,
656            result: j,
657            err_msg: String::new(),
658        },
659        Err(e) => JobRespMsg {
660            seq: job.seq,
661            ok: false,
662            result: serde_json::Value::Null,
663            err_msg: e,
664        },
665    }
666}
667
/// Best-effort host name for the HELLO_ACK.
///
/// Tries the `HOSTNAME` environment variable first, then the output of the `hostname`
/// command, and falls back to `"unknown"`. Either source is trimmed, and a blank value is
/// treated as missing — so an exported-but-empty `HOSTNAME`, or a `hostname` command that
/// prints nothing, no longer yields an empty string.
fn hostname_or_unknown() -> String {
    /// Trim `raw`; `None` when nothing but whitespace is left.
    fn non_blank(raw: String) -> Option<String> {
        let trimmed = raw.trim();
        if trimmed.is_empty() {
            None
        } else {
            Some(trimmed.to_string())
        }
    }

    std::env::var("HOSTNAME")
        .ok()
        .and_then(non_blank)
        .or_else(|| {
            std::process::Command::new("hostname")
                .output()
                .ok()
                .and_then(|o| String::from_utf8(o.stdout).ok())
                .and_then(non_blank)
        })
        .unwrap_or_else(|| "unknown".to_string())
}
678
679/// stdin/stdout worker loop: one framed request → one framed response, then exit 0.
680pub fn run_remote_worker_stdio() -> i32 {
681    let stdin = std::io::stdin();
682    let mut stdin = stdin.lock();
683    let mut stdout = std::io::stdout();
684    let payload = match read_framed(&mut stdin) {
685        Ok(p) => p,
686        Err(e) => {
687            let _ = writeln!(std::io::stderr(), "remote-worker: read frame: {e}");
688            return 1;
689        }
690    };
691    let job = match decode_job(&payload) {
692        Ok(j) => j,
693        Err(e) => {
694            let _ = writeln!(std::io::stderr(), "remote-worker: decode job: {e}");
695            return 1;
696        }
697    };
698    let resp = run_job_local(&job);
699    let out = match encode_resp(&resp) {
700        Ok(b) => b,
701        Err(e) => {
702            let _ = writeln!(std::io::stderr(), "remote-worker: encode resp: {e}");
703            return 1;
704        }
705    };
706    if let Err(e) = write_framed(&mut stdout, &out) {
707        let _ = writeln!(std::io::stderr(), "remote-worker: write frame: {e}");
708        return 1;
709    }
710    if resp.ok {
711        0
712    } else {
713        let _ = writeln!(std::io::stderr(), "remote-worker: {}", resp.err_msg);
714        2
715    }
716}
717
718pub fn ssh_invoke_remote_worker(
719    host: &str,
720    pe_bin: &str,
721    job: &RemoteJobV1,
722) -> Result<RemoteRespV1, String> {
723    let payload = encode_job(job)?;
724    let mut child = Command::new("ssh")
725        .arg(host)
726        .arg(pe_bin)
727        .arg("--remote-worker")
728        .stdin(Stdio::piped())
729        .stdout(Stdio::piped())
730        .stderr(Stdio::piped())
731        .spawn()
732        .map_err(|e| format!("ssh: {e}"))?;
733    let mut stdin = child.stdin.take().ok_or_else(|| "ssh: stdin".to_string())?;
734    write_framed(&mut stdin, &payload).map_err(|e| format!("ssh stdin: {e}"))?;
735    drop(stdin);
736    let mut stdout = child
737        .stdout
738        .take()
739        .ok_or_else(|| "ssh: stdout".to_string())?;
740    let mut stderr = child
741        .stderr
742        .take()
743        .ok_or_else(|| "ssh: stderr".to_string())?;
744    let stderr_task = std::thread::spawn(move || {
745        let mut s = String::new();
746        let _ = stderr.read_to_string(&mut s);
747        s
748    });
749    let out_bytes = read_framed(&mut stdout).map_err(|e| format!("ssh read frame: {e}"))?;
750    let status = child.wait().map_err(|e| format!("ssh wait: {e}"))?;
751    let stderr_text = stderr_task.join().unwrap_or_default();
752    if !status.success() {
753        return Err(format!(
754            "ssh remote stryke exited {:?}: {}",
755            status.code(),
756            stderr_text.trim()
757        ));
758    }
759    decode_resp(&out_bytes).map_err(|e| {
760        format!(
761            "decode remote response: {e}; stderr: {}",
762            stderr_text.trim()
763        )
764    })
765}
766
#[cfg(test)]
mod tests {
    use super::*;

    /// A `JobRespMsg` comes back field-for-field identical after a bincode round trip,
    /// including the JSON-embedded `result`.
    #[test]
    fn job_resp_msg_bincode_roundtrip() {
        let original = JobRespMsg {
            seq: 1,
            ok: true,
            result: serde_json::json!(42i64),
            err_msg: String::new(),
        };
        let wire = bincode::serialize(&original).unwrap();
        let decoded: JobRespMsg = bincode::deserialize(&wire).unwrap();
        assert_eq!(decoded.seq, original.seq);
        assert_eq!(decoded.ok, original.ok);
        assert_eq!(decoded.result, original.result);
        assert_eq!(decoded.err_msg, original.err_msg);
    }

    /// Running the block `$_ * 2;` locally on the item 21 yields 42.
    #[test]
    fn local_roundtrip_doubles() {
        let doubling_job = RemoteJobV1 {
            seq: 0,
            subs_prelude: String::new(),
            block_src: "$_ * 2;".to_string(),
            capture: vec![],
            item: serde_json::json!(21),
        };
        let resp = run_job_local(&doubling_job);
        assert!(resp.ok, "{}", resp.err_msg);
        assert_eq!(resp.result, serde_json::json!(42));
    }
}