fork_map/lib.rs
1use anyhow::anyhow;
2use serde::{Deserialize, Serialize};
3
4/// Forks, and runs function F in a child process.
5/// Waits for the child to terminate and returns the result of F.
6///
7/// # Example
8///
9/// ```
10/// use fork_map::fork_map;
11///
12/// // Result type needs to implement serde::Serialize and serde::Deserialize
13/// pub fn do_with_fork(value: u64) -> u64 {
14/// // Spawn a child process with a copy-on-write copy of memory
15/// unsafe {
16/// fork_map(|| {
17/// // Do some obnoxious operation with `value`
18/// // * Maybe it leaks memory
19/// // * Maybe it uses static resources unsafely and
20/// // prevents multi-threaded operation
21/// // * Maybe you couldn't figure out how to
22/// // send your data to a thread
23/// Ok(value * 10)
24/// }).unwrap()
25/// // Execution continues after the child process has exited
26/// }
27/// }
28/// ```
29///
30/// Often used in conjunction with `rayon` since `fork_map` will block until the child terminates,
31/// so you can construct a worker pool where each job is executed in a child process:
32///
33/// # Example
34/// ```
35/// use fork_map::fork_map;
36/// use rayon::prelude::*;
37///
38/// pub fn main() {
39/// let my_big_list = [ /* ... */ ];
40///
41/// // Create a worker pool with rayon's into_par_iter
42/// let results = my_big_list.into_par_iter().map(|item| {
43/// // Have each worker spawn a child process for the
44/// // operations we don't want polluting the parent's memory
45/// unsafe {
46/// fork_map(|| {
47/// // Do your ugly operations here
48/// Ok(item * 1234)
49/// }).expect("fork_map succeeded")
50/// }
51/// }).collect::<Vec<_>>();
52///
53/// // Use results here
54/// }
55/// ```
56///
57/// # Safety
58///
59/// Due to the nature of `fork()`, this function is very unsound and likely violates most of Rust's
60/// guarantees about lifetimes, considering all of your memory gets duplicated into a second
61/// process, even though it calls `exit(0)` after your closure is executed. Any threads other than
62/// the one calling `fork_map` will not be present in the new process, so threaded lifetime
63/// guarantees are also violated. Don't even think about using async executors with this.
64pub unsafe fn fork_map<F, R>(func: F) -> anyhow::Result<R>
65 where
66 F: Fn() -> anyhow::Result<R>,
67 R: Serialize + for<'a> Deserialize<'a>,
68{
69 // Pipe for sending the result from child to parent
70 let mut pipe: [libc::c_int; 2] = [0; 2];
71 libc::pipe(pipe.as_mut_ptr());
72
73 // Here we go
74 let pid = libc::fork();
75 if pid == 0 {
76 // Child
77 libc::close(pipe[0]);
78 let result = func().map_err(|e| serde_error::Error::new(&*e));
79 let ser = serde_json::to_string(&result).unwrap_or("".to_string());
80 libc::write(pipe[1], ser.as_ptr() as *const libc::c_void, ser.len());
81 libc::close(pipe[1]);
82 libc::exit(0);
83 }
84
85 // Parent
86 libc::close(pipe[1]);
87
88 // Read result from pipe
89 let mut des = vec![];
90 let des = loop {
91 const BUF_SIZE: usize = 0x1000;
92 let mut buf: [u8; BUF_SIZE] = [0; BUF_SIZE];
93 let count = libc::read(pipe[0], buf.as_mut_ptr() as *mut libc::c_void, BUF_SIZE);
94 if count < 0 {
95 break Err(anyhow!("io error: {}", std::io::Error::last_os_error()));
96 }
97 des.extend_from_slice(&buf[0..(count as usize)]);
98 // EOF signalled by less than the max bytes
99 if (count as usize) < BUF_SIZE {
100 break Ok(des);
101 }
102 };
103
104 let mut status = 0;
105 libc::waitpid(pid, &mut status, 0);
106
107 if status != 0 {
108 return Err(anyhow!("Process returned non-zero status code {}", status));
109 }
110
111 des.and_then(|des| {
112 serde_json::from_slice::<Result<R, serde_error::Error>>(des.as_slice())
113 .map_err(|e| anyhow!("{}", e))
114 .and_then(|se| match se {
115 Ok(i) => Ok(i),
116 Err(e) => Err(anyhow::Error::from(e)),
117 })
118 })
119}