atomr_remote/
remote_watcher.rs1use std::collections::HashSet;
11use std::sync::Arc;
12use std::time::Duration;
13
14use parking_lot::RwLock;
15
16use atomr_core::actor::{ActorPath, RemoteRef, RemoteSystemMsg, UntypedActorRef};
17
18use crate::endpoint_manager::EndpointManager;
19use crate::failure_detector_registry::FailureDetectorRegistry;
20use crate::remote_ref::RemoteActorRefImpl;
21use crate::serialization::SerializerRegistry;
22
23#[derive(Clone, Debug)]
24struct Watch {
25 watcher: UntypedActorRef,
26 watchee: ActorPath,
27}
28
29#[derive(Clone)]
30pub struct RemoteWatcher {
31 inner: Arc<RemoteWatcherInner>,
32}
33
34struct RemoteWatcherInner {
35 endpoint_manager: EndpointManager,
36 detectors: FailureDetectorRegistry,
37 registry: SerializerRegistry,
38 local_uid: u64,
39 watches: RwLock<Vec<Watch>>,
40 terminated_addresses: RwLock<HashSet<String>>,
41 started: std::sync::OnceLock<()>,
42}
43
44impl RemoteWatcher {
45 pub fn new(endpoint_manager: EndpointManager, registry: SerializerRegistry, local_uid: u64) -> Arc<Self> {
46 let detectors = endpoint_manager.failure_detectors();
47 Arc::new(Self {
48 inner: Arc::new(RemoteWatcherInner {
49 endpoint_manager,
50 detectors,
51 registry,
52 local_uid,
53 watches: RwLock::new(Vec::new()),
54 terminated_addresses: RwLock::new(HashSet::new()),
55 started: std::sync::OnceLock::new(),
56 }),
57 })
58 }
59
60 pub async fn watch(
63 self: &Arc<Self>,
64 watcher: UntypedActorRef,
65 watchee: ActorPath,
66 ) -> Result<(), crate::transport::TransportError> {
67 let target = watchee.address.clone();
68 let _ = self.inner.endpoint_manager.endpoint_for(&target).await?;
71 let remote_ref = RemoteActorRefImpl::new(
72 watchee.clone(),
73 self.inner.endpoint_manager.clone(),
74 self.inner.registry.clone(),
75 self.inner.local_uid,
76 );
77 remote_ref.tell_system(RemoteSystemMsg::Watch { watcher: watcher.path().clone() });
78 self.inner.watches.write().push(Watch { watcher, watchee });
79 self.start_supervisor();
80 Ok(())
81 }
82
83 pub async fn unwatch(self: &Arc<Self>, watcher: &UntypedActorRef, watchee: &ActorPath) {
84 self.inner.watches.write().retain(|w| !(w.watcher.path() == watcher.path() && &w.watchee == watchee));
85 let target = watchee.address.clone();
86 if self.inner.endpoint_manager.endpoint_for(&target).await.is_ok() {
87 let remote_ref = RemoteActorRefImpl::new(
88 watchee.clone(),
89 self.inner.endpoint_manager.clone(),
90 self.inner.registry.clone(),
91 self.inner.local_uid,
92 );
93 remote_ref.tell_system(RemoteSystemMsg::Unwatch { watcher: watcher.path().clone() });
94 }
95 }
96
97 pub fn check(&self) {
100 let mut bad: Vec<String> = Vec::new();
101 for addr_str in self.inner.detectors.addresses() {
102 if let Some(addr) = atomr_core::actor::Address::parse(&addr_str) {
103 if !self.inner.detectors.is_available(&addr) {
104 bad.push(addr_str);
105 }
106 }
107 }
108 if bad.is_empty() {
109 return;
110 }
111 let mut terminated = self.inner.terminated_addresses.write();
112 let watches = self.inner.watches.read();
113 for addr in bad {
114 if !terminated.insert(addr.clone()) {
115 continue;
116 }
117 for w in watches.iter() {
118 if w.watchee.address.to_string() == addr {
119 w.watcher.notify_watchers(w.watchee.clone());
120 }
121 }
122 }
123 }
124
125 fn start_supervisor(self: &Arc<Self>) {
126 if self.inner.started.set(()).is_err() {
127 return;
128 }
129 let this = self.clone();
130 tokio::spawn(async move {
131 let mut tick = tokio::time::interval(Duration::from_secs(1));
132 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
133 loop {
134 tick.tick().await;
135 this.check();
136 }
137 });
138 }
139
140 pub fn watch_count(&self) -> usize {
141 self.inner.watches.read().len()
142 }
143}
144
145pub(crate) struct RemoteWatcherProxy {
152 pub path: ActorPath,
153 pub endpoint_manager: Option<EndpointManager>,
154 pub registry: Option<SerializerRegistry>,
155 pub local_uid: u64,
156}
157
158impl std::fmt::Debug for RemoteWatcherProxy {
159 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
160 f.debug_struct("RemoteWatcherProxy").field("path", &self.path).finish()
161 }
162}
163
164impl RemoteWatcherProxy {
165 pub fn new(
166 path: ActorPath,
167 endpoint_manager: EndpointManager,
168 registry: SerializerRegistry,
169 local_uid: u64,
170 ) -> Self {
171 Self { path, endpoint_manager: Some(endpoint_manager), registry: Some(registry), local_uid }
172 }
173}
174
175impl RemoteRef for RemoteWatcherProxy {
176 fn path(&self) -> &ActorPath {
177 &self.path
178 }
179
180 fn tell_serialized(&self, _msg: atomr_core::actor::SerializedMessage) {
181 }
184
185 fn tell_system(&self, msg: RemoteSystemMsg) {
186 let (Some(mgr), Some(reg)) = (self.endpoint_manager.clone(), self.registry.clone()) else {
187 return;
188 };
189 let target = self.path.clone();
190 let local_uid = self.local_uid;
191 let r = RemoteActorRefImpl::new(target, mgr, reg, local_uid);
192 r.tell_system(msg);
193 }
194}