atomr_cluster_tools/
cluster_client.rs1use std::collections::HashMap;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::sync::Arc;
10use std::time::Duration;
11
12use parking_lot::RwLock;
13
14use atomr_core::actor::UntypedActorRef;
15
16#[derive(Default)]
17pub struct ClusterReceptionist {
18 services: RwLock<HashMap<String, UntypedActorRef>>,
19}
20
21impl ClusterReceptionist {
22 pub fn new() -> Arc<Self> {
23 Arc::new(Self::default())
24 }
25
26 pub fn register(&self, name: impl Into<String>, r: UntypedActorRef) {
27 self.services.write().insert(name.into(), r);
28 }
29
30 pub fn lookup(&self, name: &str) -> Option<UntypedActorRef> {
31 self.services.read().get(name).cloned()
32 }
33
34 pub fn unregister(&self, name: &str) {
35 self.services.write().remove(name);
36 }
37
38 pub fn registered(&self) -> Vec<String> {
39 let mut v: Vec<String> = self.services.read().keys().cloned().collect();
40 v.sort();
41 v
42 }
43}
44
45#[derive(Debug, Clone)]
47#[non_exhaustive]
48pub struct ClusterClientSettings {
49 pub initial_contacts: Vec<String>,
52 pub establishing_get_contacts_interval: Duration,
54 pub reconnect_timeout: Duration,
57 pub max_attempts: u32,
59}
60
61impl Default for ClusterClientSettings {
62 fn default() -> Self {
63 Self {
64 initial_contacts: Vec::new(),
65 establishing_get_contacts_interval: Duration::from_secs(3),
66 reconnect_timeout: Duration::from_secs(10),
67 max_attempts: 10,
68 }
69 }
70}
71
72impl ClusterClientSettings {
73 pub fn with_initial_contacts<I, S>(mut self, contacts: I) -> Self
74 where
75 I: IntoIterator<Item = S>,
76 S: Into<String>,
77 {
78 self.initial_contacts = contacts.into_iter().map(Into::into).collect();
79 self
80 }
81
82 pub fn with_max_attempts(mut self, n: u32) -> Self {
83 self.max_attempts = n;
84 self
85 }
86}
87
88pub struct ClusterClient {
89 pub receptionist: Arc<ClusterReceptionist>,
90 settings: ClusterClientSettings,
91 contact_cursor: AtomicUsize,
93}
94
95impl ClusterClient {
96 pub fn new(receptionist: Arc<ClusterReceptionist>) -> Self {
97 Self::with_settings(receptionist, ClusterClientSettings::default())
98 }
99
100 pub fn with_settings(receptionist: Arc<ClusterReceptionist>, settings: ClusterClientSettings) -> Self {
101 Self { receptionist, settings, contact_cursor: AtomicUsize::new(0) }
102 }
103
104 pub fn send(&self, name: &str) -> Option<UntypedActorRef> {
106 self.receptionist.lookup(name)
107 }
108
109 pub fn next_contact(&self) -> Option<String> {
112 if self.settings.initial_contacts.is_empty() {
113 return None;
114 }
115 let i = self.contact_cursor.fetch_add(1, Ordering::Relaxed) % self.settings.initial_contacts.len();
116 Some(self.settings.initial_contacts[i].clone())
117 }
118
119 pub async fn establish<F>(&self, mut try_resolve: F) -> Result<UntypedActorRef, ClusterClientError>
123 where
124 F: FnMut(&str) -> Option<UntypedActorRef>,
125 {
126 if self.settings.initial_contacts.is_empty() {
127 return Err(ClusterClientError::NoContacts);
128 }
129 for attempt in 0..self.settings.max_attempts {
130 let Some(contact) = self.next_contact() else {
131 break;
132 };
133 if let Some(r) = try_resolve(&contact) {
134 return Ok(r);
135 }
136 if attempt + 1 < self.settings.max_attempts {
137 tokio::time::sleep(self.settings.establishing_get_contacts_interval).await;
138 }
139 }
140 Err(ClusterClientError::Exhausted { attempts: self.settings.max_attempts })
141 }
142}
143
144#[derive(Debug, thiserror::Error)]
145#[non_exhaustive]
146pub enum ClusterClientError {
147 #[error("no initial contacts configured")]
148 NoContacts,
149 #[error("contact-point resolution failed after {attempts} attempts")]
150 Exhausted { attempts: u32 },
151}
152
153#[cfg(test)]
154mod tests {
155 use super::*;
156 use atomr_core::actor::Inbox;
157 use std::sync::atomic::AtomicU32;
158
159 #[test]
160 fn receptionist_register_lookup() {
161 let rec = ClusterReceptionist::new();
162 let inbox = Inbox::<u32>::new("svc");
163 rec.register("svc", inbox.actor_ref().as_untyped());
164 let c = ClusterClient::new(rec);
165 assert!(c.send("svc").is_some());
166 }
167
168 #[test]
169 fn next_contact_round_robins() {
170 let rec = ClusterReceptionist::new();
171 let s = ClusterClientSettings::default().with_initial_contacts(vec!["a", "b", "c"]);
172 let c = ClusterClient::with_settings(rec, s);
173 assert_eq!(c.next_contact().as_deref(), Some("a"));
174 assert_eq!(c.next_contact().as_deref(), Some("b"));
175 assert_eq!(c.next_contact().as_deref(), Some("c"));
176 assert_eq!(c.next_contact().as_deref(), Some("a"));
177 }
178
179 #[tokio::test]
180 async fn establish_returns_first_resolved_contact() {
181 let rec = ClusterReceptionist::new();
182 let inbox = Inbox::<u32>::new("svc");
183 let target = inbox.actor_ref().as_untyped();
184 let target_clone = target.clone();
185 let s = ClusterClientSettings::default().with_initial_contacts(vec!["a", "b"]).with_max_attempts(3);
186 let c = ClusterClient::with_settings(rec, s);
187 let calls = AtomicU32::new(0);
188 let result = c
189 .establish(|contact| {
190 calls.fetch_add(1, Ordering::SeqCst);
191 if contact == "b" {
192 Some(target_clone.clone())
193 } else {
194 None
195 }
196 })
197 .await
198 .unwrap();
199 assert_eq!(result.path(), target.path());
200 assert_eq!(calls.load(Ordering::SeqCst), 2);
201 }
202
203 #[tokio::test]
204 async fn establish_no_contacts_errors() {
205 let rec = ClusterReceptionist::new();
206 let c = ClusterClient::new(rec);
207 let r = c.establish(|_| None).await;
208 assert!(matches!(r, Err(ClusterClientError::NoContacts)));
209 }
210
211 #[tokio::test]
212 async fn establish_exhausts_after_max_attempts() {
213 let rec = ClusterReceptionist::new();
214 let s = ClusterClientSettings {
215 initial_contacts: vec!["x".into()],
216 establishing_get_contacts_interval: Duration::from_millis(1),
217 reconnect_timeout: Duration::from_millis(1),
218 max_attempts: 3,
219 };
220 let c = ClusterClient::with_settings(rec, s);
221 let r = c.establish(|_| None).await;
222 assert!(matches!(r, Err(ClusterClientError::Exhausted { attempts: 3 })));
223 }
224
225 #[test]
226 fn registered_lists_services_sorted() {
227 let rec = ClusterReceptionist::new();
228 let inbox = Inbox::<u32>::new("x");
229 rec.register("zebra", inbox.actor_ref().as_untyped());
230 rec.register("alpha", inbox.actor_ref().as_untyped());
231 rec.register("middle", inbox.actor_ref().as_untyped());
232 assert_eq!(rec.registered(), vec!["alpha", "middle", "zebra"]);
233 }
234}