use super::ActorError;
use crate::{LocalTaskSpawner, TaskSpawner};
use futures::{FutureExt, Sink, Stream};
use std::{
any::type_name,
borrow::Borrow,
fmt::Debug,
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tokio::sync::{mpsc, oneshot};
#[cfg(feature = "js")]
use tokio_with_wasm::alias as tokio;
/// Sending half of a single-value response channel.
///
/// Wraps a `oneshot::Sender<T>`; every sending method takes `self` by value,
/// so a `Response` can deliver at most one value. Marked `#[must_use]` so that
/// discarding it without answering produces a compiler warning.
#[must_use]
pub struct Response<T> {
/// One-shot sender through which the value is delivered.
tx: oneshot::Sender<T>,
}
impl<T> Response<T> {
    /// Delivers `value` through the channel, ignoring the failure case.
    ///
    /// Use [`Response::send`] instead when the caller needs to know whether
    /// the value was accepted.
    pub fn respond(self, value: T) {
        // Best effort: a rejected send is deliberately discarded.
        let _ = self.send(value);
    }

    /// Calls `value`, awaits the future it returns and hands the output to
    /// [`Response::respond`], so a failed delivery is ignored.
    pub async fn respond_execute<Fut, F>(self, value: F)
    where
        F: FnOnce() -> Fut + Send,
        Fut: Future<Output = T> + Send,
    {
        let output = value().await;
        self.respond(output);
    }

    /// Delivers `value` through the one-shot channel.
    ///
    /// # Errors
    ///
    /// Returns [`ActorError::Canceled`] when the underlying sender rejects the
    /// value; the rejected value is dropped.
    pub fn send(self, value: T) -> Result<(), ActorError> {
        match self.tx.send(value) {
            Ok(()) => Ok(()),
            Err(_rejected) => Err(ActorError::Canceled),
        }
    }

    /// Calls `value`, awaits the future it returns and delivers the output
    /// with [`Response::send`].
    ///
    /// # Errors
    ///
    /// Returns [`ActorError::Canceled`] when the delivery is rejected.
    pub async fn execute<Fut, F>(self, value: F) -> Result<(), ActorError>
    where
        F: FnOnce() -> Fut + Send,
        Fut: Future<Output = T> + Send,
    {
        let output = value().await;
        self.send(output)
    }

    /// Same as [`Response::spawn_with`], using `TaskSpawner::default()` as the
    /// spawner.
    #[inline]
    #[track_caller]
    pub fn spawn<Fut, F>(self, value: F)
    where
        F: FnOnce() -> Fut + Send + 'static,
        Fut: Future<Output = T> + Send + 'static,
        T: Send + 'static,
    {
        self.spawn_with(TaskSpawner::default(), value);
    }

    /// Spawns a task on `spawner` that calls `value`, awaits its future and
    /// sends the output through this response; a failed send is ignored.
    #[inline]
    #[track_caller]
    pub fn spawn_with<Fut, F>(self, spawner: impl Borrow<TaskSpawner>, value: F)
    where
        F: FnOnce() -> Fut + Send + 'static,
        Fut: Future<Output = T> + Send + 'static,
        T: Send + 'static,
    {
        let task = async move {
            let output = value().await;
            self.send(output).ok()
        };
        spawner.borrow().spawn(task);
    }

    /// Like [`Response::spawn_with`], but hands the task to a
    /// [`LocalTaskSpawner`], so neither `value` nor its future has to be
    /// `Send`.
    #[inline]
    #[track_caller]
    pub fn spawn_local<Fut, F>(self, spawner: impl LocalTaskSpawner, value: F)
    where
        F: FnOnce() -> Fut + 'static,
        Fut: Future<Output = T> + 'static,
        T: Send + 'static,
    {
        let task = async move {
            let output = value().await;
            self.send(output).ok()
        };
        spawner.spawn_local(task);
    }
}
/// Debug output shows the payload type name and whether the sender is closed;
/// it never requires `T: Debug`.
impl<T> Debug for Response<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let response_type = type_name::<T>();
        let tx_closed = self.tx.is_closed();

        let mut out = f.debug_struct("Response");
        out.field("response_type", &response_type);
        out.field("tx_closed", &tx_closed);
        out.finish()
    }
}
/// Receiving half of a single-value response channel.
///
/// Wraps a `oneshot::Receiver<T>`.
pub struct ResponseReceiver<T> {
/// One-shot receiver on which the value arrives.
rx: oneshot::Receiver<T>,
}
impl<T> ResponseReceiver<T> {
    /// Creates a connected pair backed by a fresh one-shot channel.
    ///
    /// The first element is the sending half, the second the receiving half.
    pub fn new() -> (Response<T>, ResponseReceiver<T>) {
        let (sender, receiver) = oneshot::channel();
        let response = Response { tx: sender };
        let pending = ResponseReceiver { rx: receiver };
        (response, pending)
    }
}
impl<T> Future for ResponseReceiver<T> {
type Output = Result<T, ActorError>;
fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.rx.poll_unpin(cx).map_err(|_e| ActorError::Canceled)
}
}
/// Sending half of a multi-value response stream without back-pressure.
///
/// Wraps an `mpsc::UnboundedSender<T>`. Marked `#[must_use]` so that
/// discarding it unused produces a compiler warning.
#[must_use]
pub struct ResponseStream<T> {
/// Unbounded sender through which items are delivered.
tx: mpsc::UnboundedSender<T>,
}
impl<T> ResponseStream<T> {
    /// Pushes `value` onto the stream.
    ///
    /// # Errors
    ///
    /// Returns [`ActorError::Canceled`] when the underlying sender rejects the
    /// value; the rejected value is dropped.
    pub fn send(&mut self, value: T) -> Result<(), ActorError> {
        match self.tx.send(value) {
            Ok(()) => Ok(()),
            Err(_rejected) => Err(ActorError::Canceled),
        }
    }

    /// Reports whether the underlying sender considers the channel closed.
    pub fn is_closed(&self) -> bool {
        let Self { tx } = self;
        tx.is_closed()
    }

    /// Finishes the stream by consuming (and thereby dropping) this handle.
    ///
    /// Currently always returns `Ok(())`.
    pub fn complete(self) -> Result<(), ActorError> {
        drop(self);
        Ok(())
    }
}
/// `Sink` adapter over the unbounded sender.
///
/// Readiness, flush and close all complete immediately; the only fallible
/// step is `start_send`, which hands the rejected item back as the error.
impl<T> Sink<T> for ResponseStream<T> {
    type Error = T;

    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let ready: Result<(), Self::Error> = Ok(());
        Poll::Ready(ready)
    }

    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
        let stream = self.get_mut();
        match stream.tx.send(item) {
            Ok(()) => Ok(()),
            // Return the undelivered item to the caller.
            Err(rejected) => Err(rejected.0),
        }
    }

    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let flushed: Result<(), Self::Error> = Ok(());
        Poll::Ready(flushed)
    }

    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let closed: Result<(), Self::Error> = Ok(());
        Poll::Ready(closed)
    }
}
/// Debug output shows the item type name and whether the sender is closed;
/// it never requires `T: Debug`.
impl<T> Debug for ResponseStream<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let response_type = type_name::<T>();
        let tx_closed = self.tx.is_closed();

        let mut out = f.debug_struct("ResponseStream");
        out.field("response_type", &response_type);
        out.field("tx_closed", &tx_closed);
        out.finish()
    }
}
/// Receiving half of a multi-value response stream without back-pressure.
///
/// Wraps an `mpsc::UnboundedReceiver<T>`.
pub struct ResponseStreamReceiver<T> {
/// Unbounded receiver on which items arrive.
rx: mpsc::UnboundedReceiver<T>,
}
impl<T> ResponseStreamReceiver<T> {
    /// Creates a connected pair backed by a fresh unbounded channel.
    ///
    /// The first element is the sending half, the second the receiving half.
    pub fn new() -> (ResponseStream<T>, ResponseStreamReceiver<T>) {
        let (sender, receiver) = mpsc::unbounded_channel();
        let stream = ResponseStream { tx: sender };
        let incoming = ResponseStreamReceiver { rx: receiver };
        (stream, incoming)
    }
}
/// Yields items exactly as the underlying unbounded receiver produces them.
impl<T> Stream for ResponseStreamReceiver<T> {
    type Item = T;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let receiver = &mut self.get_mut().rx;
        receiver.poll_recv(cx)
    }
}
/// Sending half of a multi-value response stream with back-pressure.
///
/// Wraps a bounded `mpsc::Sender` whose items are `Result<T, ActorError>`.
/// Marked `#[must_use]` so that discarding it unused produces a compiler
/// warning.
#[must_use]
pub struct ResponseBackPressureStream<T> {
/// Bounded sender through which items are delivered.
tx: mpsc::Sender<Result<T, ActorError>>,
}
impl<T> ResponseBackPressureStream<T> {
    /// Sends `value` (wrapped in `Ok`) on the bounded channel, awaiting the
    /// sender's `send` future.
    ///
    /// # Errors
    ///
    /// Returns [`ActorError::Canceled`] when the underlying sender reports a
    /// send error.
    pub async fn send(&mut self, value: T) -> Result<(), ActorError> {
        let delivery = self.tx.send(Ok(value)).await;
        match delivery {
            Ok(()) => Ok(()),
            Err(_rejected) => Err(ActorError::Canceled),
        }
    }

    /// Reports whether the underlying sender considers the channel closed.
    pub fn is_closed(&self) -> bool {
        let Self { tx } = self;
        tx.is_closed()
    }

    /// Finishes the stream by consuming (and thereby dropping) this handle.
    ///
    /// Currently always returns `Ok(())`.
    pub fn complete(self) -> Result<(), ActorError> {
        drop(self);
        Ok(())
    }
}
/// Debug output shows the item type name and whether the sender is closed;
/// it never requires `T: Debug`.
impl<T> Debug for ResponseBackPressureStream<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let response_type = type_name::<T>();
        let tx_closed = self.tx.is_closed();

        let mut out = f.debug_struct("ResponseBackPressureStream");
        out.field("response_type", &response_type);
        out.field("tx_closed", &tx_closed);
        out.finish()
    }
}
/// Receiving half of a multi-value response stream with back-pressure.
///
/// Wraps a bounded `mpsc::Receiver` whose items are `Result<T, ActorError>`.
pub struct ResponseBackPressureStreamReceiver<T> {
/// Bounded receiver on which items arrive.
rx: mpsc::Receiver<Result<T, ActorError>>,
}
impl<T> ResponseBackPressureStreamReceiver<T> {
    /// Creates a connected pair backed by a fresh bounded channel.
    ///
    /// `buffer` is passed straight to `mpsc::channel` as the channel
    /// capacity. The first element of the returned tuple is the sending half,
    /// the second the receiving half.
    pub fn new(buffer: usize) -> (ResponseBackPressureStream<T>, ResponseBackPressureStreamReceiver<T>) {
        let (sender, receiver) = mpsc::channel(buffer);
        let stream = ResponseBackPressureStream { tx: sender };
        let incoming = ResponseBackPressureStreamReceiver { rx: receiver };
        (stream, incoming)
    }
}
impl<T: Debug> Stream for ResponseBackPressureStreamReceiver<T> {
type Item = Result<T, ActorError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.rx.poll_recv(cx)
}
}
/// A collection of [`ResponseStream`]s that share one item type.
#[derive(Debug)]
pub struct ResponseStreams<T> {
/// The streams currently held.
streams: Vec<ResponseStream<T>>,
}
/// The default collection holds no streams; written by hand so that no
/// `T: Default` bound is required.
impl<T> Default for ResponseStreams<T> {
    fn default() -> Self {
        let streams = Vec::new();
        Self { streams }
    }
}
impl<T> ResponseStreams<T>
where
T: Clone,
{
pub fn push(&mut self, stream: ResponseStream<T>) {
self.streams.push(stream);
}
pub fn send(&mut self, value: T) {
self.streams
.retain_mut(|stream| !matches!(stream.send(value.clone()), Err(ActorError::Canceled)));
}
pub fn is_empty(&self) -> bool {
self.streams.is_empty() || self.is_closed()
}
pub fn is_closed(&self) -> bool {
!self.streams.iter().any(|s| !s.is_closed())
}
}