build_parallel/
lib.rs

1use crossbeam_utils::thread;
2use std::any::Any;
3use std::env;
4use std::io;
5
/// Represents the types of errors that may occur while using build-parallel.
///
/// The type parameter `E` is the error type returned by the user-supplied
/// build callback.
#[derive(Debug)]
pub enum Error<E> {
    /// Error occurred while internally performing I/O.
    IOError(io::Error),
    /// Error occurred during build callback.
    BuildError(E),
    /// Panic occurred during build callback. Carries the panic payload
    /// obtained from joining the worker thread.
    BuildPanic(Box<dyn Any + Send + 'static>),
}

impl<E: std::fmt::Display> std::fmt::Display for Error<E> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Error::IOError(err) => write!(f, "I/O error: {}", err),
            Error::BuildError(err) => write!(f, "build callback failed: {}", err),
            Error::BuildPanic(payload) => {
                // `panic!` payloads are normally a `&'static str` or a
                // `String`; anything else cannot be rendered as text.
                let message = if let Some(text) = payload.downcast_ref::<&str>() {
                    Some(*text)
                } else {
                    payload.downcast_ref::<String>().map(String::as_str)
                };
                match message {
                    Some(text) => write!(f, "build callback panicked: {}", text),
                    None => f.write_str("build callback panicked"),
                }
            }
        }
    }
}

impl<E: std::error::Error + 'static> std::error::Error for Error<E> {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Error::IOError(err) => Some(err),
            Error::BuildError(err) => Some(err),
            // A panic payload is not an error value, so there is no source.
            Error::BuildPanic(_) => None,
        }
    }
}
16
17fn compile_object<T, R, E, F>(f: F, obj: &T) -> Result<R, Error<E>>
18where
19    T: 'static + Sync,
20    R: 'static + Sync + Send,
21    E: 'static + Sync + Send,
22    F: Fn(&T) -> Result<R, E> + Sync + Send,
23{
24    f(obj).map_err(Error::BuildError)
25}
26
27pub fn compile_objects<T, R, E, F>(f: &F, objs: &[T]) -> Result<Vec<R>, Error<E>>
28where
29    T: 'static + Sync,
30    R: 'static + Sync + Send,
31    E: 'static + Sync + Send,
32    F: Fn(&T) -> Result<R, E> + Sync + Send,
33{
34    use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
35    use std::sync::Once;
36
37    // Limit our parallelism globally with a jobserver. Start off by
38    // releasing our own token for this process so we can have a bit of an
39    // easier to write loop below. If this fails, though, then we're likely
40    // on Windows with the main implicit token, so we just have a bit extra
41    // parallelism for a bit and don't reacquire later.
42    let server = jobserver();
43    let reacquire = server.release_raw().is_ok();
44
45    let res = thread::scope(|s| {
46        // When compiling objects in parallel we do a few dirty tricks to speed
47        // things up:
48        //
49        // * First is that we use the `jobserver` crate to limit the parallelism
50        //   of this build script. The `jobserver` crate will use a jobserver
51        //   configured by Cargo for build scripts to ensure that parallelism is
52        //   coordinated across C compilations and Rust compilations. Before we
53        //   compile anything we make sure to wait until we acquire a token.
54        //
55        //   Note that this jobserver is cached globally so we only used one per
56        //   process and only worry about creating it once.
57        //
58        // * Next we use a raw `thread::spawn` per thread to actually compile
59        //   objects in parallel. We only actually spawn a thread after we've
60        //   acquired a token to perform some work
61        //
62        // * Finally though we want to keep the dependencies of this crate
63        //   pretty light, so we avoid using a safe abstraction like `rayon` and
64        //   instead rely on some bits of `unsafe` code. We know that this stack
65        //   frame persists while everything is compiling so we use all the
66        //   stack-allocated objects without cloning/reallocating. We use a
67        //   transmute to `State` with a `'static` lifetime to persist
68        //   everything we need across the boundary, and the join-on-drop
69        //   semantics of `JoinOnDrop` should ensure that our stack frame is
70        //   alive while threads are alive.
71        //
72        // With all that in mind we compile all objects in a loop here, after we
73        // acquire the appropriate tokens, Once all objects have been compiled
74        // we join on all the threads and propagate the results of compilation.
75        //
76        // Note that as a slight optimization we try to break out as soon as
77        // possible as soon as any compilation fails to ensure that errors get
78        // out to the user as fast as possible.
79        let error = AtomicBool::new(false);
80        let mut handles = Vec::new();
81        for obj in objs {
82            if error.load(SeqCst) {
83                break;
84            }
85            let token = server.acquire().map_err(Error::IOError)?;
86            let state = State { obj, error: &error };
87            let state = unsafe { std::mem::transmute::<State<T>, State<'static, T>>(state) };
88            handles.push(s.spawn(|_| {
89                let state: State<T> = state; // erase the `'static` lifetime
90                let result = compile_object(f, state.obj);
91                if result.is_err() {
92                    state.error.store(true, SeqCst);
93                }
94                drop(token); // make sure our jobserver token is released after the compile
95                result
96            }));
97        }
98
99        let mut output = Vec::new();
100        for handle in handles {
101            match handle.join().map_err(Error::BuildPanic)? {
102                Ok(r) => output.push(r),
103                Err(err) => return Err(err),
104            }
105        }
106
107        Ok(output)
108    })
109    .map_err(Error::BuildPanic)?;
110
111    // Reacquire our process's token before we proceed, which we released
112    // before entering the loop above.
113    if reacquire {
114        server.acquire_raw().map_err(Error::IOError)?;
115    }
116
117    return res;
118
119    /// Shared state from the parent thread to the child thread. This
120    /// package of pointers is temporarily transmuted to a `'static`
121    /// lifetime to cross the thread boundary and then once the thread is
122    /// running we erase the `'static` to go back to an anonymous lifetime.
123    struct State<'a, O> {
124        obj: &'a O,
125        error: &'a AtomicBool,
126    }
127
128    /// Returns a suitable `jobserver::Client` used to coordinate
129    /// parallelism between build scripts.
130    fn jobserver() -> &'static jobserver::Client {
131        static INIT: Once = Once::new();
132        static mut JOBSERVER: Option<jobserver::Client> = None;
133
134        fn _assert_sync<T: Sync>() {}
135        _assert_sync::<jobserver::Client>();
136
137        unsafe {
138            INIT.call_once(|| {
139                let server = default_jobserver();
140                JOBSERVER = Some(server);
141            });
142            JOBSERVER.as_ref().unwrap()
143        }
144    }
145
146    unsafe fn default_jobserver() -> jobserver::Client {
147        // Try to use the environmental jobserver which Cargo typically
148        // initializes for us...
149        if let Some(client) = jobserver::Client::from_env() {
150            return client;
151        }
152
153        // ... but if that fails for whatever reason fall back to the number
154        // of cpus on the system or the `NUM_JOBS` env var.
155        let mut parallelism = num_cpus::get();
156        if let Ok(amt) = env::var("NUM_JOBS") {
157            if let Ok(amt) = amt.parse() {
158                parallelism = amt;
159            }
160        }
161
162        // If we create our own jobserver then be sure to reserve one token
163        // for ourselves.
164        let client = jobserver::Client::new(parallelism).expect("failed to create jobserver");
165        client.acquire_raw().expect("failed to acquire initial");
166        client
167    }
168}
169
/// Smoke test: building a large batch of objects with a callback that always
/// succeeds must return `Ok`.
#[test]
fn it_works() {
    struct Object;

    let objects: Vec<Object> = (0..4000).map(|_| Object).collect();
    let announce = |_: &Object| -> Result<(), ()> {
        println!("compile {:?}", std::thread::current().id());
        Ok(())
    };

    let outcome = compile_objects::<Object, (), (), _>(&announce, &objects);
    outcome.unwrap();
}
186
/// A callback that returns `Err` must surface as `Error::BuildError`.
#[test]
fn test_build_error() {
    struct Object;

    let objects = vec![Object];
    let always_fail = |_: &Object| -> Result<(), ()> { Err(()) };

    let failure = compile_objects::<Object, (), (), _>(&always_fail, &objects).unwrap_err();

    if let Error::BuildError(_) = failure {
        // Expected variant.
    } else {
        panic!("Unexpected error.");
    }
}
205
/// A callback that panics must surface as `Error::BuildPanic` rather than
/// tearing down the calling thread.
#[test]
fn test_build_panic() {
    struct Object;

    let objects = vec![Object];
    let always_panic = |_: &Object| -> Result<(), ()> {
        panic!("Panic.");
    };

    let failure = compile_objects::<Object, (), (), _>(&always_panic, &objects).unwrap_err();

    if let Error::BuildPanic(_) = failure {
        // Expected variant.
    } else {
        panic!("Unexpected error.");
    }
}
223}