mea/oneshot/
mod.rs

1// Copyright 2024 tison <wander4096@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
// This implementation is derived from the `oneshot` crate [1], with significant simplifications
// since mea need not support synchronous receiving functions.
17//
18// [1] https://github.com/faern/oneshot/blob/25274e99/src/lib.rs
19
20//! A one-shot channel is used for sending a single message between
21//! asynchronous tasks. The [`channel`] function is used to create a
22//! [`Sender`] and [`Receiver`] handle pair that form the channel.
23//!
24//! The `Sender` handle is used by the producer to send the value.
25//! The `Receiver` handle is used by the consumer to receive the value.
26//!
27//! Each handle can be used on separate tasks.
28//!
29//! Since the `send` method is not async, it can be used anywhere. This includes
30//! sending between two runtimes, and using it from non-async code.
31//!
32//! # Examples
33//!
34//! ```
35//! # #[tokio::main]
36//! # async fn main() {
37//! use mea::oneshot;
38//!
39//! let (tx, rx) = oneshot::channel();
40//!
41//! tokio::spawn(async move {
42//!     if let Err(_) = tx.send(3) {
43//!         println!("the receiver dropped");
44//!     }
45//! });
46//!
47//! match rx.await {
48//!     Ok(v) => println!("got = {:?}", v),
49//!     Err(_) => println!("the sender dropped"),
50//! }
51//! # }
52//! ```
53//!
54//! If the sender is dropped without sending, the receiver will fail with
55//! [`RecvError`]:
56//!
57//! ```
58//! # #[tokio::main]
59//! # async fn main() {
60//! use mea::oneshot;
61//!
62//! let (tx, rx) = oneshot::channel::<u32>();
63//!
64//! tokio::spawn(async move {
65//!     drop(tx);
66//! });
67//!
68//! match rx.await {
69//!     Ok(_) => panic!("This doesn't happen"),
70//!     Err(_) => println!("the sender dropped"),
71//! }
72//! # }
73//! ```
74
75use std::cell::UnsafeCell;
76use std::fmt;
77use std::future::Future;
78use std::future::IntoFuture;
79use std::hint;
80use std::mem;
81use std::mem::MaybeUninit;
82use std::pin::Pin;
83use std::ptr;
84use std::ptr::NonNull;
85use std::sync::atomic::AtomicU8;
86use std::sync::atomic::Ordering;
87use std::sync::atomic::fence;
88use std::task::Context;
89use std::task::Poll;
90use std::task::Waker;
91
92#[cfg(test)]
93mod tests;
94
95/// Creates a new oneshot channel and returns the two endpoints, [`Sender`] and [`Receiver`].
96pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
97    let channel_ptr = NonNull::from(Box::leak(Box::new(Channel::new())));
98    (Sender { channel_ptr }, Receiver { channel_ptr })
99}
100
101/// Sends a value to the associated [`Receiver`].
102pub struct Sender<T> {
103    channel_ptr: NonNull<Channel<T>>,
104}
105
106impl<T> fmt::Debug for Sender<T> {
107    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
108        f.debug_struct("Sender").finish_non_exhaustive()
109    }
110}
111
112unsafe impl<T: Send> Send for Sender<T> {}
113unsafe impl<T: Sync> Sync for Sender<T> {}
114
115#[inline(always)]
116fn sender_wake_up_receiver<T>(channel: &Channel<T>, state: u8) {
117    // ORDERING: Synchronizes with writing waker to memory, and prevents the
118    // taking of the waker from being ordered before this operation.
119    fence(Ordering::Acquire);
120
121    // Take the waker, but critically do not awake it. If we awake it now, the
122    // receiving thread could still observe the AWAKING state and re-await, meaning
123    // that after we change to the MESSAGE state, it would remain waiting indefinitely
124    // or until a spurious wakeup.
125    //
126    // SAFETY: at this point we are in the AWAKING state, and the receiving thread
127    // does not access the waker while in this state, nor does it free the channel
128    // allocation in this state.
129    let waker = unsafe { channel.take_waker() };
130
131    // ORDERING: this ordering serves two-fold: it synchronizes with the acquire load
132    // in the receiving thread, ensuring that both our read of the waker and write of
133    // the message happen-before the taking of the message and freeing of the channel.
134    // Furthermore, we need acquire ordering to ensure awaking the receiver
135    // happens after the channel state is updated.
136    channel.state.swap(state, Ordering::AcqRel);
137
138    // Note: it is possible that between the store above and this statement that
139    // the receiving thread is spuriously awakened, takes the message, and frees
140    // the channel allocation. However, we took ownership of the channel out of
141    // that allocation, and freeing the channel does not drop the waker since the
142    // waker is wrapped in MaybeUninit. Therefore, this data is valid regardless of
143    // whether the receiver has completed by this point.
144    waker.wake();
145}
146
147impl<T> Sender<T> {
148    /// Attempts to send a value on this channel, returning an error contains the message if it
149    /// could not be sent.
150    pub fn send(self, message: T) -> Result<(), SendError<T>> {
151        let channel_ptr = self.channel_ptr;
152
153        // Do not run the Drop implementation if send was called, any cleanup happens below.
154        mem::forget(self);
155
156        // SAFETY: The channel exists on the heap for the entire duration of this method, and we
157        // only ever acquire shared references to it. Note that if the receiver disconnects it
158        // does not free the channel.
159        let channel = unsafe { channel_ptr.as_ref() };
160
161        // Write the message into the channel on the heap.
162        //
163        // SAFETY: The receiver only ever accesses this memory location if we are in the MESSAGE
164        // state, and since we are responsible for setting that state, we can guarantee that we have
165        // exclusive access to this memory location to perform this write.
166        unsafe { channel.write_message(message) };
167
168        // Update the state to signal there is a message on the channel:
169        //
170        // * EMPTY + 1 = MESSAGE
171        // * RECEIVING + 1 = AWAKING
172        // * DISCONNECTED + 1 = EMPTY (invalid), however this state is never observed
173        //
174        // ORDERING: we use release ordering to ensure writing the message is visible to the
175        // receiving thread. The EMPTY and DISCONNECTED branches do not observe any shared state,
176        // and thus we do not need an acquire ordering. The RECEIVING branch manages synchronization
177        // independent of this operation.
178        match channel.state.fetch_add(1, Ordering::Release) {
179            // The receiver is alive and has not started waiting. Send done.
180            EMPTY => Ok(()),
181            // The receiver is waiting. Wake it up so it can return the message.
182            RECEIVING => {
183                sender_wake_up_receiver(channel, MESSAGE);
184                Ok(())
185            }
186            // The receiver was already dropped. The error is responsible for freeing the channel.
187            //
188            // SAFETY: since the receiver disconnected it will no longer access `channel_ptr`, so
189            // we can transfer exclusive ownership of the channel's resources to the error.
190            // Moreover, since we just placed the message in the channel, the channel contains a
191            // valid message.
192            DISCONNECTED => Err(SendError { channel_ptr }),
193            state => unreachable!("unexpected channel state: {}", state),
194        }
195    }
196
197    /// Returns true if the associated [`Receiver`] has been dropped.
198    ///
199    /// If true is returned, a future call to send is guaranteed to return an error.
200    pub fn is_closed(&self) -> bool {
201        // SAFETY: The channel exists on the heap for the entire duration of this method, and we
202        // only ever acquire shared references to it. Note that if the receiver disconnects it
203        // does not free the channel.
204        let channel = unsafe { self.channel_ptr.as_ref() };
205
206        // ORDERING: We *chose* a Relaxed ordering here as it sufficient to enforce the method's
207        // contract: "if true is returned, a future call to send is guaranteed to return an error."
208        //
209        // Once true has been observed, it will remain true. However, if false is observed,
210        // the receiver might have just disconnected but this thread has not observed it yet.
211        matches!(channel.state.load(Ordering::Relaxed), DISCONNECTED)
212    }
213}
214
215impl<T> Drop for Sender<T> {
216    fn drop(&mut self) {
217        // SAFETY: The receiver only ever frees the channel if we are in the MESSAGE or
218        // DISCONNECTED states.
219        //
220        // * If we are in the MESSAGE state, then we called mem::forget(self), so we should
221        // not be in this function call.
222        // * If we are in the DISCONNECTED state, then the receiver either received a MESSAGE
223        // so this statement is unreachable, or was dropped and observed that our side was still
224        // alive, and thus didn't free the channel.
225        let channel = unsafe { self.channel_ptr.as_ref() };
226
227        // Update the channel state to disconnected:
228        //
229        // * EMPTY ^ 001 = DISCONNECTED
230        // * RECEIVING ^ 001 = AWAKING
231        // * DISCONNECTED ^ 001 = EMPTY (invalid), but this state is never observed
232        //
233        // ORDERING: we need not release ordering here since there are no modifications we
234        // need to make visible to other thread, and the Err(RECEIVING) branch handles
235        // synchronization independent of this fetch_xor
236        match channel.state.fetch_xor(0b001, Ordering::Relaxed) {
237            // The receiver has not started waiting, nor is it dropped.
238            EMPTY => {}
239            // The receiver is waiting. Wake it up so it can detect that the channel disconnected.
240            RECEIVING => sender_wake_up_receiver(channel, DISCONNECTED),
241            // The receiver was already dropped. We are responsible for freeing the channel.
242            DISCONNECTED => {
243                // SAFETY: when the receiver switches the state to DISCONNECTED they have received
244                // the message or will no longer be trying to receive the message, and have
245                // observed that the sender is still alive, meaning that we are responsible for
246                // freeing the channel allocation.
247                unsafe { dealloc(self.channel_ptr) };
248            }
249            state => unreachable!("unexpected channel state: {}", state),
250        }
251    }
252}
253
254/// Receives a value from the associated [`Sender`].
255pub struct Receiver<T> {
256    channel_ptr: NonNull<Channel<T>>,
257}
258
259impl<T> fmt::Debug for Receiver<T> {
260    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
261        f.debug_struct("Receiver").finish_non_exhaustive()
262    }
263}
264
265unsafe impl<T: Send> Send for Receiver<T> {}
266
267impl<T> IntoFuture for Receiver<T> {
268    type Output = Result<T, RecvError>;
269
270    type IntoFuture = Recv<T>;
271
272    fn into_future(self) -> Self::IntoFuture {
273        let Receiver { channel_ptr } = self;
274        // Do not run our Drop implementation, since the receiver lives on as the new future.
275        mem::forget(self);
276        Recv { channel_ptr }
277    }
278}
279
280impl<T> Receiver<T> {
281    /// Returns true if the associated [`Sender`] was dropped before sending a message. Or if
282    /// the message has already been received.
283    ///
284    /// If `true` is returned, all future calls to receive the message are guaranteed to return
285    /// [`RecvError`]. And future calls to this method is guaranteed to also return `true`.
286    pub fn is_closed(&self) -> bool {
287        // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver
288        // is still alive, meaning that even if the sender was dropped then it would have observed
289        // the fact that we are still alive and left the responsibility of deallocating the
290        // channel to us, so `self.channel` is valid
291        let channel = unsafe { self.channel_ptr.as_ref() };
292
293        // ORDERING: We *chose* a Relaxed ordering here as it is sufficient to
294        // enforce the method's contract.
295        //
296        // Once true has been observed, it will remain true. However, if false is observed,
297        // the sender might have just disconnected but this thread has not observed it yet.
298        matches!(channel.state.load(Ordering::Relaxed), DISCONNECTED)
299    }
300
301    /// Returns true if there is a message in the channel, ready to be received.
302    ///
303    /// If `true` is returned, the next call to receive the message is guaranteed to return
304    /// the message immediately.
305    pub fn has_message(&self) -> bool {
306        // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver
307        // is still alive, meaning that even if the sender was dropped then it would have observed
308        // the fact that we are still alive and left the responsibility of deallocating the
309        // channel to us, so `self.channel` is valid
310        let channel = unsafe { self.channel_ptr.as_ref() };
311
312        // ORDERING: An acquire ordering is used to guarantee no subsequent loads is reordered
313        // before this one. This upholds the contract that if true is returned, the next call to
314        // receive the message is guaranteed to also observe the `MESSAGE` state and return the
315        // message immediately.
316        matches!(channel.state.load(Ordering::Acquire), MESSAGE)
317    }
318
319    /// Checks if there is a message in the channel without blocking. Returns:
320    ///
321    /// * `Ok(message)` if there was a message in the channel.
322    /// * `Err(TryRecvError::Empty)` if the [`Sender`] is alive, but has not yet sent a message.
323    /// * `Err(TryRecvError::Disconnected)` if the [`Sender`] was dropped before sending anything or
324    ///   if the message has already been extracted by a previous `try_recv` call.
325    ///
326    /// If a message is returned, the channel is disconnected and any subsequent receive operation
327    /// using this receiver will return an error: [`TryRecvError::Disconnected`] for `try_recv`,
328    /// or [`RecvError::Disconnected`] for [`recv`](Receiver::into_future).
329    pub fn try_recv(&self) -> Result<T, TryRecvError> {
330        // SAFETY: The channel will not be freed while this method is still running.
331        let channel = unsafe { self.channel_ptr.as_ref() };
332
333        // ORDERING: we use acquire ordering to synchronize with the store of the message.
334        match channel.state.load(Ordering::Acquire) {
335            EMPTY => Err(TryRecvError::Empty),
336            DISCONNECTED => Err(TryRecvError::Disconnected),
337            MESSAGE => {
338                // It is okay to break up the load and store since once we are in the MESSAGE state,
339                // the sender no longer modifies the state
340                //
341                // ORDERING: at this point the sender has done its job and is no longer active, so
342                // we need not make any side effects visible to it.
343                channel.state.store(DISCONNECTED, Ordering::Relaxed);
344
345                // SAFETY: we are in the MESSAGE state so the message is present
346                Ok(unsafe { channel.take_message() })
347            }
348            state => unreachable!("unexpected channel state: {}", state),
349        }
350    }
351}
352
353impl<T> Drop for Receiver<T> {
354    fn drop(&mut self) {
355        // SAFETY: since the receiving side is still alive the sender would have observed that and
356        // left deallocating the channel allocation to us.
357        let channel = unsafe { self.channel_ptr.as_ref() };
358
359        // Set the channel state to disconnected and read what state the receiver was in.
360        match channel.state.swap(DISCONNECTED, Ordering::Acquire) {
361            // The sender has not sent anything, nor is it dropped.
362            EMPTY => {}
363            // The sender already sent something. We must drop it, and free the channel.
364            MESSAGE => {
365                unsafe { channel.drop_message() };
366                unsafe { dealloc(self.channel_ptr) };
367            }
368            // The sender was already dropped. We are responsible for freeing the channel.
369            DISCONNECTED => {
370                unsafe { dealloc(self.channel_ptr) };
371            }
372            // NOTE: the receiver, unless transformed into a future, will never see the
373            // RECEIVING or AWAKING states, so we can ignore them here.
374            state => unreachable!("unexpected channel state: {}", state),
375        }
376    }
377}
378
379/// A future that completes when the message is sent from the associated [`Sender`], or the
380/// [`Sender`] is dropped before sending a message.
381pub struct Recv<T> {
382    channel_ptr: NonNull<Channel<T>>,
383}
384
385impl<T> fmt::Debug for Recv<T> {
386    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
387        f.debug_struct("Recv").finish_non_exhaustive()
388    }
389}
390
391unsafe impl<T: Send> Send for Recv<T> {}
392
393fn recv_awaken<T>(channel: &Channel<T>) -> Poll<Result<T, RecvError>> {
394    loop {
395        hint::spin_loop();
396
397        // ORDERING: The load above has already synchronized with writing message.
398        match channel.state.load(Ordering::Relaxed) {
399            AWAKING => {}
400            DISCONNECTED => break Poll::Ready(Err(RecvError::Disconnected)),
401            MESSAGE => {
402                // ORDERING: the sender has been dropped, so this update only
403                // needs to be visible to us.
404                channel.state.store(DISCONNECTED, Ordering::Relaxed);
405                // SAFETY: We observed the MESSAGE state.
406                break Poll::Ready(Ok(unsafe { channel.take_message() }));
407            }
408            state => unreachable!("unexpected channel state: {}", state),
409        }
410    }
411}
412
413impl<T> Future for Recv<T> {
414    type Output = Result<T, RecvError>;
415
416    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
417        // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver
418        // is still alive, meaning that even if the sender was dropped then it would have observed
419        // the fact that we are still alive and left the responsibility of deallocating the
420        // channel to us, so `self.channel` is valid
421        let channel = unsafe { self.channel_ptr.as_ref() };
422
423        // ORDERING: we use acquire ordering to synchronize with the store of the message.
424        match channel.state.load(Ordering::Acquire) {
425            // The sender is alive but has not sent anything yet.
426            EMPTY => {
427                let waker = cx.waker().clone();
428                // SAFETY: We can not be in the forbidden states, and no waker in the channel.
429                unsafe { channel.write_waker(waker) }
430            }
431            // The sender sent the message.
432            MESSAGE => {
433                // ORDERING: the sender has been dropped so this update only needs to be
434                // visible to us.
435                channel.state.store(DISCONNECTED, Ordering::Relaxed);
436                Poll::Ready(Ok(unsafe { channel.take_message() }))
437            }
438            // We were polled again while waiting for the sender. Replace the waker with the new
439            // one.
440            RECEIVING => {
441                // ORDERING: We use relaxed ordering on both success and failure since we have not
442                // written anything above that must be released, and the individual match arms
443                // handle any additional synchronization.
444                match channel.state.compare_exchange(
445                    RECEIVING,
446                    EMPTY,
447                    Ordering::Relaxed,
448                    Ordering::Relaxed,
449                ) {
450                    // We successfully changed the state back to EMPTY.
451                    //
452                    // This is the most likely branch to be taken, which is why we do not use any
453                    // memory barriers in the compare_exchange above.
454                    Ok(_) => {
455                        let waker = cx.waker().clone();
456
457                        // SAFETY: We wrote the waker in a previous call to poll. We do not need
458                        // a memory barrier since the previous write here was by ourselves.
459                        unsafe { channel.drop_waker() };
460
461                        // SAFETY: We can not be in the forbidden states, and no waker in the
462                        // channel.
463                        unsafe { channel.write_waker(waker) }
464                    }
465                    // The sender sent the message while we prepared to replace the waker.
466                    // We take the message and mark the channel disconnected.
467                    // The sender has already taken the waker.
468                    Err(MESSAGE) => {
469                        // ORDERING: Synchronize with writing message. This branch is
470                        // unlikely to be taken.
471                        channel.state.swap(DISCONNECTED, Ordering::Acquire);
472
473                        // SAFETY: The state tells us the sender has initialized the message.
474                        Poll::Ready(Ok(unsafe { channel.take_message() }))
475                    }
476                    // The sender is currently waking us up.
477                    Err(AWAKING) => recv_awaken(channel),
478                    // The sender was dropped before sending anything while we prepared to park.
479                    // The sender has taken the waker already.
480                    Err(DISCONNECTED) => Poll::Ready(Err(RecvError::Disconnected)),
481                    Err(state) => unreachable!("unexpected channel state: {}", state),
482                }
483            }
484            // The sender has observed the RECEIVING state and is currently reading the waker from
485            // a previous poll. We need to loop here until we observe the MESSAGE or DISCONNECTED
486            // state. We busy loop here since we know the sender is done very soon.
487            AWAKING => recv_awaken(channel),
488            // The sender was dropped before sending anything.
489            DISCONNECTED => Poll::Ready(Err(RecvError::Disconnected)),
490            state => unreachable!("unexpected channel state: {}", state),
491        }
492    }
493}
494
495impl<T> Drop for Recv<T> {
496    fn drop(&mut self) {
497        // SAFETY: since the receiving side is still alive the sender would have observed that and
498        // left deallocating the channel allocation to us.
499        let channel = unsafe { self.channel_ptr.as_ref() };
500
501        // Set the channel state to disconnected and read what state the receiver was in.
502        match channel.state.swap(DISCONNECTED, Ordering::Acquire) {
503            // The sender has not sent anything, nor is it dropped.
504            EMPTY => {}
505            // The sender already sent something. We must drop it, and free the channel.
506            MESSAGE => {
507                unsafe { channel.drop_message() };
508                unsafe { dealloc(self.channel_ptr) };
509            }
510            // The receiver has been polled. We must drop the waker.
511            RECEIVING => {
512                unsafe { channel.drop_waker() };
513            }
514            // The sender was already dropped. We are responsible for freeing the channel.
515            DISCONNECTED => {
516                // SAFETY: see safety comment at top of function.
517                unsafe { dealloc(self.channel_ptr) };
518            }
519            // This receiver was previously polled, so the channel was in the RECEIVING state.
520            // But the sender has observed the RECEIVING state and is currently reading the waker
521            // to wake us up. We need to loop here until we observe the MESSAGE or DISCONNECTED
522            // state. We busy loop here since we know the sender is done very soon.
523            AWAKING => {
524                loop {
525                    hint::spin_loop();
526
527                    // ORDERING: The swap above has already synchronized with writing message.
528                    match channel.state.load(Ordering::Relaxed) {
529                        AWAKING => {}
530                        DISCONNECTED => break,
531                        MESSAGE => {
532                            // SAFETY: we are in the message state so the message is initialized.
533                            unsafe { channel.drop_message() };
534                            break;
535                        }
536                        state => unreachable!("unexpected channel state: {}", state),
537                    }
538                }
539                unsafe { dealloc(self.channel_ptr) };
540            }
541            state => unreachable!("unexpected channel state: {}", state),
542        }
543    }
544}
545
/// Internal channel data structure.
///
/// The [`channel`] method allocates and puts one instance of this struct on the heap for each
/// oneshot channel instance. The struct holds:
///
/// * The current state of the channel.
/// * The message in the channel. This memory is uninitialized until the message is sent.
/// * The waker instance for the task that is currently receiving on this channel. This memory is
///   uninitialized until the receiver starts receiving.
///
/// Both slots are wrapped in `MaybeUninit`, so freeing a `Channel` never drops the message or
/// the waker; whoever owns them at that point must drop them explicitly.
struct Channel<T> {
    /// One of `EMPTY`, `MESSAGE`, `RECEIVING`, `AWAKING` or `DISCONNECTED`.
    state: AtomicU8,
    /// Slot for the message; written by `write_message`.
    message: UnsafeCell<MaybeUninit<T>>,
    /// Slot for the receiving task's waker; written by `write_waker`.
    waker: UnsafeCell<MaybeUninit<Waker>>,
}
560
561impl<T> Channel<T> {
562    const fn new() -> Self {
563        Self {
564            state: AtomicU8::new(EMPTY),
565            message: UnsafeCell::new(MaybeUninit::uninit()),
566            waker: UnsafeCell::new(MaybeUninit::uninit()),
567        }
568    }
569
570    #[inline(always)]
571    unsafe fn message(&self) -> &MaybeUninit<T> {
572        unsafe { &*self.message.get() }
573    }
574
575    #[inline(always)]
576    unsafe fn write_message(&self, message: T) {
577        unsafe {
578            let slot = &mut *self.message.get();
579            slot.as_mut_ptr().write(message);
580        }
581    }
582
583    #[inline(always)]
584    unsafe fn drop_message(&self) {
585        unsafe {
586            let slot = &mut *self.message.get();
587            slot.assume_init_drop();
588        }
589    }
590
591    #[inline(always)]
592    unsafe fn take_message(&self) -> T {
593        unsafe { ptr::read(self.message.get()).assume_init() }
594    }
595
596    /// # Safety
597    ///
598    /// * The `waker` field must not have a waker stored when calling this method.
599    /// * The `state` must not be in the RECEIVING state when calling this method.
600    unsafe fn write_waker(&self, waker: Waker) -> Poll<Result<T, RecvError>> {
601        // Write the waker instance to the channel.
602        //
603        // SAFETY: we are not yet in the RECEIVING state, meaning that the sender will not
604        // try to access the waker until it sees the state set to RECEIVING below.
605        unsafe {
606            let slot = &mut *self.waker.get();
607            slot.as_mut_ptr().write(waker);
608        }
609
610        // ORDERING: we use release ordering on success so the sender can synchronize with
611        // our write of the waker. We use relaxed ordering on failure since the sender does
612        // not need to synchronize with our write and the individual match arms handle any
613        // additional synchronization
614        match self
615            .state
616            .compare_exchange(EMPTY, RECEIVING, Ordering::Release, Ordering::Relaxed)
617        {
618            // We stored our waker, now we return and let the sender wake us up.
619            Ok(_) => Poll::Pending,
620            // The sender sent the message while we prepared to await.
621            // We take the message and mark the channel disconnected.
622            Err(MESSAGE) => {
623                // ORDERING: Synchronize with writing message. This branch is unlikely to be
624                // taken, so it is likely more efficient to use a fence here
625                // instead of AcqRel ordering on the compare_exchange
626                // operation.
627                fence(Ordering::Acquire);
628
629                // SAFETY: we started in the EMPTY state and the sender switched us to the
630                // MESSAGE state. This means that it did not take the waker, so we're
631                // responsible for dropping it.
632                unsafe { self.drop_waker() };
633
634                // ORDERING: sender does not exist, so this update only needs to be visible to
635                // us.
636                self.state.store(DISCONNECTED, Ordering::Relaxed);
637
638                // SAFETY: The MESSAGE state tells us there is a correctly initialized message.
639                Poll::Ready(Ok(unsafe { self.take_message() }))
640            }
641            // The sender was dropped before sending anything while we prepared to await.
642            Err(DISCONNECTED) => {
643                // SAFETY: we started in the EMPTY state and the sender switched us to the
644                // DISCONNECTED state. This means that it did not take the waker, so we are
645                // responsible for dropping it.
646                unsafe { self.drop_waker() };
647                Poll::Ready(Err(RecvError::Disconnected))
648            }
649            Err(state) => unreachable!("unexpected channel state: {}", state),
650        }
651    }
652
653    #[inline(always)]
654    unsafe fn drop_waker(&self) {
655        unsafe {
656            let slot = &mut *self.waker.get();
657            slot.assume_init_drop();
658        }
659    }
660
661    #[inline(always)]
662    unsafe fn take_waker(&self) -> Waker {
663        unsafe { ptr::read(self.waker.get()).assume_init() }
664    }
665}
666
667unsafe fn dealloc<T>(channel: NonNull<Channel<T>>) {
668    unsafe { drop(Box::from_raw(channel.as_ptr())) }
669}
670
/// An error returned when trying to send on a closed channel. Returned from
/// [`Sender::send`] if the corresponding [`Receiver`] has already been dropped.
///
/// The message that could not be sent can be retrieved again with [`SendError::into_inner`].
///
/// The error owns the channel allocation, including the unsent message stored in it, and frees
/// both when it is dropped or consumed.
pub struct SendError<T> {
    /// Pointer to the channel whose message slot holds the unsent message.
    channel_ptr: NonNull<Channel<T>>,
}

// SAFETY: the error exclusively owns the channel and the `T` inside it, so it can cross threads
// whenever `T: Send`.
unsafe impl<T: Send> Send for SendError<T> {}
// SAFETY: shared access only hands out `&T` (via `as_inner`), which is sound when `T: Sync`.
unsafe impl<T: Sync> Sync for SendError<T> {}
681
682impl<T> SendError<T> {
683    /// Get a reference to the message that failed to be sent.
684    pub fn as_inner(&self) -> &T {
685        unsafe { self.channel_ptr.as_ref().message().assume_init_ref() }
686    }
687
688    /// Consumes the error and returns the message that failed to be sent.
689    pub fn into_inner(self) -> T {
690        let channel_ptr = self.channel_ptr;
691
692        // Do not run destructor if we consumed ourselves. Freeing happens below.
693        mem::forget(self);
694
695        // SAFETY: we have ownership of the channel
696        let channel: &Channel<T> = unsafe { channel_ptr.as_ref() };
697
698        // SAFETY: we know that the message is initialized according to the safety requirements of
699        // `new`
700        let message = unsafe { channel.take_message() };
701
702        // SAFETY: we own the channel
703        unsafe { dealloc(channel_ptr) };
704
705        message
706    }
707}
708
709impl<T> Drop for SendError<T> {
710    fn drop(&mut self) {
711        // SAFETY: we have ownership of the channel and require that the message is initialized
712        // upon construction
713        unsafe {
714            self.channel_ptr.as_ref().drop_message();
715            dealloc(self.channel_ptr);
716        }
717    }
718}
719
720impl<T> fmt::Display for SendError<T> {
721    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
722        "sending on a closed channel".fmt(f)
723    }
724}
725
726impl<T> fmt::Debug for SendError<T> {
727    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
728        write!(f, "SendError<{}>(..)", stringify!(T))
729    }
730}
731
732impl<T> std::error::Error for SendError<T> {}
733
/// Error returned by [`Receiver::try_recv`].
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum TryRecvError {
    /// The channel holds no message right now, but the sender is still connected, so a message
    /// may yet arrive.
    Empty,
    /// The sender has disconnected, and no more data will ever be received on this channel.
    Disconnected,
}

impl fmt::Display for TryRecvError {
    /// Writes a short, fixed description of the error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match self {
            Self::Empty => "receiving on an empty channel",
            Self::Disconnected => "receiving on a closed channel",
        };
        f.write_str(description)
    }
}

impl std::error::Error for TryRecvError {}
754
/// An error returned when awaiting the message via [`Receiver`].
///
/// It indicates that the corresponding [`Sender`] was dropped before sending any message.
/// Note that if a message was already received (e.g., via [`Receiver::try_recv`]), subsequent
/// `try_recv` calls will return [`TryRecvError::Disconnected`] instead.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum RecvError {
    /// The sender has disconnected, and no more data will ever be received on this channel.
    Disconnected,
}

impl fmt::Display for RecvError {
    /// Writes a short, fixed description of the error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("receiving on a closed channel")
    }
}

impl std::error::Error for RecvError {}
773
// Channel states.
//
// The numeric values are chosen so that the sender can perform its transitions with a single
// atomic arithmetic operation:
//
// * sending adds 1:             EMPTY -> MESSAGE,      RECEIVING -> AWAKING
// * dropping the sender XORs 1: EMPTY -> DISCONNECTED, RECEIVING -> AWAKING
//
// Do not renumber these constants without revisiting `Sender::send` and `Sender::drop`.

/// The initial channel state. Active while both endpoints are still alive, no message has been
/// sent, and the receiver is not receiving.
const EMPTY: u8 = 0b011;
/// A message has been sent to the channel, but the receiver has not yet read it.
const MESSAGE: u8 = 0b100;
/// No message has yet been sent on the channel, but the receiver future ([`Recv`]) is currently
/// receiving.
const RECEIVING: u8 = 0b000;
/// A message is being sent to the channel, or the channel is closing. The receiver future
/// ([`Recv`]) is currently being awakened.
const AWAKING: u8 = 0b001;
/// The channel has been closed. This means that either the sender or receiver has been dropped,
/// or the message sent to the channel has already been received.
const DISCONNECTED: u8 = 0b010;