spawned_concurrency/
response.rs1use crate::error::ActorError;
2use spawned_rt::tasks::oneshot;
3use std::future::Future;
4use std::pin::Pin;
5use std::time::Duration;
6
/// Default request timeout: five seconds.
///
/// NOTE(review): nothing in this file reads the constant; presumably callers pass it as the
/// `duration` argument of `Response::from_with_timeout` — confirm at the call sites.
pub const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
8
9enum ResponseState<T> {
18 Receiver(oneshot::Receiver<T>),
19 TimedReceiver(Pin<Box<dyn Future<Output = Result<T, ActorError>> + Send>>),
20 Ready(Result<T, ActorError>),
21 Done,
22}
23
24pub struct Response<T>(ResponseState<T>);
41
42impl<T> Unpin for Response<T> {}
46
47impl<T> Response<T> {
48 pub fn ready(result: Result<T, ActorError>) -> Self {
53 Self(ResponseState::Ready(result))
54 }
55
56 pub fn unwrap(self) -> T {
61 match self.0 {
62 ResponseState::Ready(result) => result.unwrap(),
63 ResponseState::Receiver(_) | ResponseState::TimedReceiver(_) => {
64 panic!("called unwrap() on a pending Response; use .await in async contexts")
65 }
66 ResponseState::Done => panic!("Response already consumed"),
67 }
68 }
69
70 pub fn expect(self, msg: &str) -> T {
72 match self.0 {
73 ResponseState::Ready(result) => result.expect(msg),
74 ResponseState::Receiver(_) | ResponseState::TimedReceiver(_) => {
75 panic!("{msg}: called expect() on a pending Response; use .await in async contexts")
76 }
77 ResponseState::Done => panic!("{msg}: Response already consumed"),
78 }
79 }
80
81 pub fn is_ok(&self) -> bool {
85 matches!(&self.0, ResponseState::Ready(Ok(_)))
86 }
87
88 pub fn is_err(&self) -> bool {
92 matches!(&self.0, ResponseState::Ready(Err(_)))
93 }
94
95 pub fn map<U, F: FnOnce(T) -> U>(self, f: F) -> Response<U> {
99 match self.0 {
100 ResponseState::Ready(result) => Response(ResponseState::Ready(result.map(f))),
101 ResponseState::Receiver(_) | ResponseState::TimedReceiver(_) => {
102 panic!("called map() on a pending Response; use .await in async contexts")
103 }
104 ResponseState::Done => panic!("Response already consumed"),
105 }
106 }
107}
108
109impl<T: Send + 'static> Response<T> {
110 pub fn from_with_timeout(
115 result: Result<oneshot::Receiver<T>, ActorError>,
116 duration: Duration,
117 ) -> Self {
118 match result {
119 Ok(rx) => {
120 let fut = Box::pin(async move {
121 match spawned_rt::tasks::timeout(duration, rx).await {
122 Ok(Ok(val)) => Ok(val),
123 Ok(Err(_)) => Err(ActorError::ActorStopped),
124 Err(_) => Err(ActorError::RequestTimeout),
125 }
126 });
127 Self(ResponseState::TimedReceiver(fut))
128 }
129 Err(e) => Self(ResponseState::Ready(Err(e))),
130 }
131 }
132}
133
134impl<T> From<Result<oneshot::Receiver<T>, ActorError>> for Response<T> {
135 fn from(result: Result<oneshot::Receiver<T>, ActorError>) -> Self {
136 match result {
137 Ok(rx) => Self(ResponseState::Receiver(rx)),
138 Err(e) => Self(ResponseState::Ready(Err(e))),
139 }
140 }
141}
142
143impl<T: Send + 'static> Future for Response<T> {
144 type Output = Result<T, ActorError>;
145
146 fn poll(
147 self: Pin<&mut Self>,
148 cx: &mut std::task::Context<'_>,
149 ) -> std::task::Poll<Self::Output> {
150 let this = self.get_mut();
151 match &mut this.0 {
152 ResponseState::Receiver(rx) => match Pin::new(rx).poll(cx) {
153 std::task::Poll::Ready(Ok(val)) => {
154 this.0 = ResponseState::Done;
155 std::task::Poll::Ready(Ok(val))
156 }
157 std::task::Poll::Ready(Err(_)) => {
158 this.0 = ResponseState::Done;
159 std::task::Poll::Ready(Err(ActorError::ActorStopped))
160 }
161 std::task::Poll::Pending => std::task::Poll::Pending,
162 },
163 ResponseState::TimedReceiver(fut) => match fut.as_mut().poll(cx) {
164 std::task::Poll::Ready(result) => {
165 this.0 = ResponseState::Done;
166 std::task::Poll::Ready(result)
167 }
168 std::task::Poll::Pending => std::task::Poll::Pending,
169 },
170 ResponseState::Ready(_) => match std::mem::replace(&mut this.0, ResponseState::Done) {
171 ResponseState::Ready(result) => std::task::Poll::Ready(result),
172 _ => unreachable!(),
173 },
174 ResponseState::Done => panic!("Response polled after completion"),
175 }
176 }
177}
178
#[cfg(test)]
mod tests {
    use super::*;
    use spawned_rt::tasks::oneshot;

    /// A ready `Ok` response hands back its value synchronously.
    #[test]
    fn ready_ok_unwrap() {
        let response = Response::<i32>::ready(Ok(42));
        assert_eq!(response.unwrap(), 42);
    }

    /// A ready `Err` response is reported by `is_err`.
    #[test]
    fn ready_err_is_err() {
        let response = Response::<i32>::ready(Err(ActorError::ActorStopped));
        assert!(response.is_err());
    }

    /// Unwrapping a ready `Err` response panics and names the error.
    #[test]
    #[should_panic(expected = "ActorStopped")]
    fn ready_err_unwrap_panics() {
        let response = Response::<i32>::ready(Err(ActorError::ActorStopped));
        response.unwrap();
    }

    /// Awaiting a receiver-backed response yields the value that was sent.
    #[test]
    fn future_resolves_from_receiver() {
        let runtime = spawned_rt::tasks::Runtime::new().unwrap();
        runtime.block_on(async {
            let (sender, receiver) = oneshot::channel::<i32>();
            let response: Response<i32> = Response::from(Ok(receiver));
            sender.send(99).unwrap();
            assert_eq!(response.await.unwrap(), 99);
        });
    }

    /// Dropping the sender makes the response resolve to `ActorStopped`.
    #[test]
    fn future_err_on_dropped_sender() {
        let runtime = spawned_rt::tasks::Runtime::new().unwrap();
        runtime.block_on(async {
            let (sender, receiver) = oneshot::channel::<i32>();
            let response: Response<i32> = Response::from(Ok(receiver));
            drop(sender);
            let outcome = response.await;
            assert!(matches!(outcome, Err(ActorError::ActorStopped)));
        });
    }

    /// `map` transforms the success value of a ready response.
    #[test]
    fn map_transforms_value() {
        let tripled = Response::<i32>::ready(Ok(2)).map(|x| x * 3);
        assert_eq!(tripled.unwrap(), 6);
    }

    /// A timed response resolves normally when the value arrives before the deadline.
    #[test]
    fn timed_receiver_resolves() {
        let runtime = spawned_rt::tasks::Runtime::new().unwrap();
        runtime.block_on(async {
            let (sender, receiver) = oneshot::channel::<i32>();
            let response = Response::from_with_timeout(Ok(receiver), Duration::from_secs(5));
            sender.send(42).unwrap();
            assert_eq!(response.await.unwrap(), 42);
        });
    }

    /// A timed response resolves to `RequestTimeout` when nothing is sent in time.
    #[test]
    fn timed_receiver_times_out() {
        let runtime = spawned_rt::tasks::Runtime::new().unwrap();
        runtime.block_on(async {
            // Keep the sender alive so the channel stays open and only the deadline can fire.
            let (_sender, receiver) = oneshot::channel::<i32>();
            let response = Response::from_with_timeout(Ok(receiver), Duration::from_millis(50));
            let outcome = response.await;
            assert!(matches!(outcome, Err(ActorError::RequestTimeout)));
        });
    }
}