use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use ractor::concurrency::Duration;
use ractor::ActorStatus;
use crate::common_test::periodic_check;
use super::*;
type TestMessage = u32;
struct HappyActor;
#[cfg_attr(feature = "async-trait", async_trait::async_trait)]
impl Actor for HappyActor {
type Msg = TestMessage;
type State = Arc<AtomicU32>;
type Arguments = Arc<AtomicU32>;
async fn pre_start(
&self,
_: ActorRef<Self::Msg>,
start: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
Ok(start)
}
async fn handle(
&self,
_: ActorRef<Self::Msg>,
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
state.fetch_add(message, Ordering::Relaxed);
Ok(())
}
}
struct BoomActor;
#[cfg_attr(feature = "async-trait", async_trait::async_trait)]
impl Actor for BoomActor {
type Msg = TestMessage;
type State = Arc<AtomicU32>;
type Arguments = Arc<AtomicU32>;
async fn pre_start(
&self,
_: ActorRef<Self::Msg>,
start: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
Ok(start)
}
async fn handle(
&self,
_: ActorRef<Self::Msg>,
_: Self::Msg,
_: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
Err(From::from("boom!"))
}
}
#[ractor::concurrency::test]
#[tracing_test::traced_test]
async fn test_broadcast() {
let counter = Arc::new(AtomicU32::new(0));
let mut actors = vec![];
for _ in 0..3 {
actors.push(
Actor::spawn(None, HappyActor, counter.clone())
.await
.expect("Failed to start test actor"),
);
}
let initial_targets = actors
.iter()
.map(|(a, _)| a.clone())
.map(Broadcaster::get_unit_mapped_target)
.collect::<Vec<_>>();
let config = BroadcasterConfig {
continue_with_dead_targets: false,
initial_targets,
};
let (bactor, bhandle) = Actor::spawn(None, Broadcaster::new(), config)
.await
.expect("Failed to start broadcast actor");
for _ in 0..3 {
bactor
.cast(BroadcasterMessage::Broadcast(12))
.expect("Failed to broadcast");
}
periodic_check(
|| {
let expected: u32 = 3 * 3 * 12;
expected == counter.load(Ordering::Relaxed)
},
Duration::from_secs(1),
)
.await;
bactor.stop(None);
bhandle.await.unwrap();
for (actor, _) in actors.iter() {
actor.stop(None);
}
for (_, handle) in actors.into_iter() {
handle.await.unwrap();
}
}
#[ractor::concurrency::test]
#[tracing_test::traced_test]
async fn test_broadcast_bad_targets() {
let counter = Arc::new(AtomicU32::new(0));
let mut actors = vec![];
for i in 0..3 {
if i == 0 {
actors.push(
Actor::spawn(None, HappyActor, counter.clone())
.await
.expect("Failed to start test actor"),
);
} else {
actors.push(
Actor::spawn(None, BoomActor, counter.clone())
.await
.expect("Failed to start test actor"),
);
}
}
let initial_targets = actors
.iter()
.map(|(a, _)| a.clone())
.map(Broadcaster::get_unit_mapped_target)
.collect::<Vec<_>>();
let config = BroadcasterConfig {
continue_with_dead_targets: true,
initial_targets,
};
let (bactor, bhandle) = Actor::spawn(None, Broadcaster::new(), config)
.await
.expect("Failed to start broadcast actor");
for _ in 0..3 {
bactor
.cast(BroadcasterMessage::Broadcast(12))
.expect("Failed to broadcast");
}
periodic_check(
|| {
let expected: u32 = 3 * 12;
expected == counter.load(Ordering::Relaxed)
},
Duration::from_secs(1),
)
.await;
bactor.stop(None);
bhandle.await.unwrap();
for (actor, _) in actors.iter() {
actor.stop(None);
}
for (_, handle) in actors.into_iter() {
handle.await.unwrap();
}
}
#[ractor::concurrency::test]
#[tracing_test::traced_test]
async fn test_broadcast_bad_targets_fails_on_target_failure() {
let counter = Arc::new(AtomicU32::new(0));
let mut actors = vec![];
for i in 0..3 {
if i == 0 {
actors.push(
Actor::spawn(None, HappyActor, counter.clone())
.await
.expect("Failed to start test actor"),
);
} else {
actors.push(
Actor::spawn(None, BoomActor, counter.clone())
.await
.expect("Failed to start test actor"),
);
}
}
let initial_targets = actors
.iter()
.map(|(a, _)| a.clone())
.map(Broadcaster::get_unit_mapped_target)
.collect::<Vec<_>>();
let config: BroadcasterConfig<u32> = BroadcasterConfig {
continue_with_dead_targets: false,
initial_targets,
};
let (bactor, bhandle) = Actor::spawn(None, Broadcaster::new(), config)
.await
.expect("Failed to start broadcast actor");
periodic_check(
|| {
let _ = bactor.cast(BroadcasterMessage::Broadcast(12));
bactor.get_status() == ActorStatus::Stopped
},
Duration::from_secs(1),
)
.await;
bhandle.await.unwrap();
for (actor, _) in actors.iter() {
actor.stop(None);
}
for (_, handle) in actors.into_iter() {
handle.await.unwrap();
}
}