use anyhow::Result;
use rsactor::{spawn, Actor, ActorRef, ActorResult, ActorWeak, Message};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::time::{sleep, timeout};
use tracing::info;
#[derive(Debug)]
struct SafeMessage(u32);
#[derive(Debug)]
struct PanicMessage;
#[derive(Debug)]
struct ErrorMessage;
#[derive(Debug)]
struct CounterResetMessage;
#[derive(Debug)]
struct PanicTestActor {
counter: u32,
panic_threshold: Option<u32>,
}
impl Actor for PanicTestActor {
type Args = Option<u32>; type Error = anyhow::Error;
async fn on_start(args: Self::Args, actor_ref: &ActorRef<Self>) -> Result<Self, Self::Error> {
info!("PanicTestActor started with ID: {}", actor_ref.identity());
Ok(PanicTestActor {
counter: 0,
panic_threshold: args,
})
}
async fn on_run(&mut self, _actor_ref: &ActorWeak<Self>) -> Result<bool, Self::Error> {
if let Some(threshold) = self.panic_threshold {
if self.counter >= threshold {
panic!("Counter reached threshold {threshold} - intentional panic in on_run!");
}
}
sleep(Duration::from_millis(10)).await;
Ok(true)
}
async fn on_stop(
&mut self,
_actor_ref: &ActorWeak<Self>,
_killed: bool,
) -> Result<(), Self::Error> {
info!("PanicTestActor stopping. Final counter: {}", self.counter);
Ok(())
}
}
impl Message<SafeMessage> for PanicTestActor {
type Reply = u32;
async fn handle(&mut self, msg: SafeMessage, _actor_ref: &ActorRef<Self>) -> Self::Reply {
self.counter += msg.0;
self.counter
}
}
impl Message<PanicMessage> for PanicTestActor {
type Reply = ();
async fn handle(&mut self, _msg: PanicMessage, _actor_ref: &ActorRef<Self>) -> Self::Reply {
panic!("Intentional panic in message handler!");
}
}
impl Message<ErrorMessage> for PanicTestActor {
type Reply = Result<(), anyhow::Error>;
async fn handle(&mut self, _msg: ErrorMessage, _actor_ref: &ActorRef<Self>) -> Self::Reply {
Err(anyhow::anyhow!("Intentional error in message handler"))
}
}
impl Message<CounterResetMessage> for PanicTestActor {
type Reply = u32;
async fn handle(
&mut self,
_msg: CounterResetMessage,
_actor_ref: &ActorRef<Self>,
) -> Self::Reply {
let old_counter = self.counter;
self.counter = 0;
old_counter
}
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::test;
#[test]
async fn test_normal_actor_operation() -> Result<()> {
let (actor_ref, join_handle) = spawn::<PanicTestActor>(None);
let result = actor_ref.ask(SafeMessage(5)).await?;
assert_eq!(result, 5);
let result = actor_ref.ask(SafeMessage(3)).await?;
assert_eq!(result, 8);
actor_ref.stop().await?;
let actor_result = join_handle.await.unwrap();
match actor_result {
ActorResult::Completed { actor, killed } => {
assert!(!killed);
assert_eq!(actor.counter, 8);
}
_ => panic!("Expected completed actor result"),
}
Ok(())
}
#[test]
async fn test_panic_in_message_handler() -> Result<()> {
let (actor_ref, join_handle) = spawn::<PanicTestActor>(None);
let result = actor_ref.ask(PanicMessage).await;
assert!(result.is_err());
let result = actor_ref.ask(SafeMessage(1)).await;
assert!(result.is_err());
let join_result = join_handle.await;
assert!(join_result.is_err());
assert!(join_result.unwrap_err().is_panic());
Ok(())
}
#[test]
async fn test_panic_in_on_run() -> Result<()> {
let (actor_ref, join_handle) = spawn::<PanicTestActor>(Some(3));
let result = actor_ref.ask(SafeMessage(1)).await?;
assert_eq!(result, 1);
let result = actor_ref.ask(SafeMessage(1)).await?;
assert_eq!(result, 2);
let result = actor_ref.ask(SafeMessage(1)).await?;
assert_eq!(result, 3);
sleep(Duration::from_millis(50)).await;
let result = actor_ref.ask(SafeMessage(1)).await;
assert!(result.is_err());
let join_result = join_handle.await;
assert!(join_result.is_err());
assert!(join_result.unwrap_err().is_panic());
Ok(())
}
#[test]
async fn test_error_handling_vs_panic() -> Result<()> {
let (actor_ref, join_handle) = spawn::<PanicTestActor>(None);
let result = actor_ref.ask(ErrorMessage).await?;
assert!(result.is_err());
let result = actor_ref.ask(SafeMessage(5)).await?;
assert_eq!(result, 5);
actor_ref.stop().await?;
let actor_result = join_handle.await.unwrap();
match actor_result {
ActorResult::Completed { .. } => {}
_ => panic!("Expected completed actor result"),
}
Ok(())
}
#[test]
async fn test_actor_state_persistence_across_messages() -> Result<()> {
let (actor_ref, join_handle) = spawn::<PanicTestActor>(None);
assert_eq!(actor_ref.ask(SafeMessage(2)).await?, 2);
assert_eq!(actor_ref.ask(SafeMessage(3)).await?, 5);
assert_eq!(actor_ref.ask(SafeMessage(1)).await?, 6);
let old_counter = actor_ref.ask(CounterResetMessage).await?;
assert_eq!(old_counter, 6);
assert_eq!(actor_ref.ask(SafeMessage(1)).await?, 1);
actor_ref.stop().await?;
join_handle.await.unwrap();
Ok(())
}
#[test]
async fn test_multiple_actors_isolation() -> Result<()> {
let (actor_ref1, join_handle1) = spawn::<PanicTestActor>(None);
let (actor_ref2, join_handle2) = spawn::<PanicTestActor>(None);
let (actor_ref3, join_handle3) = spawn::<PanicTestActor>(None);
assert_eq!(actor_ref1.ask(SafeMessage(10)).await?, 10);
assert_eq!(actor_ref2.ask(SafeMessage(20)).await?, 20);
assert_eq!(actor_ref3.ask(SafeMessage(30)).await?, 30);
let result = actor_ref2.ask(PanicMessage).await;
assert!(result.is_err());
assert_eq!(actor_ref1.ask(SafeMessage(1)).await?, 11);
assert_eq!(actor_ref3.ask(SafeMessage(1)).await?, 31);
let result = actor_ref2.ask(SafeMessage(1)).await;
assert!(result.is_err());
actor_ref1.stop().await?;
actor_ref3.stop().await?;
let result1 = join_handle1.await.unwrap();
let result3 = join_handle3.await.unwrap();
let result2 = join_handle2.await;
assert!(matches!(result1, ActorResult::Completed { .. }));
assert!(matches!(result3, ActorResult::Completed { .. }));
assert!(result2.is_err() && result2.unwrap_err().is_panic());
Ok(())
}
#[test]
async fn test_actor_timeout_during_panic() -> Result<()> {
let (actor_ref, join_handle) = spawn::<PanicTestActor>(None);
let panic_future = actor_ref.ask(PanicMessage);
let timeout_result = timeout(Duration::from_millis(100), panic_future).await;
match timeout_result {
Ok(Err(_)) => {} Err(_) => panic!("Ask operation should fail quickly, not timeout"),
Ok(Ok(_)) => panic!("Panic message should not succeed"),
}
let join_result = join_handle.await;
assert!(join_result.is_err());
assert!(join_result.unwrap_err().is_panic());
Ok(())
}
}
#[cfg(test)]
mod supervision_tests {
use super::*;
use tokio::test;
struct SimpleSupervisor {
max_restarts: u32,
restart_count: Arc<AtomicU32>,
}
impl SimpleSupervisor {
fn new(max_restarts: u32) -> Self {
Self {
max_restarts,
restart_count: Arc::new(AtomicU32::new(0)),
}
}
async fn supervise_actor(&mut self) -> Result<u32> {
let mut total_restarts = 0;
loop {
let (actor_ref, join_handle) = spawn::<PanicTestActor>(Some(2));
#[allow(clippy::redundant_pattern_matching)]
while let Ok(_) = actor_ref.ask(SafeMessage(1)).await {
sleep(Duration::from_millis(20)).await;
}
let join_result = join_handle.await;
if join_result.is_err() && join_result.unwrap_err().is_panic() {
total_restarts = self.restart_count.fetch_add(1, Ordering::SeqCst) + 1;
if total_restarts >= self.max_restarts {
break;
}
sleep(Duration::from_millis(100)).await; continue;
} else {
break;
}
}
Ok(total_restarts)
}
}
#[test]
async fn test_supervisor_restart_limit() -> Result<()> {
let mut supervisor = SimpleSupervisor::new(3);
let restart_count = supervisor.supervise_actor().await?;
assert_eq!(restart_count, 3);
Ok(())
}
#[test]
async fn test_supervision_with_successful_completion() -> Result<()> {
let (actor_ref, join_handle) = spawn::<PanicTestActor>(None);
assert_eq!(actor_ref.ask(SafeMessage(1)).await?, 1);
assert_eq!(actor_ref.ask(SafeMessage(2)).await?, 3);
actor_ref.stop().await?;
let result = join_handle.await.unwrap();
assert!(matches!(result, ActorResult::Completed { .. }));
Ok(())
}
}
#[cfg(test)]
mod advanced_tests {
use super::*;
use tokio::test;
#[derive(Debug)]
struct StartupPanicActor {
_data: String,
}
impl Actor for StartupPanicActor {
type Args = bool; type Error = anyhow::Error;
async fn on_start(
should_panic: Self::Args,
_actor_ref: &ActorRef<Self>,
) -> Result<Self, Self::Error> {
if should_panic {
panic!("Intentional panic during on_start!");
}
Ok(StartupPanicActor {
_data: "initialized".to_string(),
})
}
}
#[test]
async fn test_panic_during_on_start() {
let result = std::panic::catch_unwind(|| {
tokio::runtime::Runtime::new().unwrap().block_on(async {
let (_actor_ref, _join_handle) = spawn::<StartupPanicActor>(true);
})
});
assert!(result.is_err());
}
#[test]
async fn test_successful_on_start() -> Result<()> {
let (actor_ref, join_handle) = spawn::<StartupPanicActor>(false);
actor_ref.stop().await?;
let result = join_handle.await.unwrap();
assert!(matches!(result, ActorResult::Completed { .. }));
Ok(())
}
#[derive(Debug)]
struct StopPanicActor {
panic_on_stop: bool,
}
impl Actor for StopPanicActor {
type Args = bool; type Error = anyhow::Error;
async fn on_start(
panic_on_stop: Self::Args,
_actor_ref: &ActorRef<Self>,
) -> Result<Self, Self::Error> {
Ok(StopPanicActor { panic_on_stop })
}
async fn on_stop(
&mut self,
_actor_ref: &ActorWeak<Self>,
_killed: bool,
) -> Result<(), Self::Error> {
if self.panic_on_stop {
panic!("Intentional panic during on_stop!");
}
Ok(())
}
}
#[test]
async fn test_panic_during_on_stop() -> Result<()> {
let (actor_ref, join_handle) = spawn::<StopPanicActor>(true);
actor_ref.stop().await?;
let join_result = join_handle.await;
assert!(join_result.is_err());
assert!(join_result.unwrap_err().is_panic());
Ok(())
}
#[test]
async fn test_normal_on_stop() -> Result<()> {
let (actor_ref, join_handle) = spawn::<StopPanicActor>(false);
actor_ref.stop().await?;
let result = join_handle.await.unwrap();
assert!(matches!(result, ActorResult::Completed { .. }));
Ok(())
}
#[derive(Debug)]
struct KillMessage;
#[derive(Debug)]
struct KillTestActor {
should_panic_on_stop: bool,
}
impl Actor for KillTestActor {
type Args = bool;
type Error = anyhow::Error;
async fn on_start(
should_panic_on_stop: Self::Args,
_actor_ref: &ActorRef<Self>,
) -> Result<Self, Self::Error> {
Ok(KillTestActor {
should_panic_on_stop,
})
}
async fn on_stop(
&mut self,
_actor_ref: &ActorWeak<Self>,
killed: bool,
) -> Result<(), Self::Error> {
if !killed && self.should_panic_on_stop {
panic!("Panic during graceful stop!");
}
Ok(())
}
}
impl Message<KillMessage> for KillTestActor {
type Reply = ();
async fn handle(&mut self, _msg: KillMessage, actor_ref: &ActorRef<Self>) -> Self::Reply {
let _ = actor_ref.kill();
}
}
#[test]
async fn test_kill_vs_graceful_stop_with_panic() -> Result<()> {
let (actor_ref1, join_handle1) = spawn::<KillTestActor>(true);
actor_ref1.stop().await?;
let result1 = join_handle1.await;
assert!(result1.is_err() && result1.unwrap_err().is_panic());
let (actor_ref2, join_handle2) = spawn::<KillTestActor>(true);
let _ = actor_ref2.kill();
let result2 = join_handle2.await.unwrap();
assert!(matches!(
result2,
ActorResult::Completed { killed: true, .. }
));
Ok(())
}
#[test]
async fn test_self_kill_during_message_handling() -> Result<()> {
let (actor_ref, join_handle) = spawn::<KillTestActor>(false);
actor_ref.tell(KillMessage).await?;
sleep(Duration::from_millis(50)).await;
let result = join_handle.await.unwrap();
assert!(matches!(
result,
ActorResult::Completed { killed: true, .. }
));
Ok(())
}
}
#[cfg(test)]
mod stress_tests {
use super::*;
use tokio::test;
#[test]
async fn test_concurrent_panic_actors() -> Result<()> {
let mut handles = Vec::new();
for _ in 0..10 {
let (actor_ref, join_handle) = spawn::<PanicTestActor>(None);
let task_handle = tokio::spawn(async move {
let _ = actor_ref.ask(PanicMessage).await;
});
handles.push((task_handle, join_handle));
}
for (task_handle, join_handle) in handles {
let _ = task_handle.await;
let join_result = join_handle.await;
assert!(join_result.is_err());
assert!(join_result.unwrap_err().is_panic());
}
Ok(())
}
#[test]
async fn test_rapid_message_sending_before_panic() -> Result<()> {
let (actor_ref, join_handle) = spawn::<PanicTestActor>(Some(100));
let mut tasks = Vec::new();
for _ in 0..50 {
let actor_ref_clone = actor_ref.clone();
let task = tokio::spawn(async move { actor_ref_clone.ask(SafeMessage(1)).await });
tasks.push(task);
}
let panic_task = tokio::spawn(async move {
sleep(Duration::from_millis(10)).await; let _ = actor_ref.ask(PanicMessage).await;
});
let mut success_count = 0;
let mut error_count = 0;
for task in tasks {
match task.await.unwrap() {
Ok(_) => success_count += 1,
Err(_) => error_count += 1,
}
}
let _ = panic_task.await;
assert!(success_count > 0);
println!("Processed {success_count} messages before panic, {error_count} failed");
let join_result = join_handle.await;
assert!(join_result.is_err());
assert!(join_result.unwrap_err().is_panic());
Ok(())
}
#[test]
async fn test_memory_cleanup_after_panic() -> Result<()> {
let mut actor_refs = Vec::new();
let mut join_handles = Vec::new();
for _ in 0..5 {
let (actor_ref, join_handle) = spawn::<PanicTestActor>(None);
actor_refs.push(actor_ref);
join_handles.push(join_handle);
}
for actor_ref in &actor_refs {
let _ = actor_ref.ask(PanicMessage).await;
}
for join_handle in join_handles {
let result = join_handle.await;
assert!(result.is_err());
assert!(result.unwrap_err().is_panic());
}
for actor_ref in actor_refs {
let result = actor_ref.ask(SafeMessage(1)).await;
assert!(result.is_err());
}
Ok(())
}
}
#[cfg(test)]
mod integration_tests {
use super::*;
use tokio::test;
#[test]
async fn test_realistic_service_with_panic_recovery() -> Result<()> {
#[derive(Debug)]
struct WorkItem(u32);
#[derive(Debug)]
struct GetStats;
#[derive(Debug)]
struct ServiceActor {
processed_count: u32,
error_count: u32,
}
impl Actor for ServiceActor {
type Args = ();
type Error = anyhow::Error;
async fn on_start(
_args: Self::Args,
_actor_ref: &ActorRef<Self>,
) -> Result<Self, Self::Error> {
Ok(ServiceActor {
processed_count: 0,
error_count: 0,
})
}
}
impl Message<WorkItem> for ServiceActor {
type Reply = Result<String, String>;
async fn handle(&mut self, msg: WorkItem, _actor_ref: &ActorRef<Self>) -> Self::Reply {
if msg.0 == 13 {
panic!("Unlucky number 13!");
}
if msg.0 % 10 == 9 {
self.error_count += 1;
return Err(format!("Simulated error for item {}", msg.0));
}
self.processed_count += 1;
Ok(format!("Processed item {}", msg.0))
}
}
impl Message<GetStats> for ServiceActor {
type Reply = (u32, u32);
async fn handle(&mut self, _msg: GetStats, _actor_ref: &ActorRef<Self>) -> Self::Reply {
(self.processed_count, self.error_count)
}
}
let (actor_ref, join_handle) = spawn::<ServiceActor>(());
for i in 1..=15 {
if i == 13 {
let result = actor_ref.ask(WorkItem(i)).await;
assert!(result.is_err());
break;
} else {
let result = actor_ref.ask(WorkItem(i)).await?;
match result {
Ok(msg) => println!("Success: {msg}"),
Err(err) => println!("Expected error: {err}"),
}
}
}
let join_result = join_handle.await;
assert!(join_result.is_err());
assert!(join_result.unwrap_err().is_panic());
Ok(())
}
#[test]
async fn test_actor_isolation_during_panic() -> Result<()> {
#[derive(Debug)]
struct SimpleMsg;
#[derive(Debug)]
struct SimpleActor {
id: u32,
}
impl Actor for SimpleActor {
type Args = u32;
type Error = anyhow::Error;
async fn on_start(
id: Self::Args,
_actor_ref: &ActorRef<Self>,
) -> Result<Self, Self::Error> {
Ok(SimpleActor { id })
}
}
impl Message<SimpleMsg> for SimpleActor {
type Reply = u32;
async fn handle(
&mut self,
_msg: SimpleMsg,
_actor_ref: &ActorRef<Self>,
) -> Self::Reply {
if self.id == 2 {
panic!("Actor {} panicked!", self.id);
}
self.id
}
}
let (actor1_ref, actor1_handle) = spawn::<SimpleActor>(1);
let (actor2_ref, actor2_handle) = spawn::<SimpleActor>(2);
let (actor3_ref, actor3_handle) = spawn::<SimpleActor>(3);
assert_eq!(actor1_ref.ask(SimpleMsg).await?, 1);
assert_eq!(actor3_ref.ask(SimpleMsg).await?, 3);
let result = actor2_ref.ask(SimpleMsg).await;
assert!(result.is_err());
assert_eq!(actor1_ref.ask(SimpleMsg).await?, 1);
assert_eq!(actor3_ref.ask(SimpleMsg).await?, 3);
actor1_ref.stop().await?;
actor3_ref.stop().await?;
let result1 = actor1_handle.await.unwrap();
let result2 = actor2_handle.await;
let result3 = actor3_handle.await.unwrap();
assert!(matches!(result1, ActorResult::Completed { .. }));
assert!(result2.is_err() && result2.unwrap_err().is_panic());
assert!(matches!(result3, ActorResult::Completed { .. }));
Ok(())
}
}