1use std::collections::HashMap;
36use std::io::{Read, Write};
37use std::process::{Command, Stdio};
38use std::sync::Arc;
39
40use serde::{Deserialize, Serialize};
41
42use crate::ast::Block;
43use crate::value::{StrykeSub, StrykeValue};
44use crate::vm_helper::{FlowOrError, VMHelper};
45
46#[allow(dead_code)]
50pub mod frame_kind {
51 pub const HELLO: u8 = 0x01;
52 pub const HELLO_ACK: u8 = 0x02;
53 pub const SESSION_INIT: u8 = 0x03;
54 pub const SESSION_ACK: u8 = 0x04;
55 pub const JOB: u8 = 0x05;
56 pub const JOB_RESP: u8 = 0x06;
57 pub const SHUTDOWN: u8 = 0x07;
58 pub const ERROR: u8 = 0xFF;
59}
60
61pub const PROTO_VERSION: u32 = 3;
65
66mod json_value_bincode {
67 use serde::{Deserialize, Deserializer, Serialize, Serializer};
68
69 pub fn serialize<S>(value: &serde_json::Value, serializer: S) -> Result<S::Ok, S::Error>
70 where
71 S: Serializer,
72 {
73 let buf = serde_json::to_vec(value).map_err(serde::ser::Error::custom)?;
74 buf.serialize(serializer)
75 }
76
77 pub fn deserialize<'de, D>(deserializer: D) -> Result<serde_json::Value, D::Error>
78 where
79 D: Deserializer<'de>,
80 {
81 let buf: Vec<u8> = Vec::deserialize(deserializer)?;
82 serde_json::from_slice(&buf).map_err(serde::de::Error::custom)
83 }
84}
85
86mod capture_json_bincode {
87 use serde::{de::Deserializer, ser::SerializeSeq, Deserialize, Serializer};
88
89 pub fn serialize<S>(v: &[(String, serde_json::Value)], serializer: S) -> Result<S::Ok, S::Error>
90 where
91 S: Serializer,
92 {
93 let mut seq = serializer.serialize_seq(Some(v.len()))?;
94 for (k, val) in v {
95 let enc = serde_json::to_vec(val).map_err(serde::ser::Error::custom)?;
96 seq.serialize_element(&(k, enc))?;
97 }
98 seq.end()
99 }
100
101 pub fn deserialize<'de, D>(
102 deserializer: D,
103 ) -> Result<Vec<(String, serde_json::Value)>, D::Error>
104 where
105 D: Deserializer<'de>,
106 {
107 let raw: Vec<(String, Vec<u8>)> = Vec::deserialize(deserializer)?;
108 let mut out = Vec::with_capacity(raw.len());
109 for (k, enc) in raw {
110 let val = serde_json::from_slice(&enc).map_err(serde::de::Error::custom)?;
111 out.push((k, val));
112 }
113 Ok(out)
114 }
115}
116
117#[derive(Debug, Clone, Serialize, Deserialize)]
118pub struct HelloMsg {
119 pub proto_version: u32,
120 pub pe_version: String,
121}
122
123#[derive(Debug, Clone, Serialize, Deserialize)]
124pub struct HelloAck {
125 pub proto_version: u32,
126 pub pe_version: String,
127 pub hostname: String,
128}
129
130#[derive(Debug, Clone, Serialize, Deserialize)]
133pub struct SessionInit {
134 pub subs_prelude: String,
135 pub block_src: String,
136 #[serde(with = "capture_json_bincode")]
137 pub capture: Vec<(String, serde_json::Value)>,
138}
139
140#[derive(Debug, Clone, Serialize, Deserialize)]
141pub struct SessionAck {
142 pub ok: bool,
143 pub err_msg: String,
144}
145
146#[derive(Debug, Clone, Serialize, Deserialize)]
147pub struct JobMsg {
148 pub seq: u64,
149 #[serde(with = "json_value_bincode")]
150 pub item: serde_json::Value,
151}
152
153#[derive(Debug, Clone, Serialize, Deserialize)]
154pub struct JobRespMsg {
155 pub seq: u64,
156 pub ok: bool,
157 #[serde(with = "json_value_bincode")]
158 pub result: serde_json::Value,
159 pub err_msg: String,
160}
161
162pub fn read_typed_frame<R: Read>(r: &mut R) -> std::io::Result<(u8, Vec<u8>)> {
165 let raw = read_framed(r)?;
166 if raw.is_empty() {
167 return Err(std::io::Error::new(
168 std::io::ErrorKind::InvalidData,
169 "remote frame: empty payload (missing kind byte)",
170 ));
171 }
172 let kind = raw[0];
173 Ok((kind, raw[1..].to_vec()))
174}
175
176pub fn write_typed_frame<W: Write>(w: &mut W, kind: u8, payload: &[u8]) -> std::io::Result<()> {
179 let mut framed = Vec::with_capacity(payload.len() + 1);
180 framed.push(kind);
181 framed.extend_from_slice(payload);
182 write_framed(w, &framed)
183}
184
185pub fn send_msg<W: Write, T: Serialize>(w: &mut W, kind: u8, msg: &T) -> Result<(), String> {
188 let payload = bincode::serialize(msg).map_err(|e| format!("bincode encode: {e}"))?;
189 write_typed_frame(w, kind, &payload).map_err(|e| format!("write frame: {e}"))
190}
191
192pub fn recv_msg<R: Read, T: for<'de> Deserialize<'de>>(
194 r: &mut R,
195 expected_kind: u8,
196) -> Result<T, String> {
197 let (kind, body) = read_typed_frame(r).map_err(|e| format!("read frame: {e}"))?;
198 if kind != expected_kind {
199 return Err(format!(
200 "wire: expected frame kind {:#04x}, got {:#04x}",
201 expected_kind, kind
202 ));
203 }
204 bincode::deserialize(&body).map_err(|e| format!("bincode decode: {e}"))
205}
206
207#[derive(Debug, Clone, Serialize, Deserialize)]
209pub struct RemoteJobV1 {
210 pub seq: u64,
211 pub subs_prelude: String,
212 pub block_src: String,
213 #[serde(with = "capture_json_bincode")]
214 pub capture: Vec<(String, serde_json::Value)>,
215 #[serde(with = "json_value_bincode")]
216 pub item: serde_json::Value,
217}
218
219#[derive(Debug, Clone, Serialize, Deserialize)]
220pub struct RemoteRespV1 {
221 pub seq: u64,
222 pub ok: bool,
223 #[serde(with = "json_value_bincode")]
224 pub result: serde_json::Value,
225 pub err_msg: String,
226}
227
228const MAX_FRAME: usize = 256 * 1024 * 1024;
229
230pub fn write_framed<W: Write>(w: &mut W, payload: &[u8]) -> std::io::Result<()> {
231 w.write_all(&(payload.len() as u64).to_le_bytes())?;
232 w.write_all(payload)?;
233 w.flush()?;
234 Ok(())
235}
236
237pub fn read_framed<R: Read>(r: &mut R) -> std::io::Result<Vec<u8>> {
238 let mut h = [0u8; 8];
239 r.read_exact(&mut h)?;
240 let n = u64::from_le_bytes(h) as usize;
241 if n > MAX_FRAME {
242 return Err(std::io::Error::new(
243 std::io::ErrorKind::InvalidData,
244 format!("remote frame too large: {n}"),
245 ));
246 }
247 let mut v = vec![0u8; n];
248 r.read_exact(&mut v)?;
249 Ok(v)
250}
251
252pub fn encode_job(job: &RemoteJobV1) -> Result<Vec<u8>, String> {
253 bincode::serialize(job).map_err(|e| e.to_string())
254}
255
256pub fn decode_job(bytes: &[u8]) -> Result<RemoteJobV1, String> {
257 bincode::deserialize(bytes).map_err(|e| e.to_string())
258}
259
260pub fn encode_resp(resp: &RemoteRespV1) -> Result<Vec<u8>, String> {
261 bincode::serialize(resp).map_err(|e| e.to_string())
262}
263
264pub fn decode_resp(bytes: &[u8]) -> Result<RemoteRespV1, String> {
265 bincode::deserialize(bytes).map_err(|e| e.to_string())
266}
267
268pub fn perl_to_json_value(v: &StrykeValue) -> Result<serde_json::Value, String> {
269 if v.is_undef() {
270 return Ok(serde_json::Value::Null);
271 }
272 if let Some(i) = v.as_integer() {
273 return Ok(serde_json::json!(i));
274 }
275 if let Some(f) = v.as_float() {
276 return Ok(serde_json::json!(f));
277 }
278 if v.is_string_like() {
279 return Ok(serde_json::Value::String(v.to_string()));
280 }
281 if let Some(a) = v.as_array_vec() {
282 let mut out = Vec::with_capacity(a.len());
283 for x in a {
284 out.push(perl_to_json_value(&x)?);
285 }
286 return Ok(serde_json::Value::Array(out));
287 }
288 if let Some(ar) = v.as_array_ref() {
294 let guard = ar.read();
295 let mut out = Vec::with_capacity(guard.len());
296 for x in guard.iter() {
297 out.push(perl_to_json_value(x)?);
298 }
299 return Ok(serde_json::Value::Array(out));
300 }
301 if let Some(h) = v.as_hash_map() {
302 let mut m = serde_json::Map::new();
303 for (k, val) in h {
304 m.insert(k.clone(), perl_to_json_value(&val)?);
305 }
306 return Ok(serde_json::Value::Object(m));
307 }
308 if let Some(hr) = v.as_hash_ref() {
309 let guard = hr.read();
310 let mut m = serde_json::Map::new();
311 for (k, val) in guard.iter() {
312 m.insert(k.clone(), perl_to_json_value(val)?);
313 }
314 return Ok(serde_json::Value::Object(m));
315 }
316 Err(format!(
317 "value not supported for remote pmap (need null, bool/int/float/string/array/hash): {}",
318 v.type_name()
319 ))
320}
321
322pub fn json_to_perl(v: &serde_json::Value) -> Result<StrykeValue, String> {
323 Ok(match v {
324 serde_json::Value::Null => StrykeValue::UNDEF,
325 serde_json::Value::Bool(b) => StrykeValue::integer(if *b { 1 } else { 0 }),
326 serde_json::Value::Number(n) => {
327 if let Some(i) = n.as_i64() {
328 StrykeValue::integer(i)
329 } else if let Some(u) = n.as_u64() {
330 StrykeValue::integer(u as i64)
331 } else {
332 StrykeValue::float(n.as_f64().unwrap_or(0.0))
333 }
334 }
335 serde_json::Value::String(s) => StrykeValue::string(s.clone()),
336 serde_json::Value::Array(a) => {
337 let mut items = Vec::with_capacity(a.len());
338 for x in a {
339 items.push(json_to_perl(x)?);
340 }
341 StrykeValue::array(items)
342 }
343 serde_json::Value::Object(o) => {
344 let mut map = indexmap::IndexMap::new();
345 for (k, val) in o {
346 map.insert(k.clone(), json_to_perl(val)?);
347 }
348 StrykeValue::hash(map)
349 }
350 })
351}
352
353pub fn capture_entries_to_json(
354 entries: &[(String, StrykeValue)],
355) -> Result<Vec<(String, serde_json::Value)>, String> {
356 let mut out = Vec::with_capacity(entries.len());
357 for (k, v) in entries {
358 out.push((k.clone(), perl_to_json_value(v)?));
359 }
360 Ok(out)
361}
362
363pub fn build_subs_prelude(subs: &HashMap<String, Arc<StrykeSub>>) -> String {
364 let mut names: Vec<_> = subs.keys().cloned().collect();
365 names.sort();
366 let mut s = String::new();
367 for name in names {
368 let sub = &subs[&name];
369 if sub.closure_env.is_some() {
370 continue;
371 }
372 let sig = if !sub.params.is_empty() {
373 format!(
374 " ({})",
375 sub.params
376 .iter()
377 .map(crate::fmt::format_sub_sig_param)
378 .collect::<Vec<_>>()
379 .join(", ")
380 )
381 } else if let Some(ref p) = sub.prototype {
382 format!(" ({})", p)
383 } else {
384 String::new()
385 };
386 let body = crate::fmt::format_block(&sub.body);
387 s.push_str(&format!("fn {}{} {{\n{}\n}}\n", name, sig, body));
388 }
389 s
390}
391
392pub fn run_job_local(job: &RemoteJobV1) -> RemoteRespV1 {
394 let mut interp = VMHelper::new();
395 let cap: Vec<(String, StrykeValue)> = match job
396 .capture
397 .iter()
398 .map(|(k, v)| json_to_perl(v).map(|pv| (k.clone(), pv)))
399 .collect()
400 {
401 Ok(c) => c,
402 Err(e) => {
403 return RemoteRespV1 {
404 seq: job.seq,
405 ok: false,
406 result: serde_json::Value::Null,
407 err_msg: e,
408 };
409 }
410 };
411 interp.scope_push_hook();
412 interp.scope.restore_capture(&cap);
413 let item_pv = match json_to_perl(&job.item) {
414 Ok(v) => v,
415 Err(e) => {
416 interp.scope_pop_hook();
417 return RemoteRespV1 {
418 seq: job.seq,
419 ok: false,
420 result: serde_json::Value::Null,
421 err_msg: e,
422 };
423 }
424 };
425 interp.scope.set_topic(item_pv);
426 let full_src = format!("{}\n{}", job.subs_prelude, job.block_src);
427 let prog = match crate::parse(&full_src) {
428 Ok(p) => p,
429 Err(e) => {
430 interp.scope_pop_hook();
431 return RemoteRespV1 {
432 seq: job.seq,
433 ok: false,
434 result: serde_json::Value::Null,
435 err_msg: e.message,
436 };
437 }
438 };
439 let block: Block = prog.statements;
440 let r = match interp.exec_block_smart(&block) {
441 Ok(v) => v,
442 Err(e) => {
443 interp.scope_pop_hook();
444 let msg = match e {
445 FlowOrError::Error(stryke) => stryke.to_string(),
446 FlowOrError::Flow(f) => format!("unexpected control flow: {:?}", f),
447 };
448 return RemoteRespV1 {
449 seq: job.seq,
450 ok: false,
451 result: serde_json::Value::Null,
452 err_msg: msg,
453 };
454 }
455 };
456 interp.scope_pop_hook();
457 match perl_to_json_value(&r) {
458 Ok(j) => RemoteRespV1 {
459 seq: job.seq,
460 ok: true,
461 result: j,
462 err_msg: String::new(),
463 },
464 Err(e) => RemoteRespV1 {
465 seq: job.seq,
466 ok: false,
467 result: serde_json::Value::Null,
468 err_msg: e,
469 },
470 }
471}
472
473pub fn run_remote_worker_session() -> i32 {
484 let stdin = std::io::stdin();
485 let mut stdin = stdin.lock();
486 let mut stdout = std::io::stdout();
487
488 let hello: HelloMsg = match recv_msg(&mut stdin, frame_kind::HELLO) {
490 Ok(h) => h,
491 Err(e) => {
492 let _ = writeln!(std::io::stderr(), "remote-worker: hello: {e}");
493 return 1;
494 }
495 };
496 if hello.proto_version != PROTO_VERSION {
497 let _ = writeln!(
498 std::io::stderr(),
499 "remote-worker: proto version mismatch (dispatcher {} vs worker {})",
500 hello.proto_version,
501 PROTO_VERSION
502 );
503 return 1;
504 }
505 let ack = HelloAck {
506 proto_version: PROTO_VERSION,
507 pe_version: env!("CARGO_PKG_VERSION").to_string(),
508 hostname: hostname_or_unknown(),
509 };
510 if let Err(e) = send_msg(&mut stdout, frame_kind::HELLO_ACK, &ack) {
511 let _ = writeln!(std::io::stderr(), "remote-worker: hello ack: {e}");
512 return 1;
513 }
514
515 let init: SessionInit = match recv_msg(&mut stdin, frame_kind::SESSION_INIT) {
517 Ok(i) => i,
518 Err(e) => {
519 let _ = writeln!(std::io::stderr(), "remote-worker: session init: {e}");
520 return 1;
521 }
522 };
523
524 let mut interp = VMHelper::new();
527 let prelude_program = match crate::parse(&init.subs_prelude) {
528 Ok(p) => p,
529 Err(e) => {
530 let nack = SessionAck {
531 ok: false,
532 err_msg: format!("parse subs prelude: {}", e.message),
533 };
534 let _ = send_msg(&mut stdout, frame_kind::SESSION_ACK, &nack);
535 return 2;
536 }
537 };
538 let block_program = match crate::parse(&init.block_src) {
539 Ok(p) => p,
540 Err(e) => {
541 let nack = SessionAck {
542 ok: false,
543 err_msg: format!("parse block: {}", e.message),
544 };
545 let _ = send_msg(&mut stdout, frame_kind::SESSION_ACK, &nack);
546 return 2;
547 }
548 };
549
550 let cap_pv: Vec<(String, StrykeValue)> = match init
552 .capture
553 .iter()
554 .map(|(k, v)| json_to_perl(v).map(|pv| (k.clone(), pv)))
555 .collect()
556 {
557 Ok(c) => c,
558 Err(e) => {
559 let nack = SessionAck {
560 ok: false,
561 err_msg: format!("decode capture: {e}"),
562 };
563 let _ = send_msg(&mut stdout, frame_kind::SESSION_ACK, &nack);
564 return 2;
565 }
566 };
567 interp.scope_push_hook();
568 interp.scope.restore_capture(&cap_pv);
569
570 if let Err(e) = interp.execute(&prelude_program) {
573 let nack = SessionAck {
574 ok: false,
575 err_msg: format!("session prelude: {e}"),
576 };
577 let _ = send_msg(&mut stdout, frame_kind::SESSION_ACK, &nack);
578 return 2;
579 }
580
581 let ack = SessionAck {
582 ok: true,
583 err_msg: String::new(),
584 };
585 if let Err(e) = send_msg(&mut stdout, frame_kind::SESSION_ACK, &ack) {
586 let _ = writeln!(std::io::stderr(), "remote-worker: session ack: {e}");
587 return 1;
588 }
589
590 let block: Block = block_program.statements;
591
592 loop {
596 let (kind, body) = match read_typed_frame(&mut stdin) {
597 Ok(p) => p,
598 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return 0,
599 Err(e) => {
600 let _ = writeln!(std::io::stderr(), "remote-worker: read job: {e}");
601 return 1;
602 }
603 };
604 match kind {
605 frame_kind::JOB => {
606 let job: JobMsg = match bincode::deserialize(&body) {
607 Ok(j) => j,
608 Err(e) => {
609 let resp = JobRespMsg {
610 seq: 0,
611 ok: false,
612 result: serde_json::Value::Null,
613 err_msg: format!("decode job: {e}"),
614 };
615 let _ = send_msg(&mut stdout, frame_kind::JOB_RESP, &resp);
616 continue;
617 }
618 };
619 let resp = run_one_session_job(&mut interp, &block, &job);
620 if let Err(e) = send_msg(&mut stdout, frame_kind::JOB_RESP, &resp) {
621 let _ = writeln!(std::io::stderr(), "remote-worker: write resp: {e}");
622 return 1;
623 }
624 }
625 frame_kind::SHUTDOWN => return 0,
626 other => {
627 let _ = writeln!(
628 std::io::stderr(),
629 "remote-worker: unexpected frame kind {:#04x} in JOB loop",
630 other
631 );
632 return 1;
633 }
634 }
635 }
636}
637
638fn run_one_session_job(interp: &mut VMHelper, block: &Block, job: &JobMsg) -> JobRespMsg {
642 let item_pv = match json_to_perl(&job.item) {
643 Ok(v) => v,
644 Err(e) => {
645 return JobRespMsg {
646 seq: job.seq,
647 ok: false,
648 result: serde_json::Value::Null,
649 err_msg: e,
650 };
651 }
652 };
653 interp.scope.set_topic(item_pv);
654 let r = match interp.exec_block_smart(block) {
655 Ok(v) => v,
656 Err(FlowOrError::Error(stryke)) => {
657 return JobRespMsg {
658 seq: job.seq,
659 ok: false,
660 result: serde_json::Value::Null,
661 err_msg: stryke.to_string(),
662 };
663 }
664 Err(FlowOrError::Flow(f)) => {
665 return JobRespMsg {
666 seq: job.seq,
667 ok: false,
668 result: serde_json::Value::Null,
669 err_msg: format!("unexpected control flow: {:?}", f),
670 };
671 }
672 };
673 match perl_to_json_value(&r) {
674 Ok(j) => JobRespMsg {
675 seq: job.seq,
676 ok: true,
677 result: j,
678 err_msg: String::new(),
679 },
680 Err(e) => JobRespMsg {
681 seq: job.seq,
682 ok: false,
683 result: serde_json::Value::Null,
684 err_msg: e,
685 },
686 }
687}
688
689fn hostname_or_unknown() -> String {
690 std::env::var("HOSTNAME").unwrap_or_else(|_| {
691 std::process::Command::new("hostname")
692 .output()
693 .ok()
694 .and_then(|o| String::from_utf8(o.stdout).ok())
695 .map(|s| s.trim().to_string())
696 .unwrap_or_else(|| "unknown".to_string())
697 })
698}
699
700pub fn run_remote_worker_stdio() -> i32 {
702 let stdin = std::io::stdin();
703 let mut stdin = stdin.lock();
704 let mut stdout = std::io::stdout();
705 let payload = match read_framed(&mut stdin) {
706 Ok(p) => p,
707 Err(e) => {
708 let _ = writeln!(std::io::stderr(), "remote-worker: read frame: {e}");
709 return 1;
710 }
711 };
712 let job = match decode_job(&payload) {
713 Ok(j) => j,
714 Err(e) => {
715 let _ = writeln!(std::io::stderr(), "remote-worker: decode job: {e}");
716 return 1;
717 }
718 };
719 let resp = run_job_local(&job);
720 let out = match encode_resp(&resp) {
721 Ok(b) => b,
722 Err(e) => {
723 let _ = writeln!(std::io::stderr(), "remote-worker: encode resp: {e}");
724 return 1;
725 }
726 };
727 if let Err(e) = write_framed(&mut stdout, &out) {
728 let _ = writeln!(std::io::stderr(), "remote-worker: write frame: {e}");
729 return 1;
730 }
731 if resp.ok {
732 0
733 } else {
734 let _ = writeln!(std::io::stderr(), "remote-worker: {}", resp.err_msg);
735 2
736 }
737}
738
739pub fn ssh_invoke_remote_worker(
740 host: &str,
741 pe_bin: &str,
742 job: &RemoteJobV1,
743) -> Result<RemoteRespV1, String> {
744 let payload = encode_job(job)?;
745 let mut child = Command::new("ssh")
746 .arg(host)
747 .arg(pe_bin)
748 .arg("--remote-worker")
749 .stdin(Stdio::piped())
750 .stdout(Stdio::piped())
751 .stderr(Stdio::piped())
752 .spawn()
753 .map_err(|e| format!("ssh: {e}"))?;
754 let mut stdin = child.stdin.take().ok_or_else(|| "ssh: stdin".to_string())?;
755 write_framed(&mut stdin, &payload).map_err(|e| format!("ssh stdin: {e}"))?;
756 drop(stdin);
757 let mut stdout = child
758 .stdout
759 .take()
760 .ok_or_else(|| "ssh: stdout".to_string())?;
761 let mut stderr = child
762 .stderr
763 .take()
764 .ok_or_else(|| "ssh: stderr".to_string())?;
765 let stderr_task = std::thread::spawn(move || {
766 let mut s = String::new();
767 let _ = stderr.read_to_string(&mut s);
768 s
769 });
770 let out_bytes = read_framed(&mut stdout).map_err(|e| format!("ssh read frame: {e}"))?;
771 let status = child.wait().map_err(|e| format!("ssh wait: {e}"))?;
772 let stderr_text = stderr_task.join().unwrap_or_default();
773 if !status.success() {
774 return Err(format!(
775 "ssh remote stryke exited {:?}: {}",
776 status.code(),
777 stderr_text.trim()
778 ));
779 }
780 decode_resp(&out_bytes).map_err(|e| {
781 format!(
782 "decode remote response: {e}; stderr: {}",
783 stderr_text.trim()
784 )
785 })
786}
787
788#[cfg(test)]
789mod tests {
790 use super::*;
791
792 #[test]
793 fn job_resp_msg_bincode_roundtrip() {
794 let msg = JobRespMsg {
795 seq: 1,
796 ok: true,
797 result: serde_json::json!(42i64),
798 err_msg: String::new(),
799 };
800 let bytes = bincode::serialize(&msg).unwrap();
801 let back: JobRespMsg = bincode::deserialize(&bytes).unwrap();
802 assert_eq!(back.seq, msg.seq);
803 assert_eq!(back.ok, msg.ok);
804 assert_eq!(back.result, msg.result);
805 assert_eq!(back.err_msg, msg.err_msg);
806 }
807
808 #[test]
809 fn local_roundtrip_doubles() {
810 let job = RemoteJobV1 {
811 seq: 0,
812 subs_prelude: String::new(),
813 block_src: "$_ * 2;".to_string(),
814 capture: vec![],
815 item: serde_json::json!(21),
816 };
817 let r = run_job_local(&job);
818 assert!(r.ok, "{}", r.err_msg);
819 assert_eq!(r.result, serde_json::json!(42));
820 }
821}