Skip to main content

rexlang_cli/
cli_prelude.rs

1use std::collections::{BTreeMap, HashMap};
2use std::io::{self, Read, Write};
3use std::process::{Command, Stdio};
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::{Arc, Mutex, OnceLock};
6use std::thread::{self, JoinHandle};
7
8use rexlang::{
9    BuiltinTypeId, Engine, EngineError, FromPointer, Heap, Library, Pointer, Scheme, Symbol, Type,
10    Value, sym, virtual_export_name,
11};
12use uuid::Uuid;
13
14fn lock_mutex<'a, T>(
15    m: &'a Mutex<T>,
16    context: &str,
17) -> Result<std::sync::MutexGuard<'a, T>, EngineError> {
18    m.lock()
19        .map_err(|_| EngineError::Internal(format!("{context}: mutex poisoned (this is a bug)")))
20}
21
22fn lock_arc_mutex<'a, T>(
23    m: &'a Arc<Mutex<T>>,
24    context: &str,
25) -> Result<std::sync::MutexGuard<'a, T>, EngineError> {
26    m.lock()
27        .map_err(|_| EngineError::Internal(format!("{context}: mutex poisoned (this is a bug)")))
28}
29
30fn unit_pointer(heap: &Heap) -> Result<Pointer, EngineError> {
31    heap.alloc_tuple(vec![])
32}
33
/// Test-only shorthand for the type name `Value` reports for itself.
#[cfg(test)]
fn value_type_name(value: &Value) -> &'static str {
    Value::value_type_name(value)
}
38
39fn unit_type() -> Type {
40    Type::tuple(vec![])
41}
42
43fn array_type(elem: Type) -> Type {
44    Type::app(Type::builtin(BuiltinTypeId::Array), elem)
45}
46
47fn list_to_vec(heap: &Heap, pointer: &Pointer) -> Result<Vec<Pointer>, EngineError> {
48    let mut out = Vec::new();
49    let mut cursor = *pointer;
50    loop {
51        let value = heap.get(&cursor)?;
52        match value.as_ref() {
53            Value::Adt(tag, args) if tag.as_ref() == "Empty" && args.is_empty() => return Ok(out),
54            Value::Adt(tag, args) if tag.as_ref() == "Cons" && args.len() == 2 => {
55                out.push(args[0]);
56                cursor = args[1];
57            }
58            _ => {
59                return Err(EngineError::NativeType {
60                    expected: "List".into(),
61                    got: heap.type_name(&cursor)?.into(),
62                });
63            }
64        }
65    }
66}
67
68fn array_u8_to_bytes(heap: &Heap, pointer: &Pointer) -> Result<Vec<u8>, EngineError> {
69    let elems = heap.pointer_as_array(pointer)?;
70    let mut out = Vec::with_capacity(elems.len());
71    for elem in &elems {
72        out.push(heap.pointer_as_u8(elem)?);
73    }
74    Ok(out)
75}
76
77fn bytes_to_array_u8(heap: &Heap, bytes: Vec<u8>) -> Result<Pointer, EngineError> {
78    let out = bytes
79        .into_iter()
80        .map(|b| heap.alloc_u8(b))
81        .collect::<Result<Vec<_>, _>>()?;
82    heap.alloc_array(out)
83}
84
/// Table of the subprocesses started through `std.process.spawn`.
///
/// `spawn` inserts an entry under a freshly generated UUID and the other
/// `std.process` natives look entries up by that id. Nothing in this file
/// removes entries.
#[derive(Default)]
struct SubprocessRegistry {
    /// Live entries keyed by subprocess id; shared via `Arc` so a lookup can
    /// release the registry lock before working with the entry.
    procs: Mutex<HashMap<Uuid, Arc<SubprocessEntry>>>,
}
89
/// Bookkeeping for one spawned child process and its captured output.
struct SubprocessEntry {
    /// Exit code recorded by `wait`; `None` until a wait has completed.
    exit_code: Mutex<Option<i32>>,
    /// Handle to the child process.
    child: Mutex<Option<std::process::Child>>,
    /// Bytes read from the child's stdout so far.
    stdout: Arc<Mutex<Vec<u8>>>,
    /// Bytes read from the child's stderr so far.
    stderr: Arc<Mutex<Vec<u8>>>,
    /// Flag the stdout reader thread raises when it is done with the pipe.
    stdout_done: AtomicBool,
    /// Flag the stderr reader thread raises when it is done with the pipe.
    stderr_done: AtomicBool,
    /// Join handle of the stdout reader thread, taken by `wait`.
    stdout_thread: Mutex<Option<JoinHandle<io::Result<()>>>>,
    /// Join handle of the stderr reader thread, taken by `wait`.
    stderr_thread: Mutex<Option<JoinHandle<io::Result<()>>>>,
}

impl SubprocessEntry {
    /// Wraps a freshly spawned `child`: no exit code yet, empty output
    /// buffers, both "done" flags cleared and no reader threads attached.
    fn new(child: std::process::Child) -> Self {
        Self {
            exit_code: Mutex::default(),
            child: Mutex::new(Some(child)),
            stdout: Arc::default(),
            stderr: Arc::default(),
            stdout_done: AtomicBool::default(),
            stderr_done: AtomicBool::default(),
            stdout_thread: Mutex::default(),
            stderr_thread: Mutex::default(),
        }
    }
}
115
116static SUBPROCESSES: OnceLock<SubprocessRegistry> = OnceLock::new();
117
118fn subprocess_registry() -> &'static SubprocessRegistry {
119    SUBPROCESSES.get_or_init(SubprocessRegistry::default)
120}
121
122pub fn inject_cli_prelude_engine(engine: &mut Engine) -> Result<(), EngineError> {
123    inject_cli_io_natives(engine)?;
124    inject_cli_process_natives(engine)?;
125    Ok(())
126}
127
128fn inject_cli_io_natives(engine: &mut Engine) -> Result<(), EngineError> {
129    let i32_ty = Type::builtin(BuiltinTypeId::I32);
130    let u8_ty = Type::builtin(BuiltinTypeId::U8);
131    let array_u8 = array_type(u8_ty);
132    let mut library = Library::new("std.io");
133    library.export_tracing_log_functions()?;
134
135    let read_all_sym = sym("read_all");
136    library.export_native_async(
137        "read_all",
138        Scheme::new(vec![], vec![], Type::fun(i32_ty.clone(), array_u8.clone())),
139        1,
140        move |engine, _, args| {
141            let read_all_sym = read_all_sym.clone();
142            Box::pin(async move {
143                if args.len() != 1 {
144                    return Err(EngineError::NativeArity {
145                        name: read_all_sym,
146                        expected: 1,
147                        got: args.len(),
148                    });
149                }
150                let fd = i32::from_pointer(&engine.heap, &args[0])?;
151
152                if fd != 0 {
153                    return Err(EngineError::Internal(format!(
154                        "read_all only supports fd 0 (stdin), got {fd}"
155                    )));
156                }
157
158                let mut buf = Vec::new();
159                io::stdin()
160                    .read_to_end(&mut buf)
161                    .map_err(|e| EngineError::Internal(format!("read_all failed: {e}")))?;
162                bytes_to_array_u8(&engine.heap, buf)
163            })
164        },
165    )?;
166
167    let write_all_sym = sym("write_all");
168    library.export_native_async(
169        "write_all",
170        Scheme::new(
171            vec![],
172            vec![],
173            Type::fun(i32_ty, Type::fun(array_u8, unit_type())),
174        ),
175        2,
176        move |engine, _, args| {
177            let write_all_sym = write_all_sym.clone();
178            Box::pin(async move {
179                if args.len() != 2 {
180                    return Err(EngineError::NativeArity {
181                        name: write_all_sym,
182                        expected: 2,
183                        got: args.len(),
184                    });
185                }
186                let fd = i32::from_pointer(&engine.heap, &args[0])?;
187                let bytes = array_u8_to_bytes(&engine.heap, &args[1])?;
188
189                match fd {
190                    1 => {
191                        let mut out = io::stdout().lock();
192                        out.write_all(&bytes)
193                            .and_then(|()| out.flush())
194                            .map_err(|e| EngineError::Internal(format!("write_all failed: {e}")))?;
195                    }
196                    2 => {
197                        let mut out = io::stderr().lock();
198                        out.write_all(&bytes)
199                            .and_then(|()| out.flush())
200                            .map_err(|e| EngineError::Internal(format!("write_all failed: {e}")))?;
201                    }
202                    _ => {
203                        return Err(EngineError::Internal(format!(
204                            "write_all only supports fd 1 (stdout) and 2 (stderr), got {fd}"
205                        )));
206                    }
207                }
208
209                unit_pointer(&engine.heap)
210            })
211        },
212    )?;
213
214    engine.inject_library(library)
215}
216
217fn inject_cli_process_natives(engine: &mut Engine) -> Result<(), EngineError> {
218    let subprocess_name = virtual_export_name("std.process", "Subprocess");
219    let subprocess_ctor = sym(&subprocess_name);
220    let subprocess = Type::con(&subprocess_name, 0);
221    let string = Type::builtin(BuiltinTypeId::String);
222    let i32_ty = Type::builtin(BuiltinTypeId::I32);
223    let list_string = Type::app(Type::builtin(BuiltinTypeId::List), string.clone());
224    let mut library = Library::new("std.process");
225    let opts = Type::record(vec![
226        (sym("cmd"), string.clone()),
227        (sym("args"), list_string),
228    ]);
229
230    let spawn_sym = sym("spawn");
231    let subprocess_ctor_for_spawn = subprocess_ctor.clone();
232    library.export_native_async(
233        "spawn",
234        Scheme::new(vec![], vec![], Type::fun(opts, subprocess.clone())),
235        1,
236        move |engine, _, args| {
237            let spawn_sym = spawn_sym.clone();
238            let subprocess_ctor = subprocess_ctor_for_spawn.clone();
239            Box::pin(async move {
240                if args.len() != 1 {
241                    return Err(EngineError::NativeArity {
242                        name: spawn_sym.clone(),
243                        expected: 1,
244                        got: args.len(),
245                    });
246                }
247                let map = engine.heap.pointer_as_dict(&args[0])?;
248
249                let cmd_pointer = map
250                    .get(&sym("cmd"))
251                    .cloned()
252                    .ok_or_else(|| EngineError::Internal("spawn missing `cmd`".into()))?;
253                let cmd = String::from_pointer(&engine.heap, &cmd_pointer)?;
254
255                let args_pointer = map
256                    .get(&sym("args"))
257                    .cloned()
258                    .ok_or_else(|| EngineError::Internal("spawn missing `args`".into()))?;
259                let args_list = list_to_vec(&engine.heap, &args_pointer)?;
260                let mut args_vec = Vec::with_capacity(args_list.len());
261                for arg in args_list {
262                    args_vec.push(String::from_pointer(&engine.heap, &arg)?);
263                }
264
265                let mut child = Command::new(cmd)
266                    .args(args_vec)
267                    .stdin(Stdio::null())
268                    .stdout(Stdio::piped())
269                    .stderr(Stdio::piped())
270                    .spawn()
271                    .map_err(|e| EngineError::Internal(format!("spawn failed: {e}")))?;
272
273                let mut stdout = child.stdout.take().ok_or_else(|| {
274                    EngineError::Internal("spawn failed to capture stdout".into())
275                })?;
276                let mut stderr = child.stderr.take().ok_or_else(|| {
277                    EngineError::Internal("spawn failed to capture stderr".into())
278                })?;
279
280                let id = Uuid::new_v4();
281                let entry = Arc::new(SubprocessEntry::new(child));
282
283                {
284                    let stdout_buf = entry.stdout.clone();
285                    let entry_for_done = entry.clone();
286                    let handle = thread::spawn(move || {
287                        let mut tmp = [0u8; 8192];
288                        loop {
289                            let n = stdout.read(&mut tmp)?;
290                            if n == 0 {
291                                break;
292                            }
293                            if let Ok(mut buf) = stdout_buf.lock() {
294                                buf.extend_from_slice(&tmp[..n]);
295                            }
296                        }
297                        entry_for_done.stdout_done.store(true, Ordering::Release);
298                        Ok(())
299                    });
300                    *lock_mutex(&entry.stdout_thread, "std.process.spawn stdout_thread")? =
301                        Some(handle);
302                }
303
304                {
305                    let stderr_buf = entry.stderr.clone();
306                    let entry_for_done = entry.clone();
307                    let handle = thread::spawn(move || {
308                        let mut tmp = [0u8; 8192];
309                        loop {
310                            let n = stderr.read(&mut tmp)?;
311                            if n == 0 {
312                                break;
313                            }
314                            if let Ok(mut buf) = stderr_buf.lock() {
315                                buf.extend_from_slice(&tmp[..n]);
316                            }
317                        }
318                        entry_for_done.stderr_done.store(true, Ordering::Release);
319                        Ok(())
320                    });
321                    *lock_mutex(&entry.stderr_thread, "std.process.spawn stderr_thread")? =
322                        Some(handle);
323                }
324
325                subprocess_registry()
326                    .procs
327                    .lock()
328                    .map_err(|_| {
329                        EngineError::Internal(
330                            "std.process.spawn: subprocess registry mutex poisoned (this is a bug)"
331                                .into(),
332                        )
333                    })?
334                    .insert(id, entry);
335
336                let mut payload = BTreeMap::new();
337                payload.insert(sym("id"), engine.heap.alloc_uuid(id)?);
338                let payload = engine.heap.alloc_dict(payload)?;
339                engine.heap.alloc_adt(subprocess_ctor, vec![payload])
340            })
341        },
342    )?;
343
344    let wait_sym = sym("wait");
345    let subprocess_ctor_for_wait = subprocess_ctor.clone();
346    library.export_native_async(
347        "wait",
348        Scheme::new(vec![], vec![], Type::fun(subprocess.clone(), i32_ty)),
349        1,
350        move |engine, _, args| {
351            let wait_sym = wait_sym.clone();
352            let subprocess_ctor = subprocess_ctor_for_wait.clone();
353            Box::pin(async move {
354                if args.len() != 1 {
355                    return Err(EngineError::NativeArity {
356                        name: wait_sym.clone(),
357                        expected: 1,
358                        got: args.len(),
359                    });
360                }
361                let id = subprocess_id(&engine.heap, &args[0], &subprocess_ctor)?;
362                let entry = subprocess_get(&id, wait_sym.as_ref())?;
363
364                if let Some(code) = *lock_mutex(&entry.exit_code, "std.process.wait exit_code")? {
365                    return engine.heap.alloc_i32(code);
366                }
367
368                let status = {
369                    let mut child_guard = lock_mutex(&entry.child, "std.process.wait child")?;
370                    let Some(child) = child_guard.as_mut() else {
371                        return Err(EngineError::Internal("subprocess already reaped".into()));
372                    };
373                    child
374                        .wait()
375                        .map_err(|e| EngineError::Internal(format!("wait failed: {e}")))?
376                };
377
378                let code = status.code().unwrap_or(-1);
379                *lock_mutex(&entry.exit_code, "std.process.wait exit_code")? = Some(code);
380
381                // Ensure pipes are drained.
382                if let Some(handle) = entry
383                    .stdout_thread
384                    .lock()
385                    .map_err(|_| {
386                        EngineError::Internal(
387                            "std.process.wait: stdout_thread mutex poisoned (this is a bug)".into(),
388                        )
389                    })?
390                    .take()
391                {
392                    let _ = handle.join();
393                }
394                if let Some(handle) = entry
395                    .stderr_thread
396                    .lock()
397                    .map_err(|_| {
398                        EngineError::Internal(
399                            "std.process.wait: stderr_thread mutex poisoned (this is a bug)".into(),
400                        )
401                    })?
402                    .take()
403                {
404                    let _ = handle.join();
405                }
406
407                engine.heap.alloc_i32(code)
408            })
409        },
410    )?;
411
412    let stdout_sym = sym("stdout");
413    let subprocess_ctor_for_stdout = subprocess_ctor.clone();
414    library.export_native_async(
415        "stdout",
416        Scheme::new(
417            vec![],
418            vec![],
419            Type::fun(
420                subprocess.clone(),
421                array_type(Type::builtin(BuiltinTypeId::U8)),
422            ),
423        ),
424        1,
425        move |engine, _, args| {
426            let stdout_sym = stdout_sym.clone();
427            let subprocess_ctor = subprocess_ctor_for_stdout.clone();
428            Box::pin(async move {
429                if args.len() != 1 {
430                    return Err(EngineError::NativeArity {
431                        name: stdout_sym.clone(),
432                        expected: 1,
433                        got: args.len(),
434                    });
435                }
436                let id = subprocess_id(&engine.heap, &args[0], &subprocess_ctor)?;
437                let entry = subprocess_get(&id, stdout_sym.as_ref())?;
438                let bytes = lock_arc_mutex(&entry.stdout, "std.process.stdout buffer")?.clone();
439                bytes_to_array_u8(&engine.heap, bytes)
440            })
441        },
442    )?;
443
444    let stderr_sym = sym("stderr");
445    let subprocess_ctor_for_stderr = subprocess_ctor.clone();
446    library.export_native_async(
447        "stderr",
448        Scheme::new(
449            vec![],
450            vec![],
451            Type::fun(subprocess, array_type(Type::builtin(BuiltinTypeId::U8))),
452        ),
453        1,
454        move |engine, _, args| {
455            let stderr_sym = stderr_sym.clone();
456            let subprocess_ctor = subprocess_ctor_for_stderr.clone();
457            Box::pin(async move {
458                if args.len() != 1 {
459                    return Err(EngineError::NativeArity {
460                        name: stderr_sym.clone(),
461                        expected: 1,
462                        got: args.len(),
463                    });
464                }
465                let id = subprocess_id(&engine.heap, &args[0], &subprocess_ctor)?;
466                let entry = subprocess_get(&id, stderr_sym.as_ref())?;
467                let bytes = lock_arc_mutex(&entry.stderr, "std.process.stderr buffer")?.clone();
468                bytes_to_array_u8(&engine.heap, bytes)
469            })
470        },
471    )?;
472
473    engine.inject_library(library)
474}
475
476fn subprocess_id(heap: &Heap, pointer: &Pointer, tag: &Symbol) -> Result<Uuid, EngineError> {
477    let (got_tag, args) = heap.pointer_as_adt(pointer)?;
478    if &got_tag != tag || args.len() != 1 {
479        return Err(EngineError::NativeType {
480            expected: "Subprocess".into(),
481            got: heap.type_name(pointer)?.into(),
482        });
483    }
484    let map = heap.pointer_as_dict(&args[0])?;
485    let id_pointer = map
486        .get(&sym("id"))
487        .cloned()
488        .ok_or_else(|| EngineError::Internal("Subprocess missing id".into()))?;
489    heap.pointer_as_uuid(&id_pointer)
490}
491
492fn subprocess_get(id: &Uuid, name: &str) -> Result<Arc<SubprocessEntry>, EngineError> {
493    subprocess_registry()
494        .procs
495        .lock()
496        .map_err(|_| {
497            EngineError::Internal(format!(
498                "{name}: subprocess registry mutex poisoned (this is a bug)"
499            ))
500        })?
501        .get(id)
502        .cloned()
503        .ok_or_else(|| EngineError::Internal(format!("{name}: unknown subprocess id {id}")))
504}
505
#[cfg(test)]
mod tests {
    use rexlang::{Engine, GasMeter, assert_pointer_eq};

    use super::*;

    /// Gas meter handed to the evaluator in these tests (the `GasMeter` default).
    fn unlimited_gas() -> GasMeter {
        GasMeter::default()
    }

    /// Engine with the prelude, the default resolvers and the CLI natives installed.
    fn cli_engine() -> Engine {
        let mut engine = Engine::with_prelude(()).unwrap();
        engine.add_default_resolvers();
        inject_cli_prelude_engine(&mut engine).unwrap();
        engine
    }

    /// Evaluates a snippet that uses both `std.process` and `std.io`; it only
    /// has to evaluate without error. Requires `sh` to be available.
    #[tokio::test]
    async fn cli_prelude_typecheck_smoke() {
        let code = r#"
            import std.process
            import std.io

            let p = process.spawn { cmd = "sh", args = ["-c", "printf hi"] } in
              io.write_all 1 (process.stdout p)
        "#;

        let engine = cli_engine();
        let mut gas = unlimited_gas();
        rexlang::Evaluator::new_with_compiler(
            rexlang::RuntimeEnv::new(engine.clone()),
            rexlang::Compiler::new(engine.clone()),
        )
        .eval_snippet(code, &mut gas)
        .await
        .unwrap();
    }

    /// Spawns `sh -c "printf hi"`, waits for it and checks the exit code (0),
    /// the captured stdout (`hi`) and the captured stderr (empty).
    #[tokio::test]
    async fn cli_subprocess_captures_stdout_and_exit_code() {
        let code = r#"
            import std.process

            let p = process.spawn { cmd = "sh", args = ["-c", "printf hi"] } in
              (process.wait p, process.stdout p, process.stderr p)
        "#;

        let engine = cli_engine();
        let mut gas = unlimited_gas();
        let (value, ty) = rexlang::Evaluator::new_with_compiler(
            rexlang::RuntimeEnv::new(engine.clone()),
            rexlang::Compiler::new(engine.clone()),
        )
        .eval_snippet(code, &mut gas)
        .await
        .unwrap();

        // Static type: (i32, Array u8, Array u8).
        let bytes_ty = array_type(Type::builtin(BuiltinTypeId::U8));
        let expected_ty = Type::tuple(vec![
            Type::builtin(BuiltinTypeId::I32),
            bytes_ty.clone(),
            bytes_ty,
        ]);
        assert_eq!(ty, expected_ty);

        let value = engine
            .heap
            .get(&value)
            .map(|value| value.as_ref().clone())
            .unwrap();
        let xs = match value {
            Value::Tuple(xs) => xs,
            _ => panic!("expected tuple"),
        };

        // Component 0: exit code.
        assert_pointer_eq!(
            &engine.heap,
            xs[0].clone(),
            engine.heap.alloc_i32(0).unwrap()
        );

        // Component 1: captured stdout.
        let stdout_value = engine
            .heap
            .get(&xs[1])
            .map(|value| value.as_ref().clone())
            .unwrap();
        let out = match stdout_value {
            Value::Array(out) => out,
            _ => panic!("expected stdout bytes"),
        };
        let mut got: Vec<u8> = Vec::new();
        for cell in out.iter() {
            let byte = engine
                .heap
                .get(cell)
                .map(|value| value.as_ref().clone())
                .unwrap();
            match byte {
                Value::U8(b) => got.push(b),
                other => panic!("expected u8, got {}", value_type_name(&other)),
            }
        }
        assert_eq!(got, b"hi");

        // Component 2: captured stderr.
        let stderr_value = engine
            .heap
            .get(&xs[2])
            .map(|value| value.as_ref().clone())
            .unwrap();
        let err = match stderr_value {
            Value::Array(err) => err,
            _ => panic!("expected stderr bytes"),
        };
        assert!(err.is_empty());
    }
}