1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
/// Scoped thread.
///
/// # Examples
///
/// A basic scoped thread:
///
/// ```
/// crossbeam_utils::thread::scope(|scope| {
///     scope.spawn(|| {
///         println!("Hello from a scoped thread!");
///     });
/// });
/// ```
///
/// When writing concurrent Rust programs, you'll sometimes see a pattern like this, using
/// [`std::thread::spawn`][spawn]:
///
/// ```ignore
/// let array = [1, 2, 3];
/// let mut guards = vec![];
///
/// for i in &array {
///     let guard = std::thread::spawn(move || {
///         println!("element: {}", i);
///     });
///
///     guards.push(guard);
/// }
///
/// for guard in guards {
///     guard.join().unwrap();
/// }
/// ```
///
/// The basic pattern is:
///
/// 1. Iterate over some collection.
/// 2. Spin up a thread to operate on each part of the collection.
/// 3. Join all the threads.
///
/// However, this code actually gives an error:
///
/// ```text
/// error: `array` does not live long enough
/// for i in &array {
///           ^~~~~
/// in expansion of for loop expansion
/// note: expansion site
/// note: reference must be valid for the static lifetime...
/// note: ...but borrowed value is only valid for the block suffix following statement 0 at ...
///     let array = [1, 2, 3];
///     let mut guards = vec![];
///
///     for i in &array {
///         let guard = std::thread::spawn(move || {
///             println!("element: {}", i);
/// ...
/// error: aborting due to previous error
/// ```
///
/// Because [`std::thread::spawn`][spawn] doesn't know about this scope, it requires a
/// `'static` lifetime. One way of giving it a proper lifetime is to use an [`Arc`][arc]:
///
/// [arc]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html
/// [spawn]: https://doc.rust-lang.org/stable/std/thread/fn.spawn.html
///
/// ```
/// use std::sync::Arc;
///
/// let array = Arc::new([1, 2, 3]);
/// let mut guards = vec![];
///
/// for i in 0..array.len() {
///     let a = array.clone();
///
///     let guard = std::thread::spawn(move || {
///         println!("element: {}", a[i]);
///     });
///
///     guards.push(guard);
/// }
///
/// for guard in guards {
///     guard.join().unwrap();
/// }
/// ```
///
/// But this introduces unnecessary allocation, as `Arc<T>` puts its data on the heap, and we
/// also end up dealing with reference counts. We know that we're joining the threads before
/// our function returns, so just taking a reference _should_ be safe. Rust can't know that,
/// though.
///
/// Enter scoped threads. Here's our original example, using `spawn` from crossbeam rather
/// than from `std::thread`:
///
/// ```
/// let array = [1, 2, 3];
///
/// crossbeam_utils::thread::scope(|scope| {
///     for i in &array {
///         scope.spawn(move || {
///             println!("element: {}", i);
///         });
///     }
/// });
/// ```
///
/// Much more straightforward.
// FIXME(jeehoonkang): maybe we should create a new crate for scoped threads.

use std::cell::RefCell;
use std::fmt;
use std::marker::PhantomData;
use std::mem::{self, ManuallyDrop};
use std::ops::DerefMut;
use std::rc::Rc;
use std::thread;
use std::io;

/// Lets a boxed closure be invoked by value through a trait object.
///
/// `call_box` takes `self: Box<Self>`, so it can be called on a `Box<FnBox<T>>` and consumes
/// the box in the process.
#[doc(hidden)]
trait FnBox<T> {
    /// Consumes the box, runs what it holds and returns the produced value.
    fn call_box(self: Box<Self>) -> T;
}

/// Every zero-argument closure returning `T` is an `FnBox<T>`.
impl<T, F> FnBox<T> for F
where
    F: FnOnce() -> T,
{
    fn call_box(self: Box<Self>) -> T {
        // Move the closure out of its heap allocation, then run it exactly once.
        let callee: F = *self;
        callee()
    }
}

/// Like `std::thread::spawn`, but without the closure bounds.
///
/// The closure only has to be valid for `'a` instead of `'static`; it is handed to
/// `builder_spawn_unchecked` together with a default `thread::Builder`.
///
/// # Safety
///
/// Nothing in this function checks that what `f` borrows stays alive while the new thread is
/// running. The caller has to guarantee that, typically by joining the returned handle before
/// `'a` ends.
///
/// # Panics
///
/// Panics if the thread could not be spawned, because the `io::Result` is unwrapped.
pub unsafe fn spawn_unchecked<'a, F>(f: F) -> thread::JoinHandle<()>
where
    F: FnOnce() + Send + 'a,
{
    let spawned = builder_spawn_unchecked(thread::Builder::new(), f);
    spawned.unwrap()
}

/// Like `std::thread::Builder::spawn`, but without the closure bounds.
///
/// The closure is boxed as a trait object bounded by `'a`, and that box is then transmuted into
/// one that claims to be `Send + 'static` so that `builder.spawn` accepts it.
///
/// # Safety
///
/// The `'a` bound is erased by a transmute, so the compiler no longer ties the lifetime of the
/// thread to the data `f` borrows. The caller has to guarantee that those borrows stay valid
/// until the thread has finished, typically by joining the returned handle before `'a` ends.
///
/// # Errors
///
/// Returns whatever error `thread::Builder::spawn` reports.
pub unsafe fn builder_spawn_unchecked<'a, F>(
    builder: thread::Builder,
    f: F,
) -> io::Result<thread::JoinHandle<()>>
where
    F: FnOnce() + Send + 'a,
{
    // First a trait object that still carries the real lifetime of the closure.
    let scoped: Box<FnBox<()> + 'a> = Box::new(f);
    // Both types are boxed trait objects of the same trait; only the `'a` bound and the `Send`
    // marker differ. `F: Send` holds by the bound above, the lifetime is the caller's duty.
    let erased =
        mem::transmute::<Box<FnBox<()> + 'a>, Box<FnBox<()> + Send + 'static>>(scoped);
    builder.spawn(move || erased.call_box())
}

/// A scope on which functions can be deferred and scoped threads spawned.
///
/// Values of this type are created by the `scope` function, which runs everything queued in
/// `dtors` before it returns.
pub struct Scope<'a> {
    /// The list of the deferred functions and thread join jobs.
    dtors: RefCell<Option<DtorChain<'a, ()>>>,
    // `*const ()` is neither `Send` nor `Sync`, so this marker makes `Scope` !Send + !Sync.
    _marker: PhantomData<*const ()>,
}

/// One node of a singly linked stack of deferred functions.
struct DtorChain<'a, T> {
    /// The deferred function held by this node.
    dtor: Box<FnBox<T> + 'a>,
    /// The remainder of the chain, if any.
    next: Option<Box<DtorChain<'a, T>>>,
}

impl<'a, T> DtorChain<'a, T> {
    /// Unlinks the head node of `chain` and returns the function it held.
    ///
    /// Afterwards `chain` holds the node that followed the old head, or `None` if there was no
    /// such node. An empty chain is left untouched and yields `None`.
    pub fn pop(chain: &mut Option<DtorChain<'a, T>>) -> Option<Box<FnBox<T> + 'a>> {
        match chain.take() {
            None => None,
            Some(DtorChain { dtor, next }) => {
                // Promote the successor (moved out of its box) to be the new head.
                *chain = next.map(|boxed| *boxed);
                Some(dtor)
            }
        }
    }
}

/// What is needed to finish a scoped thread and collect its result.
struct JoinState<T> {
    /// Handle of the underlying thread, which itself only yields `()`.
    join_handle: thread::JoinHandle<()>,
    /// Address of a `Box`-allocated `ManuallyDrop<T>` slot, stored as an integer. `join` turns
    /// it back into a box and moves the `T` out.
    result: usize,
    /// Ties the untyped address in `result` to the result type `T`.
    _marker: PhantomData<T>,
}

impl<T: Send> JoinState<T> {
    /// Bundles a thread handle with the address of the slot holding its result.
    fn new(join_handle: thread::JoinHandle<()>, result: usize) -> JoinState<T> {
        JoinState {
            join_handle,
            result,
            _marker: PhantomData,
        }
    }

    /// Waits for the thread to finish and returns the value stored in the result slot.
    ///
    /// If the thread panicked, the panic payload is returned as `Err` and the slot is not
    /// touched by this function.
    fn join(self) -> thread::Result<T> {
        let JoinState {
            join_handle,
            result,
            ..
        } = self;

        match join_handle.join() {
            Ok(()) => {
                // SAFETY: relies on whoever built this state having passed the address of a
                // live `Box<ManuallyDrop<T>>` that holds an initialized `T` once the thread has
                // finished without panicking. Ownership of that box is taken back here.
                let slot = unsafe { Box::from_raw(result as *mut ManuallyDrop<T>) };
                Ok(ManuallyDrop::into_inner(*slot))
            }
            Err(payload) => Err(payload),
        }
    }
}

/// A handle to a scoped thread.
///
/// Returned by `Scope::spawn` and `ScopedThreadBuilder::spawn`. It gives access to the
/// underlying thread and allows joining it to obtain its result.
pub struct ScopedJoinHandle<'a, T: 'a> {
    // Shared with the join closure that `ScopedThreadBuilder::spawn` defers on the scope;
    // whichever side takes the `JoinState` out first performs the join. The `Rc` also makes
    // this type !Send + !Sync.
    inner: Rc<RefCell<Option<JoinState<T>>>>,
    // Handle to the underlying thread, handed out by `thread()`.
    thread: thread::Thread,
    // Ties the handle to the lifetime `'a` and the result type `T`.
    _marker: PhantomData<&'a T>,
}

/// Create a new `scope`, for deferred destructors.
///
/// Scopes, in particular, support [*scoped thread spawning*](struct.Scope.html#method.spawn).
///
/// # Examples
///
/// Creating and using a scope:
///
/// ```
/// crossbeam_utils::thread::scope(|scope| {
///     scope.defer(|| println!("Exiting scope"));
///     scope.spawn(|| println!("Running child thread in scope"))
/// });
/// // Prints messages in the reverse order written
/// ```
///
/// # Panics
///
/// `thread::scope()` panics if a spawned thread panics but it is not joined inside the scope.
pub fn scope<'a, F, R>(f: F) -> R
where
    F: FnOnce(&Scope<'a>) -> R,
{
    let mut scope = Scope {
        dtors: RefCell::new(None),
        _marker: PhantomData,
    };
    let ret = f(&scope);
    scope.drop_all();
    ret
}

impl<'a> fmt::Debug for Scope<'a> {
    /// Writes a fixed placeholder; none of the scope's state is printed.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_fmt(format_args!("Scope {{ ... }}"))
    }
}

impl<'a, T> fmt::Debug for ScopedJoinHandle<'a, T> {
    /// Writes a fixed placeholder; neither the thread nor its result is printed.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_fmt(format_args!("ScopedJoinHandle {{ ... }}"))
    }
}

impl<'a> Scope<'a> {
    // Runs every pending deferred function, most recently registered first.
    //
    // Each function is unlinked from the chain before it is invoked. If one of them panics,
    // the remaining ones are therefore still queued, and a later call (for instance from
    // `Drop` during the resulting unwinding) carries on where this one stopped. Making the
    // first call outside of any destructor avoids the leakage problems due to
    // @rust-lang/rust#14875.
    fn drop_all(&mut self) {
        loop {
            // `&mut self` already guarantees exclusive access to the chain.
            let pending = DtorChain::pop(self.dtors.get_mut());
            match pending {
                Some(dtor) => dtor.call_box(),
                None => break,
            }
        }
    }

    /// Schedule code to be executed when exiting the scope.
    ///
    /// This is akin to having a destructor on the stack, except that it is run explicitly when
    /// the scope exits. Deferred functions run in reverse order of registration, and the joins
    /// of spawned threads are queued in the same list. `f` therefore runs after every thread
    /// spawned *later* than this call has been joined, and before the join of any thread that
    /// was spawned earlier.
    pub fn defer<F>(&self, f: F)
    where
        F: FnOnce() + 'a,
    {
        let mut head = self.dtors.borrow_mut();
        // The current chain becomes the tail of the new head node.
        let rest = head.take().map(Box::new);
        *head = Some(DtorChain {
            dtor: Box::new(f),
            next: rest,
        });
    }

    /// Create a scoped thread.
    ///
    /// `spawn` is similar to the [`spawn`][spawn] function in Rust's standard library. The
    /// difference is that this thread is scoped, meaning that it's guaranteed to terminate
    /// before the current stack frame goes away, allowing you to reference the parent stack frame
    /// directly. This is ensured by having the parent thread join on the child thread before the
    /// scope exits.
    ///
    /// # Panics
    ///
    /// Panics if the thread could not be spawned; use `builder().spawn(f)` to handle that
    /// error instead.
    ///
    /// [spawn]: https://doc.rust-lang.org/std/thread/fn.spawn.html
    pub fn spawn<'s, F, T>(&'s self, f: F) -> ScopedJoinHandle<'a, T>
    where
        'a: 's,
        F: FnOnce() -> T + Send + 'a,
        T: Send + 'a,
    {
        let spawned = self.builder().spawn(f);
        spawned.unwrap()
    }

    /// Generates the base configuration for spawning a scoped thread, from which configuration
    /// methods can be chained.
    pub fn builder<'s>(&'s self) -> ScopedThreadBuilder<'s, 'a> {
        let builder = thread::Builder::new();
        ScopedThreadBuilder {
            scope: self,
            builder,
        }
    }
}

/// Scoped thread configuration. Provides detailed control over the properties and behavior of new
/// scoped threads.
///
/// Obtained from `Scope::builder`; the configuration methods consume and return the builder so
/// that calls can be chained before `spawn`.
pub struct ScopedThreadBuilder<'s, 'a: 's> {
    // The scope on which the join of the spawned thread is deferred.
    scope: &'s Scope<'a>,
    // The standard-library builder that carries the configuration and creates the thread.
    builder: thread::Builder,
}

impl<'s, 'a: 's> ScopedThreadBuilder<'s, 'a> {
    /// Names the thread-to-be. Currently the name is used for identification only in panic
    /// messages.
    pub fn name(mut self, name: String) -> ScopedThreadBuilder<'s, 'a> {
        self.builder = self.builder.name(name);
        self
    }

    /// Sets the size of the stack for the new thread.
    pub fn stack_size(mut self, size: usize) -> ScopedThreadBuilder<'s, 'a> {
        self.builder = self.builder.stack_size(size);
        self
    }

    /// Spawns a new thread, and returns a join handle for it.
    pub fn spawn<F, T>(self, f: F) -> io::Result<ScopedJoinHandle<'a, T>>
    where
        F: FnOnce() -> T + Send + 'a,
        T: Send + 'a,
    {
        // The `Box` constructed below is written only by the spawned thread,
        // and read by the current thread only after the spawned thread is
        // joined (`JoinState::join()`). Thus there are no data races.
        let result = Box::into_raw(Box::<ManuallyDrop<T>>::new(unsafe { mem::uninitialized() })) as usize;

        let join_handle = try!(unsafe {
            builder_spawn_unchecked(self.builder, move || {
                let mut result = Box::from_raw(result as *mut ManuallyDrop<T>);
                *result = ManuallyDrop::new(f());
                mem::forget(result);
            })
        });
        let thread = join_handle.thread().clone();

        let join_state = JoinState::<T>::new(join_handle, result);
        let deferred_handle = Rc::new(RefCell::new(Some(join_state)));
        let my_handle = deferred_handle.clone();

        self.scope.defer(move || {
            let state = mem::replace(deferred_handle.borrow_mut().deref_mut(), None);
            if let Some(state) = state {
                state.join().unwrap();
            }
        });

        Ok(ScopedJoinHandle {
            inner: my_handle,
            thread: thread,
            _marker: PhantomData,
        })
    }
}

impl<'a, T: Send + 'a> ScopedJoinHandle<'a, T> {
    /// Join the scoped thread, returning the result it produced.
    pub fn join(self) -> thread::Result<T> {
        let state = mem::replace(self.inner.borrow_mut().deref_mut(), None);
        state.unwrap().join()
    }

    /// Get the underlying thread handle.
    pub fn thread(&self) -> &thread::Thread {
        &self.thread
    }
}

impl<'a> Drop for Scope<'a> {
    fn drop(&mut self) {
        self.drop_all()
    }
}