atomr_cluster/
remote_adapter.rs1use std::sync::Arc;
15
16use atomr_core::actor::{ActorRef, ActorSystem, Address, Context, Props};
17use atomr_core::prelude::*;
18use atomr_remote::{RemoteSettings, RemoteSystem};
19use parking_lot::RwLock;
20
21use crate::gossip::Gossip;
22use crate::reachability::ReachabilityStatus;
23
24#[derive(Clone)]
25pub struct ClusterRemoteAdapter {
26 inner: Arc<ClusterRemoteAdapterInner>,
27}
28
29struct ClusterRemoteAdapterInner {
30 remote: RemoteSystem,
31 state: RwLock<Gossip>,
32 cluster_path: String,
33 self_address: Address,
34 cluster_ref: ActorRef<Gossip>,
35}
36
37struct ClusterActor {
38 state: Arc<RwLock<Gossip>>,
39}
40
41#[async_trait]
42impl Actor for ClusterActor {
43 type Msg = Gossip;
44 async fn handle(&mut self, _ctx: &mut Context<Self>, msg: Gossip) {
45 let merged = self.state.read().merge(&msg);
46 *self.state.write() = merged;
47 }
48}
49
50impl ClusterRemoteAdapter {
51 pub async fn start(
52 system: ActorSystem,
53 bind: std::net::SocketAddr,
54 settings: RemoteSettings,
55 ) -> Result<Self, atomr_remote::TransportError> {
56 let remote = RemoteSystem::start(system.clone(), bind, settings).await?;
57 remote.register_bincode::<Gossip>();
58
59 let state = Arc::new(RwLock::new(Gossip::new()));
60 let state_for_actor = state.clone();
61 let cluster_ref = system
62 .actor_of(Props::create(move || ClusterActor { state: state_for_actor.clone() }), "cluster")
63 .map_err(|e| atomr_remote::TransportError::Other(e.to_string()))?;
64 remote.expose_actor(cluster_ref.clone());
65
66 let cluster_path = "/user/cluster".to_string();
67 let self_address = remote.local_address.clone();
68
69 Ok(Self {
70 inner: Arc::new(ClusterRemoteAdapterInner {
71 remote,
72 state: RwLock::new(Gossip::new()),
73 cluster_path,
74 self_address,
75 cluster_ref,
76 }),
77 })
78 }
79
80 pub fn self_address(&self) -> &Address {
81 &self.inner.self_address
82 }
83
84 pub fn cluster_ref(&self) -> &ActorRef<Gossip> {
85 &self.inner.cluster_ref
86 }
87
88 pub async fn send_gossip(&self, peer: &Address) -> Result<(), atomr_remote::TransportError> {
90 let target = format!("{}{}", peer, self.inner.cluster_path);
91 let Some(handle) = self.inner.remote.actor_selection::<Gossip>(&target) else {
92 return Err(atomr_remote::TransportError::NotAssociated(target));
93 };
94 let g = self.inner.state.read().clone();
95 handle.tell(g);
96 Ok(())
97 }
98
99 pub fn update_local<F>(&self, f: F)
102 where
103 F: FnOnce(&mut Gossip),
104 {
105 let mut g = self.inner.state.write();
106 f(&mut g);
107 }
108
109 pub fn snapshot(&self) -> Gossip {
110 self.inner.state.read().clone()
111 }
112
113 pub fn mark_unreachable(&self, observer: &Address, peer: &Address) {
117 let mut g = self.inner.state.write();
118 g.state
119 .reachability
120 .records
121 .insert((observer.clone(), peer.clone()), ReachabilityStatus::Unreachable);
122 }
123
124 pub fn refresh_reachability(&self) {
127 let detectors = self.inner.remote.endpoint_manager().failure_detectors();
128 for addr_str in detectors.addresses() {
129 if let Some(addr) = Address::parse(&addr_str) {
130 if !detectors.is_available(&addr) {
131 self.mark_unreachable(&self.inner.self_address, &addr);
132 }
133 }
134 }
135 }
136
137 pub async fn shutdown(&self) {
138 self.inner.remote.shutdown().await;
139 }
140}
141
#[cfg(test)]
mod tests {
    use super::*;
    use crate::member::Member;
    use std::time::{Duration, Instant};

    /// Creates an actor system called `name` with the reference config and
    /// starts an adapter bound to `127.0.0.1:0` with default remote settings.
    async fn boot(name: &str) -> ClusterRemoteAdapter {
        let system = ActorSystem::create(name, atomr_config::Config::reference())
            .await
            .unwrap();
        let bind_addr: std::net::SocketAddr = "127.0.0.1:0".parse().unwrap();
        ClusterRemoteAdapter::start(system, bind_addr, RemoteSettings::default())
            .await
            .unwrap()
    }

    /// Smoke test: two nodes exchange gossip in both directions, twice.
    ///
    /// NOTE(review): nothing here asserts that the two views converge; the
    /// test only checks that start-up and `send_gossip` succeed.
    #[tokio::test]
    async fn gossip_propagates_between_two_nodes() {
        let a = boot("ClusterA").await;
        let b = boot("ClusterB").await;

        // Seed each node's local view with a tick and its own member entry.
        let addr_a = a.self_address().clone();
        a.update_local(|gossip| {
            gossip.tick("ClusterA");
            gossip.state.add_or_update(Member::new(addr_a, Vec::new()));
        });
        let addr_b = b.self_address().clone();
        b.update_local(|gossip| {
            gossip.tick("ClusterB");
            gossip.state.add_or_update(Member::new(addr_b, Vec::new()));
        });

        // First exchange.
        a.send_gossip(b.self_address()).await.unwrap();
        b.send_gossip(a.self_address()).await.unwrap();

        // Wait roughly two seconds in 50 ms naps; there is no early exit.
        let started = Instant::now();
        while started.elapsed() < Duration::from_secs(2) {
            tokio::time::sleep(Duration::from_millis(50)).await;
        }

        // Second exchange, then a short grace period before teardown.
        a.send_gossip(b.self_address()).await.unwrap();
        b.send_gossip(a.self_address()).await.unwrap();
        tokio::time::sleep(Duration::from_millis(200)).await;

        a.shutdown().await;
        b.shutdown().await;
    }
}