yash_executor/forwarder.rs
1// This file is part of yash, an extended POSIX shell.
2// Copyright (C) 2024 WATANABE Yuki
3
4//! Utilities for forwarding the result of a future to another future
5//!
6//! The [`forwarder`] function creates a pair of [`Sender`] and [`Receiver`] that
7//! can be used to forward the result of a future to another future. The sender
8//! half is used to send the result, and the receiver half is used to receive the
9//! result.
10//!
11//! ```
12//! # use yash_executor::forwarder::*;
13//! let (sender, receiver) = forwarder::<u32>();
14//!
15//! // The result is not yet available
16//! assert_eq!(receiver.try_receive(), Err(TryReceiveError::NotSent));
17//!
18//! // Send the result
19//! sender.send(42).unwrap();
20//!
21//! // The result is now available
22//! assert_eq!(receiver.try_receive(), Ok(42));
23//! ```
24//!
25//! If the `Sender` is dropped before sending the result, the `Receiver` will
26//! never receive the result. If the `Receiver` is dropped before receiving the
27//! result, the `Sender` will not be able to send the result, but it does not
28//! otherwise affect the task that produces the result.
29
30use alloc::rc::{Rc, Weak};
31use core::cell::RefCell;
32use core::fmt::Display;
33use core::future::Future;
34use core::pin::Pin;
35use core::task::{Context, Poll, Waker};
36
37/// State shared between the sender and receiver
38#[derive(Debug)]
39enum Relay<T> {
40 /// The result has not been computed yet, and the receiver has not been polled.
41 Pending,
42 /// The result has not been computed yet, and the receiver has been polled.
43 Polled(Waker),
44 /// The result has been computed, but the receiver has not received it yet.
45 Computed(T),
46 /// The receiver has received the result.
47 Done,
48}
49
50/// Sender half of the forwarder
51///
52/// See the [module-level documentation](self) for more information.
53#[derive(Debug)]
54pub struct Sender<T> {
55 relay: Weak<RefCell<Relay<T>>>,
56}
57
58/// Receiver half of the forwarder
59///
60/// Call [`try_receive`](Self::try_receive) to examine if the result has been
61/// sent from the sender. `Receiver` also implements the `Future` trait, so you
62/// can use it in an async block or function to receive the result
63/// asynchronously.
64///
65/// See also the [module-level documentation](self) for more information.
66#[derive(Debug)]
67pub struct Receiver<T> {
68 relay: Rc<RefCell<Relay<T>>>,
69}
70
71/// Creates a new forwarder.
72#[must_use]
73pub fn forwarder<T>() -> (Sender<T>, Receiver<T>) {
74 let relay = Rc::new(RefCell::new(Relay::Pending));
75 let sender = Sender {
76 relay: Rc::downgrade(&relay),
77 };
78 let receiver = Receiver { relay };
79 (sender, receiver)
80}
81
82/// Error returned when receiving a value fails
83///
84/// This error may be returned from the [`Receiver::try_receive`] method.
85#[derive(Clone, Copy, Debug, Eq, PartialEq)]
86pub enum TryReceiveError {
87 /// The sender has been dropped, which means the receiver will never receive
88 /// the value.
89 SenderDropped,
90 /// The value has not been sent yet.
91 NotSent,
92 /// The value has already been received.
93 AlreadyReceived,
94}
95
96impl<T> Sender<T> {
97 /// Sends a value to the receiver.
98 ///
99 /// The value is sent to the receiver. If the receiver has been dropped,
100 /// the value is returned back to the caller.
101 ///
102 /// This method consumes the sender, ensuring that the value is sent at most
103 /// once for each sender-receiver pair.
104 pub fn send(self, value: T) -> Result<(), T> {
105 let Some(relay) = self.relay.upgrade() else {
106 return Err(value);
107 };
108
109 let relay = &mut *relay.borrow_mut();
110 match core::mem::replace(relay, Relay::Computed(value)) {
111 Relay::Pending => Ok(()),
112 Relay::Polled(waker) => {
113 waker.wake();
114 Ok(())
115 }
116 // We can send only once, so these cases are impossible
117 Relay::Computed(_) | Relay::Done => unreachable!(),
118 }
119 }
120}
121
122impl<T> Receiver<T> {
123 /// Receives a value from the sender.
124 ///
125 /// This method is similar to [`poll`](Self::poll), but it does not require
126 /// a `Context` argument. If the value has not been sent yet, this method
127 /// returns `Err(TryReceiveError::NotSent)`.
128 pub fn try_receive(&self) -> Result<T, TryReceiveError> {
129 let relay = &mut *self.relay.borrow_mut();
130 match relay {
131 Relay::Pending | Relay::Polled(_) => {
132 if Rc::weak_count(&self.relay) == 0 {
133 Err(TryReceiveError::SenderDropped)
134 } else {
135 Err(TryReceiveError::NotSent)
136 }
137 }
138
139 Relay::Computed(_) => {
140 let Relay::Computed(value) = core::mem::replace(relay, Relay::Done) else {
141 unreachable!()
142 };
143 Ok(value)
144 }
145
146 Relay::Done => Err(TryReceiveError::AlreadyReceived),
147 }
148 }
149}
150
151impl<T> Future for Receiver<T> {
152 type Output = T;
153
154 /// Polls the receiver to receive the value.
155 ///
156 /// This method is similar to [`try_receive`](Self::try_receive), but it
157 /// requires a `Context` argument. If the value has not been sent yet, this
158 /// method returns `Poll::Pending` and stores the `Waker` from the `Context`
159 /// for waking up the current task when the value is sent.
160 ///
161 /// This method should not be called after the value has been received.
162 fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<T> {
163 let relay = &mut *self.relay.borrow_mut();
164 match relay {
165 Relay::Pending | Relay::Polled(_) => {
166 *relay = Relay::Polled(context.waker().clone());
167 Poll::Pending
168 }
169
170 Relay::Computed(_) => {
171 let Relay::Computed(value) = core::mem::replace(relay, Relay::Done) else {
172 unreachable!()
173 };
174 Poll::Ready(value)
175 }
176
177 Relay::Done => panic!("Receiver polled after receiving the value"),
178 }
179 }
180}
181
182impl Display for TryReceiveError {
183 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
184 match self {
185 TryReceiveError::SenderDropped => "sender already dropped".fmt(f),
186 TryReceiveError::NotSent => "result not sent yet".fmt(f),
187 TryReceiveError::AlreadyReceived => "result already received".fmt(f),
188 }
189 }
190}
191
192// TODO Bump MSRV to 1.81.0 to impl core::error::Error for TryReceiveError