pub mod util;
#[macro_use]
extern crate serde;
#[macro_use]
extern crate async_trait;
#[macro_use]
extern crate coerce_macros;
use coerce::actor::system::ActorSystem;
use coerce::remote::system::RemoteActorSystem;
use coerce::actor::{ActorCreationErr, ActorFactory, ActorRecipe};
use util::*;
/// Recipe type paired with `TestActorFactory` (see its `type Recipe`).
///
/// Derives `Serialize`/`Deserialize` so the `ActorRecipe` impl in this file can
/// convert it to and from JSON bytes via `serde_json`.
#[derive(Serialize, Deserialize)]
pub struct TestActorRecipe {
// Part of the serialized recipe; `TestActorFactory::create` in this file ignores its
// recipe argument, so nothing here reads this field directly.
name: String,
}
impl ActorRecipe for TestActorRecipe {
    /// Decodes a recipe from its JSON byte representation.
    ///
    /// Returns `None` when `bytes` cannot be deserialized into a `TestActorRecipe`
    /// (malformed JSON, wrong shape, `null`, ...) instead of panicking.
    fn read_from_bytes(bytes: &Vec<u8>) -> Option<Self> {
        // `.ok()` turns a deserialization error into `None`. The previous `.unwrap()`
        // made serde decode an `Option<Self>` and panicked on any malformed payload.
        serde_json::from_slice(bytes).ok()
    }

    /// Encodes this recipe as JSON bytes, or returns `None` if serialization fails.
    fn write_to_bytes(&self) -> Option<Vec<u8>> {
        serde_json::to_vec(self).ok()
    }
}
/// Stateless factory whose `ActorFactory` impl in this file builds `TestActor` values.
#[derive(Clone)]
pub struct TestActorFactory;
#[async_trait]
impl ActorFactory for TestActorFactory {
    type Actor = TestActor;
    type Recipe = TestActorRecipe;

    /// Builds a fresh `TestActor` with no status and a zeroed counter.
    ///
    /// The supplied recipe is accepted but not consulted; creation never fails.
    async fn create(&self, _recipe: Self::Recipe) -> Result<TestActor, ActorCreationErr> {
        tracing::trace!("recipe create :D");

        let actor = TestActor {
            counter: 0,
            status: None,
        };
        Ok(actor)
    }
}
/// Field-less recipe type paired with `EchoActorFactory` (see its `type Recipe`).
///
/// Derives `Serialize`/`Deserialize` so the `ActorRecipe` impl in this file can
/// convert it to and from JSON bytes via `serde_json`.
#[derive(Serialize, Deserialize)]
pub struct EchoActorRecipe {}
impl ActorRecipe for EchoActorRecipe {
    /// Decodes a recipe from its JSON byte representation.
    ///
    /// Returns `None` when `bytes` cannot be deserialized into an `EchoActorRecipe`
    /// (malformed JSON, wrong shape, `null`, ...) instead of panicking.
    fn read_from_bytes(bytes: &Vec<u8>) -> Option<Self> {
        // `.ok()` turns a deserialization error into `None`. The previous `.unwrap()`
        // made serde decode an `Option<Self>` and panicked on any malformed payload.
        serde_json::from_slice(bytes).ok()
    }

    /// Encodes this recipe as JSON bytes, or returns `None` if serialization fails.
    fn write_to_bytes(&self) -> Option<Vec<u8>> {
        serde_json::to_vec(self).ok()
    }
}
/// Stateless factory whose `ActorFactory` impl in this file builds `EchoActor` values.
#[derive(Clone)]
pub struct EchoActorFactory;
#[async_trait]
impl ActorFactory for EchoActorFactory {
    type Actor = EchoActor;
    type Recipe = EchoActorRecipe;

    /// Builds a new `EchoActor`.
    ///
    /// The supplied recipe is accepted but not consulted; creation never fails.
    async fn create(&self, _recipe: Self::Recipe) -> Result<EchoActor, ActorCreationErr> {
        tracing::trace!("recipe create :D");

        let actor = EchoActor {};
        Ok(actor)
    }
}
/// Starts three remote actor systems as cluster workers on localhost (ports 30101-30103,
/// with the second and third seeded from the first) and checks that every node reports
/// the same three-member node list.
#[tokio::test(flavor = "multi_thread", worker_threads = 6)]
pub async fn test_remote_cluster_workers() {
    // Number of entries in `left` that are also present in `right`.
    fn shared_count<T: PartialEq>(left: &[T], right: &[T]) -> usize {
        left.iter().filter(|item| right.contains(item)).count()
    }

    util::create_trace_logger();

    // The first node's actor system also hosts a tracked `TestActor`; the binding keeps
    // the returned value alive until the end of the test.
    let seed_actor_system = ActorSystem::new();
    let _actor = seed_actor_system
        .new_tracked_actor(TestActor::new())
        .await
        .unwrap();

    let node_1 = RemoteActorSystem::builder()
        .with_tag("remote-1")
        .with_id(1)
        .with_actor_system(seed_actor_system)
        .client_auth_jwt("token2", None)
        .build()
        .await;
    let node_1_handle = node_1.clone();

    let node_2 = RemoteActorSystem::builder()
        .with_tag("remote-2")
        .with_id(2)
        .with_actor_system(ActorSystem::new())
        .client_auth_jwt("token2", None)
        .build()
        .await;
    let node_2_handle = node_2.clone();

    let node_3 = RemoteActorSystem::builder()
        .with_tag("remote-3")
        .with_id(3)
        .with_actor_system(ActorSystem::new())
        .client_auth_jwt("token2", None)
        .build()
        .await;
    let node_3_handle = node_3.clone();

    // The first worker only listens; the other two are pointed at it as their seed, so it
    // is started before them.
    node_1
        .cluster_worker()
        .listen_addr("localhost:30101")
        .start()
        .await;

    node_2
        .cluster_worker()
        .listen_addr("localhost:30102")
        .with_seed_addr("localhost:30101")
        .start()
        .await;

    node_3
        .cluster_worker()
        .listen_addr("localhost:30103")
        .with_seed_addr("localhost:30101")
        .start()
        .await;

    // Query each node's view through the handles cloned before the workers were started.
    let view_1 = node_1_handle.get_nodes().await;
    let view_2 = node_2_handle.get_nodes().await;
    let view_3 = node_3_handle.get_nodes().await;

    tracing::info!("a: {:?}", &view_1);
    tracing::info!("b: {:?}", &view_2);
    tracing::info!("c: {:?}", &view_3);

    // Every node must know about all three members...
    assert_eq!(view_1.len(), 3);
    assert_eq!(view_2.len(), 3);
    assert_eq!(view_3.len(), 3);

    // ...and every view must be fully contained in each of the other two.
    let ordered_pairs = [
        (&view_1, &view_2),
        (&view_1, &view_3),
        (&view_2, &view_1),
        (&view_2, &view_3),
        (&view_3, &view_1),
        (&view_3, &view_2),
    ];
    for &(left, right) in ordered_pairs.iter() {
        assert_eq!(shared_count(left, right), left.len());
    }
}