fork_map/
lib.rs

1use anyhow::anyhow;
2use serde::{Deserialize, Serialize};
3
4/// Forks, and runs function F in a child process.
5/// Waits for the child to terminate and returns the result of F.
6///
7/// # Example
8///
9/// ```
10/// use fork_map::fork_map;
11///
12/// // Result type needs to implement serde::Serialize and serde::Deserialize
13/// pub fn do_with_fork(value: u64) -> u64 {
14///     // Spawn a child process with a copy-on-write copy of memory
15///     unsafe {
16///         fork_map(|| {
17///             // Do some obnoxious operation with `value`
18///             // * Maybe it leaks memory
19///             // * Maybe it uses static resources unsafely and
20///             //   prevents multi-threaded operation
21///             // * Maybe you couldn't figure out how to
22///             //   send your data to a thread
23///             Ok(value * 10)
24///         }).unwrap()
25///         // Execution continues after the child process has exited
26///     }
27/// }
28/// ```
29///
30/// Often used in conjunction with `rayon` since `fork_map` will block until the child terminates,
31/// so you can construct a worker pool where each job is executed in a child process:
32///
33/// # Example
34/// ```
35/// use fork_map::fork_map;
36/// use rayon::prelude::*;
37///
38/// pub fn main() {
39///     let my_big_list = [ /* ... */ ];
40///
41///     // Create a worker pool with rayon's into_par_iter
42///     let results = my_big_list.into_par_iter().map(|item| {
43///         // Have each worker spawn a child process for the
44///         // operations we don't want polluting the parent's memory
45///         unsafe {
46///             fork_map(|| {
47///                 // Do your ugly operations here
48///                 Ok(item * 1234)
49///             }).expect("fork_map succeeded")
50///         }
51///     }).collect::<Vec<_>>();
52///
53///     // Use results here
54/// }
55/// ```
56///
57/// # Safety
58///
59/// Due to the nature of `fork()`, this function is very unsound and likely violates most of Rust's
60/// guarantees about lifetimes, considering all of your memory gets duplicated into a second
61/// process, even though it calls `exit(0)` after your closure is executed. Any threads other than
62/// the one calling `fork_map` will not be present in the new process, so threaded lifetime
63/// guarantees are also violated. Don't even think about using async executors with this.
64pub unsafe fn fork_map<F, R>(func: F) -> anyhow::Result<R>
65    where
66        F: Fn() -> anyhow::Result<R>,
67        R: Serialize + for<'a> Deserialize<'a>,
68{
69    // Pipe for sending the result from child to parent
70    let mut pipe: [libc::c_int; 2] = [0; 2];
71    libc::pipe(pipe.as_mut_ptr());
72
73    // Here we go
74    let pid = libc::fork();
75    if pid == 0 {
76        // Child
77        libc::close(pipe[0]);
78        let result = func().map_err(|e| serde_error::Error::new(&*e));
79        let ser = serde_json::to_string(&result).unwrap_or("".to_string());
80        libc::write(pipe[1], ser.as_ptr() as *const libc::c_void, ser.len());
81        libc::close(pipe[1]);
82        libc::exit(0);
83    }
84
85    // Parent
86    libc::close(pipe[1]);
87
88    // Read result from pipe
89    let mut des = vec![];
90    let des = loop {
91        const BUF_SIZE: usize = 0x1000;
92        let mut buf: [u8; BUF_SIZE] = [0; BUF_SIZE];
93        let count = libc::read(pipe[0], buf.as_mut_ptr() as *mut libc::c_void, BUF_SIZE);
94        if count < 0 {
95            break Err(anyhow!("io error: {}", std::io::Error::last_os_error()));
96        }
97        des.extend_from_slice(&buf[0..(count as usize)]);
98        // EOF signalled by less than the max bytes
99        if (count as usize) < BUF_SIZE {
100            break Ok(des);
101        }
102    };
103
104    let mut status = 0;
105    libc::waitpid(pid, &mut status, 0);
106
107    if status != 0 {
108        return Err(anyhow!("Process returned non-zero status code {}", status));
109    }
110
111    des.and_then(|des| {
112        serde_json::from_slice::<Result<R, serde_error::Error>>(des.as_slice())
113            .map_err(|e| anyhow!("{}", e))
114            .and_then(|se| match se {
115                Ok(i) => Ok(i),
116                Err(e) => Err(anyhow::Error::from(e)),
117            })
118    })
119}