use futures::{FutureExt, channel::oneshot};
use std::{
future::Future,
ops::{Deref, DerefMut},
pin::Pin,
task::Poll,
};
use weak_addr::WeakAddr;
pub mod caller;
pub mod sender;
pub mod weak_addr;
pub mod weak_caller;
pub mod weak_sender;
#[cfg(test)]
mod tests;
use crate::{
RestartableActor,
actor::{Actor, ActorHandle, JoinFuture},
channel::Tx,
context::Core,
error::{ActorError, Result},
event_loop::Payload,
handler::Handler,
};
/// A value that can be passed to an actor's handler.
///
/// Both the message itself and its `Response` have to be `Send + 'static`.
pub trait Message: Send + 'static {
    /// Type produced by handling this message.
    type Response: Send + 'static;
}
/// The unit type is itself a message; its response is also `()`.
impl Message for () {
type Response = ();
}
/// Handle used to talk to an actor of type `A`.
///
/// Cloning an `Addr` produces another handle built from clones of both fields.
pub struct Addr<A> {
/// Channel end through which payloads (tasks, stop, restart) are submitted.
pub(crate) tx: Tx<A>,
/// Lifecycle state: queried by `running()`/`stopped()` and polled when the
/// address itself is awaited.
pub(crate) core: Core,
}
/// Written by hand so that cloning an address does not require `A: Clone`,
/// which a derived implementation would add as a bound.
impl<A: Actor> Clone for Addr<A> {
    /// Builds a new address from clones of the channel end and the core.
    fn clone(&self) -> Self {
        let Self { tx, core } = self;
        Self {
            tx: tx.clone(),
            core: core.clone(),
        }
    }
}
impl<A: Actor> Addr<A> {
pub fn stop(&mut self) -> Result<()> {
if self.core.stopped() {
return Err(ActorError::AlreadyStopped);
}
log::trace!("stopping actor");
self.tx
.force_send(Payload::Stop)
.map_err(|_err| ActorError::AlreadyStopped)?;
Ok(())
}
pub async fn halt(mut self) -> Result<()> {
log::trace!("halting actor: stop and await");
self.stop()?;
self.await
}
pub fn running(&self) -> bool {
self.core.running()
}
pub fn stopped(&self) -> bool {
self.core.stopped()
}
pub async fn call<M: Message>(&self, msg: M) -> Result<M::Response>
where
A: Handler<M>,
{
if self.core.stopped() {
return Err(ActorError::AlreadyStopped);
}
let (tx_response, response) = oneshot::channel();
log::trace!("calling actor {}", std::any::type_name::<M>());
self.tx
.try_send(Payload::task(move |actor, ctx| {
log::trace!("handling task call");
Box::pin(async move {
log::trace!("actor handling call {}", std::any::type_name::<M>());
let res = Handler::handle(actor, ctx, msg).await;
let _ = tx_response.send(res);
})
}))
.map_err(|_err| ActorError::AlreadyStopped)?;
let response = response.await?;
log::trace!("received response from actor");
Ok(response)
}
pub async fn ping(&self) -> Result<()> {
if self.core.stopped() {
return Err(ActorError::AlreadyStopped);
}
log::trace!("pinging actor");
let (tx_response, response) = oneshot::channel();
self.tx
.try_send(Payload::task(move |_actor, _ctx| {
Box::pin(async move {
let _ = tx_response.send(());
})
}))
.map_err(|_err| ActorError::AlreadyStopped)?;
Ok(response.await?)
}
pub async fn send<M: Message<Response = ()>>(&self, msg: M) -> Result<()>
where
A: Handler<M>,
{
if self.core.stopped() {
return Err(ActorError::AlreadyStopped);
}
log::trace!("sending message to actor {}", std::any::type_name::<M>());
self.tx
.send(Payload::task(move |actor, ctx| {
Box::pin(Handler::handle(actor, ctx, msg))
}))
.await
.map_err(|_err| ActorError::AlreadyStopped)?;
Ok(())
}
pub fn try_send<M: Message<Response = ()>>(&self, msg: M) -> Result<()>
where
A: Handler<M>,
{
if self.core.stopped() {
return Err(ActorError::AlreadyStopped);
}
log::trace!("sending message to actor {}", std::any::type_name::<M>());
self.tx
.try_send(Payload::task(move |actor, ctx| {
Box::pin(Handler::handle(actor, ctx, msg))
}))
.map_err(|_err| ActorError::AlreadyStopped)?;
Ok(())
}
pub fn downgrade(&self) -> WeakAddr<A> {
WeakAddr::from(self)
}
pub fn sender<M: Message<Response = ()>>(&self) -> sender::Sender<M>
where
A: Handler<M>,
{
log::trace!("creating sender for actor {}", std::any::type_name::<M>());
sender::Sender::from(self.to_owned())
}
pub fn weak_sender<M: Message<Response = ()>>(&self) -> weak_sender::WeakSender<M>
where
A: Handler<M>,
{
weak_sender::WeakSender::from(self.to_owned())
}
pub fn caller<M: Message>(&self) -> caller::Caller<M>
where
A: Handler<M>,
{
caller::Caller::from(self.to_owned())
}
pub fn weak_caller<M: Message>(&self) -> weak_caller::WeakCaller<M>
where
A: Handler<M>,
{
weak_caller::WeakCaller::from(self.to_owned())
}
}
impl<A: RestartableActor> Addr<A> {
    /// Asks the actor to restart by force-sending `Payload::Restart`.
    ///
    /// # Errors
    ///
    /// Returns `ActorError::AlreadyStopped` if the core already reports the
    /// actor as stopped, or if the restart payload cannot be handed to the channel.
    pub fn restart(&mut self) -> Result<()> {
        if self.core.stopped() {
            return Err(ActorError::AlreadyStopped);
        }
        match self.tx.force_send(Payload::Restart) {
            Ok(_) => Ok(()),
            Err(_) => Err(ActorError::AlreadyStopped),
        }
    }
}
/// Awaiting an address delegates to polling its `core`.
impl<A> Future for Addr<A> {
    type Output = Result<()>;

    /// Logs the poll and forwards it to the core.
    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Result<()>> {
        log::trace!("polling actor");
        let this = self.get_mut();
        this.core.poll_unpin(cx)
    }
}
/// An `Addr` bundled with the `ActorHandle` of the same actor.
///
/// Dereferences to the inner `Addr`, so address methods can be called on it directly.
pub struct OwningAddr<A> {
/// Address used to communicate with the actor.
pub(crate) addr: Addr<A>,
/// Handle used to join or detach the actor.
pub(crate) handle: ActorHandle<A>,
}
#[allow(deprecated)]
impl<A: Actor> OwningAddr<A> {
pub(crate) const fn new(addr: Addr<A>, handle: ActorHandle<A>) -> Self {
OwningAddr { addr, handle }
}
pub async fn join(&mut self) -> Option<A> {
log::trace!("joining actor");
self.handle.join().await
}
pub async fn consume(mut self) -> Result<A> {
log::trace!("consuming actor");
self.addr.stop()?;
self.join()
.await
.ok_or(crate::error::ActorError::AlreadyStopped)
}
pub fn consume_sync(mut self) -> Result<JoinFuture<A>> {
log::trace!("consuming actor synchronously");
self.addr.stop()?;
Ok(self.handle.join())
}
pub fn detach(self) -> Addr<A> {
log::trace!("detaching owning addr");
self.handle.detach();
self.addr
}
}
/// Borrows the inner address.
impl<A> AsRef<Addr<A>> for OwningAddr<A> {
    fn as_ref(&self) -> &Addr<A> {
        let Self { addr, .. } = self;
        addr
    }
}
/// Lets an `OwningAddr` be used wherever a shared reference to its `Addr` is expected.
impl<A> Deref for OwningAddr<A> {
    type Target = Addr<A>;

    fn deref(&self) -> &Addr<A> {
        let Self { addr, .. } = self;
        addr
    }
}
impl<A> DerefMut for OwningAddr<A> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.addr
}
}