#![cfg(test)]
mod tests {
use std::any::Any;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use async_trait::async_trait;
use tokio::sync::Notify;
use tokio::time::{sleep, Duration};
use crate::actor::actor::pid::ExtendedPid;
use crate::actor::actor_system::ActorSystem;
use crate::actor::future::{FutureError, FutureProcess};
use crate::actor::message::message::Message;
use crate::actor::message::message_handle::MessageHandle;
use crate::actor::process::{Process, ProcessHandle};
use crate::actor::util::async_barrier::AsyncBarrier;
#[derive(Debug, Clone)]
struct MockProcess {
name: String,
received: Arc<AtomicBool>,
notify: Arc<Notify>,
pid: Option<ExtendedPid>,
}
impl MockProcess {
async fn new(actor_system: ActorSystem, name: &str) -> Self {
let mut process = MockProcess {
name: name.to_string(),
received: Arc::new(AtomicBool::new(false)),
notify: Arc::new(Notify::new()),
pid: None,
};
let id = actor_system.get_process_registry().await.next_id();
let (pid, ok) = actor_system
.get_process_registry()
.await
.add_process(ProcessHandle::new(process.clone()), &format!("mock_{}", id));
if !ok {
panic!("failed to register mock process");
}
process.pid = Some(pid);
process
}
pub fn get_pid(&self) -> ExtendedPid {
self.pid.clone().unwrap()
}
}
#[async_trait]
impl Process for MockProcess {
async fn send_user_message(&self, _: Option<&ExtendedPid>, _: MessageHandle) {
tracing::debug!("MockProcess {} received message", self.name); self.received.store(true, Ordering::SeqCst);
self.notify.notify_one();
}
async fn send_system_message(&self, _: &ExtendedPid, _: MessageHandle) {}
async fn stop(&self, _: &ExtendedPid) {}
fn set_dead(&self) {}
fn as_any(&self) -> &dyn Any {
self
}
}
#[tokio::test]
async fn test_future_pipe_to_message() {
let system = ActorSystem::new().await;
let a1 = Arc::new(MockProcess::new(system.clone(), "a1").await);
let a2 = Arc::new(MockProcess::new(system.clone(), "a2").await);
let a3 = Arc::new(MockProcess::new(system.clone(), "a3").await);
let barrier = AsyncBarrier::new(4);
let future_process = FutureProcess::new(system, Duration::from_secs(1)).await;
future_process.pipe_to(a1.get_pid()).await;
future_process.pipe_to(a2.get_pid()).await;
future_process.pipe_to(a3.get_pid()).await;
for process in [a1.clone(), a2.clone(), a3.clone()] {
let barrier = barrier.clone();
let received = process.notify.clone();
tokio::spawn(async move {
received.notified().await;
barrier.wait().await;
});
}
future_process
.send_user_message(None, MessageHandle::new("hello".to_string()))
.await;
let timeout_result = tokio::time::timeout(Duration::from_secs(5), barrier.wait()).await;
assert!(
timeout_result.is_ok(),
"Test timed out waiting for all processes to receive the message"
);
for process in [a1.clone(), a2.clone(), a3.clone()] {
assert!(
process.received.load(Ordering::SeqCst),
"{} did not receive message",
process.name
);
}
let is_empty = future_process.is_empty().await;
assert!(!is_empty, "future should not be empty after completion");
let result = future_process.result().await;
assert!(result.is_ok(), "Expected Ok result, got {:?}", result);
let message = result.unwrap();
assert_eq!(message.as_any().downcast_ref::<String>().unwrap(), "hello");
}
#[tokio::test]
async fn test_future_pipe_to_timeout_sends_error() {
let system = ActorSystem::new().await;
let a1 = Arc::new(MockProcess::new(system.clone(), "a1").await);
let a2 = Arc::new(MockProcess::new(system.clone(), "a2").await);
let a3 = Arc::new(MockProcess::new(system.clone(), "a3").await);
let barrier = AsyncBarrier::new(4);
let future_process = FutureProcess::new(system, Duration::from_millis(100)).await;
future_process.pipe_to(a1.get_pid()).await;
future_process.pipe_to(a2.get_pid()).await;
future_process.pipe_to(a3.get_pid()).await;
for process in [a1.clone(), a2.clone(), a3.clone()] {
let barrier = barrier.clone();
let received = process.notify.clone();
tokio::spawn(async move {
received.notified().await;
barrier.wait().await;
});
}
let err = future_process.result().await;
assert!(err.is_err());
assert!(matches!(err.unwrap_err(), FutureError::Timeout));
barrier.wait().await;
for process in [a1.clone(), a2.clone(), a3.clone()] {
assert!(
process.received.load(Ordering::SeqCst),
"{} did not receive message",
process.name
);
}
assert!(
!future_process.is_empty().await,
"future should not be empty after timeout"
);
}
#[tokio::test]
async fn test_new_future_timeout_no_race() {
let system = ActorSystem::new().await;
let future_process = FutureProcess::new(system, Duration::from_millis(200)).await;
let barrier = AsyncBarrier::new(2);
tokio::spawn({
let future = future_process.clone();
let barrier = barrier.clone();
async move {
sleep(Duration::from_millis(100)).await;
future.complete(MessageHandle::new("response".to_string())).await;
barrier.wait().await;
}
});
barrier.wait().await;
let result = future_process.result().await;
assert!(result.is_ok(), "Expected Ok, got {:?}", result);
}
async fn assert_future_success(future_process: &FutureProcess) -> MessageHandle {
match future_process.result().await {
Ok(res) => res,
Err(e) => panic!("Future failed: {:?}", e),
}
}
#[tokio::test]
async fn test_future_result_dead_letter_response() {
let system = ActorSystem::new().await;
let future_process = FutureProcess::new(system, Duration::from_secs(1)).await;
future_process.fail(FutureError::DeadLetter).await;
let result = future_process.result().await;
assert!(matches!(result.unwrap_err(), FutureError::DeadLetter));
}
#[tokio::test]
async fn test_future_result_timeout() {
let system = ActorSystem::new().await;
let future_process = FutureProcess::new(system, Duration::from_millis(50)).await;
sleep(Duration::from_millis(100)).await;
let result = future_process.result().await;
assert!(matches!(result.unwrap_err(), FutureError::Timeout));
}
#[tokio::test]
async fn test_future_result_success() {
let system = ActorSystem::new().await;
let future_process = FutureProcess::new(system, Duration::from_secs(1)).await;
future_process
.complete(MessageHandle::new("response".to_string()))
.await;
let result = assert_future_success(&future_process).await;
assert_eq!(result.as_any().downcast_ref::<String>().unwrap(), "response");
}
}