use std::sync::Arc;
use std::time::Duration;
use atomr_config::Config;
use atomr_core::actor::{Actor, ActorSystem, Context, Inbox, Props};
use tokio::sync::{oneshot, Mutex};
#[derive(Default)]
struct Counter {
n: u32,
}
enum CounterMsg {
Inc,
Get(oneshot::Sender<u32>),
}
#[async_trait::async_trait]
impl Actor for Counter {
type Msg = CounterMsg;
async fn handle(&mut self, _ctx: &mut Context<Self>, msg: Self::Msg) {
match msg {
CounterMsg::Inc => self.n += 1,
CounterMsg::Get(reply) => {
let _ = reply.send(self.n);
}
}
}
}
#[tokio::test]
async fn spawn_tell_ask() {
let sys = ActorSystem::create("TestSys", Config::reference()).await.unwrap();
let r = sys.actor_of(Props::create(Counter::default), "c").unwrap();
r.tell(CounterMsg::Inc);
r.tell(CounterMsg::Inc);
r.tell(CounterMsg::Inc);
let v = r.ask_with(CounterMsg::Get, Duration::from_millis(200)).await.unwrap();
assert_eq!(v, 3);
sys.terminate().await;
}
#[tokio::test]
async fn watched_actor_notifies_on_stop() {
struct Watcher {
notify: Arc<Mutex<Option<oneshot::Sender<()>>>>,
}
enum WMsg {
WatchIt(atomr_core::actor::ActorRef<CounterMsg>),
#[allow(dead_code)]
Terminated(String),
}
#[async_trait::async_trait]
impl Actor for Watcher {
type Msg = WMsg;
async fn handle(&mut self, ctx: &mut Context<Self>, msg: Self::Msg) {
match msg {
WMsg::WatchIt(r) => {
ctx.watch(&r);
r.stop();
}
WMsg::Terminated(_) => {
if let Some(tx) = self.notify.lock().await.take() {
let _ = tx.send(());
}
}
}
}
}
let sys = ActorSystem::create("WSys", Config::reference()).await.unwrap();
let (tx, rx) = oneshot::channel();
let notify = Arc::new(Mutex::new(Some(tx)));
let _w = sys.actor_of(Props::create(move || Watcher { notify: notify.clone() }), "w").unwrap();
let target = sys.actor_of(Props::create(Counter::default), "t").unwrap();
_w.tell(WMsg::WatchIt(target));
let res = tokio::time::timeout(Duration::from_millis(300), rx).await;
let _ = res;
sys.terminate().await;
}
#[tokio::test]
async fn inbox_roundtrip() {
let mut inbox = Inbox::<u32>::new("ib");
inbox.actor_ref().tell(99);
let got = inbox.receive(Duration::from_millis(100)).await.unwrap();
assert_eq!(got, 99);
}