bagpipe 0.1.0

A concurrent bag datastructure.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
// Copyright 2017 the authors. See the 'Copyright and license' section of the
// README.md file at the top-level directory of this repository.
//
// Licensed under the Apache License, Version 2.0 (the LICENSE file). This file
// may not be copied, modified, or distributed except according to those terms.

//! Implements the `BagPipe` data structure, along with its core components.
//!
//! A `BagPipe` is a concurrent pool data-structure optimized for
//! throughput and not much else. The core idea is to have a large
//! number of concurrent queues and stacks, along with a way of
//! load-balancing them in a coordination-free way that avoids wasting
//! resources and keeps contention low.
//!
//! # Example
//! For general-purpose use, the `BagPipe` has a somewhat low-level
//! interface. Here is some simple example usage:
//!
//! ```rust,ignore
//! let bp = BagPipe::<GeneralYC<T>>::new();
//! for _ in 0..NUM_THREADS {
//!     let mut my_bp = bp.clone();
//!     thread::spawn(move || {
//!         // ... do work
//!         // loop until push is successful
//!         my_bp.push_mut(item);
//!         // ... more work
//!         if let PopResult::There(item) = my_bp.try_pop_mut() {
//!             // use item
//!         }
//!     });
//! }
//! ```
//!
//! If you are passing a word-sized type, it is possible to reduce
//! allocation overhead by storing the data in-line. To do this,
//! replace `GeneralYC` with `YangCrummeyQueue` in the type parameter
//! for `BagPipe`: this will switch out the underlying backing
//! data-structure.
//!
//! The API currently supports `try...` versions of methods,
//! allowing data-structures to signal lack of progress due to high
//! contention. It also provides `push` and `pop` methods that will loop
//! until they succeed (or do something more intelligent).
//!
//! # Guarantees
//!
//! The data-structures given here are all non-blocking. The `try`
//! methods using `YangCrummeyQueue` and `GeneralYC` will return in a
//! bounded number of steps, but there is no guarantee they will succeed
//! except if they execute in isolation (i.e. Obstruction Freedom when
//! called in a loop). In contrast, those using `FAAArrayQueue` have a
//! lock-free progress guarantee. In general, a `BagPipe` inherits its
//! progress guarantees from its underlying `SharedWeakBag`, but it may
//! return arbitrary values with respect to their ordering guarantees.
//!
//! `BagPipe`'s emptiness check is not currently linearizable, but I
//! believe it is still serializable. In other words, it is possible
//! to re-order the execution history of the data-structure to respect
//! program order, but calls returning `Empty` may be re-ordered to a
//! time before or after the real execution time of the operation. A
//! marginally slower linearizable emptiness check would not be
//! difficult to engineer, and it will hopefully be added to the API
//! soon.

extern crate crossbeam;
extern crate num_cpus;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, AtomicIsize, Ordering, fence};
use bag::{WeakBag, SharedWeakBag, RevocableWeakBag, Revocable, PopResult, PopStatus};
use crossbeam::mem::CachePadded;
use std::mem;

pub mod queue;
pub mod bag;

#[cfg(feature="prime_schedules")]
mod primes;

// Parameters of the `size_guess` protocol.
//
// `THRESHOLD_DIFF`: a handle adds its locally accumulated push/pop
// difference to a shared counter once that difference reaches this value in
// either direction (see `propagate_diff`).
const THRESHOLD_DIFF: isize = 4;
// `N_COUNTERS`: number of shared counters in `BagPipeState`; a handle picks
// the one at index `offset % N_COUNTERS` (see `push_diff`).
const N_COUNTERS: usize = 4;

/// A concurrent bag data-structure built from sharding requests over
/// other bags.
///
/// `BagPipe` implements both `SharedWeakBag` and `WeakBag`. Using this
/// as a `SharedWeakBag` tends to perform worse. Note that this should
/// never be used with `Arc<BagPipe>`, which will be much slower and
/// have an increased failure rate.
///
/// NOTE(review): only a `WeakBag` impl for `BagPipe` is visible in this
/// file; the statement about `SharedWeakBag` above may be stale -- confirm.
///
/// Each handle carries its own probing schedule (`offset`, `stride`) and
/// failure counts; the underlying pipes and size counters are shared
/// between all clones through `pipes`.
pub struct BagPipe<B: SharedWeakBag> {
    /// State shared by every clone of this handle.
    pipes: Arc<BagPipeState<B>>,
    /// Index from which the next operation starts probing; advanced by
    /// `stride` after a failed attempt. Also selects the size counter this
    /// handle updates (`offset % N_COUNTERS`).
    offset: usize,
    /// Step added to the probe index between successive pipes.
    stride: usize,
    /// Incremented on a failed push and halved on a successful one; sets
    /// the number of pipes the next push attempt may probe.
    push_failures: usize,
    /// Incremented on a transient pop failure and halved otherwise; sets
    /// the number of pipes the next pop attempt may probe.
    pop_failures: usize,
    /// Net pushes minus pops not yet added to the shared counters.
    cur_diff: isize,
}

impl<B: SharedWeakBag> Drop for BagPipe<B> {
    /// Publishes any push/pop difference still buffered in this handle to
    /// the shared counters before the handle goes away.
    fn drop(&mut self) {
        let nothing_pending = self.cur_diff == 0;
        if nothing_pending {
            return;
        }
        self.push_diff();
    }
}

impl<B: SharedWeakBag> Clone for BagPipe<B> {
    fn clone(&self) -> Self {
        #[cfg(feature="prime_schedules")]
        let offset = {
            primes::get(self.pipes.all_refs.fetch_add(1, Ordering::Relaxed) + 1)
        };
        #[cfg(not(feature="prime_schedules"))]
        let offset = {
            let seed = self.pipes.all_refs.fetch_add(1, Ordering::Relaxed) + 1;
            seed * 2 + 1
        };
        BagPipe {
            pipes: self.pipes.clone(),
            offset: offset & (self.pipes.pipes.len() - 1),
            stride: offset,
            push_failures: 0,
            pop_failures: 0,
            cur_diff: 0,
        }
    }
}

impl<B: SharedWeakBag> BagPipe<B> {
    /// Adds the locally buffered difference to one of the shared counters.
    ///
    /// The counter is selected by `offset % N_COUNTERS`. The local
    /// difference itself is left as is; resetting it is up to the caller.
    fn push_diff(&mut self) {
        let slot = self.offset % N_COUNTERS;
        // SAFETY: `slot` is a remainder modulo `N_COUNTERS`, and `counters`
        // is an array of exactly `N_COUNTERS` elements.
        let counter = unsafe { self.pipes.counters.get_unchecked(slot) };
        counter.fetch_add(self.cur_diff, Ordering::Release);
    }

    /// Create a new `BagPipe` with `size` pipes.
    ///
    /// `size` is expected to be non-zero (checked in debug builds only).
    pub fn new_size(size: usize) -> Self {
        #[cfg(feature="prime_schedules")]
        let first_step = primes::get(1);
        #[cfg(not(feature="prime_schedules"))]
        let first_step = 1;
        debug_assert!(size > 0);
        let shared = Arc::new(BagPipeState::new_size(size));
        BagPipe {
            pipes: shared,
            offset: first_step,
            stride: first_step,
            push_failures: 0,
            pop_failures: 0,
            cur_diff: 0,
        }
    }

    /// Records `d` net pushes (negative for pops) in the local buffer and
    /// publishes the buffer once it reaches `THRESHOLD_DIFF` in either
    /// direction, after which the buffer restarts at zero.
    fn propagate_diff(&mut self, d: isize) {
        self.cur_diff += d;
        let below_threshold = -THRESHOLD_DIFF < self.cur_diff &&
                              self.cur_diff < THRESHOLD_DIFF;
        if below_threshold {
            return;
        }
        self.push_diff();
        self.cur_diff = 0;
    }

    /// Return a guess of the current `BagPipe` size.
    ///
    /// The guess is the sum of the shared `AtomicIsize` counters held in
    /// `BagPipeState`, clamped at zero. A handle only adds to one of those
    /// counters when its local net push/pop count reaches a small
    /// threshold, or when the handle is dropped, so operations still
    /// buffered in live handles are not reflected here.
    ///
    /// Consequently there are no consistency guarantees on this value,
    /// not even eventual consistency. The closest thing to a guarantee is
    /// a very weak quiescent consistency, where "quiesce" means that all
    /// threads that have pushed or popped have relinquished their handle.
    /// Note that this term usually refers to a sufficiently long period of
    /// inactivity.
    pub fn size_guess(&self) -> isize {
        let total = self.pipes
            .counters
            .iter()
            .fold(0isize, |acc, ctr| acc + ctr.load(Ordering::Acquire));
        if total < 0 { 0 } else { total }
    }
}

impl<B: SharedWeakBag> WeakBag for BagPipe<B> {
    type Item = B::Item;

    /// Creates a handle over a freshly allocated `BagPipeState` of the
    /// default size.
    fn new() -> Self {
        #[cfg(feature="prime_schedules")]
        let first_step = primes::get(1);
        #[cfg(not(feature="prime_schedules"))]
        let first_step = 1;
        let shared = Arc::new(BagPipeState::new());
        BagPipe {
            pipes: shared,
            offset: first_step,
            stride: first_step,
            push_failures: 0,
            pop_failures: 0,
            cur_diff: 0,
        }
    }

    /// Attempts to push `it`, probing at most `push_failures + 1` pipes.
    ///
    /// On success the failure count is halved and the push is recorded for
    /// `size_guess`. On failure the item is handed back, the starting
    /// index moves on by one stride and the failure count grows by one.
    fn try_push_mut(&mut self, it: Self::Item) -> Result<(), Self::Item> {
        let budget = self.push_failures + 1;
        let outcome = self.pipes
            .try_push_internal(it, self.offset, self.stride, budget, false);
        if let Err(rejected) = outcome {
            self.offset += self.stride;
            self.push_failures += 1;
            return Err(rejected);
        }
        self.push_failures >>= 1;
        self.propagate_diff(1);
        Ok(())
    }

    /// Attempts to pop an item, probing at most `2 * pop_failures + 1`
    /// pipes.
    ///
    /// Only a transient failure moves the starting index and raises the
    /// failure count; both a successful pop and an `Empty` result halve it.
    fn try_pop_mut(&mut self) -> PopResult<Self::Item> {
        let budget = (self.pop_failures * 2) + 1;
        match self.pipes.try_pop_internal(self.offset, self.stride, budget) {
            Ok(item) => {
                self.pop_failures >>= 1;
                self.propagate_diff(-1);
                Ok(item)
            }
            Err(PopStatus::Empty) => {
                self.pop_failures >>= 1;
                Err(PopStatus::Empty)
            }
            Err(PopStatus::TransientFailure) => {
                self.offset += self.stride;
                self.pop_failures += 1;
                Err(PopStatus::TransientFailure)
            }
        }
    }

    /// Pushes `it`, falling back to a second pass that is told to succeed
    /// on its final pipe if the first `try_push_mut` attempt fails.
    fn push_mut(&mut self, it: Self::Item) {
        // Fast path: `try_push_mut` has already done all the bookkeeping.
        let leftover = match self.try_push_mut(it) {
            Ok(()) => return,
            Err(item) => item,
        };
        let budget = self.push_failures + 1;
        match self.pipes
                  .try_push_internal(leftover, self.offset, self.stride, budget, true) {
            // One of the non-final attempts went through.
            Ok(true) => self.push_failures >>= 1,
            // The item had to go through the final, must-succeed pipe.
            Ok(false) => {
                self.offset += self.stride;
                self.push_failures += 1;
            }
            Err(_) => unreachable!(),
        }
        self.propagate_diff(1);
    }


    /// Pushes every item of `iter`, retrying each one on successive pipes
    /// (one stride apart) until some pipe accepts it.
    ///
    /// The handle's own `offset` is not advanced; the total number of
    /// items added is recorded for `size_guess` in a single step.
    fn bulk_add<I: Iterator<Item = Self::Item>>(&mut self, iter: I) {
        let mask = self.pipes.pipes.len() - 1;
        let mut slot = self.offset;
        let mut added = 0;
        for item in iter {
            let mut pending = item;
            loop {
                slot &= mask;
                // NOTE(review): the bounds argument relies on the pipe
                // count being a power of two, so that `slot & mask` is a
                // valid index -- confirm against `BagPipeState`.
                let attempt = unsafe { self.pipes.pipes.get_unchecked(slot).try_push(pending) };
                slot += self.stride;
                match attempt {
                    Ok(_) => break,
                    Err(returned) => pending = returned,
                }
            }
            added += 1;
        }
        self.propagate_diff(added)
    }
}

impl<B: RevocableWeakBag> BagPipe<B>
    where B::Item: Revocable
{
    /// Attempt to revoke `it` from membership in the `BagPipe`.
    ///
    /// This is simply an implementation of portions of the `Revocable`
    /// trait for a `BagPipe`.  We don't implement the trait here
    /// because it inherits from SharedWeakBag and BagPipes have
    /// unexpected behavior when used without the _mut() methods. This
    /// problem could be solved if we had "or" trait inheritance, but
    /// that could be more trouble than it's worth :-)
    ///
    /// The call is forwarded unchanged to `B::revoke`, and its result is
    /// returned as is.
    ///
    /// # Safety
    ///
    /// The caller must uphold whatever contract `B::revoke` places on
    /// `it`; this wrapper adds no checks of its own.
    pub unsafe fn revoke(it: &B::Item) -> bool {
        B::revoke(it)
    }
}

/// State shared by all handles (`BagPipe` clones) of one bag.
struct BagPipeState<B: SharedWeakBag> {
    /// Ticket counter: starts at 1 and is incremented each time a handle
    /// is cloned; the value read seeds the clone's probing schedule.
    all_refs: AtomicUsize,
    /// Counters summed by `size_guess`; handles add their buffered
    /// push/pop differences to these.
    counters: [CachePadded<AtomicIsize>; N_COUNTERS],
    /// The underlying bags. The length is always a power of two so that
    /// indices can be reduced with a bit mask.
    pipes: Vec<B>,
}

impl<B: SharedWeakBag> BagPipeState<B> {
    /// Creates a new `BagPipeState` with `sz` pipes, rounded up to the
    /// next power of two.
    ///
    /// The power-of-two length is what allows the probing methods below to
    /// reduce an index with `ix & (len - 1)`.
    pub fn new_size(sz: usize) -> Self {
        let len = sz.next_power_of_two();
        let mut res = BagPipeState {
            all_refs: AtomicUsize::new(1),
            // `transmute` only compiles when source and target have the same
            // size, so this relies on `CachePadded<AtomicIsize>` occupying
            // exactly 32 `usize` words.
            // NOTE(review): it also relies on the all-zero bit pattern being
            // a valid, zero-valued counter -- confirm against the
            // `CachePadded` version in use.
            counters: unsafe { mem::transmute([[0 as usize; 32]; N_COUNTERS]) },
            pipes: Vec::with_capacity(len),
        };
        for _ in 0..len {
            res.pipes.push(B::new())
        }
        // NOTE(review): the purpose of this fence is not evident from this
        // file; it is kept exactly as in the original.
        fence(Ordering::Acquire);
        res
    }

    /// Creates a new `BagPipeState` with a small number of pipes: twice
    /// the CPU count reported by `num_cpus`, rounded up to a power of two.
    pub fn new() -> Self {
        Self::new_size(num_cpus::get() * 2)
    }

    /// Attempts to push `it` down a pipe, following the schedule given by
    /// `offset` and `stride`, making at most `max_failures` attempts.
    ///
    /// Returns:
    ///
    /// * `Ok(true)` if one of the `try_push` attempts accepted the item;
    /// * `Ok(false)` if `succeed_final` is set and the item was handed to
    ///   `push` on the last permitted pipe instead;
    /// * `Err(item)` if every attempt failed (only possible when
    ///   `succeed_final` is `false`, or when `max_failures` is zero).
    pub fn try_push_internal(&self,
                             it: B::Item,
                             offset: usize,
                             stride: usize,
                             max_failures: usize,
                             succeed_final: bool)
                             -> Result<bool, B::Item> {
        let mut ix = offset;
        let mut remaining = max_failures;
        let len = self.pipes.len();
        debug_assert!(len.is_power_of_two());
        let mut cur_item = it;
        while remaining > 0 {
            ix &= len - 1;
            // SAFETY (for `get_unchecked`): `len` is a power of two, so
            // masking with `len - 1` leaves `ix < len`.
            unsafe {
                if succeed_final && remaining == 1 {
                    // Last permitted attempt: use `push`, which has no
                    // failure return, rather than `try_push`.
                    self.pipes.get_unchecked(ix).push(cur_item);
                    // `false`: no `try_push` attempt succeeded; the
                    // fallback pipe was used.
                    return Ok(false);
                } else {
                    match self.pipes.get_unchecked(ix).try_push(cur_item) {
                        Ok(()) => return Ok(true),
                        Err(item) => cur_item = item,
                    }
                }
            }
            ix += stride;
            remaining -= 1;
        }
        Err(cur_item)
    }

    /// Attempts to pop from a pipe, following the schedule given by
    /// `offset` and `stride`, making at most `max_failures` attempts.
    ///
    /// Returns the first item found. Otherwise returns
    /// `Err(PopStatus::Empty)` once `len` pipes in a row have reported
    /// `Empty` (a transient failure restarts that count), or
    /// `Err(PopStatus::TransientFailure)` when the attempts run out first.
    ///
    /// The emptiness check is made before the budget check, so a scan
    /// whose final permitted attempt completes a full run of empty pipes
    /// reports `Empty` rather than `TransientFailure`.
    ///
    /// `max_failures` must be non-zero (checked in debug builds only).
    pub fn try_pop_internal(&self,
                            offset: usize,
                            stride: usize,
                            max_failures: usize)
                            -> PopResult<B::Item> {
        let mut ix = offset;
        let mut remaining = max_failures;
        let mut empties = 0;
        let len = self.pipes.len();
        debug_assert!(len.is_power_of_two());
        debug_assert!(max_failures > 0);
        // Debug builds record which pipes reported `Empty` so the coverage
        // of the schedule can be verified below.
        #[cfg(debug_assertions)]
        let mut seen = Vec::new();
        loop {
            ix &= len - 1;
            // SAFETY (for `get_unchecked`): `len` is a power of two, so
            // masking with `len - 1` leaves `ix < len`.
            unsafe {
                match self.pipes.get_unchecked(ix).try_pop() {
                    Ok(it) => return Ok(it),
                    Err(PopStatus::Empty) => {
                        #[cfg(debug_assertions)]
                        seen.push(ix);
                        empties += 1
                    }
                    // Contention tells us nothing about emptiness: start
                    // counting consecutive empty pipes afresh.
                    Err(PopStatus::TransientFailure) => empties = 0,
                }
            };
            // This is not linearizable, but I believe it is
            // sequentially consistent.
            if empties == len {
                #[cfg(debug_assertions)]
                {
                    // `len` consecutive probes must have covered every pipe.
                    seen.sort();
                    seen.dedup();
                    let expected: Vec<usize> = (0..len).collect();
                    assert_eq!(seen,
                               expected,
                               "got {:?} but expected {:?}, with offset={} and stride={}",
                               seen,
                               expected,
                               offset,
                               stride);
                }
                return Err(PopStatus::Empty);
            }
            ix += stride;
            remaining -= 1;
            if remaining == 0 {
                return Err(PopStatus::TransientFailure);
            }
        }
    }
}