use rsactor::{spawn, Actor, ActorRef, Message};
#[derive(Debug)]
struct WorkerActor;
#[derive(Debug)]
struct Ping;
struct ForwardPing(ActorRef<WorkerActor>);
struct SequentialPing(ActorRef<WorkerActor>, ActorRef<WorkerActor>);
impl Actor for WorkerActor {
type Args = ();
type Error = anyhow::Error;
async fn on_start(_: (), _: &ActorRef<Self>) -> Result<Self, Self::Error> {
Ok(WorkerActor)
}
}
impl Message<Ping> for WorkerActor {
type Reply = String;
async fn handle(&mut self, _: Ping, _: &ActorRef<Self>) -> String {
"pong".to_string()
}
}
impl Message<ForwardPing> for WorkerActor {
type Reply = String;
async fn handle(&mut self, msg: ForwardPing, _: &ActorRef<Self>) -> String {
msg.0.ask(Ping).await.unwrap()
}
}
impl Message<SequentialPing> for WorkerActor {
type Reply = (String, String);
async fn handle(&mut self, msg: SequentialPing, _: &ActorRef<Self>) -> (String, String) {
let r1 = msg.0.ask(Ping).await.unwrap();
let r2 = msg.1.ask(Ping).await.unwrap();
(r1, r2)
}
}
#[derive(Debug)]
struct SelfAskActor;
#[derive(Debug)]
struct TriggerSelfAsk;
impl Actor for SelfAskActor {
type Args = ();
type Error = anyhow::Error;
async fn on_start(_: (), _: &ActorRef<Self>) -> Result<Self, Self::Error> {
Ok(SelfAskActor)
}
}
impl Message<TriggerSelfAsk> for SelfAskActor {
type Reply = ();
async fn handle(&mut self, _: TriggerSelfAsk, actor_ref: &ActorRef<Self>) {
let _ = actor_ref.ask(TriggerSelfAsk).await;
}
}
#[tokio::test]
async fn test_self_ask_deadlock_detected() {
let (actor_ref, join_handle) = spawn::<SelfAskActor>(());
let _ = actor_ref.ask(TriggerSelfAsk).await;
let join_err = join_handle.await.unwrap_err();
assert!(join_err.is_panic());
let payload = join_err.into_panic();
let msg = payload
.downcast_ref::<String>()
.expect("panic payload should be String");
assert!(
msg.contains("Deadlock detected"),
"Expected deadlock panic, got: {msg}"
);
}
#[derive(Debug)]
struct CycleActorA;
#[derive(Debug)]
struct CycleActorB;
struct StartCycle(ActorRef<CycleActorB>);
struct PingFromA(ActorRef<CycleActorA>);
#[derive(Debug)]
struct CyclePong;
impl Actor for CycleActorA {
type Args = ();
type Error = anyhow::Error;
async fn on_start(_: (), _: &ActorRef<Self>) -> Result<Self, Self::Error> {
Ok(CycleActorA)
}
}
impl Actor for CycleActorB {
type Args = ();
type Error = anyhow::Error;
async fn on_start(_: (), _: &ActorRef<Self>) -> Result<Self, Self::Error> {
Ok(CycleActorB)
}
}
impl Message<StartCycle> for CycleActorA {
type Reply = String;
async fn handle(&mut self, msg: StartCycle, actor_ref: &ActorRef<Self>) -> String {
msg.0
.ask(PingFromA(actor_ref.clone()))
.await
.unwrap_or_default()
}
}
impl Message<CyclePong> for CycleActorA {
type Reply = String;
async fn handle(&mut self, _: CyclePong, _: &ActorRef<Self>) -> String {
"pong from A".to_string()
}
}
impl Message<PingFromA> for CycleActorB {
type Reply = String;
async fn handle(&mut self, msg: PingFromA, _: &ActorRef<Self>) -> String {
msg.0.ask(CyclePong).await.unwrap_or_default()
}
}
#[tokio::test]
async fn test_two_actor_cycle_deadlock_detected() {
let (a_ref, a_handle) = spawn::<CycleActorA>(());
let (b_ref, b_handle) = spawn::<CycleActorB>(());
let _ = a_ref.ask(StartCycle(b_ref.clone())).await;
let b_err = b_handle.await.unwrap_err();
assert!(b_err.is_panic());
let payload = b_err.into_panic();
let msg = payload
.downcast_ref::<String>()
.expect("panic payload should be String");
assert!(
msg.contains("Deadlock detected"),
"Expected deadlock panic, got: {msg}"
);
drop(a_ref);
drop(b_ref);
let _ = a_handle.await;
}
#[derive(Debug)]
struct ChainActorA;
#[derive(Debug)]
struct ChainActorB;
#[derive(Debug)]
struct ChainActorC;
struct StartChain {
b_ref: ActorRef<ChainActorB>,
c_ref: ActorRef<ChainActorC>,
}
struct ForwardToC {
a_ref: ActorRef<ChainActorA>,
c_ref: ActorRef<ChainActorC>,
}
struct AskBackToA(ActorRef<ChainActorA>);
#[derive(Debug)]
struct ChainPong;
impl Actor for ChainActorA {
type Args = ();
type Error = anyhow::Error;
async fn on_start(_: (), _: &ActorRef<Self>) -> Result<Self, Self::Error> {
Ok(ChainActorA)
}
}
impl Actor for ChainActorB {
type Args = ();
type Error = anyhow::Error;
async fn on_start(_: (), _: &ActorRef<Self>) -> Result<Self, Self::Error> {
Ok(ChainActorB)
}
}
impl Actor for ChainActorC {
type Args = ();
type Error = anyhow::Error;
async fn on_start(_: (), _: &ActorRef<Self>) -> Result<Self, Self::Error> {
Ok(ChainActorC)
}
}
impl Message<StartChain> for ChainActorA {
type Reply = String;
async fn handle(&mut self, msg: StartChain, actor_ref: &ActorRef<Self>) -> String {
msg.b_ref
.ask(ForwardToC {
a_ref: actor_ref.clone(),
c_ref: msg.c_ref,
})
.await
.unwrap_or_default()
}
}
impl Message<ChainPong> for ChainActorA {
type Reply = String;
async fn handle(&mut self, _: ChainPong, _: &ActorRef<Self>) -> String {
"chain pong".to_string()
}
}
impl Message<ForwardToC> for ChainActorB {
type Reply = String;
async fn handle(&mut self, msg: ForwardToC, _: &ActorRef<Self>) -> String {
msg.c_ref
.ask(AskBackToA(msg.a_ref))
.await
.unwrap_or_default()
}
}
impl Message<AskBackToA> for ChainActorC {
type Reply = String;
async fn handle(&mut self, msg: AskBackToA, _: &ActorRef<Self>) -> String {
msg.0.ask(ChainPong).await.unwrap_or_default()
}
}
#[tokio::test]
async fn test_three_actor_chain_deadlock_detected() {
let (a_ref, a_handle) = spawn::<ChainActorA>(());
let (b_ref, b_handle) = spawn::<ChainActorB>(());
let (c_ref, c_handle) = spawn::<ChainActorC>(());
let _ = a_ref
.ask(StartChain {
b_ref: b_ref.clone(),
c_ref: c_ref.clone(),
})
.await;
let c_err = c_handle.await.unwrap_err();
assert!(c_err.is_panic());
let payload = c_err.into_panic();
let msg = payload
.downcast_ref::<String>()
.expect("panic payload should be String");
assert!(
msg.contains("Deadlock detected"),
"Expected deadlock panic, got: {msg}"
);
drop(a_ref);
drop(b_ref);
drop(c_ref);
let _ = a_handle.await;
let _ = b_handle.await;
}
#[tokio::test]
async fn test_non_cycle_ask_succeeds() {
let (a_ref, a_handle) = spawn::<WorkerActor>(());
let (b_ref, b_handle) = spawn::<WorkerActor>(());
let result = a_ref.ask(ForwardPing(b_ref.clone())).await;
assert_eq!(result.unwrap(), "pong");
drop(a_ref);
drop(b_ref);
let _ = a_handle.await;
let _ = b_handle.await;
}
#[tokio::test]
async fn test_sequential_ask_no_false_positive() {
let (a_ref, a_handle) = spawn::<WorkerActor>(());
let (b_ref, b_handle) = spawn::<WorkerActor>(());
let (c_ref, c_handle) = spawn::<WorkerActor>(());
let result = a_ref
.ask(SequentialPing(b_ref.clone(), c_ref.clone()))
.await;
let (r1, r2) = result.unwrap();
assert_eq!(r1, "pong");
assert_eq!(r2, "pong");
drop(a_ref);
drop(b_ref);
drop(c_ref);
let _ = a_handle.await;
let _ = b_handle.await;
let _ = c_handle.await;
}