1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
use anyhow::anyhow;
use serde::{Deserialize, Serialize};

/// Forks, and runs function F in a child process.
/// Waits for the child to terminate and returns the result of F.
///
/// # Example
///
/// ```
/// use fork_map::fork_map;
///
/// pub fn do_with_fork(value: u64) -> u64 {
///     // Spawn a child process with a copy-on-write copy of memory
///     fork_map(|| {
///         // Do some obnoxious operation with `value`
///         // * Maybe it leaks memory
///         // * Maybe it uses static resources unsafely and
///         //   prevents multi-threaded operation
///         // * Maybe you couldn't figure out how to
///         //   send your data to a thread
///         Ok(value * 10)
///     }).unwrap()
///     // Execution continues after the child process has exited
/// }
/// ```
///
/// Often used in conjunction with `rayon` since `fork_map` will block until the child terminates,
/// so you can construct a worker pool where each job is executed in a child process:
///
/// # Example
/// ```
/// use fork_map::fork_map;
/// use rayon::prelude::*;
///
/// pub fn main() {
///     let my_big_list = [ /* ... */ ];
///
///     // Create a worker pool with rayon's into_par_iter
///     let results = my_big_list.into_par_iter().map(|item| {
///         // Have each worker spawn a child process for the
///         // operations we don't want polluting the parent's memory
///         fork_map(|| {
///             // Do your ugly operations here
///             Ok(item * 1234)
///         }).expect("fork_map succeeded")
///     }).collect::<Vec<_>>();
///
///     // Use results here
/// }
/// ```
pub fn fork_map<F, R>(func: F) -> anyhow::Result<R>
    where
        F: Fn() -> anyhow::Result<R>,
        R: Serialize + for<'a> Deserialize<'a>,
{
    // SAFETY: Probably not LOL, didn't crash on my box, use at your own risk, etc.

    // Pipe for sending the result from child to parent
    let mut pipe: [libc::c_int; 2] = [0; 2];
    unsafe {
        libc::pipe(pipe.as_mut_ptr());
    }

    // Here we go
    let pid = unsafe { libc::fork() };
    if pid == 0 {
        // Child
        unsafe { libc::close(pipe[0]) };
        let result = func().map_err(|e| serde_error::Error::new(&*e));
        let ser = serde_json::to_string(&result).unwrap_or("".to_string());
        unsafe { libc::write(pipe[1], ser.as_ptr() as *const libc::c_void, ser.len()) };
        unsafe { libc::close(pipe[1]) };
        unsafe { libc::exit(0) };
    }

    // Parent
    unsafe { libc::close(pipe[1]) };

    // Read result from pipe
    let mut des = vec![];
    let des = loop {
        const BUF_SIZE: usize = 0x1000;
        let mut buf: [u8; BUF_SIZE] = [0; BUF_SIZE];
        let count = unsafe { libc::read(pipe[0], buf.as_mut_ptr() as *mut libc::c_void, BUF_SIZE) };
        if count < 0 {
            break Err(anyhow!("io error: {}", unsafe { *libc::__error() }));
        }
        des.extend_from_slice(&buf[0..(count as usize)]);
        // EOF signalled by less than the max bytes
        if (count as usize) < BUF_SIZE {
            break Ok(des);
        }
    };

    let mut status = 0;
    unsafe { libc::waitpid(pid, &mut status, 0) };

    if status != 0 {
        return Err(anyhow!("Process returned non-zero status code {}", status));
    }

    des.and_then(|des| {
        serde_json::from_slice::<Result<R, serde_error::Error>>(des.as_slice())
            .map_err(|e| anyhow!("{}", e))
            .and_then(|se| match se {
                Ok(i) => Ok(i),
                Err(e) => Err(anyhow::Error::from(e)),
            })
    })
}