Skip to main content

rakka_cluster_tools/
cluster_client.rs

1//! ClusterClient / ClusterReceptionist — dispatching to actors addressable by name.
2//! akka.net: `Akka.Cluster.Tools/Client/ClusterClient.cs`.
3//!
4//! Phase 7.D of `docs/full-port-plan.md` — adds initial-contacts +
5//! retry/backoff so a non-cluster client can discover the
6//! receptionist. The wire transport plugs in once Phase 5/6 ships.
7
8use std::collections::HashMap;
9use std::sync::atomic::{AtomicUsize, Ordering};
10use std::sync::Arc;
11use std::time::Duration;
12
13use parking_lot::RwLock;
14
15use rakka_core::actor::UntypedActorRef;
16
17#[derive(Default)]
18pub struct ClusterReceptionist {
19    services: RwLock<HashMap<String, UntypedActorRef>>,
20}
21
22impl ClusterReceptionist {
23    pub fn new() -> Arc<Self> {
24        Arc::new(Self::default())
25    }
26
27    pub fn register(&self, name: impl Into<String>, r: UntypedActorRef) {
28        self.services.write().insert(name.into(), r);
29    }
30
31    pub fn lookup(&self, name: &str) -> Option<UntypedActorRef> {
32        self.services.read().get(name).cloned()
33    }
34
35    pub fn unregister(&self, name: &str) {
36        self.services.write().remove(name);
37    }
38
39    pub fn registered(&self) -> Vec<String> {
40        let mut v: Vec<String> = self.services.read().keys().cloned().collect();
41        v.sort();
42        v
43    }
44}
45
/// Settings for a `ClusterClient`. Mirrors akka.net's
/// `ClusterClientSettings`.
///
/// The struct is `#[non_exhaustive]`, so code outside this crate cannot
/// build it with a struct literal. Start from [`Default::default`] and
/// chain the `with_*` builders; there is one builder per field.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ClusterClientSettings {
    /// Initial bootstrap addresses to try in order. The first one
    /// that responds wins.
    pub initial_contacts: Vec<String>,
    /// Time to wait for an initial contact before trying the next.
    pub establishing_get_contacts_interval: Duration,
    /// Backoff between full re-attempts after exhausting all initial
    /// contacts.
    pub reconnect_timeout: Duration,
    /// Total attempts before giving up.
    pub max_attempts: u32,
}

impl Default for ClusterClientSettings {
    /// No initial contacts, a 3 s contact interval, a 10 s reconnect
    /// backoff and at most 10 attempts.
    fn default() -> Self {
        Self {
            initial_contacts: Vec::new(),
            establishing_get_contacts_interval: Duration::from_secs(3),
            reconnect_timeout: Duration::from_secs(10),
            max_attempts: 10,
        }
    }
}

impl ClusterClientSettings {
    /// Replaces the list of initial contacts with `contacts`.
    #[must_use]
    pub fn with_initial_contacts<I, S>(mut self, contacts: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        self.initial_contacts = contacts.into_iter().map(Into::into).collect();
        self
    }

    /// Sets the wait between two consecutive contact attempts.
    #[must_use]
    pub fn with_establishing_get_contacts_interval(mut self, interval: Duration) -> Self {
        self.establishing_get_contacts_interval = interval;
        self
    }

    /// Sets the backoff applied after all initial contacts were tried.
    #[must_use]
    pub fn with_reconnect_timeout(mut self, timeout: Duration) -> Self {
        self.reconnect_timeout = timeout;
        self
    }

    /// Sets the total number of attempts before giving up.
    #[must_use]
    pub fn with_max_attempts(mut self, n: u32) -> Self {
        self.max_attempts = n;
        self
    }
}
89
90pub struct ClusterClient {
91    pub receptionist: Arc<ClusterReceptionist>,
92    settings: ClusterClientSettings,
93    /// Round-robin cursor over initial contacts.
94    contact_cursor: AtomicUsize,
95}
96
97impl ClusterClient {
98    pub fn new(receptionist: Arc<ClusterReceptionist>) -> Self {
99        Self::with_settings(receptionist, ClusterClientSettings::default())
100    }
101
102    pub fn with_settings(receptionist: Arc<ClusterReceptionist>, settings: ClusterClientSettings) -> Self {
103        Self { receptionist, settings, contact_cursor: AtomicUsize::new(0) }
104    }
105
106    /// Direct receptionist lookup (in-process / single-node case).
107    pub fn send(&self, name: &str) -> Option<UntypedActorRef> {
108        self.receptionist.lookup(name)
109    }
110
111    /// Pick the next initial contact (round-robin). Returns `None`
112    /// if `initial_contacts` is empty.
113    pub fn next_contact(&self) -> Option<String> {
114        if self.settings.initial_contacts.is_empty() {
115            return None;
116        }
117        let i = self.contact_cursor.fetch_add(1, Ordering::Relaxed) % self.settings.initial_contacts.len();
118        Some(self.settings.initial_contacts[i].clone())
119    }
120
121    /// `establish` — drive contact-point discovery using `try_resolve`
122    /// until it returns `Some(_)` or `max_attempts` is exhausted.
123    /// Sleeps `establishing_get_contacts_interval` between attempts.
124    pub async fn establish<F>(&self, mut try_resolve: F) -> Result<UntypedActorRef, ClusterClientError>
125    where
126        F: FnMut(&str) -> Option<UntypedActorRef>,
127    {
128        if self.settings.initial_contacts.is_empty() {
129            return Err(ClusterClientError::NoContacts);
130        }
131        for attempt in 0..self.settings.max_attempts {
132            let Some(contact) = self.next_contact() else {
133                break;
134            };
135            if let Some(r) = try_resolve(&contact) {
136                return Ok(r);
137            }
138            if attempt + 1 < self.settings.max_attempts {
139                tokio::time::sleep(self.settings.establishing_get_contacts_interval).await;
140            }
141        }
142        Err(ClusterClientError::Exhausted { attempts: self.settings.max_attempts })
143    }
144}
145
146#[derive(Debug, thiserror::Error)]
147#[non_exhaustive]
148pub enum ClusterClientError {
149    #[error("no initial contacts configured")]
150    NoContacts,
151    #[error("contact-point resolution failed after {attempts} attempts")]
152    Exhausted { attempts: u32 },
153}
154
#[cfg(test)]
mod tests {
    use super::*;
    use rakka_core::actor::Inbox;
    use std::sync::atomic::AtomicU32;

    /// Millisecond-scale pauses so the `establish` tests never wait on the
    /// multi-second production defaults.
    fn fast_settings(contacts: &[&str], max_attempts: u32) -> ClusterClientSettings {
        ClusterClientSettings {
            initial_contacts: contacts.iter().map(|c| (*c).to_string()).collect(),
            establishing_get_contacts_interval: Duration::from_millis(1),
            reconnect_timeout: Duration::from_millis(1),
            max_attempts,
        }
    }

    #[test]
    fn receptionist_register_lookup() {
        let rec = ClusterReceptionist::new();
        let inbox = Inbox::<u32>::new("svc");
        rec.register("svc", inbox.actor_ref().as_untyped());
        let c = ClusterClient::new(rec);
        assert!(c.send("svc").is_some());
    }

    #[test]
    fn next_contact_round_robins() {
        let rec = ClusterReceptionist::new();
        let s = ClusterClientSettings::default().with_initial_contacts(vec!["a", "b", "c"]);
        let c = ClusterClient::with_settings(rec, s);
        // Three contacts, four picks: the fourth wraps back to the first.
        for expected in ["a", "b", "c", "a"] {
            assert_eq!(c.next_contact().as_deref(), Some(expected));
        }
    }

    #[tokio::test]
    async fn establish_returns_first_resolved_contact() {
        let rec = ClusterReceptionist::new();
        let inbox = Inbox::<u32>::new("svc");
        let target = inbox.actor_ref().as_untyped();
        // Contact "a" fails first, so one pause happens before "b" is tried;
        // keep that pause at 1 ms instead of the 3 s default.
        let c = ClusterClient::with_settings(rec, fast_settings(&["a", "b"], 3));
        let calls = AtomicU32::new(0);
        let result = c
            .establish(|contact| {
                calls.fetch_add(1, Ordering::SeqCst);
                if contact == "b" {
                    Some(target.clone())
                } else {
                    None
                }
            })
            .await
            .unwrap();
        assert_eq!(result.path(), target.path());
        assert_eq!(calls.load(Ordering::SeqCst), 2);
    }

    #[tokio::test]
    async fn establish_no_contacts_errors() {
        let rec = ClusterReceptionist::new();
        let c = ClusterClient::new(rec);
        let r = c.establish(|_| None).await;
        assert!(matches!(r, Err(ClusterClientError::NoContacts)));
    }

    #[tokio::test]
    async fn establish_exhausts_after_max_attempts() {
        let rec = ClusterReceptionist::new();
        let c = ClusterClient::with_settings(rec, fast_settings(&["x"], 3));
        let r = c.establish(|_| None).await;
        assert!(matches!(r, Err(ClusterClientError::Exhausted { attempts: 3 })));
    }

    #[test]
    fn registered_lists_services_sorted() {
        let rec = ClusterReceptionist::new();
        let inbox = Inbox::<u32>::new("x");
        rec.register("zebra", inbox.actor_ref().as_untyped());
        rec.register("alpha", inbox.actor_ref().as_untyped());
        rec.register("middle", inbox.actor_ref().as_untyped());
        assert_eq!(rec.registered(), vec!["alpha", "middle", "zebra"]);
    }
}