mea/oneshot/
mod.rs

1// Copyright 2024 tison <wander4096@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// This implementation is derived from the `oneshot` crate [1], with significant simplifications
16// since mea needs not support synchronized receiving functions.
17//
18// [1] https://github.com/faern/oneshot/blob/25274e99/src/lib.rs
19
20//! A one-shot channel is used for sending a single message between
21//! asynchronous tasks. The [`channel`] function is used to create a
22//! [`Sender`] and [`Receiver`] handle pair that form the channel.
23//!
24//! The `Sender` handle is used by the producer to send the value.
25//! The `Receiver` handle is used by the consumer to receive the value.
26//!
27//! Each handle can be used on separate tasks.
28//!
29//! Since the `send` method is not async, it can be used anywhere. This includes
30//! sending between two runtimes, and using it from non-async code.
31//!
32//! # Examples
33//!
34//! ```
35//! # #[tokio::main]
36//! # async fn main() {
37//! use mea::oneshot;
38//!
39//! let (tx, rx) = oneshot::channel();
40//!
41//! tokio::spawn(async move {
42//!     if let Err(_) = tx.send(3) {
43//!         println!("the receiver dropped");
44//!     }
45//! });
46//!
47//! match rx.await {
48//!     Ok(v) => println!("got = {:?}", v),
49//!     Err(_) => println!("the sender dropped"),
50//! }
51//! # }
52//! ```
53//!
54//! If the sender is dropped without sending, the receiver will fail with
55//! [`RecvError`]:
56//!
57//! ```
58//! # #[tokio::main]
59//! # async fn main() {
60//! use mea::oneshot;
61//!
62//! let (tx, rx) = oneshot::channel::<u32>();
63//!
64//! tokio::spawn(async move {
65//!     drop(tx);
66//! });
67//!
68//! match rx.await {
69//!     Ok(_) => panic!("This doesn't happen"),
70//!     Err(_) => println!("the sender dropped"),
71//! }
72//! # }
73//! ```
74
75use std::cell::UnsafeCell;
76use std::fmt;
77use std::future::Future;
78use std::future::IntoFuture;
79use std::hint;
80use std::mem;
81use std::mem::MaybeUninit;
82use std::pin::Pin;
83use std::ptr;
84use std::ptr::NonNull;
85use std::sync::atomic::AtomicU8;
86use std::sync::atomic::Ordering;
87use std::sync::atomic::fence;
88use std::task::Context;
89use std::task::Poll;
90use std::task::Waker;
91
92#[cfg(test)]
93mod tests;
94
95/// Creates a new oneshot channel and returns the two endpoints, [`Sender`] and [`Receiver`].
96pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
97    let channel_ptr = NonNull::from(Box::leak(Box::new(Channel::new())));
98    (Sender { channel_ptr }, Receiver { channel_ptr })
99}
100
/// Sends a value to the associated [`Receiver`].
///
/// A `Sender` is consumed by [`Sender::send`], so at most one message can ever be sent.
/// Dropping it without sending moves the channel towards the disconnected state.
#[derive(Debug)]
pub struct Sender<T> {
    // Pointer to the heap-allocated channel state shared with the receiving side.
    channel_ptr: NonNull<Channel<T>>,
}

// SAFETY: moving a `Sender` to another thread lets that thread move a `T` into the shared
// channel through `send`, hence the `T: Send` bound.
unsafe impl<T: Send> Send for Sender<T> {}
// SAFETY: the only `&self` method, `is_closed`, performs a single atomic load of the state.
// NOTE(review): the `T: Sync` bound mirrors the auto-trait rule for a type owning a `T`;
// confirm it is the intended bound.
unsafe impl<T: Sync> Sync for Sender<T> {}
109
/// Finishes a sender-side transition out of the AWAKING state and wakes the receiver.
///
/// Called by the sender after it moved the channel from RECEIVING to AWAKING. It takes the
/// waker stored by the receiver, publishes `state` (MESSAGE or DISCONNECTED at the call sites
/// in this file) and only then invokes the waker.
#[inline(always)]
fn sender_wake_up_receiver<T>(channel: &Channel<T>, state: u8) {
    // ORDERING: Synchronizes with writing waker to memory, and prevents the
    // taking of the waker from being ordered before this operation.
    fence(Ordering::Acquire);

    // Take the waker, but critically do not awake it yet. If we woke it now, the
    // receiving task could still observe the AWAKING state and re-await, meaning
    // that after we change to the final state it would remain waiting indefinitely
    // or until a spurious wakeup.
    //
    // SAFETY: at this point we are in the AWAKING state, and the receiving thread
    // does not access the waker while in this state, nor does it free the channel
    // allocation in this state.
    let waker = unsafe { channel.take_waker() };

    // ORDERING: this ordering serves two-fold: it synchronizes with the acquire load
    // in the receiving thread, ensuring that both our read of the waker and write of
    // the message happen-before the taking of the message and freeing of the channel.
    // Furthermore, we need acquire ordering to ensure awaking the receiver
    // happens after the channel state is updated.
    channel.state.swap(state, Ordering::AcqRel);

    // Note: it is possible that between the swap above and this statement the
    // receiving thread is spuriously awakened, takes the message, and frees
    // the channel allocation. However, we moved the waker out of that allocation
    // above, and freeing the channel does not drop the waker since the waker slot
    // is wrapped in MaybeUninit. Therefore, our local `waker` is valid regardless of
    // whether the receiver has completed by this point.
    waker.wake();
}
141
142impl<T> Sender<T> {
143    /// Attempts to send a value on this channel, returning an error contains the message if it
144    /// could not be sent.
145    pub fn send(self, message: T) -> Result<(), SendError<T>> {
146        let channel_ptr = self.channel_ptr;
147
148        // Do not run the Drop implementation if send was called, any cleanup happens below.
149        mem::forget(self);
150
151        // SAFETY: The channel exists on the heap for the entire duration of this method, and we
152        // only ever acquire shared references to it. Note that if the receiver disconnects it
153        // does not free the channel.
154        let channel = unsafe { channel_ptr.as_ref() };
155
156        // Write the message into the channel on the heap.
157        //
158        // SAFETY: The receiver only ever accesses this memory location if we are in the MESSAGE
159        // state, and since we are responsible for setting that state, we can guarantee that we have
160        // exclusive access to this memory location to perform this write.
161        unsafe { channel.write_message(message) };
162
163        // Update the state to signal there is a message on the channel:
164        //
165        // * EMPTY + 1 = MESSAGE
166        // * RECEIVING + 1 = AWAKING
167        // * DISCONNECTED + 1 = EMPTY (invalid), however this state is never observed
168        //
169        // ORDERING: we use release ordering to ensure writing the message is visible to the
170        // receiving thread. The EMPTY and DISCONNECTED branches do not observe any shared state,
171        // and thus we do not need an acquire ordering. The RECEIVING branch manages synchronization
172        // independent of this operation.
173        match channel.state.fetch_add(1, Ordering::Release) {
174            // The receiver is alive and has not started waiting. Send done.
175            EMPTY => Ok(()),
176            // The receiver is waiting. Wake it up so it can return the message.
177            RECEIVING => {
178                sender_wake_up_receiver(channel, MESSAGE);
179                Ok(())
180            }
181            // The receiver was already dropped. The error is responsible for freeing the channel.
182            //
183            // SAFETY: since the receiver disconnected it will no longer access `channel_ptr`, so
184            // we can transfer exclusive ownership of the channel's resources to the error.
185            // Moreover, since we just placed the message in the channel, the channel contains a
186            // valid message.
187            DISCONNECTED => Err(SendError { channel_ptr }),
188            state => unreachable!("unexpected channel state: {}", state),
189        }
190    }
191
192    /// Returns true if the associated [`Receiver`] has been dropped.
193    ///
194    /// If true is returned, a future call to send is guaranteed to return an error.
195    pub fn is_closed(&self) -> bool {
196        // SAFETY: The channel exists on the heap for the entire duration of this method, and we
197        // only ever acquire shared references to it. Note that if the receiver disconnects it
198        // does not free the channel.
199        let channel = unsafe { self.channel_ptr.as_ref() };
200
201        // ORDERING: We *chose* a Relaxed ordering here as it sufficient to enforce the method's
202        // contract: "if true is returned, a future call to send is guaranteed to return an error."
203        //
204        // Once true has been observed, it will remain true. However, if false is observed,
205        // the receiver might have just disconnected but this thread has not observed it yet.
206        matches!(channel.state.load(Ordering::Relaxed), DISCONNECTED)
207    }
208}
209
210impl<T> Drop for Sender<T> {
211    fn drop(&mut self) {
212        // SAFETY: The receiver only ever frees the channel if we are in the MESSAGE or
213        // DISCONNECTED states.
214        //
215        // * If we are in the MESSAGE state, then we called mem::forget(self), so we should
216        // not be in this function call.
217        // * If we are in the DISCONNECTED state, then the receiver either received a MESSAGE
218        // so this statement is unreachable, or was dropped and observed that our side was still
219        // alive, and thus didn't free the channel.
220        let channel = unsafe { self.channel_ptr.as_ref() };
221
222        // Update the channel state to disconnected:
223        //
224        // * EMPTY ^ 001 = DISCONNECTED
225        // * RECEIVING ^ 001 = AWAKING
226        // * DISCONNECTED ^ 001 = EMPTY (invalid), but this state is never observed
227        //
228        // ORDERING: we need not release ordering here since there are no modifications we
229        // need to make visible to other thread, and the Err(RECEIVING) branch handles
230        // synchronization independent of this fetch_xor
231        match channel.state.fetch_xor(0b001, Ordering::Relaxed) {
232            // The receiver has not started waiting, nor is it dropped.
233            EMPTY => {}
234            // The receiver is waiting. Wake it up so it can detect that the channel disconnected.
235            RECEIVING => sender_wake_up_receiver(channel, DISCONNECTED),
236            // The receiver was already dropped. We are responsible for freeing the channel.
237            DISCONNECTED => {
238                // SAFETY: when the receiver switches the state to DISCONNECTED they have received
239                // the message or will no longer be trying to receive the message, and have
240                // observed that the sender is still alive, meaning that we are responsible for
241                // freeing the channel allocation.
242                unsafe { dealloc(self.channel_ptr) };
243            }
244            state => unreachable!("unexpected channel state: {}", state),
245        }
246    }
247}
248
/// Receives a value from the associated [`Sender`].
///
/// Awaiting a `Receiver` converts it into a [`Recv`] future through [`IntoFuture`];
/// [`Receiver::try_recv`] checks for a message without waiting.
#[derive(Debug)]
pub struct Receiver<T> {
    // Pointer to the heap-allocated channel state shared with the sending side.
    channel_ptr: NonNull<Channel<T>>,
}

// SAFETY: moving a `Receiver` to another thread lets that thread take the `T` out of the
// channel, hence the `T: Send` bound.
// NOTE(review): there is no `Sync` impl; `try_recv` takes `&self` and moves the message out
// after a separate load and store of the state, so sharing `&Receiver` across threads looks
// unsupported by design — confirm.
unsafe impl<T: Send> Send for Receiver<T> {}
256
257impl<T> IntoFuture for Receiver<T> {
258    type Output = Result<T, RecvError>;
259
260    type IntoFuture = Recv<T>;
261
262    fn into_future(self) -> Self::IntoFuture {
263        let Receiver { channel_ptr } = self;
264        // Do not run our Drop implementation, since the receiver lives on as the new future.
265        mem::forget(self);
266        Recv { channel_ptr }
267    }
268}
269
270impl<T> Receiver<T> {
271    /// Returns true if the associated [`Sender`] was dropped before sending a message. Or if
272    /// the message has already been received.
273    ///
274    /// If `true` is returned, all future calls to receive the message are guaranteed to return
275    /// [`RecvError`]. And future calls to this method is guaranteed to also return `true`.
276    pub fn is_closed(&self) -> bool {
277        // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver
278        // is still alive, meaning that even if the sender was dropped then it would have observed
279        // the fact that we are still alive and left the responsibility of deallocating the
280        // channel to us, so `self.channel` is valid
281        let channel = unsafe { self.channel_ptr.as_ref() };
282
283        // ORDERING: We *chose* a Relaxed ordering here as it is sufficient to
284        // enforce the method's contract.
285        //
286        // Once true has been observed, it will remain true. However, if false is observed,
287        // the sender might have just disconnected but this thread has not observed it yet.
288        matches!(channel.state.load(Ordering::Relaxed), DISCONNECTED)
289    }
290
291    /// Returns true if there is a message in the channel, ready to be received.
292    ///
293    /// If `true` is returned, the next call to receive the message is guaranteed to return
294    /// the message immediately.
295    pub fn has_message(&self) -> bool {
296        // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver
297        // is still alive, meaning that even if the sender was dropped then it would have observed
298        // the fact that we are still alive and left the responsibility of deallocating the
299        // channel to us, so `self.channel` is valid
300        let channel = unsafe { self.channel_ptr.as_ref() };
301
302        // ORDERING: An acquire ordering is used to guarantee no subsequent loads is reordered
303        // before this one. This upholds the contract that if true is returned, the next call to
304        // receive the message is guaranteed to also observe the `MESSAGE` state and return the
305        // message immediately.
306        matches!(channel.state.load(Ordering::Acquire), MESSAGE)
307    }
308
309    /// Checks if there is a message in the channel without blocking. Returns:
310    ///
311    /// * `Ok(Some(message))` if there was a message in the channel.
312    /// * `Ok(None)` if the [`Sender`] is alive, but has not yet sent a message.
313    /// * `Err(RecvError)` if the [`Sender`] was dropped before sending anything or if the message
314    ///   has already been extracted by a previous `try_recv` call.
315    ///
316    /// If a message is returned, the channel is disconnected and any subsequent receive operation
317    /// using this receiver will return an [`RecvError`].
318    pub fn try_recv(&self) -> Result<Option<T>, RecvError> {
319        // SAFETY: The channel will not be freed while this method is still running.
320        let channel = unsafe { self.channel_ptr.as_ref() };
321
322        // ORDERING: we use acquire ordering to synchronize with the store of the message.
323        match channel.state.load(Ordering::Acquire) {
324            EMPTY => Ok(None),
325            DISCONNECTED => Err(RecvError(())),
326            MESSAGE => {
327                // It is okay to break up the load and store since once we are in the MESSAGE state,
328                // the sender no longer modifies the state
329                //
330                // ORDERING: at this point the sender has done its job and is no longer active, so
331                // we need not make any side effects visible to it.
332                channel.state.store(DISCONNECTED, Ordering::Relaxed);
333
334                // SAFETY: we are in the MESSAGE state so the message is present
335                Ok(Some(unsafe { channel.take_message() }))
336            }
337            state => unreachable!("unexpected channel state: {}", state),
338        }
339    }
340}
341
342impl<T> Drop for Receiver<T> {
343    fn drop(&mut self) {
344        // SAFETY: since the receiving side is still alive the sender would have observed that and
345        // left deallocating the channel allocation to us.
346        let channel = unsafe { self.channel_ptr.as_ref() };
347
348        // Set the channel state to disconnected and read what state the receiver was in.
349        match channel.state.swap(DISCONNECTED, Ordering::Acquire) {
350            // The sender has not sent anything, nor is it dropped.
351            EMPTY => {}
352            // The sender already sent something. We must drop it, and free the channel.
353            MESSAGE => {
354                unsafe { channel.drop_message() };
355                unsafe { dealloc(self.channel_ptr) };
356            }
357            // The sender was already dropped. We are responsible for freeing the channel.
358            DISCONNECTED => {
359                unsafe { dealloc(self.channel_ptr) };
360            }
361            // NOTE: the receiver, unless transformed into a future, will never see the
362            // RECEIVING or AWAKING states, so we can ignore them here.
363            state => unreachable!("unexpected channel state: {}", state),
364        }
365    }
366}
367
/// A future that completes when the message is sent from the associated [`Sender`], or the
/// [`Sender`] is dropped before sending a message.
///
/// Created from a [`Receiver`] through its [`IntoFuture`] implementation; it resolves to
/// `Ok(message)` or to `Err(RecvError)`.
#[derive(Debug)]
pub struct Recv<T> {
    // Pointer to the heap-allocated channel state, taken over from the `Receiver`.
    channel_ptr: NonNull<Channel<T>>,
}

// SAFETY: moving a `Recv` to another thread lets that thread take the `T` out of the
// channel, hence the `T: Send` bound.
unsafe impl<T: Send> Send for Recv<T> {}
376
377fn recv_awaken<T>(channel: &Channel<T>) -> Poll<Result<T, RecvError>> {
378    loop {
379        hint::spin_loop();
380
381        // ORDERING: The load above has already synchronized with writing message.
382        match channel.state.load(Ordering::Relaxed) {
383            AWAKING => {}
384            DISCONNECTED => break Poll::Ready(Err(RecvError(()))),
385            MESSAGE => {
386                // ORDERING: the sender has been dropped, so this update only
387                // needs to be visible to us.
388                channel.state.store(DISCONNECTED, Ordering::Relaxed);
389                // SAFETY: We observed the MESSAGE state.
390                break Poll::Ready(Ok(unsafe { channel.take_message() }));
391            }
392            state => unreachable!("unexpected channel state: {}", state),
393        }
394    }
395}
396
impl<T> Future for Recv<T> {
    type Output = Result<T, RecvError>;

    /// Polls the channel once.
    ///
    /// Resolves to `Ok(message)` when a message is available and to `Err(RecvError)` when
    /// the channel is disconnected. Otherwise the task's waker is stored in the channel
    /// (replacing any waker stored by an earlier poll) and `Poll::Pending` is returned.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver
        // is still alive, meaning that even if the sender was dropped then it would have observed
        // the fact that we are still alive and left the responsibility of deallocating the
        // channel to us, so `self.channel_ptr` is valid
        let channel = unsafe { self.channel_ptr.as_ref() };

        // ORDERING: we use acquire ordering to synchronize with the store of the message.
        match channel.state.load(Ordering::Acquire) {
            // The sender is alive but has not sent anything yet. Store our waker and let
            // `write_waker` decide whether we park or the sender raced ahead of us.
            EMPTY => {
                let waker = cx.waker().clone();
                // SAFETY: We can not be in the forbidden states, and no waker in the channel.
                unsafe { channel.write_waker(waker) }
            }
            // The sender sent the message.
            MESSAGE => {
                // ORDERING: the sender was consumed by `send`, so this update only needs to
                // be visible to us.
                channel.state.store(DISCONNECTED, Ordering::Relaxed);
                // SAFETY: the MESSAGE state tells us the message slot is initialized.
                Poll::Ready(Ok(unsafe { channel.take_message() }))
            }
            // We were polled again while waiting for the sender. Replace the waker with the new
            // one.
            RECEIVING => {
                // Move back to EMPTY first so the sender cannot take the old waker while we
                // are replacing it.
                //
                // ORDERING: We use relaxed ordering on both success and failure since we have not
                // written anything above that must be released, and the individual match arms
                // handle any additional synchronization.
                match channel.state.compare_exchange(
                    RECEIVING,
                    EMPTY,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    // We successfully changed the state back to EMPTY.
                    //
                    // This is the most likely branch to be taken, which is why we do not use any
                    // memory barriers in the compare_exchange above.
                    Ok(_) => {
                        let waker = cx.waker().clone();

                        // SAFETY: We wrote the waker in a previous call to poll. We do not need
                        // a memory barrier since the previous write here was by ourselves.
                        unsafe { channel.drop_waker() };

                        // SAFETY: We can not be in the forbidden states, and no waker in the
                        // channel.
                        unsafe { channel.write_waker(waker) }
                    }
                    // The sender sent the message while we prepared to replace the waker.
                    // We take the message and mark the channel disconnected.
                    // The sender has already taken the waker.
                    Err(MESSAGE) => {
                        // ORDERING: Synchronize with writing message. This branch is
                        // unlikely to be taken.
                        channel.state.swap(DISCONNECTED, Ordering::Acquire);

                        // SAFETY: The state tells us the sender has initialized the message.
                        Poll::Ready(Ok(unsafe { channel.take_message() }))
                    }
                    // The sender is currently waking us up; wait for it to publish the
                    // final state.
                    Err(AWAKING) => recv_awaken(channel),
                    // The sender was dropped before sending anything while we prepared to park.
                    // The sender has taken the waker already.
                    Err(DISCONNECTED) => Poll::Ready(Err(RecvError(()))),
                    Err(state) => unreachable!("unexpected channel state: {}", state),
                }
            }
            // The sender has observed the RECEIVING state and is currently reading the waker from
            // a previous poll. We need to loop here until we observe the MESSAGE or DISCONNECTED
            // state. We busy loop here since we know the sender is done very soon.
            AWAKING => recv_awaken(channel),
            // The channel is closed: the sender was dropped before sending anything, or the
            // message was already taken by an earlier poll.
            DISCONNECTED => Poll::Ready(Err(RecvError(()))),
            state => unreachable!("unexpected channel state: {}", state),
        }
    }
}
478
479impl<T> Drop for Recv<T> {
480    fn drop(&mut self) {
481        // SAFETY: since the receiving side is still alive the sender would have observed that and
482        // left deallocating the channel allocation to us.
483        let channel = unsafe { self.channel_ptr.as_ref() };
484
485        // Set the channel state to disconnected and read what state the receiver was in.
486        match channel.state.swap(DISCONNECTED, Ordering::Acquire) {
487            // The sender has not sent anything, nor is it dropped.
488            EMPTY => {}
489            // The sender already sent something. We must drop it, and free the channel.
490            MESSAGE => {
491                unsafe { channel.drop_message() };
492                unsafe { dealloc(self.channel_ptr) };
493            }
494            // The receiver has been polled. We must drop the waker.
495            RECEIVING => {
496                unsafe { channel.drop_waker() };
497            }
498            // The sender was already dropped. We are responsible for freeing the channel.
499            DISCONNECTED => {
500                // SAFETY: see safety comment at top of function.
501                unsafe { dealloc(self.channel_ptr) };
502            }
503            // This receiver was previously polled, so the channel was in the RECEIVING state.
504            // But the sender has observed the RECEIVING state and is currently reading the waker
505            // to wake us up. We need to loop here until we observe the MESSAGE or DISCONNECTED
506            // state. We busy loop here since we know the sender is done very soon.
507            AWAKING => {
508                loop {
509                    hint::spin_loop();
510
511                    // ORDERING: The swap above has already synchronized with writing message.
512                    match channel.state.load(Ordering::Relaxed) {
513                        AWAKING => {}
514                        DISCONNECTED => break,
515                        MESSAGE => {
516                            // SAFETY: we are in the message state so the message is initialized.
517                            unsafe { channel.drop_message() };
518                            break;
519                        }
520                        state => unreachable!("unexpected channel state: {}", state),
521                    }
522                }
523                unsafe { dealloc(self.channel_ptr) };
524            }
525            state => unreachable!("unexpected channel state: {}", state),
526        }
527    }
528}
529
/// Internal channel data structure.
///
/// The [`channel`] method allocates and puts one instance of this struct on the heap for each
/// oneshot channel instance. The struct holds:
///
/// * The current state of the channel.
/// * The message in the channel. This memory is uninitialized until the message is sent.
/// * The waker instance for the task that is currently receiving on this channel. This memory is
///   uninitialized until the receiver starts receiving.
///
/// Both slots are wrapped in `MaybeUninit`, so dropping a `Channel` never drops a message or
/// a waker; whoever owns an initialized slot must drop or take its content explicitly.
struct Channel<T> {
    // One of EMPTY, MESSAGE, RECEIVING, AWAKING or DISCONNECTED; starts as EMPTY.
    state: AtomicU8,
    // Written by the sender in `write_message`, read or dropped by whoever ends up owning it.
    message: UnsafeCell<MaybeUninit<T>>,
    // Written by the receiving future in `write_waker`, taken by the sender to wake it up.
    waker: UnsafeCell<MaybeUninit<Waker>>,
}
544
545impl<T> Channel<T> {
546    const fn new() -> Self {
547        Self {
548            state: AtomicU8::new(EMPTY),
549            message: UnsafeCell::new(MaybeUninit::uninit()),
550            waker: UnsafeCell::new(MaybeUninit::uninit()),
551        }
552    }
553
554    #[inline(always)]
555    unsafe fn message(&self) -> &MaybeUninit<T> {
556        unsafe { &*self.message.get() }
557    }
558
559    #[inline(always)]
560    unsafe fn write_message(&self, message: T) {
561        unsafe {
562            let slot = &mut *self.message.get();
563            slot.as_mut_ptr().write(message);
564        }
565    }
566
567    #[inline(always)]
568    unsafe fn drop_message(&self) {
569        unsafe {
570            let slot = &mut *self.message.get();
571            slot.assume_init_drop();
572        }
573    }
574
575    #[inline(always)]
576    unsafe fn take_message(&self) -> T {
577        unsafe { ptr::read(self.message.get()).assume_init() }
578    }
579
580    /// # Safety
581    ///
582    /// * The `waker` field must not have a waker stored when calling this method.
583    /// * The `state` must not be in the RECEIVING state when calling this method.
584    unsafe fn write_waker(&self, waker: Waker) -> Poll<Result<T, RecvError>> {
585        // Write the waker instance to the channel.
586        //
587        // SAFETY: we are not yet in the RECEIVING state, meaning that the sender will not
588        // try to access the waker until it sees the state set to RECEIVING below.
589        unsafe {
590            let slot = &mut *self.waker.get();
591            slot.as_mut_ptr().write(waker);
592        }
593
594        // ORDERING: we use release ordering on success so the sender can synchronize with
595        // our write of the waker. We use relaxed ordering on failure since the sender does
596        // not need to synchronize with our write and the individual match arms handle any
597        // additional synchronization
598        match self
599            .state
600            .compare_exchange(EMPTY, RECEIVING, Ordering::Release, Ordering::Relaxed)
601        {
602            // We stored our waker, now we return and let the sender wake us up.
603            Ok(_) => Poll::Pending,
604            // The sender sent the message while we prepared to await.
605            // We take the message and mark the channel disconnected.
606            Err(MESSAGE) => {
607                // ORDERING: Synchronize with writing message. This branch is unlikely to be
608                // taken, so it is likely more efficient to use a fence here
609                // instead of AcqRel ordering on the compare_exchange
610                // operation.
611                fence(Ordering::Acquire);
612
613                // SAFETY: we started in the EMPTY state and the sender switched us to the
614                // MESSAGE state. This means that it did not take the waker, so we're
615                // responsible for dropping it.
616                unsafe { self.drop_waker() };
617
618                // ORDERING: sender does not exist, so this update only needs to be visible to
619                // us.
620                self.state.store(DISCONNECTED, Ordering::Relaxed);
621
622                // SAFETY: The MESSAGE state tells us there is a correctly initialized message.
623                Poll::Ready(Ok(unsafe { self.take_message() }))
624            }
625            // The sender was dropped before sending anything while we prepared to await.
626            Err(DISCONNECTED) => {
627                // SAFETY: we started in the EMPTY state and the sender switched us to the
628                // DISCONNECTED state. This means that it did not take the waker, so we are
629                // responsible for dropping it.
630                unsafe { self.drop_waker() };
631                Poll::Ready(Err(RecvError(())))
632            }
633            Err(state) => unreachable!("unexpected channel state: {}", state),
634        }
635    }
636
637    #[inline(always)]
638    unsafe fn drop_waker(&self) {
639        unsafe {
640            let slot = &mut *self.waker.get();
641            slot.assume_init_drop();
642        }
643    }
644
645    #[inline(always)]
646    unsafe fn take_waker(&self) -> Waker {
647        unsafe { ptr::read(self.waker.get()).assume_init() }
648    }
649}
650
651unsafe fn dealloc<T>(channel: NonNull<Channel<T>>) {
652    unsafe { drop(Box::from_raw(channel.as_ptr())) }
653}
654
/// An error returned when trying to send on a closed channel. Returned from
/// [`Sender::send`] if the corresponding [`Receiver`] has already been dropped.
///
/// The message that could not be sent can be retrieved again with [`SendError::into_inner`],
/// or inspected in place with [`SendError::as_inner`].
pub struct SendError<T> {
    // Channel allocation that still holds the unsent message. The error owns it: the
    // message is dropped or moved out, and the allocation freed, by `Drop` or `into_inner`.
    channel_ptr: NonNull<Channel<T>>,
}

// SAFETY: the error owns the unsent `T`, so sending the error to another thread sends the
// `T` with it; hence the `T: Send` bound.
unsafe impl<T: Send> Send for SendError<T> {}
// SAFETY: `as_inner` hands out `&T` from `&SendError<T>`, which is only sound across
// threads when `T: Sync`.
unsafe impl<T: Sync> Sync for SendError<T> {}
665
666impl<T> SendError<T> {
667    /// Get a reference to the message that failed to be sent.
668    pub fn as_inner(&self) -> &T {
669        unsafe { self.channel_ptr.as_ref().message().assume_init_ref() }
670    }
671
672    /// Consumes the error and returns the message that failed to be sent.
673    pub fn into_inner(self) -> T {
674        let channel_ptr = self.channel_ptr;
675
676        // Do not run destructor if we consumed ourselves. Freeing happens below.
677        mem::forget(self);
678
679        // SAFETY: we have ownership of the channel
680        let channel: &Channel<T> = unsafe { channel_ptr.as_ref() };
681
682        // SAFETY: we know that the message is initialized according to the safety requirements of
683        // `new`
684        let message = unsafe { channel.take_message() };
685
686        // SAFETY: we own the channel
687        unsafe { dealloc(channel_ptr) };
688
689        message
690    }
691}
692
693impl<T> Drop for SendError<T> {
694    fn drop(&mut self) {
695        // SAFETY: we have ownership of the channel and require that the message is initialized
696        // upon construction
697        unsafe {
698            self.channel_ptr.as_ref().drop_message();
699            dealloc(self.channel_ptr);
700        }
701    }
702}
703
704impl<T> fmt::Display for SendError<T> {
705    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
706        "sending on a closed channel".fmt(f)
707    }
708}
709
710impl<T> fmt::Debug for SendError<T> {
711    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
712        write!(f, "SendError<{}>(..)", stringify!(T))
713    }
714}
715
716impl<T> std::error::Error for SendError<T> {}
717
/// The error produced by a failed receive operation.
///
/// Receiving fails only when the corresponding [`Sender`] was dropped before sending any
/// message, or when a message has already been received on the channel.
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct RecvError(());

impl fmt::Display for RecvError {
    /// Writes a fixed, human-readable description of the error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // `pad` is what `Display` for `str` does: it honours width, fill and precision.
        f.pad("receiving on a closed channel")
    }
}

impl std::error::Error for RecvError {}
732
// The encoding is chosen so that the sender can move between states with one atomic
// arithmetic operation:
//
// * sending adds 1:      EMPTY -> MESSAGE,      RECEIVING -> AWAKING
// * dropping flips bit 0: EMPTY -> DISCONNECTED, RECEIVING -> AWAKING

/// No message has been sent yet, but the receiver future ([`Recv`]) has stored its waker and
/// is waiting.
const RECEIVING: u8 = 0b000;
/// The sender is in the middle of sending a message or closing the channel, and is waking
/// up the receiver future ([`Recv`]).
const AWAKING: u8 = 0b001;
/// The channel is closed: the sender or the receiver has been dropped, or the message sent
/// to the channel has already been received.
const DISCONNECTED: u8 = 0b010;
/// The initial state. Both endpoints are alive, no message has been sent, and the receiver
/// is not receiving.
const EMPTY: u8 = 0b011;
/// A message has been sent to the channel, but the receiver has not read it yet.
const MESSAGE: u8 = 0b100;