use std::sync::atomic::AtomicU32;
use std::sync::atomic::AtomicU8;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use crate::actor::derived_actor::DerivedActorRef;
use crate::common_test::periodic_check;
use crate::concurrency::sleep;
use crate::concurrency::Duration;
use crate::Actor;
use crate::ActorCell;
use crate::ActorProcessingErr;
use crate::ActorRef;
use crate::ActorStatus;
use crate::MessagingErr;
use crate::RactorErr;
use crate::SpawnErr;
use crate::SupervisionEvent;
struct EmptyMessage;
#[cfg(feature = "cluster")]
impl crate::Message for EmptyMessage {}
#[crate::concurrency::test]
#[cfg_attr(
not(all(target_arch = "wasm32", target_os = "unknown")),
tracing_test::traced_test
)]
#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
async fn test_panic_on_start_captured() {
#[derive(Default)]
struct TestActor;
#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl Actor for TestActor {
type Msg = EmptyMessage;
type Arguments = ();
type State = ();
async fn pre_start(
&self,
_this_actor: crate::ActorRef<Self::Msg>,
_: (),
) -> Result<Self::State, ActorProcessingErr> {
panic!("Boom!");
}
}
let actor_output = crate::spawn::<TestActor>(()).await;
assert!(matches!(actor_output, Err(SpawnErr::StartupFailed(_))));
}
#[crate::concurrency::test]
#[cfg_attr(
not(all(target_arch = "wasm32", target_os = "unknown")),
tracing_test::traced_test
)]
async fn test_error_on_start_captured() {
struct TestActor;
#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl Actor for TestActor {
type Msg = EmptyMessage;
type Arguments = ();
type State = ();
async fn pre_start(
&self,
_this_actor: crate::ActorRef<Self::Msg>,
_: (),
) -> Result<Self::State, ActorProcessingErr> {
Err(From::from("boom"))
}
}
let actor_output = Actor::spawn(None, TestActor, ()).await;
assert!(matches!(actor_output, Err(SpawnErr::StartupFailed(_))));
}
#[crate::concurrency::test]
#[cfg_attr(
not(all(target_arch = "wasm32", target_os = "unknown")),
tracing_test::traced_test
)]
async fn test_stop_higher_priority_over_messages() {
let message_counter = Arc::new(AtomicU8::new(0u8));
struct TestActor {
counter: Arc<AtomicU8>,
}
#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl Actor for TestActor {
type Msg = EmptyMessage;
type Arguments = ();
type State = ();
async fn pre_start(
&self,
_this_actor: crate::ActorRef<Self::Msg>,
_: (),
) -> Result<Self::State, ActorProcessingErr> {
Ok(())
}
async fn handle(
&self,
_myself: ActorRef<Self::Msg>,
_message: Self::Msg,
_state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
self.counter.fetch_add(1, Ordering::Relaxed);
crate::concurrency::sleep(Duration::from_millis(100)).await;
Ok(())
}
}
let (actor, handle) = Actor::spawn(
None,
TestActor {
counter: message_counter.clone(),
},
(),
)
.await
.expect("Actor failed to start");
#[cfg(feature = "cluster")]
assert!(!actor.supports_remoting());
for _i in 0..10 {
actor
.send_message(EmptyMessage)
.expect("Failed to send message to actor");
}
crate::concurrency::sleep(Duration::from_millis(10)).await;
actor.stop(None);
crate::concurrency::sleep(Duration::from_millis(10)).await;
assert_eq!(ActorStatus::Running, actor.get_status());
assert!(!handle.is_finished());
crate::concurrency::sleep(Duration::from_millis(150)).await;
tracing::info!("Counter: {}", message_counter.load(Ordering::Relaxed));
assert_eq!(ActorStatus::Stopped, actor.get_status());
assert!(handle.is_finished());
assert_eq!(1, message_counter.load(Ordering::Relaxed));
}
#[crate::concurrency::test]
#[cfg_attr(
not(all(target_arch = "wasm32", target_os = "unknown")),
tracing_test::traced_test
)]
async fn test_kill_terminates_work() {
struct TestActor;
#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl Actor for TestActor {
type Msg = EmptyMessage;
type Arguments = ();
type State = ();
async fn pre_start(
&self,
_this_actor: crate::ActorRef<Self::Msg>,
_: (),
) -> Result<Self::State, ActorProcessingErr> {
Ok(())
}
async fn handle(
&self,
_myself: ActorRef<Self::Msg>,
_message: Self::Msg,
_state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
crate::concurrency::sleep(Duration::from_secs(10)).await;
Ok(())
}
}
let (actor, handle) = Actor::spawn(None, TestActor, ())
.await
.expect("Actor failed to start");
actor
.send_message(EmptyMessage)
.expect("Failed to send message to actor");
crate::concurrency::sleep(Duration::from_millis(10)).await;
actor.kill();
crate::concurrency::sleep(Duration::from_millis(10)).await;
assert_eq!(ActorStatus::Stopped, actor.get_status());
assert!(handle.is_finished());
}
#[crate::concurrency::test]
#[cfg_attr(
not(all(target_arch = "wasm32", target_os = "unknown")),
tracing_test::traced_test
)]
async fn test_stop_does_not_terminate_async_work() {
struct TestActor;
#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl Actor for TestActor {
type Msg = EmptyMessage;
type Arguments = ();
type State = ();
async fn pre_start(
&self,
_this_actor: crate::ActorRef<Self::Msg>,
_: (),
) -> Result<Self::State, ActorProcessingErr> {
Ok(())
}
async fn handle(
&self,
_myself: ActorRef<Self::Msg>,
_message: Self::Msg,
_state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
crate::concurrency::sleep(Duration::from_millis(100)).await;
Ok(())
}
}
let (actor, handle) = Actor::spawn(None, TestActor, ())
.await
.expect("Actor failed to start");
actor
.send_message(EmptyMessage)
.expect("Failed to send message to actor");
crate::concurrency::sleep(Duration::from_millis(10)).await;
actor.stop(None);
crate::concurrency::sleep(Duration::from_millis(10)).await;
assert_eq!(ActorStatus::Running, actor.get_status());
assert!(!handle.is_finished());
periodic_check(
|| ActorStatus::Stopped == actor.get_status(),
Duration::from_millis(500),
)
.await;
assert!(handle.is_finished());
}
#[crate::concurrency::test]
#[cfg_attr(
not(all(target_arch = "wasm32", target_os = "unknown")),
tracing_test::traced_test
)]
async fn test_kill_terminates_supervision_work() {
struct TestActor;
#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl Actor for TestActor {
type Msg = EmptyMessage;
type Arguments = ();
type State = ();
async fn pre_start(
&self,
_this_actor: crate::ActorRef<Self::Msg>,
_: (),
) -> Result<Self::State, ActorProcessingErr> {
Ok(())
}
async fn handle_supervisor_evt(
&self,
_myself: ActorRef<Self::Msg>,
_message: SupervisionEvent,
_state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
crate::concurrency::sleep(Duration::from_millis(100)).await;
Ok(())
}
}
let (actor, handle) = Actor::spawn(None, TestActor, ())
.await
.expect("Actor failed to start");
let actor_cell: ActorCell = actor.clone().into();
actor
.send_supervisor_evt(SupervisionEvent::ActorStarted(actor_cell))
.expect("Failed to send message to actor");
crate::concurrency::sleep(Duration::from_millis(10)).await;
actor.kill();
crate::concurrency::sleep(Duration::from_millis(10)).await;
assert_eq!(ActorStatus::Stopped, actor.get_status());
assert!(handle.is_finished());
}
#[crate::concurrency::test]
#[cfg_attr(
not(all(target_arch = "wasm32", target_os = "unknown")),
tracing_test::traced_test
)]
async fn test_sending_message_to_invalid_actor_type() {
struct TestActor1;
struct TestMessage1;
#[cfg(feature = "cluster")]
impl crate::Message for TestMessage1 {}
#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl Actor for TestActor1 {
type Msg = TestMessage1;
type State = ();
type Arguments = ();
async fn pre_start(
&self,
_myself: ActorRef<Self::Msg>,
_: (),
) -> Result<Self::State, ActorProcessingErr> {
Ok(())
}
}
struct TestActor2;
struct TestMessage2;
#[cfg(feature = "cluster")]
impl crate::Message for TestMessage2 {}
#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl Actor for TestActor2 {
type Msg = TestMessage2;
type State = ();
type Arguments = ();
async fn pre_start(
&self,
_myself: ActorRef<Self::Msg>,
_: (),
) -> Result<Self::State, ActorProcessingErr> {
Ok(())
}
}
let (actor1, handle1) = Actor::spawn(None, TestActor1, ())
.await
.expect("Failed to start test actor 1");
let (actor2, handle2) = Actor::spawn(None, TestActor2, ())
.await
.expect("Failed to start test actor 2");
assert!(actor2
.get_cell()
.send_message::<TestMessage1>(TestMessage1)
.is_err());
actor1.stop(None);
actor2.stop(None);
handle1.await.unwrap();
handle2.await.unwrap();
}
#[crate::concurrency::test]
#[cfg_attr(
not(all(target_arch = "wasm32", target_os = "unknown")),
tracing_test::traced_test
)]
async fn test_sending_message_to_dead_actor() {
#[derive(Default)]
struct TestActor;
#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl Actor for TestActor {
type Msg = EmptyMessage;
type Arguments = ();
type State = ();
async fn pre_start(
&self,
_this_actor: crate::ActorRef<Self::Msg>,
_: (),
) -> Result<Self::State, ActorProcessingErr> {
Ok(())
}
}
let (actor, _) = crate::spawn::<TestActor>(())
.await
.expect("Actor failed to start");
actor
.stop_and_wait(None, None)
.await
.expect("Failed to stop");
assert!(actor.cast(EmptyMessage).is_err());
}
#[cfg(feature = "cluster")]
#[crate::concurrency::test]
#[cfg_attr(
not(all(target_arch = "wasm32", target_os = "unknown")),
tracing_test::traced_test
)]
async fn test_serialized_cast() {
use crate::message::BoxedDowncastErr;
use crate::message::SerializedMessage;
use crate::Message;
let counter = Arc::new(AtomicU8::new(0));
struct TestActor {
counter: Arc<AtomicU8>,
}
struct TestMessage;
impl Message for TestMessage {
fn serializable() -> bool {
true
}
fn deserialize(_bytes: SerializedMessage) -> Result<Self, BoxedDowncastErr> {
Ok(TestMessage)
}
fn serialize(self) -> Result<SerializedMessage, BoxedDowncastErr> {
Ok(crate::message::SerializedMessage::Cast {
variant: "Cast".to_string(),
args: vec![],
metadata: None,
})
}
}
#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl Actor for TestActor {
type Msg = TestMessage;
type State = ();
type Arguments = ();
async fn pre_start(
&self,
_myself: ActorRef<Self::Msg>,
_: (),
) -> Result<Self::State, ActorProcessingErr> {
Ok(())
}
async fn handle(
&self,
_myself: ActorRef<Self::Msg>,
_message: TestMessage,
_state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
self.counter.fetch_add(1, Ordering::Relaxed);
Ok(())
}
}
let (actor, handle) = Actor::spawn(
None,
TestActor {
counter: counter.clone(),
},
(),
)
.await
.expect("Failed to spawn test actor");
assert!(actor.supports_remoting());
let serialized = (TestMessage).serialize().unwrap();
actor
.send_serialized(serialized)
.expect("Serialized message send failed!");
periodic_check(
|| counter.load(Ordering::Relaxed) == 1,
Duration::from_secs(2),
)
.await;
actor.stop(None);
handle.await.unwrap();
}
#[cfg(feature = "cluster")]
fn port_forward<Tin, Tout, F>(
typed_port: crate::RpcReplyPort<Tout>,
converter: F,
) -> crate::RpcReplyPort<Tin>
where
Tin: Send + 'static,
Tout: crate::Message,
F: Fn(Tin) -> Tout + Send + 'static,
{
let (tx, rx) = crate::concurrency::oneshot();
let timeout = typed_port.get_timeout();
crate::concurrency::spawn(async move {
match typed_port.get_timeout() {
Some(timeout) => {
if let Ok(Ok(result)) = crate::concurrency::timeout(timeout, rx).await {
let _ = typed_port.send(converter(result));
}
}
None => {
if let Ok(result) = rx.await {
let _ = typed_port.send(converter(result));
}
}
}
});
if let Some(to) = timeout {
crate::RpcReplyPort::<_>::from((tx, to))
} else {
crate::RpcReplyPort::<_>::from(tx)
}
}
#[cfg(feature = "cluster")]
#[crate::concurrency::test]
#[cfg_attr(
not(all(target_arch = "wasm32", target_os = "unknown")),
tracing_test::traced_test
)]
async fn test_serialized_rpc() {
use crate::message::BoxedDowncastErr;
use crate::message::SerializedMessage;
use crate::Message;
use crate::RpcReplyPort;
let counter = Arc::new(AtomicU8::new(0));
struct TestActor {
counter: Arc<AtomicU8>,
}
enum TestMessage {
Rpc(RpcReplyPort<String>),
}
impl Message for TestMessage {
fn serializable() -> bool {
true
}
fn deserialize(bytes: SerializedMessage) -> Result<Self, BoxedDowncastErr> {
match bytes {
SerializedMessage::Call { reply, .. } => {
let tx = port_forward(reply, |data: String| data.into_bytes());
Ok(Self::Rpc(tx))
}
_ => panic!("whoopsie"),
}
}
fn serialize(self) -> Result<SerializedMessage, BoxedDowncastErr> {
match self {
Self::Rpc(port) => {
let tx = port_forward(port, |data| String::from_utf8(data).unwrap());
Ok(SerializedMessage::Call {
args: vec![],
reply: tx,
variant: "Call".to_string(),
metadata: None,
})
}
}
}
}
#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl Actor for TestActor {
type Msg = TestMessage;
type State = ();
type Arguments = ();
async fn pre_start(
&self,
_myself: ActorRef<Self::Msg>,
_: (),
) -> Result<Self::State, ActorProcessingErr> {
Ok(())
}
async fn handle(
&self,
_myself: ActorRef<Self::Msg>,
message: TestMessage,
_state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
self.counter.fetch_add(1, Ordering::Relaxed);
match message {
TestMessage::Rpc(port) => {
let _ = port.send("hello".to_string());
}
}
Ok(())
}
}
let (actor, handle) = Actor::spawn(
None,
TestActor {
counter: counter.clone(),
},
(),
)
.await
.expect("Failed to spawn test actor");
let (tx, rx) = crate::concurrency::oneshot();
let msg = TestMessage::Rpc((tx, Duration::from_millis(100)).into())
.serialize()
.unwrap();
actor
.send_serialized(msg)
.expect("Serialized message send failed!");
assert!(actor.supports_remoting());
let data = rx
.await
.expect("Failed to get reply from actor (within 100ms)");
assert_eq!(data, "hello".to_string());
periodic_check(
|| counter.load(Ordering::Relaxed) == 1,
Duration::from_secs(2),
)
.await;
actor.stop(None);
handle.await.unwrap();
}
#[cfg(feature = "cluster")]
#[crate::concurrency::test]
#[cfg_attr(
not(all(target_arch = "wasm32", target_os = "unknown")),
tracing_test::traced_test
)]
async fn test_remote_actor() {
use crate::message::BoxedDowncastErr;
use crate::message::SerializedMessage;
use crate::ActorId;
use crate::ActorRuntime;
use crate::Message;
let counter = Arc::new(AtomicU8::new(0));
struct DummySupervisor;
#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl Actor for DummySupervisor {
type Msg = ();
type State = ();
type Arguments = ();
async fn pre_start(
&self,
_myself: ActorRef<Self::Msg>,
_: (),
) -> Result<Self::State, ActorProcessingErr> {
Ok(())
}
}
struct TestRemoteActor {
counter: Arc<AtomicU8>,
}
struct TestRemoteMessage;
impl Message for TestRemoteMessage {
fn serializable() -> bool {
true
}
fn deserialize(_bytes: SerializedMessage) -> Result<Self, BoxedDowncastErr> {
Ok(TestRemoteMessage)
}
fn serialize(self) -> Result<SerializedMessage, BoxedDowncastErr> {
Ok(crate::message::SerializedMessage::Cast {
args: vec![],
variant: "Cast".to_string(),
metadata: None,
})
}
}
#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl Actor for TestRemoteActor {
type Msg = TestRemoteMessage;
type State = ();
type Arguments = ();
async fn pre_start(
&self,
_myself: ActorRef<Self::Msg>,
_: (),
) -> Result<Self::State, ActorProcessingErr> {
Ok(())
}
async fn handle(
&self,
_myself: ActorRef<Self::Msg>,
_message: TestRemoteMessage,
_state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
panic!("Remote actor's don't handle anything");
}
async fn handle_serialized(
&self,
_myself: ActorRef<Self::Msg>,
_message: SerializedMessage,
_state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
self.counter.fetch_add(1, Ordering::Relaxed);
Ok(())
}
}
let (sup, sup_handle) = Actor::spawn(None, DummySupervisor, ()).await.unwrap();
let (actor, handle) = ActorRuntime::spawn_linked_remote(
None,
TestRemoteActor {
counter: counter.clone(),
},
ActorId::Remote { node_id: 1, pid: 1 },
(),
sup.get_cell(),
)
.await
.expect("Failed to spawn RemoteTestActor");
actor
.send_message(TestRemoteMessage)
.expect("Failed to send non-serialized message to RemoteActor");
periodic_check(
|| counter.load(Ordering::Relaxed) == 1,
Duration::from_secs(2),
)
.await;
actor.stop(None);
sup.stop(None);
handle.await.unwrap();
sup_handle.await.unwrap();
}
#[cfg(feature = "cluster")]
#[crate::concurrency::test]
#[cfg_attr(
not(all(target_arch = "wasm32", target_os = "unknown")),
tracing_test::traced_test
)]
async fn spawning_local_actor_as_remote_fails() {
use crate::ActorProcessingErr;
struct RemoteActor;
struct RemoteActorMessage;
impl crate::Message for RemoteActorMessage {}
#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl Actor for RemoteActor {
type Msg = RemoteActorMessage;
type State = ();
type Arguments = ();
async fn pre_start(
&self,
_this_actor: crate::ActorRef<Self::Msg>,
_: (),
) -> Result<Self::State, ActorProcessingErr> {
Ok(())
}
}
struct EmptyActor;
#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl Actor for EmptyActor {
type Msg = ();
type State = ();
type Arguments = ();
async fn pre_start(
&self,
_this_actor: crate::ActorRef<Self::Msg>,
_: (),
) -> Result<Self::State, ActorProcessingErr> {
Ok(())
}
}
let remote_pid = crate::ActorId::Local(1);
let (actor, handle) = Actor::spawn(None, EmptyActor, ())
.await
.expect("Actor failed to start");
let remote_spawn_result = crate::ActorRuntime::spawn_linked_remote(
None,
RemoteActor,
remote_pid,
(),
actor.get_cell(),
)
.await;
assert!(remote_spawn_result.is_err());
actor.stop(None);
handle.await.expect("Failed to clean stop the actor");
}
#[crate::concurrency::test]
#[cfg_attr(
not(all(target_arch = "wasm32", target_os = "unknown")),
tracing_test::traced_test
)]
async fn instant_spawns() {
let counter = Arc::new(AtomicU8::new(0));
struct EmptyActor;
#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl Actor for EmptyActor {
type Msg = String;
type State = Arc<AtomicU8>;
type Arguments = Arc<AtomicU8>;
async fn pre_start(
&self,
_this_actor: crate::ActorRef<Self::Msg>,
counter: Arc<AtomicU8>,
) -> Result<Self::State, ActorProcessingErr> {
crate::concurrency::sleep(Duration::from_millis(200)).await;
Ok(counter)
}
async fn handle(
&self,
_this_actor: crate::ActorRef<Self::Msg>,
_message: String,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
state.fetch_add(1, Ordering::Relaxed);
Ok(())
}
}
let (actor, handles) = crate::ActorRuntime::spawn_instant(None, EmptyActor, counter.clone())
.expect("Failed to instant spawn");
for i in 0..10 {
actor
.cast(format!("I = {i}"))
.expect("Actor couldn't receive message!");
}
assert_eq!(0, counter.load(Ordering::Relaxed));
periodic_check(
|| counter.load(Ordering::Relaxed) == 10,
Duration::from_secs(2),
)
.await;
actor.stop(None);
handles
.await
.unwrap()
.expect("Actor's pre_start routine panicked")
.await
.unwrap();
}
#[crate::concurrency::test]
#[cfg_attr(
not(all(target_arch = "wasm32", target_os = "unknown")),
tracing_test::traced_test
)]
async fn stop_and_wait() {
struct SlowActor;
#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl Actor for SlowActor {
type Msg = ();
type State = ();
type Arguments = ();
async fn pre_start(
&self,
_: ActorRef<Self::Msg>,
_: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
crate::concurrency::sleep(Duration::from_millis(200)).await;
Ok(())
}
}
let (actor, handle) = Actor::spawn(None, SlowActor, ())
.await
.expect("Failed to spawn actor");
actor
.stop_and_wait(None, None)
.await
.expect("Failed to wait for actor death");
periodic_check(|| handle.is_finished(), Duration::from_millis(500)).await;
}
#[crate::concurrency::test]
#[cfg_attr(
not(all(target_arch = "wasm32", target_os = "unknown")),
tracing_test::traced_test
)]
async fn kill_and_wait() {
struct SlowActor;
#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl Actor for SlowActor {
type Msg = ();
type State = ();
type Arguments = ();
async fn pre_start(
&self,
_: ActorRef<Self::Msg>,
_: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
crate::concurrency::sleep(Duration::from_millis(200)).await;
Ok(())
}
}
let (actor, handle) = Actor::spawn(None, SlowActor, ())
.await
.expect("Failed to spawn actor");
actor
.kill_and_wait(None)
.await
.expect("Failed to wait for actor death");
periodic_check(|| handle.is_finished(), Duration::from_millis(500)).await;
}
#[test]
#[cfg_attr(
not(all(target_arch = "wasm32", target_os = "unknown")),
tracing_test::traced_test
)]
fn test_err_map() {
let err: RactorErr<i32> = RactorErr::Messaging(MessagingErr::SendErr(123));
let _: RactorErr<()> = err.map(|_| ());
}
#[test]
fn returns_actor_references() {
fn dummy_actor_cell() -> ActorCell {
ActorCell::new::<TestActor>(None).unwrap().0
}
struct TestActor;
#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl Actor for TestActor {
type Msg = EmptyMessage;
type Arguments = ();
type State = ();
async fn pre_start(
&self,
_this_actor: crate::ActorRef<Self::Msg>,
_: (),
) -> Result<Self::State, ActorProcessingErr> {
Ok(())
}
}
let tests = [
(true, SupervisionEvent::ActorStarted(dummy_actor_cell())),
(
true,
SupervisionEvent::ActorFailed(dummy_actor_cell(), "Bang!".into()),
),
(
true,
SupervisionEvent::ActorTerminated(dummy_actor_cell(), None, Some("Foo!".to_owned())),
),
(
false,
SupervisionEvent::ProcessGroupChanged(crate::pg::GroupChangeMessage::Leave(
"Foo".into(),
"Bar".into(),
vec![dummy_actor_cell()],
)),
),
];
for (want, event) in tests {
assert_eq!(event.actor_cell(), event.actor_cell().clone());
assert_eq!(event.actor_cell().is_some(), want);
assert_eq!(event.actor_id().is_some(), want);
}
}
#[crate::concurrency::test]
#[cfg_attr(
not(all(target_arch = "wasm32", target_os = "unknown")),
tracing_test::traced_test
)]
async fn actor_failing_in_spawn_err_doesnt_poison_registries() {
#[derive(Default)]
struct Test;
#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl Actor for Test {
type Msg = ();
type State = ();
type Arguments = ();
async fn pre_start(&self, _: ActorRef<Self::Msg>, _: ()) -> Result<(), ActorProcessingErr> {
Err("something".into())
}
}
#[derive(Default)]
struct Test2;
#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl Actor for Test2 {
type Msg = ();
type State = ();
type Arguments = ();
async fn pre_start(&self, _: ActorRef<Self::Msg>, _: ()) -> Result<(), ActorProcessingErr> {
Ok(())
}
}
let a = crate::spawn_named::<Test>("test".to_owned(), ()).await;
assert!(a.is_err());
drop(a);
let (a, h) = Actor::spawn(Some("test".to_owned()), Test2, ())
.await
.expect("Failed to spawn second actor with name clash");
a.stop(None);
h.await.unwrap();
}
#[crate::concurrency::test]
#[cfg_attr(
not(all(target_arch = "wasm32", target_os = "unknown")),
tracing_test::traced_test
)]
async fn actor_post_stop_executed_before_stop_and_wait_returns() {
struct TestActor {
signal: Arc<AtomicU8>,
}
#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl Actor for TestActor {
type Msg = EmptyMessage;
type Arguments = ();
type State = ();
async fn pre_start(
&self,
_this_actor: crate::ActorRef<Self::Msg>,
_: (),
) -> Result<Self::State, ActorProcessingErr> {
Ok(())
}
async fn post_stop(
&self,
_: ActorRef<Self::Msg>,
_: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
sleep(Duration::from_millis(1000)).await;
self.signal.store(1, Ordering::SeqCst);
Ok(())
}
}
let signal = Arc::new(AtomicU8::new(0));
let (actor, handle) = Actor::spawn(
None,
TestActor {
signal: signal.clone(),
},
(),
)
.await
.expect("Failed to spawn test actor");
actor
.stop_and_wait(None, None)
.await
.expect("Failed to stop and wait");
assert_eq!(1, signal.load(Ordering::SeqCst));
handle.await.unwrap();
}
#[crate::concurrency::test]
#[cfg_attr(
not(all(target_arch = "wasm32", target_os = "unknown")),
tracing_test::traced_test
)]
async fn actor_drain_messages() {
struct TestActor {
signal: Arc<AtomicU32>,
}
#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl Actor for TestActor {
type Msg = EmptyMessage;
type Arguments = ();
type State = ();
async fn pre_start(
&self,
_this_actor: crate::ActorRef<Self::Msg>,
_: (),
) -> Result<Self::State, ActorProcessingErr> {
Ok(())
}
async fn handle(
&self,
_: ActorRef<Self::Msg>,
_: Self::Msg,
_: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
sleep(Duration::from_millis(10)).await;
let _ = self.signal.fetch_add(1, Ordering::SeqCst);
Ok(())
}
}
let signal = Arc::new(AtomicU32::new(0));
let (actor, handle) = Actor::spawn(
None,
TestActor {
signal: signal.clone(),
},
(),
)
.await
.expect("Failed to spawn test actor");
for _ in 0..1000 {
actor
.cast(EmptyMessage)
.expect("Failed to send message to actor");
}
assert!(signal.load(Ordering::SeqCst) < 1000);
actor.drain().expect("Failed to trigger actor draining");
assert!(actor.cast(EmptyMessage).is_err());
handle.await.unwrap();
assert_eq!(1000, signal.load(Ordering::SeqCst));
}
#[crate::concurrency::test]
#[cfg_attr(
not(all(target_arch = "wasm32", target_os = "unknown")),
tracing_test::traced_test
)]
async fn runtime_message_typing() {
struct TestActor;
#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl Actor for TestActor {
type Msg = EmptyMessage;
type Arguments = ();
type State = ();
async fn pre_start(
&self,
_this_actor: crate::ActorRef<Self::Msg>,
_: (),
) -> Result<Self::State, ActorProcessingErr> {
Ok(())
}
}
let (actor, handle) = Actor::spawn(None, TestActor, ())
.await
.expect("Failed to start test actor");
let actor: ActorCell = actor.into();
assert_eq!(Some(true), actor.is_message_type_of::<EmptyMessage>());
assert_eq!(Some(false), actor.is_message_type_of::<i64>());
actor.stop(None);
handle.await.unwrap();
}
#[crate::concurrency::test]
#[cfg_attr(
not(all(target_arch = "wasm32", target_os = "unknown")),
tracing_test::traced_test
)]
async fn wait_for_death() {
struct TestActor;
#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl Actor for TestActor {
type Msg = EmptyMessage;
type Arguments = ();
type State = ();
async fn pre_start(
&self,
_this_actor: crate::ActorRef<Self::Msg>,
_: (),
) -> Result<Self::State, ActorProcessingErr> {
Ok(())
}
async fn post_stop(
&self,
_myself: ActorRef<Self::Msg>,
_: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
crate::concurrency::sleep(Duration::from_millis(10)).await;
Ok(())
}
}
let (actor, handle) = Actor::spawn(None, TestActor, ())
.await
.expect("Failed to start test actor");
actor.stop(None);
assert!(actor.wait(Some(Duration::from_millis(100))).await.is_ok());
actor.stop(None);
handle.await.unwrap();
}
#[crate::concurrency::test]
#[cfg_attr(
not(all(target_arch = "wasm32", target_os = "unknown")),
tracing_test::traced_test
)]
async fn derived_actor_ref() {
let result_counter = Arc::new(AtomicU32::new(0));
struct TestActor {
counter: Arc<AtomicU32>,
}
#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl Actor for TestActor {
type Msg = u32;
type Arguments = ();
type State = ();
async fn pre_start(
&self,
_this_actor: crate::ActorRef<Self::Msg>,
_: (),
) -> Result<Self::State, ActorProcessingErr> {
Ok(())
}
async fn handle(
&self,
_myself: ActorRef<Self::Msg>,
message: Self::Msg,
_state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
self.counter.fetch_add(message, Ordering::Relaxed);
Ok(())
}
}
let (actor, handle) = Actor::spawn(
None,
TestActor {
counter: result_counter.clone(),
},
(),
)
.await
.expect("Actor failed to start");
let mut sum: u32 = 0;
let from_u8: DerivedActorRef<u8> = actor.clone().get_derived();
let u8_message: u8 = 1;
sum += u8_message as u32;
from_u8
.send_message(u8_message)
.expect("Failed to send message to actor");
periodic_check(
|| result_counter.load(Ordering::Relaxed) == sum,
Duration::from_millis(500),
)
.await;
let from_u16: DerivedActorRef<u16> = actor.get_derived();
let u16_message: u16 = 2;
sum += u16_message as u32;
from_u16
.send_message(u16_message)
.expect("Failed to send message to actor");
actor
.get_derived()
.send_after(Duration::from_millis(10), move || u16_message)
.await
.expect("Failed to await the join handle")
.expect("Failed to send message to actor");
sum += u16_message as u32;
crate::concurrency::sleep(Duration::from_millis(50)).await;
actor
.drain_and_wait(None)
.await
.expect("Failed to drain actor");
handle.await.unwrap();
assert_eq!(result_counter.load(Ordering::Relaxed), sum);
let message: u16 = 3;
let res = from_u16.send_message(message);
assert!(res.is_err());
if let Err(MessagingErr::SendErr(failed_message)) = res {
assert_eq!(failed_message, message);
} else {
panic!("Invalid error type");
}
}
#[crate::concurrency::test]
#[cfg_attr(
not(all(target_arch = "wasm32", target_os = "unknown")),
tracing_test::traced_test
)]
async fn can_use_call_in_actor() {
enum TestActorMessage {
Call(crate::RpcReplyPort<u32>),
}
#[cfg(feature = "cluster")]
impl crate::Message for TestActorMessage {}
struct TestActor1;
struct TestActor2;
#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl Actor for TestActor1 {
type Msg = TestActorMessage;
type Arguments = ();
type State = ActorRef<TestActorMessage>;
async fn pre_start(
&self,
_this_actor: crate::ActorRef<Self::Msg>,
_: (),
) -> Result<Self::State, ActorProcessingErr> {
let child_actor = Actor::spawn(None, TestActor2, ())
.await
.expect("Failed to spawn child actor");
Ok(child_actor.0)
}
async fn handle(
&self,
_myself: ActorRef<Self::Msg>,
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
TestActorMessage::Call(reply_port) => {
let result = crate::call!(state, TestActorMessage::Call)?;
reply_port.send(result).expect("Failed to send response");
}
}
Ok(())
}
}
#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl Actor for TestActor2 {
type Msg = TestActorMessage;
type Arguments = ();
type State = ();
async fn pre_start(
&self,
_this_actor: crate::ActorRef<Self::Msg>,
_: (),
) -> Result<Self::State, ActorProcessingErr> {
Ok(())
}
async fn handle(
&self,
_myself: ActorRef<Self::Msg>,
message: Self::Msg,
_state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
TestActorMessage::Call(reply_port) => {
reply_port.send(42).expect("Failed to send response");
}
}
Ok(())
}
}
let (actor1, _) = Actor::spawn(None, TestActor1, ())
.await
.expect("Failed to spawn actor");
let result = crate::call!(actor1, TestActorMessage::Call).expect("Failed to call actor");
assert!(result == 42);
}