rakka_cluster_tools/
cluster_client.rs1use std::collections::HashMap;
9use std::sync::atomic::{AtomicUsize, Ordering};
10use std::sync::Arc;
11use std::time::Duration;
12
13use parking_lot::RwLock;
14
15use rakka_core::actor::UntypedActorRef;
16
17#[derive(Default)]
18pub struct ClusterReceptionist {
19 services: RwLock<HashMap<String, UntypedActorRef>>,
20}
21
22impl ClusterReceptionist {
23 pub fn new() -> Arc<Self> {
24 Arc::new(Self::default())
25 }
26
27 pub fn register(&self, name: impl Into<String>, r: UntypedActorRef) {
28 self.services.write().insert(name.into(), r);
29 }
30
31 pub fn lookup(&self, name: &str) -> Option<UntypedActorRef> {
32 self.services.read().get(name).cloned()
33 }
34
35 pub fn unregister(&self, name: &str) {
36 self.services.write().remove(name);
37 }
38
39 pub fn registered(&self) -> Vec<String> {
40 let mut v: Vec<String> = self.services.read().keys().cloned().collect();
41 v.sort();
42 v
43 }
44}
45
/// Configuration for a `ClusterClient`.
///
/// The struct is `#[non_exhaustive]`, so code outside this crate cannot build
/// it with a struct literal: start from [`Default::default`] and refine it
/// with the `with_*` builder methods (or assign the public fields directly).
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ClusterClientSettings {
    /// Contact points that `ClusterClient::next_contact` cycles through.
    pub initial_contacts: Vec<String>,
    /// Pause between two failed resolution attempts in
    /// `ClusterClient::establish`.
    pub establishing_get_contacts_interval: Duration,
    /// Reconnect timeout.
    ///
    /// NOTE(review): no code visible in this file reads this field; it is
    /// presumably consumed elsewhere — confirm.
    pub reconnect_timeout: Duration,
    /// Maximum number of resolution attempts `ClusterClient::establish`
    /// makes before giving up.
    pub max_attempts: u32,
}

impl Default for ClusterClientSettings {
    /// No contacts, a 3 s retry interval, a 10 s reconnect timeout and at
    /// most 10 attempts.
    fn default() -> Self {
        Self {
            initial_contacts: Vec::new(),
            establishing_get_contacts_interval: Duration::from_secs(3),
            reconnect_timeout: Duration::from_secs(10),
            max_attempts: 10,
        }
    }
}

impl ClusterClientSettings {
    /// Replaces the list of initial contacts with `contacts`.
    #[must_use]
    pub fn with_initial_contacts<I, S>(mut self, contacts: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        self.initial_contacts = contacts.into_iter().map(Into::into).collect();
        self
    }

    /// Sets the maximum number of resolution attempts.
    #[must_use]
    pub fn with_max_attempts(mut self, n: u32) -> Self {
        self.max_attempts = n;
        self
    }

    /// Sets the pause between two failed resolution attempts.
    #[must_use]
    pub fn with_establishing_get_contacts_interval(mut self, interval: Duration) -> Self {
        self.establishing_get_contacts_interval = interval;
        self
    }

    /// Sets the reconnect timeout.
    #[must_use]
    pub fn with_reconnect_timeout(mut self, timeout: Duration) -> Self {
        self.reconnect_timeout = timeout;
        self
    }
}
89
90pub struct ClusterClient {
91 pub receptionist: Arc<ClusterReceptionist>,
92 settings: ClusterClientSettings,
93 contact_cursor: AtomicUsize,
95}
96
97impl ClusterClient {
98 pub fn new(receptionist: Arc<ClusterReceptionist>) -> Self {
99 Self::with_settings(receptionist, ClusterClientSettings::default())
100 }
101
102 pub fn with_settings(receptionist: Arc<ClusterReceptionist>, settings: ClusterClientSettings) -> Self {
103 Self { receptionist, settings, contact_cursor: AtomicUsize::new(0) }
104 }
105
106 pub fn send(&self, name: &str) -> Option<UntypedActorRef> {
108 self.receptionist.lookup(name)
109 }
110
111 pub fn next_contact(&self) -> Option<String> {
114 if self.settings.initial_contacts.is_empty() {
115 return None;
116 }
117 let i = self.contact_cursor.fetch_add(1, Ordering::Relaxed) % self.settings.initial_contacts.len();
118 Some(self.settings.initial_contacts[i].clone())
119 }
120
121 pub async fn establish<F>(&self, mut try_resolve: F) -> Result<UntypedActorRef, ClusterClientError>
125 where
126 F: FnMut(&str) -> Option<UntypedActorRef>,
127 {
128 if self.settings.initial_contacts.is_empty() {
129 return Err(ClusterClientError::NoContacts);
130 }
131 for attempt in 0..self.settings.max_attempts {
132 let Some(contact) = self.next_contact() else {
133 break;
134 };
135 if let Some(r) = try_resolve(&contact) {
136 return Ok(r);
137 }
138 if attempt + 1 < self.settings.max_attempts {
139 tokio::time::sleep(self.settings.establishing_get_contacts_interval).await;
140 }
141 }
142 Err(ClusterClientError::Exhausted { attempts: self.settings.max_attempts })
143 }
144}
145
146#[derive(Debug, thiserror::Error)]
147#[non_exhaustive]
148pub enum ClusterClientError {
149 #[error("no initial contacts configured")]
150 NoContacts,
151 #[error("contact-point resolution failed after {attempts} attempts")]
152 Exhausted { attempts: u32 },
153}
154
#[cfg(test)]
mod tests {
    use super::*;
    use rakka_core::actor::Inbox;
    use std::sync::atomic::AtomicU32;

    /// Retry interval short enough that real-time sleeps inside `establish`
    /// do not slow the suite down.
    const FAST_RETRY: Duration = Duration::from_millis(1);

    /// A registered service is reachable through the client's `send`.
    #[test]
    fn receptionist_register_lookup() {
        let rec = ClusterReceptionist::new();
        let inbox = Inbox::<u32>::new("svc");
        rec.register("svc", inbox.actor_ref().as_untyped());
        let c = ClusterClient::new(rec);
        assert!(c.send("svc").is_some());
    }

    /// Contacts are handed out in order and wrap back to the first one.
    #[test]
    fn next_contact_round_robins() {
        let rec = ClusterReceptionist::new();
        let s = ClusterClientSettings::default().with_initial_contacts(vec!["a", "b", "c"]);
        let c = ClusterClient::with_settings(rec, s);
        assert_eq!(c.next_contact().as_deref(), Some("a"));
        assert_eq!(c.next_contact().as_deref(), Some("b"));
        assert_eq!(c.next_contact().as_deref(), Some("c"));
        assert_eq!(c.next_contact().as_deref(), Some("a"));
    }

    /// `establish` stops at the first contact the resolver accepts.
    #[tokio::test]
    async fn establish_returns_first_resolved_contact() {
        let rec = ClusterReceptionist::new();
        let inbox = Inbox::<u32>::new("svc");
        let target = inbox.actor_ref().as_untyped();
        let target_clone = target.clone();
        let mut s = ClusterClientSettings::default()
            .with_initial_contacts(vec!["a", "b"])
            .with_max_attempts(3);
        // Contact "a" fails on purpose, so `establish` sleeps once before
        // trying "b". With the default 3 s interval that sleep would run in
        // real time and make this test take three seconds.
        s.establishing_get_contacts_interval = FAST_RETRY;
        let c = ClusterClient::with_settings(rec, s);
        let calls = AtomicU32::new(0);
        let result = c
            .establish(|contact| {
                calls.fetch_add(1, Ordering::SeqCst);
                if contact == "b" {
                    Some(target_clone.clone())
                } else {
                    None
                }
            })
            .await
            .unwrap();
        assert_eq!(result.path(), target.path());
        assert_eq!(calls.load(Ordering::SeqCst), 2);
    }

    /// Without contacts, `establish` fails immediately with `NoContacts`.
    #[tokio::test]
    async fn establish_no_contacts_errors() {
        let rec = ClusterReceptionist::new();
        let c = ClusterClient::new(rec);
        let r = c.establish(|_| None).await;
        assert!(matches!(r, Err(ClusterClientError::NoContacts)));
    }

    /// A resolver that never succeeds exhausts the configured attempts.
    #[tokio::test]
    async fn establish_exhausts_after_max_attempts() {
        let rec = ClusterReceptionist::new();
        let s = ClusterClientSettings {
            initial_contacts: vec!["x".into()],
            establishing_get_contacts_interval: FAST_RETRY,
            reconnect_timeout: Duration::from_millis(1),
            max_attempts: 3,
        };
        let c = ClusterClient::with_settings(rec, s);
        let r = c.establish(|_| None).await;
        assert!(matches!(r, Err(ClusterClientError::Exhausted { attempts: 3 })));
    }

    /// `registered` returns names sorted regardless of insertion order.
    #[test]
    fn registered_lists_services_sorted() {
        let rec = ClusterReceptionist::new();
        let inbox = Inbox::<u32>::new("x");
        rec.register("zebra", inbox.actor_ref().as_untyped());
        rec.register("alpha", inbox.actor_ref().as_untyped());
        rec.register("middle", inbox.actor_ref().as_untyped());
        assert_eq!(rec.registered(), vec!["alpha", "middle", "zebra"]);
    }
}