1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174
#[cfg(rayon_unstable)] use future::{self, Future, RayonFuture}; #[allow(unused_imports)] use latch::{Latch, SpinLatch}; use job::*; use registry::Registry; use std::any::Any; use std::mem; use std::sync::Arc; use unwind; /// Fires off a task into the Rayon threadpool in the "static" or /// "global" scope. Just like a standard thread, this task is not /// tied to the current stack frame, and hence it cannot hold any /// references other than those with `'static` lifetime. If you want /// to spawn a task that references stack data, use [the `scope()` /// function][scope] to create a scope. /// /// [scope]: fn.scope.html /// /// Since tasks spawned with this function cannot hold references into /// the enclosing stack frame, you almost certainly want to use a /// `move` closure as their argument (otherwise, the closure will /// typically hold references to any variables from the enclosing /// function that you happen to use). /// /// This API assumes that the closure is executed purely for its /// side-effects (i.e., it might send messages, modify data protected /// by a mutex, or some such thing). If you want to compute a result, /// consider `spawn_future()`. /// /// # Panic handling /// /// If this closure should panic, the resulting panic will be /// propagated to the panic handler registered in the `Configuration`, /// if any. See [`Configuration::panic_handler()`][ph] for more /// details. /// /// [ph]: struct.Configuration.html#method.panic_handler /// /// # Examples /// /// This code creates a Rayon task that increments a global counter. /// /// ```rust /// # use rayon_core as rayon; /// use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT}; /// /// static GLOBAL_COUNTER: AtomicUsize = ATOMIC_USIZE_INIT; /// /// rayon::spawn(move || { /// GLOBAL_COUNTER.fetch_add(1, Ordering::SeqCst); /// }); /// ``` pub fn spawn<F>(func: F) where F: FnOnce() + Send + 'static { // We assert that current registry has not terminated. unsafe { spawn_in(func, &Registry::current()) } } /// Spawn an asynchronous job in `registry.` /// /// Unsafe because `registry` must not yet have terminated. /// /// Not a public API, but used elsewhere in Rayon. pub unsafe fn spawn_in<F>(func: F, registry: &Arc<Registry>) where F: FnOnce() + Send + 'static { // Ensure that registry cannot terminate until this job has // executed. This ref is decremented at the (*) below. registry.increment_terminate_count(); let async_job = Box::new(HeapJob::new({ let registry = registry.clone(); move || { match unwind::halt_unwinding(func) { Ok(()) => { } Err(err) => { registry.handle_panic(err); } } registry.terminate(); // (*) permit registry to terminate now } })); // We assert that this does not hold any references (we know // this because of the `'static` bound in the inferface); // moreover, we assert that the code below is not supposed to // be able to panic, and hence the data won't leak but will be // enqueued into some deque for later execution. let abort_guard = unwind::AbortIfPanic; // just in case we are wrong, and code CAN panic let job_ref = HeapJob::as_job_ref(async_job); registry.inject_or_push(job_ref); mem::forget(abort_guard); } /// Spawns a future in the static scope, scheduling it to execute on /// Rayon's threadpool. Returns a new future that can be used to poll /// for the result. Since this future is executing in the static scope, /// it cannot hold references to things in the enclosing stack frame; /// if you would like to hold such references, use [the `scope()` /// function][scope] to create a scope. /// /// [scope]: fn.scope.html /// /// # Panic handling /// /// If this future should panic, that panic will be propagated when /// `poll()` is invoked on the return value. #[cfg(rayon_unstable)] pub fn spawn_future<F>(future: F) -> RayonFuture<F::Item, F::Error> where F: Future + Send + 'static { /// We assert that the current registry cannot yet have terminated. unsafe { spawn_future_in(future, Registry::current()) } } /// Internal helper function. /// /// Unsafe because caller must guarantee that `registry` has not yet terminated. #[cfg(rayon_unstable)] pub unsafe fn spawn_future_in<F>(future: F, registry: Arc<Registry>) -> RayonFuture<F::Item, F::Error> where F: Future + Send + 'static { let scope = StaticFutureScope::new(registry.clone()); future::new_rayon_future(future, scope) } #[cfg(rayon_unstable)] struct StaticFutureScope { registry: Arc<Registry> } #[cfg(rayon_unstable)] impl StaticFutureScope { /// Caller asserts that the registry has not yet terminated. unsafe fn new(registry: Arc<Registry>) -> Self { registry.increment_terminate_count(); StaticFutureScope { registry: registry } } } /// We assert that: /// /// (a) the scope valid remains valid until a completion method /// is called. In this case, "remains valid" means that the /// registry is not terminated. This is true because we /// acquire a "termination count" in `StaticFutureScope::new()` /// which is not released until `future_panicked()` or /// `future_completed()` is invoked. /// (b) the lifetime `'static` will not end until a completion /// method is called. This is true because `'static` doesn't /// end until the end of the program. #[cfg(rayon_unstable)] unsafe impl future::FutureScope<'static> for StaticFutureScope { fn registry(&self) -> Arc<Registry> { self.registry.clone() } fn future_panicked(self, err: Box<Any + Send>) { self.registry.handle_panic(err); self.registry.terminate(); } fn future_completed(self) { self.registry.terminate(); } } #[cfg(test)] mod test;