Skip to main content

rexlang_cli/
cli_prelude.rs

1use std::collections::{BTreeMap, HashMap};
2use std::io::{self, Read, Write};
3use std::process::{Command, Stdio};
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::{Arc, Mutex, OnceLock};
6use std::thread::{self, JoinHandle};
7
8use rexlang::{
9    BuiltinTypeId, Engine, EngineError, FromPointer, Heap, Library, Pointer, Scheme, Symbol, Type,
10    Value, sym, virtual_export_name,
11};
12use uuid::Uuid;
13
14fn lock_mutex<'a, T>(
15    m: &'a Mutex<T>,
16    context: &str,
17) -> Result<std::sync::MutexGuard<'a, T>, EngineError> {
18    m.lock()
19        .map_err(|_| EngineError::Internal(format!("{context}: mutex poisoned (this is a bug)")))
20}
21
22fn lock_arc_mutex<'a, T>(
23    m: &'a Arc<Mutex<T>>,
24    context: &str,
25) -> Result<std::sync::MutexGuard<'a, T>, EngineError> {
26    m.lock()
27        .map_err(|_| EngineError::Internal(format!("{context}: mutex poisoned (this is a bug)")))
28}
29
30fn unit_pointer(heap: &Heap) -> Result<Pointer, EngineError> {
31    heap.alloc_tuple(vec![])
32}
33
/// Test-only shorthand that forwards to `Value::value_type_name`.
#[cfg(test)]
fn value_type_name(value: &Value) -> &'static str {
    Value::value_type_name(value)
}
38
39fn unit_type() -> Type {
40    Type::tuple(vec![])
41}
42
43fn array_type(elem: Type) -> Type {
44    Type::app(Type::builtin(BuiltinTypeId::Array), elem)
45}
46
47fn list_to_vec(heap: &Heap, pointer: &Pointer) -> Result<Vec<Pointer>, EngineError> {
48    let mut out = Vec::new();
49    let mut cursor = *pointer;
50    loop {
51        let value = heap.get(&cursor)?;
52        match value.as_ref() {
53            Value::Adt(tag, args) if tag.as_ref() == "Empty" && args.is_empty() => return Ok(out),
54            Value::Adt(tag, args) if tag.as_ref() == "Cons" && args.len() == 2 => {
55                out.push(args[0]);
56                cursor = args[1];
57            }
58            _ => {
59                return Err(EngineError::NativeType {
60                    expected: "List".into(),
61                    got: heap.type_name(&cursor)?.into(),
62                });
63            }
64        }
65    }
66}
67
68fn array_u8_to_bytes(heap: &Heap, pointer: &Pointer) -> Result<Vec<u8>, EngineError> {
69    let elems = heap.pointer_as_array(pointer)?;
70    let mut out = Vec::with_capacity(elems.len());
71    for elem in &elems {
72        out.push(heap.pointer_as_u8(elem)?);
73    }
74    Ok(out)
75}
76
77fn bytes_to_array_u8(heap: &Heap, bytes: Vec<u8>) -> Result<Pointer, EngineError> {
78    let out = bytes
79        .into_iter()
80        .map(|b| heap.alloc_u8(b))
81        .collect::<Result<Vec<_>, _>>()?;
82    heap.alloc_array(out)
83}
84
/// Table of subprocesses started through `std.process.spawn`, keyed by the
/// UUID that is handed back to the script inside the `Subprocess` value.
///
/// A single instance lives in the `SUBPROCESSES` static.
#[derive(Default)]
struct SubprocessRegistry {
    /// Live entries. Nothing in this file removes an entry once inserted.
    procs: Mutex<HashMap<Uuid, Arc<SubprocessEntry>>>,
}
89
/// Bookkeeping for one spawned child process and its captured output.
struct SubprocessEntry {
    /// Exit code cached by `std.process.wait` after the child has exited;
    /// `None` until then.
    exit_code: Mutex<Option<i32>>,
    /// Handle to the child process (`Some` from construction onwards).
    child: Mutex<Option<std::process::Child>>,
    /// Bytes read from the child's stdout so far.
    stdout: Arc<Mutex<Vec<u8>>>,
    /// Bytes read from the child's stderr so far.
    stderr: Arc<Mutex<Vec<u8>>>,
    /// Set by the stdout reader thread once it has reached end-of-file.
    stdout_done: AtomicBool,
    /// Set by the stderr reader thread once it has reached end-of-file.
    stderr_done: AtomicBool,
    /// Join handle of the stdout reader thread; taken by `std.process.wait`.
    stdout_thread: Mutex<Option<JoinHandle<io::Result<()>>>>,
    /// Join handle of the stderr reader thread; taken by `std.process.wait`.
    stderr_thread: Mutex<Option<JoinHandle<io::Result<()>>>>,
}
100
101impl SubprocessEntry {
102    fn new(child: std::process::Child) -> Self {
103        Self {
104            exit_code: Mutex::new(None),
105            child: Mutex::new(Some(child)),
106            stdout: Arc::new(Mutex::new(Vec::new())),
107            stderr: Arc::new(Mutex::new(Vec::new())),
108            stdout_done: AtomicBool::new(false),
109            stderr_done: AtomicBool::new(false),
110            stdout_thread: Mutex::new(None),
111            stderr_thread: Mutex::new(None),
112        }
113    }
114}
115
116static SUBPROCESSES: OnceLock<SubprocessRegistry> = OnceLock::new();
117
118fn subprocess_registry() -> &'static SubprocessRegistry {
119    SUBPROCESSES.get_or_init(SubprocessRegistry::default)
120}
121
122pub fn inject_cli_prelude_engine(engine: &mut Engine) -> Result<(), EngineError> {
123    engine.inject_tracing_log_function(&virtual_export_name("std.io", "debug"), |s| {
124        tracing::debug!("{s}")
125    })?;
126    engine.inject_tracing_log_function(&virtual_export_name("std.io", "info"), |s| {
127        tracing::info!("{s}")
128    })?;
129    engine.inject_tracing_log_function(&virtual_export_name("std.io", "warn"), |s| {
130        tracing::warn!("{s}")
131    })?;
132    engine.inject_tracing_log_function(&virtual_export_name("std.io", "error"), |s| {
133        tracing::error!("{s}")
134    })?;
135
136    inject_cli_io_natives(engine)?;
137    inject_cli_process_natives(engine)?;
138    Ok(())
139}
140
141fn inject_cli_io_natives(engine: &mut Engine) -> Result<(), EngineError> {
142    let i32_ty = Type::builtin(BuiltinTypeId::I32);
143    let u8_ty = Type::builtin(BuiltinTypeId::U8);
144    let array_u8 = array_type(u8_ty);
145    let mut library = Library::new("std.io");
146
147    let read_all_sym = sym("read_all");
148    library.export_native_async(
149        "read_all",
150        Scheme::new(vec![], vec![], Type::fun(i32_ty.clone(), array_u8.clone())),
151        1,
152        move |engine, _, args| {
153            let read_all_sym = read_all_sym.clone();
154            Box::pin(async move {
155                if args.len() != 1 {
156                    return Err(EngineError::NativeArity {
157                        name: read_all_sym,
158                        expected: 1,
159                        got: args.len(),
160                    });
161                }
162                let fd = i32::from_pointer(&engine.heap, &args[0])?;
163
164                if fd != 0 {
165                    return Err(EngineError::Internal(format!(
166                        "read_all only supports fd 0 (stdin), got {fd}"
167                    )));
168                }
169
170                let mut buf = Vec::new();
171                io::stdin()
172                    .read_to_end(&mut buf)
173                    .map_err(|e| EngineError::Internal(format!("read_all failed: {e}")))?;
174                bytes_to_array_u8(&engine.heap, buf)
175            })
176        },
177    )?;
178
179    let write_all_sym = sym("write_all");
180    library.export_native_async(
181        "write_all",
182        Scheme::new(
183            vec![],
184            vec![],
185            Type::fun(i32_ty, Type::fun(array_u8, unit_type())),
186        ),
187        2,
188        move |engine, _, args| {
189            let write_all_sym = write_all_sym.clone();
190            Box::pin(async move {
191                if args.len() != 2 {
192                    return Err(EngineError::NativeArity {
193                        name: write_all_sym,
194                        expected: 2,
195                        got: args.len(),
196                    });
197                }
198                let fd = i32::from_pointer(&engine.heap, &args[0])?;
199                let bytes = array_u8_to_bytes(&engine.heap, &args[1])?;
200
201                match fd {
202                    1 => {
203                        let mut out = io::stdout().lock();
204                        out.write_all(&bytes)
205                            .and_then(|()| out.flush())
206                            .map_err(|e| EngineError::Internal(format!("write_all failed: {e}")))?;
207                    }
208                    2 => {
209                        let mut out = io::stderr().lock();
210                        out.write_all(&bytes)
211                            .and_then(|()| out.flush())
212                            .map_err(|e| EngineError::Internal(format!("write_all failed: {e}")))?;
213                    }
214                    _ => {
215                        return Err(EngineError::Internal(format!(
216                            "write_all only supports fd 1 (stdout) and 2 (stderr), got {fd}"
217                        )));
218                    }
219                }
220
221                unit_pointer(&engine.heap)
222            })
223        },
224    )?;
225
226    engine.inject_library(library)
227}
228
229fn inject_cli_process_natives(engine: &mut Engine) -> Result<(), EngineError> {
230    let subprocess_name = virtual_export_name("std.process", "Subprocess");
231    let subprocess_ctor = sym(&subprocess_name);
232    let subprocess = Type::con(&subprocess_name, 0);
233    let string = Type::builtin(BuiltinTypeId::String);
234    let i32_ty = Type::builtin(BuiltinTypeId::I32);
235    let list_string = Type::app(Type::builtin(BuiltinTypeId::List), string.clone());
236    let mut library = Library::new("std.process");
237    let opts = Type::record(vec![
238        (sym("cmd"), string.clone()),
239        (sym("args"), list_string),
240    ]);
241
242    let spawn_sym = sym("spawn");
243    let subprocess_ctor_for_spawn = subprocess_ctor.clone();
244    library.export_native_async(
245        "spawn",
246        Scheme::new(vec![], vec![], Type::fun(opts, subprocess.clone())),
247        1,
248        move |engine, _, args| {
249            let spawn_sym = spawn_sym.clone();
250            let subprocess_ctor = subprocess_ctor_for_spawn.clone();
251            Box::pin(async move {
252                if args.len() != 1 {
253                    return Err(EngineError::NativeArity {
254                        name: spawn_sym.clone(),
255                        expected: 1,
256                        got: args.len(),
257                    });
258                }
259                let map = engine.heap.pointer_as_dict(&args[0])?;
260
261                let cmd_pointer = map
262                    .get(&sym("cmd"))
263                    .cloned()
264                    .ok_or_else(|| EngineError::Internal("spawn missing `cmd`".into()))?;
265                let cmd = String::from_pointer(&engine.heap, &cmd_pointer)?;
266
267                let args_pointer = map
268                    .get(&sym("args"))
269                    .cloned()
270                    .ok_or_else(|| EngineError::Internal("spawn missing `args`".into()))?;
271                let args_list = list_to_vec(&engine.heap, &args_pointer)?;
272                let mut args_vec = Vec::with_capacity(args_list.len());
273                for arg in args_list {
274                    args_vec.push(String::from_pointer(&engine.heap, &arg)?);
275                }
276
277                let mut child = Command::new(cmd)
278                    .args(args_vec)
279                    .stdin(Stdio::null())
280                    .stdout(Stdio::piped())
281                    .stderr(Stdio::piped())
282                    .spawn()
283                    .map_err(|e| EngineError::Internal(format!("spawn failed: {e}")))?;
284
285                let mut stdout = child.stdout.take().ok_or_else(|| {
286                    EngineError::Internal("spawn failed to capture stdout".into())
287                })?;
288                let mut stderr = child.stderr.take().ok_or_else(|| {
289                    EngineError::Internal("spawn failed to capture stderr".into())
290                })?;
291
292                let id = Uuid::new_v4();
293                let entry = Arc::new(SubprocessEntry::new(child));
294
295                {
296                    let stdout_buf = entry.stdout.clone();
297                    let entry_for_done = entry.clone();
298                    let handle = thread::spawn(move || {
299                        let mut tmp = [0u8; 8192];
300                        loop {
301                            let n = stdout.read(&mut tmp)?;
302                            if n == 0 {
303                                break;
304                            }
305                            if let Ok(mut buf) = stdout_buf.lock() {
306                                buf.extend_from_slice(&tmp[..n]);
307                            }
308                        }
309                        entry_for_done.stdout_done.store(true, Ordering::Release);
310                        Ok(())
311                    });
312                    *lock_mutex(&entry.stdout_thread, "std.process.spawn stdout_thread")? =
313                        Some(handle);
314                }
315
316                {
317                    let stderr_buf = entry.stderr.clone();
318                    let entry_for_done = entry.clone();
319                    let handle = thread::spawn(move || {
320                        let mut tmp = [0u8; 8192];
321                        loop {
322                            let n = stderr.read(&mut tmp)?;
323                            if n == 0 {
324                                break;
325                            }
326                            if let Ok(mut buf) = stderr_buf.lock() {
327                                buf.extend_from_slice(&tmp[..n]);
328                            }
329                        }
330                        entry_for_done.stderr_done.store(true, Ordering::Release);
331                        Ok(())
332                    });
333                    *lock_mutex(&entry.stderr_thread, "std.process.spawn stderr_thread")? =
334                        Some(handle);
335                }
336
337                subprocess_registry()
338                    .procs
339                    .lock()
340                    .map_err(|_| {
341                        EngineError::Internal(
342                            "std.process.spawn: subprocess registry mutex poisoned (this is a bug)"
343                                .into(),
344                        )
345                    })?
346                    .insert(id, entry);
347
348                let mut payload = BTreeMap::new();
349                payload.insert(sym("id"), engine.heap.alloc_uuid(id)?);
350                let payload = engine.heap.alloc_dict(payload)?;
351                engine.heap.alloc_adt(subprocess_ctor, vec![payload])
352            })
353        },
354    )?;
355
356    let wait_sym = sym("wait");
357    let subprocess_ctor_for_wait = subprocess_ctor.clone();
358    library.export_native_async(
359        "wait",
360        Scheme::new(vec![], vec![], Type::fun(subprocess.clone(), i32_ty)),
361        1,
362        move |engine, _, args| {
363            let wait_sym = wait_sym.clone();
364            let subprocess_ctor = subprocess_ctor_for_wait.clone();
365            Box::pin(async move {
366                if args.len() != 1 {
367                    return Err(EngineError::NativeArity {
368                        name: wait_sym.clone(),
369                        expected: 1,
370                        got: args.len(),
371                    });
372                }
373                let id = subprocess_id(&engine.heap, &args[0], &subprocess_ctor)?;
374                let entry = subprocess_get(&id, wait_sym.as_ref())?;
375
376                if let Some(code) = *lock_mutex(&entry.exit_code, "std.process.wait exit_code")? {
377                    return engine.heap.alloc_i32(code);
378                }
379
380                let status = {
381                    let mut child_guard = lock_mutex(&entry.child, "std.process.wait child")?;
382                    let Some(child) = child_guard.as_mut() else {
383                        return Err(EngineError::Internal("subprocess already reaped".into()));
384                    };
385                    child
386                        .wait()
387                        .map_err(|e| EngineError::Internal(format!("wait failed: {e}")))?
388                };
389
390                let code = status.code().unwrap_or(-1);
391                *lock_mutex(&entry.exit_code, "std.process.wait exit_code")? = Some(code);
392
393                // Ensure pipes are drained.
394                if let Some(handle) = entry
395                    .stdout_thread
396                    .lock()
397                    .map_err(|_| {
398                        EngineError::Internal(
399                            "std.process.wait: stdout_thread mutex poisoned (this is a bug)".into(),
400                        )
401                    })?
402                    .take()
403                {
404                    let _ = handle.join();
405                }
406                if let Some(handle) = entry
407                    .stderr_thread
408                    .lock()
409                    .map_err(|_| {
410                        EngineError::Internal(
411                            "std.process.wait: stderr_thread mutex poisoned (this is a bug)".into(),
412                        )
413                    })?
414                    .take()
415                {
416                    let _ = handle.join();
417                }
418
419                engine.heap.alloc_i32(code)
420            })
421        },
422    )?;
423
424    let stdout_sym = sym("stdout");
425    let subprocess_ctor_for_stdout = subprocess_ctor.clone();
426    library.export_native_async(
427        "stdout",
428        Scheme::new(
429            vec![],
430            vec![],
431            Type::fun(
432                subprocess.clone(),
433                array_type(Type::builtin(BuiltinTypeId::U8)),
434            ),
435        ),
436        1,
437        move |engine, _, args| {
438            let stdout_sym = stdout_sym.clone();
439            let subprocess_ctor = subprocess_ctor_for_stdout.clone();
440            Box::pin(async move {
441                if args.len() != 1 {
442                    return Err(EngineError::NativeArity {
443                        name: stdout_sym.clone(),
444                        expected: 1,
445                        got: args.len(),
446                    });
447                }
448                let id = subprocess_id(&engine.heap, &args[0], &subprocess_ctor)?;
449                let entry = subprocess_get(&id, stdout_sym.as_ref())?;
450                let bytes = lock_arc_mutex(&entry.stdout, "std.process.stdout buffer")?.clone();
451                bytes_to_array_u8(&engine.heap, bytes)
452            })
453        },
454    )?;
455
456    let stderr_sym = sym("stderr");
457    let subprocess_ctor_for_stderr = subprocess_ctor.clone();
458    library.export_native_async(
459        "stderr",
460        Scheme::new(
461            vec![],
462            vec![],
463            Type::fun(subprocess, array_type(Type::builtin(BuiltinTypeId::U8))),
464        ),
465        1,
466        move |engine, _, args| {
467            let stderr_sym = stderr_sym.clone();
468            let subprocess_ctor = subprocess_ctor_for_stderr.clone();
469            Box::pin(async move {
470                if args.len() != 1 {
471                    return Err(EngineError::NativeArity {
472                        name: stderr_sym.clone(),
473                        expected: 1,
474                        got: args.len(),
475                    });
476                }
477                let id = subprocess_id(&engine.heap, &args[0], &subprocess_ctor)?;
478                let entry = subprocess_get(&id, stderr_sym.as_ref())?;
479                let bytes = lock_arc_mutex(&entry.stderr, "std.process.stderr buffer")?.clone();
480                bytes_to_array_u8(&engine.heap, bytes)
481            })
482        },
483    )?;
484
485    engine.inject_library(library)
486}
487
488fn subprocess_id(heap: &Heap, pointer: &Pointer, tag: &Symbol) -> Result<Uuid, EngineError> {
489    let (got_tag, args) = heap.pointer_as_adt(pointer)?;
490    if &got_tag != tag || args.len() != 1 {
491        return Err(EngineError::NativeType {
492            expected: "Subprocess".into(),
493            got: heap.type_name(pointer)?.into(),
494        });
495    }
496    let map = heap.pointer_as_dict(&args[0])?;
497    let id_pointer = map
498        .get(&sym("id"))
499        .cloned()
500        .ok_or_else(|| EngineError::Internal("Subprocess missing id".into()))?;
501    heap.pointer_as_uuid(&id_pointer)
502}
503
504fn subprocess_get(id: &Uuid, name: &str) -> Result<Arc<SubprocessEntry>, EngineError> {
505    subprocess_registry()
506        .procs
507        .lock()
508        .map_err(|_| {
509            EngineError::Internal(format!(
510                "{name}: subprocess registry mutex poisoned (this is a bug)"
511            ))
512        })?
513        .get(id)
514        .cloned()
515        .ok_or_else(|| EngineError::Internal(format!("{name}: unknown subprocess id {id}")))
516}
517
#[cfg(test)]
mod tests {
    use rexlang::{Engine, GasMeter, assert_pointer_eq};

    use super::*;

    /// Gas meter in its default configuration (presumably unlimited, going by
    /// how the tests use it; confirm against `GasMeter`).
    fn unlimited_gas() -> GasMeter {
        GasMeter::default()
    }

    /// Builds an engine with the default resolvers and the CLI prelude installed.
    fn cli_engine() -> Engine {
        let mut engine = Engine::with_prelude(()).unwrap();
        engine.add_default_resolvers();
        inject_cli_prelude_engine(&mut engine).unwrap();
        engine
    }

    /// A script that spawns a process and pipes its captured stdout to fd 1
    /// must type-check and evaluate without error.
    #[tokio::test]
    async fn cli_prelude_typecheck_smoke() {
        let code = r#"
            import std.process
            import std.io

            let p = process.spawn { cmd = "sh", args = ["-c", "printf hi"] } in
              io.write_all 1 (process.stdout p)
        "#;

        let engine = cli_engine();
        let mut gas = unlimited_gas();
        rexlang::Evaluator::new_with_compiler(
            rexlang::RuntimeEnv::new(engine.clone()),
            rexlang::Compiler::new(engine.clone()),
        )
        .eval_snippet(code, &mut gas)
        .await
        .unwrap();
    }

    /// `wait` must yield exit code 0, `stdout` the bytes `hi`, and `stderr`
    /// an empty array for `sh -c "printf hi"`.
    #[tokio::test]
    async fn cli_subprocess_captures_stdout_and_exit_code() {
        let code = r#"
            import std.process

            let p = process.spawn { cmd = "sh", args = ["-c", "printf hi"] } in
              (process.wait p, process.stdout p, process.stderr p)
        "#;

        let engine = cli_engine();
        let mut gas = unlimited_gas();
        let (value, ty) = rexlang::Evaluator::new_with_compiler(
            rexlang::RuntimeEnv::new(engine.clone()),
            rexlang::Compiler::new(engine.clone()),
        )
        .eval_snippet(code, &mut gas)
        .await
        .unwrap();

        let byte_array = array_type(Type::builtin(BuiltinTypeId::U8));
        let expected_ty = Type::tuple(vec![
            Type::builtin(BuiltinTypeId::I32),
            byte_array.clone(),
            byte_array,
        ]);
        assert_eq!(ty, expected_ty);

        // Copies the value behind a pointer out of the heap.
        let load = |p: &Pointer| -> Value {
            engine
                .heap
                .get(p)
                .map(|cell| cell.as_ref().clone())
                .unwrap()
        };

        let Value::Tuple(xs) = load(&value) else {
            panic!("expected tuple");
        };
        assert_pointer_eq!(
            &engine.heap,
            xs[0].clone(),
            engine.heap.alloc_i32(0).unwrap()
        );

        let Value::Array(out) = load(&xs[1]) else {
            panic!("expected stdout bytes");
        };
        let mut got: Vec<u8> = Vec::new();
        for cell in out.iter() {
            match load(cell) {
                Value::U8(b) => got.push(b),
                other => panic!("expected u8, got {}", value_type_name(&other)),
            }
        }
        assert_eq!(got, b"hi");

        let Value::Array(err) = load(&xs[2]) else {
            panic!("expected stderr bytes");
        };
        assert!(err.is_empty());
    }
}
627}