1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
use ::{ NifEnv, NifTerm, NifEncoder };
use ::env::OwnedEnv;
use ::types::atom::NifAtom;
use std::thread;
use std::panic;

/// Strategy for running Rust closures on system threads that do not belong to Erlang.
///
/// `spawn()` in this module is generic over a `JobSpawner`, which keeps the details of thread
/// management out of `spawn()` itself.
///
/// Note: An implementation of `spawn()` must invoke the closure on a thread that is **not**
/// managed by the Erlang VM's scheduler. Otherwise, `rustler::thread::spawn()` would try to send
/// a message from an `OwnedEnv` on an Erlang thread, which would panic.
pub trait JobSpawner {
    /// Arrange for `job` to be executed on another thread.
    ///
    /// The bounds let an implementation move the closure to a different thread (`Send`,
    /// `'static`) and run it under `panic::catch_unwind` (`UnwindSafe`).
    fn spawn<F>(job: F)
        where F: FnOnce() + Send + panic::UnwindSafe + 'static;
}

/// A `JobSpawner` that starts a fresh system thread for every job it is given.
pub struct ThreadSpawner;

impl JobSpawner for ThreadSpawner {
    /// Hands `job` straight to `std::thread::spawn()`.
    ///
    /// The join handle is discarded, so the thread is never joined and the value returned by
    /// `job` (always `()`) is not observed.
    fn spawn<F>(job: F)
        where F: FnOnce() + Send + panic::UnwindSafe + 'static,
    {
        // Dropping the handle detaches the thread; it keeps running on its own.
        drop(thread::spawn(job));
    }
}

/// Runs the work of a NIF on a separate thread and reports the outcome by message.
///
/// The job spawner `S` is asked to run `thread_fn`. Once the closure returns, the term it
/// produced is sent back to the calling process. If the closure panics instead, the panic is
/// caught and an `{error, Reason}` tuple is sent. `Reason` is the panic message when the panic
/// payload is a `String` or a `&'static str`, and the atom `nif_panic` otherwise.
///
/// Note that `thread_fn` does not run under `env`: the spawned job creates its own `OwnedEnv`,
/// and the closure is handed the `NifEnv` that `send_and_clear` provides.
pub fn spawn<'a, S, F>(env: NifEnv<'a>, thread_fn: F)
    where F: for<'b> FnOnce(NifEnv<'b>) -> NifTerm<'b> + Send + panic::UnwindSafe + 'static,
          S: JobSpawner,
{
    // Only the pid is taken from `env`; `env` itself is not moved into the job.
    let caller = env.pid();
    S::spawn(move || {
        OwnedEnv::new().send_and_clear(&caller, |thread_env| {
            panic::catch_unwind(|| thread_fn(thread_env)).unwrap_or_else(|payload| {
                // Try to recover a human-readable message from the panic payload.
                let reason = match payload.downcast_ref::<String>() {
                    Some(message) => message.encode(thread_env),
                    None => match payload.downcast_ref::<&'static str>() {
                        Some(&message) => message.encode(thread_env),
                        // Payload of some other type: fall back to a fixed atom.
                        None => NifAtom::from_bytes(thread_env, b"nif_panic")
                            .ok()
                            .unwrap()
                            .to_term(thread_env),
                    },
                };
                thread_env.error_tuple(reason)
            })
        });
    });
}