use std::sync::Arc;
use crate::{
actor::Actor,
cfg_runtime,
context::{InputHandle, Signal},
envelope::{EnvelopeProxy, MessageEnvelope, NotificationEnvelope},
errors::SendError,
handler::{Handler, Notifiable},
};
use futures::{lock::Mutex, Stream, StreamExt};
/// Handle used to talk to an actor of type `A`.
///
/// All communication goes through an `async_channel` sender carrying
/// `Signal` values; the handle itself places no bounds on `A`.
pub struct Address<A> {
    /// Sending half of the channel on which messages and stop requests are
    /// queued for the actor.
    sender: async_channel::Sender<Signal<InputHandle<A>>>,
    /// Shared mutex that `wait_for_stop` locks while waiting for shutdown.
    // NOTE(review): presumably the actor's context keeps this locked while
    // the actor is running — confirm against the context implementation.
    stop_handle: Arc<Mutex<()>>,
}
/// Opaque `Debug` representation: only the type name is printed.
///
/// Implemented by hand so that no bound is placed on `A`.
impl<A> std::fmt::Debug for Address<A> {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Neither field is included in the output, so the builder is finished
        // without adding any entries.
        let mut builder = formatter.debug_struct("Address");
        builder.finish()
    }
}
impl<A> Clone for Address<A> {
fn clone(&self) -> Self {
Self {
sender: self.sender.clone(),
stop_handle: self.stop_handle.clone(),
}
}
}
impl<A> Address<A> {
    /// Builds an address from its two parts: the channel sender used to queue
    /// signals and the shared stop mutex.
    pub(crate) fn new(
        sender: async_channel::Sender<Signal<InputHandle<A>>>,
        stop_handle: Arc<Mutex<()>>,
    ) -> Self {
        Self { sender, stop_handle }
    }

    /// Sends `message` to the actor and waits for the handler's response.
    ///
    /// The message is wrapped in a `MessageEnvelope` together with the sending
    /// half of a oneshot channel; the response is read from the receiving half.
    ///
    /// # Errors
    ///
    /// Returns `SendError::ReceiverDisconnected` if the envelope cannot be
    /// queued on the channel, or if the oneshot channel yields an error instead
    /// of a response.
    pub async fn send<IN>(&mut self, message: IN) -> Result<A::Result, SendError>
    where
        A: Actor + Send + Handler<IN> + 'static,
        IN: Send + 'static,
        A::Result: Send + Sync + 'static,
    {
        let (reply_tx, reply_rx) = async_oneshot::oneshot();
        let envelope: MessageEnvelope<A, IN> = MessageEnvelope::new(message, reply_tx);
        // Erase the concrete message type so it fits the channel's item type.
        let boxed: Box<dyn EnvelopeProxy<A> + Send + 'static> = Box::new(envelope);

        let delivery = self.sender.send(Signal::Message(boxed)).await;
        delivery.map_err(|_| SendError::ReceiverDisconnected)?;

        match reply_rx.await {
            Ok(response) => Ok(response),
            Err(_) => Err(SendError::ReceiverDisconnected),
        }
    }

    /// Sends `message` to the actor without waiting for any response.
    ///
    /// The message is wrapped in a `NotificationEnvelope`; the call completes
    /// as soon as the envelope has been queued on the channel.
    ///
    /// # Errors
    ///
    /// Returns `SendError::ReceiverDisconnected` if the envelope cannot be
    /// queued on the channel.
    pub async fn notify<IN>(&mut self, message: IN) -> Result<(), SendError>
    where
        A: Actor + Send + Notifiable<IN> + 'static,
        IN: Send + 'static,
    {
        let envelope: NotificationEnvelope<A, IN> = NotificationEnvelope::new(message);
        // Erase the concrete message type so it fits the channel's item type.
        let boxed: Box<dyn EnvelopeProxy<A> + Send + 'static> = Box::new(envelope);

        self.sender
            .send(Signal::Message(boxed))
            .await
            .map_err(|_| SendError::ReceiverDisconnected)
    }

    /// Consumes the address and forwards every item of `stream` to the actor
    /// through [`Address::notify`].
    ///
    /// Completes with `Ok(())` once the stream is exhausted.
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first error produced by `notify`.
    pub async fn into_stream_forwarder<IN, S>(mut self, mut stream: S) -> Result<(), SendError>
    where
        A: Actor + Send + Notifiable<IN> + 'static,
        S: Send + Stream<Item = IN> + Unpin,
        IN: Send + 'static,
    {
        loop {
            let next_item = stream.next().await;
            match next_item {
                Some(item) => self.notify(item).await?,
                None => return Ok(()),
            }
        }
    }

    /// Returns `true` while the underlying channel has not been closed.
    #[must_use]
    pub fn connected(&self) -> bool {
        let closed = self.sender.is_closed();
        !closed
    }

    /// Queues a `Signal::Stop` for the actor.
    ///
    /// The outcome of the send is discarded on purpose, so a failed send is
    /// not reported to the caller.
    pub async fn stop(&mut self) {
        let outcome = self.sender.send(Signal::Stop).await;
        drop(outcome);
    }

    /// Waits until the address is no longer connected.
    ///
    /// While [`Address::connected`] reports `true`, the stop mutex is locked
    /// and released again before the connection is re-checked.
    // NOTE(review): presumably the running actor holds the stop mutex, which
    // is what makes `lock()` wait here — confirm. If the mutex is free while
    // the channel is still open, this loop would re-lock repeatedly.
    pub async fn wait_for_stop(&self) {
        while self.connected() {
            let guard = self.stop_handle.lock().await;
            // Release immediately: acquiring the lock is the only goal.
            drop(guard);
        }
    }
}
cfg_runtime! {
    use crate::{
        envelope::CoroutineEnvelope,
        handler::Coroutine,
    };

    impl<A> Address<A> {
        /// Consumes the address and spawns [`Address::into_stream_forwarder`]
        /// for `stream` via `crate::runtime::spawn`.
        ///
        /// The returned join handle carries the forwarder's result.
        pub fn spawn_stream_forwarder<IN, S>(self, stream: S) -> crate::runtime::JoinHandle<Result<(), SendError>>
        where
            A: Actor + Send + Notifiable<IN> + 'static,
            S: Send + Stream<Item = IN> + Unpin + 'static,
            IN: Send + 'static,
        {
            let forwarder = self.into_stream_forwarder(stream);
            crate::runtime::spawn(forwarder)
        }

        /// Sends `message` to the actor wrapped in a `CoroutineEnvelope` and
        /// waits for the result on a oneshot channel.
        ///
        /// Unlike `send`, this takes `&self` and sends through a clone of the
        /// channel sender.
        // NOTE(review): presumably the envelope lets the actor process the
        // message via its `Coroutine` implementation — confirm in `envelope`.
        ///
        /// # Errors
        ///
        /// Returns `SendError::ReceiverDisconnected` if the envelope cannot be
        /// queued on the channel, or if the oneshot channel yields an error
        /// instead of a result.
        pub async fn calculate<IN>(&self, message: IN) -> Result<A::Result, SendError>
        where
            A: Actor + Send + Coroutine<IN> + 'static,
            IN: Send + 'static,
            A::Result: Send + Sync + 'static,
        {
            let channel = self.sender.clone();
            let (reply_tx, reply_rx) = async_oneshot::oneshot();
            let envelope: CoroutineEnvelope<A, IN> = CoroutineEnvelope::new(message, reply_tx);
            // Erase the concrete message type so it fits the channel's item type.
            let boxed: Box<dyn EnvelopeProxy<A> + Send + 'static> = Box::new(envelope);

            let delivery = channel.send(Signal::Message(boxed)).await;
            delivery.map_err(|_| SendError::ReceiverDisconnected)?;

            match reply_rx.await {
                Ok(outcome) => Ok(outcome),
                Err(_) => Err(SendError::ReceiverDisconnected),
            }
        }
    }
}