Skip to main content

atomr_remote/
remote_watcher.rs

1//! `RemoteWatcher`.
2//!
3//! Tracks local actors that are watching remote ones and surfaces
4//! `Terminated` when:
5//!
6//! * The watched actor's `ActorSystem` disassociates (graceful or quarantine).
7//! * The watched actor's UID changes (peer crash/restart).
8//! * The peer's failure detector trips.
9
10use std::collections::HashSet;
11use std::sync::Arc;
12use std::time::Duration;
13
14use parking_lot::RwLock;
15
16use atomr_core::actor::{ActorPath, RemoteRef, RemoteSystemMsg, UntypedActorRef};
17
18use crate::endpoint_manager::EndpointManager;
19use crate::failure_detector_registry::FailureDetectorRegistry;
20use crate::remote_ref::RemoteActorRefImpl;
21use crate::serialization::SerializerRegistry;
22
/// A single death-watch registration: `watcher` wants to hear about `watchee`.
#[derive(Clone, Debug)]
struct Watch {
    /// Actor that registered the watch; `RemoteWatcher::check` calls
    /// `notify_watchers` on it when the watchee's host is unavailable.
    watcher: UntypedActorRef,
    /// Path of the watched actor. Its `address` is compared (in string form)
    /// against the addresses the failure detectors report as unavailable.
    watchee: ActorPath,
}
28
/// Handle to the remote death-watch book-keeping.
///
/// Cloning is cheap: all state lives in a shared `RemoteWatcherInner`
/// behind an `Arc`, so clones observe the same watch list.
#[derive(Clone)]
pub struct RemoteWatcher {
    /// Shared state (endpoint manager, detectors, registered watches).
    inner: Arc<RemoteWatcherInner>,
}
33
/// State shared by every clone of a `RemoteWatcher`.
struct RemoteWatcherInner {
    /// Used to resolve endpoints for a watchee's address and handed to the
    /// outbound remote refs that carry `Watch` / `Unwatch` system messages.
    endpoint_manager: EndpointManager,
    /// Failure-detector registry obtained from
    /// `endpoint_manager.failure_detectors()` at construction time.
    detectors: FailureDetectorRegistry,
    /// Serializer registry handed to the outbound remote refs.
    registry: SerializerRegistry,
    /// UID of the local system, forwarded to the outbound remote refs.
    local_uid: u64,
    /// Currently registered (watcher, watchee) pairs.
    watches: RwLock<Vec<Watch>>,
    /// Addresses, in string form, that `check` has already reported as
    /// unavailable, so that it does not report them again.
    terminated_addresses: RwLock<HashSet<String>>,
    /// Set once the periodic supervisor task has been spawned; prevents
    /// spawning it a second time.
    started: std::sync::OnceLock<()>,
}
43
44impl RemoteWatcher {
45    pub fn new(endpoint_manager: EndpointManager, registry: SerializerRegistry, local_uid: u64) -> Arc<Self> {
46        let detectors = endpoint_manager.failure_detectors();
47        Arc::new(Self {
48            inner: Arc::new(RemoteWatcherInner {
49                endpoint_manager,
50                detectors,
51                registry,
52                local_uid,
53                watches: RwLock::new(Vec::new()),
54                terminated_addresses: RwLock::new(HashSet::new()),
55                started: std::sync::OnceLock::new(),
56            }),
57        })
58    }
59
60    /// Begin watching `watchee`. The local watcher receives
61    /// `SystemMsg::Terminated` if the watchee's host disassociates.
62    pub async fn watch(
63        self: &Arc<Self>,
64        watcher: UntypedActorRef,
65        watchee: ActorPath,
66    ) -> Result<(), crate::transport::TransportError> {
67        let target = watchee.address.clone();
68        // Inform the peer via a system PDU so it can echo Terminated
69        // when the actor stops there.
70        let _ = self.inner.endpoint_manager.endpoint_for(&target).await?;
71        let remote_ref = RemoteActorRefImpl::new(
72            watchee.clone(),
73            self.inner.endpoint_manager.clone(),
74            self.inner.registry.clone(),
75            self.inner.local_uid,
76        );
77        remote_ref.tell_system(RemoteSystemMsg::Watch { watcher: watcher.path().clone() });
78        self.inner.watches.write().push(Watch { watcher, watchee });
79        self.start_supervisor();
80        Ok(())
81    }
82
83    pub async fn unwatch(self: &Arc<Self>, watcher: &UntypedActorRef, watchee: &ActorPath) {
84        self.inner.watches.write().retain(|w| !(w.watcher.path() == watcher.path() && &w.watchee == watchee));
85        let target = watchee.address.clone();
86        if self.inner.endpoint_manager.endpoint_for(&target).await.is_ok() {
87            let remote_ref = RemoteActorRefImpl::new(
88                watchee.clone(),
89                self.inner.endpoint_manager.clone(),
90                self.inner.registry.clone(),
91                self.inner.local_uid,
92            );
93            remote_ref.tell_system(RemoteSystemMsg::Unwatch { watcher: watcher.path().clone() });
94        }
95    }
96
97    /// Driven by the periodic supervisor task. Surfaces `Terminated` for
98    /// any actor whose host has gone unavailable.
99    pub fn check(&self) {
100        let mut bad: Vec<String> = Vec::new();
101        for addr_str in self.inner.detectors.addresses() {
102            if let Some(addr) = atomr_core::actor::Address::parse(&addr_str) {
103                if !self.inner.detectors.is_available(&addr) {
104                    bad.push(addr_str);
105                }
106            }
107        }
108        if bad.is_empty() {
109            return;
110        }
111        let mut terminated = self.inner.terminated_addresses.write();
112        let watches = self.inner.watches.read();
113        for addr in bad {
114            if !terminated.insert(addr.clone()) {
115                continue;
116            }
117            for w in watches.iter() {
118                if w.watchee.address.to_string() == addr {
119                    w.watcher.notify_watchers(w.watchee.clone());
120                }
121            }
122        }
123    }
124
125    fn start_supervisor(self: &Arc<Self>) {
126        if self.inner.started.set(()).is_err() {
127            return;
128        }
129        let this = self.clone();
130        tokio::spawn(async move {
131            let mut tick = tokio::time::interval(Duration::from_secs(1));
132            tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
133            loop {
134                tick.tick().await;
135                this.check();
136            }
137        });
138    }
139
140    pub fn watch_count(&self) -> usize {
141        self.inner.watches.read().len()
142    }
143}
144
/// Outbound `RemoteRef` proxy used by the daemon's death-watch
/// book-keeping. Carries the `EndpointManager` + `SerializerRegistry`
/// so the proxy can serialize and ship `RemoteSystemMsg::Terminated`
/// over the wire without going through the local mailbox path.
/// Wraps cheap clones of those handles; constructing the proxy
/// itself is cheap.
pub(crate) struct RemoteWatcherProxy {
    /// Path of the actor this proxy stands for; returned by `RemoteRef::path`.
    pub path: ActorPath,
    /// Transport handle used by `tell_system`. When this or `registry` is
    /// `None`, `tell_system` does nothing. `new` always sets it to `Some`.
    pub endpoint_manager: Option<EndpointManager>,
    /// Serializer registry used by `tell_system`. `new` always sets it to `Some`.
    pub registry: Option<SerializerRegistry>,
    /// UID of the local system, forwarded to the outbound remote ref.
    pub local_uid: u64,
}
157
158impl std::fmt::Debug for RemoteWatcherProxy {
159    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
160        f.debug_struct("RemoteWatcherProxy").field("path", &self.path).finish()
161    }
162}
163
164impl RemoteWatcherProxy {
165    pub fn new(
166        path: ActorPath,
167        endpoint_manager: EndpointManager,
168        registry: SerializerRegistry,
169        local_uid: u64,
170    ) -> Self {
171        Self { path, endpoint_manager: Some(endpoint_manager), registry: Some(registry), local_uid }
172    }
173}
174
175impl RemoteRef for RemoteWatcherProxy {
176    fn path(&self) -> &ActorPath {
177        &self.path
178    }
179
180    fn tell_serialized(&self, _msg: atomr_core::actor::SerializedMessage) {
181        // Watcher proxies only forward Terminated; user payloads flow
182        // through the regular RemoteActorRef path instead.
183    }
184
185    fn tell_system(&self, msg: RemoteSystemMsg) {
186        let (Some(mgr), Some(reg)) = (self.endpoint_manager.clone(), self.registry.clone()) else {
187            return;
188        };
189        let target = self.path.clone();
190        let local_uid = self.local_uid;
191        let r = RemoteActorRefImpl::new(target, mgr, reg, local_uid);
192        r.tell_system(msg);
193    }
194}