1use std::collections::{BTreeMap, HashMap};
2use std::io::{self, Read, Write};
3use std::process::{Command, Stdio};
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::{Arc, Mutex, OnceLock};
6use std::thread::{self, JoinHandle};
7
8use rexlang::{
9 BuiltinTypeId, Engine, EngineError, FromPointer, Heap, Library, Pointer, Scheme, Symbol, Type,
10 Value, sym, virtual_export_name,
11};
12use uuid::Uuid;
13
14fn lock_mutex<'a, T>(
15 m: &'a Mutex<T>,
16 context: &str,
17) -> Result<std::sync::MutexGuard<'a, T>, EngineError> {
18 m.lock()
19 .map_err(|_| EngineError::Internal(format!("{context}: mutex poisoned (this is a bug)")))
20}
21
22fn lock_arc_mutex<'a, T>(
23 m: &'a Arc<Mutex<T>>,
24 context: &str,
25) -> Result<std::sync::MutexGuard<'a, T>, EngineError> {
26 m.lock()
27 .map_err(|_| EngineError::Internal(format!("{context}: mutex poisoned (this is a bug)")))
28}
29
30fn unit_pointer(heap: &Heap) -> Result<Pointer, EngineError> {
31 heap.alloc_tuple(vec![])
32}
33
/// Test-only shorthand for `Value::value_type_name`, used in assertion messages.
#[cfg(test)]
fn value_type_name(value: &Value) -> &'static str {
    Value::value_type_name(value)
}
38
39fn unit_type() -> Type {
40 Type::tuple(vec![])
41}
42
43fn array_type(elem: Type) -> Type {
44 Type::app(Type::builtin(BuiltinTypeId::Array), elem)
45}
46
47fn list_to_vec(heap: &Heap, pointer: &Pointer) -> Result<Vec<Pointer>, EngineError> {
48 let mut out = Vec::new();
49 let mut cursor = *pointer;
50 loop {
51 let value = heap.get(&cursor)?;
52 match value.as_ref() {
53 Value::Adt(tag, args) if tag.as_ref() == "Empty" && args.is_empty() => return Ok(out),
54 Value::Adt(tag, args) if tag.as_ref() == "Cons" && args.len() == 2 => {
55 out.push(args[0]);
56 cursor = args[1];
57 }
58 _ => {
59 return Err(EngineError::NativeType {
60 expected: "List".into(),
61 got: heap.type_name(&cursor)?.into(),
62 });
63 }
64 }
65 }
66}
67
68fn array_u8_to_bytes(heap: &Heap, pointer: &Pointer) -> Result<Vec<u8>, EngineError> {
69 let elems = heap.pointer_as_array(pointer)?;
70 let mut out = Vec::with_capacity(elems.len());
71 for elem in &elems {
72 out.push(heap.pointer_as_u8(elem)?);
73 }
74 Ok(out)
75}
76
77fn bytes_to_array_u8(heap: &Heap, bytes: Vec<u8>) -> Result<Pointer, EngineError> {
78 let out = bytes
79 .into_iter()
80 .map(|b| heap.alloc_u8(b))
81 .collect::<Result<Vec<_>, _>>()?;
82 heap.alloc_array(out)
83}
84
/// Table of subprocesses started by `std.process.spawn`, keyed by the uuid
/// stored inside each Rex `Subprocess` value.
///
/// A single instance lives in the `SUBPROCESSES` `OnceLock`, so the table is
/// shared by every engine in the process.
#[derive(Default)]
struct SubprocessRegistry {
    // Entries are inserted by `spawn`; nothing in this file removes them.
    procs: Mutex<HashMap<Uuid, Arc<SubprocessEntry>>>,
}
89
/// Book-keeping for one child process spawned by `std.process.spawn`.
struct SubprocessEntry {
    // Exit code recorded by the first successful `std.process.wait`; `-1` is
    // stored when `ExitStatus::code()` returned `None`.
    exit_code: Mutex<Option<i32>>,
    // The child handle. `new` stores `Some`; `wait` reports
    // "subprocess already reaped" if it ever finds `None`.
    child: Mutex<Option<std::process::Child>>,
    // Bytes captured so far from the child's stdout / stderr by the reader
    // threads started in `spawn`. `Arc` so those threads can share the buffers.
    stdout: Arc<Mutex<Vec<u8>>>,
    stderr: Arc<Mutex<Vec<u8>>>,
    // Set to `true` by the matching reader thread once it has finished draining
    // its pipe. NOTE(review): nothing in this file reads these flags.
    stdout_done: AtomicBool,
    stderr_done: AtomicBool,
    // Join handles of the reader threads; `wait` takes and joins them.
    stdout_thread: Mutex<Option<JoinHandle<io::Result<()>>>>,
    stderr_thread: Mutex<Option<JoinHandle<io::Result<()>>>>,
}
100
101impl SubprocessEntry {
102 fn new(child: std::process::Child) -> Self {
103 Self {
104 exit_code: Mutex::new(None),
105 child: Mutex::new(Some(child)),
106 stdout: Arc::new(Mutex::new(Vec::new())),
107 stderr: Arc::new(Mutex::new(Vec::new())),
108 stdout_done: AtomicBool::new(false),
109 stderr_done: AtomicBool::new(false),
110 stdout_thread: Mutex::new(None),
111 stderr_thread: Mutex::new(None),
112 }
113 }
114}
115
116static SUBPROCESSES: OnceLock<SubprocessRegistry> = OnceLock::new();
117
118fn subprocess_registry() -> &'static SubprocessRegistry {
119 SUBPROCESSES.get_or_init(SubprocessRegistry::default)
120}
121
122pub fn inject_cli_prelude_engine(engine: &mut Engine) -> Result<(), EngineError> {
123 inject_cli_io_natives(engine)?;
124 inject_cli_process_natives(engine)?;
125 Ok(())
126}
127
128fn inject_cli_io_natives(engine: &mut Engine) -> Result<(), EngineError> {
129 let i32_ty = Type::builtin(BuiltinTypeId::I32);
130 let u8_ty = Type::builtin(BuiltinTypeId::U8);
131 let array_u8 = array_type(u8_ty);
132 let mut library = Library::new("std.io");
133 library.export_tracing_log_functions()?;
134
135 let read_all_sym = sym("read_all");
136 library.export_native_async(
137 "read_all",
138 Scheme::new(vec![], vec![], Type::fun(i32_ty.clone(), array_u8.clone())),
139 1,
140 move |engine, _, args| {
141 let read_all_sym = read_all_sym.clone();
142 Box::pin(async move {
143 if args.len() != 1 {
144 return Err(EngineError::NativeArity {
145 name: read_all_sym,
146 expected: 1,
147 got: args.len(),
148 });
149 }
150 let fd = i32::from_pointer(&engine.heap, &args[0])?;
151
152 if fd != 0 {
153 return Err(EngineError::Internal(format!(
154 "read_all only supports fd 0 (stdin), got {fd}"
155 )));
156 }
157
158 let mut buf = Vec::new();
159 io::stdin()
160 .read_to_end(&mut buf)
161 .map_err(|e| EngineError::Internal(format!("read_all failed: {e}")))?;
162 bytes_to_array_u8(&engine.heap, buf)
163 })
164 },
165 )?;
166
167 let write_all_sym = sym("write_all");
168 library.export_native_async(
169 "write_all",
170 Scheme::new(
171 vec![],
172 vec![],
173 Type::fun(i32_ty, Type::fun(array_u8, unit_type())),
174 ),
175 2,
176 move |engine, _, args| {
177 let write_all_sym = write_all_sym.clone();
178 Box::pin(async move {
179 if args.len() != 2 {
180 return Err(EngineError::NativeArity {
181 name: write_all_sym,
182 expected: 2,
183 got: args.len(),
184 });
185 }
186 let fd = i32::from_pointer(&engine.heap, &args[0])?;
187 let bytes = array_u8_to_bytes(&engine.heap, &args[1])?;
188
189 match fd {
190 1 => {
191 let mut out = io::stdout().lock();
192 out.write_all(&bytes)
193 .and_then(|()| out.flush())
194 .map_err(|e| EngineError::Internal(format!("write_all failed: {e}")))?;
195 }
196 2 => {
197 let mut out = io::stderr().lock();
198 out.write_all(&bytes)
199 .and_then(|()| out.flush())
200 .map_err(|e| EngineError::Internal(format!("write_all failed: {e}")))?;
201 }
202 _ => {
203 return Err(EngineError::Internal(format!(
204 "write_all only supports fd 1 (stdout) and 2 (stderr), got {fd}"
205 )));
206 }
207 }
208
209 unit_pointer(&engine.heap)
210 })
211 },
212 )?;
213
214 engine.inject_library(library)
215}
216
217fn inject_cli_process_natives(engine: &mut Engine) -> Result<(), EngineError> {
218 let subprocess_name = virtual_export_name("std.process", "Subprocess");
219 let subprocess_ctor = sym(&subprocess_name);
220 let subprocess = Type::con(&subprocess_name, 0);
221 let string = Type::builtin(BuiltinTypeId::String);
222 let i32_ty = Type::builtin(BuiltinTypeId::I32);
223 let list_string = Type::app(Type::builtin(BuiltinTypeId::List), string.clone());
224 let mut library = Library::new("std.process");
225 let opts = Type::record(vec![
226 (sym("cmd"), string.clone()),
227 (sym("args"), list_string),
228 ]);
229
230 let spawn_sym = sym("spawn");
231 let subprocess_ctor_for_spawn = subprocess_ctor.clone();
232 library.export_native_async(
233 "spawn",
234 Scheme::new(vec![], vec![], Type::fun(opts, subprocess.clone())),
235 1,
236 move |engine, _, args| {
237 let spawn_sym = spawn_sym.clone();
238 let subprocess_ctor = subprocess_ctor_for_spawn.clone();
239 Box::pin(async move {
240 if args.len() != 1 {
241 return Err(EngineError::NativeArity {
242 name: spawn_sym.clone(),
243 expected: 1,
244 got: args.len(),
245 });
246 }
247 let map = engine.heap.pointer_as_dict(&args[0])?;
248
249 let cmd_pointer = map
250 .get(&sym("cmd"))
251 .cloned()
252 .ok_or_else(|| EngineError::Internal("spawn missing `cmd`".into()))?;
253 let cmd = String::from_pointer(&engine.heap, &cmd_pointer)?;
254
255 let args_pointer = map
256 .get(&sym("args"))
257 .cloned()
258 .ok_or_else(|| EngineError::Internal("spawn missing `args`".into()))?;
259 let args_list = list_to_vec(&engine.heap, &args_pointer)?;
260 let mut args_vec = Vec::with_capacity(args_list.len());
261 for arg in args_list {
262 args_vec.push(String::from_pointer(&engine.heap, &arg)?);
263 }
264
265 let mut child = Command::new(cmd)
266 .args(args_vec)
267 .stdin(Stdio::null())
268 .stdout(Stdio::piped())
269 .stderr(Stdio::piped())
270 .spawn()
271 .map_err(|e| EngineError::Internal(format!("spawn failed: {e}")))?;
272
273 let mut stdout = child.stdout.take().ok_or_else(|| {
274 EngineError::Internal("spawn failed to capture stdout".into())
275 })?;
276 let mut stderr = child.stderr.take().ok_or_else(|| {
277 EngineError::Internal("spawn failed to capture stderr".into())
278 })?;
279
280 let id = Uuid::new_v4();
281 let entry = Arc::new(SubprocessEntry::new(child));
282
283 {
284 let stdout_buf = entry.stdout.clone();
285 let entry_for_done = entry.clone();
286 let handle = thread::spawn(move || {
287 let mut tmp = [0u8; 8192];
288 loop {
289 let n = stdout.read(&mut tmp)?;
290 if n == 0 {
291 break;
292 }
293 if let Ok(mut buf) = stdout_buf.lock() {
294 buf.extend_from_slice(&tmp[..n]);
295 }
296 }
297 entry_for_done.stdout_done.store(true, Ordering::Release);
298 Ok(())
299 });
300 *lock_mutex(&entry.stdout_thread, "std.process.spawn stdout_thread")? =
301 Some(handle);
302 }
303
304 {
305 let stderr_buf = entry.stderr.clone();
306 let entry_for_done = entry.clone();
307 let handle = thread::spawn(move || {
308 let mut tmp = [0u8; 8192];
309 loop {
310 let n = stderr.read(&mut tmp)?;
311 if n == 0 {
312 break;
313 }
314 if let Ok(mut buf) = stderr_buf.lock() {
315 buf.extend_from_slice(&tmp[..n]);
316 }
317 }
318 entry_for_done.stderr_done.store(true, Ordering::Release);
319 Ok(())
320 });
321 *lock_mutex(&entry.stderr_thread, "std.process.spawn stderr_thread")? =
322 Some(handle);
323 }
324
325 subprocess_registry()
326 .procs
327 .lock()
328 .map_err(|_| {
329 EngineError::Internal(
330 "std.process.spawn: subprocess registry mutex poisoned (this is a bug)"
331 .into(),
332 )
333 })?
334 .insert(id, entry);
335
336 let mut payload = BTreeMap::new();
337 payload.insert(sym("id"), engine.heap.alloc_uuid(id)?);
338 let payload = engine.heap.alloc_dict(payload)?;
339 engine.heap.alloc_adt(subprocess_ctor, vec![payload])
340 })
341 },
342 )?;
343
344 let wait_sym = sym("wait");
345 let subprocess_ctor_for_wait = subprocess_ctor.clone();
346 library.export_native_async(
347 "wait",
348 Scheme::new(vec![], vec![], Type::fun(subprocess.clone(), i32_ty)),
349 1,
350 move |engine, _, args| {
351 let wait_sym = wait_sym.clone();
352 let subprocess_ctor = subprocess_ctor_for_wait.clone();
353 Box::pin(async move {
354 if args.len() != 1 {
355 return Err(EngineError::NativeArity {
356 name: wait_sym.clone(),
357 expected: 1,
358 got: args.len(),
359 });
360 }
361 let id = subprocess_id(&engine.heap, &args[0], &subprocess_ctor)?;
362 let entry = subprocess_get(&id, wait_sym.as_ref())?;
363
364 if let Some(code) = *lock_mutex(&entry.exit_code, "std.process.wait exit_code")? {
365 return engine.heap.alloc_i32(code);
366 }
367
368 let status = {
369 let mut child_guard = lock_mutex(&entry.child, "std.process.wait child")?;
370 let Some(child) = child_guard.as_mut() else {
371 return Err(EngineError::Internal("subprocess already reaped".into()));
372 };
373 child
374 .wait()
375 .map_err(|e| EngineError::Internal(format!("wait failed: {e}")))?
376 };
377
378 let code = status.code().unwrap_or(-1);
379 *lock_mutex(&entry.exit_code, "std.process.wait exit_code")? = Some(code);
380
381 if let Some(handle) = entry
383 .stdout_thread
384 .lock()
385 .map_err(|_| {
386 EngineError::Internal(
387 "std.process.wait: stdout_thread mutex poisoned (this is a bug)".into(),
388 )
389 })?
390 .take()
391 {
392 let _ = handle.join();
393 }
394 if let Some(handle) = entry
395 .stderr_thread
396 .lock()
397 .map_err(|_| {
398 EngineError::Internal(
399 "std.process.wait: stderr_thread mutex poisoned (this is a bug)".into(),
400 )
401 })?
402 .take()
403 {
404 let _ = handle.join();
405 }
406
407 engine.heap.alloc_i32(code)
408 })
409 },
410 )?;
411
412 let stdout_sym = sym("stdout");
413 let subprocess_ctor_for_stdout = subprocess_ctor.clone();
414 library.export_native_async(
415 "stdout",
416 Scheme::new(
417 vec![],
418 vec![],
419 Type::fun(
420 subprocess.clone(),
421 array_type(Type::builtin(BuiltinTypeId::U8)),
422 ),
423 ),
424 1,
425 move |engine, _, args| {
426 let stdout_sym = stdout_sym.clone();
427 let subprocess_ctor = subprocess_ctor_for_stdout.clone();
428 Box::pin(async move {
429 if args.len() != 1 {
430 return Err(EngineError::NativeArity {
431 name: stdout_sym.clone(),
432 expected: 1,
433 got: args.len(),
434 });
435 }
436 let id = subprocess_id(&engine.heap, &args[0], &subprocess_ctor)?;
437 let entry = subprocess_get(&id, stdout_sym.as_ref())?;
438 let bytes = lock_arc_mutex(&entry.stdout, "std.process.stdout buffer")?.clone();
439 bytes_to_array_u8(&engine.heap, bytes)
440 })
441 },
442 )?;
443
444 let stderr_sym = sym("stderr");
445 let subprocess_ctor_for_stderr = subprocess_ctor.clone();
446 library.export_native_async(
447 "stderr",
448 Scheme::new(
449 vec![],
450 vec![],
451 Type::fun(subprocess, array_type(Type::builtin(BuiltinTypeId::U8))),
452 ),
453 1,
454 move |engine, _, args| {
455 let stderr_sym = stderr_sym.clone();
456 let subprocess_ctor = subprocess_ctor_for_stderr.clone();
457 Box::pin(async move {
458 if args.len() != 1 {
459 return Err(EngineError::NativeArity {
460 name: stderr_sym.clone(),
461 expected: 1,
462 got: args.len(),
463 });
464 }
465 let id = subprocess_id(&engine.heap, &args[0], &subprocess_ctor)?;
466 let entry = subprocess_get(&id, stderr_sym.as_ref())?;
467 let bytes = lock_arc_mutex(&entry.stderr, "std.process.stderr buffer")?.clone();
468 bytes_to_array_u8(&engine.heap, bytes)
469 })
470 },
471 )?;
472
473 engine.inject_library(library)
474}
475
476fn subprocess_id(heap: &Heap, pointer: &Pointer, tag: &Symbol) -> Result<Uuid, EngineError> {
477 let (got_tag, args) = heap.pointer_as_adt(pointer)?;
478 if &got_tag != tag || args.len() != 1 {
479 return Err(EngineError::NativeType {
480 expected: "Subprocess".into(),
481 got: heap.type_name(pointer)?.into(),
482 });
483 }
484 let map = heap.pointer_as_dict(&args[0])?;
485 let id_pointer = map
486 .get(&sym("id"))
487 .cloned()
488 .ok_or_else(|| EngineError::Internal("Subprocess missing id".into()))?;
489 heap.pointer_as_uuid(&id_pointer)
490}
491
492fn subprocess_get(id: &Uuid, name: &str) -> Result<Arc<SubprocessEntry>, EngineError> {
493 subprocess_registry()
494 .procs
495 .lock()
496 .map_err(|_| {
497 EngineError::Internal(format!(
498 "{name}: subprocess registry mutex poisoned (this is a bug)"
499 ))
500 })?
501 .get(id)
502 .cloned()
503 .ok_or_else(|| EngineError::Internal(format!("{name}: unknown subprocess id {id}")))
504}
505
#[cfg(test)]
mod tests {
    use rexlang::{Engine, GasMeter, assert_pointer_eq};

    use super::*;

    /// Gas meter handed to every evaluation below.
    /// NOTE(review): the name assumes `GasMeter::default()` imposes no limit — confirm in rexlang.
    fn unlimited_gas() -> GasMeter {
        GasMeter::default()
    }

    /// Smoke test: a snippet importing both `std.process` and `std.io` must
    /// compile and evaluate once the CLI natives are injected.
    ///
    /// This spawns a real `sh` and writes whatever `process.stdout` returns to
    /// this test process's own stdout (fd 1). `process.wait` is never called, so
    /// that output may be empty. Only success is asserted.
    #[tokio::test]
    async fn cli_prelude_typecheck_smoke() {
        let code = r#"
            import std.process
            import std.io

            let p = process.spawn { cmd = "sh", args = ["-c", "printf hi"] } in
            io.write_all 1 (process.stdout p)
        "#;

        let mut engine = Engine::with_prelude(()).unwrap();
        engine.add_default_resolvers();
        inject_cli_prelude_engine(&mut engine).unwrap();
        let mut gas = unlimited_gas();
        rexlang::Evaluator::new_with_compiler(
            rexlang::RuntimeEnv::new(engine.clone()),
            rexlang::Compiler::new(engine.clone()),
        )
        .eval_snippet(code, &mut gas)
        .await
        .unwrap();
    }

    /// End-to-end check of `spawn`/`wait`/`stdout`/`stderr`.
    ///
    /// `sh -c "printf hi"` must exit with 0, produce stdout `b"hi"` and an empty
    /// stderr. The snippet must have type `(i32, Array u8, Array u8)`.
    ///
    /// NOTE(review): the output assertions rely on `process.wait p` being
    /// evaluated before the two reads (left-to-right tuple evaluation) — confirm.
    #[tokio::test]
    async fn cli_subprocess_captures_stdout_and_exit_code() {
        let code = r#"
            import std.process

            let p = process.spawn { cmd = "sh", args = ["-c", "printf hi"] } in
            (process.wait p, process.stdout p, process.stderr p)
        "#;

        let mut engine = Engine::with_prelude(()).unwrap();
        engine.add_default_resolvers();
        inject_cli_prelude_engine(&mut engine).unwrap();
        let mut gas = unlimited_gas();
        let (value, ty) = rexlang::Evaluator::new_with_compiler(
            rexlang::RuntimeEnv::new(engine.clone()),
            rexlang::Compiler::new(engine.clone()),
        )
        .eval_snippet(code, &mut gas)
        .await
        .unwrap();
        assert_eq!(
            ty,
            Type::tuple(vec![
                Type::builtin(BuiltinTypeId::I32),
                array_type(Type::builtin(BuiltinTypeId::U8)),
                array_type(Type::builtin(BuiltinTypeId::U8)),
            ])
        );
        // Dereference the result tuple to reach its three element pointers.
        let value = engine
            .heap
            .get(&value)
            .map(|value| value.as_ref().clone())
            .unwrap();
        let Value::Tuple(xs) = value else {
            panic!("expected tuple");
        };
        // Element 0: exit code of `printf hi`.
        assert_pointer_eq!(
            &engine.heap,
            xs[0].clone(),
            engine.heap.alloc_i32(0).unwrap()
        );

        // Element 1: captured stdout, decoded from an array of u8 pointers.
        let Value::Array(out) = engine
            .heap
            .get(&xs[1])
            .map(|value| value.as_ref().clone())
            .unwrap()
        else {
            panic!("expected stdout bytes");
        };
        let got: Vec<u8> = out
            .iter()
            .map(|v| {
                match engine
                    .heap
                    .get(v)
                    .map(|value| value.as_ref().clone())
                    .unwrap()
                {
                    Value::U8(b) => b,
                    other => panic!("expected u8, got {}", value_type_name(&other)),
                }
            })
            .collect();
        assert_eq!(got, b"hi");

        // Element 2: captured stderr, expected to be empty.
        let Value::Array(err) = engine
            .heap
            .get(&xs[2])
            .map(|value| value.as_ref().clone())
            .unwrap()
        else {
            panic!("expected stderr bytes");
        };
        assert!(err.is_empty());
    }
}