1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
use crate::env::OwnedEnv;
use crate::types::atom::Atom;
use crate::{Encoder, Env, Term};
use std::panic;
use std::thread;
/// Strategy for executing Rust closures on system threads that do not belong to the Erlang VM.
///
/// `spawn()` (the free function in this module) is generic over this trait, so the details of
/// how threads are created or reused stay out of its way.
///
/// Note: an implementation of `spawn()` must invoke the closure on a thread that is **not**
/// managed by the Erlang VM's scheduler. Otherwise, `rustler::thread::spawn()` would try to send
/// a message from an `OwnedEnv` on an Erlang thread, which would panic.
pub trait JobSpawner {
    /// Execute `job` on some other thread.
    ///
    /// This is an associated function (there is no `self`), so implementors are selected purely
    /// by type.
    fn spawn<F>(job: F)
    where
        F: FnOnce() + Send + panic::UnwindSafe + 'static;
}
/// `JobSpawner` backed by plain system threads: every job gets a thread of its own.
pub struct ThreadSpawner;

impl JobSpawner for ThreadSpawner {
    /// Hands `job` straight to `std::thread::spawn()`.
    ///
    /// The join handle is dropped immediately, so the new thread is never joined from here.
    fn spawn<F>(job: F)
    where
        F: FnOnce() + Send + panic::UnwindSafe + 'static,
    {
        drop(thread::spawn(job));
    }
}
/// Implements threaded NIFs.
///
/// Asks the spawner `S` to run `thread_fn` on another thread. Once the closure returns, the term
/// it produced is sent as a message to the process that owns `env`. If the closure panics, an
/// `{error, Reason}` tuple is sent instead, where `Reason` is the panic message when the payload
/// is a `String` or a `&'static str`, and the atom `nif_panic` otherwise.
///
/// The closure does not run under `env`: the worker creates a fresh `OwnedEnv` and passes an
/// `Env` borrowed from it to `thread_fn`, so every term in the reply must be built in that
/// environment.
///
/// # Panics
///
/// The fallback path unwraps the result of creating the `nif_panic` atom. That happens on the
/// worker thread, outside of `catch_unwind`, so a failure there is not converted into an error
/// tuple.
pub fn spawn<'a, S, F>(env: Env<'a>, thread_fn: F)
where
    F: for<'b> FnOnce(Env<'b>) -> Term<'b> + Send + panic::UnwindSafe + 'static,
    S: JobSpawner,
{
    // Capture the caller's pid up front; `env` itself is not moved into the job.
    let caller = env.pid();

    S::spawn(move || {
        OwnedEnv::new().send_and_clear(&caller, |thread_env| {
            // Success: the closure's own term is the reply.
            let payload = match panic::catch_unwind(|| thread_fn(thread_env)) {
                Ok(reply) => return reply,
                Err(payload) => payload,
            };

            // Failure: try to recover a human-readable message from the panic payload, falling
            // back to a generic atom when the payload is of some other type.
            let reason = payload
                .downcast_ref::<String>()
                .map(|message| message.encode(thread_env))
                .or_else(|| {
                    payload
                        .downcast_ref::<&'static str>()
                        .map(|&message| message.encode(thread_env))
                })
                .unwrap_or_else(|| {
                    Atom::from_bytes(thread_env, b"nif_panic")
                        .ok()
                        .unwrap()
                        .to_term(thread_env)
                });

            thread_env.error_tuple(reason)
        });
    });
}