1use std::collections::HashMap;
36use std::io::{Read, Write};
37use std::process::{Command, Stdio};
38use std::sync::Arc;
39
40use serde::{Deserialize, Serialize};
41
42use crate::ast::Block;
43use crate::interpreter::{FlowOrError, Interpreter};
44use crate::value::{PerlSub, PerlValue};
45
46#[allow(dead_code)]
50pub mod frame_kind {
51 pub const HELLO: u8 = 0x01;
52 pub const HELLO_ACK: u8 = 0x02;
53 pub const SESSION_INIT: u8 = 0x03;
54 pub const SESSION_ACK: u8 = 0x04;
55 pub const JOB: u8 = 0x05;
56 pub const JOB_RESP: u8 = 0x06;
57 pub const SHUTDOWN: u8 = 0x07;
58 pub const ERROR: u8 = 0xFF;
59}
60
61pub const PROTO_VERSION: u32 = 3;
65
66mod json_value_bincode {
67 use serde::{Deserialize, Deserializer, Serialize, Serializer};
68
69 pub fn serialize<S>(value: &serde_json::Value, serializer: S) -> Result<S::Ok, S::Error>
70 where
71 S: Serializer,
72 {
73 let buf = serde_json::to_vec(value).map_err(serde::ser::Error::custom)?;
74 buf.serialize(serializer)
75 }
76
77 pub fn deserialize<'de, D>(deserializer: D) -> Result<serde_json::Value, D::Error>
78 where
79 D: Deserializer<'de>,
80 {
81 let buf: Vec<u8> = Vec::deserialize(deserializer)?;
82 serde_json::from_slice(&buf).map_err(serde::de::Error::custom)
83 }
84}
85
86mod capture_json_bincode {
87 use serde::{de::Deserializer, ser::SerializeSeq, Deserialize, Serializer};
88
89 pub fn serialize<S>(v: &[(String, serde_json::Value)], serializer: S) -> Result<S::Ok, S::Error>
90 where
91 S: Serializer,
92 {
93 let mut seq = serializer.serialize_seq(Some(v.len()))?;
94 for (k, val) in v {
95 let enc = serde_json::to_vec(val).map_err(serde::ser::Error::custom)?;
96 seq.serialize_element(&(k, enc))?;
97 }
98 seq.end()
99 }
100
101 pub fn deserialize<'de, D>(
102 deserializer: D,
103 ) -> Result<Vec<(String, serde_json::Value)>, D::Error>
104 where
105 D: Deserializer<'de>,
106 {
107 let raw: Vec<(String, Vec<u8>)> = Vec::deserialize(deserializer)?;
108 let mut out = Vec::with_capacity(raw.len());
109 for (k, enc) in raw {
110 let val = serde_json::from_slice(&enc).map_err(serde::de::Error::custom)?;
111 out.push((k, val));
112 }
113 Ok(out)
114 }
115}
116
117#[derive(Debug, Clone, Serialize, Deserialize)]
118pub struct HelloMsg {
119 pub proto_version: u32,
120 pub pe_version: String,
121}
122
123#[derive(Debug, Clone, Serialize, Deserialize)]
124pub struct HelloAck {
125 pub proto_version: u32,
126 pub pe_version: String,
127 pub hostname: String,
128}
129
130#[derive(Debug, Clone, Serialize, Deserialize)]
133pub struct SessionInit {
134 pub subs_prelude: String,
135 pub block_src: String,
136 #[serde(with = "capture_json_bincode")]
137 pub capture: Vec<(String, serde_json::Value)>,
138}
139
140#[derive(Debug, Clone, Serialize, Deserialize)]
141pub struct SessionAck {
142 pub ok: bool,
143 pub err_msg: String,
144}
145
146#[derive(Debug, Clone, Serialize, Deserialize)]
147pub struct JobMsg {
148 pub seq: u64,
149 #[serde(with = "json_value_bincode")]
150 pub item: serde_json::Value,
151}
152
153#[derive(Debug, Clone, Serialize, Deserialize)]
154pub struct JobRespMsg {
155 pub seq: u64,
156 pub ok: bool,
157 #[serde(with = "json_value_bincode")]
158 pub result: serde_json::Value,
159 pub err_msg: String,
160}
161
162pub fn read_typed_frame<R: Read>(r: &mut R) -> std::io::Result<(u8, Vec<u8>)> {
165 let raw = read_framed(r)?;
166 if raw.is_empty() {
167 return Err(std::io::Error::new(
168 std::io::ErrorKind::InvalidData,
169 "remote frame: empty payload (missing kind byte)",
170 ));
171 }
172 let kind = raw[0];
173 Ok((kind, raw[1..].to_vec()))
174}
175
176pub fn write_typed_frame<W: Write>(w: &mut W, kind: u8, payload: &[u8]) -> std::io::Result<()> {
179 let mut framed = Vec::with_capacity(payload.len() + 1);
180 framed.push(kind);
181 framed.extend_from_slice(payload);
182 write_framed(w, &framed)
183}
184
185pub fn send_msg<W: Write, T: Serialize>(w: &mut W, kind: u8, msg: &T) -> Result<(), String> {
188 let payload = bincode::serialize(msg).map_err(|e| format!("bincode encode: {e}"))?;
189 write_typed_frame(w, kind, &payload).map_err(|e| format!("write frame: {e}"))
190}
191
192pub fn recv_msg<R: Read, T: for<'de> Deserialize<'de>>(
194 r: &mut R,
195 expected_kind: u8,
196) -> Result<T, String> {
197 let (kind, body) = read_typed_frame(r).map_err(|e| format!("read frame: {e}"))?;
198 if kind != expected_kind {
199 return Err(format!(
200 "wire: expected frame kind {:#04x}, got {:#04x}",
201 expected_kind, kind
202 ));
203 }
204 bincode::deserialize(&body).map_err(|e| format!("bincode decode: {e}"))
205}
206
207#[derive(Debug, Clone, Serialize, Deserialize)]
209pub struct RemoteJobV1 {
210 pub seq: u64,
211 pub subs_prelude: String,
212 pub block_src: String,
213 #[serde(with = "capture_json_bincode")]
214 pub capture: Vec<(String, serde_json::Value)>,
215 #[serde(with = "json_value_bincode")]
216 pub item: serde_json::Value,
217}
218
219#[derive(Debug, Clone, Serialize, Deserialize)]
220pub struct RemoteRespV1 {
221 pub seq: u64,
222 pub ok: bool,
223 #[serde(with = "json_value_bincode")]
224 pub result: serde_json::Value,
225 pub err_msg: String,
226}
227
228const MAX_FRAME: usize = 256 * 1024 * 1024;
229
230pub fn write_framed<W: Write>(w: &mut W, payload: &[u8]) -> std::io::Result<()> {
231 w.write_all(&(payload.len() as u64).to_le_bytes())?;
232 w.write_all(payload)?;
233 w.flush()?;
234 Ok(())
235}
236
237pub fn read_framed<R: Read>(r: &mut R) -> std::io::Result<Vec<u8>> {
238 let mut h = [0u8; 8];
239 r.read_exact(&mut h)?;
240 let n = u64::from_le_bytes(h) as usize;
241 if n > MAX_FRAME {
242 return Err(std::io::Error::new(
243 std::io::ErrorKind::InvalidData,
244 format!("remote frame too large: {n}"),
245 ));
246 }
247 let mut v = vec![0u8; n];
248 r.read_exact(&mut v)?;
249 Ok(v)
250}
251
252pub fn encode_job(job: &RemoteJobV1) -> Result<Vec<u8>, String> {
253 bincode::serialize(job).map_err(|e| e.to_string())
254}
255
256pub fn decode_job(bytes: &[u8]) -> Result<RemoteJobV1, String> {
257 bincode::deserialize(bytes).map_err(|e| e.to_string())
258}
259
260pub fn encode_resp(resp: &RemoteRespV1) -> Result<Vec<u8>, String> {
261 bincode::serialize(resp).map_err(|e| e.to_string())
262}
263
264pub fn decode_resp(bytes: &[u8]) -> Result<RemoteRespV1, String> {
265 bincode::deserialize(bytes).map_err(|e| e.to_string())
266}
267
268pub fn perl_to_json_value(v: &PerlValue) -> Result<serde_json::Value, String> {
269 if v.is_undef() {
270 return Ok(serde_json::Value::Null);
271 }
272 if let Some(i) = v.as_integer() {
273 return Ok(serde_json::json!(i));
274 }
275 if let Some(f) = v.as_float() {
276 return Ok(serde_json::json!(f));
277 }
278 if v.is_string_like() {
279 return Ok(serde_json::Value::String(v.to_string()));
280 }
281 if let Some(a) = v.as_array_vec() {
282 let mut out = Vec::with_capacity(a.len());
283 for x in a {
284 out.push(perl_to_json_value(&x)?);
285 }
286 return Ok(serde_json::Value::Array(out));
287 }
288 if let Some(h) = v.as_hash_map() {
289 let mut m = serde_json::Map::new();
290 for (k, val) in h {
291 m.insert(k.clone(), perl_to_json_value(&val)?);
292 }
293 return Ok(serde_json::Value::Object(m));
294 }
295 Err(format!(
296 "value not supported for remote pmap (need null, bool/int/float/string/array/hash): {}",
297 v.type_name()
298 ))
299}
300
301pub fn json_to_perl(v: &serde_json::Value) -> Result<PerlValue, String> {
302 Ok(match v {
303 serde_json::Value::Null => PerlValue::UNDEF,
304 serde_json::Value::Bool(b) => PerlValue::integer(if *b { 1 } else { 0 }),
305 serde_json::Value::Number(n) => {
306 if let Some(i) = n.as_i64() {
307 PerlValue::integer(i)
308 } else if let Some(u) = n.as_u64() {
309 PerlValue::integer(u as i64)
310 } else {
311 PerlValue::float(n.as_f64().unwrap_or(0.0))
312 }
313 }
314 serde_json::Value::String(s) => PerlValue::string(s.clone()),
315 serde_json::Value::Array(a) => {
316 let mut items = Vec::with_capacity(a.len());
317 for x in a {
318 items.push(json_to_perl(x)?);
319 }
320 PerlValue::array(items)
321 }
322 serde_json::Value::Object(o) => {
323 let mut map = indexmap::IndexMap::new();
324 for (k, val) in o {
325 map.insert(k.clone(), json_to_perl(val)?);
326 }
327 PerlValue::hash(map)
328 }
329 })
330}
331
332pub fn capture_entries_to_json(
333 entries: &[(String, PerlValue)],
334) -> Result<Vec<(String, serde_json::Value)>, String> {
335 let mut out = Vec::with_capacity(entries.len());
336 for (k, v) in entries {
337 out.push((k.clone(), perl_to_json_value(v)?));
338 }
339 Ok(out)
340}
341
342pub fn build_subs_prelude(subs: &HashMap<String, Arc<PerlSub>>) -> String {
343 let mut names: Vec<_> = subs.keys().cloned().collect();
344 names.sort();
345 let mut s = String::new();
346 for name in names {
347 let sub = &subs[&name];
348 if sub.closure_env.is_some() {
349 continue;
350 }
351 let sig = if !sub.params.is_empty() {
352 format!(
353 " ({})",
354 sub.params
355 .iter()
356 .map(crate::fmt::format_sub_sig_param)
357 .collect::<Vec<_>>()
358 .join(", ")
359 )
360 } else if let Some(ref p) = sub.prototype {
361 format!(" ({})", p)
362 } else {
363 String::new()
364 };
365 let body = crate::fmt::format_block(&sub.body);
366 s.push_str(&format!("fn {}{} {{\n{}\n}}\n", name, sig, body));
367 }
368 s
369}
370
371pub fn run_job_local(job: &RemoteJobV1) -> RemoteRespV1 {
373 let mut interp = Interpreter::new();
374 let cap: Vec<(String, PerlValue)> = match job
375 .capture
376 .iter()
377 .map(|(k, v)| json_to_perl(v).map(|pv| (k.clone(), pv)))
378 .collect()
379 {
380 Ok(c) => c,
381 Err(e) => {
382 return RemoteRespV1 {
383 seq: job.seq,
384 ok: false,
385 result: serde_json::Value::Null,
386 err_msg: e,
387 };
388 }
389 };
390 interp.scope_push_hook();
391 interp.scope.restore_capture(&cap);
392 let item_pv = match json_to_perl(&job.item) {
393 Ok(v) => v,
394 Err(e) => {
395 interp.scope_pop_hook();
396 return RemoteRespV1 {
397 seq: job.seq,
398 ok: false,
399 result: serde_json::Value::Null,
400 err_msg: e,
401 };
402 }
403 };
404 interp.scope.set_topic(item_pv);
405 let full_src = format!("{}\n{}", job.subs_prelude, job.block_src);
406 let prog = match crate::parse(&full_src) {
407 Ok(p) => p,
408 Err(e) => {
409 interp.scope_pop_hook();
410 return RemoteRespV1 {
411 seq: job.seq,
412 ok: false,
413 result: serde_json::Value::Null,
414 err_msg: e.message,
415 };
416 }
417 };
418 let block: Block = prog.statements;
419 let r = match interp.exec_block_smart(&block) {
420 Ok(v) => v,
421 Err(e) => {
422 interp.scope_pop_hook();
423 let msg = match e {
424 FlowOrError::Error(stryke) => stryke.to_string(),
425 FlowOrError::Flow(f) => format!("unexpected control flow: {:?}", f),
426 };
427 return RemoteRespV1 {
428 seq: job.seq,
429 ok: false,
430 result: serde_json::Value::Null,
431 err_msg: msg,
432 };
433 }
434 };
435 interp.scope_pop_hook();
436 match perl_to_json_value(&r) {
437 Ok(j) => RemoteRespV1 {
438 seq: job.seq,
439 ok: true,
440 result: j,
441 err_msg: String::new(),
442 },
443 Err(e) => RemoteRespV1 {
444 seq: job.seq,
445 ok: false,
446 result: serde_json::Value::Null,
447 err_msg: e,
448 },
449 }
450}
451
452pub fn run_remote_worker_session() -> i32 {
463 let stdin = std::io::stdin();
464 let mut stdin = stdin.lock();
465 let mut stdout = std::io::stdout();
466
467 let hello: HelloMsg = match recv_msg(&mut stdin, frame_kind::HELLO) {
469 Ok(h) => h,
470 Err(e) => {
471 let _ = writeln!(std::io::stderr(), "remote-worker: hello: {e}");
472 return 1;
473 }
474 };
475 if hello.proto_version != PROTO_VERSION {
476 let _ = writeln!(
477 std::io::stderr(),
478 "remote-worker: proto version mismatch (dispatcher {} vs worker {})",
479 hello.proto_version,
480 PROTO_VERSION
481 );
482 return 1;
483 }
484 let ack = HelloAck {
485 proto_version: PROTO_VERSION,
486 pe_version: env!("CARGO_PKG_VERSION").to_string(),
487 hostname: hostname_or_unknown(),
488 };
489 if let Err(e) = send_msg(&mut stdout, frame_kind::HELLO_ACK, &ack) {
490 let _ = writeln!(std::io::stderr(), "remote-worker: hello ack: {e}");
491 return 1;
492 }
493
494 let init: SessionInit = match recv_msg(&mut stdin, frame_kind::SESSION_INIT) {
496 Ok(i) => i,
497 Err(e) => {
498 let _ = writeln!(std::io::stderr(), "remote-worker: session init: {e}");
499 return 1;
500 }
501 };
502
503 let mut interp = Interpreter::new();
506 let prelude_program = match crate::parse(&init.subs_prelude) {
507 Ok(p) => p,
508 Err(e) => {
509 let nack = SessionAck {
510 ok: false,
511 err_msg: format!("parse subs prelude: {}", e.message),
512 };
513 let _ = send_msg(&mut stdout, frame_kind::SESSION_ACK, &nack);
514 return 2;
515 }
516 };
517 let block_program = match crate::parse(&init.block_src) {
518 Ok(p) => p,
519 Err(e) => {
520 let nack = SessionAck {
521 ok: false,
522 err_msg: format!("parse block: {}", e.message),
523 };
524 let _ = send_msg(&mut stdout, frame_kind::SESSION_ACK, &nack);
525 return 2;
526 }
527 };
528
529 let cap_pv: Vec<(String, PerlValue)> = match init
531 .capture
532 .iter()
533 .map(|(k, v)| json_to_perl(v).map(|pv| (k.clone(), pv)))
534 .collect()
535 {
536 Ok(c) => c,
537 Err(e) => {
538 let nack = SessionAck {
539 ok: false,
540 err_msg: format!("decode capture: {e}"),
541 };
542 let _ = send_msg(&mut stdout, frame_kind::SESSION_ACK, &nack);
543 return 2;
544 }
545 };
546 interp.scope_push_hook();
547 interp.scope.restore_capture(&cap_pv);
548
549 if let Err(e) = interp.execute(&prelude_program) {
552 let nack = SessionAck {
553 ok: false,
554 err_msg: format!("session prelude: {e}"),
555 };
556 let _ = send_msg(&mut stdout, frame_kind::SESSION_ACK, &nack);
557 return 2;
558 }
559
560 let ack = SessionAck {
561 ok: true,
562 err_msg: String::new(),
563 };
564 if let Err(e) = send_msg(&mut stdout, frame_kind::SESSION_ACK, &ack) {
565 let _ = writeln!(std::io::stderr(), "remote-worker: session ack: {e}");
566 return 1;
567 }
568
569 let block: Block = block_program.statements;
570
571 loop {
575 let (kind, body) = match read_typed_frame(&mut stdin) {
576 Ok(p) => p,
577 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return 0,
578 Err(e) => {
579 let _ = writeln!(std::io::stderr(), "remote-worker: read job: {e}");
580 return 1;
581 }
582 };
583 match kind {
584 frame_kind::JOB => {
585 let job: JobMsg = match bincode::deserialize(&body) {
586 Ok(j) => j,
587 Err(e) => {
588 let resp = JobRespMsg {
589 seq: 0,
590 ok: false,
591 result: serde_json::Value::Null,
592 err_msg: format!("decode job: {e}"),
593 };
594 let _ = send_msg(&mut stdout, frame_kind::JOB_RESP, &resp);
595 continue;
596 }
597 };
598 let resp = run_one_session_job(&mut interp, &block, &job);
599 if let Err(e) = send_msg(&mut stdout, frame_kind::JOB_RESP, &resp) {
600 let _ = writeln!(std::io::stderr(), "remote-worker: write resp: {e}");
601 return 1;
602 }
603 }
604 frame_kind::SHUTDOWN => return 0,
605 other => {
606 let _ = writeln!(
607 std::io::stderr(),
608 "remote-worker: unexpected frame kind {:#04x} in JOB loop",
609 other
610 );
611 return 1;
612 }
613 }
614 }
615}
616
617fn run_one_session_job(interp: &mut Interpreter, block: &Block, job: &JobMsg) -> JobRespMsg {
621 let item_pv = match json_to_perl(&job.item) {
622 Ok(v) => v,
623 Err(e) => {
624 return JobRespMsg {
625 seq: job.seq,
626 ok: false,
627 result: serde_json::Value::Null,
628 err_msg: e,
629 };
630 }
631 };
632 interp.scope.set_topic(item_pv);
633 let r = match interp.exec_block_smart(block) {
634 Ok(v) => v,
635 Err(FlowOrError::Error(stryke)) => {
636 return JobRespMsg {
637 seq: job.seq,
638 ok: false,
639 result: serde_json::Value::Null,
640 err_msg: stryke.to_string(),
641 };
642 }
643 Err(FlowOrError::Flow(f)) => {
644 return JobRespMsg {
645 seq: job.seq,
646 ok: false,
647 result: serde_json::Value::Null,
648 err_msg: format!("unexpected control flow: {:?}", f),
649 };
650 }
651 };
652 match perl_to_json_value(&r) {
653 Ok(j) => JobRespMsg {
654 seq: job.seq,
655 ok: true,
656 result: j,
657 err_msg: String::new(),
658 },
659 Err(e) => JobRespMsg {
660 seq: job.seq,
661 ok: false,
662 result: serde_json::Value::Null,
663 err_msg: e,
664 },
665 }
666}
667
668fn hostname_or_unknown() -> String {
669 std::env::var("HOSTNAME").unwrap_or_else(|_| {
670 std::process::Command::new("hostname")
671 .output()
672 .ok()
673 .and_then(|o| String::from_utf8(o.stdout).ok())
674 .map(|s| s.trim().to_string())
675 .unwrap_or_else(|| "unknown".to_string())
676 })
677}
678
679pub fn run_remote_worker_stdio() -> i32 {
681 let stdin = std::io::stdin();
682 let mut stdin = stdin.lock();
683 let mut stdout = std::io::stdout();
684 let payload = match read_framed(&mut stdin) {
685 Ok(p) => p,
686 Err(e) => {
687 let _ = writeln!(std::io::stderr(), "remote-worker: read frame: {e}");
688 return 1;
689 }
690 };
691 let job = match decode_job(&payload) {
692 Ok(j) => j,
693 Err(e) => {
694 let _ = writeln!(std::io::stderr(), "remote-worker: decode job: {e}");
695 return 1;
696 }
697 };
698 let resp = run_job_local(&job);
699 let out = match encode_resp(&resp) {
700 Ok(b) => b,
701 Err(e) => {
702 let _ = writeln!(std::io::stderr(), "remote-worker: encode resp: {e}");
703 return 1;
704 }
705 };
706 if let Err(e) = write_framed(&mut stdout, &out) {
707 let _ = writeln!(std::io::stderr(), "remote-worker: write frame: {e}");
708 return 1;
709 }
710 if resp.ok {
711 0
712 } else {
713 let _ = writeln!(std::io::stderr(), "remote-worker: {}", resp.err_msg);
714 2
715 }
716}
717
718pub fn ssh_invoke_remote_worker(
719 host: &str,
720 pe_bin: &str,
721 job: &RemoteJobV1,
722) -> Result<RemoteRespV1, String> {
723 let payload = encode_job(job)?;
724 let mut child = Command::new("ssh")
725 .arg(host)
726 .arg(pe_bin)
727 .arg("--remote-worker")
728 .stdin(Stdio::piped())
729 .stdout(Stdio::piped())
730 .stderr(Stdio::piped())
731 .spawn()
732 .map_err(|e| format!("ssh: {e}"))?;
733 let mut stdin = child.stdin.take().ok_or_else(|| "ssh: stdin".to_string())?;
734 write_framed(&mut stdin, &payload).map_err(|e| format!("ssh stdin: {e}"))?;
735 drop(stdin);
736 let mut stdout = child
737 .stdout
738 .take()
739 .ok_or_else(|| "ssh: stdout".to_string())?;
740 let mut stderr = child
741 .stderr
742 .take()
743 .ok_or_else(|| "ssh: stderr".to_string())?;
744 let stderr_task = std::thread::spawn(move || {
745 let mut s = String::new();
746 let _ = stderr.read_to_string(&mut s);
747 s
748 });
749 let out_bytes = read_framed(&mut stdout).map_err(|e| format!("ssh read frame: {e}"))?;
750 let status = child.wait().map_err(|e| format!("ssh wait: {e}"))?;
751 let stderr_text = stderr_task.join().unwrap_or_default();
752 if !status.success() {
753 return Err(format!(
754 "ssh remote stryke exited {:?}: {}",
755 status.code(),
756 stderr_text.trim()
757 ));
758 }
759 decode_resp(&out_bytes).map_err(|e| {
760 format!(
761 "decode remote response: {e}; stderr: {}",
762 stderr_text.trim()
763 )
764 })
765}
766
767#[cfg(test)]
768mod tests {
769 use super::*;
770
771 #[test]
772 fn job_resp_msg_bincode_roundtrip() {
773 let msg = JobRespMsg {
774 seq: 1,
775 ok: true,
776 result: serde_json::json!(42i64),
777 err_msg: String::new(),
778 };
779 let bytes = bincode::serialize(&msg).unwrap();
780 let back: JobRespMsg = bincode::deserialize(&bytes).unwrap();
781 assert_eq!(back.seq, msg.seq);
782 assert_eq!(back.ok, msg.ok);
783 assert_eq!(back.result, msg.result);
784 assert_eq!(back.err_msg, msg.err_msg);
785 }
786
787 #[test]
788 fn local_roundtrip_doubles() {
789 let job = RemoteJobV1 {
790 seq: 0,
791 subs_prelude: String::new(),
792 block_src: "$_ * 2;".to_string(),
793 capture: vec![],
794 item: serde_json::json!(21),
795 };
796 let r = run_job_local(&job);
797 assert!(r.ok, "{}", r.err_msg);
798 assert_eq!(r.result, serde_json::json!(42));
799 }
800}