use std::sync::Arc;
use std::time::Duration;
use atomr_config::Config;
use atomr_core::actor::{Actor, ActorSystem, Context, Inbox, Props};
use tokio::sync::{oneshot, Mutex};
/// Test actor state: a running tally that starts at zero via `Default`.
#[derive(Default)]
struct Counter {
    /// Current tally; incremented once per `CounterMsg::Inc` handled.
    n: u32,
}
/// Messages understood by the [`Counter`] test actor.
enum CounterMsg {
    /// Add one to the counter.
    Inc,
    /// Report the current count through the supplied one-shot sender.
    Get(oneshot::Sender<u32>),
}
/// Actor behaviour for [`Counter`]: count `Inc` messages and answer `Get`
/// requests with the current count.
#[async_trait::async_trait]
impl Actor for Counter {
    type Msg = CounterMsg;

    /// Applies a single message to the counter; the context is not used.
    async fn handle(&mut self, _ctx: &mut Context<Self>, msg: Self::Msg) {
        match msg {
            CounterMsg::Get(reply_to) => {
                // A failed send is ignored on purpose: the result is discarded.
                let _ = reply_to.send(self.n);
            }
            CounterMsg::Inc => {
                self.n += 1;
            }
        }
    }
}
/// Spawns a `Counter`, sends it three `Inc` messages, then asks for the
/// count (200 ms timeout) and expects the reply to be 3.
#[tokio::test]
async fn spawn_tell_ask() {
    let system = ActorSystem::create("TestSys", Config::reference()).await.unwrap();
    let counter = system.actor_of(Props::create(Counter::default), "c").unwrap();

    for _ in 0..3 {
        counter.tell(CounterMsg::Inc);
    }

    let ask_timeout = Duration::from_millis(200);
    let count = counter.ask_with(CounterMsg::Get, ask_timeout).await.unwrap();
    assert_eq!(count, 3);

    system.terminate().await;
}
/// Smoke test for watching an actor and then stopping it.
///
/// A `Watcher` is told to watch a `Counter` and immediately stop it. The test
/// then waits up to 300 ms on a one-shot channel that the watcher fires when
/// it handles `WMsg::Terminated`. The outcome of that wait is deliberately
/// discarded, so the test only verifies that the sequence completes and the
/// system can be terminated afterwards.
#[tokio::test]
async fn watched_actor_notifies_on_stop() {
    /// Actor that watches a target and signals through `notify` once it
    /// handles a `Terminated` message.
    struct Watcher {
        /// One-shot sender, taken (and thereby consumed) on first use.
        notify: Arc<Mutex<Option<oneshot::Sender<()>>>>,
    }

    /// Messages handled by `Watcher`.
    enum WMsg {
        /// Start watching the given actor, then stop it.
        WatchIt(atomr_core::actor::ActorRef<CounterMsg>),
        // Nothing in this test constructs this variant, hence the allowance.
        #[allow(dead_code)]
        Terminated(String),
    }

    #[async_trait::async_trait]
    impl Actor for Watcher {
        type Msg = WMsg;

        async fn handle(&mut self, ctx: &mut Context<Self>, msg: Self::Msg) {
            match msg {
                WMsg::Terminated(_) => {
                    // Keep the guard alive for the whole branch so the send
                    // happens while the slot is still locked.
                    let mut slot = self.notify.lock().await;
                    if let Some(done) = slot.take() {
                        let _ = done.send(());
                    }
                }
                WMsg::WatchIt(target) => {
                    // Register the watch before stopping the target.
                    ctx.watch(&target);
                    target.stop();
                }
            }
        }
    }

    let system = ActorSystem::create("WSys", Config::reference()).await.unwrap();

    let (done_tx, done_rx) = oneshot::channel();
    let shared_slot = Arc::new(Mutex::new(Some(done_tx)));
    let make_watcher = move || Watcher { notify: Arc::clone(&shared_slot) };

    let watcher = system.actor_of(Props::create(make_watcher), "w").unwrap();
    let target = system.actor_of(Props::create(Counter::default), "t").unwrap();
    watcher.tell(WMsg::WatchIt(target));

    // Whether the notification arrived or the wait timed out is not checked.
    let _ = tokio::time::timeout(Duration::from_millis(300), done_rx).await;

    system.terminate().await;
}
#[tokio::test]
async fn inbox_roundtrip() {
let mut inbox = Inbox::<u32>::new("ib");
inbox.actor_ref().tell(99);
let got = inbox.receive(Duration::from_millis(100)).await.unwrap();
assert_eq!(got, 99);
}
/// Checks that an actor name can be reused once its first owner has stopped.
///
/// Steps: spawn a `Counter` under "shared-name" and confirm it works; confirm
/// a second spawn under the same name is rejected while the first is alive;
/// stop the first; retry spawning (up to 50 sleeps of 10 ms) until it
/// succeeds; confirm the new instance reports a count of 2 after two `Inc`s.
#[tokio::test]
async fn test_actor_of_after_stop() {
    let system = ActorSystem::create("ReuseSys", Config::reference()).await.unwrap();
    let ask_timeout = Duration::from_millis(200);

    let first = system.actor_of(Props::create(Counter::default), "shared-name").unwrap();
    first.tell(CounterMsg::Inc);
    let first_count = first.ask_with(CounterMsg::Get, ask_timeout).await.unwrap();
    assert_eq!(first_count, 1);

    let collided = system.actor_of(Props::create(Counter::default), "shared-name");
    assert!(collided.is_err(), "expected NameTaken while first instance is alive");

    first.stop();

    // Poll for the name to become available again; give up after 50 retries.
    let mut retries = 0;
    let second = loop {
        match system.actor_of(Props::create(Counter::default), "shared-name") {
            Ok(fresh) => break fresh,
            Err(e) if retries >= 50 => panic!("name slot never freed after stop: {e}"),
            Err(_) => {
                tokio::time::sleep(Duration::from_millis(10)).await;
                retries += 1;
            }
        }
    };

    second.tell(CounterMsg::Inc);
    second.tell(CounterMsg::Inc);
    let second_count = second.ask_with(CounterMsg::Get, ask_timeout).await.unwrap();
    assert_eq!(second_count, 2, "fresh actor should start with state == 0");

    system.terminate().await;
}