mea/oneshot/mod.rs
1// Copyright 2024 tison <wander4096@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
// This implementation is derived from the `oneshot` crate [1], with significant simplifications
// since mea does not need to support synchronous (blocking) receiving functions.
17//
18// [1] https://github.com/faern/oneshot/blob/25274e99/src/lib.rs
19
20//! A one-shot channel is used for sending a single message between
21//! asynchronous tasks. The [`channel`] function is used to create a
22//! [`Sender`] and [`Receiver`] handle pair that form the channel.
23//!
24//! The `Sender` handle is used by the producer to send the value.
25//! The `Receiver` handle is used by the consumer to receive the value.
26//!
27//! Each handle can be used on separate tasks.
28//!
29//! Since the `send` method is not async, it can be used anywhere. This includes
30//! sending between two runtimes, and using it from non-async code.
31//!
32//! # Examples
33//!
34//! ```
35//! # #[tokio::main]
36//! # async fn main() {
37//! use mea::oneshot;
38//!
39//! let (tx, rx) = oneshot::channel();
40//!
41//! tokio::spawn(async move {
42//! if let Err(_) = tx.send(3) {
43//! println!("the receiver dropped");
44//! }
45//! });
46//!
47//! match rx.await {
48//! Ok(v) => println!("got = {:?}", v),
49//! Err(_) => println!("the sender dropped"),
50//! }
51//! # }
52//! ```
53//!
54//! If the sender is dropped without sending, the receiver will fail with
55//! [`RecvError`]:
56//!
57//! ```
58//! # #[tokio::main]
59//! # async fn main() {
60//! use mea::oneshot;
61//!
62//! let (tx, rx) = oneshot::channel::<u32>();
63//!
64//! tokio::spawn(async move {
65//! drop(tx);
66//! });
67//!
68//! match rx.await {
69//! Ok(_) => panic!("This doesn't happen"),
70//! Err(_) => println!("the sender dropped"),
71//! }
72//! # }
73//! ```
74
75use std::cell::UnsafeCell;
76use std::fmt;
77use std::future::Future;
78use std::future::IntoFuture;
79use std::hint;
80use std::mem;
81use std::mem::MaybeUninit;
82use std::pin::Pin;
83use std::ptr;
84use std::ptr::NonNull;
85use std::sync::atomic::AtomicU8;
86use std::sync::atomic::Ordering;
87use std::sync::atomic::fence;
88use std::task::Context;
89use std::task::Poll;
90use std::task::Waker;
91
92#[cfg(test)]
93mod tests;
94
95/// Creates a new oneshot channel and returns the two endpoints, [`Sender`] and [`Receiver`].
96pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
97 let channel_ptr = NonNull::from(Box::leak(Box::new(Channel::new())));
98 (Sender { channel_ptr }, Receiver { channel_ptr })
99}
100
/// Sends a value to the associated [`Receiver`].
#[derive(Debug)]
pub struct Sender<T> {
    /// Pointer to the heap-allocated channel shared with the receiving endpoint.
    channel_ptr: NonNull<Channel<T>>,
}

// `NonNull` is neither `Send` nor `Sync`, so both traits are opted into manually.
//
// SAFETY: moving the sender to another thread allows a `T` to be written into the channel from
// that thread, hence the `T: Send` bound.
unsafe impl<T: Send> Send for Sender<T> {}
// SAFETY: the only `&self` method on the sender (`is_closed`) merely loads the atomic state.
unsafe impl<T: Sync> Sync for Sender<T> {}
109
/// Completes the sender's half of a wake-up.
///
/// The callers invoke this after their read-modify-write on the state returned `RECEIVING`,
/// i.e. after the channel moved to the `AWAKING` state. The function takes the waker stored by
/// the receiver, publishes `state` (`MESSAGE` when called from `send`, `DISCONNECTED` when called
/// from the sender's destructor) and only then wakes the receiver.
#[inline(always)]
fn sender_wake_up_receiver<T>(channel: &Channel<T>, state: u8) {
    // ORDERING: Synchronizes with writing waker to memory, and prevents the
    // taking of the waker from being ordered before this operation.
    fence(Ordering::Acquire);

    // Take the waker, but critically do not awake it. If we awake it now, the
    // receiving thread could still observe the AWAKING state and re-await, meaning
    // that after we change to the MESSAGE state, it would remain waiting indefinitely
    // or until a spurious wakeup.
    //
    // SAFETY: at this point we are in the AWAKING state, and the receiving thread
    // does not access the waker while in this state, nor does it free the channel
    // allocation in this state.
    let waker = unsafe { channel.take_waker() };

    // ORDERING: this ordering serves two-fold: it synchronizes with the acquire load
    // in the receiving thread, ensuring that both our read of the waker and write of
    // the message happen-before the taking of the message and freeing of the channel.
    // Furthermore, we need acquire ordering to ensure awaking the receiver
    // happens after the channel state is updated.
    channel.state.swap(state, Ordering::AcqRel);

    // Note: it is possible that between the store above and this statement that
    // the receiving thread is spuriously awakened, takes the message, and frees
    // the channel allocation. However, we took ownership of the waker out of
    // that allocation, and freeing the channel does not drop the waker since the
    // waker is wrapped in MaybeUninit. Therefore, this data is valid regardless of
    // whether the receiver has completed by this point.
    waker.wake();
}
141
impl<T> Sender<T> {
    /// Attempts to send a value on this channel, returning an error that contains the message
    /// if it could not be sent.
    ///
    /// # Errors
    ///
    /// Returns [`SendError`] if the associated [`Receiver`] has already been dropped.
    pub fn send(self, message: T) -> Result<(), SendError<T>> {
        let channel_ptr = self.channel_ptr;

        // Do not run the Drop implementation if send was called, any cleanup happens below.
        mem::forget(self);

        // SAFETY: The channel exists on the heap for the entire duration of this method, and we
        // only ever acquire shared references to it. Note that if the receiver disconnects it
        // does not free the channel.
        let channel = unsafe { channel_ptr.as_ref() };

        // Write the message into the channel on the heap.
        //
        // SAFETY: The receiver only ever accesses this memory location if we are in the MESSAGE
        // state, and since we are responsible for setting that state, we can guarantee that we have
        // exclusive access to this memory location to perform this write.
        unsafe { channel.write_message(message) };

        // Update the state to signal there is a message on the channel:
        //
        // * EMPTY + 1 = MESSAGE
        // * RECEIVING + 1 = AWAKING
        // * DISCONNECTED + 1 = EMPTY (invalid), however this state is never observed
        //
        // ORDERING: we use release ordering to ensure writing the message is visible to the
        // receiving thread. The EMPTY and DISCONNECTED branches do not observe any shared state,
        // and thus we do not need an acquire ordering. The RECEIVING branch manages synchronization
        // independent of this operation.
        match channel.state.fetch_add(1, Ordering::Release) {
            // The receiver is alive and has not started waiting. Send done.
            EMPTY => Ok(()),
            // The receiver is waiting. Wake it up so it can return the message.
            RECEIVING => {
                sender_wake_up_receiver(channel, MESSAGE);
                Ok(())
            }
            // The receiver was already dropped. The error is responsible for freeing the channel.
            //
            // SAFETY: since the receiver disconnected it will no longer access `channel_ptr`, so
            // we can transfer exclusive ownership of the channel's resources to the error.
            // Moreover, since we just placed the message in the channel, the channel contains a
            // valid message.
            DISCONNECTED => Err(SendError { channel_ptr }),
            state => unreachable!("unexpected channel state: {}", state),
        }
    }

    /// Returns true if the associated [`Receiver`] has been dropped.
    ///
    /// If true is returned, a future call to send is guaranteed to return an error.
    pub fn is_closed(&self) -> bool {
        // SAFETY: The channel exists on the heap for the entire duration of this method, and we
        // only ever acquire shared references to it. Note that if the receiver disconnects it
        // does not free the channel.
        let channel = unsafe { self.channel_ptr.as_ref() };

        // ORDERING: We *chose* a Relaxed ordering here as it is sufficient to enforce the method's
        // contract: "if true is returned, a future call to send is guaranteed to return an error."
        //
        // Once true has been observed, it will remain true. However, if false is observed,
        // the receiver might have just disconnected but this thread has not observed it yet.
        matches!(channel.state.load(Ordering::Relaxed), DISCONNECTED)
    }
}
209
impl<T> Drop for Sender<T> {
    /// Marks the channel as disconnected when the sender is dropped without sending, waking a
    /// waiting receiver or freeing the channel if the receiver is already gone.
    fn drop(&mut self) {
        // SAFETY: The receiver only ever frees the channel if we are in the MESSAGE or
        // DISCONNECTED states.
        //
        // * If we are in the MESSAGE state, then we called mem::forget(self), so we should
        //   not be in this function call.
        // * If we are in the DISCONNECTED state, then the receiver either received a MESSAGE
        //   so this statement is unreachable, or was dropped and observed that our side was still
        //   alive, and thus didn't free the channel.
        let channel = unsafe { self.channel_ptr.as_ref() };

        // Update the channel state to disconnected:
        //
        // * EMPTY ^ 001 = DISCONNECTED
        // * RECEIVING ^ 001 = AWAKING
        // * DISCONNECTED ^ 001 = EMPTY (invalid), but this state is never observed
        //
        // ORDERING: we do not need release ordering here since there are no modifications we
        // need to make visible to other threads, and the RECEIVING branch handles
        // synchronization independent of this fetch_xor
        match channel.state.fetch_xor(0b001, Ordering::Relaxed) {
            // The receiver has not started waiting, nor is it dropped.
            EMPTY => {}
            // The receiver is waiting. Wake it up so it can detect that the channel disconnected.
            RECEIVING => sender_wake_up_receiver(channel, DISCONNECTED),
            // The receiver was already dropped. We are responsible for freeing the channel.
            DISCONNECTED => {
                // SAFETY: when the receiver switches the state to DISCONNECTED they have received
                // the message or will no longer be trying to receive the message, and have
                // observed that the sender is still alive, meaning that we are responsible for
                // freeing the channel allocation.
                unsafe { dealloc(self.channel_ptr) };
            }
            state => unreachable!("unexpected channel state: {}", state),
        }
    }
}
248
/// Receives a value from the associated [`Sender`].
///
/// The receiver can be polled for a message with [`Receiver::try_recv`], or awaited by turning
/// it into the [`Recv`] future through its [`IntoFuture`] implementation.
#[derive(Debug)]
pub struct Receiver<T> {
    /// Pointer to the heap-allocated channel shared with the sending endpoint.
    channel_ptr: NonNull<Channel<T>>,
}

// `NonNull` is not `Send`, so the trait is opted into manually.
//
// SAFETY: moving the receiver to another thread allows the `T` stored in the channel to be taken
// or dropped on that thread, hence the `T: Send` bound.
unsafe impl<T: Send> Send for Receiver<T> {}
256
257impl<T> IntoFuture for Receiver<T> {
258 type Output = Result<T, RecvError>;
259
260 type IntoFuture = Recv<T>;
261
262 fn into_future(self) -> Self::IntoFuture {
263 let Receiver { channel_ptr } = self;
264 // Do not run our Drop implementation, since the receiver lives on as the new future.
265 mem::forget(self);
266 Recv { channel_ptr }
267 }
268}
269
270impl<T> Receiver<T> {
271 /// Returns true if the associated [`Sender`] was dropped before sending a message. Or if
272 /// the message has already been received.
273 ///
274 /// If `true` is returned, all future calls to receive the message are guaranteed to return
275 /// [`RecvError`]. And future calls to this method is guaranteed to also return `true`.
276 pub fn is_closed(&self) -> bool {
277 // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver
278 // is still alive, meaning that even if the sender was dropped then it would have observed
279 // the fact that we are still alive and left the responsibility of deallocating the
280 // channel to us, so `self.channel` is valid
281 let channel = unsafe { self.channel_ptr.as_ref() };
282
283 // ORDERING: We *chose* a Relaxed ordering here as it is sufficient to
284 // enforce the method's contract.
285 //
286 // Once true has been observed, it will remain true. However, if false is observed,
287 // the sender might have just disconnected but this thread has not observed it yet.
288 matches!(channel.state.load(Ordering::Relaxed), DISCONNECTED)
289 }
290
291 /// Returns true if there is a message in the channel, ready to be received.
292 ///
293 /// If `true` is returned, the next call to receive the message is guaranteed to return
294 /// the message immediately.
295 pub fn has_message(&self) -> bool {
296 // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver
297 // is still alive, meaning that even if the sender was dropped then it would have observed
298 // the fact that we are still alive and left the responsibility of deallocating the
299 // channel to us, so `self.channel` is valid
300 let channel = unsafe { self.channel_ptr.as_ref() };
301
302 // ORDERING: An acquire ordering is used to guarantee no subsequent loads is reordered
303 // before this one. This upholds the contract that if true is returned, the next call to
304 // receive the message is guaranteed to also observe the `MESSAGE` state and return the
305 // message immediately.
306 matches!(channel.state.load(Ordering::Acquire), MESSAGE)
307 }
308
309 /// Checks if there is a message in the channel without blocking. Returns:
310 ///
311 /// * `Ok(Some(message))` if there was a message in the channel.
312 /// * `Ok(None)` if the [`Sender`] is alive, but has not yet sent a message.
313 /// * `Err(RecvError)` if the [`Sender`] was dropped before sending anything or if the message
314 /// has already been extracted by a previous `try_recv` call.
315 ///
316 /// If a message is returned, the channel is disconnected and any subsequent receive operation
317 /// using this receiver will return an [`RecvError`].
318 pub fn try_recv(&self) -> Result<Option<T>, RecvError> {
319 // SAFETY: The channel will not be freed while this method is still running.
320 let channel = unsafe { self.channel_ptr.as_ref() };
321
322 // ORDERING: we use acquire ordering to synchronize with the store of the message.
323 match channel.state.load(Ordering::Acquire) {
324 EMPTY => Ok(None),
325 DISCONNECTED => Err(RecvError(())),
326 MESSAGE => {
327 // It is okay to break up the load and store since once we are in the MESSAGE state,
328 // the sender no longer modifies the state
329 //
330 // ORDERING: at this point the sender has done its job and is no longer active, so
331 // we need not make any side effects visible to it.
332 channel.state.store(DISCONNECTED, Ordering::Relaxed);
333
334 // SAFETY: we are in the MESSAGE state so the message is present
335 Ok(Some(unsafe { channel.take_message() }))
336 }
337 state => unreachable!("unexpected channel state: {}", state),
338 }
339 }
340}
341
342impl<T> Drop for Receiver<T> {
343 fn drop(&mut self) {
344 // SAFETY: since the receiving side is still alive the sender would have observed that and
345 // left deallocating the channel allocation to us.
346 let channel = unsafe { self.channel_ptr.as_ref() };
347
348 // Set the channel state to disconnected and read what state the receiver was in.
349 match channel.state.swap(DISCONNECTED, Ordering::Acquire) {
350 // The sender has not sent anything, nor is it dropped.
351 EMPTY => {}
352 // The sender already sent something. We must drop it, and free the channel.
353 MESSAGE => {
354 unsafe { channel.drop_message() };
355 unsafe { dealloc(self.channel_ptr) };
356 }
357 // The sender was already dropped. We are responsible for freeing the channel.
358 DISCONNECTED => {
359 unsafe { dealloc(self.channel_ptr) };
360 }
361 // NOTE: the receiver, unless transformed into a future, will never see the
362 // RECEIVING or AWAKING states, so we can ignore them here.
363 state => unreachable!("unexpected channel state: {}", state),
364 }
365 }
366}
367
/// A future that completes when the message is sent from the associated [`Sender`], or the
/// [`Sender`] is dropped before sending a message.
///
/// Created by awaiting a [`Receiver`] through its [`IntoFuture`] implementation.
#[derive(Debug)]
pub struct Recv<T> {
    /// Pointer to the heap-allocated channel shared with the sending endpoint.
    channel_ptr: NonNull<Channel<T>>,
}

// `NonNull` is not `Send`, so the trait is opted into manually.
//
// SAFETY: moving the future to another thread allows the `T` stored in the channel to be taken
// or dropped on that thread, hence the `T: Send` bound.
unsafe impl<T: Send> Send for Recv<T> {}
376
377fn recv_awaken<T>(channel: &Channel<T>) -> Poll<Result<T, RecvError>> {
378 loop {
379 hint::spin_loop();
380
381 // ORDERING: The load above has already synchronized with writing message.
382 match channel.state.load(Ordering::Relaxed) {
383 AWAKING => {}
384 DISCONNECTED => break Poll::Ready(Err(RecvError(()))),
385 MESSAGE => {
386 // ORDERING: the sender has been dropped, so this update only
387 // needs to be visible to us.
388 channel.state.store(DISCONNECTED, Ordering::Relaxed);
389 // SAFETY: We observed the MESSAGE state.
390 break Poll::Ready(Ok(unsafe { channel.take_message() }));
391 }
392 state => unreachable!("unexpected channel state: {}", state),
393 }
394 }
395}
396
impl<T> Future for Recv<T> {
    type Output = Result<T, RecvError>;

    /// Polls the channel for the message.
    ///
    /// Resolves to `Ok(message)` once the sender has sent a message and to `Err(RecvError)` if
    /// the sender was dropped without sending or the message was already taken. While neither
    /// has happened, a clone of the waker from `cx` is stored in the channel (replacing one
    /// stored by an earlier poll) and `Poll::Pending` is returned.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver
        // is still alive, meaning that even if the sender was dropped then it would have observed
        // the fact that we are still alive and left the responsibility of deallocating the
        // channel to us, so `self.channel` is valid
        let channel = unsafe { self.channel_ptr.as_ref() };

        // ORDERING: we use acquire ordering to synchronize with the store of the message.
        match channel.state.load(Ordering::Acquire) {
            // The sender is alive but has not sent anything yet.
            EMPTY => {
                let waker = cx.waker().clone();
                // SAFETY: We can not be in the forbidden states, and no waker in the channel.
                unsafe { channel.write_waker(waker) }
            }
            // The sender sent the message.
            MESSAGE => {
                // ORDERING: the sender has been dropped so this update only needs to be
                // visible to us.
                channel.state.store(DISCONNECTED, Ordering::Relaxed);
                // SAFETY: the MESSAGE state was observed with acquire ordering, so the message
                // is initialized and visible to us.
                Poll::Ready(Ok(unsafe { channel.take_message() }))
            }
            // We were polled again while waiting for the sender. Replace the waker with the new
            // one.
            RECEIVING => {
                // ORDERING: We use relaxed ordering on both success and failure since we have not
                // written anything above that must be released, and the individual match arms
                // handle any additional synchronization.
                match channel.state.compare_exchange(
                    RECEIVING,
                    EMPTY,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    // We successfully changed the state back to EMPTY.
                    //
                    // This is the most likely branch to be taken, which is why we do not use any
                    // memory barriers in the compare_exchange above.
                    Ok(_) => {
                        let waker = cx.waker().clone();

                        // SAFETY: We wrote the waker in a previous call to poll. We do not need
                        // a memory barrier since the previous write here was by ourselves.
                        unsafe { channel.drop_waker() };

                        // SAFETY: We can not be in the forbidden states, and no waker in the
                        // channel.
                        unsafe { channel.write_waker(waker) }
                    }
                    // The sender sent the message while we prepared to replace the waker.
                    // We take the message and mark the channel disconnected.
                    // The sender has already taken the waker.
                    Err(MESSAGE) => {
                        // ORDERING: Synchronize with writing message. This branch is
                        // unlikely to be taken.
                        channel.state.swap(DISCONNECTED, Ordering::Acquire);

                        // SAFETY: The state tells us the sender has initialized the message.
                        Poll::Ready(Ok(unsafe { channel.take_message() }))
                    }
                    // The sender is currently waking us up.
                    Err(AWAKING) => recv_awaken(channel),
                    // The sender was dropped before sending anything while we prepared to park.
                    // The sender has taken the waker already.
                    Err(DISCONNECTED) => Poll::Ready(Err(RecvError(()))),
                    Err(state) => unreachable!("unexpected channel state: {}", state),
                }
            }
            // The sender has observed the RECEIVING state and is currently reading the waker from
            // a previous poll. We need to loop here until we observe the MESSAGE or DISCONNECTED
            // state. We busy loop here since we know the sender is done very soon.
            AWAKING => recv_awaken(channel),
            // The sender was dropped before sending anything.
            DISCONNECTED => Poll::Ready(Err(RecvError(()))),
            state => unreachable!("unexpected channel state: {}", state),
        }
    }
}
478
479impl<T> Drop for Recv<T> {
480 fn drop(&mut self) {
481 // SAFETY: since the receiving side is still alive the sender would have observed that and
482 // left deallocating the channel allocation to us.
483 let channel = unsafe { self.channel_ptr.as_ref() };
484
485 // Set the channel state to disconnected and read what state the receiver was in.
486 match channel.state.swap(DISCONNECTED, Ordering::Acquire) {
487 // The sender has not sent anything, nor is it dropped.
488 EMPTY => {}
489 // The sender already sent something. We must drop it, and free the channel.
490 MESSAGE => {
491 unsafe { channel.drop_message() };
492 unsafe { dealloc(self.channel_ptr) };
493 }
494 // The receiver has been polled. We must drop the waker.
495 RECEIVING => {
496 unsafe { channel.drop_waker() };
497 }
498 // The sender was already dropped. We are responsible for freeing the channel.
499 DISCONNECTED => {
500 // SAFETY: see safety comment at top of function.
501 unsafe { dealloc(self.channel_ptr) };
502 }
503 // This receiver was previously polled, so the channel was in the RECEIVING state.
504 // But the sender has observed the RECEIVING state and is currently reading the waker
505 // to wake us up. We need to loop here until we observe the MESSAGE or DISCONNECTED
506 // state. We busy loop here since we know the sender is done very soon.
507 AWAKING => {
508 loop {
509 hint::spin_loop();
510
511 // ORDERING: The swap above has already synchronized with writing message.
512 match channel.state.load(Ordering::Relaxed) {
513 AWAKING => {}
514 DISCONNECTED => break,
515 MESSAGE => {
516 // SAFETY: we are in the message state so the message is initialized.
517 unsafe { channel.drop_message() };
518 break;
519 }
520 state => unreachable!("unexpected channel state: {}", state),
521 }
522 }
523 unsafe { dealloc(self.channel_ptr) };
524 }
525 state => unreachable!("unexpected channel state: {}", state),
526 }
527 }
528}
529
/// Internal channel data structure.
///
/// The [`channel`] method allocates and puts one instance of this struct on the heap for each
/// oneshot channel instance. The struct holds:
///
/// * The current state of the channel.
/// * The message in the channel. This memory is uninitialized until the message is sent.
/// * The waker instance for the task that is currently receiving on this channel. This memory is
///   uninitialized until the receiver starts receiving.
struct Channel<T> {
    /// One of the `EMPTY`, `MESSAGE`, `RECEIVING`, `AWAKING` or `DISCONNECTED` state constants.
    state: AtomicU8,
    /// Slot for the message; `MaybeUninit` means dropping the channel never drops the message.
    message: UnsafeCell<MaybeUninit<T>>,
    /// Slot for the receiver's waker; `MaybeUninit` means dropping the channel never drops it.
    waker: UnsafeCell<MaybeUninit<Waker>>,
}
544
impl<T> Channel<T> {
    /// Creates a channel in the `EMPTY` state with uninitialized message and waker slots.
    const fn new() -> Self {
        Self {
            state: AtomicU8::new(EMPTY),
            message: UnsafeCell::new(MaybeUninit::uninit()),
            waker: UnsafeCell::new(MaybeUninit::uninit()),
        }
    }

    /// Returns a shared reference to the (possibly uninitialized) message slot.
    ///
    /// # Safety
    ///
    /// No other party may write to the message slot while the returned reference is in use.
    #[inline(always)]
    unsafe fn message(&self) -> &MaybeUninit<T> {
        unsafe { &*self.message.get() }
    }

    /// Writes `message` into the message slot without dropping any previous content.
    ///
    /// # Safety
    ///
    /// The caller must have exclusive access to the message slot.
    #[inline(always)]
    unsafe fn write_message(&self, message: T) {
        unsafe {
            let slot = &mut *self.message.get();
            slot.as_mut_ptr().write(message);
        }
    }

    /// Drops the message in place.
    ///
    /// # Safety
    ///
    /// The message slot must be initialized and the caller must have exclusive access to it.
    #[inline(always)]
    unsafe fn drop_message(&self) {
        unsafe {
            let slot = &mut *self.message.get();
            slot.assume_init_drop();
        }
    }

    /// Moves the message out of the slot with a bitwise read.
    ///
    /// # Safety
    ///
    /// The message slot must be initialized, and the message must not be read or dropped through
    /// the slot again afterwards.
    #[inline(always)]
    unsafe fn take_message(&self) -> T {
        unsafe { ptr::read(self.message.get()).assume_init() }
    }

    /// Stores `waker` in the channel and tries to move the state from `EMPTY` to `RECEIVING`.
    ///
    /// Returns `Poll::Pending` if the waker was registered. If the sender sent a message or was
    /// dropped in the meantime, the waker is dropped again and the result of the receive
    /// operation is returned as `Poll::Ready`.
    ///
    /// # Safety
    ///
    /// * The `waker` field must not have a waker stored when calling this method.
    /// * The `state` must not be in the RECEIVING state when calling this method.
    unsafe fn write_waker(&self, waker: Waker) -> Poll<Result<T, RecvError>> {
        // Write the waker instance to the channel.
        //
        // SAFETY: we are not yet in the RECEIVING state, meaning that the sender will not
        // try to access the waker until it sees the state set to RECEIVING below.
        unsafe {
            let slot = &mut *self.waker.get();
            slot.as_mut_ptr().write(waker);
        }

        // ORDERING: we use release ordering on success so the sender can synchronize with
        // our write of the waker. We use relaxed ordering on failure since the sender does
        // not need to synchronize with our write and the individual match arms handle any
        // additional synchronization
        match self
            .state
            .compare_exchange(EMPTY, RECEIVING, Ordering::Release, Ordering::Relaxed)
        {
            // We stored our waker, now we return and let the sender wake us up.
            Ok(_) => Poll::Pending,
            // The sender sent the message while we prepared to await.
            // We take the message and mark the channel disconnected.
            Err(MESSAGE) => {
                // ORDERING: Synchronize with writing message. This branch is unlikely to be
                // taken, so it is likely more efficient to use a fence here
                // instead of AcqRel ordering on the compare_exchange
                // operation.
                fence(Ordering::Acquire);

                // SAFETY: we started in the EMPTY state and the sender switched us to the
                // MESSAGE state. This means that it did not take the waker, so we're
                // responsible for dropping it.
                unsafe { self.drop_waker() };

                // ORDERING: sender does not exist, so this update only needs to be visible to
                // us.
                self.state.store(DISCONNECTED, Ordering::Relaxed);

                // SAFETY: The MESSAGE state tells us there is a correctly initialized message.
                Poll::Ready(Ok(unsafe { self.take_message() }))
            }
            // The sender was dropped before sending anything while we prepared to await.
            Err(DISCONNECTED) => {
                // SAFETY: we started in the EMPTY state and the sender switched us to the
                // DISCONNECTED state. This means that it did not take the waker, so we are
                // responsible for dropping it.
                unsafe { self.drop_waker() };
                Poll::Ready(Err(RecvError(())))
            }
            Err(state) => unreachable!("unexpected channel state: {}", state),
        }
    }

    /// Drops the stored waker in place.
    ///
    /// # Safety
    ///
    /// The waker slot must be initialized and the caller must have exclusive access to it.
    #[inline(always)]
    unsafe fn drop_waker(&self) {
        unsafe {
            let slot = &mut *self.waker.get();
            slot.assume_init_drop();
        }
    }

    /// Moves the stored waker out of the slot with a bitwise read.
    ///
    /// # Safety
    ///
    /// The waker slot must be initialized, and the waker must not be read or dropped through
    /// the slot again afterwards.
    #[inline(always)]
    unsafe fn take_waker(&self) -> Waker {
        unsafe { ptr::read(self.waker.get()).assume_init() }
    }
}
650
651unsafe fn dealloc<T>(channel: NonNull<Channel<T>>) {
652 unsafe { drop(Box::from_raw(channel.as_ptr())) }
653}
654
/// An error returned when trying to send on a closed channel. Returned from
/// [`Sender::send`] if the corresponding [`Receiver`] has already been dropped.
///
/// The message that could not be sent can be retrieved again with [`SendError::into_inner`].
pub struct SendError<T> {
    /// The channel allocation, which still holds the unsent message. The error owns it
    /// exclusively and frees it when dropped or consumed.
    channel_ptr: NonNull<Channel<T>>,
}

// `NonNull` is neither `Send` nor `Sync`, so both traits are opted into manually.
//
// SAFETY: the error exclusively owns the channel allocation together with the `T` inside it, so
// it is as thread-safe as an owned `T`.
unsafe impl<T: Send> Send for SendError<T> {}
unsafe impl<T: Sync> Sync for SendError<T> {}
665
666impl<T> SendError<T> {
667 /// Get a reference to the message that failed to be sent.
668 pub fn as_inner(&self) -> &T {
669 unsafe { self.channel_ptr.as_ref().message().assume_init_ref() }
670 }
671
672 /// Consumes the error and returns the message that failed to be sent.
673 pub fn into_inner(self) -> T {
674 let channel_ptr = self.channel_ptr;
675
676 // Do not run destructor if we consumed ourselves. Freeing happens below.
677 mem::forget(self);
678
679 // SAFETY: we have ownership of the channel
680 let channel: &Channel<T> = unsafe { channel_ptr.as_ref() };
681
682 // SAFETY: we know that the message is initialized according to the safety requirements of
683 // `new`
684 let message = unsafe { channel.take_message() };
685
686 // SAFETY: we own the channel
687 unsafe { dealloc(channel_ptr) };
688
689 message
690 }
691}
692
693impl<T> Drop for SendError<T> {
694 fn drop(&mut self) {
695 // SAFETY: we have ownership of the channel and require that the message is initialized
696 // upon construction
697 unsafe {
698 self.channel_ptr.as_ref().drop_message();
699 dealloc(self.channel_ptr);
700 }
701 }
702}
703
704impl<T> fmt::Display for SendError<T> {
705 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
706 "sending on a closed channel".fmt(f)
707 }
708}
709
710impl<T> fmt::Debug for SendError<T> {
711 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
712 write!(f, "SendError<{}>(..)", stringify!(T))
713 }
714}
715
716impl<T> std::error::Error for SendError<T> {}
717
/// The error produced by a failed receive operation.
///
/// Receiving only fails when the corresponding [`Sender`] was dropped before sending any
/// message, or when a message has already been received on the channel.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct RecvError(());

impl fmt::Display for RecvError {
    /// Writes a fixed, human-readable description, honoring width and padding flags.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("receiving on a closed channel")
    }
}

impl std::error::Error for RecvError {}
732
// The numeric values of the states are chosen so that the sender can perform its transitions
// with a single read-modify-write operation:
//
// * sending adds one:       EMPTY + 1 = MESSAGE,       RECEIVING + 1 = AWAKING
// * dropping flips bit 0:   EMPTY ^ 1 = DISCONNECTED,  RECEIVING ^ 1 = AWAKING

/// The initial channel state. Active while both endpoints are still alive, no message has been
/// sent, and the receiver is not receiving.
const EMPTY: u8 = 0b011;
/// A message has been sent to the channel, but the receiver has not yet read it.
const MESSAGE: u8 = 0b100;
/// No message has yet been sent on the channel, but the receiver future ([`Recv`]) is currently
/// receiving.
const RECEIVING: u8 = 0b000;
/// A message is being sent to the channel, or the channel is closing. The receiver future
/// ([`Recv`]) is currently being awakened.
const AWAKING: u8 = 0b001;
/// The channel has been closed. This means that either the sender or receiver has been dropped,
/// or the message sent to the channel has already been received.
const DISCONNECTED: u8 = 0b010;