Skip to main content

atomr_cluster_tools/
cluster_client.rs

1//! ClusterClient / ClusterReceptionist — dispatching to actors addressable by name.
2//!
3//! Phase 7.D of `docs/full-port-plan.md` — adds initial-contacts +
4//! retry/backoff so a non-cluster client can discover the
5//! receptionist. The wire transport plugs in once Phase 5/6 ships.
6
7use std::collections::HashMap;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::sync::Arc;
10use std::time::Duration;
11
12use parking_lot::RwLock;
13
14use atomr_core::actor::UntypedActorRef;
15
/// Name-keyed registry of actor references.
///
/// Stores a map from service name to [`UntypedActorRef`] behind a
/// `parking_lot::RwLock`, so every method takes `&self` and the registry
/// can be shared through the `Arc` returned by `ClusterReceptionist::new`.
#[derive(Default)]
pub struct ClusterReceptionist {
    /// Service name -> actor reference registered under that name.
    services: RwLock<HashMap<String, UntypedActorRef>>,
}
20
21impl ClusterReceptionist {
22    pub fn new() -> Arc<Self> {
23        Arc::new(Self::default())
24    }
25
26    pub fn register(&self, name: impl Into<String>, r: UntypedActorRef) {
27        self.services.write().insert(name.into(), r);
28    }
29
30    pub fn lookup(&self, name: &str) -> Option<UntypedActorRef> {
31        self.services.read().get(name).cloned()
32    }
33
34    pub fn unregister(&self, name: &str) {
35        self.services.write().remove(name);
36    }
37
38    pub fn registered(&self) -> Vec<String> {
39        let mut v: Vec<String> = self.services.read().keys().cloned().collect();
40        v.sort();
41        v
42    }
43}
44
/// Settings for a `ClusterClient`.
///
/// The struct is `#[non_exhaustive]`, so code outside this crate cannot
/// use struct-literal syntax; start from [`Default`] and refine the value
/// with the `with_*` builder methods (one per field).
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ClusterClientSettings {
    /// Initial bootstrap addresses to try in order. The first one
    /// that responds wins.
    pub initial_contacts: Vec<String>,
    /// Time to wait for an initial contact before trying the next.
    pub establishing_get_contacts_interval: Duration,
    /// Backoff between full re-attempts after exhausting all initial
    /// contacts.
    ///
    /// NOTE(review): the `ClusterClient::establish` loop in this file does
    /// not read this field; confirm where the backoff is meant to apply.
    pub reconnect_timeout: Duration,
    /// Total attempts before giving up.
    pub max_attempts: u32,
}

impl Default for ClusterClientSettings {
    /// No initial contacts, a 3 s contact interval, a 10 s reconnect
    /// timeout and 10 attempts.
    fn default() -> Self {
        Self {
            initial_contacts: Vec::new(),
            establishing_get_contacts_interval: Duration::from_secs(3),
            reconnect_timeout: Duration::from_secs(10),
            max_attempts: 10,
        }
    }
}

impl ClusterClientSettings {
    /// Replaces the list of initial contacts with `contacts`.
    #[must_use]
    pub fn with_initial_contacts<I, S>(mut self, contacts: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        self.initial_contacts = contacts.into_iter().map(Into::into).collect();
        self
    }

    /// Sets the pause between two consecutive contact attempts.
    #[must_use]
    pub fn with_establishing_get_contacts_interval(mut self, interval: Duration) -> Self {
        self.establishing_get_contacts_interval = interval;
        self
    }

    /// Sets the backoff stored in `reconnect_timeout`.
    #[must_use]
    pub fn with_reconnect_timeout(mut self, timeout: Duration) -> Self {
        self.reconnect_timeout = timeout;
        self
    }

    /// Sets the total number of attempts before giving up.
    #[must_use]
    pub fn with_max_attempts(mut self, n: u32) -> Self {
        self.max_attempts = n;
        self
    }
}
87
/// Client handle that looks services up by name through a
/// [`ClusterReceptionist`] and rotates through the configured initial
/// contacts while trying to establish a connection.
pub struct ClusterClient {
    /// Receptionist consulted by `send` for name lookups.
    pub receptionist: Arc<ClusterReceptionist>,
    /// Contact list and retry parameters read by `next_contact` and `establish`.
    settings: ClusterClientSettings,
    /// Round-robin cursor over initial contacts.
    contact_cursor: AtomicUsize,
}
94
95impl ClusterClient {
96    pub fn new(receptionist: Arc<ClusterReceptionist>) -> Self {
97        Self::with_settings(receptionist, ClusterClientSettings::default())
98    }
99
100    pub fn with_settings(receptionist: Arc<ClusterReceptionist>, settings: ClusterClientSettings) -> Self {
101        Self { receptionist, settings, contact_cursor: AtomicUsize::new(0) }
102    }
103
104    /// Direct receptionist lookup (in-process / single-node case).
105    pub fn send(&self, name: &str) -> Option<UntypedActorRef> {
106        self.receptionist.lookup(name)
107    }
108
109    /// Pick the next initial contact (round-robin). Returns `None`
110    /// if `initial_contacts` is empty.
111    pub fn next_contact(&self) -> Option<String> {
112        if self.settings.initial_contacts.is_empty() {
113            return None;
114        }
115        let i = self.contact_cursor.fetch_add(1, Ordering::Relaxed) % self.settings.initial_contacts.len();
116        Some(self.settings.initial_contacts[i].clone())
117    }
118
119    /// `establish` — drive contact-point discovery using `try_resolve`
120    /// until it returns `Some(_)` or `max_attempts` is exhausted.
121    /// Sleeps `establishing_get_contacts_interval` between attempts.
122    pub async fn establish<F>(&self, mut try_resolve: F) -> Result<UntypedActorRef, ClusterClientError>
123    where
124        F: FnMut(&str) -> Option<UntypedActorRef>,
125    {
126        if self.settings.initial_contacts.is_empty() {
127            return Err(ClusterClientError::NoContacts);
128        }
129        for attempt in 0..self.settings.max_attempts {
130            let Some(contact) = self.next_contact() else {
131                break;
132            };
133            if let Some(r) = try_resolve(&contact) {
134                return Ok(r);
135            }
136            if attempt + 1 < self.settings.max_attempts {
137                tokio::time::sleep(self.settings.establishing_get_contacts_interval).await;
138            }
139        }
140        Err(ClusterClientError::Exhausted { attempts: self.settings.max_attempts })
141    }
142}
143
/// Errors returned by `ClusterClient::establish`.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum ClusterClientError {
    /// The settings contain no initial contacts, so nothing can be tried.
    #[error("no initial contacts configured")]
    NoContacts,
    /// Every attempt failed to resolve a contact; `attempts` carries the
    /// configured `max_attempts` value.
    #[error("contact-point resolution failed after {attempts} attempts")]
    Exhausted { attempts: u32 },
}
152
#[cfg(test)]
mod tests {
    use super::*;
    use atomr_core::actor::Inbox;
    use std::sync::atomic::AtomicU32;

    /// A service registered on the receptionist is reachable via `send`.
    #[test]
    fn receptionist_register_lookup() {
        let rec = ClusterReceptionist::new();
        let inbox = Inbox::<u32>::new("svc");
        rec.register("svc", inbox.actor_ref().as_untyped());
        let c = ClusterClient::new(rec);
        assert!(c.send("svc").is_some());
    }

    /// `unregister` removes the entry so later lookups miss.
    #[test]
    fn unregister_removes_service() {
        let rec = ClusterReceptionist::new();
        let inbox = Inbox::<u32>::new("svc");
        rec.register("svc", inbox.actor_ref().as_untyped());
        rec.unregister("svc");
        assert!(rec.lookup("svc").is_none());
    }

    /// Contacts are handed out in order and wrap around at the end.
    #[test]
    fn next_contact_round_robins() {
        let rec = ClusterReceptionist::new();
        let s = ClusterClientSettings::default().with_initial_contacts(vec!["a", "b", "c"]);
        let c = ClusterClient::with_settings(rec, s);
        assert_eq!(c.next_contact().as_deref(), Some("a"));
        assert_eq!(c.next_contact().as_deref(), Some("b"));
        assert_eq!(c.next_contact().as_deref(), Some("c"));
        assert_eq!(c.next_contact().as_deref(), Some("a"));
    }

    /// `establish` stops at the first contact the resolver accepts.
    #[tokio::test]
    async fn establish_returns_first_resolved_contact() {
        let rec = ClusterReceptionist::new();
        let inbox = Inbox::<u32>::new("svc");
        let target = inbox.actor_ref().as_untyped();
        let target_clone = target.clone();
        let mut s = ClusterClientSettings::default().with_initial_contacts(vec!["a", "b"]).with_max_attempts(3);
        // Contact "a" fails first, so `establish` sleeps once before trying
        // "b". Shrink the interval: the 3 s default would stall the test in
        // real time.
        s.establishing_get_contacts_interval = Duration::from_millis(1);
        let c = ClusterClient::with_settings(rec, s);
        let calls = AtomicU32::new(0);
        let result = c
            .establish(|contact| {
                calls.fetch_add(1, Ordering::SeqCst);
                if contact == "b" {
                    Some(target_clone.clone())
                } else {
                    None
                }
            })
            .await
            .unwrap();
        assert_eq!(result.path(), target.path());
        assert_eq!(calls.load(Ordering::SeqCst), 2);
    }

    /// With no initial contacts `establish` fails immediately.
    #[tokio::test]
    async fn establish_no_contacts_errors() {
        let rec = ClusterReceptionist::new();
        let c = ClusterClient::new(rec);
        let r = c.establish(|_| None).await;
        assert!(matches!(r, Err(ClusterClientError::NoContacts)));
    }

    /// A resolver that never succeeds exhausts the attempt budget.
    #[tokio::test]
    async fn establish_exhausts_after_max_attempts() {
        let rec = ClusterReceptionist::new();
        let s = ClusterClientSettings {
            initial_contacts: vec!["x".into()],
            establishing_get_contacts_interval: Duration::from_millis(1),
            reconnect_timeout: Duration::from_millis(1),
            max_attempts: 3,
        };
        let c = ClusterClient::with_settings(rec, s);
        let r = c.establish(|_| None).await;
        assert!(matches!(r, Err(ClusterClientError::Exhausted { attempts: 3 })));
    }

    /// `registered` returns names sorted regardless of insertion order.
    #[test]
    fn registered_lists_services_sorted() {
        let rec = ClusterReceptionist::new();
        let inbox = Inbox::<u32>::new("x");
        rec.register("zebra", inbox.actor_ref().as_untyped());
        rec.register("alpha", inbox.actor_ref().as_untyped());
        rec.register("middle", inbox.actor_ref().as_untyped());
        assert_eq!(rec.registered(), vec!["alpha", "middle", "zebra"]);
    }
}