1use super::ActorError;
5use crate::{LocalTaskSpawner, TaskSpawner};
6use futures::{FutureExt, Sink, Stream};
7use std::{
8 any::type_name,
9 borrow::Borrow,
10 fmt::Debug,
11 future::Future,
12 pin::Pin,
13 task::{Context, Poll},
14};
15use tokio::sync::{mpsc, oneshot};
16#[cfg(feature = "js")]
17use tokio_with_wasm::alias as tokio;
18
19#[must_use]
24pub struct Response<T> {
25 tx: oneshot::Sender<T>,
26}
27impl<T> Response<T> {
28 pub fn respond(self, value: T) {
33 self.send(value).ok();
34 }
35
36 pub async fn respond_execute<Fut, F>(self, value: F)
38 where
39 Fut: Future<Output = T> + Send,
40 F: FnOnce() -> Fut + Send,
41 {
42 self.respond(value().await)
43 }
44
45 pub fn send(self, value: T) -> Result<(), ActorError> {
50 self.tx.send(value).map_err(|_| ActorError::Canceled)
51 }
52
53 pub async fn execute<Fut, F>(self, value: F) -> Result<(), ActorError>
55 where
56 Fut: Future<Output = T> + Send,
57 F: FnOnce() -> Fut + Send,
58 {
59 self.send(value().await)
60 }
61
62 #[inline]
64 #[track_caller]
65 pub fn spawn<Fut, F>(self, value: F)
66 where
67 Fut: Future<Output = T> + Send + 'static,
68 F: FnOnce() -> Fut + Send + 'static,
69 T: Send + 'static,
70 {
71 Self::spawn_with(self, TaskSpawner::default(), value);
72 }
73
74 #[inline]
76 #[track_caller]
77 pub fn spawn_with<Fut, F>(self, spawner: impl Borrow<TaskSpawner>, value: F)
78 where
79 Fut: Future<Output = T> + Send + 'static,
80 F: FnOnce() -> Fut + Send + 'static,
81 T: Send + 'static,
82 {
83 spawner.borrow().spawn(async move { self.send(value().await).ok() });
84 }
85
86 #[inline]
88 #[track_caller]
89 pub fn spawn_local<Fut, F>(self, spawner: impl LocalTaskSpawner, value: F)
90 where
91 Fut: Future<Output = T> + 'static,
92 F: FnOnce() -> Fut + 'static,
93 T: Send + 'static,
94 {
95 spawner.spawn_local(async move { self.send(value().await).ok() });
96 }
97}
98impl<T> Debug for Response<T> {
99 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100 f.debug_struct("Response")
101 .field("response_type", &type_name::<T>())
102 .field("tx_closed", &self.tx.is_closed())
103 .finish()
104 }
105}
106
107pub struct ResponseReceiver<T> {
108 rx: oneshot::Receiver<T>,
109}
110impl<T> ResponseReceiver<T> {
111 pub fn new() -> (Response<T>, ResponseReceiver<T>) {
112 let (tx, rx) = oneshot::channel();
113 (Response { tx }, ResponseReceiver { rx })
114 }
115}
116impl<T> Future for ResponseReceiver<T> {
117 type Output = Result<T, ActorError>;
118
119 fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
120 self.rx.poll_unpin(cx).map_err(|_e| ActorError::Canceled)
121 }
122}
123
124#[must_use]
126pub struct ResponseStream<T> {
127 tx: mpsc::UnboundedSender<T>,
128}
129impl<T> ResponseStream<T> {
130 pub fn send(&mut self, value: T) -> Result<(), ActorError> {
131 self.tx.send(value).map_err(|_| ActorError::Canceled)
132 }
133
134 pub fn is_closed(&self) -> bool {
136 self.tx.is_closed()
137 }
138
139 pub fn complete(self) -> Result<(), ActorError> {
140 Ok(())
142 }
143}
144impl<T> Sink<T> for ResponseStream<T> {
145 type Error = T;
146
147 fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
148 Poll::Ready(Ok(()))
149 }
150
151 fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
152 self.get_mut().tx.send(item).map_err(|err| err.0)
153 }
154
155 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
156 Poll::Ready(Ok(()))
157 }
158
159 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
160 Poll::Ready(Ok(()))
161 }
162}
163impl<T> Debug for ResponseStream<T> {
164 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
165 f.debug_struct("ResponseStream")
166 .field("response_type", &type_name::<T>())
167 .field("tx_closed", &self.tx.is_closed())
168 .finish()
169 }
170}
171
172pub struct ResponseStreamReceiver<T> {
173 rx: mpsc::UnboundedReceiver<T>,
174}
175impl<T> ResponseStreamReceiver<T> {
176 pub fn new() -> (ResponseStream<T>, ResponseStreamReceiver<T>) {
177 let (tx, rx) = mpsc::unbounded_channel();
178 (ResponseStream { tx }, ResponseStreamReceiver { rx })
179 }
180}
181impl<T> Stream for ResponseStreamReceiver<T> {
182 type Item = T;
183
184 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
185 self.rx.poll_recv(cx)
186 }
187}
188
189#[must_use]
191pub struct ResponseBackPressureStream<T> {
192 tx: mpsc::Sender<Result<T, ActorError>>,
193}
194impl<T> ResponseBackPressureStream<T> {
195 pub async fn send(&mut self, value: T) -> Result<(), ActorError> {
196 self.tx.send(Ok(value)).await.map_err(|_| ActorError::Canceled)
197 }
198
199 pub fn is_closed(&self) -> bool {
201 self.tx.is_closed()
202 }
203
204 pub fn complete(self) -> Result<(), ActorError> {
205 Ok(())
207 }
208}
209impl<T> Debug for ResponseBackPressureStream<T> {
210 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
211 f.debug_struct("ResponseBackPressureStream")
212 .field("response_type", &type_name::<T>())
213 .field("tx_closed", &self.tx.is_closed())
214 .finish()
215 }
216}
217
218pub struct ResponseBackPressureStreamReceiver<T> {
219 rx: mpsc::Receiver<Result<T, ActorError>>,
220}
221impl<T> ResponseBackPressureStreamReceiver<T> {
222 pub fn new(buffer: usize) -> (ResponseBackPressureStream<T>, ResponseBackPressureStreamReceiver<T>) {
223 let (tx, rx) = mpsc::channel(buffer);
224 (ResponseBackPressureStream { tx }, ResponseBackPressureStreamReceiver { rx })
225 }
226}
227impl<T: Debug> Stream for ResponseBackPressureStreamReceiver<T> {
228 type Item = Result<T, ActorError>;
229
230 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
231 self.rx.poll_recv(cx)
232 }
233}
234
235#[derive(Debug)]
236pub struct ResponseStreams<T> {
237 streams: Vec<ResponseStream<T>>,
238}
239impl<T> Default for ResponseStreams<T> {
240 fn default() -> Self {
241 Self { streams: Default::default() }
242 }
243}
244impl<T> ResponseStreams<T>
245where
246 T: Clone,
247{
248 pub fn push(&mut self, stream: ResponseStream<T>) {
249 self.streams.push(stream);
250 }
251
252 pub fn send(&mut self, value: T) {
253 self.streams
254 .retain_mut(|stream| !matches!(stream.send(value.clone()), Err(ActorError::Canceled)));
255 }
256
257 pub fn is_empty(&self) -> bool {
258 self.streams.is_empty() || self.is_closed()
259 }
260
261 pub fn is_closed(&self) -> bool {
263 !self.streams.iter().any(|s| !s.is_closed())
264 }
265}