use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use parking_lot::RwLock;
use atomr_core::actor::{ActorPath, RemoteRef, RemoteSystemMsg, UntypedActorRef};
use crate::endpoint_manager::EndpointManager;
use crate::failure_detector_registry::FailureDetectorRegistry;
use crate::remote_ref::RemoteActorRefImpl;
use crate::serialization::SerializerRegistry;
#[derive(Clone, Debug)]
struct Watch {
watcher: UntypedActorRef,
watchee: ActorPath,
}
#[derive(Clone)]
pub struct RemoteWatcher {
inner: Arc<RemoteWatcherInner>,
}
struct RemoteWatcherInner {
endpoint_manager: EndpointManager,
detectors: FailureDetectorRegistry,
registry: SerializerRegistry,
local_uid: u64,
watches: RwLock<Vec<Watch>>,
terminated_addresses: RwLock<HashSet<String>>,
started: std::sync::OnceLock<()>,
}
impl RemoteWatcher {
pub fn new(endpoint_manager: EndpointManager, registry: SerializerRegistry, local_uid: u64) -> Arc<Self> {
let detectors = endpoint_manager.failure_detectors();
Arc::new(Self {
inner: Arc::new(RemoteWatcherInner {
endpoint_manager,
detectors,
registry,
local_uid,
watches: RwLock::new(Vec::new()),
terminated_addresses: RwLock::new(HashSet::new()),
started: std::sync::OnceLock::new(),
}),
})
}
pub async fn watch(
self: &Arc<Self>,
watcher: UntypedActorRef,
watchee: ActorPath,
) -> Result<(), crate::transport::TransportError> {
let target = watchee.address.clone();
let _ = self.inner.endpoint_manager.endpoint_for(&target).await?;
let remote_ref = RemoteActorRefImpl::new(
watchee.clone(),
self.inner.endpoint_manager.clone(),
self.inner.registry.clone(),
self.inner.local_uid,
);
remote_ref.tell_system(RemoteSystemMsg::Watch { watcher: watcher.path().clone() });
self.inner.watches.write().push(Watch { watcher, watchee });
self.start_supervisor();
Ok(())
}
pub async fn unwatch(self: &Arc<Self>, watcher: &UntypedActorRef, watchee: &ActorPath) {
self.inner.watches.write().retain(|w| !(w.watcher.path() == watcher.path() && &w.watchee == watchee));
let target = watchee.address.clone();
if self.inner.endpoint_manager.endpoint_for(&target).await.is_ok() {
let remote_ref = RemoteActorRefImpl::new(
watchee.clone(),
self.inner.endpoint_manager.clone(),
self.inner.registry.clone(),
self.inner.local_uid,
);
remote_ref.tell_system(RemoteSystemMsg::Unwatch { watcher: watcher.path().clone() });
}
}
pub fn check(&self) {
let mut bad: Vec<String> = Vec::new();
for addr_str in self.inner.detectors.addresses() {
if let Some(addr) = atomr_core::actor::Address::parse(&addr_str) {
if !self.inner.detectors.is_available(&addr) {
bad.push(addr_str);
}
}
}
if bad.is_empty() {
return;
}
let mut terminated = self.inner.terminated_addresses.write();
let watches = self.inner.watches.read();
for addr in bad {
if !terminated.insert(addr.clone()) {
continue;
}
for w in watches.iter() {
if w.watchee.address.to_string() == addr {
w.watcher.notify_watchers(w.watchee.clone());
}
}
}
}
fn start_supervisor(self: &Arc<Self>) {
if self.inner.started.set(()).is_err() {
return;
}
let this = self.clone();
tokio::spawn(async move {
let mut tick = tokio::time::interval(Duration::from_secs(1));
tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
tick.tick().await;
this.check();
}
});
}
pub fn watch_count(&self) -> usize {
self.inner.watches.read().len()
}
}
pub(crate) struct RemoteWatcherProxy {
pub path: ActorPath,
pub endpoint_manager: Option<EndpointManager>,
pub registry: Option<SerializerRegistry>,
pub local_uid: u64,
}
impl std::fmt::Debug for RemoteWatcherProxy {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RemoteWatcherProxy").field("path", &self.path).finish()
}
}
impl RemoteWatcherProxy {
pub fn new(
path: ActorPath,
endpoint_manager: EndpointManager,
registry: SerializerRegistry,
local_uid: u64,
) -> Self {
Self { path, endpoint_manager: Some(endpoint_manager), registry: Some(registry), local_uid }
}
}
impl RemoteRef for RemoteWatcherProxy {
fn path(&self) -> &ActorPath {
&self.path
}
fn tell_serialized(&self, _msg: atomr_core::actor::SerializedMessage) {
}
fn tell_system(&self, msg: RemoteSystemMsg) {
let (Some(mgr), Some(reg)) = (self.endpoint_manager.clone(), self.registry.clone()) else {
return;
};
let target = self.path.clone();
let local_uid = self.local_uid;
let r = RemoteActorRefImpl::new(target, mgr, reg, local_uid);
r.tell_system(msg);
}
}