rustler/
thread.rs

1use crate::env::OwnedEnv;
2use crate::sys::enif_thread_type;
3use crate::{Atom, Encoder, Env, Term};
4use std::panic;
5use std::thread;
6
7/// A `JobSpawner` is a value that can run Rust code on non-Erlang system threads.
8/// Abstracts away details of thread management for `spawn()`.
9///
10/// Note: Implementations of `spawn()` must call the closure on a thread that is **not** managed by
11/// the Erlang VM's scheduler. Otherwise, `rustler::thread::spawn()` would try to send a message
12/// from an `OwnedEnv` on an Erlang thread, which would panic.
13pub trait JobSpawner {
14    /// Run the given closure on another thread.
15    fn spawn<F: FnOnce() + Send + panic::UnwindSafe + 'static>(job: F);
16}
17
18/// A `JobSpawner` that uses a separate system thread for each job.
19pub struct ThreadSpawner;
20
21impl JobSpawner for ThreadSpawner {
22    /// This delegates to `std::thread::spawn()`.
23    fn spawn<F: FnOnce() + Send + panic::UnwindSafe + 'static>(job: F) {
24        thread::spawn(job);
25    }
26}
27
28/// Implements threaded NIFs.
29///
30/// This spawns a thread that calls the given closure `thread_fn`. When the closure returns, the
31/// thread sends its return value back to the calling process.  If the closure panics, an `{error,
32/// Reason}` tuple is sent instead.
33///
34/// Note that the thread creates a new `Env` and passes it to the closure, so the closure
35/// runs under a separate environment, not under `env`.
36///
37pub fn spawn<'a, S, F>(env: Env<'a>, thread_fn: F)
38where
39    F: for<'b> FnOnce(Env<'b>) -> Term<'b> + Send + panic::UnwindSafe + 'static,
40    S: JobSpawner,
41{
42    let pid = env.pid();
43    S::spawn(move || {
44        let _ = OwnedEnv::new().send_and_clear(&pid, |env| {
45            match panic::catch_unwind(|| thread_fn(env)) {
46                Ok(term) => term,
47                Err(err) => {
48                    // Try to get an error message from Rust.
49                    let reason = if let Some(string) = err.downcast_ref::<String>() {
50                        string.encode(env)
51                    } else if let Some(&s) = err.downcast_ref::<&'static str>() {
52                        s.encode(env)
53                    } else {
54                        Atom::from_bytes(env, b"nif_panic")
55                            .ok()
56                            .unwrap()
57                            .to_term(env)
58                    };
59                    env.error_tuple(reason)
60                }
61            }
62        });
63    });
64}
65
66/// Check if the currently running thread is managed by the ERTS.
67///
68/// This is relevant for (e.g.) `enif_send` or `enif_monitor_process` as
69pub fn is_scheduler_thread() -> bool {
70    unsafe { enif_thread_type() > 0 }
71}