maniac_runtime/sync/multishot/
mod.rs

1//! An async, lock-free, reusable channel for sending single values to
2//! asynchronous tasks.
3//!
4//! In a multi-shot channel, the receiver half is reusable and able to recycle
5//! the sender half without ever re-allocating. Sending, polling and recycling
6//! the sender are all lock-free operations.
7//!
8//! # Example
9//!
10//!
11//! ```
12//! # use pollster;
13//! use std::thread;
14//!
15//! # maniac::block_on(
16//! async {
17//!     let (s, mut r) = multishot::channel();
18//!
19//!     // Send a value to the channel from another thread.
20//!     thread::spawn(move || {
21//!         s.send("42");
22//!     });
23//!
24//!     // Receive the value.
25//!     let res = r.recv().await;
26//!     assert_eq!(res, Ok("42"));
27//!
28//!     // Recycle the sender. This is guaranteed to succeed if the previous
29//!     // message has been read.
30//!     let s = r.sender().unwrap();
31//!
32//!     // Drop the sender on another thread without sending a message.
33//!     thread::spawn(move || {
34//!         drop(s);
35//!     });
36//!
37//!     // Receive an error.
38//!     let res = r.recv().await;
39//!     assert_eq!(res, Err(multishot::RecvError {}));
40//! }
41//! # );
42//! ```
43
44#![warn(missing_docs, missing_debug_implementations, unreachable_pub)]
45
46// The implementation of a reusable, lock-free, reallocation-free, one-shot
47// channel is surprisingly tricky. In a regular, non-reusable one-shot channel,
48// it is relatively easy to avoid missed notifications while also avoiding races
49// between waker registration and waker invocation: it is enough for the sender
50// to retrieve the waker only once the presence of a value has been signaled to
51// the receiver, which implicitly signals at the same time the acquisition of a
52// lock on the waker. This means, however, that the receiving side may read the
53// value before the waker is consumed by the sender. This is a problem for a
54// reusable channel as it means that the creation of a new sender after reading
55// a value may block until the previous sender has indeed surrendered
56// exclusivity on the waker.
57//
58// One workaround would be to allocate a new waker if the old sender still holds
59// the lock on the previous waker, but such reallocation would somewhat defeat
60// the idea of a reusable channel. The idea in this implementation is to instead
61// signal the presence of a value only once the waker has been moved out from
62// the waker slot, and then use the moved waker to wake the receiver task. This
63// requires, however, giving the sender exclusivity on the waker before the
64// value is sent. In order not to block a receiver attempting to register a new
65// waker at the same time, the channel uses two waker slots so the receiver can
66// always store a new waker in the unused slot. Missed notifications are avoided
67// by having the sender check the availability of an updated waker before
68// signaling the presence of a value.
69//
70// Despite the relative complexity of the state machine, the implementation is
71// fairly efficient. Polling requires no read-modify-write (RMW) operation if
72// the value is readily available, 1 RMW if this is the first waker update and 2
73// RMWs otherwise. Sending needs 1 RMW if no waker was registered, and typically
74// 2 RMW if one was registered. Compared to a non-reusable one-shot channel such
75// as Tokio's, the only extra cost is 1 RMW in case the waker was updated (which
76// is rare in practice). Also, the implementation of `multishot` partially
77// offsets this extra cost by using arithmetic atomic operations when sending
78// rather than the typically more expensive compare-and-swap operation.
79//
80// Sending, receiving and recycling a sender are lock-free operations; the last
81// two are additionally wait-free.
82//
83// The state of the channel is tracked by the following bit flags:
84//
85// * INDEX [I]: index of the current waker slot (0 or 1)
86// * OPEN [O]: the channel is open, both the receiver and the sender are alive
87// * EMPTY [E]: the meaning of this flag is contextual:
88//    - if OPEN==0: indicates that the value slot is empty
89//    - if OPEN==1: indicates that the redundant waker slot at 1 - INDEX is
90//      empty
91//
92// Summary of possible states (excluding unobservable states)
93//
94// |  I  O  E  | Observer |                  Meaning                    |
95// |-----------|----------|---------------------------------------------|
96// |  x  0  0  |  Sender  |         channel closed by receiver          |
97// |  x  0  0  | Receiver |  channel closed by sender, value awaiting   |
98// |  x  0  1  | Receiver |     channel closed by sender, no value      |
99// |  x  1  0  |   Any    | an updated waker is registered at index !x  |
100// |  x  1  1  |   Any    |     a waker may be registered at index x    |
101
102mod loom_exports;
103
104use std::error::Error;
105use std::fmt;
106use std::future::Future;
107use std::marker::PhantomData;
108use std::mem::{ManuallyDrop, MaybeUninit};
109use std::panic::{RefUnwindSafe, UnwindSafe};
110use std::pin::Pin;
111use std::ptr::{self, NonNull};
112use std::sync::atomic::Ordering;
113use std::task::{Context, Poll, Waker};
114
115use crate::loom_exports::cell::UnsafeCell;
116use crate::loom_exports::sync::atomic::AtomicUsize;
117
// Bit flags packed into `Inner::state`.
//
// Note: the relative position of the flags is NOT arbitrary and must not be
// changed (the rationale is given in the comments of the `Sender::send`
// method).
//
// [E] The meaning of this flag depends on `OPEN`. When `OPEN` is cleared, it
// tells whether the value slot is empty. When `OPEN` is set, it tells whether
// the redundant waker slot at `1 - INDEX` is empty.
const EMPTY: usize = 1 << 0;
// [O] Set while the channel is open, i.e. while both the receiver and the
// sender are alive.
const OPEN: usize = 1 << 1;
// [I] Index of the current waker slot (0 or 1).
const INDEX: usize = 1 << 2;
129
/// State shared between a `Receiver` and its `Sender`.
struct Inner<T> {
    /// Bit field combining the `INDEX`, `OPEN` and `EMPTY` flags.
    state: AtomicUsize,
    /// Storage for the value in transit, if any.
    value: UnsafeCell<MaybeUninit<T>>,
    /// The two redundant waker slots, selected by the `INDEX` flag.
    waker: [UnsafeCell<Option<Waker>>; 2],
}
139
140impl<T> Inner<T> {
141    // Sets the value without dropping the previous content.
142    //
143    // Safety: the caller must have exclusive access to the value.
144    unsafe fn write_value(&self, t: T) {
145        unsafe {
146            self.value.with_mut(|value| (*value).write(t));
147        }
148    }
149
150    // Reads the value without moving it.
151    //
152    // Safety: the value must be initialized and the caller must have exclusive
153    // access to the value. After the call, the value slot within `Inner` should
154    // be considered uninitialized in order to avoid a double-drop.
155    unsafe fn read_value(&self) -> T {
156        unsafe { self.value.with(|value| (*value).as_ptr().read()) }
157    }
158
159    // Drops the value in place without deallocation.
160    //
161    // Safety: the value must be initialized and the caller must have exclusive
162    // access to the value.
163    unsafe fn drop_value_in_place(&self) {
164        unsafe {
165            self.value
166                .with_mut(|value| ptr::drop_in_place((*value).as_mut_ptr()));
167        }
168    }
169
170    // Sets the waker at index `idx`.
171    //
172    // Safety: the caller must have exclusive access to the waker at index
173    // `idx`.
174    unsafe fn set_waker(&self, idx: usize, new: Option<Waker>) {
175        unsafe {
176            self.waker[idx].with_mut(|waker| (*waker) = new);
177        }
178    }
179
180    // Takes the waker out of the waker slot at index `idx`.
181    //
182    // Safety: the caller must have exclusive access to the waker at index
183    // `idx`.
184    unsafe fn take_waker(&self, idx: usize) -> Option<Waker> {
185        unsafe { self.waker[idx].with_mut(|waker| (*waker).take()) }
186    }
187}
188
189/// Reusable receiver of a multi-shot channel.
190///
191/// A `Receiver` can be used to receive a value using the `Future` returned by
192/// [`recv`](Receiver::recv). It can also produce a one-shot [`Sender`] with the
193/// [`sender`](Receiver::sender) method, provided that there is currently no
194/// live sender.
195///
196/// A receiver can be created with the [`channel`] function or with the
197/// [`new`](Receiver::new) method.
198#[derive(Debug)]
199pub struct Receiver<T> {
200    /// The shared data.
201    inner: NonNull<Inner<T>>,
202    /// Drop checker hint: we may drop an `Inner<T>`.
203    _phantom: PhantomData<Inner<T>>,
204}
205
206impl<T> Receiver<T> {
207    /// Creates a new receiver.
208    pub fn new() -> Self {
209        Self {
210            inner: NonNull::new(Box::into_raw(Box::new(Inner {
211                state: AtomicUsize::new(EMPTY),
212                value: UnsafeCell::new(MaybeUninit::uninit()),
213                waker: [UnsafeCell::new(None), UnsafeCell::new(None)],
214            })))
215            .unwrap(),
216            _phantom: PhantomData,
217        }
218    }
219
220    /// Returns a new sender if there is currently no live sender.
221    ///
222    /// This operation is wait-free. It is guaranteed to succeed (i) on its
223    /// first invocation and (ii) on further invocations if the future returned
224    /// by [`recv`](Receiver::recv) has been `await`ed (i.e. polled to
225    /// completion) after the previous sender was created.
226    pub fn sender(&mut self) -> Option<Sender<T>> {
227        // A sender is created only if no sender is alive.
228        //
229        // Transitions:
230        //
231        // |  I  O  E  |  I  O  E  |
232        // |-----------|-----------|
233        // |  x  0  0  |  0  1  1  | -> Return Some(Sender)
234        // |  x  0  1  |  0  1  1  | -> Return Some(Sender)
235        // |  x  1  1  |  x  1  1  | -> Return None
236        // |  x  1  0  |  x  1  0  | -> Return None
237
238        // Retrieve the current state.
239        //
240        // Ordering: This load synchronizes with the Release store in the
241        // `Sender::send` method to ensure that the value (if any) is visible
242        // and can be safely dropped.
243        //
244        // Safety: it is safe to access `inner` as we did not clear the `OPEN`
245        // flag.
246        let state = unsafe { self.inner.as_ref().state.load(Ordering::Acquire) };
247
248        // Verify that there is no live sender.
249        if state & OPEN == 0 {
250            // Safety: the previous sender (if any) known to be consumed and the
251            // Acquire ordering on the state ensures that all previous memory
252            // operations by the previous sender are visible.
253            Some(unsafe { self.sender_with_waker(state, None) })
254        } else {
255            None
256        }
257    }
258
259    /// Receives a message asynchronously.
260    ///
261    /// If the channel is empty, the future returned by this method waits until
262    /// there is a message. If there is no live sender and no message, the
263    /// future completes and returns an error.
264    pub fn recv(&mut self) -> Recv<'_, T> {
265        Recv { receiver: self }
266    }
267
268    /// Initialize the waker in slot 0, set the state to `OPEN | EMPTY` and
269    /// return a sender.
270    ///
271    /// Safety: `inner` must be allocated. The sender must have been consumed
272    /// and all memory operations by the sender on the value and on the waker
273    /// must be visible in this thread.
274    unsafe fn sender_with_waker(&mut self, state: usize, waker: Option<Waker>) -> Sender<T> {
275        // Only create a sender if there is no live sender.
276        debug_assert!(state & OPEN == 0);
277
278        // If there is an unread value, drop it.
279        if state & EMPTY == 0 {
280            // Safety: the presence of an initialized value was just
281            // checked and there is no risk of race since there is no live
282            // sender.
283            unsafe { self.inner.as_ref().drop_value_in_place() };
284        }
285
286        // Set the waker in slot 0.
287        unsafe { self.inner.as_ref().set_waker(0, waker) };
288
289        // Open the channel and set the current waker slot index to 0.
290        //
291        // Ordering: since the sender is created right now on this thread,
292        // Relaxed ordering is sufficient.
293        unsafe {
294            self.inner
295                .as_ref()
296                .state
297                .store(OPEN | EMPTY, Ordering::Relaxed)
298        };
299
300        Sender {
301            inner: self.inner,
302            _phantom: PhantomData,
303        }
304    }
305}
306
// Safety: `Receiver` stores a raw `NonNull<Inner<T>>`, so `Send` and `Sync`
// are not derived automatically and are asserted here. Values of type `T`
// are written by the sender and read by the receiver through `Inner`, hence
// the `T: Send` bound.
307unsafe impl<T: Send> Send for Receiver<T> {}
// NOTE(review): presumably sound because every method of `Receiver` in this
// file that accesses `Inner` takes `&mut self`, so a shared `&Receiver` gives
// no access to a `T` -- confirm before adding methods taking `&self`.
308unsafe impl<T: Send> Sync for Receiver<T> {}
309
// NOTE(review): unwind safety is asserted manually; it relies on the state
// staying consistent when user code called by the receiver (`Waker::clone`,
// the destructor of `T`) panics -- verify whenever that code is modified.
310impl<T> UnwindSafe for Receiver<T> {}
311impl<T> RefUnwindSafe for Receiver<T> {}
312
313impl<T> Default for Receiver<T> {
314    fn default() -> Self {
315        Self::new()
316    }
317}
318
319impl<T> Drop for Receiver<T> {
320    fn drop(&mut self) {
321        // The drop handler clears the `INDEX`, `OPEN` and `EMPTY` flags. If the
322        // channel was already closed by the sender, it drops the value (if any)
323        // and deallocates `inner`.
324        //
325        // Transitions:
326        //
327        // |  I  O  E  |  I  O  E  |
328        // |-----------|-----------|
329        // |  x  0  1  | unobserv. | -> Deallocate
330        // |  x  0  0  | unobserv. | -> Deallocate
331        // |  x  1  1  |  0  0  0  |
332        // |  x  1  0  |  0  0  0  |
333
334        // Ordering: the value and wakers may need to be dropped prior to
335        // deallocation in case the sender was dropped too, so Acquire ordering
336        // is necessary to synchronize with the Release store in `Sender::send`.
337        // Release ordering is in turn necessary in case the sender is still
338        // alive: it synchronizes with the Acquire operations in either
339        // `Sender::send` or in the drop handler of the sender to ensure that
340        // `inner` can be dropped safely.
341
342        // Safety: it is safe to access `inner` as we have not yet cleared the
343        // `OPEN` flag.
344        let state = unsafe { self.inner.as_ref().state.swap(0, Ordering::AcqRel) };
345
346        // If the sender is alive, let it handle cleanup.
347        if state & OPEN == OPEN {
348            return;
349        }
350
351        // Deallocate the channel since it was closed by the sender.
352        //
353        // Safety: `inner` will no longer be used once deallocated.
354        unsafe {
355            // If there is an unread value, drop it first.
356            if state & EMPTY == 0 {
357                // Safety: the presence of an initialized value was just
358                // checked and there is no live receiver so no risk of race.
359                self.inner.as_ref().drop_value_in_place();
360            }
361
362            // Deallocate inner.
363            drop(Box::from_raw(self.inner.as_ptr()));
364        }
365    }
366}
367
368/// Future returned by [`Receiver::recv()`].
369#[derive(Debug)]
370pub struct Recv<'a, T> {
371    /// The shared data.
372    receiver: &'a mut Receiver<T>,
373}
374
375impl<'a, T> Recv<'a, T> {
376    /// Return `Poll::Ready` with either the value (if any) or an error and
377    /// change the state to `EMPTY`.
378    fn poll_complete(self: Pin<&mut Self>, state: usize) -> Poll<Result<T, RecvError>> {
379        debug_assert!(state & OPEN == 0);
380
381        let ret = if state & EMPTY == 0 {
382            // Safety: the presence of an initialized value was just checked and
383            // there is no live sender so no risk of race. It is safe to access
384            // `inner` since we are now its single owner.
385            let value = unsafe { self.receiver.inner.as_ref().read_value() };
386
387            Ok(value)
388        } else {
389            Err(RecvError {})
390        };
391
392        // Set the state to indicate that the sender has been dropped and the
393        // message (if any) has been moved out.
394        //
395        // Ordering: Relaxed is enough since the sender was dropped and
396        // therefore no other thread can observe the state.
397        //
398        // Safety: It is safe to access `inner` since we are now its single owner.
399        unsafe {
400            self.receiver
401                .inner
402                .as_ref()
403                .state
404                .store(EMPTY, Ordering::Relaxed);
405        }
406
407        Poll::Ready(ret)
408    }
409}
410
411impl<'a, T> Future for Recv<'a, T> {
412    type Output = Result<T, RecvError>;
413
414    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
415        // The poll method proceeds in two steps. In the first step, the `EMPTY`
416        // flag is set to make sure that a concurrent sender operation does not
417        // try to access the redundant waker slot while a new waker is being
418        // registered. The `EMPTY` flag is then cleared in Step 2 once the new
419        // waker is registered, checking at the same time whether the sender has
420        // not been consumed while the new waker was being registered. This
421        // check is necessary because if it was consumed, the sender may not
422        // have been able to send a notification (if the current waker slot was
423        // empty) or may have sent one using an outdated waker.
424        //
425        // Transitions:
426        //
427        // Step 1
428        //
429        // |  I  O  E  |  I  O  E  |
430        // |-----------|-----------|
431        // |  x  0  0  |  0  0  1  | -> Return Ready(Message)
432        // |  x  0  1  |  0  0  1  | -> Return Ready(Error)
433        // |  x  1  0  |  x  1  1  | -> Step 2
434        // |  x  1  1  |  x  1  1  | -> Step 2
435        //
436        // Step 2
437        //
438        // |  I  O  E  |  I  O  E  |
439        // |-----------|-----------|
440        // |  x  0  0  |  0  0  1  | -> Return Ready(Message)
441        // |  x  0  1  |  0  0  1  | -> Return Ready(Error)
442        // |  x  1  1  |  x  1  0  | -> Return Pending
443
444        // Fast path: this is an optimization in case the sender has already
445        // been consumed and has closed the channel.
446        //
447        // Safety: It is safe to access `inner` since we did not clear the
448        // `OPEN` flag.
449        let mut state = unsafe { self.receiver.inner.as_ref().state.load(Ordering::Acquire) };
450        if state & OPEN == 0 {
451            return self.poll_complete(state);
452        }
453
454        // Set the `EMPTY` flag to prevent the sender from updating the current
455        // waker slot. This is not necessary if `EMPTY` was already set because
456        // in such case the sender will never try to concurrently access the
457        // redundant waker slot.
458        if state & EMPTY == 0 {
459            // Ordering: Acquire ordering is necessary since some member data may be
460            // read and/or modified after reading the state.
461            //
462            // Safety: it is safe to access `inner` since we did not clear the
463            // `OPEN` flag.
464            unsafe {
465                state = self
466                    .receiver
467                    .inner
468                    .as_ref()
469                    .state
470                    .fetch_or(EMPTY, Ordering::Acquire);
471            }
472
473            // Check whether the sender has closed the channel.
474            if state & OPEN == 0 {
475                return self.poll_complete(state);
476            }
477        }
478
479        // The waker will be stored in the redundant slot.
480        let current_idx = state_to_index(state);
481        let new_idx = 1 - current_idx;
482
483        // Store the new waker.
484        //
485        // Safety: the sender thread never accesses the waker stored in the slot
486        // not pointed to by `INDEX` and it does not modify `INDEX` as long as
487        // the `EMPTY` flag is set. It is safe to access `inner` since we did
488        // not clear the `OPEN` flag.
489        //
490        // Unwind safety: even if `Waker::clone` panics, the state will be
491        // consistent since `OPEN` and `EMPTY` are both set, meaning that the
492        // redundant waker will not be accessed when the receiver is dropped.
493        unsafe {
494            self.receiver
495                .inner
496                .as_ref()
497                .set_waker(new_idx, Some(cx.waker().clone()));
498        }
499
500        // Make the new waker visible to the sender.
501        //
502        // Note: this could (should) be a `fetch_and(!EMPTY)` but `swap` may be
503        // faster on some platforms and the result will only be invalid if the
504        // sender has been in the meantime consumed, in which case the new state
505        // will not be observable anyway.
506        //
507        // Ordering: the waker may have been modified above so Release ordering
508        // is necessary to synchronize with the Acquire operation in
509        // `Sender::send` or `Sender::drop`. Acquire ordering is also necessary
510        // since the message may be loaded. It is safe to access `inner` since
511        // we did not clear the `OPEN` flag.
512        let state = unsafe {
513            self.receiver
514                .inner
515                .as_ref()
516                .state
517                .swap(index_to_state(current_idx) | OPEN, Ordering::AcqRel)
518        };
519
520        // It is necessary to check again whether the sender has closed the
521        // channel, because if it did, the sender may not have been able to send
522        // a notification (if the current waker slot was empty) or may have sent
523        // one using an outdated waker.
524        if state & OPEN == 0 {
525            return self.poll_complete(state);
526        }
527
528        Poll::Pending
529    }
530}
531
532/// Single-use sender of a multi-shot channel.
533///
534/// A `Sender` can be created with the [`channel`]  function or by recycling a
535/// previously consumed sender with the [`Receiver::sender`] method.
536#[derive(Debug)]
537pub struct Sender<T> {
538    /// The shared data.
539    inner: NonNull<Inner<T>>,
540    /// Drop checker hint: we may drop an `Inner<T>` and thus a `T`.
541    _phantom: PhantomData<Inner<T>>,
542}
543
544impl<T> Sender<T> {
545    /// Sends a value to the receiver and consumes the sender.
546    pub fn send(self, t: T) {
547        // The send method is iterative. At each iteration, the `EMPTY` flag is
548        // checked. If set, the presence of a value is signaled immediately and
549        // the function returns after sending a notification. If the `EMPTY`
550        // flag is cleared, it is set again and the index of the current waker
551        // becomes that of the redundant slot in the next iteration.
552        //
553        // Transitions:
554        //
555        // |  I  O  E  |  I  O  E  |
556        // |-----------|-----------|
557        // |  0  0  0  | unobserv. | -> Deallocate
558        // |  x  1  1  |  x  0  0  | -> End
559        // |  x  1  0  | !x  1  1  | -> Retry
560
561        // It is necessary to make sure that the destructor is not run: since
562        // the `OPEN` flag will be cleared once the value is sent,
563        // `Sender::drop` would otherwise wrongly consider the channel as closed
564        // by the receiver and would deallocate `inner` while the receiver is
565        // alive.
566        let this = ManuallyDrop::new(self);
567
568        // Store the value.
569        //
570        // Note: there is no need to drop a previous value since the
571        // `Receiver::sender` method would have dropped the value if there was
572        // one.
573        //
574        // Safety: no race for accessing the value is possible since there can
575        // be at most one sender alive at a time and the receiver will not read
576        // the value before the `OPEN` flag is cleared. It is safe to access
577        // `inner` since we did not clear the `OPEN` flag.
578        unsafe { this.inner.as_ref().write_value(t) };
579
580        // Retrieve the index of the current waker.
581        //
582        // Ordering: Relaxed ordering is sufficient since only the sender can
583        // modify `INDEX`.
584        //
585        // Safety: it is safe to access `inner` since we did not clear the
586        // `OPEN` flag.
587        let mut idx = state_to_index(unsafe { this.inner.as_ref().state.load(Ordering::Relaxed) });
588
589        loop {
590            // Take the current waker.
591            //
592            // Safety: the receiver thread never accesses the waker stored at
593            // `INDEX`. It is safe to access `inner` since we did not
594            // clear the `OPEN` flag.
595            let waker = unsafe { this.inner.as_ref().take_waker(idx) };
596
597            // Rather than a Compare-And-Swap, a `fetch_sub` operation is used
598            // as it is faster on many platforms. The specific order of the
599            // flags and the wrapping underflow behavior of `fetch_sub` are
600            // exploited to implement the transition table.
            //
            // With `OPEN | EMPTY == 0b011`, subtracting from `x11` gives `x00`
            // (End) while subtracting from `x10` borrows from `INDEX` and gives
            // `!x11` (Retry). When `x == 0` the subtraction wraps around, which
            // additionally sets the bits above `INDEX`; the flag tests below
            // mask those bits out.
601            //
602            // Ordering: Acquire is necessary to synchronize with the Release
603            // swap in the `Recv::poll` method in case an updated waker
604            // needs to be taken, or with the Release swap in the drop
605            // handler of the receiver in case the channel was closed and
606            // `inner` needs to be dropped. Release is in turn necessary to
607            // ensure the visibility of both (i) the value and (ii) the
608            // consumption of the waker in the previous loop iteration (if any).
609            // The Release synchronizes with the Acquire load of the state in
610            // the receiver `poll` and `sender` methods.
611            //
612            // Safety: it is safe to access `inner` since we did not clear the
613            // `OPEN` flag.
614            let state = unsafe {
615                this.inner
616                    .as_ref()
617                    .state
618                    .fetch_sub(OPEN | EMPTY, Ordering::AcqRel)
619            };
620
621            // Deallocate the channel if closed.
622            //
623            // Safety: it is safe to access `inner` since we did not clear the
624            // `OPEN` flag. `inner` will no longer be used once deallocated. In
625            // particular, the sender destructor will not be called.
626            unsafe {
627                if state & OPEN == 0 {
628                    // Safety: a value was just written and there is no live
629                    // receiver so no risk of a race.
630                    this.inner.as_ref().drop_value_in_place();
631                    drop(Box::from_raw(this.inner.as_ptr()));
632                    return;
633                }
634            }
635
636            // If the waker was not updated, notify the receiver and return.
637            if state & EMPTY == EMPTY {
638                // Unwind safety: the state has already been updated, so
639                // panicking in `Waker::wake` is OK.
640                if let Some(waker) = waker {
641                    waker.wake()
642                }
643                return;
644            }
645
646            // Update the local waker index to the current value of `INDEX`.
647            idx = 1 - idx;
648        }
649    }
650}
651
// Safety: `Sender` stores a raw `NonNull<Inner<T>>`, so `Send` and `Sync` are
// not derived automatically and are asserted here. A value of type `T` is
// written by the sender and read (or dropped) by the receiver, hence the
// `T: Send` bound.
652unsafe impl<T: Send> Send for Sender<T> {}
// NOTE(review): presumably sound because the only method of `Sender` in this
// file, `send`, consumes the sender by value, so a shared `&Sender` gives no
// access to `Inner` -- confirm before adding methods taking `&self`.
653unsafe impl<T: Send> Sync for Sender<T> {}
654
// NOTE(review): unwind safety is asserted manually; `send` updates the state
// before calling `Waker::wake`, the one user-provided call it makes besides
// the destructor of `T` -- verify whenever that code is modified.
655impl<T> UnwindSafe for Sender<T> {}
656impl<T> RefUnwindSafe for Sender<T> {}
657
658impl<T> Drop for Sender<T> {
659    fn drop(&mut self) {
660        // The drop handler is iterative. At each iteration, the `EMPTY` flag is
661        // checked. If set, the closure of the channel is signaled immediately
662        // and the function returns after sending a notification. If the `EMPTY`
663        // flag is cleared, it is set and the index of the current waker becomes
664        // that of the redundant slot in the next iteration.
665        //
666        // Transitions:
667        //
668        // |  I  O  E  |  I  O  E  |
669        // |-----------|-----------|
670        // |  x  0  0  | unobserv. | -> Deallocate
671        // |  x  1  1  |  0  0  1  | -> End
672        // |  x  1  0  | !x  1  1  | -> Retry
673
674        // Retrieve the state and the current index.
675        //
676        // Ordering: Relaxed ordering is sufficient since only the sender can
677        // modify `INDEX`.
678        let mut state = unsafe { self.inner.as_ref().state.load(Ordering::Relaxed) };
679        let mut idx = state_to_index(state);
680
681        loop {
682            // Take the current waker.
683            //
684            // Safety: the receiver thread never accesses the waker stored at
685            // `INDEX`. It is safe to access `inner` since we did not clear the
686            // `OPEN` flag.
687            let waker = unsafe { self.inner.as_ref().take_waker(idx) };
688
            // Note: after a `Retry` transition, `state` still holds the value
            // observed *before* the exchange, so the first compare-exchange of
            // the next outer iteration is expected to fail once and refresh
            // `state` with the current value.
689            loop {
690                // Modify the state according to the transition table.
691                let new_state = if state & EMPTY == EMPTY {
692                    EMPTY
693                } else {
694                    state ^ (EMPTY | INDEX)
695                };
696
697                // Ordering: Acquire is necessary to synchronize with the
698                // Release swap in the `Recv::poll` method in case an
699                // updated waker needs to be taken or with the Release swap
700                // in the drop handler of the receiver in case the channel was
701                // closed and `inner` needs to be dropped. Release is in turn
702                // necessary to ensure the visibility of the consumption of the
703                // waker in the previous loop iteration (if any). The Release
704                // synchronizes with the Acquire load of the state in the
705                // receiver `poll` and `sender` methods.
706                //
707                // Safety: it is safe to access `inner` since we have not yet
708                // cleared the `OPEN` flag.
709                unsafe {
710                    match self.inner.as_ref().state.compare_exchange_weak(
711                        state,
712                        new_state,
713                        Ordering::AcqRel,
714                        Ordering::Relaxed,
715                    ) {
716                        Ok(s) => {
717                            state = s;
718                            break;
719                        }
720                        Err(s) => state = s,
721                    }
722                }
723            }
724
725            // Deallocate the channel if it was already closed by the receiver.
726            //
727            // Safety: `inner` will no longer be used once deallocated.
728            unsafe {
729                if state & OPEN == 0 {
730                    drop(Box::from_raw(self.inner.as_ptr()));
731                    return;
732                }
733            }
734
735            // If the waker was not updated, notify the receiver and return.
736            if state & EMPTY == EMPTY {
737                // Unwind safety: the state has already been updated, so
738                // panicking in `Waker::wake` is OK.
739                if let Some(waker) = waker {
740                    waker.wake()
741                }
742                return;
743            }
744
745            // Update the local waker index to the current value of `INDEX`.
746            idx = 1 - idx;
747        }
748    }
749}
750
/// Error reported on the receiving side when the sender was dropped without
/// sending a value.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct RecvError {}

impl fmt::Display for RecvError {
    /// Writes the fixed description of the error; formatting flags such as
    /// width or fill are not applied.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("channel closed")
    }
}

// The provided methods of `Error` are used unchanged.
impl Error for RecvError {}
762
763/// Creates a new multi-shot channel.
764pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
765    let mut receiver = Receiver::new();
766    let sender = receiver.sender().unwrap();
767
768    (sender, receiver)
769}
770
771fn state_to_index(state: usize) -> usize {
772    (state & INDEX) >> 2
773}
/// Moves an index into the position it occupies within a state word, i.e.
/// shifts it up by two positions (the reverse shift of `state_to_index`).
fn index_to_state(index: usize) -> usize {
    const INDEX_SHIFT: u32 = 2;

    index << INDEX_SHIFT
}
777
#[cfg(all(test, not(multishot_loom)))]
mod tests {
    use super::*;

    use std::sync::Arc;
    use std::task::{Wake, Waker};
    use std::thread;

    /// Waker stand-in whose only job is to count notifications.
    struct TestWaker {
        count: AtomicUsize,
    }

    impl TestWaker {
        /// Creates a counter that has not recorded any notification yet.
        fn new() -> Self {
            let count = AtomicUsize::new(0);

            Self { count }
        }

        /// Returns the number of notifications recorded since the previous
        /// call and resets the counter to zero.
        fn take_count(&self) -> usize {
            self.count.swap(0, Ordering::Acquire)
        }
    }

    impl Wake for TestWaker {
        /// Records one notification.
        fn wake(self: Arc<Self>) {
            self.count.fetch_add(1, Ordering::Release);
        }
    }

    /// Builds a fresh notification counter and a `Waker` that reports to it.
    fn counting_waker() -> (Arc<TestWaker>, Waker) {
        let counter = Arc::new(TestWaker::new());
        let waker = Waker::from(Arc::clone(&counter));

        (counter, waker)
    }

    /// Consumes a sender with `f`, first before and then after the initial
    /// poll of the receiving future, and checks in both cases that the future
    /// resolves to `expect` with the expected number of notifications.
    fn multishot_notify_single_threaded<F>(f: F, expect: Result<i32, RecvError>)
    where
        F: FnOnce(Sender<Box<i32>>) + Send + Copy + 'static,
    {
        let (wake_counter, waker) = counting_waker();
        let mut cx = Context::from_waker(&waker);
        let mut rx: Receiver<Box<i32>> = Receiver::new();

        // Case 1: the sender is consumed before the future is ever polled, so
        // the first poll completes and no notification is expected.
        {
            let tx = rx.sender().expect("could not create sender");
            let mut recv = rx.recv();
            let mut recv = Pin::new(&mut recv);

            f(tx);

            let polled = recv.as_mut().poll(&mut cx);
            assert_eq!(wake_counter.take_count(), 0);
            assert_eq!(polled.map_ok(|boxed| *boxed), Poll::Ready(expect));
        }

        // Case 2: the future is polled while the sender is still alive, so
        // consuming the sender afterwards must produce exactly one
        // notification.
        {
            let tx = rx.sender().expect("could not create sender");
            let mut recv = rx.recv();
            let mut recv = Pin::new(&mut recv);

            let polled = recv.as_mut().poll(&mut cx);
            assert_eq!(polled, Poll::Pending);
            f(tx);
            assert_eq!(wake_counter.take_count(), 1);
            let polled = recv.as_mut().poll(&mut cx);
            assert_eq!(polled.map_ok(|boxed| *boxed), Poll::Ready(expect));
        }
    }

    /// Sends a message.
    #[test]
    fn multishot_send_notify_single_threaded() {
        multishot_notify_single_threaded(|tx| tx.send(Box::new(42)), Ok(42));
    }

    /// Drops the sender.
    #[test]
    fn multishot_drop_notify_single_threaded() {
        multishot_notify_single_threaded(|tx| drop(tx), Err(RecvError {}));
    }

    /// Polls the receiving future with several different wakers before
    /// consuming the sender with `f`, and checks that only the waker used in
    /// the last pending poll is notified.
    fn multishot_change_waker_single_threaded<F>(f: F, expect: Result<i32, RecvError>)
    where
        F: FnOnce(Sender<Box<i32>>) + Send + Copy + 'static,
    {
        let (counter1, waker1) = counting_waker();
        let mut cx1 = Context::from_waker(&waker1);
        let (counter2, waker2) = counting_waker();
        let mut cx2 = Context::from_waker(&waker2);
        let (counter3, waker3) = counting_waker();
        let mut cx3 = Context::from_waker(&waker3);

        // The waker is replaced once before the sender is consumed.
        {
            let (tx, mut rx) = channel::<Box<i32>>();
            let mut recv = rx.recv();
            let mut recv = Pin::new(&mut recv);

            assert_eq!(recv.as_mut().poll(&mut cx1), Poll::Pending);
            assert_eq!(recv.as_mut().poll(&mut cx2), Poll::Pending);
            f(tx);
            assert_eq!(counter2.take_count(), 1);
            let polled = recv.as_mut().poll(&mut cx1);
            assert_eq!(counter1.take_count(), 0);
            assert_eq!(counter2.take_count(), 0);
            assert_eq!(polled.map_ok(|boxed| *boxed), Poll::Ready(expect));
        }

        // The waker is replaced twice before the sender is consumed.
        {
            let (tx, mut rx) = channel::<Box<i32>>();
            let mut recv = rx.recv();
            let mut recv = Pin::new(&mut recv);

            assert_eq!(recv.as_mut().poll(&mut cx1), Poll::Pending);
            assert_eq!(recv.as_mut().poll(&mut cx2), Poll::Pending);
            assert_eq!(recv.as_mut().poll(&mut cx3), Poll::Pending);
            f(tx);
            assert_eq!(counter3.take_count(), 1);
            let polled = recv.as_mut().poll(&mut cx2);
            assert_eq!(counter1.take_count(), 0);
            assert_eq!(counter2.take_count(), 0);
            assert_eq!(polled.map_ok(|boxed| *boxed), Poll::Ready(expect));
        }
    }

    /// Sends a message after changing the waker.
    #[test]
    fn multishot_send_change_waker_single_threaded() {
        multishot_change_waker_single_threaded(|tx| tx.send(Box::new(42)), Ok(42));
    }

    /// Drops the sender after changing the waker.
    #[test]
    fn multishot_drop_change_waker_single_threaded() {
        multishot_change_waker_single_threaded(|tx| drop(tx), Err(RecvError {}));
    }

    /// Consumes the sender with `f` on a separate thread while the receiving
    /// future is polled on the current one, then checks that the future
    /// resolves to `expect`.
    fn multishot_notify_multi_threaded<F>(f: F, expect: Result<i32, RecvError>)
    where
        F: FnOnce(Sender<Box<i32>>) + Send + Copy + 'static,
    {
        let (wake_counter, waker) = counting_waker();
        let mut cx = Context::from_waker(&waker);
        let mut rx: Receiver<Box<i32>> = Receiver::new();

        let tx = rx.sender().expect("could not create sender");
        let mut recv = rx.recv();
        let mut recv = Pin::new(&mut recv);

        let handle = thread::spawn(move || f(tx));

        // This poll races with the spawned thread, so both outcomes are valid.
        let first_poll = recv.as_mut().poll(&mut cx);

        handle.join().unwrap();

        match first_poll {
            Poll::Ready(outcome) => assert_eq!(outcome.map(|boxed| *boxed), expect),
            Poll::Pending => {
                // The sender was consumed after the poll: one notification is
                // expected and the next poll must complete.
                assert_eq!(wake_counter.take_count(), 1);
                assert_eq!(
                    recv.as_mut().poll(&mut cx).map_ok(|boxed| *boxed),
                    Poll::Ready(expect)
                );
            }
        }
    }

    /// Sends a message from another thread.
    #[test]
    fn multishot_send_notify_multi_threaded() {
        multishot_notify_multi_threaded(|tx| tx.send(Box::new(42)), Ok(42));
    }

    /// Drops the sender on another thread.
    #[test]
    fn multishot_drop_notify_multi_threaded() {
        multishot_notify_multi_threaded(|tx| drop(tx), Err(RecvError {}));
    }

    // Drop both the sender and receiver concurrently. This test is mainly meant
    // for MIRI.
    #[test]
    fn multishot_drop_both_multi_threaded() {
        let mut rx: Receiver<Box<i32>> = Receiver::new();

        let tx = rx.sender().expect("could not create sender");

        let handle = thread::spawn(move || drop(tx));
        drop(rx);

        handle.join().unwrap();
    }

    // Consume the sender and drop the receiver concurrently. This test is
    // mainly meant for MIRI.
    #[test]
    fn multishot_send_and_drop_multi_threaded() {
        let mut rx: Receiver<Box<i32>> = Receiver::new();

        let tx = rx.sender().expect("could not create sender");

        let handle = thread::spawn(move || tx.send(Box::new(123)));
        drop(rx);

        handle.join().unwrap();
    }
}
993
#[cfg(all(test, multishot_loom))]
mod tests {
    use super::*;

    use std::future::Future;
    use std::sync::Arc;
    use std::task::{Context, Poll, Wake, Waker};

    use loom::sync::atomic::{AtomicBool, AtomicUsize};
    use loom::thread;

    /// Waker stand-in whose only job is to count notifications.
    struct TestWaker {
        count: AtomicUsize,
    }

    impl TestWaker {
        /// Creates a counter that has not recorded any notification yet.
        fn new() -> Self {
            let count = AtomicUsize::new(0);

            Self { count }
        }

        /// Returns the number of notifications recorded since the previous
        /// call and resets the counter to zero.
        fn take_count(&self) -> usize {
            self.count.swap(0, Ordering::Acquire)
        }
    }

    impl Wake for TestWaker {
        /// Records one notification.
        fn wake(self: Arc<Self>) {
            self.count.fetch_add(1, Ordering::Release);
        }
    }

    /// Builds a fresh notification counter and a `Waker` that reports to it.
    fn counting_waker() -> (Arc<TestWaker>, Waker) {
        let counter = Arc::new(TestWaker::new());
        let waker = Waker::from(Arc::clone(&counter));

        (counter, waker)
    }

    /// Consumes the sender with `f` on a loom thread while the receiving
    /// future is polled, and checks that a consumed sender is always paired
    /// with a notification and that a notification always makes `expect`
    /// retrievable.
    fn multishot_loom_notify<F>(f: F, expect: Result<i32, RecvError>)
    where
        F: FnOnce(Sender<i32>) + Send + Sync + Copy + 'static,
    {
        loom::model(move || {
            let (wake_counter, waker) = counting_waker();
            let mut cx = Context::from_waker(&waker);

            let (tx, mut rx) = channel();

            // Set by the spawned thread once `f` has returned.
            let sender_done = Arc::new(AtomicBool::new(false));
            let done_flag = sender_done.clone();
            thread::spawn(move || {
                f(tx);
                done_flag.store(true, Ordering::Release);
            });

            let mut recv = rx.recv();
            let mut recv = Pin::new(&mut recv);

            let first_poll = recv.as_mut().poll(&mut cx);

            // The sender was consumed before the poll: just check the value.
            if let Poll::Ready(val) = first_poll {
                assert_eq!(val, expect);
                return;
            }

            // The flag is read before the counter is taken, as the check
            // below relies on this order.
            let msg = sender_done.load(Ordering::Acquire);
            let event_count = wake_counter.take_count();
            if event_count == 0 {
                // Make sure that if the waker was not notified, then no
                // message was sent (or equivalently, if a message was sent,
                // the waker was notified).
                assert_eq!(msg, false);
            } else {
                assert_eq!(event_count, 1);
                // Make sure that if the waker was notified, the message can
                // be retrieved (this is crucial to ensure that notifications
                // are not lost).
                let polled = recv.as_mut().poll(&mut cx);
                assert_eq!(wake_counter.take_count(), 0);
                assert_eq!(polled, Poll::Ready(expect));
            }
        });
    }

    /// Polls `fut` with `cx` and reports whether it could be driven to
    /// completion, either directly or after a notification of `test_waker`
    /// followed by a second poll with `other_cx`.
    ///
    /// Returns `true` when the future completed with `expect`, and `false`
    /// when it is still pending and no notification was recorded.
    fn try_complete(
        fut: &mut Pin<&mut Recv<i32>>,
        cx: &mut Context,
        other_cx: &mut Context,
        test_waker: &TestWaker,
        other_test_waker: &TestWaker,
        expect: Result<i32, RecvError>,
    ) -> bool {
        // If `Ready` is returned we are done.
        if let Poll::Ready(val) = fut.as_mut().poll(cx) {
            assert_eq!(val, expect);
            return true;
        }

        // The sender should not have used the other waker even if it was
        // registered before the last call to `poll`.
        assert_eq!(other_test_waker.take_count(), 0);

        // Although the last call to `poll` has returned `Pending`, the sender
        // may have been consumed in the meantime, so look for a notification.
        let event_count = test_waker.take_count();
        if event_count == 0 {
            // The future was not polled to completion.
            return false;
        }

        // Expect only one notification.
        assert_eq!(event_count, 1);

        // Since the task was notified it is expected that the future is now
        // ready.
        let polled = fut.as_mut().poll(other_cx);
        assert_eq!(test_waker.take_count(), 0);
        assert_eq!(other_test_waker.take_count(), 0);
        assert_eq!(polled, Poll::Ready(expect));

        true
    }

    /// Consumes the sender with `f` on a loom thread while the receiving
    /// future is polled with alternating wakers, and checks that the future
    /// resolves to `expect` without notifying a stale waker.
    fn multishot_loom_change_waker<F>(f: F, expect: Result<i32, RecvError>)
    where
        F: FnOnce(Sender<i32>) + Send + Sync + Copy + 'static,
    {
        loom::model(move || {
            let (counter1, waker1) = counting_waker();
            let mut cx1 = Context::from_waker(&waker1);

            let (counter2, waker2) = counting_waker();
            let mut cx2 = Context::from_waker(&waker2);

            let (tx, mut rx) = channel();

            thread::spawn(move || {
                f(tx);
            });

            let mut recv = rx.recv();
            let mut recv = Pin::new(&mut recv);

            // Poll with cx1.
            if try_complete(&mut recv, &mut cx1, &mut cx2, &counter1, &counter2, expect) {
                return;
            }
            // Poll with cx2.
            if try_complete(&mut recv, &mut cx2, &mut cx1, &counter2, &counter1, expect) {
                return;
            }
            // Poll again with cx1; nothing is left to do whatever the outcome.
            try_complete(&mut recv, &mut cx1, &mut cx2, &counter1, &counter2, expect);
        });
    }

    /// Consumes the sender with `f` on a loom thread and, if the receiving
    /// future completes within two polls, checks that a new sender can be
    /// obtained and used.
    fn multishot_loom_recycle<F>(f: F)
    where
        F: FnOnce(Sender<i32>) + Send + Sync + Copy + 'static,
    {
        loom::model(move || {
            let (_wake_counter, waker) = counting_waker();
            let mut cx = Context::from_waker(&waker);

            let (tx, mut rx) = channel();

            {
                thread::spawn(move || {
                    f(tx);
                });

                let mut recv = rx.recv();
                let mut recv = Pin::new(&mut recv);

                // Poll up to twice and give up if the future is still pending.
                if recv.as_mut().poll(&mut cx).is_pending()
                    && recv.as_mut().poll(&mut cx).is_pending()
                {
                    return;
                }
            }

            // The future was polled to completion, meaning that the sender was
            // consumed and should be immediately recyclable.
            let tx = rx
                .sender()
                .expect("Could not recycle the sender after it was consumed");

            // It's all downhill from here, just make sure the recycled sender
            // works correctly.
            {
                thread::spawn(move || {
                    tx.send(13);
                });

                let mut recv = rx.recv();
                let mut recv = Pin::new(&mut recv);

                let polled = recv.as_mut().poll(&mut cx);
                if let Poll::Ready(val) = polled {
                    assert_eq!(val, Ok(13));
                }
            }
        });
    }

    /// Sends a message.
    #[test]
    fn multishot_loom_send_notify() {
        multishot_loom_notify(|tx| tx.send(42), Ok(42));
    }

    /// Drops the sender.
    #[test]
    fn multishot_loom_drop_notify() {
        multishot_loom_notify(|tx| drop(tx), Err(RecvError {}));
    }

    /// Changes the waker while sending a message.
    #[test]
    fn multishot_loom_send_change_waker() {
        multishot_loom_change_waker(|tx| tx.send(42), Ok(42));
    }

    /// Changes the waker while dropping the sender.
    #[test]
    fn multishot_loom_drop_change_waker() {
        multishot_loom_change_waker(|tx| drop(tx), Err(RecvError {}));
    }

    /// Recycles the sender after sending a message.
    #[test]
    fn multishot_loom_send_recycle() {
        multishot_loom_recycle(|tx| tx.send(42));
    }

    /// Recycles the sender after dropping the previous sender.
    #[test]
    fn multishot_loom_drop_recycle() {
        multishot_loom_recycle(|tx| drop(tx));
    }
}