bombs 0.2.1

Efficient single-producer multi-consumer channel types.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
mod multi; pub use multi::*;

use std::{
    mem::MaybeUninit,
    ptr,
};

use crate::types::{
    sync::{
        atomic::{ AtomicPtr, AtomicBool, Ordering },
        Arc,
    },
    cell::{ Cell, UnsafeCell },
    thread::{ self, Thread },
    hint::spin_loop,
};

use sptr::Strict;

/// A type alias for [`Fuse<T>`], offered as the alternative spelling "fuze".
///
/// `Fuze<T>` and [`Fuse<T>`] are the same type and can be used interchangeably.
pub type Fuze<T> = Fuse<T>;

/// A type alias for [`Flame`].
///
/// `Fire` and [`Flame`] are the same type and can be used interchangeably.
pub type Fire = Flame;

// State bits stored in the low bits of the encoded `AtomicPtr<ParkedThread>`.
// `ParkedThread` is aligned to 8 bytes, so these three bits are always zero
// in a real node address and are free to carry state.

/// Set by `Fuse::light` once the value has been written to the container.
const EXPLODED_STATE: usize = 1 << 0;
/// Set by `Container::drop` once the shared container is being dropped.
const DROPPED_STATE: usize = 1 << 1;
/// Set by `Flame::wait_for_extinguish` when a thread registers itself to be
/// woken by `Container::drop`.
const PARKED_STATE: usize = 1 << 2;

/// Mask selecting all three state bits of the encoded pointer.
const STATE_MASK: usize = 0b111;

/// Linked list node structure for storing parked threads.
///
/// Nodes are created on the stack of the thread that is about to park and
/// are published to other threads through the encoded state pointer.

// Forces stack instances of this struct to be in a memory address
// which is a multiple of 8. Hence, the lower three bits of any pointer
// to this struct are free to use as state bits for our bomb.
#[repr(align(8))]
struct ParkedThread {
    /// Handle of the parked thread. The waker takes it out of the cell
    /// before signalling, so it never touches the node after the owner
    /// may have woken up and dropped it.
    thread: Cell<Option<Thread>>,

    /// Set to `true` by the waker right before it unparks the thread.
    /// The parked thread loops on this flag to filter out wake-ups that
    /// did not come from the waker.
    signalled: AtomicBool,

    /// Pointer to the next node (null at the end of the list).
    next: *const ParkedThread,
}

/// Shared state behind every `Bomb<T>` and its `Fuse<T>`: the encoded
/// state/queue word plus the slot that receives the value on explosion.
struct Container<T> {
    /// Encoded atomic pointer to the head of the FILO linked list of parked threads,
    /// which contains the current state of the bombs in the lower three bits
    /// (see `STATE_MASK`).
    ///
    /// Parked State | Dropped State | Exploded State | Explanation
    /// -------------|---------------|----------------|------------
    /// 0 | 0 | 0 | Bombs have not yet exploded, and the inner container has not yet been dropped. There is at least one bomb and/or fuse still alive keeping the inner container from being dropped.
    /// 0 | 0 | 1 | Bombs have exploded, but the inner container has not yet been dropped. There is at least one bomb and/or flame still alive keeping the inner container from being dropped.
    /// 1 | 0 | 1 | Bombs have exploded, but the inner container has not yet been dropped. A flame is parked, waiting for the container to be dropped.
    /// 0 | 1 | 1 | Bombs have exploded, and been dropped, along with the inner container and the fuse. No flame was parked.
    /// 0 | 1 | 0 | All bombs have been dropped before they ever exploded. All fuses have been dropped before they were ever lit. This state would not be observable, as this would also be dropped, but technically exists.
    /// 1 | 1 | 1 | All bombs have exploded and been dropped. A flame was parked but is now being unparked.
    /// 1 | x | 0 | Invalid state.
    ///
    /// NOTE(review): `Container::drop` swaps the whole word to `DROPPED_STATE`,
    /// so the value stored after the drop has only the dropped bit set; the
    /// exploded/parked bits of the "dropped" rows describe the value that the
    /// swap observed, not what remains stored.
    ///
    /// The word lives in its own `Arc` so that a `Flame` can keep observing it
    /// after the container itself has been dropped.
    state_and_queue: Arc<AtomicPtr<ParkedThread>>,

    // The field is wrapped in `MaybeUninit` because it holds no value until
    // `Fuse::light` writes one. `Container::drop` drops the value manually,
    // and only if the exploded bit shows that it was initialised.
    cell: UnsafeCell<MaybeUninit<T>>,
}

impl<T> Container<T> {
    /// Builds an empty container.
    ///
    /// The state word starts as a null pointer (no state bits set, no parked
    /// threads) and the value slot starts uninitialised.
    pub fn new() -> Self {
        let state_and_queue = Arc::new(AtomicPtr::new(ptr::null_mut()));
        let cell = UnsafeCell::new(MaybeUninit::uninit());

        Self { state_and_queue, cell }
    }
}

/// Runs when the last `Bomb`/`Fuse` sharing this container goes away:
/// publishes the dropped state, drops the value if one was ever written,
/// and wakes the thread registered in the state word, if any.
impl<T> Drop for Container<T> {
    fn drop(&mut self) {
        // Set state to dropped. The swap replaces the whole word, so the
        // previous state bits and the parked node pointer are only available
        // through the returned value.
        // NOTE(review): the dropped bit becomes visible to `Flame` before the
        // inner value is dropped below, so `Flame::extinguished` may return
        // `true` while the value's destructor is still running — confirm this
        // is intended.
        let state_and_thread = self.state_and_queue.swap(
            sptr::invalid_mut(DROPPED_STATE),
            Ordering::AcqRel
        );

        let state = Strict::addr(state_and_thread) & STATE_MASK;

        debug_assert!(state & DROPPED_STATE == 0, "Bomb container has already been dropped.");

        // If the bomb has exploded, drop the inner value.
        if state & EXPLODED_STATE != 0 {
            // SAFETY: The exploded bit is only set by `Fuse::light`, after it
            // has written a value into the cell, so the cell is initialised.
            // We hold `&mut self`, so nothing else can access the cell.

            unsafe {
                // Replace with uninitialised memory, which is safe to not drop.
                #[cfg(not(loom))]
                let value = self.cell.get().replace(MaybeUninit::uninit());
                #[cfg(loom)]
                let value = self.cell.get_mut().with(|p| p.replace(MaybeUninit::uninit()));
                // Take ownership of the value; the unused temporary is
                // dropped at the end of this statement.
                value.assume_init();
            }
        }

        if state & PARKED_STATE == 0 {
            // Flame thread was not parked, we can return early.
            return;
        }

        // Strip the state bits to recover the address of the parked node.
        let thread = Strict::with_addr(state_and_thread,
                Strict::addr(state_and_thread) & !STATE_MASK
            ) as *const ParkedThread;

        // Unpark flame thread. Only the node stored in the word is signalled here.
        // SAFETY: The node stays alive on the parked thread's stack until its
        // `signalled` flag is set, which happens below.
        unsafe {
            if let Some(parked) = thread.as_ref() {
                // Move the thread into the scope in case
                // the thread wakes up and drops the instance
                // before we can call `unpark`.
                let thread = parked.thread.take().unwrap();

                // Signal the thread. After this store the node must not be
                // touched again, as its owner may return and free it.
                parked.signalled.store(true, Ordering::Release);

                // Unpark the thread.
                thread.unpark();
            }
        }
    }
}

/// A one-time use blocking bomb.
///
/// The [`Fuse<T>`] for this `Bomb` can be lit only once,
/// and the bomb will remain in an exploded state for the
/// remainder of the existence of this instance.
///
/// `Bomb`s can be safely cloned and sent between threads.
/// Every clone shares the same inner container, and therefore
/// observes the same explosion and the same value.
pub struct Bomb<T> {
    /// Inner data container.
    /// Shared across all bombs and fuses.
    data: Arc<Container<T>>,
}

// SAFETY: Since `Bomb`s give access to a `&T`, it is required
// that `T` is `Sync`. Also, since the final `Bomb` drops the
// value (gains ownership of it), it is required that `T` is also `Send`.
unsafe impl<T: Send + Sync> Send for Bomb<T> { }
// SAFETY: A `&Bomb<T>` hands out `&T` through `exploded`, which requires
// `T: Sync`. A `&Bomb<T>` can also be cloned on another thread into an
// owning `Bomb<T>`, which may end up dropping the value there, so `T`
// must be `Send` as well.
unsafe impl<T: Send + Sync> Sync for Bomb<T> { }

// Derive requires `T: Clone` which is not necessary.
// Implement manually instead.
impl<T> Clone for Bomb<T> {
    fn clone(&self) -> Self {
        Self { data: self.data.clone() }
    }
}

/// A one-time use fuse.
///
/// This `Fuse` can only be lit once, and will explode all
/// [`Bomb<T>`] instances associated with it. The `Fuse`
/// instance is consumed on use, and a [`Flame`] is returned
/// in its place.
///
/// `Fuse`s cannot be cloned, but may be moved between threads.
pub struct Fuse<T> {
    /// Inner data container.
    /// Shared across all bombs and fuses.
    data: Arc<Container<T>>,
}

// SAFETY: Since `Bomb`s give access to a `&T`, it is required
// that `T` is `Sync`, otherwise this `Fuse` cannot be transferred
// to different threads. Also, since the final `Bomb` drops the value
// (gains ownership of it), it is required that `T` is also `Send`.
unsafe impl<T: Send + Sync> Send for Fuse<T> { }
// SAFETY: Lighting a `Fuse` is consuming and requires ownership, which
// ensures that there is only one instance modifying the internal state.
// `&Fuse<T>` cannot access the internal state: the methods in this file
// either take `self` by value or take no receiver at all.
// NOTE(review): `data` is also reachable from child modules such as
// `multi` — confirm none of them reads it through a shared reference.
unsafe impl<T> Sync for Fuse<T> { }

/// Result of a bomb explosion.
///
/// `Flame` is a handle returned by [`Fuse<T>::light()`], which
/// allows to check whether all bombs have been extinguished (dropped).
///
/// [`Fuse<T>::light()`]: ./struct.Fuse.html#method.light
pub struct Flame {
    /// Encoded atomic pointer to a parked thread, which contains the current
    /// state of the bombs in the lower three bits.
    /// This is the same word as `Container::state_and_queue`; it is kept in
    /// its own `Arc` so it outlives the container.
    /// See [`Container`] for further details.
    state_and_thread: Arc<AtomicPtr<ParkedThread>>,
}

impl<T> Bomb<T> {
    /// Creates a new single producer [`Fuse<T>`], and a multi-consumer `Bomb<T>`.
    ///
    /// Both handles share the same inner container.
    /// Instances of `Bomb<T>` may be safely cloned and sent between threads.
    pub fn new() -> (Fuse<T>, Bomb<T>) {
        let fuse = Fuse::new();

        let bomb = Self {
            data: fuse.data.clone(),
        };

        (fuse, bomb)
    }

    /// Returns `Some` if the `Bomb` has exploded.
    ///
    /// Returns `None` while the [`Fuse`] has not been lit. This method
    /// never blocks.
    ///
    /// Once the `Bomb` has exploded, it will remain
    /// in an exploded state for the remainder of the
    /// existence of this `Bomb` instance.
    /// It is expected that the `Bomb` is dropped after explosion,
    /// to notify the original [`Fuse`] (now [`Flame`]) that this `Bomb`
    /// has completed processing the data.
    pub fn exploded(&self) -> Option<&T> {
        // Check if the `Bomb` has exploded yet.
        // Return early if it has not.
        // This avoids the need to grab a raw pointer to the data cell.
        // The Acquire load pairs with the swap in `Fuse::light`, making the
        // write to the cell visible before we read it.
        // `Strict::addr` reads the bits without exposing the pointer's
        // provenance, like the other state decoding in this file.
        if Strict::addr(self.data.state_and_queue.load(Ordering::Acquire)) & EXPLODED_STATE == 0 {
            None
        }
        else {
            // The `Bomb` has exploded, we have data.

            // SAFETY: Due to the implementation of `Fuse`,
            // if `exploded` is true, then there are no other
            // mutable references for the pointer to the cell,
            // hence it is safe to read across multiple threads.
            #[cfg(not(loom))]
            unsafe { Some(self.data.cell.get().as_ref().unwrap().assume_init_ref()) }
            #[cfg(loom)]
            unsafe { Some(self.data.cell.get().with(|p| p.as_ref().unwrap().assume_init_ref())) }
        }
    }

    /// Blocks the current thread and waits to receive data from explosion.
    ///
    /// If the `Bomb` has already exploded, this method returns immediately, and
    /// does not block.
    ///
    /// NOTE(review): waiters are only woken by `Fuse::light`; no `Drop` for
    /// `Fuse` is visible in this file, so presumably a waiter is never woken
    /// if the fuse is dropped without being lit — confirm.
    pub fn wait_for_explosion(&self) -> &T {
        // Spin a couple times to wait and check for explosion.
        // Return the value if it has exploded while spinning.
        if let Some(value) = spin(SPIN_ITERATIONS, || self.exploded()) {
            return value;
        }

        // It is taking longer to explode, block the current thread.
        self.block();

        // `block` only returns once the exploded bit has been observed or
        // the waker has signalled us, so the value is available here.
        self.exploded().unwrap()
    }

    /// Parks the current thread until the fuse has been lit.
    ///
    /// Pushes a stack-allocated node onto the list of parked threads and
    /// sleeps until `Fuse::light` signals it. Returns without parking if the
    /// bomb turns out to have exploded already.
    fn block(&self) {
        // Use Relaxed ordering as at this stage as we do not need
        // to access any other data.
        let mut state_and_queue = self.data.state_and_queue.load(Ordering::Relaxed);

        loop {
            let addr = Strict::addr(state_and_queue);

            // Check for explosion.
            // Return if we no longer need to attempt to block.
            if addr & EXPLODED_STATE != 0 {
                return;
            }

            // Bomb has not yet exploded, try to push node to the queue.

            let top = addr & !STATE_MASK;
            let next = Strict::with_addr(state_and_queue, top) as *const _ ;

            // Create a new parked thread node.
            let node = ParkedThread {
                thread: Cell::new(Some(thread::current())),

                signalled: AtomicBool::new(false),

                // Set the next pointer to be the current
                // head of the linked list.
                next,
            };

            let node_pointer = &node as *const _ as *mut _;

            // Release publishes the contents of the node to the thread that
            // later takes the list with an acquiring swap (`Fuse::light`).
            let exchange = self.data.state_and_queue.compare_exchange(
                state_and_queue,
                node_pointer,
                Ordering::Release,
                Ordering::Relaxed
            );

            // If the state has changed since our initial
            // load, retry pushing to the list.
            if let Err(new_state) = exchange {
                state_and_queue = new_state;
                continue;
            }

            // We have successfully pushed our node to the linked list.

            // Park the thread until we are signalled to break.
            // This is inside a while loop in case other sources
            // wake this thread, but we still need to block.
            // The loop only ends once the waker has set `signalled`;
            // `park` returning early merely causes another check.
            while !node.signalled.load(Ordering::Acquire) {
                thread::park();
            }

            // `Fuse::light` sets the exploded bit before it stores
            // `signalled` with Release, and we loaded `signalled` with
            // Acquire, so a later load of the state observes the explosion.
            // The container cannot be dropped (which would reset the word)
            // while this `Bomb` is alive.

            break;
        }
    }
}

impl<T> Fuse<T> {
    /// Creates a fuse backed by a fresh, empty container.
    /// Only used by `Bomb::new` within this file.
    fn new() -> Self {
        Self {
            data: Arc::new(Container::new()),
        }
    }

    /// Ignites the fuse.
    ///
    /// Explodes all [`Bomb`]s associated with this `Fuse`.
    /// Each [`Bomb`] receives `value`.
    ///
    /// Alias to [`light`](#method.light)
    pub fn ignite(self, value: T) -> Flame {
        self.light(value)
    }

    /// Lights the fuse.
    ///
    /// Explodes all [`Bomb`]s associated with this `Fuse`.
    /// Each [`Bomb`] receives `value`.
    ///
    /// Stores `value` in the shared container, marks the state as exploded,
    /// wakes every thread parked in `Bomb::wait_for_explosion`, and returns
    /// a [`Flame`] that observes when the container is dropped.
    pub fn light(self, value: T) -> Flame {
        // SAFETY: We know that there do not exist
        // other references to the value of the cell,
        // since `state`(`_and_queue`) is still not exploded,
        // and therefore `Bomb`s will not attempt to get
        // a reference to the inner cell.
        #[cfg(not(loom))]
        unsafe { *self.data.cell.get() = MaybeUninit::new(value); }
        #[cfg(loom)]
        unsafe { self.data.cell.get_mut().with(|p| { *p = MaybeUninit::new(value); }); }

        // Now we can set the exploded bit to true, and
        // allow other `Bomb`s to get a reference safely.
        // The swap uses AcqRel ordering: the Release half ensures that
        // modifying the cell is not ordered after this swap, and the Acquire
        // half makes the nodes pushed by parked threads visible to us.
        // Note that `new_state` holds the *previous* value of the word,
        // i.e. the head of the list of parked threads.
        let new_state = self.data.state_and_queue.swap(
            sptr::invalid_mut(EXPLODED_STATE),
            Ordering::AcqRel
        );

        // Set the next pointer to be the first node in the linked list.
        let mut next = Strict::with_addr(
            new_state,
            Strict::addr(new_state) & !STATE_MASK
        ) as *const ParkedThread;

        // While we have not yet reached the end of the list...
        while !next.is_null() {
            // SAFETY: A node stays alive on its owner's stack until its
            // `signalled` flag is set, which only happens further below.
            let current = unsafe { &*next };

            // Set the next pointer for our traversal to be the next
            // pointer of the current node. This must be read before
            // signalling, as the node may be freed right afterwards.
            next = current.next;

            // Move the thread into the scope in case
            // the thread wakes up and drops the node
            // before we can call `unpark`.
            let thread = current.thread.take().unwrap();

            // Signal the thread. `current` must not be used after this.
            current.signalled.store(true, Ordering::Release);

            // Unpark the thread.
            thread.unpark();
        }

        // At this point it is safe to drop this `Fuse`,
        // since the inner containers will remain in memory
        // due to other `Arc`s existing in the `Bomb`s.

        // Create `Flame` handle. It shares only the state word, not the
        // container, so it does not keep the value alive.
        Flame {
            state_and_thread: self.data.state_and_queue.clone(),
        }
    }
}

impl Flame {
    /// Checks if the exploded [`Bomb`]s have been extinguished (dropped).
    ///
    /// For [`MultiBomb`]s, this will return `true` as soon as each [`MultiBomb`]
    /// has exploded with the value associated with this `Flame`,
    /// as the inner [`Bomb`] is dropped immediately (and replaced with a new one).
    pub fn extinguished(&self) -> bool {
        self.state_and_thread.load(Ordering::Acquire) as usize & DROPPED_STATE != 0
    }

    /// Blocks the current thread and waits until all exploded [`Bomb`]s
    /// have been extinguished (dropped).
    ///
    /// If all [`Bomb`]s have been dropped already, this method returns immediately,
    /// and does not block.
    ///
    /// For [`MultiBomb`]s, this will return as soon as each [`MultiBomb`]
    /// has exploded with the value associated with this `Flame`,
    /// as the inner [`Bomb`] is dropped immediately (and replaced with a new one).
    pub fn wait_for_extinguish(&self) {
        // Return early if the `Flame` has already extinguished.
        if spin(SPIN_ITERATIONS, || self.extinguished().then_some(())).is_some() { return; }

        // Attempt to block the thread.

        let mut state_and_thread = self.state_and_thread.load(Ordering::Relaxed);

        loop {
            // Check for extinguish.
            // Return if we no longer need to attempt to block.
            if state_and_thread as usize & DROPPED_STATE != 0 {
                return;
            }

            // Bombs have not yet been extinguished.
            // Try to set parked thread node.

            let thread = ParkedThread {
                thread: Cell::new(Some(thread::current())),
                signalled: AtomicBool::new(false),
                next: ptr::null(),
            };

            let thread_pointer = &thread as *const ParkedThread;

            // Transfer old state, and mark as parked.
            let state = Strict::addr(state_and_thread) & STATE_MASK | PARKED_STATE;
            let new_state_and_thread = state | Strict::addr(thread_pointer);
            let new_state_and_thread = Strict::with_addr(
                thread_pointer,
                new_state_and_thread
            );

            let exchange = self.state_and_thread.compare_exchange(
                state_and_thread,
                new_state_and_thread as *mut _,
                Ordering::Release,
                Ordering::Relaxed
            );

            // If the state has changed since our initial
            // load, retry setting the parked thread.
            if let Err(new_state) = exchange {
                state_and_thread = new_state;
                continue;
            }

            // We have successfully set our parked thread node.

            // Park the thread until we are signalled to break.
            // This is inside a while loop in case other sources
            // wake this thread, but we still need to block.
            while !thread.signalled.load(Ordering::Acquire) {
                // todo: can this ever fail and loop indefinitely?
                thread::park();
            }
            // todo: is it guaranteed here that the atomic load of the state
            // will return dropped?
            break;
        }
    }
}

/// Number of times a waiter polls the state with [`spin`] before it falls
/// back to parking the thread.
const SPIN_ITERATIONS: usize = 10;

/// Polls `f` up to `iterations` times, returning the first `Some` it yields.
///
/// After every attempt that yields `None` a spin-loop hint is emitted.
/// Returns `None` if no attempt succeeded, or if `iterations` is zero
/// (in which case `f` is never called).
fn spin<T>(iterations: usize, mut f: impl FnMut() -> Option<T>) -> Option<T> {
    (0..iterations).find_map(|_| {
        let outcome = f();

        if outcome.is_none() {
            spin_loop();
        }

        outcome
    })
}