Skip to main content

stryke/
remote_wire.rs

1//! Framed bincode over stdin/stdout for `stryke --remote-worker` (distributed `pmap_on`).
2//!
3//! ## Wire protocol
4//!
5//! Every message is a length-prefixed frame: `[u64 LE length][u8 kind][bincode payload]`.
6//! The single-byte `kind` discriminator lets future revisions add message types without
7//! breaking older workers — an unknown kind is a hard error so version skew is loud.
8//!
9//! ### Message flow (v3 — persistent session)
10//!
11//! ```text
12//! dispatcher                    worker
13//!     │                            │
14//!     │── HELLO ─────────────────►│   (proto version, build id)
15//!     │◄───────────── HELLO_ACK ──│   (worker stryke version, hostname)
16//!     │── SESSION_INIT ──────────►│   (subs prelude, block source, captured lexicals)
17//!     │◄────────── SESSION_ACK ───│   (or ERROR)
18//!     │── JOB(seq=0) ────────────►│   (item)
19//!     │◄────────── JOB_RESP(0) ───│
20//!     │── JOB(seq=1) ────────────►│
21//!     │◄────────── JOB_RESP(1) ───│
22//!     │           ...             │
23//!     │── SHUTDOWN ──────────────►│
24//!     │                            └─ exit 0
25//! ```
26//!
27//! Why this beats the basic v1 protocol: subs prelude + block source ship **once** per
28//! session instead of once per item, the parser+compiler runs once per worker instead of
29//! once per job, and one ssh handshake amortizes across the whole map.
30//!
31//! Dynamic [`serde_json::Value`] fields are embedded as JSON UTF-8 bytes inside the bincode
32//! envelope (v3+). Bincode cannot deserialize `Value` directly (`deserialize_any`); nested
33//! JSON keeps the on-wire type self-describing.
34
35use std::collections::HashMap;
36use std::io::{Read, Write};
37use std::process::{Command, Stdio};
38use std::sync::Arc;
39
40use serde::{Deserialize, Serialize};
41
42use crate::ast::Block;
43use crate::value::{StrykeSub, StrykeValue};
44use crate::vm_helper::{FlowOrError, VMHelper};
45
46/// Frame-kind discriminator. Stored as the first byte of every wire payload after the
47/// length prefix. Sub-byte values are reserved (anything outside the documented set is
48/// rejected with a clean error rather than silently misparsed).
49#[allow(dead_code)]
50pub mod frame_kind {
51    pub const HELLO: u8 = 0x01;
52    pub const HELLO_ACK: u8 = 0x02;
53    pub const SESSION_INIT: u8 = 0x03;
54    pub const SESSION_ACK: u8 = 0x04;
55    pub const JOB: u8 = 0x05;
56    pub const JOB_RESP: u8 = 0x06;
57    pub const SHUTDOWN: u8 = 0x07;
58    pub const ERROR: u8 = 0xFF;
59}
60
61/// Wire protocol version. Bumped whenever the layout of an existing message changes in a
62/// backwards-incompatible way. The HELLO handshake fails fast on version mismatch so
63/// dispatcher and worker never silently disagree on layout.
64pub const PROTO_VERSION: u32 = 3;
65
66mod json_value_bincode {
67    use serde::{Deserialize, Deserializer, Serialize, Serializer};
68
69    pub fn serialize<S>(value: &serde_json::Value, serializer: S) -> Result<S::Ok, S::Error>
70    where
71        S: Serializer,
72    {
73        let buf = serde_json::to_vec(value).map_err(serde::ser::Error::custom)?;
74        buf.serialize(serializer)
75    }
76
77    pub fn deserialize<'de, D>(deserializer: D) -> Result<serde_json::Value, D::Error>
78    where
79        D: Deserializer<'de>,
80    {
81        let buf: Vec<u8> = Vec::deserialize(deserializer)?;
82        serde_json::from_slice(&buf).map_err(serde::de::Error::custom)
83    }
84}
85
86mod capture_json_bincode {
87    use serde::{de::Deserializer, ser::SerializeSeq, Deserialize, Serializer};
88
89    pub fn serialize<S>(v: &[(String, serde_json::Value)], serializer: S) -> Result<S::Ok, S::Error>
90    where
91        S: Serializer,
92    {
93        let mut seq = serializer.serialize_seq(Some(v.len()))?;
94        for (k, val) in v {
95            let enc = serde_json::to_vec(val).map_err(serde::ser::Error::custom)?;
96            seq.serialize_element(&(k, enc))?;
97        }
98        seq.end()
99    }
100
101    pub fn deserialize<'de, D>(
102        deserializer: D,
103    ) -> Result<Vec<(String, serde_json::Value)>, D::Error>
104    where
105        D: Deserializer<'de>,
106    {
107        let raw: Vec<(String, Vec<u8>)> = Vec::deserialize(deserializer)?;
108        let mut out = Vec::with_capacity(raw.len());
109        for (k, enc) in raw {
110            let val = serde_json::from_slice(&enc).map_err(serde::de::Error::custom)?;
111            out.push((k, val));
112        }
113        Ok(out)
114    }
115}
116
117#[derive(Debug, Clone, Serialize, Deserialize)]
118pub struct HelloMsg {
119    pub proto_version: u32,
120    pub pe_version: String,
121}
122
123#[derive(Debug, Clone, Serialize, Deserialize)]
124pub struct HelloAck {
125    pub proto_version: u32,
126    pub pe_version: String,
127    pub hostname: String,
128}
129
130/// Sent **once** per worker session. Carries everything that doesn't change between jobs:
131/// the user's named subs, the `pmap_on` block source, and the captured-lexical snapshot.
132#[derive(Debug, Clone, Serialize, Deserialize)]
133pub struct SessionInit {
134    pub subs_prelude: String,
135    pub block_src: String,
136    #[serde(with = "capture_json_bincode")]
137    pub capture: Vec<(String, serde_json::Value)>,
138}
139
140#[derive(Debug, Clone, Serialize, Deserialize)]
141pub struct SessionAck {
142    pub ok: bool,
143    pub err_msg: String,
144}
145
146#[derive(Debug, Clone, Serialize, Deserialize)]
147pub struct JobMsg {
148    pub seq: u64,
149    #[serde(with = "json_value_bincode")]
150    pub item: serde_json::Value,
151}
152
153#[derive(Debug, Clone, Serialize, Deserialize)]
154pub struct JobRespMsg {
155    pub seq: u64,
156    pub ok: bool,
157    #[serde(with = "json_value_bincode")]
158    pub result: serde_json::Value,
159    pub err_msg: String,
160}
161
162/// Read a typed frame: returns `(kind, body)` where `body` is the bincode payload after
163/// the kind byte. Caller decides how to interpret based on `kind`.
164pub fn read_typed_frame<R: Read>(r: &mut R) -> std::io::Result<(u8, Vec<u8>)> {
165    let raw = read_framed(r)?;
166    if raw.is_empty() {
167        return Err(std::io::Error::new(
168            std::io::ErrorKind::InvalidData,
169            "remote frame: empty payload (missing kind byte)",
170        ));
171    }
172    let kind = raw[0];
173    Ok((kind, raw[1..].to_vec()))
174}
175
176/// Write a typed frame: prepends the `kind` byte to `payload` and writes one length-prefixed
177/// frame.
178pub fn write_typed_frame<W: Write>(w: &mut W, kind: u8, payload: &[u8]) -> std::io::Result<()> {
179    let mut framed = Vec::with_capacity(payload.len() + 1);
180    framed.push(kind);
181    framed.extend_from_slice(payload);
182    write_framed(w, &framed)
183}
184
185/// Bincode + write helper. The two-step `bincode::serialize` + `write_typed_frame` pattern
186/// is the same in every send site so it lives here once.
187pub fn send_msg<W: Write, T: Serialize>(w: &mut W, kind: u8, msg: &T) -> Result<(), String> {
188    let payload = bincode::serialize(msg).map_err(|e| format!("bincode encode: {e}"))?;
189    write_typed_frame(w, kind, &payload).map_err(|e| format!("write frame: {e}"))
190}
191
192/// Bincode + read helper. Returns the deserialized message and verifies the kind matches.
193pub fn recv_msg<R: Read, T: for<'de> Deserialize<'de>>(
194    r: &mut R,
195    expected_kind: u8,
196) -> Result<T, String> {
197    let (kind, body) = read_typed_frame(r).map_err(|e| format!("read frame: {e}"))?;
198    if kind != expected_kind {
199        return Err(format!(
200            "wire: expected frame kind {:#04x}, got {:#04x}",
201            expected_kind, kind
202        ));
203    }
204    bincode::deserialize(&body).map_err(|e| format!("bincode decode: {e}"))
205}
206
207/// One unit of work executed on a remote `stryke --remote-worker`.
208#[derive(Debug, Clone, Serialize, Deserialize)]
209pub struct RemoteJobV1 {
210    pub seq: u64,
211    pub subs_prelude: String,
212    pub block_src: String,
213    #[serde(with = "capture_json_bincode")]
214    pub capture: Vec<(String, serde_json::Value)>,
215    #[serde(with = "json_value_bincode")]
216    pub item: serde_json::Value,
217}
218
219#[derive(Debug, Clone, Serialize, Deserialize)]
220pub struct RemoteRespV1 {
221    pub seq: u64,
222    pub ok: bool,
223    #[serde(with = "json_value_bincode")]
224    pub result: serde_json::Value,
225    pub err_msg: String,
226}
227
228const MAX_FRAME: usize = 256 * 1024 * 1024;
229
230pub fn write_framed<W: Write>(w: &mut W, payload: &[u8]) -> std::io::Result<()> {
231    w.write_all(&(payload.len() as u64).to_le_bytes())?;
232    w.write_all(payload)?;
233    w.flush()?;
234    Ok(())
235}
236
237pub fn read_framed<R: Read>(r: &mut R) -> std::io::Result<Vec<u8>> {
238    let mut h = [0u8; 8];
239    r.read_exact(&mut h)?;
240    let n = u64::from_le_bytes(h) as usize;
241    if n > MAX_FRAME {
242        return Err(std::io::Error::new(
243            std::io::ErrorKind::InvalidData,
244            format!("remote frame too large: {n}"),
245        ));
246    }
247    let mut v = vec![0u8; n];
248    r.read_exact(&mut v)?;
249    Ok(v)
250}
251
252pub fn encode_job(job: &RemoteJobV1) -> Result<Vec<u8>, String> {
253    bincode::serialize(job).map_err(|e| e.to_string())
254}
255
256pub fn decode_job(bytes: &[u8]) -> Result<RemoteJobV1, String> {
257    bincode::deserialize(bytes).map_err(|e| e.to_string())
258}
259
260pub fn encode_resp(resp: &RemoteRespV1) -> Result<Vec<u8>, String> {
261    bincode::serialize(resp).map_err(|e| e.to_string())
262}
263
264pub fn decode_resp(bytes: &[u8]) -> Result<RemoteRespV1, String> {
265    bincode::deserialize(bytes).map_err(|e| e.to_string())
266}
267
268pub fn perl_to_json_value(v: &StrykeValue) -> Result<serde_json::Value, String> {
269    if v.is_undef() {
270        return Ok(serde_json::Value::Null);
271    }
272    if let Some(i) = v.as_integer() {
273        return Ok(serde_json::json!(i));
274    }
275    if let Some(f) = v.as_float() {
276        return Ok(serde_json::json!(f));
277    }
278    if v.is_string_like() {
279        return Ok(serde_json::Value::String(v.to_string()));
280    }
281    if let Some(a) = v.as_array_vec() {
282        let mut out = Vec::with_capacity(a.len());
283        for x in a {
284            out.push(perl_to_json_value(&x)?);
285        }
286        return Ok(serde_json::Value::Array(out));
287    }
288    // Arrayref / hashref carry the same shape as flat array / hash for
289    // JSON — there's no ref/value distinction over the wire. Without this
290    // branch a stage block that ends in `[ ... ]` (used by `~d>` to keep
291    // list shape across the worker's scalar-return boundary) would fail
292    // with "value not supported for remote pmap".
293    if let Some(ar) = v.as_array_ref() {
294        let guard = ar.read();
295        let mut out = Vec::with_capacity(guard.len());
296        for x in guard.iter() {
297            out.push(perl_to_json_value(x)?);
298        }
299        return Ok(serde_json::Value::Array(out));
300    }
301    if let Some(h) = v.as_hash_map() {
302        let mut m = serde_json::Map::new();
303        for (k, val) in h {
304            m.insert(k.clone(), perl_to_json_value(&val)?);
305        }
306        return Ok(serde_json::Value::Object(m));
307    }
308    if let Some(hr) = v.as_hash_ref() {
309        let guard = hr.read();
310        let mut m = serde_json::Map::new();
311        for (k, val) in guard.iter() {
312            m.insert(k.clone(), perl_to_json_value(val)?);
313        }
314        return Ok(serde_json::Value::Object(m));
315    }
316    Err(format!(
317        "value not supported for remote pmap (need null, bool/int/float/string/array/hash): {}",
318        v.type_name()
319    ))
320}
321
322pub fn json_to_perl(v: &serde_json::Value) -> Result<StrykeValue, String> {
323    Ok(match v {
324        serde_json::Value::Null => StrykeValue::UNDEF,
325        serde_json::Value::Bool(b) => StrykeValue::integer(if *b { 1 } else { 0 }),
326        serde_json::Value::Number(n) => {
327            if let Some(i) = n.as_i64() {
328                StrykeValue::integer(i)
329            } else if let Some(u) = n.as_u64() {
330                StrykeValue::integer(u as i64)
331            } else {
332                StrykeValue::float(n.as_f64().unwrap_or(0.0))
333            }
334        }
335        serde_json::Value::String(s) => StrykeValue::string(s.clone()),
336        serde_json::Value::Array(a) => {
337            let mut items = Vec::with_capacity(a.len());
338            for x in a {
339                items.push(json_to_perl(x)?);
340            }
341            StrykeValue::array(items)
342        }
343        serde_json::Value::Object(o) => {
344            let mut map = indexmap::IndexMap::new();
345            for (k, val) in o {
346                map.insert(k.clone(), json_to_perl(val)?);
347            }
348            StrykeValue::hash(map)
349        }
350    })
351}
352
353pub fn capture_entries_to_json(
354    entries: &[(String, StrykeValue)],
355) -> Result<Vec<(String, serde_json::Value)>, String> {
356    let mut out = Vec::with_capacity(entries.len());
357    for (k, v) in entries {
358        out.push((k.clone(), perl_to_json_value(v)?));
359    }
360    Ok(out)
361}
362
363pub fn build_subs_prelude(subs: &HashMap<String, Arc<StrykeSub>>) -> String {
364    let mut names: Vec<_> = subs.keys().cloned().collect();
365    names.sort();
366    let mut s = String::new();
367    for name in names {
368        let sub = &subs[&name];
369        if sub.closure_env.is_some() {
370            continue;
371        }
372        let sig = if !sub.params.is_empty() {
373            format!(
374                " ({})",
375                sub.params
376                    .iter()
377                    .map(crate::fmt::format_sub_sig_param)
378                    .collect::<Vec<_>>()
379                    .join(", ")
380            )
381        } else if let Some(ref p) = sub.prototype {
382            format!(" ({})", p)
383        } else {
384            String::new()
385        };
386        let body = crate::fmt::format_block(&sub.body);
387        s.push_str(&format!("fn {}{} {{\n{}\n}}\n", name, sig, body));
388    }
389    s
390}
391
392/// Run one job in-process (for tests / local debugging).
393pub fn run_job_local(job: &RemoteJobV1) -> RemoteRespV1 {
394    let mut interp = VMHelper::new();
395    let cap: Vec<(String, StrykeValue)> = match job
396        .capture
397        .iter()
398        .map(|(k, v)| json_to_perl(v).map(|pv| (k.clone(), pv)))
399        .collect()
400    {
401        Ok(c) => c,
402        Err(e) => {
403            return RemoteRespV1 {
404                seq: job.seq,
405                ok: false,
406                result: serde_json::Value::Null,
407                err_msg: e,
408            };
409        }
410    };
411    interp.scope_push_hook();
412    interp.scope.restore_capture(&cap);
413    let item_pv = match json_to_perl(&job.item) {
414        Ok(v) => v,
415        Err(e) => {
416            interp.scope_pop_hook();
417            return RemoteRespV1 {
418                seq: job.seq,
419                ok: false,
420                result: serde_json::Value::Null,
421                err_msg: e,
422            };
423        }
424    };
425    interp.scope.set_topic(item_pv);
426    let full_src = format!("{}\n{}", job.subs_prelude, job.block_src);
427    let prog = match crate::parse(&full_src) {
428        Ok(p) => p,
429        Err(e) => {
430            interp.scope_pop_hook();
431            return RemoteRespV1 {
432                seq: job.seq,
433                ok: false,
434                result: serde_json::Value::Null,
435                err_msg: e.message,
436            };
437        }
438    };
439    let block: Block = prog.statements;
440    let r = match interp.exec_block_smart(&block) {
441        Ok(v) => v,
442        Err(e) => {
443            interp.scope_pop_hook();
444            let msg = match e {
445                FlowOrError::Error(stryke) => stryke.to_string(),
446                FlowOrError::Flow(f) => format!("unexpected control flow: {:?}", f),
447            };
448            return RemoteRespV1 {
449                seq: job.seq,
450                ok: false,
451                result: serde_json::Value::Null,
452                err_msg: msg,
453            };
454        }
455    };
456    interp.scope_pop_hook();
457    match perl_to_json_value(&r) {
458        Ok(j) => RemoteRespV1 {
459            seq: job.seq,
460            ok: true,
461            result: j,
462            err_msg: String::new(),
463        },
464        Err(e) => RemoteRespV1 {
465            seq: job.seq,
466            ok: false,
467            result: serde_json::Value::Null,
468            err_msg: e,
469        },
470    }
471}
472
473/// Persistent v3 worker session: handles many jobs over a single stdin/stdout pair, with
474/// one Interpreter and one parsed block shared across the whole session.
475///
476/// Protocol order: HELLO → HELLO_ACK → SESSION_INIT → SESSION_ACK → JOB / JOB_RESP loop
477/// → SHUTDOWN → exit. Any wire error or unknown frame kind causes a clean non-zero exit so
478/// the dispatcher can re-route in-flight jobs to a different slot.
479///
480/// Why this beats the basic v1 [`run_remote_worker_stdio`]: subs prelude + block source
481/// ship **once** per session instead of per-item, parser+compiler runs once per worker,
482/// and one ssh handshake amortizes across the whole map.
483pub fn run_remote_worker_session() -> i32 {
484    let stdin = std::io::stdin();
485    let mut stdin = stdin.lock();
486    let mut stdout = std::io::stdout();
487
488    // 1. HELLO handshake. Dispatcher sends first; we reply with our build info.
489    let hello: HelloMsg = match recv_msg(&mut stdin, frame_kind::HELLO) {
490        Ok(h) => h,
491        Err(e) => {
492            let _ = writeln!(std::io::stderr(), "remote-worker: hello: {e}");
493            return 1;
494        }
495    };
496    if hello.proto_version != PROTO_VERSION {
497        let _ = writeln!(
498            std::io::stderr(),
499            "remote-worker: proto version mismatch (dispatcher {} vs worker {})",
500            hello.proto_version,
501            PROTO_VERSION
502        );
503        return 1;
504    }
505    let ack = HelloAck {
506        proto_version: PROTO_VERSION,
507        pe_version: env!("CARGO_PKG_VERSION").to_string(),
508        hostname: hostname_or_unknown(),
509    };
510    if let Err(e) = send_msg(&mut stdout, frame_kind::HELLO_ACK, &ack) {
511        let _ = writeln!(std::io::stderr(), "remote-worker: hello ack: {e}");
512        return 1;
513    }
514
515    // 2. SESSION_INIT: subs prelude + block source + captured lexicals.
516    let init: SessionInit = match recv_msg(&mut stdin, frame_kind::SESSION_INIT) {
517        Ok(i) => i,
518        Err(e) => {
519            let _ = writeln!(std::io::stderr(), "remote-worker: session init: {e}");
520            return 1;
521        }
522    };
523
524    // Parse subs prelude ONCE so they're registered for every JOB; parse block ONCE so we
525    // can hand the same `Block` to `exec_block_smart` per item without re-parsing.
526    let mut interp = VMHelper::new();
527    let prelude_program = match crate::parse(&init.subs_prelude) {
528        Ok(p) => p,
529        Err(e) => {
530            let nack = SessionAck {
531                ok: false,
532                err_msg: format!("parse subs prelude: {}", e.message),
533            };
534            let _ = send_msg(&mut stdout, frame_kind::SESSION_ACK, &nack);
535            return 2;
536        }
537    };
538    let block_program = match crate::parse(&init.block_src) {
539        Ok(p) => p,
540        Err(e) => {
541            let nack = SessionAck {
542                ok: false,
543                err_msg: format!("parse block: {}", e.message),
544            };
545            let _ = send_msg(&mut stdout, frame_kind::SESSION_ACK, &nack);
546            return 2;
547        }
548    };
549
550    // Restore captured lexicals once per session — they don't change across jobs.
551    let cap_pv: Vec<(String, StrykeValue)> = match init
552        .capture
553        .iter()
554        .map(|(k, v)| json_to_perl(v).map(|pv| (k.clone(), pv)))
555        .collect()
556    {
557        Ok(c) => c,
558        Err(e) => {
559            let nack = SessionAck {
560                ok: false,
561                err_msg: format!("decode capture: {e}"),
562            };
563            let _ = send_msg(&mut stdout, frame_kind::SESSION_ACK, &nack);
564            return 2;
565        }
566    };
567    interp.scope_push_hook();
568    interp.scope.restore_capture(&cap_pv);
569
570    // Run the prelude (sub decls) once. After this every JOB has the user's named subs in
571    // scope without re-parsing or re-executing the prelude per item.
572    if let Err(e) = interp.execute(&prelude_program) {
573        let nack = SessionAck {
574            ok: false,
575            err_msg: format!("session prelude: {e}"),
576        };
577        let _ = send_msg(&mut stdout, frame_kind::SESSION_ACK, &nack);
578        return 2;
579    }
580
581    let ack = SessionAck {
582        ok: true,
583        err_msg: String::new(),
584    };
585    if let Err(e) = send_msg(&mut stdout, frame_kind::SESSION_ACK, &ack) {
586        let _ = writeln!(std::io::stderr(), "remote-worker: session ack: {e}");
587        return 1;
588    }
589
590    let block: Block = block_program.statements;
591
592    // 3. JOB loop. Each iteration sets `$_ = item`, re-evaluates the cached block, and
593    // sends back the result. The Interpreter is reused — sub registrations, package state,
594    // anything mutated by SESSION_INIT persists across jobs.
595    loop {
596        let (kind, body) = match read_typed_frame(&mut stdin) {
597            Ok(p) => p,
598            Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return 0,
599            Err(e) => {
600                let _ = writeln!(std::io::stderr(), "remote-worker: read job: {e}");
601                return 1;
602            }
603        };
604        match kind {
605            frame_kind::JOB => {
606                let job: JobMsg = match bincode::deserialize(&body) {
607                    Ok(j) => j,
608                    Err(e) => {
609                        let resp = JobRespMsg {
610                            seq: 0,
611                            ok: false,
612                            result: serde_json::Value::Null,
613                            err_msg: format!("decode job: {e}"),
614                        };
615                        let _ = send_msg(&mut stdout, frame_kind::JOB_RESP, &resp);
616                        continue;
617                    }
618                };
619                let resp = run_one_session_job(&mut interp, &block, &job);
620                if let Err(e) = send_msg(&mut stdout, frame_kind::JOB_RESP, &resp) {
621                    let _ = writeln!(std::io::stderr(), "remote-worker: write resp: {e}");
622                    return 1;
623                }
624            }
625            frame_kind::SHUTDOWN => return 0,
626            other => {
627                let _ = writeln!(
628                    std::io::stderr(),
629                    "remote-worker: unexpected frame kind {:#04x} in JOB loop",
630                    other
631                );
632                return 1;
633            }
634        }
635    }
636}
637
638/// Run one JOB inside an active session. Sets `$_` to the item, evaluates the cached block,
639/// returns the JSON-marshalled result. Preserves Interpreter state across jobs so anything
640/// the prelude installed (named subs, package vars) stays live.
641fn run_one_session_job(interp: &mut VMHelper, block: &Block, job: &JobMsg) -> JobRespMsg {
642    let item_pv = match json_to_perl(&job.item) {
643        Ok(v) => v,
644        Err(e) => {
645            return JobRespMsg {
646                seq: job.seq,
647                ok: false,
648                result: serde_json::Value::Null,
649                err_msg: e,
650            };
651        }
652    };
653    interp.scope.set_topic(item_pv);
654    let r = match interp.exec_block_smart(block) {
655        Ok(v) => v,
656        Err(FlowOrError::Error(stryke)) => {
657            return JobRespMsg {
658                seq: job.seq,
659                ok: false,
660                result: serde_json::Value::Null,
661                err_msg: stryke.to_string(),
662            };
663        }
664        Err(FlowOrError::Flow(f)) => {
665            return JobRespMsg {
666                seq: job.seq,
667                ok: false,
668                result: serde_json::Value::Null,
669                err_msg: format!("unexpected control flow: {:?}", f),
670            };
671        }
672    };
673    match perl_to_json_value(&r) {
674        Ok(j) => JobRespMsg {
675            seq: job.seq,
676            ok: true,
677            result: j,
678            err_msg: String::new(),
679        },
680        Err(e) => JobRespMsg {
681            seq: job.seq,
682            ok: false,
683            result: serde_json::Value::Null,
684            err_msg: e,
685        },
686    }
687}
688
689fn hostname_or_unknown() -> String {
690    std::env::var("HOSTNAME").unwrap_or_else(|_| {
691        std::process::Command::new("hostname")
692            .output()
693            .ok()
694            .and_then(|o| String::from_utf8(o.stdout).ok())
695            .map(|s| s.trim().to_string())
696            .unwrap_or_else(|| "unknown".to_string())
697    })
698}
699
700/// stdin/stdout worker loop: one framed request → one framed response, then exit 0.
701pub fn run_remote_worker_stdio() -> i32 {
702    let stdin = std::io::stdin();
703    let mut stdin = stdin.lock();
704    let mut stdout = std::io::stdout();
705    let payload = match read_framed(&mut stdin) {
706        Ok(p) => p,
707        Err(e) => {
708            let _ = writeln!(std::io::stderr(), "remote-worker: read frame: {e}");
709            return 1;
710        }
711    };
712    let job = match decode_job(&payload) {
713        Ok(j) => j,
714        Err(e) => {
715            let _ = writeln!(std::io::stderr(), "remote-worker: decode job: {e}");
716            return 1;
717        }
718    };
719    let resp = run_job_local(&job);
720    let out = match encode_resp(&resp) {
721        Ok(b) => b,
722        Err(e) => {
723            let _ = writeln!(std::io::stderr(), "remote-worker: encode resp: {e}");
724            return 1;
725        }
726    };
727    if let Err(e) = write_framed(&mut stdout, &out) {
728        let _ = writeln!(std::io::stderr(), "remote-worker: write frame: {e}");
729        return 1;
730    }
731    if resp.ok {
732        0
733    } else {
734        let _ = writeln!(std::io::stderr(), "remote-worker: {}", resp.err_msg);
735        2
736    }
737}
738
739pub fn ssh_invoke_remote_worker(
740    host: &str,
741    pe_bin: &str,
742    job: &RemoteJobV1,
743) -> Result<RemoteRespV1, String> {
744    let payload = encode_job(job)?;
745    let mut child = Command::new("ssh")
746        .arg(host)
747        .arg(pe_bin)
748        .arg("--remote-worker")
749        .stdin(Stdio::piped())
750        .stdout(Stdio::piped())
751        .stderr(Stdio::piped())
752        .spawn()
753        .map_err(|e| format!("ssh: {e}"))?;
754    let mut stdin = child.stdin.take().ok_or_else(|| "ssh: stdin".to_string())?;
755    write_framed(&mut stdin, &payload).map_err(|e| format!("ssh stdin: {e}"))?;
756    drop(stdin);
757    let mut stdout = child
758        .stdout
759        .take()
760        .ok_or_else(|| "ssh: stdout".to_string())?;
761    let mut stderr = child
762        .stderr
763        .take()
764        .ok_or_else(|| "ssh: stderr".to_string())?;
765    let stderr_task = std::thread::spawn(move || {
766        let mut s = String::new();
767        let _ = stderr.read_to_string(&mut s);
768        s
769    });
770    let out_bytes = read_framed(&mut stdout).map_err(|e| format!("ssh read frame: {e}"))?;
771    let status = child.wait().map_err(|e| format!("ssh wait: {e}"))?;
772    let stderr_text = stderr_task.join().unwrap_or_default();
773    if !status.success() {
774        return Err(format!(
775            "ssh remote stryke exited {:?}: {}",
776            status.code(),
777            stderr_text.trim()
778        ));
779    }
780    decode_resp(&out_bytes).map_err(|e| {
781        format!(
782            "decode remote response: {e}; stderr: {}",
783            stderr_text.trim()
784        )
785    })
786}
787
788#[cfg(test)]
789mod tests {
790    use super::*;
791
792    #[test]
793    fn job_resp_msg_bincode_roundtrip() {
794        let msg = JobRespMsg {
795            seq: 1,
796            ok: true,
797            result: serde_json::json!(42i64),
798            err_msg: String::new(),
799        };
800        let bytes = bincode::serialize(&msg).unwrap();
801        let back: JobRespMsg = bincode::deserialize(&bytes).unwrap();
802        assert_eq!(back.seq, msg.seq);
803        assert_eq!(back.ok, msg.ok);
804        assert_eq!(back.result, msg.result);
805        assert_eq!(back.err_msg, msg.err_msg);
806    }
807
808    #[test]
809    fn local_roundtrip_doubles() {
810        let job = RemoteJobV1 {
811            seq: 0,
812            subs_prelude: String::new(),
813            block_src: "$_ * 2;".to_string(),
814            capture: vec![],
815            item: serde_json::json!(21),
816        };
817        let r = run_job_local(&job);
818        assert!(r.ok, "{}", r.err_msg);
819        assert_eq!(r.result, serde_json::json!(42));
820    }
821}