use std::sync::Arc;
use atomr_core::actor::{ActorRef, ActorSystem, Address, Context, Props};
use atomr_core::prelude::*;
use atomr_remote::{RemoteSettings, RemoteSystem};
use parking_lot::RwLock;
use crate::gossip::Gossip;
use crate::reachability::ReachabilityStatus;
/// Handle to a node's cluster gossip endpoint.
///
/// The handle only wraps an `Arc`, so the derived `Clone` yields another
/// handle to the same underlying `ClusterRemoteAdapterInner` rather than an
/// independent copy.
#[derive(Clone)]
pub struct ClusterRemoteAdapter {
// Shared, reference-counted internals; every clone points at the same value.
inner: Arc<ClusterRemoteAdapterInner>,
}
/// Internals shared by every clone of [`ClusterRemoteAdapter`].
struct ClusterRemoteAdapterInner {
    /// Remote transport started in [`ClusterRemoteAdapter::start`].
    remote: RemoteSystem,
    /// Local gossip view.
    ///
    /// This is the *same* `Arc` that the `ClusterActor` holds, so gossip merged
    /// by the actor is what `snapshot`, `send_gossip` and `update_local` see.
    state: Arc<RwLock<Gossip>>,
    /// Actor path appended to a peer address to build the selection target.
    cluster_path: String,
    /// Address reported by the remote system for this node.
    self_address: Address,
    /// Local reference to the cluster actor exposed through the remote system.
    cluster_ref: ActorRef<Gossip>,
}

/// Actor that receives [`Gossip`] messages and folds them into the shared state.
struct ClusterActor {
    /// Gossip state shared with the owning adapter.
    state: Arc<RwLock<Gossip>>,
}

#[async_trait]
impl Actor for ClusterActor {
    type Msg = Gossip;

    /// Merges an incoming gossip message into the shared local state.
    async fn handle(&mut self, _ctx: &mut Context<Self>, msg: Gossip) {
        // Merge and store under a single write guard. Reading, releasing the
        // lock, and then writing would let a concurrent `update_local` or
        // `mark_unreachable` slip in between and be silently overwritten.
        let mut local = self.state.write();
        let merged = local.merge(&msg);
        *local = merged;
    }
}

impl ClusterRemoteAdapter {
    /// Starts the remote system on `bind`, spawns the `"cluster"` actor and
    /// exposes it through the remote system.
    ///
    /// # Errors
    ///
    /// Propagates any error from `RemoteSystem::start`. A failure to spawn the
    /// cluster actor is reported as `TransportError::Other` carrying the
    /// spawn error's message.
    pub async fn start(
        system: ActorSystem,
        bind: std::net::SocketAddr,
        settings: RemoteSettings,
    ) -> Result<Self, atomr_remote::TransportError> {
        let remote = RemoteSystem::start(system.clone(), bind, settings).await?;
        remote.register_bincode::<Gossip>();

        // One gossip state, shared between the actor (inbound merges) and the
        // adapter (local updates, snapshots, outbound sends).
        let state = Arc::new(RwLock::new(Gossip::new()));
        let state_for_actor = Arc::clone(&state);
        let cluster_ref = system
            .actor_of(
                Props::create(move || ClusterActor { state: state_for_actor.clone() }),
                "cluster",
            )
            .map_err(|e| atomr_remote::TransportError::Other(e.to_string()))?;
        remote.expose_actor(cluster_ref.clone());

        let cluster_path = "/user/cluster".to_string();
        let self_address = remote.local_address.clone();
        Ok(Self {
            inner: Arc::new(ClusterRemoteAdapterInner {
                remote,
                state,
                cluster_path,
                self_address,
                cluster_ref,
            }),
        })
    }

    /// Returns the address of this node as reported by the remote system.
    pub fn self_address(&self) -> &Address {
        &self.inner.self_address
    }

    /// Returns the local reference to the cluster actor.
    pub fn cluster_ref(&self) -> &ActorRef<Gossip> {
        &self.inner.cluster_ref
    }

    /// Sends a copy of the current local gossip to the cluster actor at `peer`.
    ///
    /// # Errors
    ///
    /// Returns `TransportError::NotAssociated` with the target path when the
    /// remote system cannot resolve a handle for it. The outcome of the `tell`
    /// itself is not inspected, so `Ok(())` does not confirm delivery.
    pub async fn send_gossip(&self, peer: &Address) -> Result<(), atomr_remote::TransportError> {
        let target = format!("{}{}", peer, self.inner.cluster_path);
        let Some(handle) = self.inner.remote.actor_selection::<Gossip>(&target) else {
            return Err(atomr_remote::TransportError::NotAssociated(target));
        };
        // Clone under a short-lived read guard so the lock is not held while sending.
        let outgoing = self.inner.state.read().clone();
        handle.tell(outgoing);
        Ok(())
    }

    /// Applies `f` to the local gossip while holding the write lock.
    pub fn update_local<F>(&self, f: F)
    where
        F: FnOnce(&mut Gossip),
    {
        let mut g = self.inner.state.write();
        f(&mut g);
    }

    /// Returns a clone of the current local gossip.
    pub fn snapshot(&self) -> Gossip {
        self.inner.state.read().clone()
    }

    /// Records that `observer` considers `peer` unreachable in the local gossip.
    pub fn mark_unreachable(&self, observer: &Address, peer: &Address) {
        let mut g = self.inner.state.write();
        g.state
            .reachability
            .records
            .insert((observer.clone(), peer.clone()), ReachabilityStatus::Unreachable);
    }

    /// Marks as unreachable (observed by this node) every address the remote
    /// system's failure detectors report as unavailable.
    ///
    /// Addresses that `Address::parse` cannot parse are skipped.
    pub fn refresh_reachability(&self) {
        let detectors = self.inner.remote.endpoint_manager().failure_detectors();
        for addr_str in detectors.addresses() {
            if let Some(addr) = Address::parse(&addr_str) {
                if !detectors.is_available(&addr) {
                    self.mark_unreachable(&self.inner.self_address, &addr);
                }
            }
        }
    }

    /// Shuts down the underlying remote system.
    pub async fn shutdown(&self) {
        self.inner.remote.shutdown().await;
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::member::Member;
    use std::time::Duration;

    /// Creates an actor system called `name` and starts an adapter bound to
    /// `127.0.0.1:0`, panicking if either step fails.
    async fn boot(name: &str) -> ClusterRemoteAdapter {
        let system = ActorSystem::create(name, atomr_config::Config::reference())
            .await
            .unwrap();
        let bind_addr: std::net::SocketAddr = "127.0.0.1:0".parse().unwrap();
        ClusterRemoteAdapter::start(system, bind_addr, RemoteSettings::default())
            .await
            .unwrap()
    }

    /// Sends `first`'s gossip to `second`, then `second`'s gossip to `first`.
    async fn exchange(first: &ClusterRemoteAdapter, second: &ClusterRemoteAdapter) {
        first.send_gossip(second.self_address()).await.unwrap();
        second.send_gossip(first.self_address()).await.unwrap();
    }

    // NOTE(review): this test only checks that sends succeed; it never asserts
    // on the resulting gossip state, and the wait loop below has no early exit.
    #[tokio::test]
    async fn gossip_propagates_between_two_nodes() {
        let node_a = boot("ClusterA").await;
        let node_b = boot("ClusterB").await;

        // Each node registers itself as a member of its own local view.
        node_a.update_local(|gossip| {
            gossip.tick("ClusterA");
            gossip
                .state
                .add_or_update(Member::new(node_a.self_address().clone(), vec![]));
        });
        node_b.update_local(|gossip| {
            gossip.tick("ClusterB");
            gossip
                .state
                .add_or_update(Member::new(node_b.self_address().clone(), vec![]));
        });

        exchange(&node_a, &node_b).await;

        // Sleep in 50 ms steps until two seconds have passed.
        let settle = Duration::from_secs(2);
        let started = std::time::Instant::now();
        while started.elapsed() < settle {
            tokio::time::sleep(Duration::from_millis(50)).await;
        }

        exchange(&node_a, &node_b).await;
        tokio::time::sleep(Duration::from_millis(200)).await;

        node_a.shutdown().await;
        node_b.shutdown().await;
    }
}