mea/oneshot/mod.rs
1// Copyright 2024 tison <wander4096@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
// This implementation is derived from the `oneshot` crate [1], with significant simplifications
// since mea does not need to support synchronous (blocking) receive functions.
17//
18// [1] https://github.com/faern/oneshot/blob/25274e99/src/lib.rs
19
20//! A one-shot channel is used for sending a single message between
21//! asynchronous tasks. The [`channel`] function is used to create a
22//! [`Sender`] and [`Receiver`] handle pair that form the channel.
23//!
24//! The `Sender` handle is used by the producer to send the value.
25//! The `Receiver` handle is used by the consumer to receive the value.
26//!
27//! Each handle can be used on separate tasks.
28//!
29//! Since the `send` method is not async, it can be used anywhere. This includes
30//! sending between two runtimes, and using it from non-async code.
31//!
32//! # Examples
33//!
34//! ```
35//! # #[tokio::main]
36//! # async fn main() {
37//! use mea::oneshot;
38//!
39//! let (tx, rx) = oneshot::channel();
40//!
41//! tokio::spawn(async move {
42//! if let Err(_) = tx.send(3) {
43//! println!("the receiver dropped");
44//! }
45//! });
46//!
47//! match rx.await {
48//! Ok(v) => println!("got = {:?}", v),
49//! Err(_) => println!("the sender dropped"),
50//! }
51//! # }
52//! ```
53//!
54//! If the sender is dropped without sending, the receiver will fail with
55//! [`RecvError`]:
56//!
57//! ```
58//! # #[tokio::main]
59//! # async fn main() {
60//! use mea::oneshot;
61//!
62//! let (tx, rx) = oneshot::channel::<u32>();
63//!
64//! tokio::spawn(async move {
65//! drop(tx);
66//! });
67//!
68//! match rx.await {
69//! Ok(_) => panic!("This doesn't happen"),
70//! Err(_) => println!("the sender dropped"),
71//! }
72//! # }
73//! ```
74
75use std::cell::UnsafeCell;
76use std::fmt;
77use std::future::Future;
78use std::future::IntoFuture;
79use std::hint;
80use std::mem;
81use std::mem::MaybeUninit;
82use std::pin::Pin;
83use std::ptr;
84use std::ptr::NonNull;
85use std::sync::atomic::AtomicU8;
86use std::sync::atomic::Ordering;
87use std::sync::atomic::fence;
88use std::task::Context;
89use std::task::Poll;
90use std::task::Waker;
91
92#[cfg(test)]
93mod tests;
94
95/// Creates a new oneshot channel and returns the two endpoints, [`Sender`] and [`Receiver`].
96pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
97 let channel_ptr = NonNull::from(Box::leak(Box::new(Channel::new())));
98 (Sender { channel_ptr }, Receiver { channel_ptr })
99}
100
/// Sends a value to the associated [`Receiver`].
///
/// Created by [`channel`]. [`Sender::send`] consumes the handle, so at most one message can be
/// sent through it.
pub struct Sender<T> {
    /// Pointer to the heap-allocated channel state shared with the receiving side.
    channel_ptr: NonNull<Channel<T>>,
}
105
106impl<T> fmt::Debug for Sender<T> {
107 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
108 f.debug_struct("Sender").finish_non_exhaustive()
109 }
110}
111
// SAFETY: sending the `Sender` to another thread lets that thread move a `T` into the channel,
// hence the `T: Send` bound.
unsafe impl<T: Send> Send for Sender<T> {}
// SAFETY: the only method taking `&self` is `is_closed`, which performs a single atomic load of
// the channel state; `send` takes `self` by value.
unsafe impl<T: Sync> Sync for Sender<T> {}
114
/// Finishes a hand-off to a receiver that has stored a waker in the channel.
///
/// The callers in this file invoke it right after moving the channel from RECEIVING to AWAKING.
/// It takes the stored waker out of the channel, publishes the final `state` (MESSAGE or
/// DISCONNECTED at the call sites in this file), and only then wakes the receiving task.
#[inline(always)]
fn sender_wake_up_receiver<T>(channel: &Channel<T>, state: u8) {
    // ORDERING: Synchronizes with writing waker to memory, and prevents the
    // taking of the waker from being ordered before this operation.
    fence(Ordering::Acquire);

    // Take the waker, but critically do not awake it. If we awake it now, the
    // receiving thread could still observe the AWAKING state and re-await, meaning
    // that after we change to the MESSAGE state, it would remain waiting indefinitely
    // or until a spurious wakeup.
    //
    // SAFETY: at this point we are in the AWAKING state, and the receiving thread
    // does not access the waker while in this state, nor does it free the channel
    // allocation in this state.
    let waker = unsafe { channel.take_waker() };

    // ORDERING: this ordering serves two-fold: it synchronizes with the acquire load
    // in the receiving thread, ensuring that both our read of the waker and write of
    // the message happen-before the taking of the message and freeing of the channel.
    // Furthermore, we need acquire ordering to ensure awaking the receiver
    // happens after the channel state is updated.
    channel.state.swap(state, Ordering::AcqRel);

    // Note: it is possible that between the store above and this statement that
    // the receiving thread is spuriously awakened, takes the message, and frees
    // the channel allocation. However, we took ownership of the waker out of
    // that allocation, and freeing the channel does not drop the waker since the
    // waker is wrapped in MaybeUninit. Therefore, this data is valid regardless of
    // whether the receiver has completed by this point.
    waker.wake();
}
146
impl<T> Sender<T> {
    /// Attempts to send a value on this channel, returning an error containing the message if
    /// it could not be sent.
    ///
    /// The sender is consumed. If the receiver has already registered a waker, the receiving
    /// task is woken after the message is published.
    ///
    /// # Errors
    ///
    /// Returns [`SendError`] when the channel is already in the DISCONNECTED state, i.e. the
    /// receiving side has been dropped. The error owns the message and the channel allocation.
    pub fn send(self, message: T) -> Result<(), SendError<T>> {
        let channel_ptr = self.channel_ptr;

        // Do not run the Drop implementation if send was called, any cleanup happens below.
        mem::forget(self);

        // SAFETY: The channel exists on the heap for the entire duration of this method, and we
        // only ever acquire shared references to it. Note that if the receiver disconnects it
        // does not free the channel.
        let channel = unsafe { channel_ptr.as_ref() };

        // Write the message into the channel on the heap.
        //
        // SAFETY: The receiver only ever accesses this memory location if we are in the MESSAGE
        // state, and since we are responsible for setting that state, we can guarantee that we have
        // exclusive access to this memory location to perform this write.
        unsafe { channel.write_message(message) };

        // Update the state to signal there is a message on the channel:
        //
        // * EMPTY + 1 = MESSAGE
        // * RECEIVING + 1 = AWAKING
        // * DISCONNECTED + 1 = EMPTY (invalid), however this state is never observed
        //
        // ORDERING: we use release ordering to ensure writing the message is visible to the
        // receiving thread. The EMPTY and DISCONNECTED branches do not observe any shared state,
        // and thus we do not need an acquire ordering. The RECEIVING branch manages synchronization
        // independent of this operation.
        match channel.state.fetch_add(1, Ordering::Release) {
            // The receiver is alive and has not started waiting. Send done.
            EMPTY => Ok(()),
            // The receiver is waiting. Wake it up so it can return the message.
            RECEIVING => {
                sender_wake_up_receiver(channel, MESSAGE);
                Ok(())
            }
            // The receiver was already dropped. The error is responsible for freeing the channel.
            //
            // SAFETY: since the receiver disconnected it will no longer access `channel_ptr`, so
            // we can transfer exclusive ownership of the channel's resources to the error.
            // Moreover, since we just placed the message in the channel, the channel contains a
            // valid message.
            DISCONNECTED => Err(SendError { channel_ptr }),
            state => unreachable!("unexpected channel state: {}", state),
        }
    }

    /// Returns true if the associated [`Receiver`] has been dropped.
    ///
    /// If true is returned, a future call to send is guaranteed to return an error.
    pub fn is_closed(&self) -> bool {
        // SAFETY: The channel exists on the heap for the entire duration of this method, and we
        // only ever acquire shared references to it. Note that if the receiver disconnects it
        // does not free the channel.
        let channel = unsafe { self.channel_ptr.as_ref() };

        // ORDERING: We *chose* a Relaxed ordering here as it is sufficient to enforce the
        // method's contract: "if true is returned, a future call to send is guaranteed to
        // return an error."
        //
        // Once true has been observed, it will remain true. However, if false is observed,
        // the receiver might have just disconnected but this thread has not observed it yet.
        matches!(channel.state.load(Ordering::Relaxed), DISCONNECTED)
    }
}
214
impl<T> Drop for Sender<T> {
    /// Disconnects the channel when the sender is dropped without having sent a message.
    ///
    /// `send` forgets the sender, so this destructor only runs when no message was sent.
    fn drop(&mut self) {
        // SAFETY: The receiver only ever frees the channel if we are in the MESSAGE or
        // DISCONNECTED states.
        //
        // * If we are in the MESSAGE state, then we called mem::forget(self), so we should
        //   not be in this function call.
        // * If we are in the DISCONNECTED state, then the receiver either received a MESSAGE
        //   so this statement is unreachable, or was dropped and observed that our side was still
        //   alive, and thus didn't free the channel.
        let channel = unsafe { self.channel_ptr.as_ref() };

        // Update the channel state to disconnected:
        //
        // * EMPTY ^ 001 = DISCONNECTED
        // * RECEIVING ^ 001 = AWAKING
        // * DISCONNECTED ^ 001 = EMPTY (invalid), but this state is never observed
        //
        // ORDERING: we do not need release ordering here since there are no modifications we
        // need to make visible to other threads, and the RECEIVING branch handles
        // synchronization independent of this fetch_xor
        match channel.state.fetch_xor(0b001, Ordering::Relaxed) {
            // The receiver has not started waiting, nor is it dropped.
            EMPTY => {}
            // The receiver is waiting. Wake it up so it can detect that the channel disconnected.
            RECEIVING => sender_wake_up_receiver(channel, DISCONNECTED),
            // The receiver was already dropped. We are responsible for freeing the channel.
            DISCONNECTED => {
                // SAFETY: when the receiver switches the state to DISCONNECTED they have received
                // the message or will no longer be trying to receive the message, and have
                // observed that the sender is still alive, meaning that we are responsible for
                // freeing the channel allocation.
                unsafe { dealloc(self.channel_ptr) };
            }
            state => unreachable!("unexpected channel state: {}", state),
        }
    }
}
253
/// Receives a value from the associated [`Sender`].
///
/// Created by [`channel`]. The message can be polled for with [`Receiver::try_recv`], or
/// awaited by turning the receiver into the [`Recv`] future through [`IntoFuture`].
pub struct Receiver<T> {
    /// Pointer to the heap-allocated channel state shared with the sending side.
    channel_ptr: NonNull<Channel<T>>,
}
258
259impl<T> fmt::Debug for Receiver<T> {
260 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
261 f.debug_struct("Receiver").finish_non_exhaustive()
262 }
263}
264
// SAFETY: moving the `Receiver` to another thread lets that thread take the `T` out of the
// channel, hence the `T: Send` bound.
//
// NOTE: no `Sync` impl is provided. `try_recv` takes `&self` and splits its state load and
// store, so it is written for a single caller at a time.
unsafe impl<T: Send> Send for Receiver<T> {}
266
267impl<T> IntoFuture for Receiver<T> {
268 type Output = Result<T, RecvError>;
269
270 type IntoFuture = Recv<T>;
271
272 fn into_future(self) -> Self::IntoFuture {
273 let Receiver { channel_ptr } = self;
274 // Do not run our Drop implementation, since the receiver lives on as the new future.
275 mem::forget(self);
276 Recv { channel_ptr }
277 }
278}
279
impl<T> Receiver<T> {
    /// Returns true if the associated [`Sender`] was dropped before sending a message, or if
    /// the message has already been received.
    ///
    /// If `true` is returned, all future calls to receive the message are guaranteed to return
    /// an error, and future calls to this method are guaranteed to also return `true`.
    pub fn is_closed(&self) -> bool {
        // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver
        // is still alive, meaning that even if the sender was dropped then it would have observed
        // the fact that we are still alive and left the responsibility of deallocating the
        // channel to us, so `self.channel_ptr` is valid
        let channel = unsafe { self.channel_ptr.as_ref() };

        // ORDERING: We *chose* a Relaxed ordering here as it is sufficient to
        // enforce the method's contract.
        //
        // Once true has been observed, it will remain true. However, if false is observed,
        // the sender might have just disconnected but this thread has not observed it yet.
        matches!(channel.state.load(Ordering::Relaxed), DISCONNECTED)
    }

    /// Returns true if there is a message in the channel, ready to be received.
    ///
    /// If `true` is returned, the next call to receive the message is guaranteed to return
    /// the message immediately.
    pub fn has_message(&self) -> bool {
        // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver
        // is still alive, meaning that even if the sender was dropped then it would have observed
        // the fact that we are still alive and left the responsibility of deallocating the
        // channel to us, so `self.channel_ptr` is valid
        let channel = unsafe { self.channel_ptr.as_ref() };

        // ORDERING: An acquire ordering is used to guarantee that no subsequent loads are
        // reordered before this one. This upholds the contract that if true is returned, the
        // next call to receive the message is guaranteed to also observe the `MESSAGE` state
        // and return the message immediately.
        matches!(channel.state.load(Ordering::Acquire), MESSAGE)
    }

    /// Checks if there is a message in the channel without blocking. Returns:
    ///
    /// * `Ok(message)` if there was a message in the channel.
    /// * `Err(TryRecvError::Empty)` if the [`Sender`] is alive, but has not yet sent a message.
    /// * `Err(TryRecvError::Disconnected)` if the [`Sender`] was dropped before sending anything or
    ///   if the message has already been extracted by a previous `try_recv` call.
    ///
    /// If a message is returned, the channel is disconnected and any subsequent receive operation
    /// using this receiver will return an error: [`TryRecvError::Disconnected`] for `try_recv`,
    /// or [`RecvError::Disconnected`] for [`recv`](Receiver::into_future).
    pub fn try_recv(&self) -> Result<T, TryRecvError> {
        // SAFETY: The channel will not be freed while this method is still running.
        let channel = unsafe { self.channel_ptr.as_ref() };

        // ORDERING: we use acquire ordering to synchronize with the store of the message.
        match channel.state.load(Ordering::Acquire) {
            EMPTY => Err(TryRecvError::Empty),
            DISCONNECTED => Err(TryRecvError::Disconnected),
            MESSAGE => {
                // It is okay to break up the load and store since once we are in the MESSAGE state,
                // the sender no longer modifies the state
                //
                // ORDERING: at this point the sender has done its job and is no longer active, so
                // we need not make any side effects visible to it.
                channel.state.store(DISCONNECTED, Ordering::Relaxed);

                // SAFETY: we are in the MESSAGE state so the message is present
                Ok(unsafe { channel.take_message() })
            }
            // A plain `Receiver` never registers a waker, so RECEIVING and AWAKING are not
            // expected here.
            state => unreachable!("unexpected channel state: {}", state),
        }
    }
}
352
impl<T> Drop for Receiver<T> {
    /// Disconnects the channel and frees whatever the sender has left behind.
    fn drop(&mut self) {
        // SAFETY: since the receiving side is still alive the sender would have observed that and
        // left deallocating the channel allocation to us.
        let channel = unsafe { self.channel_ptr.as_ref() };

        // Set the channel state to disconnected and read what state the receiver was in.
        match channel.state.swap(DISCONNECTED, Ordering::Acquire) {
            // The sender has not sent anything, nor is it dropped. The sender will observe
            // DISCONNECTED later and free the channel itself.
            EMPTY => {}
            // The sender already sent something. We must drop it, and free the channel.
            MESSAGE => {
                // SAFETY: the MESSAGE state means the sender wrote a message; the acquire swap
                // above synchronizes with that write.
                unsafe { channel.drop_message() };
                // SAFETY: `send` forgets the sender after publishing the message, so nobody else
                // refers to the allocation any more.
                unsafe { dealloc(self.channel_ptr) };
            }
            // The sender was already dropped. We are responsible for freeing the channel.
            DISCONNECTED => {
                // SAFETY: see the comment at the top of this function.
                unsafe { dealloc(self.channel_ptr) };
            }
            // NOTE: the receiver, unless transformed into a future, will never see the
            // RECEIVING or AWAKING states, so we can ignore them here.
            state => unreachable!("unexpected channel state: {}", state),
        }
    }
}
378
/// A future that completes when the message is sent from the associated [`Sender`], or the
/// [`Sender`] is dropped before sending a message.
///
/// Obtained by converting a [`Receiver`] with [`IntoFuture`] (for example by `.await`ing it).
pub struct Recv<T> {
    /// Pointer to the heap-allocated channel state shared with the sending side.
    channel_ptr: NonNull<Channel<T>>,
}
384
385impl<T> fmt::Debug for Recv<T> {
386 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
387 f.debug_struct("Recv").finish_non_exhaustive()
388 }
389}
390
// SAFETY: moving the `Recv` future to another thread lets that thread take the `T` out of the
// channel, hence the `T: Send` bound.
unsafe impl<T: Send> Send for Recv<T> {}
392
393fn recv_awaken<T>(channel: &Channel<T>) -> Poll<Result<T, RecvError>> {
394 loop {
395 hint::spin_loop();
396
397 // ORDERING: The load above has already synchronized with writing message.
398 match channel.state.load(Ordering::Relaxed) {
399 AWAKING => {}
400 DISCONNECTED => break Poll::Ready(Err(RecvError::Disconnected)),
401 MESSAGE => {
402 // ORDERING: the sender has been dropped, so this update only
403 // needs to be visible to us.
404 channel.state.store(DISCONNECTED, Ordering::Relaxed);
405 // SAFETY: We observed the MESSAGE state.
406 break Poll::Ready(Ok(unsafe { channel.take_message() }));
407 }
408 state => unreachable!("unexpected channel state: {}", state),
409 }
410 }
411}
412
impl<T> Future for Recv<T> {
    type Output = Result<T, RecvError>;

    /// Polls for the message.
    ///
    /// Returns `Poll::Pending` after storing (or replacing) the task's waker in the channel,
    /// `Poll::Ready(Ok(message))` once a message is available, and
    /// `Poll::Ready(Err(RecvError::Disconnected))` when the channel is disconnected.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver
        // is still alive, meaning that even if the sender was dropped then it would have observed
        // the fact that we are still alive and left the responsibility of deallocating the
        // channel to us, so `self.channel_ptr` is valid
        let channel = unsafe { self.channel_ptr.as_ref() };

        // ORDERING: we use acquire ordering to synchronize with the store of the message.
        match channel.state.load(Ordering::Acquire) {
            // The sender is alive but has not sent anything yet.
            EMPTY => {
                let waker = cx.waker().clone();
                // SAFETY: We can not be in the forbidden states, and no waker in the channel.
                unsafe { channel.write_waker(waker) }
            }
            // The sender sent the message.
            MESSAGE => {
                // ORDERING: the sender has been dropped so this update only needs to be
                // visible to us.
                channel.state.store(DISCONNECTED, Ordering::Relaxed);
                // SAFETY: the MESSAGE state was observed with acquire ordering, so the message
                // is initialized and visible.
                Poll::Ready(Ok(unsafe { channel.take_message() }))
            }
            // We were polled again while waiting for the sender. Replace the waker with the new
            // one.
            RECEIVING => {
                // ORDERING: We use relaxed ordering on both success and failure since we have not
                // written anything above that must be released, and the individual match arms
                // handle any additional synchronization.
                match channel.state.compare_exchange(
                    RECEIVING,
                    EMPTY,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    // We successfully changed the state back to EMPTY.
                    //
                    // This is the most likely branch to be taken, which is why we do not use any
                    // memory barriers in the compare_exchange above.
                    Ok(_) => {
                        let waker = cx.waker().clone();

                        // SAFETY: We wrote the waker in a previous call to poll. We do not need
                        // a memory barrier since the previous write here was by ourselves.
                        unsafe { channel.drop_waker() };

                        // SAFETY: We can not be in the forbidden states, and no waker in the
                        // channel.
                        unsafe { channel.write_waker(waker) }
                    }
                    // The sender sent the message while we prepared to replace the waker.
                    // We take the message and mark the channel disconnected.
                    // The sender has already taken the waker.
                    Err(MESSAGE) => {
                        // ORDERING: Synchronize with writing message. This branch is
                        // unlikely to be taken.
                        channel.state.swap(DISCONNECTED, Ordering::Acquire);

                        // SAFETY: The state tells us the sender has initialized the message.
                        Poll::Ready(Ok(unsafe { channel.take_message() }))
                    }
                    // The sender is currently waking us up.
                    //
                    // NOTE(review): this arm is reached through the Relaxed failure ordering
                    // above, so nothing in this function has synchronized with the sender's
                    // message write at this point. Confirm that `recv_awaken` establishes that
                    // synchronization itself before it takes the message.
                    Err(AWAKING) => recv_awaken(channel),
                    // The sender was dropped before sending anything while we prepared to park.
                    // The sender has taken the waker already.
                    Err(DISCONNECTED) => Poll::Ready(Err(RecvError::Disconnected)),
                    Err(state) => unreachable!("unexpected channel state: {}", state),
                }
            }
            // The sender has observed the RECEIVING state and is currently reading the waker from
            // a previous poll. We need to loop here until we observe the MESSAGE or DISCONNECTED
            // state. We busy loop here since we know the sender is done very soon.
            AWAKING => recv_awaken(channel),
            // The sender was dropped before sending anything.
            DISCONNECTED => Poll::Ready(Err(RecvError::Disconnected)),
            state => unreachable!("unexpected channel state: {}", state),
        }
    }
}
494
495impl<T> Drop for Recv<T> {
496 fn drop(&mut self) {
497 // SAFETY: since the receiving side is still alive the sender would have observed that and
498 // left deallocating the channel allocation to us.
499 let channel = unsafe { self.channel_ptr.as_ref() };
500
501 // Set the channel state to disconnected and read what state the receiver was in.
502 match channel.state.swap(DISCONNECTED, Ordering::Acquire) {
503 // The sender has not sent anything, nor is it dropped.
504 EMPTY => {}
505 // The sender already sent something. We must drop it, and free the channel.
506 MESSAGE => {
507 unsafe { channel.drop_message() };
508 unsafe { dealloc(self.channel_ptr) };
509 }
510 // The receiver has been polled. We must drop the waker.
511 RECEIVING => {
512 unsafe { channel.drop_waker() };
513 }
514 // The sender was already dropped. We are responsible for freeing the channel.
515 DISCONNECTED => {
516 // SAFETY: see safety comment at top of function.
517 unsafe { dealloc(self.channel_ptr) };
518 }
519 // This receiver was previously polled, so the channel was in the RECEIVING state.
520 // But the sender has observed the RECEIVING state and is currently reading the waker
521 // to wake us up. We need to loop here until we observe the MESSAGE or DISCONNECTED
522 // state. We busy loop here since we know the sender is done very soon.
523 AWAKING => {
524 loop {
525 hint::spin_loop();
526
527 // ORDERING: The swap above has already synchronized with writing message.
528 match channel.state.load(Ordering::Relaxed) {
529 AWAKING => {}
530 DISCONNECTED => break,
531 MESSAGE => {
532 // SAFETY: we are in the message state so the message is initialized.
533 unsafe { channel.drop_message() };
534 break;
535 }
536 state => unreachable!("unexpected channel state: {}", state),
537 }
538 }
539 unsafe { dealloc(self.channel_ptr) };
540 }
541 state => unreachable!("unexpected channel state: {}", state),
542 }
543 }
544}
545
/// Internal channel data structure.
///
/// The [`channel`] method allocates and puts one instance of this struct on the heap for each
/// oneshot channel instance. The struct holds:
///
/// * The current state of the channel.
/// * The message in the channel. This memory is uninitialized until the message is sent.
/// * The waker instance for the task that is currently receiving on this channel. This memory is
///   uninitialized until the receiver starts receiving.
///
/// Both slots are `MaybeUninit`, so dropping a `Channel` never drops the message or the waker;
/// whoever owns them must drop them explicitly.
struct Channel<T> {
    /// One of the state constants EMPTY, MESSAGE, RECEIVING, AWAKING or DISCONNECTED.
    state: AtomicU8,
    /// Slot for the message; written by the sender before it publishes the message.
    message: UnsafeCell<MaybeUninit<T>>,
    /// Slot for the receiving task's waker.
    waker: UnsafeCell<MaybeUninit<Waker>>,
}
560
impl<T> Channel<T> {
    /// Creates a channel in the EMPTY state with both the message and waker slots
    /// uninitialized.
    const fn new() -> Self {
        Self {
            state: AtomicU8::new(EMPTY),
            message: UnsafeCell::new(MaybeUninit::uninit()),
            waker: UnsafeCell::new(MaybeUninit::uninit()),
        }
    }

    /// Returns a shared reference to the (possibly uninitialized) message slot.
    ///
    /// # Safety
    ///
    /// The caller must ensure that nothing writes to the message slot while the returned
    /// reference is in use.
    #[inline(always)]
    unsafe fn message(&self) -> &MaybeUninit<T> {
        unsafe { &*self.message.get() }
    }

    /// Writes `message` into the message slot without dropping any previous content.
    ///
    /// # Safety
    ///
    /// The caller must have exclusive access to the message slot.
    #[inline(always)]
    unsafe fn write_message(&self, message: T) {
        unsafe {
            let slot = &mut *self.message.get();
            slot.as_mut_ptr().write(message);
        }
    }

    /// Drops the message in place.
    ///
    /// # Safety
    ///
    /// The message slot must hold an initialized message, and the caller must have exclusive
    /// access to it. The slot is logically uninitialized afterwards.
    #[inline(always)]
    unsafe fn drop_message(&self) {
        unsafe {
            let slot = &mut *self.message.get();
            slot.assume_init_drop();
        }
    }

    /// Moves the message out of the slot.
    ///
    /// # Safety
    ///
    /// The message slot must hold an initialized message, and the message must not be read or
    /// dropped through the slot again afterwards.
    #[inline(always)]
    unsafe fn take_message(&self) -> T {
        unsafe { ptr::read(self.message.get()).assume_init() }
    }

    /// Stores `waker` in the channel and tries to move the state from EMPTY to RECEIVING.
    ///
    /// Returns `Poll::Pending` when the waker was registered. If the sender changed the state
    /// in the meantime, the waker is dropped again and the outcome is returned as
    /// `Poll::Ready`.
    ///
    /// # Safety
    ///
    /// * The `waker` field must not have a waker stored when calling this method.
    /// * The `state` must not be in the RECEIVING state when calling this method.
    unsafe fn write_waker(&self, waker: Waker) -> Poll<Result<T, RecvError>> {
        // Write the waker instance to the channel.
        //
        // SAFETY: we are not yet in the RECEIVING state, meaning that the sender will not
        // try to access the waker until it sees the state set to RECEIVING below.
        unsafe {
            let slot = &mut *self.waker.get();
            slot.as_mut_ptr().write(waker);
        }

        // ORDERING: we use release ordering on success so the sender can synchronize with
        // our write of the waker. We use relaxed ordering on failure since the sender does
        // not need to synchronize with our write and the individual match arms handle any
        // additional synchronization
        match self
            .state
            .compare_exchange(EMPTY, RECEIVING, Ordering::Release, Ordering::Relaxed)
        {
            // We stored our waker, now we return and let the sender wake us up.
            Ok(_) => Poll::Pending,
            // The sender sent the message while we prepared to await.
            // We take the message and mark the channel disconnected.
            Err(MESSAGE) => {
                // ORDERING: Synchronize with writing message. This branch is unlikely to be
                // taken, so it is likely more efficient to use a fence here
                // instead of AcqRel ordering on the compare_exchange
                // operation.
                fence(Ordering::Acquire);

                // SAFETY: we started in the EMPTY state and the sender switched us to the
                // MESSAGE state. This means that it did not take the waker, so we're
                // responsible for dropping it.
                unsafe { self.drop_waker() };

                // ORDERING: sender does not exist, so this update only needs to be visible to
                // us.
                self.state.store(DISCONNECTED, Ordering::Relaxed);

                // SAFETY: The MESSAGE state tells us there is a correctly initialized message.
                Poll::Ready(Ok(unsafe { self.take_message() }))
            }
            // The sender was dropped before sending anything while we prepared to await.
            Err(DISCONNECTED) => {
                // SAFETY: we started in the EMPTY state and the sender switched us to the
                // DISCONNECTED state. This means that it did not take the waker, so we are
                // responsible for dropping it.
                unsafe { self.drop_waker() };
                Poll::Ready(Err(RecvError::Disconnected))
            }
            Err(state) => unreachable!("unexpected channel state: {}", state),
        }
    }

    /// Drops the stored waker in place.
    ///
    /// # Safety
    ///
    /// The waker slot must hold an initialized waker, and the caller must have exclusive
    /// access to it. The slot is logically uninitialized afterwards.
    #[inline(always)]
    unsafe fn drop_waker(&self) {
        unsafe {
            let slot = &mut *self.waker.get();
            slot.assume_init_drop();
        }
    }

    /// Moves the stored waker out of the slot.
    ///
    /// # Safety
    ///
    /// The waker slot must hold an initialized waker, and the waker must not be read or
    /// dropped through the slot again afterwards.
    #[inline(always)]
    unsafe fn take_waker(&self) -> Waker {
        unsafe { ptr::read(self.waker.get()).assume_init() }
    }
}
666
667unsafe fn dealloc<T>(channel: NonNull<Channel<T>>) {
668 unsafe { drop(Box::from_raw(channel.as_ptr())) }
669}
670
/// An error returned when trying to send on a closed channel. Returned from
/// [`Sender::send`] if the corresponding [`Receiver`] has already been dropped.
///
/// The message that could not be sent can be retrieved again with [`SendError::into_inner`].
///
/// The error owns the channel allocation, which still holds the undelivered message.
pub struct SendError<T> {
    /// Pointer to the channel allocation; its message slot is initialized.
    channel_ptr: NonNull<Channel<T>>,
}
678
// SAFETY: the error exclusively owns the channel allocation and the `T` stored in it, so it can
// move between threads whenever `T` can.
unsafe impl<T: Send> Send for SendError<T> {}
// SAFETY: the only access through `&SendError<T>` is `as_inner`, which hands out `&T`.
unsafe impl<T: Sync> Sync for SendError<T> {}
681
682impl<T> SendError<T> {
683 /// Get a reference to the message that failed to be sent.
684 pub fn as_inner(&self) -> &T {
685 unsafe { self.channel_ptr.as_ref().message().assume_init_ref() }
686 }
687
688 /// Consumes the error and returns the message that failed to be sent.
689 pub fn into_inner(self) -> T {
690 let channel_ptr = self.channel_ptr;
691
692 // Do not run destructor if we consumed ourselves. Freeing happens below.
693 mem::forget(self);
694
695 // SAFETY: we have ownership of the channel
696 let channel: &Channel<T> = unsafe { channel_ptr.as_ref() };
697
698 // SAFETY: we know that the message is initialized according to the safety requirements of
699 // `new`
700 let message = unsafe { channel.take_message() };
701
702 // SAFETY: we own the channel
703 unsafe { dealloc(channel_ptr) };
704
705 message
706 }
707}
708
709impl<T> Drop for SendError<T> {
710 fn drop(&mut self) {
711 // SAFETY: we have ownership of the channel and require that the message is initialized
712 // upon construction
713 unsafe {
714 self.channel_ptr.as_ref().drop_message();
715 dealloc(self.channel_ptr);
716 }
717 }
718}
719
720impl<T> fmt::Display for SendError<T> {
721 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
722 "sending on a closed channel".fmt(f)
723 }
724}
725
726impl<T> fmt::Debug for SendError<T> {
727 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
728 write!(f, "SendError<{}>(..)", stringify!(T))
729 }
730}
731
// Marker impl with all default methods; it relies on the `Debug` and `Display` impls of
// `SendError`, neither of which places bounds on `T`.
impl<T> std::error::Error for SendError<T> {}
733
/// The reasons why [`Receiver::try_recv`] can fail to produce a message.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum TryRecvError {
    /// No message is available yet, but the sender is still connected, so one may still
    /// arrive.
    Empty,
    /// The sender has disconnected; no message will ever be received on this channel.
    Disconnected,
}

impl fmt::Display for TryRecvError {
    /// Writes a fixed, user-facing description of the variant.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match self {
            TryRecvError::Empty => "receiving on an empty channel",
            TryRecvError::Disconnected => "receiving on a closed channel",
        };
        f.write_str(description)
    }
}

impl std::error::Error for TryRecvError {}
754
/// The error produced when awaiting the message of a [`Receiver`] fails.
///
/// It signals that the corresponding [`Sender`] was dropped before it sent a message. Once a
/// message has been received (e.g., via [`Receiver::try_recv`]), later `try_recv` calls report
/// [`TryRecvError::Disconnected`] instead.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum RecvError {
    /// The sender has disconnected; no message will ever be received on this channel.
    Disconnected,
}

impl fmt::Display for RecvError {
    /// Writes a fixed, user-facing description of the error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            RecvError::Disconnected => f.write_str("receiving on a closed channel"),
        }
    }
}

impl std::error::Error for RecvError {}
773
// Channel states.
//
// The bit patterns are chosen so that the sender can perform its transitions with a single
// read-modify-write operation:
//
// * sending uses `fetch_add(1)`: EMPTY + 1 = MESSAGE, RECEIVING + 1 = AWAKING
// * dropping the sender uses `fetch_xor(0b001)`: EMPTY ^ 1 = DISCONNECTED,
//   RECEIVING ^ 1 = AWAKING
//
// Do not change one value without re-checking these relationships.

/// The initial channel state. Active while both endpoints are still alive, no message has been
/// sent, and the receiver is not receiving.
const EMPTY: u8 = 0b011;
/// A message has been sent to the channel, but the receiver has not yet read it.
const MESSAGE: u8 = 0b100;
/// No message has yet been sent on the channel, but the receiver future ([`Recv`]) is currently
/// receiving, i.e. it has stored a waker in the channel.
const RECEIVING: u8 = 0b000;
/// A message is being sent to the channel, or the channel is closing. The receiver future
/// ([`Recv`]) is currently being awakened.
const AWAKING: u8 = 0b001;
/// The channel has been closed. This means that either the sender or receiver has been dropped,
/// or the message sent to the channel has already been received.
const DISCONNECTED: u8 = 0b010;