Skip to main content

spawned_concurrency/
response.rs

1use crate::error::ActorError;
2use spawned_rt::tasks::oneshot;
3use std::future::Future;
4use std::pin::Pin;
5use std::time::Duration;
6
7pub const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
8
9// ---------------------------------------------------------------------------
10// Response<T> — unified wrapper for protocol request-response (tasks + threads)
11//
12// Note: this type uses spawned_rt::tasks::oneshot internally, so it transitively
13// depends on tokio even when only the threads runtime is used. Threads mode
14// only constructs the Ready variant, so the tokio types are never instantiated.
15// ---------------------------------------------------------------------------
16
17enum ResponseState<T> {
18    Receiver(oneshot::Receiver<T>),
19    TimedReceiver(Pin<Box<dyn Future<Output = Result<T, ActorError>> + Send>>),
20    Ready(Result<T, ActorError>),
21    Done,
22}
23
24/// Concrete wrapper for protocol request-response methods that works in both
25/// `tasks` (async) and `threads` (blocking) modes.
26///
27/// - **Tasks mode**: wraps a oneshot receiver; `.await` returns `Result<T, ActorError>`
28/// - **Threads mode**: wraps a pre-computed result; use `.unwrap()` / `.expect()` directly
29///
30/// Protocol methods return `Response<T>`:
31/// ```ignore
32/// #[protocol]
33/// pub trait MyProtocol: Send + Sync {
34///     fn query(&self) -> Response<String>;
35/// }
36///
37/// // Tasks: actor.query().await.unwrap()
38/// // Threads: actor.query().unwrap()
39/// ```
40pub struct Response<T>(ResponseState<T>);
41
42// Safety: tokio::sync::oneshot::Receiver<T> implements Unpin, so pinning
43// Response<T> is safe. This is required for the Future::poll implementation
44// which uses self.get_mut().
45impl<T> Unpin for Response<T> {}
46
47impl<T> Response<T> {
48    /// Create a `Response` from a pre-computed result.
49    ///
50    /// Used by the threads runtime where the request blocks at call time
51    /// and the result is immediately available.
52    pub fn ready(result: Result<T, ActorError>) -> Self {
53        Self(ResponseState::Ready(result))
54    }
55
56    /// Extract the value, panicking on error.
57    ///
58    /// For ready (pre-computed) responses. Panics on pending responses —
59    /// use `.await.unwrap()` in async contexts instead.
60    pub fn unwrap(self) -> T {
61        match self.0 {
62            ResponseState::Ready(result) => result.unwrap(),
63            ResponseState::Receiver(_) | ResponseState::TimedReceiver(_) => {
64                panic!("called unwrap() on a pending Response; use .await in async contexts")
65            }
66            ResponseState::Done => panic!("Response already consumed"),
67        }
68    }
69
70    /// Extract the value, panicking with a custom message on error.
71    pub fn expect(self, msg: &str) -> T {
72        match self.0 {
73            ResponseState::Ready(result) => result.expect(msg),
74            ResponseState::Receiver(_) | ResponseState::TimedReceiver(_) => {
75                panic!("{msg}: called expect() on a pending Response; use .await in async contexts")
76            }
77            ResponseState::Done => panic!("{msg}: Response already consumed"),
78        }
79    }
80
81    /// Returns `true` if the response contains `Ok`.
82    /// Only meaningful for ready responses.
83    /// Returns `false` for pending (`Receiver`) or consumed (`Done`) states.
84    pub fn is_ok(&self) -> bool {
85        matches!(&self.0, ResponseState::Ready(Ok(_)))
86    }
87
88    /// Returns `true` if the response contains `Err`.
89    /// Only meaningful for ready responses.
90    /// Returns `false` for pending (`Receiver`) or consumed (`Done`) states.
91    pub fn is_err(&self) -> bool {
92        matches!(&self.0, ResponseState::Ready(Err(_)))
93    }
94
95    /// Maps the inner value if the response is ready and `Ok`.
96    ///
97    /// Panics on pending or consumed responses.
98    pub fn map<U, F: FnOnce(T) -> U>(self, f: F) -> Response<U> {
99        match self.0 {
100            ResponseState::Ready(result) => Response(ResponseState::Ready(result.map(f))),
101            ResponseState::Receiver(_) | ResponseState::TimedReceiver(_) => {
102                panic!("called map() on a pending Response; use .await in async contexts")
103            }
104            ResponseState::Done => panic!("Response already consumed"),
105        }
106    }
107}
108
109impl<T: Send + 'static> Response<T> {
110    /// Create a `Response` from a oneshot receiver with a timeout.
111    ///
112    /// Used by tasks-mode protocol blanket impls. Returns
113    /// `Err(ActorError::RequestTimeout)` if the timeout expires.
114    pub fn from_with_timeout(
115        result: Result<oneshot::Receiver<T>, ActorError>,
116        duration: Duration,
117    ) -> Self {
118        match result {
119            Ok(rx) => {
120                let fut = Box::pin(async move {
121                    match spawned_rt::tasks::timeout(duration, rx).await {
122                        Ok(Ok(val)) => Ok(val),
123                        Ok(Err(_)) => Err(ActorError::ActorStopped),
124                        Err(_) => Err(ActorError::RequestTimeout),
125                    }
126                });
127                Self(ResponseState::TimedReceiver(fut))
128            }
129            Err(e) => Self(ResponseState::Ready(Err(e))),
130        }
131    }
132}
133
134impl<T> From<Result<oneshot::Receiver<T>, ActorError>> for Response<T> {
135    fn from(result: Result<oneshot::Receiver<T>, ActorError>) -> Self {
136        match result {
137            Ok(rx) => Self(ResponseState::Receiver(rx)),
138            Err(e) => Self(ResponseState::Ready(Err(e))),
139        }
140    }
141}
142
143impl<T: Send + 'static> Future for Response<T> {
144    type Output = Result<T, ActorError>;
145
146    fn poll(
147        self: Pin<&mut Self>,
148        cx: &mut std::task::Context<'_>,
149    ) -> std::task::Poll<Self::Output> {
150        let this = self.get_mut();
151        match &mut this.0 {
152            ResponseState::Receiver(rx) => match Pin::new(rx).poll(cx) {
153                std::task::Poll::Ready(Ok(val)) => {
154                    this.0 = ResponseState::Done;
155                    std::task::Poll::Ready(Ok(val))
156                }
157                std::task::Poll::Ready(Err(_)) => {
158                    this.0 = ResponseState::Done;
159                    std::task::Poll::Ready(Err(ActorError::ActorStopped))
160                }
161                std::task::Poll::Pending => std::task::Poll::Pending,
162            },
163            ResponseState::TimedReceiver(fut) => match fut.as_mut().poll(cx) {
164                std::task::Poll::Ready(result) => {
165                    this.0 = ResponseState::Done;
166                    std::task::Poll::Ready(result)
167                }
168                std::task::Poll::Pending => std::task::Poll::Pending,
169            },
170            ResponseState::Ready(_) => match std::mem::replace(&mut this.0, ResponseState::Done) {
171                ResponseState::Ready(result) => std::task::Poll::Ready(result),
172                _ => unreachable!(),
173            },
174            ResponseState::Done => panic!("Response polled after completion"),
175        }
176    }
177}
178
179#[cfg(test)]
180mod tests {
181    use super::*;
182    use spawned_rt::tasks::oneshot;
183
184    #[test]
185    fn ready_ok_unwrap() {
186        let r: Response<i32> = Response::ready(Ok(42));
187        assert_eq!(r.unwrap(), 42);
188    }
189
190    #[test]
191    fn ready_err_is_err() {
192        let r: Response<i32> = Response::ready(Err(ActorError::ActorStopped));
193        assert!(r.is_err());
194    }
195
196    #[test]
197    #[should_panic(expected = "ActorStopped")]
198    fn ready_err_unwrap_panics() {
199        let r: Response<i32> = Response::ready(Err(ActorError::ActorStopped));
200        r.unwrap();
201    }
202
203    #[test]
204    fn future_resolves_from_receiver() {
205        let rt = spawned_rt::tasks::Runtime::new().unwrap();
206        rt.block_on(async {
207            let (tx, rx) = oneshot::channel::<i32>();
208            let resp: Response<i32> = Response::from(Ok(rx));
209            tx.send(99).unwrap();
210            let val = resp.await.unwrap();
211            assert_eq!(val, 99);
212        });
213    }
214
215    #[test]
216    fn future_err_on_dropped_sender() {
217        let rt = spawned_rt::tasks::Runtime::new().unwrap();
218        rt.block_on(async {
219            let (tx, rx) = oneshot::channel::<i32>();
220            let resp: Response<i32> = Response::from(Ok(rx));
221            drop(tx);
222            let result = resp.await;
223            assert!(matches!(result, Err(ActorError::ActorStopped)));
224        });
225    }
226
227    #[test]
228    fn map_transforms_value() {
229        let r: Response<i32> = Response::ready(Ok(2));
230        let mapped = r.map(|x| x * 3);
231        assert_eq!(mapped.unwrap(), 6);
232    }
233
234    #[test]
235    fn timed_receiver_resolves() {
236        let rt = spawned_rt::tasks::Runtime::new().unwrap();
237        rt.block_on(async {
238            let (tx, rx) = oneshot::channel::<i32>();
239            let resp = Response::from_with_timeout(Ok(rx), Duration::from_secs(5));
240            tx.send(42).unwrap();
241            assert_eq!(resp.await.unwrap(), 42);
242        });
243    }
244
245    #[test]
246    fn timed_receiver_times_out() {
247        let rt = spawned_rt::tasks::Runtime::new().unwrap();
248        rt.block_on(async {
249            let (_tx, rx) = oneshot::channel::<i32>();
250            let resp = Response::from_with_timeout(Ok(rx), Duration::from_millis(50));
251            let result = resp.await;
252            assert!(matches!(result, Err(ActorError::RequestTimeout)));
253        });
254    }
255}