maniac_runtime/sync/multishot/mod.rs
1//! An async, lock-free, reusable channel for sending single values to
2//! asynchronous tasks.
3//!
4//! In a multi-shot channel, the receiver half is reusable and able to recycle
5//! the sender half without ever re-allocating. Sending, polling and recycling
6//! the sender are all lock-free operations.
7//!
8//! # Example
9//!
10//!
11//! ```
12//! # use pollster;
13//! use std::thread;
14//!
15//! # maniac::block_on(
16//! async {
17//! let (s, mut r) = multishot::channel();
18//!
19//! // Send a value to the channel from another thread.
20//! thread::spawn(move || {
21//! s.send("42");
22//! });
23//!
24//! // Receive the value.
25//! let res = r.recv().await;
26//! assert_eq!(res, Ok("42"));
27//!
28//! // Recycle the sender. This is guaranteed to succeed if the previous
29//! // message has been read.
30//! let s = r.sender().unwrap();
31//!
32//! // Drop the sender on another thread without sending a message.
33//! thread::spawn(move || {
34//! drop(s);
35//! });
36//!
37//! // Receive an error.
38//! let res = r.recv().await;
39//! assert_eq!(res, Err(multishot::RecvError {}));
40//! }
41//! # );
42//! ```
43
44#![warn(missing_docs, missing_debug_implementations, unreachable_pub)]
45
46// The implementation of a reusable, lock-free, reallocation-free, one-shot
47// channel is surprisingly tricky. In a regular, non-reusable one-shot channel,
48// it is relatively easy to avoid missed notifications while also avoiding races
49// between waker registration and waker invocation: it is enough for the sender
50// to retrieve the waker only once the presence of a value has been signaled to
51// the receiver, which implicitly signals at the same time the acquisition of a
52// lock on the waker. This means, however, that the receiving side may read the
53// value before the waker is consumed by the sender. This is a problem for a
54// reusable channel as it means that the creation of a new sender after reading
55// a value may block until the previous sender has indeed surrendered
56// exclusivity on the waker.
57//
58// One workaround would be to allocate a new waker if the old sender still holds
59// the lock on the previous waker, but such reallocation would somewhat defeat
60// the idea of a reusable channel. The idea in this implementation is to instead
61// signal the presence of a value only once the waker has been moved out from
62// the waker slot, and then use the moved waker to wake the receiver task. This
63// requires, however, giving the sender exclusivity on the waker before the
64// value is sent. In order not to block a receiver attempting to register a new
65// waker at the same time, the channel uses two waker slots so the receiver can
66// always store a new waker in the unused slot. Missed notifications are avoided
67// by having the sender check the availability of an updated waker before
68// signaling the presence of a value.
69//
70// Despite the relative complexity of the state machine, the implementation is
71// fairly efficient. Polling requires no read-modify-write (RMW) operation if
72// the value is readily available, 1 RMW if this is the first waker update and 2
73// RMWs otherwise. Sending needs 1 RMW if no waker was registered, and typically
74// 2 RMW if one was registered. Compared to a non-reusable one-shot channel such
75// as Tokio's, the only extra cost is 1 RMW in case the waker was updated (which
76// is rare in practice). Also, the implementation of `multishot` partially
77// offsets this extra cost by using arithmetic atomic operations when sending
78// rather than the typically more expensive compare-and-swap operation.
79//
80// Sending, receiving and recycling a sender are lock-free operations; the last
81// two are additionally wait-free.
82//
83// The state of the channel is tracked by the following bit flags:
84//
85// * INDEX [I]: index of the current waker slot (0 or 1)
86// * OPEN [O]: the channel is open, both the receiver and the sender are alive
87// * EMPTY [E]: the meaning of this flag is contextual:
88// - if OPEN==0: indicates that the value slot is empty
89// - if OPEN==1: indicates that the redundant waker slot at 1 - INDEX is
90// empty
91//
92// Summary of possible states (excluding unobservable states)
93//
94// | I O E | Observer | Meaning |
95// |-----------|----------|---------------------------------------------|
96// | x 0 0 | Sender | channel closed by receiver |
97// | x 0 0 | Receiver | channel closed by sender, value awaiting |
98// | x 0 1 | Receiver | channel closed by sender, no value |
99// | x 1 0 | Any | an updated waker is registered at index !x |
100// | x 1 1 | Any | a waker may be registered at index x |
101
102mod loom_exports;
103
104use std::error::Error;
105use std::fmt;
106use std::future::Future;
107use std::marker::PhantomData;
108use std::mem::{ManuallyDrop, MaybeUninit};
109use std::panic::{RefUnwindSafe, UnwindSafe};
110use std::pin::Pin;
111use std::ptr::{self, NonNull};
112use std::sync::atomic::Ordering;
113use std::task::{Context, Poll, Waker};
114
115use crate::loom_exports::cell::UnsafeCell;
116use crate::loom_exports::sync::atomic::AtomicUsize;
117
118// Note: the order of the flags is NOT arbitrary (see comments in the
119// `Sender::send` method for the rationale): `Sender::send` subtracts
// `OPEN | EMPTY` from the state, so `EMPTY` and `OPEN` must be the two lowest
// bits and `INDEX` the bit right above them for the borrow to flip `INDEX`.
120//
121// [E] Case O==0: indicates whether the value slot is empty. Case O==1:
122// indicates whether the redundant waker slot at 1 - INDEX is empty.
123const EMPTY: usize = 0b001;
124// [O] Indicates whether the channel is open, i.e. whether both the receiver and
125// the sender are alive.
126const OPEN: usize = 0b010;
127// [I] Index of the current waker (0 or 1).
128const INDEX: usize = 0b100;
129
130/// The shared data of `Receiver` and `Sender`.
///
/// It is heap-allocated by `Receiver::new` and freed with `Box::from_raw` by
/// whichever half finds the channel already closed by the other one.
131struct Inner<T> {
132 /// A bit field for `INDEX`, `OPEN` and `EMPTY`.
133 state: AtomicUsize,
134 /// The value, if any. Whether it is initialized is tracked by `state`
 /// (`OPEN` cleared and `EMPTY` cleared), not by the slot itself.
135 value: UnsafeCell<MaybeUninit<T>>,
136 /// Redundant cells for the waker: the slot at `INDEX` belongs to the
 /// sender, the other slot is where the receiver registers a new waker.
137 waker: [UnsafeCell<Option<Waker>>; 2],
138}
139
140impl<T> Inner<T> {
141 // Sets the value without dropping the previous content.
142 //
143 // Safety: the caller must have exclusive access to the value.
144 unsafe fn write_value(&self, t: T) {
145 unsafe {
146 self.value.with_mut(|value| (*value).write(t));
147 }
148 }
149
150 // Reads the value without moving it.
151 //
152 // Safety: the value must be initialized and the caller must have exclusive
153 // access to the value. After the call, the value slot within `Inner` should
154 // be considered uninitialized in order to avoid a double-drop.
155 unsafe fn read_value(&self) -> T {
156 unsafe { self.value.with(|value| (*value).as_ptr().read()) }
157 }
158
159 // Drops the value in place without deallocation.
160 //
161 // Safety: the value must be initialized and the caller must have exclusive
162 // access to the value.
163 unsafe fn drop_value_in_place(&self) {
164 unsafe {
165 self.value
166 .with_mut(|value| ptr::drop_in_place((*value).as_mut_ptr()));
167 }
168 }
169
170 // Sets the waker at index `idx`.
171 //
172 // Safety: the caller must have exclusive access to the waker at index
173 // `idx`.
174 unsafe fn set_waker(&self, idx: usize, new: Option<Waker>) {
175 unsafe {
176 self.waker[idx].with_mut(|waker| (*waker) = new);
177 }
178 }
179
180 // Takes the waker out of the waker slot at index `idx`.
181 //
182 // Safety: the caller must have exclusive access to the waker at index
183 // `idx`.
184 unsafe fn take_waker(&self, idx: usize) -> Option<Waker> {
185 unsafe { self.waker[idx].with_mut(|waker| (*waker).take()) }
186 }
187}
188
189/// Reusable receiver of a multi-shot channel.
190///
191/// A `Receiver` can be used to receive a value using the `Future` returned by
192/// [`recv`](Receiver::recv). It can also produce a one-shot [`Sender`] with the
193/// [`sender`](Receiver::sender) method, provided that there is currently no
194/// live sender.
195///
196/// A receiver can be created with the [`channel`] function or with the
197/// [`new`](Receiver::new) method.
198#[derive(Debug)]
199pub struct Receiver<T> {
200 /// The shared data: a pointer to the heap allocation made in
 /// `Receiver::new`, also held by the live `Sender` (if any).
201 inner: NonNull<Inner<T>>,
202 /// Drop checker hint: we may drop an `Inner<T>`.
203 _phantom: PhantomData<Inner<T>>,
204}
205
206impl<T> Receiver<T> {
207 /// Creates a new receiver.
208 pub fn new() -> Self {
209 Self {
210 inner: NonNull::new(Box::into_raw(Box::new(Inner {
211 state: AtomicUsize::new(EMPTY),
212 value: UnsafeCell::new(MaybeUninit::uninit()),
213 waker: [UnsafeCell::new(None), UnsafeCell::new(None)],
214 })))
215 .unwrap(),
216 _phantom: PhantomData,
217 }
218 }
219
220 /// Returns a new sender if there is currently no live sender.
221 ///
222 /// This operation is wait-free. It is guaranteed to succeed (i) on its
223 /// first invocation and (ii) on further invocations if the future returned
224 /// by [`recv`](Receiver::recv) has been `await`ed (i.e. polled to
225 /// completion) after the previous sender was created.
226 pub fn sender(&mut self) -> Option<Sender<T>> {
227 // A sender is created only if no sender is alive.
228 //
229 // Transitions:
230 //
231 // | I O E | I O E |
232 // |-----------|-----------|
233 // | x 0 0 | 0 1 1 | -> Return Some(Sender)
234 // | x 0 1 | 0 1 1 | -> Return Some(Sender)
235 // | x 1 1 | x 1 1 | -> Return None
236 // | x 1 0 | x 1 0 | -> Return None
237
238 // Retrieve the current state.
239 //
240 // Ordering: This load synchronizes with the Release store in the
241 // `Sender::send` method to ensure that the value (if any) is visible
242 // and can be safely dropped.
243 //
244 // Safety: it is safe to access `inner` as we did not clear the `OPEN`
245 // flag.
246 let state = unsafe { self.inner.as_ref().state.load(Ordering::Acquire) };
247
248 // Verify that there is no live sender.
249 if state & OPEN == 0 {
250 // Safety: the previous sender (if any) known to be consumed and the
251 // Acquire ordering on the state ensures that all previous memory
252 // operations by the previous sender are visible.
253 Some(unsafe { self.sender_with_waker(state, None) })
254 } else {
255 None
256 }
257 }
258
259 /// Receives a message asynchronously.
260 ///
261 /// If the channel is empty, the future returned by this method waits until
262 /// there is a message. If there is no live sender and no message, the
263 /// future completes and returns an error.
264 pub fn recv(&mut self) -> Recv<'_, T> {
265 Recv { receiver: self }
266 }
267
268 /// Initialize the waker in slot 0, set the state to `OPEN | EMPTY` and
269 /// return a sender.
270 ///
271 /// Safety: `inner` must be allocated. The sender must have been consumed
272 /// and all memory operations by the sender on the value and on the waker
273 /// must be visible in this thread.
274 unsafe fn sender_with_waker(&mut self, state: usize, waker: Option<Waker>) -> Sender<T> {
275 // Only create a sender if there is no live sender.
276 debug_assert!(state & OPEN == 0);
277
278 // If there is an unread value, drop it.
279 if state & EMPTY == 0 {
280 // Safety: the presence of an initialized value was just
281 // checked and there is no risk of race since there is no live
282 // sender.
283 unsafe { self.inner.as_ref().drop_value_in_place() };
284 }
285
286 // Set the waker in slot 0.
287 unsafe { self.inner.as_ref().set_waker(0, waker) };
288
289 // Open the channel and set the current waker slot index to 0.
290 //
291 // Ordering: since the sender is created right now on this thread,
292 // Relaxed ordering is sufficient.
293 unsafe {
294 self.inner
295 .as_ref()
296 .state
297 .store(OPEN | EMPTY, Ordering::Relaxed)
298 };
299
300 Sender {
301 inner: self.inner,
302 _phantom: PhantomData,
303 }
304 }
305}
306
// `Receiver` holds a raw `NonNull<Inner<T>>`, so the auto traits have to be
// asserted manually.
//
// NOTE(review): `Send` requires `T: Send` since a received `T` may have been
// written by another thread. `Sync` looks sound because every method of
// `Receiver` that dereferences `inner` takes `&mut self` — confirm this stays
// true when adding `&self` methods.
307unsafe impl<T: Send> Send for Receiver<T> {}
308unsafe impl<T: Send> Sync for Receiver<T> {}
309
// These assert that a panic unwinding through a `Receiver` leaves the channel
// state consistent (see the "Unwind safety" notes in `Recv::poll`).
310impl<T> UnwindSafe for Receiver<T> {}
311impl<T> RefUnwindSafe for Receiver<T> {}
312
313impl<T> Default for Receiver<T> {
314 fn default() -> Self {
315 Self::new()
316 }
317}
318
319impl<T> Drop for Receiver<T> {
 /// Closes the receiving half; frees the shared allocation if the sender
 /// is already gone, otherwise leaves cleanup to the sender.
320 fn drop(&mut self) {
321 // The drop handler clears the `INDEX`, `OPEN` and `EMPTY` flags. If the
322 // channel was already closed by the sender, it drops the value (if any)
323 // and deallocates `inner`.
324 //
325 // Transitions:
326 //
327 // | I O E | I O E |
328 // |-----------|-----------|
329 // | x 0 1 | unobserv. | -> Deallocate
330 // | x 0 0 | unobserv. | -> Deallocate
331 // | x 1 1 | 0 0 0 |
332 // | x 1 0 | 0 0 0 |
333
334 // Ordering: the value and wakers may need to be dropped prior to
335 // deallocation in case the sender was dropped too, so Acquire ordering
336 // is necessary to synchronize with the Release store in `Sender::send`.
337 // Release ordering is in turn necessary in case the sender is still
338 // alive: it synchronizes with the Acquire operations in either
339 // `Sender::send` or in the drop handler of the sender to ensure that
340 // `inner` can be dropped safely.
341
342 // Safety: it is safe to access `inner` as we have not yet cleared the
343 // `OPEN` flag.
344 let state = unsafe { self.inner.as_ref().state.swap(0, Ordering::AcqRel) };
345
346 // If the sender is alive, let it handle cleanup: it will observe the
 // cleared `OPEN` flag when it is consumed and deallocate `inner`.
347 if state & OPEN == OPEN {
348 return;
349 }
350
351 // Deallocate the channel since it was closed by the sender.
352 //
353 // Safety: `inner` will no longer be used once deallocated.
354 unsafe {
355 // If there is an unread value, drop it first.
356 if state & EMPTY == 0 {
357 // Safety: the presence of an initialized value was just
358 // checked and there is no live sender so no risk of race.
359 self.inner.as_ref().drop_value_in_place();
360 }
361
362 // Deallocate inner. This also drops any waker left in the slots.
363 drop(Box::from_raw(self.inner.as_ptr()));
364 }
365 }
366}
367
368/// Future returned by [`Receiver::recv()`].
369#[derive(Debug)]
370pub struct Recv<'a, T> {
371 /// The shared data.
372 receiver: &'a mut Receiver<T>,
373}
374
375impl<'a, T> Recv<'a, T> {
376 /// Return `Poll::Ready` with either the value (if any) or an error and
377 /// change the state to `EMPTY`.
378 fn poll_complete(self: Pin<&mut Self>, state: usize) -> Poll<Result<T, RecvError>> {
379 debug_assert!(state & OPEN == 0);
380
381 let ret = if state & EMPTY == 0 {
382 // Safety: the presence of an initialized value was just checked and
383 // there is no live sender so no risk of race. It is safe to access
384 // `inner` since we are now its single owner.
385 let value = unsafe { self.receiver.inner.as_ref().read_value() };
386
387 Ok(value)
388 } else {
389 Err(RecvError {})
390 };
391
392 // Set the state to indicate that the sender has been dropped and the
393 // message (if any) has been moved out.
394 //
395 // Ordering: Relaxed is enough since the sender was dropped and
396 // therefore no other thread can observe the state.
397 //
398 // Safety: It is safe to access `inner` since we are now its single owner.
399 unsafe {
400 self.receiver
401 .inner
402 .as_ref()
403 .state
404 .store(EMPTY, Ordering::Relaxed);
405 }
406
407 Poll::Ready(ret)
408 }
409}
410
411impl<'a, T> Future for Recv<'a, T> {
412 type Output = Result<T, RecvError>;
413
 /// Polls for the value: returns `Ready` once the sender has been consumed,
 /// otherwise registers the waker of `cx` and returns `Pending`.
414 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
415 // The poll method proceeds in two steps. In the first step, the `EMPTY`
416 // flag is set to make sure that a concurrent sender operation does not
417 // try to access the redundant waker slot while a new waker is being
418 // registered. The `EMPTY` flag is then cleared in Step 2 once the new
419 // waker is registered, checking at the same time whether the sender has
420 // not been consumed while the new waker was being registered. This
421 // check is necessary because if it was consumed, the sender may not
422 // have been able to send a notification (if the current waker slot was
423 // empty) or may have sent one using an outdated waker.
424 //
425 // Transitions:
426 //
427 // Step 1
428 //
429 // | I O E | I O E |
430 // |-----------|-----------|
431 // | x 0 0 | 0 0 1 | -> Return Ready(Message)
432 // | x 0 1 | 0 0 1 | -> Return Ready(Error)
433 // | x 1 0 | x 1 1 | -> Step 2
434 // | x 1 1 | x 1 1 | -> Step 2
435 //
436 // Step 2
437 //
438 // | I O E | I O E |
439 // |-----------|-----------|
440 // | x 0 0 | 0 0 1 | -> Return Ready(Message)
441 // | x 0 1 | 0 0 1 | -> Return Ready(Error)
442 // | x 1 1 | x 1 0 | -> Return Pending
443
444 // Fast path: this is an optimization in case the sender has already
445 // been consumed and has closed the channel.
446 //
 // Ordering: Acquire synchronizes with the Release operation of the
 // sender so that the value (if any) is visible before it is read.
 //
447 // Safety: It is safe to access `inner` since we did not clear the
448 // `OPEN` flag.
449 let mut state = unsafe { self.receiver.inner.as_ref().state.load(Ordering::Acquire) };
450 if state & OPEN == 0 {
451 return self.poll_complete(state);
452 }
453
454 // Set the `EMPTY` flag to prevent the sender from updating the current
455 // waker slot. This is not necessary if `EMPTY` was already set because
456 // in such case the sender will never try to concurrently access the
457 // redundant waker slot.
458 if state & EMPTY == 0 {
459 // Ordering: Acquire ordering is necessary since some member data may be
460 // read and/or modified after reading the state.
461 //
462 // Safety: it is safe to access `inner` since we did not clear the
463 // `OPEN` flag.
464 unsafe {
465 state = self
466 .receiver
467 .inner
468 .as_ref()
469 .state
470 .fetch_or(EMPTY, Ordering::Acquire);
471 }
472
473 // Check whether the sender has closed the channel.
474 if state & OPEN == 0 {
475 return self.poll_complete(state);
476 }
477 }
478
479 // The waker will be stored in the redundant slot.
480 let current_idx = state_to_index(state);
481 let new_idx = 1 - current_idx;
482
483 // Store the new waker.
484 //
485 // Safety: the sender thread never accesses the waker stored in the slot
486 // not pointed to by `INDEX` and it does not modify `INDEX` as long as
487 // the `EMPTY` flag is set. It is safe to access `inner` since we did
488 // not clear the `OPEN` flag.
489 //
490 // Unwind safety: even if `Waker::clone` panics, the state will be
491 // consistent since `OPEN` and `EMPTY` are both set, meaning that the
492 // redundant waker will not be accessed when the receiver is dropped.
493 unsafe {
494 self.receiver
495 .inner
496 .as_ref()
497 .set_waker(new_idx, Some(cx.waker().clone()));
498 }
499
500 // Make the new waker visible to the sender.
501 //
502 // Note: this could (should) be a `fetch_and(!EMPTY)` but `swap` may be
503 // faster on some platforms and the result will only be invalid if the
504 // sender has been in the meantime consumed, in which case the new state
505 // will not be observable anyway.
506 //
507 // Ordering: the waker may have been modified above so Release ordering
508 // is necessary to synchronize with the Acquire load in `Sender::send`
509 // or `Sender::drop`. Acquire ordering is also necessary since the
510 // message may be loaded. It is safe to access `inner` since we did not
511 // clear the `OPEN` flag.
512 let state = unsafe {
513 self.receiver
514 .inner
515 .as_ref()
516 .state
517 .swap(index_to_state(current_idx) | OPEN, Ordering::AcqRel)
518 };
519
520 // It is necessary to check again whether the sender has closed the
521 // channel, because if it did, the sender may not have been able to send
522 // a notification (if the current waker slot was empty) or may have sent
523 // one using an outdated waker.
524 if state & OPEN == 0 {
525 return self.poll_complete(state);
526 }
527
528 Poll::Pending
529 }
530}
531
532/// Single-use sender of a multi-shot channel.
533///
534/// A `Sender` can be created with the [`channel`] function or by recycling a
535/// previously consumed sender with the [`Receiver::sender`] method.
///
/// It is consumed either by [`send`](Sender::send) or by being dropped; in
/// both cases the receiver is notified if it registered a waker.
536#[derive(Debug)]
537pub struct Sender<T> {
538 /// The shared data: the same allocation that the `Receiver` points to.
539 inner: NonNull<Inner<T>>,
540 /// Drop checker hint: we may drop an `Inner<T>` and thus a `T`.
541 _phantom: PhantomData<Inner<T>>,
542}
543
544impl<T> Sender<T> {
545 /// Sends a value to the receiver and consumes the sender.
 ///
 /// If the receiver was already dropped, the value is dropped and the
 /// shared allocation is freed; otherwise the receiver is woken if it
 /// registered a waker.
546 pub fn send(self, t: T) {
547 // The send method is iterative. At each iteration, the `EMPTY` flag is
548 // checked. If set, the presence of a value is signaled immediately and
549 // the function returns after sending a notification. If the `EMPTY`
550 // flag is cleared, it is set again and the index of the current waker
551 // becomes that of the redundant slot in the next iteration.
552 //
553 // Transitions:
554 //
555 // | I O E | I O E |
556 // |-----------|-----------|
557 // | 0 0 0 | unobserv. | -> Deallocate
558 // | x 1 1 | x 0 0 | -> End
559 // | x 1 0 | !x 1 1 | -> Retry
560
561 // It is necessary to make sure that the destructor is not run: since
562 // the `OPEN` flag will be cleared once the value is sent,
563 // `Sender::drop` would otherwise wrongly consider the channel as closed
564 // by the receiver and would deallocate `inner` while the receiver is
565 // alive.
566 let this = ManuallyDrop::new(self);
567
568 // Store the value.
569 //
570 // Note: there is no need to drop a previous value since the
571 // `Receiver::sender` method would have dropped the value if there was
572 // one.
573 //
574 // Safety: no race for accessing the value is possible since there can
575 // be at most one sender alive at a time and the receiver will not read
576 // the value before the `OPEN` flag is cleared. It is safe to access
577 // `inner` since we did not clear the `OPEN` flag.
578 unsafe { this.inner.as_ref().write_value(t) };
579
580 // Retrieve the index of the current waker.
581 //
582 // Ordering: Relaxed ordering is sufficient since only the sender can
583 // modify `INDEX`.
584 //
585 // Safety: it is safe to access `inner` since we did not clear the
586 // `OPEN` flag.
587 let mut idx = state_to_index(unsafe { this.inner.as_ref().state.load(Ordering::Relaxed) });
588
589 loop {
590 // Take the current waker.
591 //
592 // Safety: the receiver thread never accesses the waker stored at
593 // `INDEX`. It is safe to access `inner` since we did not
594 // clear the `OPEN` flag.
595 let waker = unsafe { this.inner.as_ref().take_waker(idx) };
596
597 // Rather than a Compare-And-Swap, a `fetch_sub` operation is used
598 // as it is faster on many platforms. The specific order of the
599 // flags and the wrapping underflow behavior of `fetch_sub` are
600 // exploited to implement the transition table: subtracting
 // `OPEN | EMPTY` (0b011) turns `x 1 1` into `x 0 0`, whereas on
 // `x 1 0` the subtraction borrows from `INDEX`, which flips it and
 // leaves `OPEN` and `EMPTY` set (`!x 1 1`).
601 //
602 // Ordering: Acquire is necessary to synchronize with the Release
603 // store in the `Receiver::poll` method in case an updated waker
604 // needs to be taken, or with the Release operation in the drop
605 // handler of the receiver in case the channel was closed and
606 // `inner` needs to be dropped. Release is in turn necessary to
607 // ensure the visibility of both (i) the value and (ii) the
608 // consumption of the waker in the previous loop iteration (if any).
609 // The Release synchronizes with the Acquire load of the state in
610 // the receiver `poll` and `sender` methods.
611 //
612 // Safety: it is safe to access `inner` since we did not clear the
613 // `OPEN` flag.
614 let state = unsafe {
615 this.inner
616 .as_ref()
617 .state
618 .fetch_sub(OPEN | EMPTY, Ordering::AcqRel)
619 };
620
621 // Deallocate the channel if closed.
622 //
623 // Safety: it is safe to access `inner` since we did not clear the
624 // `OPEN` flag. `inner` will no longer be used once deallocated. In
625 // particular, the sender destructor will not be called.
626 unsafe {
627 if state & OPEN == 0 {
628 // Safety: a value was just written and there is no live
629 // receiver so no risk of a race.
630 this.inner.as_ref().drop_value_in_place();
631 drop(Box::from_raw(this.inner.as_ptr()));
632 return;
633 }
634 }
635
636 // If the waker was not updated, notify the receiver and return.
637 if state & EMPTY == EMPTY {
638 // Unwind safety: the state has already been updated, so
639 // panicking in `Waker::wake` is OK.
640 if let Some(waker) = waker {
641 waker.wake()
642 }
643 return;
644 }
645
646 // Update the local waker index to the current value of `INDEX`.
647 idx = 1 - idx;
648 }
649 }
650}
651
// `Sender` holds a raw `NonNull<Inner<T>>`, so the auto traits have to be
// asserted manually.
//
// NOTE(review): `Send` requires `T: Send` since the value is handed over to
// the receiver's thread. `Sync` looks sound because the only method, `send`,
// takes `self` by value — confirm if `&self` methods are ever added.
652unsafe impl<T: Send> Send for Sender<T> {}
653unsafe impl<T: Send> Sync for Sender<T> {}
654
// These assert that a panic unwinding through a `Sender` leaves the channel
// state consistent (see the "Unwind safety" notes in `send` and `drop`).
655impl<T> UnwindSafe for Sender<T> {}
656impl<T> RefUnwindSafe for Sender<T> {}
657
658impl<T> Drop for Sender<T> {
 /// Closes the channel without sending a value; frees the shared allocation
 /// if the receiver is already gone, otherwise wakes the receiver if it
 /// registered a waker.
659 fn drop(&mut self) {
660 // The drop handler is iterative. At each iteration, the `EMPTY` flag is
661 // checked. If set, the closure of the channel is signaled immediately
662 // and the function returns after sending a notification. If the `EMPTY`
663 // flag is cleared, it is set and the index of the current waker becomes
664 // that of the redundant slot in the next iteration.
665 //
666 // Transitions:
667 //
668 // | I O E | I O E |
669 // |-----------|-----------|
670 // | x 0 0 | unobserv. | -> Deallocate
671 // | x 1 1 | 0 0 1 | -> End
672 // | x 1 0 | !x 1 1 | -> Retry
673
674 // Retrieve the state and the current index.
675 //
676 // Ordering: Relaxed ordering is sufficient since only the sender can
677 // modify `INDEX`.
 //
 // Safety: it is safe to access `inner` since we have not yet cleared
 // the `OPEN` flag.
678 let mut state = unsafe { self.inner.as_ref().state.load(Ordering::Relaxed) };
679 let mut idx = state_to_index(state);
680
681 loop {
682 // Take the current waker.
683 //
684 // Safety: the receiver thread never accesses the waker stored at
685 // `INDEX`. It is safe to access `inner` since we did not clear the
686 // `OPEN` flag.
687 let waker = unsafe { self.inner.as_ref().take_waker(idx) };
688
 // Compare-and-swap loop: retried until the transition is applied
 // to the state that was actually observed.
689 loop {
690 // Modify the state according to the transition table.
691 let new_state = if state & EMPTY == EMPTY {
692 EMPTY
693 } else {
694 state ^ (EMPTY | INDEX)
695 };
696
697 // Ordering: Acquire is necessary to synchronize with the
698 // Release store in the `Receiver::poll` method in case an
699 // updated waker needs to be taken or with the Release operation
700 // in the drop handler of the receiver in case the channel was
701 // closed and `inner` needs to be dropped. Release is in turn
702 // necessary to ensure the visibility of the consumption of the
703 // waker in the previous loop iteration (if any). The Release
704 // synchronizes with the Acquire load of the state in the
705 // receiver `poll` and `sender` methods.
706 //
707 // Safety: it is safe to access `inner` since we have not yet
708 // cleared the `OPEN` flag.
709 unsafe {
710 match self.inner.as_ref().state.compare_exchange_weak(
711 state,
712 new_state,
713 Ordering::AcqRel,
714 Ordering::Relaxed,
715 ) {
 // On success `s` is the state that was replaced, i.e. the
 // state the checks below must be made against.
716 Ok(s) => {
717 state = s;
718 break;
719 }
 // On failure (possibly spurious) retry with the fresh state.
720 Err(s) => state = s,
721 }
722 }
723 }
724
725 // Deallocate the channel if it was already closed by the receiver.
726 //
727 // Safety: `inner` will no longer be used once deallocated.
728 unsafe {
729 if state & OPEN == 0 {
730 drop(Box::from_raw(self.inner.as_ptr()));
731 return;
732 }
733 }
734
735 // If the waker was not updated, notify the receiver and return.
736 if state & EMPTY == EMPTY {
737 // Unwind safety: the state has already been updated, so
738 // panicking in `Waker::wake` is OK.
739 if let Some(waker) = waker {
740 waker.wake()
741 }
742 return;
743 }
744
745 // Update the local waker index to the current value of `INDEX`.
746 idx = 1 - idx;
747 }
748 }
749}
750
/// Error returned by a [`Recv`] future when the sender was dropped without
/// sending a value.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct RecvError {}

impl fmt::Display for RecvError {
    /// Writes the fixed message `channel closed`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("channel closed")
    }
}

impl Error for RecvError {}
762
763/// Creates a new multi-shot channel.
764pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
765 let mut receiver = Receiver::new();
766 let sender = receiver.sender().unwrap();
767
768 (sender, receiver)
769}
770
771fn state_to_index(state: usize) -> usize {
772 (state & INDEX) >> 2
773}
/// Converts a waker slot index (0 or 1) into the matching `INDEX` flag value,
/// i.e. moves it to bit position 2.
fn index_to_state(index: usize) -> usize {
    const INDEX_SHIFT: u32 = 2;

    index << INDEX_SHIFT
}
777
778#[cfg(all(test, not(multishot_loom)))]
779mod tests {
780 use super::*;
781
782 use std::sync::Arc;
783 use std::task::Wake;
784 use std::thread;
785
786 // Dumb waker counting notifications: each `wake` increments `count`.
787 struct TestWaker {
788 count: AtomicUsize,
789 }
790 impl TestWaker {
 // Creates a waker with a zeroed notification count.
791 fn new() -> Self {
792 Self {
793 count: AtomicUsize::new(0),
794 }
795 }
 // Returns the number of notifications received since the last call and
 // resets the counter to zero.
796 fn take_count(&self) -> usize {
797 self.count.swap(0, Ordering::Acquire)
798 }
799 }
800 impl Wake for TestWaker {
801 fn wake(self: Arc<Self>) {
802 self.count.fetch_add(1, Ordering::Release);
803 }
804 }
805
806 // Executes a closure consuming the sender and checks the result of the
807 // completed future.
 //
 // `f` is run twice on a sender of the same receiver: once before the first
 // poll (no notification expected) and once after a pending poll (exactly
 // one notification expected). Both times the future must yield `expect`.
808 fn multishot_notify_single_threaded<F>(f: F, expect: Result<i32, RecvError>)
809 where
810 F: FnOnce(Sender<Box<i32>>) + Send + Copy + 'static,
811 {
812 let test_waker = Arc::new(TestWaker::new());
813 let waker = test_waker.clone().into();
814 let mut cx = Context::from_waker(&waker);
815 let mut receiver: Receiver<Box<i32>> = Receiver::new();
816
817 // Consume sender before polling.
818 {
819 let sender = receiver.sender().expect("could not create sender");
820 let mut fut = receiver.recv();
821 let mut fut = Pin::new(&mut fut);
822
823 f(sender);
824
825 let res = fut.as_mut().poll(&mut cx);
 // No waker was registered when the sender was consumed.
826 assert_eq!(test_waker.take_count(), 0);
827 assert_eq!(res.map_ok(|v| *v), Poll::Ready(expect));
828 }
829
830 // Consume sender after polling. This also checks that the sender can be
 // recycled once the previous future has completed.
831 {
832 let sender = receiver.sender().expect("could not create sender");
833 let mut fut = receiver.recv();
834 let mut fut = Pin::new(&mut fut);
835
836 let res = fut.as_mut().poll(&mut cx);
837 assert_eq!(res, Poll::Pending);
838 f(sender);
839 assert_eq!(test_waker.take_count(), 1);
840 let res = fut.as_mut().poll(&mut cx);
841 assert_eq!(res.map_ok(|v| *v), Poll::Ready(expect));
842 }
843 }
844
845 #[test]
846 /// Sends a message, before and after the first poll; the receiver must get
 /// `Ok(42)` both times.
847 fn multishot_send_notify_single_threaded() {
848 multishot_notify_single_threaded(|sender| sender.send(Box::new(42)), Ok(42));
849 }
850 #[test]
851 /// Drops the sender, before and after the first poll; the receiver must get
 /// `Err(RecvError {})` both times.
852 fn multishot_drop_notify_single_threaded() {
853 multishot_notify_single_threaded(|sender| drop(sender), Err(RecvError {}));
854 }
855
856 // Changes the waker before executing a closure consuming the sender.
 //
 // Only the most recently registered waker must be notified, exactly once,
 // and the future must then yield `expect`.
857 fn multishot_change_waker_single_threaded<F>(f: F, expect: Result<i32, RecvError>)
858 where
859 F: FnOnce(Sender<Box<i32>>) + Send + Copy + 'static,
860 {
861 let test_waker1 = Arc::new(TestWaker::new());
862 let waker1 = test_waker1.clone().into();
863 let mut cx1 = Context::from_waker(&waker1);
864 let test_waker2 = Arc::new(TestWaker::new());
865 let waker2 = test_waker2.clone().into();
866 let mut cx2 = Context::from_waker(&waker2);
867 let test_waker3 = Arc::new(TestWaker::new());
868 let waker3 = test_waker3.clone().into();
869 let mut cx3 = Context::from_waker(&waker3);
870
871 // Change waker and consume sender.
872 {
873 let (sender, mut receiver) = channel::<Box<i32>>();
874 let mut fut = receiver.recv();
875 let mut fut = Pin::new(&mut fut);
876
877 let res = fut.as_mut().poll(&mut cx1);
878 assert_eq!(res, Poll::Pending);
879 let res = fut.as_mut().poll(&mut cx2);
880 assert_eq!(res, Poll::Pending);
881 f(sender);
 // The second (latest) waker is the one that gets notified.
882 assert_eq!(test_waker2.take_count(), 1);
883 let res = fut.as_mut().poll(&mut cx1);
884 assert_eq!(test_waker1.take_count(), 0);
885 assert_eq!(test_waker2.take_count(), 0);
886 assert_eq!(res.map_ok(|v| *v), Poll::Ready(expect));
887 }
888
889 // Change waker twice and consume sender.
890 {
891 let (sender, mut receiver) = channel::<Box<i32>>();
892 let mut fut = receiver.recv();
893 let mut fut = Pin::new(&mut fut);
894
895 let res = fut.as_mut().poll(&mut cx1);
896 assert_eq!(res, Poll::Pending);
897 let res = fut.as_mut().poll(&mut cx2);
898 assert_eq!(res, Poll::Pending);
899 let res = fut.as_mut().poll(&mut cx3);
900 assert_eq!(res, Poll::Pending);
901 f(sender);
 // The third (latest) waker is the one that gets notified.
902 assert_eq!(test_waker3.take_count(), 1);
903 let res = fut.as_mut().poll(&mut cx2);
904 assert_eq!(test_waker1.take_count(), 0);
905 assert_eq!(test_waker2.take_count(), 0);
906 assert_eq!(res.map_ok(|v| *v), Poll::Ready(expect));
907 }
908 }
909
910 #[test]
911 /// Sends a message after changing the waker; only the latest waker must be
 /// notified and the receiver must get `Ok(42)`.
912 fn multishot_send_change_waker_single_threaded() {
913 multishot_change_waker_single_threaded(|sender| sender.send(Box::new(42)), Ok(42));
914 }
/// Checks that dropping the sender after the waker was swapped yields an
/// error.
#[test]
fn multishot_drop_change_waker_single_threaded() {
    let expected = Err(RecvError {});
    multishot_change_waker_single_threaded(drop, expected);
}
920
921 // Executes a closure consuming the sender on a separate thread and checks
922 // the result of the completed future.
923 fn multishot_notify_multi_threaded<F>(f: F, expect: Result<i32, RecvError>)
924 where
925 F: FnOnce(Sender<Box<i32>>) + Send + Copy + 'static,
926 {
927 let test_waker = Arc::new(TestWaker::new());
928 let waker = test_waker.clone().into();
929 let mut cx = Context::from_waker(&waker);
930 let mut receiver: Receiver<Box<i32>> = Receiver::new();
931
932 let sender = receiver.sender().expect("could not create sender");
933 let mut fut = receiver.recv();
934 let mut fut = Pin::new(&mut fut);
935
936 let th = thread::spawn(move || f(sender));
937
938 let res = fut.as_mut().poll(&mut cx);
939
940 th.join().unwrap();
941
942 match res {
943 Poll::Pending => {
944 assert_eq!(test_waker.take_count(), 1);
945 assert_eq!(
946 fut.as_mut().poll(&mut cx).map_ok(|v| *v),
947 Poll::Ready(expect)
948 );
949 }
950 Poll::Ready(res) => assert_eq!(res.map(|v| *v), expect),
951 }
952 }
953
/// Checks that a value sent from a spawned thread reaches the receiver.
#[test]
fn multishot_send_notify_multi_threaded() {
    let expected = Ok(42);
    multishot_notify_multi_threaded(|tx| tx.send(Box::new(42)), expected);
}
/// Checks that dropping the sender on a spawned thread yields an error.
#[test]
fn multishot_drop_notify_multi_threaded() {
    let expected = Err(RecvError {});
    multishot_notify_multi_threaded(drop, expected);
}
964
// Lets the sender be dropped on a spawned thread while the receiver is
// dropped on the current one. Primarily intended to be run under MIRI.
#[test]
fn multishot_drop_both_multi_threaded() {
    let mut receiver = Receiver::<Box<i32>>::new();
    let sender = receiver.sender().expect("could not create sender");

    let dropper = thread::spawn(move || {
        drop(sender);
    });
    drop(receiver);

    dropper.join().unwrap();
}
978
// Lets the sender send a value on a spawned thread while the receiver is
// dropped on the current one. Primarily intended to be run under MIRI.
#[test]
fn multishot_send_and_drop_multi_threaded() {
    let mut receiver = Receiver::<Box<i32>>::new();
    let sender = receiver.sender().expect("could not create sender");

    let producer = thread::spawn(move || {
        sender.send(Box::new(123));
    });
    drop(receiver);

    producer.join().unwrap();
}
992}
993
994#[cfg(all(test, multishot_loom))]
995mod tests {
996 use super::*;
997
998 use std::future::Future;
999 use std::sync::Arc;
1000 use std::task::{Context, Poll, Wake};
1001
1002 use loom::sync::atomic::{AtomicBool, AtomicUsize};
1003 use loom::thread;
1004
// Trivial waker that merely tallies the notifications it receives.
struct TestWaker {
    count: AtomicUsize,
}

impl TestWaker {
    // Builds a waker with no recorded notification.
    fn new() -> Self {
        let count = AtomicUsize::new(0);
        Self { count }
    }

    // Reports how many notifications were recorded since the previous call
    // and resets the tally to zero.
    fn take_count(&self) -> usize {
        let Self { count } = self;
        count.swap(0, Ordering::Acquire)
    }
}

impl Wake for TestWaker {
    // Records one notification.
    fn wake(self: Arc<Self>) {
        let _previous = self.count.fetch_add(1, Ordering::Release);
    }
}
1024
1025 // Executes a closure consuming the sender and checks the result of the
1026 // completed future.
1027 fn multishot_loom_notify<F>(f: F, expect: Result<i32, RecvError>)
1028 where
1029 F: FnOnce(Sender<i32>) + Send + Sync + Copy + 'static,
1030 {
1031 loom::model(move || {
1032 let test_waker = Arc::new(TestWaker::new());
1033 let waker = test_waker.clone().into();
1034 let mut cx = Context::from_waker(&waker);
1035
1036 let (sender, mut receiver) = channel();
1037
1038 let has_message = Arc::new(AtomicBool::new(false));
1039 thread::spawn({
1040 let has_message = has_message.clone();
1041 move || {
1042 f(sender);
1043 has_message.store(true, Ordering::Release);
1044 }
1045 });
1046
1047 let mut fut = receiver.recv();
1048 let mut fut = Pin::new(&mut fut);
1049
1050 let res = fut.as_mut().poll(&mut cx);
1051
1052 match res {
1053 Poll::Pending => {
1054 let msg = has_message.load(Ordering::Acquire);
1055 let event_count = test_waker.take_count();
1056 if event_count == 0 {
1057 // Make sure that if the waker was not notified, then no
1058 // message was sent (or equivalently, if a message was sent,
1059 // the waker was notified).
1060 assert_eq!(msg, false);
1061 } else {
1062 assert_eq!(event_count, 1);
1063 // Make sure that if the waker was notified, the message
1064 // can be retrieved (this is crucial to ensure that
1065 // notifications are not lost).
1066 let res = fut.as_mut().poll(&mut cx);
1067 assert_eq!(test_waker.take_count(), 0);
1068 assert_eq!(res, Poll::Ready(expect));
1069 }
1070 }
1071 Poll::Ready(val) => {
1072 assert_eq!(val, expect);
1073 }
1074 }
1075 });
1076 }
1077
1078 // Executes a closure consuming the sender and checks the result of the
1079 // completed future, changing the waker several times.
1080 fn multishot_loom_change_waker<F>(f: F, expect: Result<i32, RecvError>)
1081 where
1082 F: FnOnce(Sender<i32>) + Send + Sync + Copy + 'static,
1083 {
1084 loom::model(move || {
1085 let test_waker1 = Arc::new(TestWaker::new());
1086 let waker1 = test_waker1.clone().into();
1087 let mut cx1 = Context::from_waker(&waker1);
1088
1089 let test_waker2 = Arc::new(TestWaker::new());
1090 let waker2 = test_waker2.clone().into();
1091 let mut cx2 = Context::from_waker(&waker2);
1092
1093 let (sender, mut receiver) = channel();
1094
1095 thread::spawn({
1096 move || {
1097 f(sender);
1098 }
1099 });
1100
1101 let mut fut = receiver.recv();
1102 let mut fut = Pin::new(&mut fut);
1103
1104 // Attempt to poll the future to completion with the provided context.
1105 fn try_complete(
1106 fut: &mut Pin<&mut Recv<i32>>,
1107 cx: &mut Context,
1108 other_cx: &mut Context,
1109 test_waker: &TestWaker,
1110 other_test_waker: &TestWaker,
1111 expect: Result<i32, RecvError>,
1112 ) -> bool {
1113 let res = fut.as_mut().poll(cx);
1114
1115 // If `Ready` is returned we are done.
1116 if let Poll::Ready(val) = res {
1117 assert_eq!(val, expect);
1118 return true;
1119 }
1120
1121 // The sender should not have used the other waker even if it
1122 // was registered before the last call to `poll`.
1123 assert_eq!(other_test_waker.take_count(), 0);
1124
1125 // Although the last call to `poll` has returned `Pending`, the
1126 // sender may have been consumed in the meantime so check
1127 // whether there is a notification.
1128 let event_count = test_waker.take_count();
1129 if event_count != 0 {
1130 // Expect only one notification.
1131 assert_eq!(event_count, 1);
1132
1133 // Since the task was notified it is expected that the
1134 // future is now ready.
1135 let res = fut.as_mut().poll(other_cx);
1136 assert_eq!(test_waker.take_count(), 0);
1137 assert_eq!(other_test_waker.take_count(), 0);
1138 assert_eq!(res, Poll::Ready(expect));
1139 return true;
1140 }
1141
1142 // The future was not polled to completion.
1143 false
1144 }
1145
1146 // Poll with cx1.
1147 if try_complete(
1148 &mut fut,
1149 &mut cx1,
1150 &mut cx2,
1151 &test_waker1,
1152 &test_waker2,
1153 expect,
1154 ) {
1155 return;
1156 }
1157 // Poll with cx2.
1158 if try_complete(
1159 &mut fut,
1160 &mut cx2,
1161 &mut cx1,
1162 &test_waker2,
1163 &test_waker1,
1164 expect,
1165 ) {
1166 return;
1167 }
1168 // Poll again with cx1.
1169 if try_complete(
1170 &mut fut,
1171 &mut cx1,
1172 &mut cx2,
1173 &test_waker1,
1174 &test_waker2,
1175 expect,
1176 ) {
1177 return;
1178 }
1179 });
1180 }
1181
1182 // Executes a closure consuming the sender and attempts to reuse the
1183 // channel.
1184 fn multishot_loom_recycle<F>(f: F)
1185 where
1186 F: FnOnce(Sender<i32>) + Send + Sync + Copy + 'static,
1187 {
1188 loom::model(move || {
1189 let test_waker = Arc::new(TestWaker::new());
1190 let waker = test_waker.clone().into();
1191 let mut cx = Context::from_waker(&waker);
1192
1193 let (sender, mut receiver) = channel();
1194
1195 {
1196 thread::spawn({
1197 move || {
1198 f(sender);
1199 }
1200 });
1201
1202 let mut fut = receiver.recv();
1203 let mut fut = Pin::new(&mut fut);
1204
1205 // Poll up to twice.
1206 let res = fut.as_mut().poll(&mut cx);
1207 if res == Poll::Pending {
1208 let res = fut.as_mut().poll(&mut cx);
1209 if res == Poll::Pending {
1210 return;
1211 }
1212 }
1213 }
1214
1215 // The future was polled to completion, meaning that the sender was
1216 // consumed and should be immediately recyclable.
1217 let sender = receiver
1218 .sender()
1219 .expect("Could not recycle the sender after it was consumed");
1220
1221 // It's all downhill from here, just make sure the recycled sender
1222 // works correctly.
1223 {
1224 thread::spawn({
1225 move || {
1226 sender.send(13);
1227 }
1228 });
1229
1230 let mut fut = receiver.recv();
1231 let mut fut = Pin::new(&mut fut);
1232
1233 let res = fut.as_mut().poll(&mut cx);
1234 if let Poll::Ready(val) = res {
1235 assert_eq!(val, Ok(13));
1236 }
1237 }
1238 });
1239 }
1240
/// Checks notification and delivery when a value is sent.
#[test]
fn multishot_loom_send_notify() {
    let expected = Ok(42);
    multishot_loom_notify(|tx| tx.send(42), expected);
}
/// Checks notification and the resulting error when the sender is dropped.
#[test]
fn multishot_loom_drop_notify() {
    let expected = Err(RecvError {});
    multishot_loom_notify(drop, expected);
}
/// Swaps the waker while a value is being sent.
#[test]
fn multishot_loom_send_change_waker() {
    let expected = Ok(42);
    multishot_loom_change_waker(|tx| tx.send(42), expected);
}
/// Swaps the waker while the sender is being dropped.
#[test]
fn multishot_loom_drop_change_waker() {
    let expected = Err(RecvError {});
    multishot_loom_change_waker(drop, expected);
}
/// Reuses the channel once the first sender has sent a value.
#[test]
fn multishot_loom_send_recycle() {
    multishot_loom_recycle(|tx| tx.send(42));
}
/// Reuses the channel once the first sender has been dropped.
#[test]
fn multishot_loom_drop_recycle() {
    multishot_loom_recycle(drop);
}
1271}