build_parallel/lib.rs
use crossbeam_utils::thread;
use std::any::Any;
use std::env;
use std::io;

/// Represents the types of errors that may occur while using build-parallel.
#[derive(Debug)]
pub enum Error<E> {
    /// Error occurred while internally performing I/O.
    IOError(io::Error),
    /// Error occurred during build callback.
    BuildError(E),
    /// Panic occurred during build callback.
    BuildPanic(Box<dyn Any + Send + 'static>),
}

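/// Runs the user-provided build callback on a single object, wrapping any
/// callback error in [`Error::BuildError`].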
fn compile_object<T, R, E, F>(f: F, obj: &T) -> Result<R, Error<E>>
where
    T: 'static + Sync,
    R: 'static + Sync + Send,
    E: 'static + Sync + Send,
    F: Fn(&T) -> Result<R, E> + Sync + Send,
{
    f(obj).map_err(Error::BuildError)
}

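/// Compiles every object in `objs` by invoking `f` on each one from its own
/// scoped worker thread, returning the collected results in the same order as
/// `objs`. Parallelism is throttled through a jobserver (Cargo's, when one is
/// available) so the work cooperates with the rest of the build.
///
/// A minimal usage sketch, assuming the crate is built under the name
/// `build_parallel`; the `Unit` type and the closure body are illustrative only:
///
/// ```no_run
/// use build_parallel::{compile_objects, Error};
///
/// struct Unit;
/// let units = vec![Unit, Unit, Unit];
///
/// // Each callback receives one `&Unit`; returning `Err` marks the build as
/// // failed and stops further objects from being dispatched.
/// let built = compile_objects::<Unit, (), String, _>(
///     &|_unit| {
///         // ... invoke a compiler or some other external tool here ...
///         Ok(())
///     },
///     &units,
/// );
///
/// match built {
///     Ok(outputs) => assert_eq!(outputs.len(), 3),
///     Err(Error::IOError(e)) => eprintln!("jobserver I/O error: {}", e),
///     Err(Error::BuildError(msg)) => eprintln!("build callback failed: {}", msg),
///     Err(Error::BuildPanic(_)) => eprintln!("build callback panicked"),
/// }
/// ```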
pub fn compile_objects<T, R, E, F>(f: &F, objs: &[T]) -> Result<Vec<R>, Error<E>>
where
    T: 'static + Sync,
    R: 'static + Sync + Send,
    E: 'static + Sync + Send,
    F: Fn(&T) -> Result<R, E> + Sync + Send,
{
    use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
    use std::sync::Once;

    // Limit our parallelism globally with a jobserver. Start off by
    // releasing our own token for this process so we can have an
    // easier-to-write loop below. If this fails, though, then we're likely
    // on Windows with the main implicit token, so we just get a bit of extra
    // parallelism for a while and don't reacquire later.
    let server = jobserver();
    let reacquire = server.release_raw().is_ok();

    let res = thread::scope(|s| {
        // When compiling objects in parallel we do a few dirty tricks to speed
        // things up:
        //
        // * First is that we use the `jobserver` crate to limit the parallelism
        // of this build script. The `jobserver` crate will use a jobserver
        // configured by Cargo for build scripts to ensure that parallelism is
        // coordinated across C compilations and Rust compilations. Before we
        // compile anything we make sure to wait until we acquire a token.
        //
        // Note that this jobserver is cached globally so we only use one per
        // process and only worry about creating it once.
        //
        // * Next we spawn one scoped thread per object to actually compile the
        // objects in parallel. We only spawn a thread after we've acquired a
        // token to perform some work.
        //
        // * Finally, we want to keep the dependencies of this crate pretty
        // light, so we avoid using a safe abstraction like `rayon` and instead
        // rely on some bits of `unsafe` code. We know that this stack frame
        // persists while everything is compiling, so we use all the
        // stack-allocated objects without cloning/reallocating. We use a
        // transmute to `State` with a `'static` lifetime to persist everything
        // we need across the thread boundary, and the fact that `thread::scope`
        // joins every spawned thread before returning ensures that our stack
        // frame is alive while the threads are alive.
        //
        // With all that in mind we compile all objects in a loop here after we
        // acquire the appropriate tokens. Once all objects have been compiled
        // we join on all the threads and propagate the results of compilation.
        //
        // Note that as a slight optimization we try to break out as soon as any
        // compilation fails to ensure that errors get out to the user as fast
        // as possible.
        let error = AtomicBool::new(false);
        let mut handles = Vec::new();
        for obj in objs {
            if error.load(SeqCst) {
                break;
            }
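            // Block here until the jobserver grants us a token; each token
            // corresponds to one in-flight compile job.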
            let token = server.acquire().map_err(Error::IOError)?;
            let state = State { obj, error: &error };
            let state = unsafe { std::mem::transmute::<State<T>, State<'static, T>>(state) };
            handles.push(s.spawn(|_| {
                let state: State<T> = state; // erase the `'static` lifetime
                let result = compile_object(f, state.obj);
                if result.is_err() {
                    state.error.store(true, SeqCst);
                }
                drop(token); // make sure our jobserver token is released after the compile
                result
            }));
        }

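        // Join every spawned thread, collecting results in spawn order and
        // surfacing the first build error or panic we encounter.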
        let mut output = Vec::new();
        for handle in handles {
            match handle.join().map_err(Error::BuildPanic)? {
                Ok(r) => output.push(r),
                Err(err) => return Err(err),
            }
        }

        Ok(output)
    })
    .map_err(Error::BuildPanic)?;

    // Reacquire our process's token before we proceed, which we released
    // before entering the loop above.
    if reacquire {
        server.acquire_raw().map_err(Error::IOError)?;
    }

    return res;

    /// Shared state from the parent thread to the child thread. This
    /// package of pointers is temporarily transmuted to a `'static`
    /// lifetime to cross the thread boundary and then once the thread is
    /// running we erase the `'static` to go back to an anonymous lifetime.
    struct State<'a, O> {
        obj: &'a O,
        error: &'a AtomicBool,
    }

    /// Returns a suitable `jobserver::Client` used to coordinate
    /// parallelism between build scripts.
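    /// The client is created lazily and cached in a `static`, so every call
    /// returns the same process-wide instance.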
    fn jobserver() -> &'static jobserver::Client {
        static INIT: Once = Once::new();
        static mut JOBSERVER: Option<jobserver::Client> = None;

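        // Compile-time assertion that `jobserver::Client` is `Sync`, since we
        // hand out a shared `&'static` reference to it across threads.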
        fn _assert_sync<T: Sync>() {}
        _assert_sync::<jobserver::Client>();

        unsafe {
            INIT.call_once(|| {
                let server = default_jobserver();
                JOBSERVER = Some(server);
            });
            JOBSERVER.as_ref().unwrap()
        }
    }

    unsafe fn default_jobserver() -> jobserver::Client {
        // Try to use the environmental jobserver which Cargo typically
        // initializes for us...
        if let Some(client) = jobserver::Client::from_env() {
            return client;
        }

        // ... but if that fails for whatever reason fall back to the number
        // of cpus on the system or the `NUM_JOBS` env var.
        let mut parallelism = num_cpus::get();
        if let Ok(amt) = env::var("NUM_JOBS") {
            if let Ok(amt) = amt.parse() {
                parallelism = amt;
            }
        }

        // If we create our own jobserver then be sure to reserve one token
        // for ourselves.
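        // (In the jobserver protocol each process implicitly owns one token,
        // so we claim ours up front here.)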
        let client = jobserver::Client::new(parallelism).expect("failed to create jobserver");
        client.acquire_raw().expect("failed to acquire initial");
        client
    }
}

#[test]
fn it_works() {
    struct Object;
    let mut v = Vec::new();
    for _ in 0..4000 {
        v.push(Object);
    }
    compile_objects::<Object, (), (), _>(
        &|_| {
            println!("compile {:?}", std::thread::current().id());
            Ok(())
        },
        &v,
    )
    .unwrap();
}

#[test]
fn test_build_error() {
    struct Object;
    let v = vec![Object];
    let err = compile_objects::<Object, (), (), _>(
        &|_| Err(()),
        &v,
    )
    .unwrap_err();

    match err {
        Error::BuildError(_) => {}
        _ => panic!("Unexpected error."),
    }
}

#[test]
fn test_build_panic() {
    struct Object;
    let v = vec![Object];
    let err = compile_objects::<Object, (), (), _>(
        &|_| {
            panic!("Panic.");
        },
        &v,
    )
    .unwrap_err();

    match err {
        Error::BuildPanic(_) => {}
        _ => panic!("Unexpected error."),
    }
}