#[cfg(test)]
mod test {
use std::any::Any;
use std::sync::Arc;
use async_trait::async_trait;
use tokio::sync::Mutex;
use crate::actor::actor::actor_error::ActorError;
use crate::actor::actor::actor_inner_error::ActorInnerError;
use crate::actor::actor::taks::Task;
use crate::actor::dispatch::default_mailbox::DefaultMailbox;
use crate::actor::dispatch::dispatcher::{CurrentThreadDispatcher, DispatcherHandle};
use crate::actor::dispatch::mailbox::Mailbox;
use crate::actor::dispatch::message_invoker::{MessageInvoker, MessageInvokerHandle};
use crate::actor::message::message::Message;
use crate::actor::message::message_handle::MessageHandle;
use crate::util::queue::mpsc_unbounded_channel_queue::MpscUnboundedChannelQueue;
#[derive(Debug, Clone, PartialEq)]
enum ReceivedMessage {
System,
User,
Task,
Failure(String),
}
#[derive(Debug)]
struct TestMessageInvoker {
received: Arc<Mutex<Vec<ReceivedMessage>>>,
}
impl TestMessageInvoker {
fn new() -> Self {
TestMessageInvoker {
received: Arc::new(Mutex::new(Vec::new())),
}
}
async fn get_received(&self) -> Vec<ReceivedMessage> {
self.received.lock().await.clone()
}
}
#[async_trait]
impl MessageInvoker for TestMessageInvoker {
async fn invoke_system_message(&mut self, _: MessageHandle) -> Result<(), ActorError> {
self.received.lock().await.push(ReceivedMessage::System);
Ok(())
}
async fn invoke_user_message(&mut self, message_handle: MessageHandle) -> Result<(), ActorError> {
if message_handle.is_typed::<TestTask>() {
self.received.lock().await.push(ReceivedMessage::Task);
} else {
self.received.lock().await.push(ReceivedMessage::User);
}
Ok(())
}
async fn escalate_failure(&mut self, reason: ActorInnerError, _: MessageHandle) {
let reason_msg = if reason.is_type::<&str>() {
reason.clone().take::<&str>().unwrap().to_string()
} else if reason.is_type::<String>() {
reason.clone().take::<String>().unwrap()
} else {
"Unknown error".to_string()
};
self.received.lock().await.push(ReceivedMessage::Failure(reason_msg));
}
}
#[derive(Debug, Clone)]
struct TestSystemMessage;
impl Message for TestSystemMessage {
fn eq_message(&self, other: &dyn Message) -> bool {
other.as_any().is::<TestSystemMessage>()
}
fn as_any(&self) -> &(dyn Any + Send + Sync + 'static) {
self
}
}
#[derive(Debug, Clone)]
struct TestUserMessage;
impl Message for TestUserMessage {
fn eq_message(&self, other: &dyn Message) -> bool {
other.as_any().is::<TestUserMessage>()
}
fn as_any(&self) -> &(dyn Any + Send + Sync + 'static) {
self
}
}
#[derive(Debug, Clone)]
struct TestTask;
impl Message for TestTask {
fn eq_message(&self, other: &dyn Message) -> bool {
other.as_any().is::<TestTask>()
}
fn as_any(&self) -> &(dyn Any + Send + Sync + 'static) {
self
}
}
#[async_trait]
impl Task for TestTask {
async fn run(&self) {}
}
#[tokio::test]
async fn test_mailbox_with_test_invoker() {
let mut mailbox = DefaultMailbox::new(MpscUnboundedChannelQueue::new(), MpscUnboundedChannelQueue::new());
let invoker = Arc::new(Mutex::new(TestMessageInvoker::new()));
let invoker_handle = MessageInvokerHandle::new(invoker.clone());
let dispatcher = Arc::new(CurrentThreadDispatcher::new().unwrap());
let dispatcher_handle = DispatcherHandle::new_arc(dispatcher.clone());
mailbox
.register_handlers(Some(invoker_handle.clone()), Some(dispatcher_handle.clone()))
.await;
let system_message = MessageHandle::new(TestSystemMessage);
let user_message = MessageHandle::new(TestUserMessage);
let task = MessageHandle::new(TestTask);
mailbox.post_system_message(system_message).await;
mailbox.post_user_message(user_message).await;
mailbox.post_user_message(task).await;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let invoker_mg = invoker.lock().await;
let received = invoker_mg.get_received().await;
assert_eq!(received.len(), 3);
assert_eq!(received[0], ReceivedMessage::System);
assert_eq!(received[1], ReceivedMessage::User);
assert_eq!(received[2], ReceivedMessage::Task);
}
}