Skip to main content

co_actor/
response.rs

1// SPDX-License-Identifier: AGPL-3.0-only
2// Copyright (C) 2026 1io BRANDGUARDIAN GmbH
3
4use super::ActorError;
5use crate::{LocalTaskSpawner, TaskSpawner};
6use futures::{FutureExt, Sink, Stream};
7use std::{
8	any::type_name,
9	borrow::Borrow,
10	fmt::Debug,
11	future::Future,
12	pin::Pin,
13	task::{Context, Poll},
14};
15use tokio::sync::{mpsc, oneshot};
16#[cfg(feature = "js")]
17use tokio_with_wasm::alias as tokio;
18
19/// Response.
20///
21/// # Notes
22/// - When the response is dropped inside the actor and has not been used we receive a canceled on the caller side.
23#[must_use]
24pub struct Response<T> {
25	tx: oneshot::Sender<T>,
26}
27impl<T> Response<T> {
28	/// Send Response to caller.
29	///
30	/// # Notes
31	/// - Ignores when the caller is not waiting for the response. When you want to handle this use [`Response::send`].
32	pub fn respond(self, value: T) {
33		self.send(value).ok();
34	}
35
36	/// Executes closure and respond with the result
37	pub async fn respond_execute<Fut, F>(self, value: F)
38	where
39		Fut: Future<Output = T> + Send,
40		F: FnOnce() -> Fut + Send,
41	{
42		self.respond(value().await)
43	}
44
45	/// Try to send response to caller.
46	///
47	/// # Errors
48	/// - Fails with [`ActorError::Canceled`] when the caller is not waiting for the result.
49	pub fn send(self, value: T) -> Result<(), ActorError> {
50		self.tx.send(value).map_err(|_| ActorError::Canceled)
51	}
52
53	/// Executes closure and respond with the result
54	pub async fn execute<Fut, F>(self, value: F) -> Result<(), ActorError>
55	where
56		Fut: Future<Output = T> + Send,
57		F: FnOnce() -> Fut + Send,
58	{
59		self.send(value().await)
60	}
61
62	/// Spawns a new task and executes given closure in it
63	#[inline]
64	#[track_caller]
65	pub fn spawn<Fut, F>(self, value: F)
66	where
67		Fut: Future<Output = T> + Send + 'static,
68		F: FnOnce() -> Fut + Send + 'static,
69		T: Send + 'static,
70	{
71		Self::spawn_with(self, TaskSpawner::default(), value);
72	}
73
74	/// Spawns a new task using the given spawner and executes given closure in it
75	#[inline]
76	#[track_caller]
77	pub fn spawn_with<Fut, F>(self, spawner: impl Borrow<TaskSpawner>, value: F)
78	where
79		Fut: Future<Output = T> + Send + 'static,
80		F: FnOnce() -> Fut + Send + 'static,
81		T: Send + 'static,
82	{
83		spawner.borrow().spawn(async move { self.send(value().await).ok() });
84	}
85
86	/// Spawns a new task using the given spawner and executes given closure in it
87	#[inline]
88	#[track_caller]
89	pub fn spawn_local<Fut, F>(self, spawner: impl LocalTaskSpawner, value: F)
90	where
91		Fut: Future<Output = T> + 'static,
92		F: FnOnce() -> Fut + 'static,
93		T: Send + 'static,
94	{
95		spawner.spawn_local(async move { self.send(value().await).ok() });
96	}
97}
98impl<T> Debug for Response<T> {
99	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100		f.debug_struct("Response")
101			.field("response_type", &type_name::<T>())
102			.field("tx_closed", &self.tx.is_closed())
103			.finish()
104	}
105}
106
107pub struct ResponseReceiver<T> {
108	rx: oneshot::Receiver<T>,
109}
110impl<T> ResponseReceiver<T> {
111	pub fn new() -> (Response<T>, ResponseReceiver<T>) {
112		let (tx, rx) = oneshot::channel();
113		(Response { tx }, ResponseReceiver { rx })
114	}
115}
116impl<T> Future for ResponseReceiver<T> {
117	type Output = Result<T, ActorError>;
118
119	fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
120		self.rx.poll_unpin(cx).map_err(|_e| ActorError::Canceled)
121	}
122}
123
124/// A streaming response.
125#[must_use]
126pub struct ResponseStream<T> {
127	tx: mpsc::UnboundedSender<T>,
128}
129impl<T> ResponseStream<T> {
130	pub fn send(&mut self, value: T) -> Result<(), ActorError> {
131		self.tx.send(value).map_err(|_| ActorError::Canceled)
132	}
133
134	/// Test if the stream has been closed by the caller.
135	pub fn is_closed(&self) -> bool {
136		self.tx.is_closed()
137	}
138
139	pub fn complete(self) -> Result<(), ActorError> {
140		// will be closed on drop
141		Ok(())
142	}
143}
144impl<T> Sink<T> for ResponseStream<T> {
145	type Error = T;
146
147	fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
148		Poll::Ready(Ok(()))
149	}
150
151	fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
152		self.get_mut().tx.send(item).map_err(|err| err.0)
153	}
154
155	fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
156		Poll::Ready(Ok(()))
157	}
158
159	fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
160		Poll::Ready(Ok(()))
161	}
162}
163impl<T> Debug for ResponseStream<T> {
164	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
165		f.debug_struct("ResponseStream")
166			.field("response_type", &type_name::<T>())
167			.field("tx_closed", &self.tx.is_closed())
168			.finish()
169	}
170}
171
172pub struct ResponseStreamReceiver<T> {
173	rx: mpsc::UnboundedReceiver<T>,
174}
175impl<T> ResponseStreamReceiver<T> {
176	pub fn new() -> (ResponseStream<T>, ResponseStreamReceiver<T>) {
177		let (tx, rx) = mpsc::unbounded_channel();
178		(ResponseStream { tx }, ResponseStreamReceiver { rx })
179	}
180}
181impl<T> Stream for ResponseStreamReceiver<T> {
182	type Item = T;
183
184	fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
185		self.rx.poll_recv(cx)
186	}
187}
188
189/// A streaming response with backpressure (bounded).
190#[must_use]
191pub struct ResponseBackPressureStream<T> {
192	tx: mpsc::Sender<Result<T, ActorError>>,
193}
194impl<T> ResponseBackPressureStream<T> {
195	pub async fn send(&mut self, value: T) -> Result<(), ActorError> {
196		self.tx.send(Ok(value)).await.map_err(|_| ActorError::Canceled)
197	}
198
199	/// Test if the stream has been closed by the caller.
200	pub fn is_closed(&self) -> bool {
201		self.tx.is_closed()
202	}
203
204	pub fn complete(self) -> Result<(), ActorError> {
205		// will be closed on drop
206		Ok(())
207	}
208}
209impl<T> Debug for ResponseBackPressureStream<T> {
210	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
211		f.debug_struct("ResponseBackPressureStream")
212			.field("response_type", &type_name::<T>())
213			.field("tx_closed", &self.tx.is_closed())
214			.finish()
215	}
216}
217
218pub struct ResponseBackPressureStreamReceiver<T> {
219	rx: mpsc::Receiver<Result<T, ActorError>>,
220}
221impl<T> ResponseBackPressureStreamReceiver<T> {
222	pub fn new(buffer: usize) -> (ResponseBackPressureStream<T>, ResponseBackPressureStreamReceiver<T>) {
223		let (tx, rx) = mpsc::channel(buffer);
224		(ResponseBackPressureStream { tx }, ResponseBackPressureStreamReceiver { rx })
225	}
226}
227impl<T: Debug> Stream for ResponseBackPressureStreamReceiver<T> {
228	type Item = Result<T, ActorError>;
229
230	fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
231		self.rx.poll_recv(cx)
232	}
233}
234
235#[derive(Debug)]
236pub struct ResponseStreams<T> {
237	streams: Vec<ResponseStream<T>>,
238}
239impl<T> Default for ResponseStreams<T> {
240	fn default() -> Self {
241		Self { streams: Default::default() }
242	}
243}
244impl<T> ResponseStreams<T>
245where
246	T: Clone,
247{
248	pub fn push(&mut self, stream: ResponseStream<T>) {
249		self.streams.push(stream);
250	}
251
252	pub fn send(&mut self, value: T) {
253		self.streams
254			.retain_mut(|stream| !matches!(stream.send(value.clone()), Err(ActorError::Canceled)));
255	}
256
257	pub fn is_empty(&self) -> bool {
258		self.streams.is_empty() || self.is_closed()
259	}
260
261	/// Test if the streams has been closed by the caller.
262	pub fn is_closed(&self) -> bool {
263		!self.streams.iter().any(|s| !s.is_closed())
264	}
265}