Skip to main content

oneshot_uniffi/
lib.rs

//! Oneshot spsc (single producer, single consumer) channel, meaning that each channel instance
//! can only transport a single message. This has a few nice outcomes. One thing is that
//! the implementation can be very efficient, utilizing the knowledge that there will
//! only be one message. But more importantly, it allows the API to be expressed in such
//! a way that certain edge cases that you don't want to care about when only sending a
//! single message on a channel do not exist. For example: The sender can't be copied
//! or cloned, and the send method takes ownership and consumes the sender.
//! So you are guaranteed, at the type level, that there can only be one message sent.
9//!
10//! The sender's send method is non-blocking, and potentially lock- and wait-free.
11//! See documentation on [Sender::send] for situations where it might not be fully wait-free.
12//! The receiver supports both lock- and wait-free `try_recv` as well as indefinite and time
13//! limited thread blocking receive operations. The receiver also implements `Future` and
14//! supports asynchronously awaiting the message.
15//!
16//!
17//! # Examples
18//!
19//! This example sets up a background worker that processes requests coming in on a standard
20//! mpsc channel and replies on a oneshot channel provided with each request. The worker can
21//! be interacted with both from sync and async contexts since the oneshot receiver
22//! can receive both blocking and async.
23//!
24//! ```rust
25//! use std::sync::mpsc;
26//! use std::thread;
27//! use std::time::Duration;
28//!
29//! type Request = String;
30//!
31//! // Starts a background thread performing some computation on requests sent to it.
32//! // Delivers the response back over a oneshot channel.
33//! fn spawn_processing_thread() -> mpsc::Sender<(Request, oneshot::Sender<usize>)> {
34//!     let (request_sender, request_receiver) = mpsc::channel::<(Request, oneshot::Sender<usize>)>();
35//!     thread::spawn(move || {
36//!         for (request_data, response_sender) in request_receiver.iter() {
37//!             let compute_operation = || request_data.len();
38//!             let _ = response_sender.send(compute_operation()); // <- Send on the oneshot channel
39//!         }
40//!     });
41//!     request_sender
42//! }
43//!
44//! let processor = spawn_processing_thread();
45//!
46//! // If compiled with `std` the library can receive messages with timeout on regular threads
47//! #[cfg(feature = "std")] {
48//!     let (response_sender, response_receiver) = oneshot::channel();
49//!     let request = Request::from("data from sync thread");
50//!
51//!     processor.send((request, response_sender)).expect("Processor down");
52//!     match response_receiver.recv_timeout(Duration::from_secs(1)) { // <- Receive on the oneshot channel
53//!         Ok(result) => println!("Processor returned {}", result),
54//!         Err(oneshot::RecvTimeoutError::Timeout) => eprintln!("Processor was too slow"),
55//!         Err(oneshot::RecvTimeoutError::Disconnected) => panic!("Processor exited"),
56//!     }
57//! }
58//!
59//! // If compiled with the `async` feature, the `Receiver` can be awaited in an async context
60//! #[cfg(feature = "async")] {
61//!     tokio::runtime::Runtime::new()
62//!         .unwrap()
63//!         .block_on(async move {
64//!             let (response_sender, response_receiver) = oneshot::channel();
65//!             let request = Request::from("data from sync thread");
66//!
67//!             processor.send((request, response_sender)).expect("Processor down");
68//!             match response_receiver.await { // <- Receive on the oneshot channel asynchronously
69//!                 Ok(result) => println!("Processor returned {}", result),
70//!                 Err(_e) => panic!("Processor exited"),
71//!             }
72//!         });
73//! }
74//! ```
75//!
76//! # Sync vs async
77//!
78//! The main motivation for writing this library was that there were no (known to me) channel
79//! implementations allowing you to seamlessly send messages between a normal thread and an async
80//! task, or the other way around. If message passing is the way you are communicating, of course
81//! that should work smoothly between the sync and async parts of the program!
82//!
83//! This library achieves that by having a fast and cheap send operation that can
84//! be used in both sync threads and async tasks. The receiver has both thread blocking
85//! receive methods for synchronous usage, and implements `Future` for asynchronous usage.
86//!
87//! The receiving endpoint of this channel implements Rust's `Future` trait and can be waited on
88//! in an asynchronous task. This implementation is completely executor/runtime agnostic. It should
89//! be possible to use this library with any executor.
90//!
91
92// # Implementation description
93//
94// When a channel is created via the channel function, it creates a single heap allocation
95// containing:
96// * A one byte atomic integer that represents the current channel state,
97// * Uninitialized memory to fit the message,
98// * Uninitialized memory to fit the waker that can wake the receiving task or thread up.
99//
100// The size of the waker depends on which features are activated, it ranges from 0 to 24 bytes[1].
101// So with all features enabled (the default) each channel allocates 25 bytes plus the size of the
102// message, plus any padding needed to get correct memory alignment.
103//
// The Sender and Receiver only hold a raw pointer to the heap channel object. The last endpoint
// to be consumed or dropped is responsible for freeing the heap memory. The first endpoint to
// be consumed or dropped signals via the state that it is gone. The second one sees this and
// frees the memory.
108//
109// ## Footnotes
110//
// [1]: Mind that the waker only takes zero bytes when all features are disabled, making it
//      impossible to *wait* for the message. `try_recv` is the only available method in this
//      scenario.
113
114#![deny(rust_2018_idioms)]
115#![cfg_attr(not(feature = "std"), no_std)]
116
117#[cfg(not(loom))]
118extern crate alloc;
119
120use core::{
121    marker::PhantomData,
122    mem::{self, MaybeUninit},
123    ptr::{self, NonNull},
124};
125
126#[cfg(not(loom))]
127use core::{
128    cell::UnsafeCell,
129    sync::atomic::{fence, AtomicU8, Ordering::*},
130};
131#[cfg(loom)]
132use loom::{
133    cell::UnsafeCell,
134    sync::atomic::{fence, AtomicU8, Ordering::*},
135};
136
137#[cfg(all(feature = "async", not(loom)))]
138use core::hint;
139#[cfg(all(feature = "async", loom))]
140use loom::hint;
141
142#[cfg(feature = "async")]
143use core::{
144    pin::Pin,
145    task::{self, Poll},
146};
147#[cfg(feature = "std")]
148use std::time::{Duration, Instant};
149
#[cfg(feature = "std")]
mod thread {
    // Facade over the threading primitives used by the blocking receive methods. It re-exports
    // either the standard library items or their loom counterparts, so the rest of the crate
    // can call `thread::park()` and friends without caring which backend is active.

    #[cfg(loom)]
    pub use loom::thread::{current, park, yield_now, Thread};

    #[cfg(not(loom))]
    pub use std::thread::{current, park, park_timeout, yield_now, Thread};

    // loom has no way of parking with a timeout, so this stand-in only yields. The resulting
    // "park" therefore "spuriously" wakes up far too early, which the calling code is expected
    // to handle properly. Keep timeouts very short when running under loom: otherwise the
    // repeated looping causes an overflow in loom.
    #[cfg(loom)]
    pub fn park_timeout(_timeout: std::time::Duration) {
        loom::thread::yield_now()
    }
}
169
170#[cfg(loom)]
171mod loombox;
172#[cfg(not(loom))]
173use alloc::boxed::Box;
174#[cfg(loom)]
175use loombox::Box;
176
177mod errors;
178pub use errors::{RecvError, RecvTimeoutError, SendError, TryRecvError};
179
180/// Creates a new oneshot channel and returns the two endpoints, [`Sender`] and [`Receiver`].
181pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
182    // Allocate the channel on the heap and get the pointer.
183    // The last endpoint of the channel to be alive is responsible for freeing the channel
184    // and dropping any object that might have been written to it.
185
186    let channel_ptr = Box::into_raw(Box::new(Channel::new()));
187
188    // SAFETY: `channel_ptr` came from a Box and thus is not null
189    let channel_ptr = unsafe { NonNull::new_unchecked(channel_ptr) };
190
191    (
192        Sender {
193            channel_ptr,
194            _invariant: PhantomData,
195        },
196        Receiver { channel_ptr },
197    )
198}
199
/// The sending endpoint of a oneshot channel, created by [`channel`].
///
/// A sender can transport at most one message, since [`Sender::send`] takes the sender by
/// value and consumes it.
#[derive(Debug)]
pub struct Sender<T> {
    // Pointer to the heap allocated channel object that is shared with the `Receiver`.
    channel_ptr: NonNull<Channel<T>>,
    // In reality we want contravariance, however we can't obtain that.
    //
    // Consider the following scenario:
    // ```
    // let (mut tx, rx) = channel::<&'short u8>();
    // let (tx2, rx2) = channel::<&'long u8>();
    //
    // tx = tx2;
    //
    // // Pretend short_ref is some &'short u8
    // tx.send(short_ref).unwrap();
    // let long_ref = rx2.recv().unwrap();
    // ```
    //
    // If this type were covariant then we could safely extend lifetimes, which is not okay.
    // Hence, we enforce invariance. `fn(T) -> T` mentions `T` in both argument and return
    // position, which is what makes the marker (and thereby `Sender<T>`) invariant in `T`.
    _invariant: PhantomData<fn(T) -> T>,
}
221
/// The receiving endpoint of a oneshot channel, created by [`channel`].
#[derive(Debug)]
pub struct Receiver<T> {
    // Pointer to the heap allocated channel object that is shared with the `Sender`.
    //
    // Covariance is the right choice here. Consider the example presented in Sender, and you'll
    // see that if we replaced `rx` instead then we would get the expected behavior
    channel_ptr: NonNull<Channel<T>>,
}
228
// Both endpoints store a `NonNull` pointer, which is never automatically `Send`, so `Send` has
// to be opted into manually. The `T: Send` bound is required because a message of type `T`
// may be handed from the thread owning the sender to the thread owning the receiver.
unsafe impl<T: Send> Send for Sender<T> {}
unsafe impl<T: Send> Send for Receiver<T> {}
// The receiver is declared `Unpin` for every `T`, without requiring `T: Unpin`.
impl<T> Unpin for Receiver<T> {}
232
233impl<T> Sender<T> {
234    /// Sends `message` over the channel to the corresponding [`Receiver`].
235    ///
236    /// Returns an error if the receiver has already been dropped. The message can
237    /// be extracted from the error.
238    ///
239    /// This method is lock-free and wait-free when sending on a channel that the
240    /// receiver is currently not receiving on. If the receiver is receiving during the send
241    /// operation this method includes waking up the thread/task. Unparking a thread involves
242    /// a mutex in Rust's standard library at the time of writing this.
243    /// How lock-free waking up an async task is
244    /// depends on your executor. If this method returns a `SendError`, please mind that dropping
245    /// the error involves running any drop implementation on the message type, and freeing the
246    /// channel's heap allocation, which might or might not be lock-free.
247    pub fn send(self, message: T) -> Result<(), SendError<T>> {
248        let channel_ptr = self.channel_ptr;
249
250        // Don't run our Drop implementation if send was called, any cleanup now happens here
251        mem::forget(self);
252
253        // SAFETY: The channel exists on the heap for the entire duration of this method and we
254        // only ever acquire shared references to it. Note that if the receiver disconnects it
255        // does not free the channel.
256        let channel = unsafe { channel_ptr.as_ref() };
257
258        // Write the message into the channel on the heap.
259        // SAFETY: The receiver only ever accesses this memory location if we are in the MESSAGE
260        // state, and since we're responsible for setting that state, we can guarantee that we have
261        // exclusive access to this memory location to perform this write.
262        unsafe { channel.write_message(message) };
263
264        // Set the state to signal there is a message on the channel.
265        // ORDERING: we use release ordering to ensure the write of the message is visible to the
266        // receiving thread. The EMPTY and DISCONNECTED branches do not observe any shared state,
267        // and thus we do not need acquire orderng. The RECEIVING branch manages synchronization
268        // independent of this operation.
269        //
270        // EMPTY + 1 = MESSAGE
271        // RECEIVING + 1 = UNPARKING
272        // DISCONNECTED + 1 = invalid, however this state is never observed
273        match channel.state.fetch_add(1, Release) {
274            // The receiver is alive and has not started waiting. Send done.
275            EMPTY => Ok(()),
276            // The receiver is waiting. Wake it up so it can return the message.
277            RECEIVING => {
278                // ORDERING: Synchronizes with the write of the waker to memory, and prevents the
279                // taking of the waker from being ordered before this operation.
280                fence(Acquire);
281
282                // Take the waker, but critically do not unpark it. If we unparked now, then the
283                // receiving thread could still observe the UNPARKING state and re-park, meaning
284                // that after we change to the MESSAGE state, it would remain parked indefinitely
285                // or until a spurious wakeup.
286                // SAFETY: at this point we are in the UNPARKING state, and the receiving thread
287                // does not access the waker while in this state, nor does it free the channel
288                // allocation in this state.
289                let waker = unsafe { channel.take_waker() };
290
291                // ORDERING: this ordering serves two-fold: it synchronizes with the acquire load
292                // in the receiving thread, ensuring that both our read of the waker and write of
293                // the message happen-before the taking of the message and freeing of the channel.
294                // Furthermore, we need acquire ordering to ensure the unparking of the receiver
295                // happens after the channel state is updated.
296                channel.state.swap(MESSAGE, AcqRel);
297
298                // Note: it is possible that between the store above and this statement that
299                // the receiving thread is spuriously unparked, takes the message, and frees
300                // the channel allocation. However, we took ownership of the channel out of
301                // that allocation, and freeing the channel does not drop the waker since the
302                // waker is wrapped in MaybeUninit. Therefore this data is valid regardless of
303                // whether or not the receive has completed by this point.
304                waker.unpark();
305
306                Ok(())
307            }
308            // The receiver was already dropped. The error is responsible for freeing the channel.
309            // SAFETY: since the receiver disconnected it will no longer access `channel_ptr`, so
310            // we can transfer exclusive ownership of the channel's resources to the error.
311            // Moreover, since we just placed the message in the channel, the channel contains a
312            // valid message.
313            DISCONNECTED => Err(unsafe { SendError::new(channel_ptr) }),
314            _ => unreachable!(),
315        }
316    }
317
318    /// Consumes the Sender, returning a raw pointer to the channel on the heap.
319    ///
320    /// This is intended to simplify using oneshot channels with some FFI code. The only safe thing
321    /// to do with the returned pointer is to later reconstruct the Sender with [Sender::from_raw].
322    /// Memory will leak if the Sender is never reconstructed.
323    pub fn into_raw(self) -> *mut () {
324        let raw = self.channel_ptr.as_ptr() as *mut ();
325        mem::forget(self);
326        raw
327    }
328
329    /// Consumes a raw pointer from [Sender::into_raw], recreating the Sender.
330    ///
331    /// # Safety
332    ///
333    /// This pointer must have come from [`Sender<T>::into_raw`] with the same message type, `T`.
334    /// At most one Sender must exist for a channel at any point in time.
335    /// Constructing multiple Senders from the same raw pointer leads to undefined behavior.
336    pub unsafe fn from_raw(raw: *mut ()) -> Self {
337        Self {
338            channel_ptr: NonNull::new_unchecked(raw as *mut Channel<T>),
339            _invariant: PhantomData,
340        }
341    }
342}
343
344impl<T> Drop for Sender<T> {
345    fn drop(&mut self) {
346        // SAFETY: The receiver only ever frees the channel if we are in the MESSAGE or
347        // DISCONNECTED states. If we are in the MESSAGE state, then we called
348        // mem::forget(self), so we should not be in this function call. If we are in the
349        // DISCONNECTED state, then the receiver either received a MESSAGE so this statement is
350        // unreachable, or was dropped and observed that our side was still alive, and thus didn't
351        // free the channel.
352        let channel = unsafe { self.channel_ptr.as_ref() };
353
354        // Set the channel state to disconnected and read what state the receiver was in
355        // ORDERING: we don't need release ordering here since there are no modifications we
356        // need to make visible to other thread, and the Err(RECEIVING) branch handles
357        // synchronization independent of this cmpxchg
358        //
359        // EMPTY ^ 001 = DISCONNECTED
360        // RECEIVING ^ 001 = UNPARKING
361        // DISCONNECTED ^ 001 = EMPTY (invalid), but this state is never observed
362        match channel.state.fetch_xor(0b001, Relaxed) {
363            // The receiver has not started waiting, nor is it dropped.
364            EMPTY => (),
365            // The receiver is waiting. Wake it up so it can detect that the channel disconnected.
366            RECEIVING => {
367                // See comments in Sender::send
368
369                fence(Acquire);
370
371                let waker = unsafe { channel.take_waker() };
372
373                // We still need release ordering here to make sure our read of the waker happens
374                // before this, and acquire ordering to ensure the unparking of the receiver
375                // happens after this.
376                channel.state.swap(DISCONNECTED, AcqRel);
377
378                // The Acquire ordering above ensures that the write of the DISCONNECTED state
379                // happens-before unparking the receiver.
380                waker.unpark();
381            }
382            // The receiver was already dropped. We are responsible for freeing the channel.
383            DISCONNECTED => {
384                // SAFETY: when the receiver switches the state to DISCONNECTED they have received
385                // the message or will no longer be trying to receive the message, and have
386                // observed that the sender is still alive, meaning that we're responsible for
387                // freeing the channel allocation.
388                unsafe { dealloc(self.channel_ptr) };
389            }
390            _ => unreachable!(),
391        }
392    }
393}
394
395impl<T> Receiver<T> {
396    /// Checks if there is a message in the channel without blocking. Returns:
397    ///  * `Ok(message)` if there was a message in the channel.
398    ///  * `Err(Empty)` if the [`Sender`] is alive, but has not yet sent a message.
399    ///  * `Err(Disconnected)` if the [`Sender`] was dropped before sending anything or if the
400    ///    message has already been extracted by a previous receive call.
401    ///
402    /// If a message is returned, the channel is disconnected and any subsequent receive operation
403    /// using this receiver will return an error.
404    ///
405    /// This method is completely lock-free and wait-free. The only thing it does is an atomic
406    /// integer load of the channel state. And if there is a message in the channel it additionally
407    /// performs one atomic integer store and copies the message from the heap to the stack for
408    /// returning it.
409    pub fn try_recv(&self) -> Result<T, TryRecvError> {
410        // SAFETY: The channel will not be freed while this method is still running.
411        let channel = unsafe { self.channel_ptr.as_ref() };
412
413        // ORDERING: we use acquire ordering to synchronize with the store of the message.
414        match channel.state.load(Acquire) {
415            MESSAGE => {
416                // It's okay to break up the load and store since once we're in the message state
417                // the sender no longer modifies the state
418                // ORDERING: at this point the sender has done its job and is no longer active, so
419                // we don't need to make any side effects visible to it
420                channel.state.store(DISCONNECTED, Relaxed);
421
422                // SAFETY: we are in the MESSAGE state so the message is present
423                Ok(unsafe { channel.take_message() })
424            }
425            EMPTY => Err(TryRecvError::Empty),
426            DISCONNECTED => Err(TryRecvError::Disconnected),
427            #[cfg(feature = "async")]
428            RECEIVING | UNPARKING => Err(TryRecvError::Empty),
429            _ => unreachable!(),
430        }
431    }
432
    /// Attempts to wait for a message from the [`Sender`], returning an error if the channel is
    /// disconnected.
    ///
    /// This method will always block the current thread if there is no data available and it is
    /// still possible for the message to be sent. Once the message is sent by the corresponding
    /// [`Sender`], this receiver will wake up and return that message.
    ///
    /// If the corresponding [`Sender`] has disconnected (been dropped), or it disconnects while
    /// this call is blocking, this call will wake up and return `Err` to indicate that the message
    /// can never be received on this channel.
    ///
    /// If a sent message has already been extracted from this channel this method will return an
    /// error.
    ///
    /// This method consumes the receiver and frees the channel's heap allocation before it
    /// returns.
    ///
    /// # Panics
    ///
    /// Panics if called after this receiver has been polled asynchronously.
    #[cfg(feature = "std")]
    pub fn recv(self) -> Result<T, RecvError> {
        // Note that we don't need to worry about changing the state to disconnected or setting the
        // state to an invalid value at any point in this function because we take ownership of
        // self, and this function does not exit until the message has been received or both sides
        // of the channel are inactive and cleaned up.

        let channel_ptr = self.channel_ptr;

        // Don't run our Drop implementation, since this method consumes the receiver and does
        // all cleanup itself.
        mem::forget(self);

        // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver
        // is still alive, meaning that even if the sender was dropped then it would have observed
        // the fact that we're still alive and left the responsibility of deallocating the
        // channel to us, so channel_ptr is valid
        let channel = unsafe { channel_ptr.as_ref() };

        // ORDERING: we use acquire ordering to synchronize with the write of the message in the
        // case that it's available
        match channel.state.load(Acquire) {
            // The sender is alive but has not sent anything yet. We prepare to park.
            EMPTY => {
                // Conditionally add a delay here to help the tests trigger the edge cases where
                // the sender manages to be dropped or send something before we are able to store
                // our waker object in the channel.
                #[cfg(oneshot_test_delay)]
                std::thread::sleep(std::time::Duration::from_millis(10));

                // Write our waker instance to the channel.
                // SAFETY: we are not yet in the RECEIVING state, meaning that the sender will not
                // try to access the waker until it sees the state set to RECEIVING below
                unsafe { channel.write_waker(ReceiverWaker::current_thread()) };

                // Switch the state to RECEIVING. We need to do this in one atomic step in case the
                // sender disconnected or sent the message while we wrote the waker to memory. We
                // don't need to do a compare exchange here however because if the original state
                // was not EMPTY, then the sender has either finished sending the message or is
                // being dropped, so the RECEIVING state will never be observed after we return.
                // ORDERING: we use release ordering so the sender can synchronize with our writing
                // of the waker to memory. The individual branches handle any additional
                // synchronization
                match channel.state.swap(RECEIVING, Release) {
                    // We stored our waker, now we park until the sender has changed the state
                    EMPTY => loop {
                        thread::park();

                        // ORDERING: synchronize with the write of the message
                        match channel.state.load(Acquire) {
                            // The sender sent the message while we were parked.
                            MESSAGE => {
                                // SAFETY: we are in the message state so the message is valid
                                let message = unsafe { channel.take_message() };

                                // SAFETY: the Sender delegates the responsibility of deallocating
                                // the channel to us upon sending the message
                                unsafe { dealloc(channel_ptr) };

                                break Ok(message);
                            }
                            // The sender was dropped while we were parked.
                            DISCONNECTED => {
                                // SAFETY: the Sender doesn't deallocate the channel allocation in
                                // its drop implementation if we're receiving
                                unsafe { dealloc(channel_ptr) };

                                break Err(RecvError);
                            }
                            // State did not change, spurious wakeup, park again.
                            RECEIVING | UNPARKING => (),
                            _ => unreachable!(),
                        }
                    },
                    // The sender sent the message while we prepared to park.
                    MESSAGE => {
                        // ORDERING: Synchronize with the write of the message. This branch is
                        // unlikely to be taken, so it's likely more efficient to use a fence here
                        // instead of AcqRel ordering on the RMW operation
                        fence(Acquire);

                        // SAFETY: we started in the empty state and the sender switched us to the
                        // message state. This means that it did not take the waker, so we're
                        // responsible for dropping it.
                        unsafe { channel.drop_waker() };

                        // SAFETY: we are in the message state so the message is valid
                        let message = unsafe { channel.take_message() };

                        // SAFETY: the Sender delegates the responsibility of deallocating the
                        // channel to us upon sending the message
                        unsafe { dealloc(channel_ptr) };

                        Ok(message)
                    }
                    // The sender was dropped before sending anything while we prepared to park.
                    DISCONNECTED => {
                        // SAFETY: we started in the empty state and the sender switched us to the
                        // disconnected state. It does not take the waker when it does this so we
                        // need to drop it.
                        unsafe { channel.drop_waker() };

                        // SAFETY: the sender does not deallocate the channel if it switches from
                        // empty to disconnected so we need to free the allocation
                        unsafe { dealloc(channel_ptr) };

                        Err(RecvError)
                    }
                    _ => unreachable!(),
                }
            }
            // The sender already sent the message.
            MESSAGE => {
                // SAFETY: we are in the message state so the message is valid
                let message = unsafe { channel.take_message() };

                // SAFETY: we are already in the message state so the sender has been forgotten
                // and it's our job to clean up resources
                unsafe { dealloc(channel_ptr) };

                Ok(message)
            }
            // The sender was dropped before sending anything, or we already received the message.
            DISCONNECTED => {
                // SAFETY: the sender does not deallocate the channel if it switches from empty to
                // disconnected so we need to free the allocation
                unsafe { dealloc(channel_ptr) };

                Err(RecvError)
            }
            // The receiver must have been `Future::poll`ed prior to this call.
            #[cfg(feature = "async")]
            RECEIVING | UNPARKING => panic!("{}", RECEIVER_USED_SYNC_AND_ASYNC_ERROR),
            _ => unreachable!(),
        }
    }
585
586    /// Attempts to wait for a message from the [`Sender`], returning an error if the channel is
587    /// disconnected. This is a non consuming version of [`Receiver::recv`], but with a bit
588    /// worse performance. Prefer `[`Receiver::recv`]` if your code allows consuming the receiver.
589    ///
590    /// If a message is returned, the channel is disconnected and any subsequent receive operation
591    /// using this receiver will return an error.
592    ///
593    /// # Panics
594    ///
595    /// Panics if called after this receiver has been polled asynchronously.
596    #[cfg(feature = "std")]
597    pub fn recv_ref(&self) -> Result<T, RecvError> {
598        self.start_recv_ref(RecvError, |channel| {
599            loop {
600                thread::park();
601
602                // ORDERING: we use acquire ordering to synchronize with the write of the message
603                match channel.state.load(Acquire) {
604                    // The sender sent the message while we were parked.
605                    // We take the message and mark the channel disconnected.
606                    MESSAGE => {
607                        // ORDERING: the sender is inactive at this point so we don't need to make
608                        // any reads or writes visible to the sending thread
609                        channel.state.store(DISCONNECTED, Relaxed);
610
611                        // SAFETY: we were just in the message state so the message is valid
612                        break Ok(unsafe { channel.take_message() });
613                    }
614                    // The sender was dropped while we were parked.
615                    DISCONNECTED => break Err(RecvError),
616                    // State did not change, spurious wakeup, park again.
617                    RECEIVING | UNPARKING => (),
618                    _ => unreachable!(),
619                }
620            }
621        })
622    }
623
624    /// Like [`Receiver::recv`], but will not block longer than `timeout`. Returns:
625    ///  * `Ok(message)` if there was a message in the channel before the timeout was reached.
626    ///  * `Err(Timeout)` if no message arrived on the channel before the timeout was reached.
627    ///  * `Err(Disconnected)` if the sender was dropped before sending anything or if the message
628    ///    has already been extracted by a previous receive call.
629    ///
630    /// If a message is returned, the channel is disconnected and any subsequent receive operation
631    /// using this receiver will return an error.
632    ///
633    /// If the supplied `timeout` is so large that Rust's `Instant` type can't represent this point
634    /// in the future this falls back to an indefinitely blocking receive operation.
635    ///
636    /// # Panics
637    ///
638    /// Panics if called after this receiver has been polled asynchronously.
639    #[cfg(feature = "std")]
640    pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
641        match Instant::now().checked_add(timeout) {
642            Some(deadline) => self.recv_deadline(deadline),
643            None => self.recv_ref().map_err(|_| RecvTimeoutError::Disconnected),
644        }
645    }
646
647    /// Like [`Receiver::recv`], but will not block longer than until `deadline`. Returns:
648    ///  * `Ok(message)` if there was a message in the channel before the deadline was reached.
649    ///  * `Err(Timeout)` if no message arrived on the channel before the deadline was reached.
650    ///  * `Err(Disconnected)` if the sender was dropped before sending anything or if the message
651    ///    has already been extracted by a previous receive call.
652    ///
653    /// If a message is returned, the channel is disconnected and any subsequent receive operation
654    /// using this receiver will return an error.
655    ///
656    /// # Panics
657    ///
658    /// Panics if called after this receiver has been polled asynchronously.
659    #[cfg(feature = "std")]
660    pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
661        /// # Safety
662        ///
663        /// If the sender is unparking us after a message send, the message must already have been
664        /// written to the channel and an acquire memory barrier issued before calling this function
665        #[cold]
666        unsafe fn wait_for_unpark<T>(channel: &Channel<T>) -> Result<T, RecvTimeoutError> {
667            loop {
668                thread::park();
669
670                // ORDERING: The callee has already synchronized with any message write
671                match channel.state.load(Relaxed) {
672                    MESSAGE => {
673                        // ORDERING: the sender has been dropped, so this update only
674                        // needs to be visible to us
675                        channel.state.store(DISCONNECTED, Relaxed);
676                        break Ok(channel.take_message());
677                    }
678                    DISCONNECTED => break Err(RecvTimeoutError::Disconnected),
679                    // The sender is still unparking us. We continue on the empty state here since
680                    // the current implementation eagerly sets the state to EMPTY upon timeout.
681                    EMPTY => (),
682                    _ => unreachable!(),
683                }
684            }
685        }
686
687        self.start_recv_ref(RecvTimeoutError::Disconnected, |channel| {
688            loop {
689                match deadline.checked_duration_since(Instant::now()) {
690                    Some(timeout) => {
691                        thread::park_timeout(timeout);
692
693                        // ORDERING: synchronize with the write of the message
694                        match channel.state.load(Acquire) {
695                            // The sender sent the message while we were parked.
696                            MESSAGE => {
697                                // ORDERING: the sender has been `mem::forget`-ed so this update
698                                // only needs to be visible to us.
699                                channel.state.store(DISCONNECTED, Relaxed);
700
701                                // SAFETY: we either are in the message state or were just in the
702                                // message state
703                                break Ok(unsafe { channel.take_message() });
704                            }
705                            // The sender was dropped while we were parked.
706                            DISCONNECTED => break Err(RecvTimeoutError::Disconnected),
707                            // State did not change, spurious wakeup, park again.
708                            RECEIVING | UNPARKING => (),
709                            _ => unreachable!(),
710                        }
711                    }
712                    None => {
713                        // ORDERING: synchronize with the write of the message
714                        match channel.state.swap(EMPTY, Acquire) {
715                            // We reached the end of the timeout without receiving a message
716                            RECEIVING => {
717                                // SAFETY: we were in the receiving state and are now in the empty
718                                // state, so the sender has not and will not try to read the waker,
719                                // so we have exclusive access to drop it.
720                                unsafe { channel.drop_waker() };
721
722                                break Err(RecvTimeoutError::Timeout);
723                            }
724                            // The sender sent the message while we were parked.
725                            MESSAGE => {
726                                // Same safety and ordering as the Some branch
727
728                                channel.state.store(DISCONNECTED, Relaxed);
729                                break Ok(unsafe { channel.take_message() });
730                            }
731                            // The sender was dropped while we were parked.
732                            DISCONNECTED => {
733                                // ORDERING: we were originally in the disconnected state meaning
734                                // that the sender is inactive and no longer observing the state,
735                                // so we only need to change it back to DISCONNECTED for if the
736                                // receiver is dropped or a recv* method is called again
737                                channel.state.store(DISCONNECTED, Relaxed);
738
739                                break Err(RecvTimeoutError::Disconnected);
740                            }
741                            // The sender sent the message and started unparking us
742                            UNPARKING => {
743                                // We were in the UNPARKING state and are now in the EMPTY state.
744                                // We wait to be properly unparked and to observe if the sender
745                                // sets MESSAGE or DISCONNECTED state.
746                                // SAFETY: The load above has synchronized with any message write.
747                                break unsafe { wait_for_unpark(channel) };
748                            }
749                            _ => unreachable!(),
750                        }
751                    }
752                }
753            }
754        })
755    }
756
757    /// Begins the process of receiving on the channel by reference. If the message is already
758    /// ready, or the sender has disconnected, then this function will return the appropriate
759    /// Result immediately. Otherwise, it will write the waker to memory, check to see if the
760    /// sender has finished or disconnected again, and then will call `finish`. `finish` is
761    /// thus responsible for cleaning up the channel's resources appropriately before it returns,
762    /// such as destroying the waker, for instance.
763    #[cfg(feature = "std")]
764    #[inline]
765    fn start_recv_ref<E>(
766        &self,
767        disconnected_error: E,
768        finish: impl FnOnce(&Channel<T>) -> Result<T, E>,
769    ) -> Result<T, E> {
770        // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver
771        // is still alive, meaning that even if the sender was dropped then it would have observed
772        // the fact that we're still alive and left the responsibility of deallocating the
773        // channel to us, so `self.channel` is valid
774        let channel = unsafe { self.channel_ptr.as_ref() };
775
776        // ORDERING: synchronize with the write of the message
777        match channel.state.load(Acquire) {
778            // The sender is alive but has not sent anything yet. We prepare to park.
779            EMPTY => {
780                // Conditionally add a delay here to help the tests trigger the edge cases where
781                // the sender manages to be dropped or send something before we are able to store
782                // our waker object in the channel.
783                #[cfg(oneshot_test_delay)]
784                std::thread::sleep(std::time::Duration::from_millis(10));
785
786                // Write our waker instance to the channel.
787                // SAFETY: we are not yet in the RECEIVING state, meaning that the sender will not
788                // try to access the waker until it sees the state set to RECEIVING below
789                unsafe { channel.write_waker(ReceiverWaker::current_thread()) };
790
791                // ORDERING: we use release ordering on success so the sender can synchronize with
792                // our write of the waker. We use relaxed ordering on failure since the sender does
793                // not need to synchronize with our write and the individual match arms handle any
794                // additional synchronization
795                match channel
796                    .state
797                    .compare_exchange(EMPTY, RECEIVING, Release, Relaxed)
798                {
799                    // We stored our waker, now we delegate to the callback to finish the receive
800                    // operation
801                    Ok(_) => finish(channel),
802                    // The sender sent the message while we prepared to finish
803                    Err(MESSAGE) => {
804                        // See comments in `recv` for ordering and safety
805
806                        fence(Acquire);
807
808                        unsafe { channel.drop_waker() };
809
810                        // ORDERING: the sender has been `mem::forget`-ed so this update only
811                        // needs to be visible to us
812                        channel.state.store(DISCONNECTED, Relaxed);
813
814                        // SAFETY: The MESSAGE state tells us there is a correctly initialized
815                        // message
816                        Ok(unsafe { channel.take_message() })
817                    }
818                    // The sender was dropped before sending anything while we prepared to park.
819                    Err(DISCONNECTED) => {
820                        // See comments in `recv` for safety
821                        unsafe { channel.drop_waker() };
822                        Err(disconnected_error)
823                    }
824                    _ => unreachable!(),
825                }
826            }
827            // The sender sent the message. We take the message and mark the channel disconnected.
828            MESSAGE => {
829                // ORDERING: the sender has been `mem::forget`-ed so this update only needs to be
830                // visible to us
831                channel.state.store(DISCONNECTED, Relaxed);
832
833                // SAFETY: we are in the message state so the message is valid
834                Ok(unsafe { channel.take_message() })
835            }
836            // The sender was dropped before sending anything, or we already received the message.
837            DISCONNECTED => Err(disconnected_error),
838            // The receiver must have been `Future::poll`ed prior to this call.
839            #[cfg(feature = "async")]
840            RECEIVING | UNPARKING => panic!("{}", RECEIVER_USED_SYNC_AND_ASYNC_ERROR),
841            _ => unreachable!(),
842        }
843    }
844
845    /// Consumes the Receiver, returning a raw pointer to the channel on the heap.
846    ///
847    /// This is intended to simplify using oneshot channels with some FFI code. The only safe thing
848    /// to do with the returned pointer is to later reconstruct the Receiver with
849    /// [Receiver::from_raw]. Memory will leak if the Receiver is never reconstructed.
850    pub fn into_raw(self) -> *mut () {
851        let raw = self.channel_ptr.as_ptr() as *mut ();
852        mem::forget(self);
853        raw
854    }
855
856    /// Consumes a raw pointer from [Receiver::into_raw], recreating the Receiver.
857    ///
858    /// # Safety
859    ///
860    /// This pointer must have come from [`Receiver<T>::into_raw`] with the same message type, `T`.
861    /// At most one Receiver must exist for a channel at any point in time.
862    /// Constructing multiple Receivers from the same raw pointer leads to undefined behavior.
863    pub unsafe fn from_raw(raw: *mut ()) -> Self {
864        Self {
865            channel_ptr: NonNull::new_unchecked(raw as *mut Channel<T>),
866        }
867    }
868}
869
/// Asynchronous receive: awaiting the `Receiver` resolves to the message, or to `RecvError` if
/// the channel is disconnected.
#[cfg(feature = "async")]
impl<T> core::future::Future for Receiver<T> {
    type Output = Result<T, RecvError>;

    /// Checks the channel once. Returns `Poll::Ready` with the message or a disconnect error
    /// when the outcome is known, otherwise stores (or replaces) the task waker in the channel
    /// and returns `Poll::Pending`.
    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        // SAFETY: having `self` proves the receiver is still alive. Even if the sender has been
        // dropped it would have seen that and left freeing the channel to us, so the pointer
        // is valid.
        let chan = unsafe { self.channel_ptr.as_ref() };

        // ORDERING: acquire, to synchronize with the store of the message.
        let observed = chan.state.load(Acquire);
        match observed {
            // The sender was dropped before sending anything, or the message was already taken.
            DISCONNECTED => Poll::Ready(Err(RecvError)),
            // The message is already there.
            MESSAGE => {
                // ORDERING: the sender has been dropped so this update only needs to be
                // visible to us
                chan.state.store(DISCONNECTED, Relaxed);
                // SAFETY: the MESSAGE state was observed, so the message is valid.
                Poll::Ready(Ok(unsafe { chan.take_message() }))
            }
            // First poll, the sender is alive but has not sent anything yet.
            // SAFETY: the state is neither RECEIVING nor UNPARKING, and no waker has been
            // stored in the channel.
            EMPTY => unsafe { chan.write_async_waker(cx) },
            // Polled again while still waiting: swap the stored waker for the new one.
            RECEIVING => {
                // ORDERING: relaxed on both outcomes. Nothing was written above that needs
                // releasing, and the arms below add the synchronization they need. The success
                // case is by far the most likely, hence no barrier on it.
                let reclaimed = chan
                    .state
                    .compare_exchange(RECEIVING, EMPTY, Relaxed, Relaxed);
                match reclaimed {
                    // Back in EMPTY: the sender can no longer reach the waker, replace it.
                    Ok(_) => {
                        // SAFETY: the waker was written by an earlier call to poll. No barrier
                        // is needed since that write was made by ourselves.
                        unsafe { chan.drop_waker() };
                        // SAFETY: the state is neither RECEIVING nor UNPARKING, and the waker
                        // slot was just emptied.
                        unsafe { chan.write_async_waker(cx) }
                    }
                    // The sender is waking us up right now.
                    Err(UNPARKING) => {
                        // The waker the sender holds is the old one, and it cannot be trusted
                        // to still be honored by the async runtime. Wake ourselves so that we
                        // are polled again immediately.
                        cx.waker().wake_by_ref();
                        Poll::Pending
                    }
                    // The sender was dropped without sending. It has already taken the waker.
                    Err(DISCONNECTED) => Poll::Ready(Err(RecvError)),
                    // The sender delivered the message. It has already taken the waker.
                    Err(MESSAGE) => {
                        // ORDERING: synchronize with the write of the message. This branch is
                        // unlikely to be taken.
                        chan.state.swap(DISCONNECTED, Acquire);
                        // SAFETY: the state tells us the sender has initialized the message.
                        Poll::Ready(Ok(unsafe { chan.take_message() }))
                    }
                    _ => unreachable!(),
                }
            }
            // The sender has seen RECEIVING and is busy reading the waker from a previous poll.
            // It will store MESSAGE or DISCONNECTED very soon, so busy-wait for that.
            UNPARKING => loop {
                hint::spin_loop();
                // ORDERING: the acquire load at the top has already synchronized with the
                // write of the message.
                match chan.state.load(Relaxed) {
                    UNPARKING => continue,
                    DISCONNECTED => break Poll::Ready(Err(RecvError)),
                    MESSAGE => {
                        // ORDERING: the sender has been dropped, so this update only
                        // needs to be visible to us
                        chan.state.store(DISCONNECTED, Relaxed);
                        // SAFETY: the MESSAGE state was just observed.
                        break Poll::Ready(Ok(unsafe { chan.take_message() }));
                    }
                    _ => unreachable!(),
                }
            },
            _ => unreachable!(),
        }
    }
}
963
964impl<T> Drop for Receiver<T> {
965    fn drop(&mut self) {
966        // SAFETY: since the receiving side is still alive the sender would have observed that and
967        // left deallocating the channel allocation to us.
968        let channel = unsafe { self.channel_ptr.as_ref() };
969
970        // Set the channel state to disconnected and read what state the receiver was in
971        match channel.state.swap(DISCONNECTED, Acquire) {
972            // The sender has not sent anything, nor is it dropped.
973            EMPTY => (),
974            // The sender already sent something. We must drop it, and free the channel.
975            MESSAGE => {
976                // SAFETY: we are in the message state so the message is initialized
977                unsafe { channel.drop_message() };
978
979                // SAFETY: see safety comment at top of function
980                unsafe { dealloc(self.channel_ptr) };
981            }
982            // The receiver has been polled.
983            #[cfg(feature = "async")]
984            RECEIVING => {
985                // TODO: figure this out when async is fixed
986                unsafe { channel.drop_waker() };
987            }
988            // The sender was already dropped. We are responsible for freeing the channel.
989            DISCONNECTED => {
990                // SAFETY: see safety comment at top of function
991                unsafe { dealloc(self.channel_ptr) };
992            }
993            _ => unreachable!(),
994        }
995    }
996}
997
/// All the values that the `Channel::state` field can have during the lifetime of a channel.
mod states {
    // These values are very explicitly chosen so that we can replace some cmpxchg calls with
    // fetch_* calls.
    //
    // Concretely: adding 1 turns EMPTY (0b011) into MESSAGE (0b100) and RECEIVING (0b000) into
    // UNPARKING (0b001), while flipping the lowest bit turns EMPTY into DISCONNECTED (0b010) and
    // RECEIVING into UNPARKING.
    // NOTE(review): presumably these are the transitions the sender performs with `fetch_add`
    // and `fetch_xor`; confirm against the `Sender` implementation.

    /// The initial channel state. Active while both endpoints are still alive, no message has been
    /// sent, and the receiver is not receiving.
    pub const EMPTY: u8 = 0b011;
    /// A message has been sent to the channel, but the receiver has not yet read it.
    pub const MESSAGE: u8 = 0b100;
    /// No message has yet been sent on the channel, but the receiver is currently receiving.
    pub const RECEIVING: u8 = 0b000;
    /// The sender has observed the `RECEIVING` state and is in the process of reading the
    /// receiver's waker in order to wake it up. The receive paths treat this as a transient
    /// state that is followed by `MESSAGE` or `DISCONNECTED`.
    #[cfg(any(feature = "std", feature = "async"))]
    pub const UNPARKING: u8 = 0b001;
    /// The channel has been closed. This means that either the sender or receiver has been dropped,
    /// or the message sent to the channel has already been received. Since this is a oneshot
    /// channel, it is disconnected after the one message it is supposed to hold has been
    /// transmitted.
    pub const DISCONNECTED: u8 = 0b010;
}
1018use states::*;
1019
/// Internal channel data structure. The `channel` method allocates and puts one instance
/// of this struct on the heap for each oneshot channel instance. The struct holds:
/// * The current state of the channel.
/// * The message in the channel. This memory is uninitialized until the message is sent.
/// * The waker instance for the thread or task that is currently receiving on this channel.
///   This memory is uninitialized until the receiver starts receiving.
struct Channel<T> {
    /// One of the constants in the `states` module.
    state: AtomicU8,
    /// Slot for the single message. `MaybeUninit`, so it is never dropped automatically.
    message: UnsafeCell<MaybeUninit<T>>,
    /// Slot for the receiver's waker. `MaybeUninit`, so it is never dropped automatically.
    waker: UnsafeCell<MaybeUninit<ReceiverWaker>>,
}
1031
1032impl<T> Channel<T> {
1033    pub fn new() -> Self {
1034        Self {
1035            state: AtomicU8::new(EMPTY),
1036            message: UnsafeCell::new(MaybeUninit::uninit()),
1037            waker: UnsafeCell::new(MaybeUninit::uninit()),
1038        }
1039    }
1040
1041    #[inline(always)]
1042    unsafe fn message(&self) -> &MaybeUninit<T> {
1043        #[cfg(loom)]
1044        {
1045            self.message.with(|ptr| &*ptr)
1046        }
1047
1048        #[cfg(not(loom))]
1049        {
1050            &*self.message.get()
1051        }
1052    }
1053
1054    #[inline(always)]
1055    unsafe fn with_message_mut<F>(&self, op: F)
1056    where
1057        F: FnOnce(&mut MaybeUninit<T>),
1058    {
1059        #[cfg(loom)]
1060        {
1061            self.message.with_mut(|ptr| op(&mut *ptr))
1062        }
1063
1064        #[cfg(not(loom))]
1065        {
1066            op(&mut *self.message.get())
1067        }
1068    }
1069
1070    #[inline(always)]
1071    #[cfg(any(feature = "std", feature = "async"))]
1072    unsafe fn with_waker_mut<F>(&self, op: F)
1073    where
1074        F: FnOnce(&mut MaybeUninit<ReceiverWaker>),
1075    {
1076        #[cfg(loom)]
1077        {
1078            self.waker.with_mut(|ptr| op(&mut *ptr))
1079        }
1080
1081        #[cfg(not(loom))]
1082        {
1083            op(&mut *self.waker.get())
1084        }
1085    }
1086
1087    #[inline(always)]
1088    unsafe fn write_message(&self, message: T) {
1089        self.with_message_mut(|slot| slot.as_mut_ptr().write(message));
1090    }
1091
1092    #[inline(always)]
1093    unsafe fn take_message(&self) -> T {
1094        #[cfg(loom)]
1095        {
1096            self.message.with(|ptr| ptr::read(ptr)).assume_init()
1097        }
1098
1099        #[cfg(not(loom))]
1100        {
1101            ptr::read(self.message.get()).assume_init()
1102        }
1103    }
1104
1105    #[inline(always)]
1106    unsafe fn drop_message(&self) {
1107        self.with_message_mut(|slot| slot.assume_init_drop());
1108    }
1109
1110    #[cfg(any(feature = "std", feature = "async"))]
1111    #[inline(always)]
1112    unsafe fn write_waker(&self, waker: ReceiverWaker) {
1113        self.with_waker_mut(|slot| slot.as_mut_ptr().write(waker));
1114    }
1115
1116    #[inline(always)]
1117    unsafe fn take_waker(&self) -> ReceiverWaker {
1118        #[cfg(loom)]
1119        {
1120            self.waker.with(|ptr| ptr::read(ptr)).assume_init()
1121        }
1122
1123        #[cfg(not(loom))]
1124        {
1125            ptr::read(self.waker.get()).assume_init()
1126        }
1127    }
1128
1129    #[cfg(any(feature = "std", feature = "async"))]
1130    #[inline(always)]
1131    unsafe fn drop_waker(&self) {
1132        self.with_waker_mut(|slot| slot.assume_init_drop());
1133    }
1134
1135    /// # Safety
1136    ///
1137    /// * `Channel::waker` must not have a waker stored in it when calling this method.
1138    /// * Channel state must not be RECEIVING or UNPARKING when calling this method.
1139    #[cfg(feature = "async")]
1140    unsafe fn write_async_waker(&self, cx: &mut task::Context<'_>) -> Poll<Result<T, RecvError>> {
1141        // Write our thread instance to the channel.
1142        // SAFETY: we are not yet in the RECEIVING state, meaning that the sender will not
1143        // try to access the waker until it sees the state set to RECEIVING below
1144        self.write_waker(ReceiverWaker::task_waker(cx));
1145
1146        // ORDERING: we use release ordering on success so the sender can synchronize with
1147        // our write of the waker. We use relaxed ordering on failure since the sender does
1148        // not need to synchronize with our write and the individual match arms handle any
1149        // additional synchronization
1150        match self
1151            .state
1152            .compare_exchange(EMPTY, RECEIVING, Release, Relaxed)
1153        {
1154            // We stored our waker, now we return and let the sender wake us up
1155            Ok(_) => Poll::Pending,
1156            // The sender sent the message while we prepared to park.
1157            // We take the message and mark the channel disconnected.
1158            Err(MESSAGE) => {
1159                // ORDERING: Synchronize with the write of the message. This branch is
1160                // unlikely to be taken, so it's likely more efficient to use a fence here
1161                // instead of AcqRel ordering on the compare_exchange operation
1162                fence(Acquire);
1163
1164                // SAFETY: we started in the EMPTY state and the sender switched us to the
1165                // MESSAGE state. This means that it did not take the waker, so we're
1166                // responsible for dropping it.
1167                self.drop_waker();
1168
1169                // ORDERING: sender does not exist, so this update only needs to be visible to us
1170                self.state.store(DISCONNECTED, Relaxed);
1171
1172                // SAFETY: The MESSAGE state tells us there is a correctly initialized message
1173                Poll::Ready(Ok(self.take_message()))
1174            }
1175            // The sender was dropped before sending anything while we prepared to park.
1176            Err(DISCONNECTED) => {
1177                // SAFETY: we started in the EMPTY state and the sender switched us to the
1178                // DISCONNECTED state. This means that it did not take the waker, so we're
1179                // responsible for dropping it.
1180                self.drop_waker();
1181                Poll::Ready(Err(RecvError))
1182            }
1183            _ => unreachable!(),
1184        }
1185    }
1186}
1187
/// Handle used to wake up the receiver. Which variants exist depends on the enabled crate
/// features.
enum ReceiverWaker {
    /// The receiver is waiting synchronously. Its thread is parked.
    #[cfg(feature = "std")]
    Thread(thread::Thread),
    /// The receiver is waiting asynchronously. Its task can be woken up with this `Waker`.
    #[cfg(feature = "async")]
    Task(task::Waker),
    /// A little hack to not make this enum an uninhabited type when no features are enabled.
    #[cfg(not(any(feature = "async", feature = "std")))]
    _Uninhabited,
}
1199
1200impl ReceiverWaker {
1201    #[cfg(feature = "std")]
1202    pub fn current_thread() -> Self {
1203        Self::Thread(thread::current())
1204    }
1205
1206    #[cfg(feature = "async")]
1207    pub fn task_waker(cx: &task::Context<'_>) -> Self {
1208        Self::Task(cx.waker().clone())
1209    }
1210
1211    pub fn unpark(self) {
1212        match self {
1213            #[cfg(feature = "std")]
1214            ReceiverWaker::Thread(thread) => thread.unpark(),
1215            #[cfg(feature = "async")]
1216            ReceiverWaker::Task(waker) => waker.wake(),
1217            #[cfg(not(any(feature = "async", feature = "std")))]
1218            ReceiverWaker::_Uninhabited => unreachable!(),
1219        }
1220    }
1221}
1222
/// Pins the size of `ReceiverWaker` for each feature combination, so that an accidental size
/// change is noticed.
#[cfg(not(loom))]
#[test]
fn receiver_waker_size() {
    // The `async` variant decides the size whenever it is present; otherwise the size comes
    // from the `std` variant, and with no features the enum takes no space.
    let expected: usize = if cfg!(feature = "async") {
        16
    } else if cfg!(feature = "std") {
        8
    } else {
        0
    };
    assert_eq!(mem::size_of::<ReceiverWaker>(), expected);
}
1234
/// Panic message used by the blocking receive methods when they find the channel in a state
/// that only `Future::poll` can have put it in.
#[cfg(all(feature = "std", feature = "async"))]
const RECEIVER_USED_SYNC_AND_ASYNC_ERROR: &str =
    "Invalid to call a blocking receive method on oneshot::Receiver after it has been polled";
1238
1239#[inline]
1240pub(crate) unsafe fn dealloc<T>(channel: NonNull<Channel<T>>) {
1241    drop(Box::from_raw(channel.as_ptr()))
1242}