yash_executor/
forwarder.rs

1// This file is part of yash, an extended POSIX shell.
2// Copyright (C) 2024 WATANABE Yuki
3
4//! Utilities for forwarding the result of a future to another future
5//!
6//! The [`forwarder`] function creates a pair of [`Sender`] and [`Receiver`] that
7//! can be used to forward the result of a future to another future. The sender
8//! half is used to send the result, and the receiver half is used to receive the
9//! result.
10//!
11//! ```
12//! # use yash_executor::forwarder::*;
13//! let (sender, receiver) = forwarder::<u32>();
14//!
15//! // The result is not yet available
16//! assert_eq!(receiver.try_receive(), Err(TryReceiveError::NotSent));
17//!
18//! // Send the result
19//! sender.send(42).unwrap();
20//!
21//! // The result is now available
22//! assert_eq!(receiver.try_receive(), Ok(42));
23//! ```
24//!
25//! If the `Sender` is dropped before sending the result, the `Receiver` will
26//! never receive the result. If the `Receiver` is dropped before receiving the
27//! result, the `Sender` will not be able to send the result, but it does not
28//! otherwise affect the task that produces the result.
29
30use alloc::rc::{Rc, Weak};
31use core::cell::RefCell;
32use core::fmt::Display;
33use core::future::Future;
34use core::pin::Pin;
35use core::task::{Context, Poll, Waker};
36
37/// State shared between the sender and receiver
38#[derive(Debug)]
39enum Relay<T> {
40    /// The result has not been computed yet, and the receiver has not been polled.
41    Pending,
42    /// The result has not been computed yet, and the receiver has been polled.
43    Polled(Waker),
44    /// The result has been computed, but the receiver has not received it yet.
45    Computed(T),
46    /// The receiver has received the result.
47    Done,
48}
49
50/// Sender half of the forwarder
51///
52/// See the [module-level documentation](self) for more information.
53#[derive(Debug)]
54pub struct Sender<T> {
55    relay: Weak<RefCell<Relay<T>>>,
56}
57
58/// Receiver half of the forwarder
59///
60/// Call [`try_receive`](Self::try_receive) to examine if the result has been
61/// sent from the sender. `Receiver` also implements the `Future` trait, so you
62/// can use it in an async block or function to receive the result
63/// asynchronously.
64///
65/// See also the [module-level documentation](self) for more information.
66#[derive(Debug)]
67pub struct Receiver<T> {
68    relay: Rc<RefCell<Relay<T>>>,
69}
70
71/// Creates a new forwarder.
72#[must_use]
73pub fn forwarder<T>() -> (Sender<T>, Receiver<T>) {
74    let relay = Rc::new(RefCell::new(Relay::Pending));
75    let sender = Sender {
76        relay: Rc::downgrade(&relay),
77    };
78    let receiver = Receiver { relay };
79    (sender, receiver)
80}
81
82/// Error returned when receiving a value fails
83///
84/// This error may be returned from the [`Receiver::try_receive`] method.
85#[derive(Clone, Copy, Debug, Eq, PartialEq)]
86pub enum TryReceiveError {
87    /// The sender has been dropped, which means the receiver will never receive
88    /// the value.
89    SenderDropped,
90    /// The value has not been sent yet.
91    NotSent,
92    /// The value has already been received.
93    AlreadyReceived,
94}
95
96impl<T> Sender<T> {
97    /// Sends a value to the receiver.
98    ///
99    /// The value is sent to the receiver. If the receiver has been dropped,
100    /// the value is returned back to the caller.
101    ///
102    /// This method consumes the sender, ensuring that the value is sent at most
103    /// once for each sender-receiver pair.
104    pub fn send(self, value: T) -> Result<(), T> {
105        let Some(relay) = self.relay.upgrade() else {
106            return Err(value);
107        };
108
109        let relay = &mut *relay.borrow_mut();
110        match core::mem::replace(relay, Relay::Computed(value)) {
111            Relay::Pending => Ok(()),
112            Relay::Polled(waker) => {
113                waker.wake();
114                Ok(())
115            }
116            // We can send only once, so these cases are impossible
117            Relay::Computed(_) | Relay::Done => unreachable!(),
118        }
119    }
120}
121
122impl<T> Receiver<T> {
123    /// Receives a value from the sender.
124    ///
125    /// This method is similar to [`poll`](Self::poll), but it does not require
126    /// a `Context` argument. If the value has not been sent yet, this method
127    /// returns `Err(TryReceiveError::NotSent)`.
128    pub fn try_receive(&self) -> Result<T, TryReceiveError> {
129        let relay = &mut *self.relay.borrow_mut();
130        match relay {
131            Relay::Pending | Relay::Polled(_) => {
132                if Rc::weak_count(&self.relay) == 0 {
133                    Err(TryReceiveError::SenderDropped)
134                } else {
135                    Err(TryReceiveError::NotSent)
136                }
137            }
138
139            Relay::Computed(_) => {
140                let Relay::Computed(value) = core::mem::replace(relay, Relay::Done) else {
141                    unreachable!()
142                };
143                Ok(value)
144            }
145
146            Relay::Done => Err(TryReceiveError::AlreadyReceived),
147        }
148    }
149}
150
151impl<T> Future for Receiver<T> {
152    type Output = T;
153
154    /// Polls the receiver to receive the value.
155    ///
156    /// This method is similar to [`try_receive`](Self::try_receive), but it
157    /// requires a `Context` argument. If the value has not been sent yet, this
158    /// method returns `Poll::Pending` and stores the `Waker` from the `Context`
159    /// for waking up the current task when the value is sent.
160    ///
161    /// This method should not be called after the value has been received.
162    fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<T> {
163        let relay = &mut *self.relay.borrow_mut();
164        match relay {
165            Relay::Pending | Relay::Polled(_) => {
166                *relay = Relay::Polled(context.waker().clone());
167                Poll::Pending
168            }
169
170            Relay::Computed(_) => {
171                let Relay::Computed(value) = core::mem::replace(relay, Relay::Done) else {
172                    unreachable!()
173                };
174                Poll::Ready(value)
175            }
176
177            Relay::Done => panic!("Receiver polled after receiving the value"),
178        }
179    }
180}
181
182impl Display for TryReceiveError {
183    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
184        match self {
185            TryReceiveError::SenderDropped => "sender already dropped".fmt(f),
186            TryReceiveError::NotSent => "result not sent yet".fmt(f),
187            TryReceiveError::AlreadyReceived => "result already received".fmt(f),
188        }
189    }
190}
191
192// TODO Bump MSRV to 1.81.0 to impl core::error::Error for TryReceiveError