use tokio::sync::mpsc::Sender;
use tokio_util::sync::CancellationToken;
use crate::{Actor, Error};
use crate::result::Result;
use crate::executor::ActorSysMsg;
pub struct ActorRef<A>
where A: Actor + ?Sized
{
outbox: Sender<ActorSysMsg<A::SendMessage, A::CallMessage, A::ErrorType>>,
pub(crate) terminate_token: CancellationToken,
}
impl<A> ActorRef<A> where A: Actor {
pub(crate) fn new(outbox: Sender<ActorSysMsg<A::SendMessage, A::CallMessage, A::ErrorType>>) -> Self {
Self {
outbox,
terminate_token: CancellationToken::new(),
}
}
pub async fn send(&self, msg: A::SendMessage) -> Result<()> {
self.outbox.send(ActorSysMsg::Send(msg)).await.map_err(|_| Error::UnableToSend)?;
Ok(())
}
pub async fn call(&self, msg: A::CallMessage) -> Result<std::result::Result<A::CallMessage, A::ErrorType>> {
let (send, recv) = tokio::sync::oneshot::channel();
self.outbox.send(ActorSysMsg::Call(msg, send)).await.map_err(|_| Error::UnableToSend)?;
let reply = recv.await.map_err(|_| Error::UnableToReceive)?;
Ok(reply)
}
pub async fn shutdown(&self) -> Result<()> {
self.outbox.send(ActorSysMsg::Shutdown).await.map_err(|_| Error::UnableToSend)?;
Ok(())
}
pub fn terminate(&self) {
self.terminate_token.cancel();
}
}
impl<A> Clone for ActorRef<A>
where
A: Actor,
{
fn clone(&self) -> Self {
Self {
outbox: self.outbox.clone(),
terminate_token: self.terminate_token.clone(),
}
}
}
#[cfg(test)]
mod tests {
use crate::create_actor;
use super::*;
use std::sync::atomic::Ordering;
use crate::test_code::tests::*;
#[tokio::test]
async fn test_shutdown_process() {
let instance = DelayingActor::new();
let (actor, handle) = create_actor(instance).await.unwrap();
for _i in 0..8 {
let r = actor.send(DelayingSends::Ping).await;
assert!(r.is_ok());
}
let r = actor.shutdown().await;
assert!(r.is_ok());
let v = COUNTER.load(Ordering::Relaxed);
assert_eq!(v, 0);
let r = actor.call(DelayingCalls::DoPong).await;
assert!(r.is_err());
assert_eq!(r, Err(Error::UnableToReceive));
let r = actor.send(DelayingSends::Ping).await;
assert!(r.is_err());
assert_eq!(r, Err(Error::UnableToSend));
handle.await.unwrap();
let v = COUNTER.load(Ordering::Relaxed);
assert_eq!(v, 8);
}
#[tokio::test]
async fn test_ref_clone() {
let instance = SimpleCounter::new(false);
let (actor, handle) = create_actor(instance).await.unwrap();
let act_clone = actor.clone();
if let CounterCalls::Reply(a) = actor.call(CounterCalls::GetCount).await.unwrap().unwrap() {
if let CounterCalls::Reply(b) = act_clone.call(CounterCalls::GetCount).await.unwrap().unwrap() {
assert_eq!(a, b);
assert_eq!(a, 0);
} else {
assert!(false);
}
} else {
assert!(false);
}
let r = actor.send(CounterSends::Count).await;
assert!(r.is_ok());
if let CounterCalls::Reply(a) = actor.call(CounterCalls::GetCount).await.unwrap().unwrap() {
if let CounterCalls::Reply(b) = act_clone.call(CounterCalls::GetCount).await.unwrap().unwrap() {
assert_eq!(a, b);
assert_eq!(a, 1);
} else {
assert!(false);
}
} else {
assert!(false);
}
let r = actor.shutdown().await;
assert!(r.is_ok());
let r = handle.await;
assert!(r.is_ok());
let r = act_clone.send(CounterSends::Count).await;
assert!(r.is_err());
}
}