1use std::collections::{BTreeMap, HashMap};
2use std::io::{self, Read, Write};
3use std::process::{Command, Stdio};
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::{Arc, Mutex, OnceLock};
6use std::thread::{self, JoinHandle};
7
8use rexlang::{
9 BuiltinTypeId, Engine, EngineError, FromPointer, Heap, Library, Pointer, Scheme, Symbol, Type,
10 Value, sym, virtual_export_name,
11};
12use uuid::Uuid;
13
14fn lock_mutex<'a, T>(
15 m: &'a Mutex<T>,
16 context: &str,
17) -> Result<std::sync::MutexGuard<'a, T>, EngineError> {
18 m.lock()
19 .map_err(|_| EngineError::Internal(format!("{context}: mutex poisoned (this is a bug)")))
20}
21
22fn lock_arc_mutex<'a, T>(
23 m: &'a Arc<Mutex<T>>,
24 context: &str,
25) -> Result<std::sync::MutexGuard<'a, T>, EngineError> {
26 m.lock()
27 .map_err(|_| EngineError::Internal(format!("{context}: mutex poisoned (this is a bug)")))
28}
29
30fn unit_pointer(heap: &Heap) -> Result<Pointer, EngineError> {
31 heap.alloc_tuple(vec![])
32}
33
/// Test-only helper: the runtime type name of `value`, used in panic messages.
#[cfg(test)]
fn value_type_name(value: &Value) -> &'static str {
    Value::value_type_name(value)
}
38
39fn unit_type() -> Type {
40 Type::tuple(vec![])
41}
42
43fn array_type(elem: Type) -> Type {
44 Type::app(Type::builtin(BuiltinTypeId::Array), elem)
45}
46
47fn list_to_vec(heap: &Heap, pointer: &Pointer) -> Result<Vec<Pointer>, EngineError> {
48 let mut out = Vec::new();
49 let mut cursor = *pointer;
50 loop {
51 let value = heap.get(&cursor)?;
52 match value.as_ref() {
53 Value::Adt(tag, args) if tag.as_ref() == "Empty" && args.is_empty() => return Ok(out),
54 Value::Adt(tag, args) if tag.as_ref() == "Cons" && args.len() == 2 => {
55 out.push(args[0]);
56 cursor = args[1];
57 }
58 _ => {
59 return Err(EngineError::NativeType {
60 expected: "List".into(),
61 got: heap.type_name(&cursor)?.into(),
62 });
63 }
64 }
65 }
66}
67
68fn array_u8_to_bytes(heap: &Heap, pointer: &Pointer) -> Result<Vec<u8>, EngineError> {
69 let elems = heap.pointer_as_array(pointer)?;
70 let mut out = Vec::with_capacity(elems.len());
71 for elem in &elems {
72 out.push(heap.pointer_as_u8(elem)?);
73 }
74 Ok(out)
75}
76
77fn bytes_to_array_u8(heap: &Heap, bytes: Vec<u8>) -> Result<Pointer, EngineError> {
78 let out = bytes
79 .into_iter()
80 .map(|b| heap.alloc_u8(b))
81 .collect::<Result<Vec<_>, _>>()?;
82 heap.alloc_array(out)
83}
84
/// Process-wide table of subprocesses started by `std.process.spawn`, keyed by
/// the UUID stored in each `Subprocess` value.
///
/// `spawn` inserts entries and `subprocess_get` looks them up for
/// `wait`/`stdout`/`stderr`. NOTE(review): nothing in this file removes entries,
/// so each entry and its capture buffers live until the process exits.
#[derive(Default)]
struct SubprocessRegistry {
    procs: Mutex<HashMap<Uuid, Arc<SubprocessEntry>>>,
}
89
/// Bookkeeping for one spawned child process.
struct SubprocessEntry {
    /// Exit code cached by `wait` once the child has been reaped.
    exit_code: Mutex<Option<i32>>,
    /// The child process handle.
    child: Mutex<Option<std::process::Child>>,
    /// Bytes captured from the child's stdout. Shared with its reader thread.
    stdout: Arc<Mutex<Vec<u8>>>,
    /// Bytes captured from the child's stderr. Shared with its reader thread.
    stderr: Arc<Mutex<Vec<u8>>>,
    /// Set by the stdout reader thread when it stops reading.
    stdout_done: AtomicBool,
    /// Set by the stderr reader thread when it stops reading.
    stderr_done: AtomicBool,
    /// Join handle of the stdout reader thread, taken when it is joined.
    stdout_thread: Mutex<Option<JoinHandle<io::Result<()>>>>,
    /// Join handle of the stderr reader thread, taken when it is joined.
    stderr_thread: Mutex<Option<JoinHandle<io::Result<()>>>>,
}

impl SubprocessEntry {
    /// Wraps a freshly spawned `child` with empty capture buffers, no cached
    /// exit code, cleared done flags, and no reader threads attached yet.
    fn new(child: std::process::Child) -> Self {
        let empty_buffer = || Arc::new(Mutex::new(Vec::new()));
        Self {
            child: Mutex::new(Some(child)),
            exit_code: Mutex::new(None),
            stdout: empty_buffer(),
            stderr: empty_buffer(),
            stdout_done: AtomicBool::new(false),
            stderr_done: AtomicBool::new(false),
            stdout_thread: Mutex::new(None),
            stderr_thread: Mutex::new(None),
        }
    }
}
115
116static SUBPROCESSES: OnceLock<SubprocessRegistry> = OnceLock::new();
117
118fn subprocess_registry() -> &'static SubprocessRegistry {
119 SUBPROCESSES.get_or_init(SubprocessRegistry::default)
120}
121
122pub fn inject_cli_prelude_engine(engine: &mut Engine) -> Result<(), EngineError> {
123 engine.inject_tracing_log_function(&virtual_export_name("std.io", "debug"), |s| {
124 tracing::debug!("{s}")
125 })?;
126 engine.inject_tracing_log_function(&virtual_export_name("std.io", "info"), |s| {
127 tracing::info!("{s}")
128 })?;
129 engine.inject_tracing_log_function(&virtual_export_name("std.io", "warn"), |s| {
130 tracing::warn!("{s}")
131 })?;
132 engine.inject_tracing_log_function(&virtual_export_name("std.io", "error"), |s| {
133 tracing::error!("{s}")
134 })?;
135
136 inject_cli_io_natives(engine)?;
137 inject_cli_process_natives(engine)?;
138 Ok(())
139}
140
141fn inject_cli_io_natives(engine: &mut Engine) -> Result<(), EngineError> {
142 let i32_ty = Type::builtin(BuiltinTypeId::I32);
143 let u8_ty = Type::builtin(BuiltinTypeId::U8);
144 let array_u8 = array_type(u8_ty);
145 let mut library = Library::new("std.io");
146
147 let read_all_sym = sym("read_all");
148 library.export_native_async(
149 "read_all",
150 Scheme::new(vec![], vec![], Type::fun(i32_ty.clone(), array_u8.clone())),
151 1,
152 move |engine, _, args| {
153 let read_all_sym = read_all_sym.clone();
154 Box::pin(async move {
155 if args.len() != 1 {
156 return Err(EngineError::NativeArity {
157 name: read_all_sym,
158 expected: 1,
159 got: args.len(),
160 });
161 }
162 let fd = i32::from_pointer(&engine.heap, &args[0])?;
163
164 if fd != 0 {
165 return Err(EngineError::Internal(format!(
166 "read_all only supports fd 0 (stdin), got {fd}"
167 )));
168 }
169
170 let mut buf = Vec::new();
171 io::stdin()
172 .read_to_end(&mut buf)
173 .map_err(|e| EngineError::Internal(format!("read_all failed: {e}")))?;
174 bytes_to_array_u8(&engine.heap, buf)
175 })
176 },
177 )?;
178
179 let write_all_sym = sym("write_all");
180 library.export_native_async(
181 "write_all",
182 Scheme::new(
183 vec![],
184 vec![],
185 Type::fun(i32_ty, Type::fun(array_u8, unit_type())),
186 ),
187 2,
188 move |engine, _, args| {
189 let write_all_sym = write_all_sym.clone();
190 Box::pin(async move {
191 if args.len() != 2 {
192 return Err(EngineError::NativeArity {
193 name: write_all_sym,
194 expected: 2,
195 got: args.len(),
196 });
197 }
198 let fd = i32::from_pointer(&engine.heap, &args[0])?;
199 let bytes = array_u8_to_bytes(&engine.heap, &args[1])?;
200
201 match fd {
202 1 => {
203 let mut out = io::stdout().lock();
204 out.write_all(&bytes)
205 .and_then(|()| out.flush())
206 .map_err(|e| EngineError::Internal(format!("write_all failed: {e}")))?;
207 }
208 2 => {
209 let mut out = io::stderr().lock();
210 out.write_all(&bytes)
211 .and_then(|()| out.flush())
212 .map_err(|e| EngineError::Internal(format!("write_all failed: {e}")))?;
213 }
214 _ => {
215 return Err(EngineError::Internal(format!(
216 "write_all only supports fd 1 (stdout) and 2 (stderr), got {fd}"
217 )));
218 }
219 }
220
221 unit_pointer(&engine.heap)
222 })
223 },
224 )?;
225
226 engine.inject_library(library)
227}
228
229fn inject_cli_process_natives(engine: &mut Engine) -> Result<(), EngineError> {
230 let subprocess_name = virtual_export_name("std.process", "Subprocess");
231 let subprocess_ctor = sym(&subprocess_name);
232 let subprocess = Type::con(&subprocess_name, 0);
233 let string = Type::builtin(BuiltinTypeId::String);
234 let i32_ty = Type::builtin(BuiltinTypeId::I32);
235 let list_string = Type::app(Type::builtin(BuiltinTypeId::List), string.clone());
236 let mut library = Library::new("std.process");
237 let opts = Type::record(vec![
238 (sym("cmd"), string.clone()),
239 (sym("args"), list_string),
240 ]);
241
242 let spawn_sym = sym("spawn");
243 let subprocess_ctor_for_spawn = subprocess_ctor.clone();
244 library.export_native_async(
245 "spawn",
246 Scheme::new(vec![], vec![], Type::fun(opts, subprocess.clone())),
247 1,
248 move |engine, _, args| {
249 let spawn_sym = spawn_sym.clone();
250 let subprocess_ctor = subprocess_ctor_for_spawn.clone();
251 Box::pin(async move {
252 if args.len() != 1 {
253 return Err(EngineError::NativeArity {
254 name: spawn_sym.clone(),
255 expected: 1,
256 got: args.len(),
257 });
258 }
259 let map = engine.heap.pointer_as_dict(&args[0])?;
260
261 let cmd_pointer = map
262 .get(&sym("cmd"))
263 .cloned()
264 .ok_or_else(|| EngineError::Internal("spawn missing `cmd`".into()))?;
265 let cmd = String::from_pointer(&engine.heap, &cmd_pointer)?;
266
267 let args_pointer = map
268 .get(&sym("args"))
269 .cloned()
270 .ok_or_else(|| EngineError::Internal("spawn missing `args`".into()))?;
271 let args_list = list_to_vec(&engine.heap, &args_pointer)?;
272 let mut args_vec = Vec::with_capacity(args_list.len());
273 for arg in args_list {
274 args_vec.push(String::from_pointer(&engine.heap, &arg)?);
275 }
276
277 let mut child = Command::new(cmd)
278 .args(args_vec)
279 .stdin(Stdio::null())
280 .stdout(Stdio::piped())
281 .stderr(Stdio::piped())
282 .spawn()
283 .map_err(|e| EngineError::Internal(format!("spawn failed: {e}")))?;
284
285 let mut stdout = child.stdout.take().ok_or_else(|| {
286 EngineError::Internal("spawn failed to capture stdout".into())
287 })?;
288 let mut stderr = child.stderr.take().ok_or_else(|| {
289 EngineError::Internal("spawn failed to capture stderr".into())
290 })?;
291
292 let id = Uuid::new_v4();
293 let entry = Arc::new(SubprocessEntry::new(child));
294
295 {
296 let stdout_buf = entry.stdout.clone();
297 let entry_for_done = entry.clone();
298 let handle = thread::spawn(move || {
299 let mut tmp = [0u8; 8192];
300 loop {
301 let n = stdout.read(&mut tmp)?;
302 if n == 0 {
303 break;
304 }
305 if let Ok(mut buf) = stdout_buf.lock() {
306 buf.extend_from_slice(&tmp[..n]);
307 }
308 }
309 entry_for_done.stdout_done.store(true, Ordering::Release);
310 Ok(())
311 });
312 *lock_mutex(&entry.stdout_thread, "std.process.spawn stdout_thread")? =
313 Some(handle);
314 }
315
316 {
317 let stderr_buf = entry.stderr.clone();
318 let entry_for_done = entry.clone();
319 let handle = thread::spawn(move || {
320 let mut tmp = [0u8; 8192];
321 loop {
322 let n = stderr.read(&mut tmp)?;
323 if n == 0 {
324 break;
325 }
326 if let Ok(mut buf) = stderr_buf.lock() {
327 buf.extend_from_slice(&tmp[..n]);
328 }
329 }
330 entry_for_done.stderr_done.store(true, Ordering::Release);
331 Ok(())
332 });
333 *lock_mutex(&entry.stderr_thread, "std.process.spawn stderr_thread")? =
334 Some(handle);
335 }
336
337 subprocess_registry()
338 .procs
339 .lock()
340 .map_err(|_| {
341 EngineError::Internal(
342 "std.process.spawn: subprocess registry mutex poisoned (this is a bug)"
343 .into(),
344 )
345 })?
346 .insert(id, entry);
347
348 let mut payload = BTreeMap::new();
349 payload.insert(sym("id"), engine.heap.alloc_uuid(id)?);
350 let payload = engine.heap.alloc_dict(payload)?;
351 engine.heap.alloc_adt(subprocess_ctor, vec![payload])
352 })
353 },
354 )?;
355
356 let wait_sym = sym("wait");
357 let subprocess_ctor_for_wait = subprocess_ctor.clone();
358 library.export_native_async(
359 "wait",
360 Scheme::new(vec![], vec![], Type::fun(subprocess.clone(), i32_ty)),
361 1,
362 move |engine, _, args| {
363 let wait_sym = wait_sym.clone();
364 let subprocess_ctor = subprocess_ctor_for_wait.clone();
365 Box::pin(async move {
366 if args.len() != 1 {
367 return Err(EngineError::NativeArity {
368 name: wait_sym.clone(),
369 expected: 1,
370 got: args.len(),
371 });
372 }
373 let id = subprocess_id(&engine.heap, &args[0], &subprocess_ctor)?;
374 let entry = subprocess_get(&id, wait_sym.as_ref())?;
375
376 if let Some(code) = *lock_mutex(&entry.exit_code, "std.process.wait exit_code")? {
377 return engine.heap.alloc_i32(code);
378 }
379
380 let status = {
381 let mut child_guard = lock_mutex(&entry.child, "std.process.wait child")?;
382 let Some(child) = child_guard.as_mut() else {
383 return Err(EngineError::Internal("subprocess already reaped".into()));
384 };
385 child
386 .wait()
387 .map_err(|e| EngineError::Internal(format!("wait failed: {e}")))?
388 };
389
390 let code = status.code().unwrap_or(-1);
391 *lock_mutex(&entry.exit_code, "std.process.wait exit_code")? = Some(code);
392
393 if let Some(handle) = entry
395 .stdout_thread
396 .lock()
397 .map_err(|_| {
398 EngineError::Internal(
399 "std.process.wait: stdout_thread mutex poisoned (this is a bug)".into(),
400 )
401 })?
402 .take()
403 {
404 let _ = handle.join();
405 }
406 if let Some(handle) = entry
407 .stderr_thread
408 .lock()
409 .map_err(|_| {
410 EngineError::Internal(
411 "std.process.wait: stderr_thread mutex poisoned (this is a bug)".into(),
412 )
413 })?
414 .take()
415 {
416 let _ = handle.join();
417 }
418
419 engine.heap.alloc_i32(code)
420 })
421 },
422 )?;
423
424 let stdout_sym = sym("stdout");
425 let subprocess_ctor_for_stdout = subprocess_ctor.clone();
426 library.export_native_async(
427 "stdout",
428 Scheme::new(
429 vec![],
430 vec![],
431 Type::fun(
432 subprocess.clone(),
433 array_type(Type::builtin(BuiltinTypeId::U8)),
434 ),
435 ),
436 1,
437 move |engine, _, args| {
438 let stdout_sym = stdout_sym.clone();
439 let subprocess_ctor = subprocess_ctor_for_stdout.clone();
440 Box::pin(async move {
441 if args.len() != 1 {
442 return Err(EngineError::NativeArity {
443 name: stdout_sym.clone(),
444 expected: 1,
445 got: args.len(),
446 });
447 }
448 let id = subprocess_id(&engine.heap, &args[0], &subprocess_ctor)?;
449 let entry = subprocess_get(&id, stdout_sym.as_ref())?;
450 let bytes = lock_arc_mutex(&entry.stdout, "std.process.stdout buffer")?.clone();
451 bytes_to_array_u8(&engine.heap, bytes)
452 })
453 },
454 )?;
455
456 let stderr_sym = sym("stderr");
457 let subprocess_ctor_for_stderr = subprocess_ctor.clone();
458 library.export_native_async(
459 "stderr",
460 Scheme::new(
461 vec![],
462 vec![],
463 Type::fun(subprocess, array_type(Type::builtin(BuiltinTypeId::U8))),
464 ),
465 1,
466 move |engine, _, args| {
467 let stderr_sym = stderr_sym.clone();
468 let subprocess_ctor = subprocess_ctor_for_stderr.clone();
469 Box::pin(async move {
470 if args.len() != 1 {
471 return Err(EngineError::NativeArity {
472 name: stderr_sym.clone(),
473 expected: 1,
474 got: args.len(),
475 });
476 }
477 let id = subprocess_id(&engine.heap, &args[0], &subprocess_ctor)?;
478 let entry = subprocess_get(&id, stderr_sym.as_ref())?;
479 let bytes = lock_arc_mutex(&entry.stderr, "std.process.stderr buffer")?.clone();
480 bytes_to_array_u8(&engine.heap, bytes)
481 })
482 },
483 )?;
484
485 engine.inject_library(library)
486}
487
488fn subprocess_id(heap: &Heap, pointer: &Pointer, tag: &Symbol) -> Result<Uuid, EngineError> {
489 let (got_tag, args) = heap.pointer_as_adt(pointer)?;
490 if &got_tag != tag || args.len() != 1 {
491 return Err(EngineError::NativeType {
492 expected: "Subprocess".into(),
493 got: heap.type_name(pointer)?.into(),
494 });
495 }
496 let map = heap.pointer_as_dict(&args[0])?;
497 let id_pointer = map
498 .get(&sym("id"))
499 .cloned()
500 .ok_or_else(|| EngineError::Internal("Subprocess missing id".into()))?;
501 heap.pointer_as_uuid(&id_pointer)
502}
503
504fn subprocess_get(id: &Uuid, name: &str) -> Result<Arc<SubprocessEntry>, EngineError> {
505 subprocess_registry()
506 .procs
507 .lock()
508 .map_err(|_| {
509 EngineError::Internal(format!(
510 "{name}: subprocess registry mutex poisoned (this is a bug)"
511 ))
512 })?
513 .get(id)
514 .cloned()
515 .ok_or_else(|| EngineError::Internal(format!("{name}: unknown subprocess id {id}")))
516}
517
// End-to-end tests: evaluate Rex snippets on an `Engine` with the CLI prelude
// installed. The snippets spawn `sh`, so a POSIX shell must be on PATH.
#[cfg(test)]
mod tests {
    use rexlang::{Engine, GasMeter, assert_pointer_eq};

    use super::*;

    // Gas meter from `GasMeter::default()`. Presumably unlimited, as the name
    // suggests — TODO confirm against `GasMeter`'s `Default` impl.
    fn unlimited_gas() -> GasMeter {
        GasMeter::default()
    }

    // Smoke test: `std.process` and `std.io` resolve, typecheck and evaluate
    // without error. The output is not checked. `process.stdout` is called
    // without `process.wait`, so it may see only part (or none) of the output.
    #[tokio::test]
    async fn cli_prelude_typecheck_smoke() {
        let code = r#"
            import std.process
            import std.io

            let p = process.spawn { cmd = "sh", args = ["-c", "printf hi"] } in
            io.write_all 1 (process.stdout p)
        "#;

        let mut engine = Engine::with_prelude(()).unwrap();
        engine.add_default_resolvers();
        inject_cli_prelude_engine(&mut engine).unwrap();
        let mut gas = unlimited_gas();
        rexlang::Evaluator::new_with_compiler(
            rexlang::RuntimeEnv::new(engine.clone()),
            rexlang::Compiler::new(engine.clone()),
        )
        .eval_snippet(code, &mut gas)
        .await
        .unwrap();
    }

    // Runs `sh -c 'printf hi'`, then checks the result tuple:
    // type (I32, Array U8, Array U8), exit code 0, stdout == "hi", stderr empty.
    // NOTE(review): this relies on the tuple being evaluated left to right, so
    // that `wait` runs before the output snapshots are taken — confirm.
    #[tokio::test]
    async fn cli_subprocess_captures_stdout_and_exit_code() {
        let code = r#"
            import std.process

            let p = process.spawn { cmd = "sh", args = ["-c", "printf hi"] } in
            (process.wait p, process.stdout p, process.stderr p)
        "#;

        let mut engine = Engine::with_prelude(()).unwrap();
        engine.add_default_resolvers();
        inject_cli_prelude_engine(&mut engine).unwrap();
        let mut gas = unlimited_gas();
        let (value, ty) = rexlang::Evaluator::new_with_compiler(
            rexlang::RuntimeEnv::new(engine.clone()),
            rexlang::Compiler::new(engine.clone()),
        )
        .eval_snippet(code, &mut gas)
        .await
        .unwrap();
        assert_eq!(
            ty,
            Type::tuple(vec![
                Type::builtin(BuiltinTypeId::I32),
                array_type(Type::builtin(BuiltinTypeId::U8)),
                array_type(Type::builtin(BuiltinTypeId::U8)),
            ])
        );
        let value = engine
            .heap
            .get(&value)
            .map(|value| value.as_ref().clone())
            .unwrap();
        let Value::Tuple(xs) = value else {
            panic!("expected tuple");
        };
        // Exit code.
        assert_pointer_eq!(
            &engine.heap,
            xs[0].clone(),
            engine.heap.alloc_i32(0).unwrap()
        );

        // Stdout: decode each array cell back to a byte.
        let Value::Array(out) = engine
            .heap
            .get(&xs[1])
            .map(|value| value.as_ref().clone())
            .unwrap()
        else {
            panic!("expected stdout bytes");
        };
        let got: Vec<u8> = out
            .iter()
            .map(|v| {
                match engine
                    .heap
                    .get(v)
                    .map(|value| value.as_ref().clone())
                    .unwrap()
                {
                    Value::U8(b) => b,
                    other => panic!("expected u8, got {}", value_type_name(&other)),
                }
            })
            .collect();
        assert_eq!(got, b"hi");

        // Stderr: nothing was written.
        let Value::Array(err) = engine
            .heap
            .get(&xs[2])
            .map(|value| value.as_ref().clone())
            .unwrap()
        else {
            panic!("expected stderr bytes");
        };
        assert!(err.is_empty());
    }
}