use std::{
any::{Any, TypeId},
collections::HashMap,
sync::LazyLock,
};
use futures::FutureExt as _;
use super::*;
use crate::{Addr, event_loop::EventLoop};
type AnyBox = Box<dyn Any + Send + Sync>;
static REGISTRY: LazyLock<async_lock::RwLock<HashMap<TypeId, AnyBox>>> =
LazyLock::new(Default::default);
impl<A: Service> Addr<A> {
pub async fn register(self) -> crate::error::Result<(Self, Option<Self>)> {
let key = TypeId::of::<A>();
let mut registry = REGISTRY.write().await;
log::trace!("registering service {}", std::any::type_name::<A>());
let replaced = if let Some(existing) = registry.get(&key) {
if existing
.downcast_ref::<Addr<A>>()
.is_some_and(Addr::stopped)
{
registry
.insert(key, Box::new(self.clone()))
.and_then(|addr| addr.downcast::<Addr<A>>().ok())
.map(|addr| *addr)
} else {
return Err(crate::error::ActorError::ServiceStillRunning);
}
} else {
registry
.insert(key, Box::new(self.clone()))
.and_then(|addr| addr.downcast::<Addr<A>>().ok())
.map(|addr| *addr)
};
Ok((self, replaced))
}
pub async fn replace(self) -> Option<Self> {
let key = TypeId::of::<A>();
log::trace!("replacing service {}", std::any::type_name::<A>());
let mut registry = REGISTRY.write().await;
registry
.insert(key, Box::new(self.clone()))
.and_then(|addr| addr.downcast::<Addr<A>>().ok())
.map(|addr| *addr)
}
pub async fn unregister(self) -> Option<Addr<A>> {
let key = TypeId::of::<A>();
log::trace!("unregistering service {}", std::any::type_name::<A>());
let mut registry = REGISTRY.write().await;
registry
.remove(&key)
.and_then(|addr| addr.downcast::<Addr<A>>().ok())
.map(|addr| *addr)
}
}
/// An actor that can be looked up through the global, type-keyed registry.
///
/// Each implementing type has at most one registry entry, keyed by its
/// `TypeId`.
pub trait Service: Actor + Default {
    /// Makes sure an instance is registered and running, then resolves to `Ok(())`.
    ///
    /// The address obtained from the registry is discarded.
    fn setup() -> impl Future<Output = DynResult<()>> {
        let spawned = Self::from_registry_and_spawn();
        spawned.map(|_addr| Ok(()))
    }

    /// Reports the state of the registered instance, if there is one.
    ///
    /// Resolves to `None` when nothing is registered for this type.
    ///
    /// NOTE: despite the method name, the inner value is the address's
    /// `stopped()` flag. `Some(true)` therefore means the registered instance
    /// has stopped, and `Some(false)` means it has not.
    fn already_running() -> impl Future<Output = Option<bool>> {
        async {
            let guard = REGISTRY.read().await;
            let entry = guard.get(&TypeId::of::<Self>());
            let addr = entry.and_then(|boxed| boxed.downcast_ref::<Addr<Self>>());
            addr.map(Addr::stopped)
        }
    }

    /// Resolves to the address of the registered instance, spawning a
    /// default-constructed one when no running instance is registered.
    fn from_registry() -> impl Future<Output = Addr<Self>> {
        log::trace!("getting service from registry {}", std::any::type_name::<Self>());
        Self::from_registry_and_spawn()
    }

    /// Non-blocking lookup that never spawns.
    ///
    /// Returns `None` when the registry lock cannot be acquired immediately,
    /// when nothing is registered for this type, or when the registered
    /// address is not running.
    fn try_from_registry() -> Option<Addr<Self>> {
        log::trace!("trying to get service from registry");
        let guard = REGISTRY.try_read()?;
        let entry = guard.get(&TypeId::of::<Self>())?;
        let addr = entry.downcast_ref::<Addr<Self>>()?;
        if addr.running() {
            Some(addr.clone())
        } else {
            None
        }
    }

    /// Removes the registry entry for this type and resolves to it.
    ///
    /// Resolves to `None` when nothing was registered.
    fn unregister() -> impl Future<Output = Option<Addr<Self>>> {
        async {
            let key = TypeId::of::<Self>();
            log::trace!("unregistering service {}", std::any::type_name::<Self>());
            let mut guard = REGISTRY.write().await;
            let removed = guard.remove(&key);
            let boxed = removed.and_then(|entry| entry.downcast::<Addr<Self>>().ok());
            boxed.map(|addr| *addr)
        }
    }
}
/// Crate-internal extension of [`Service`] holding the lookup-or-spawn logic.
pub(crate) trait SpawnableService: Service {
    /// Resolves to the address of the running registered instance, or spawns
    /// and registers a default-constructed one when there is none.
    ///
    /// The registry write lock is held for the whole operation, so the lookup
    /// and the insert form a single critical section.
    ///
    /// With debug assertions enabled the fresh address is also pinged, while
    /// the lock is still held, and a failed ping panics. `debug_assert!` does
    /// not evaluate its argument otherwise, so such builds skip the ping.
    // `Addr` is itself awaitable (the tests `.await` one), so yielding it from
    // an async block trips this lint; the address is the intended output.
    #[allow(clippy::async_yields_async)]
    fn from_registry_and_spawn() -> impl Future<Output = Addr<Self>> {
        // Logged when the future is created, before it is known whether a
        // spawn is actually needed.
        log::trace!(
            "spawning new instance of {} service in registry",
            std::any::type_name::<Self>()
        );
        async {
            let key = TypeId::of::<Self>();
            let mut registry = REGISTRY.write().await;

            // Read-only lookup; the address is cloned only if it is running.
            let running = registry
                .get(&key)
                .and_then(|entry| entry.downcast_ref::<Addr<Self>>())
                .filter(|addr| addr.running())
                .cloned();

            if let Some(addr) = running {
                log::trace!("service already running {}", std::any::type_name::<Self>());
                addr
            } else {
                log::trace!("spawning new service {}", std::any::type_name::<Self>());
                let (event_loop, addr) = EventLoop::unbounded().create(Self::default());
                let handle = ActorHandle::spawn(event_loop);
                handle.detach();
                // Overwrites any stale (non-running) entry for this type.
                registry.insert(key, Box::new(addr.clone()));
                debug_assert!(addr.ping().await.is_ok(), "service failed ping");
                addr
            }
        }
    }
}
/// Blanket implementation: every [`Service`] is also a [`SpawnableService`].
impl<A: Service> SpawnableService for A {}
#[cfg(test)]
mod tests {
    #![allow(clippy::unwrap_used)]

    use crate::{
        Service,
        actor::tests::{Identify, Ping, TokioActor},
        prelude::Spawnable as _,
    };

    // Each test declares its own local `Me` marker so that its `Svc` type, and
    // therefore its registry key, is distinct from every other test's.

    // `from_registry` yields a live address that answers calls.
    #[test_log::test(tokio::test)]
    async fn get_service_from_registry() {
        #[derive(Default)]
        struct Me;
        type Svc = TokioActor<Me>;

        let mut addr = Svc::from_registry().await;
        assert!(!addr.stopped());
        addr.call(Ping).await.unwrap();

        addr.stop().unwrap();
        addr.await.unwrap();
    }

    // `register` takes over a slot only once the previous occupant has stopped.
    #[test_log::test(tokio::test)]
    async fn reregistering_service_only_if_stopped() {
        #[derive(Default)]
        struct Me;
        type Svc = TokioActor<Me>;

        let (first, displaced) = crate::setup_actor(Svc::new(1337))
            .unbounded()
            .spawn()
            .register()
            .await
            .unwrap();
        assert!(displaced.is_none());
        assert_eq!(first.call(Identify).await, Ok(1337));

        // The registry hands back the very same instance.
        let looked_up = Svc::from_registry().await;
        assert_eq!(looked_up.call(Identify).await, Ok(1337));
        looked_up.halt().await.unwrap();

        // After the halt, a new instance may take the slot.
        let (second, displaced_first) = Svc::new(1338).spawn().register().await.unwrap();
        assert!(displaced_first.is_some());
        assert_eq!(second.call(Identify).await, Ok(1338));
        assert!(displaced_first.unwrap().call(Identify).await.is_err());

        // The second instance is still running, so a third registration fails.
        assert!(Svc::new(1338).spawn().register().await.is_err());
    }

    // `replace` overwrites the slot and returns the previous address.
    #[test_log::test(tokio::test)]
    async fn replace_service() {
        #[derive(Default)]
        struct Me;
        type Svc = TokioActor<Me>;

        let (original, _) = Svc::new(1000).spawn().register().await.unwrap();
        assert_eq!(original.call(Identify).await, Ok(1000));

        let successor = Svc::new(2000).spawn();
        let displaced = successor.replace().await;
        assert!(displaced.is_some());

        // The displaced instance still answers calls.
        let previous = displaced.unwrap();
        assert_eq!(previous.call(Identify).await, Ok(1000));

        let looked_up = Svc::from_registry().await;
        assert_eq!(looked_up.call(Identify).await, Ok(2000));

        let mut registered = Svc::from_registry().await;
        registered.stop().unwrap();
        registered.await.unwrap();
    }

    // `replace` on an empty slot displaces nothing.
    #[test_log::test(tokio::test)]
    async fn replace_service_when_none_exists() {
        #[derive(Default)]
        struct Me;
        type Svc = TokioActor<Me>;

        let fresh = Svc::new(3000).spawn();
        let displaced = fresh.replace().await;
        assert!(displaced.is_none());

        let looked_up = Svc::from_registry().await;
        assert_eq!(looked_up.call(Identify).await, Ok(3000));

        let mut registered = Svc::from_registry().await;
        registered.stop().unwrap();
        registered.await.unwrap();
    }

    // Unregistering empties the slot without stopping the service.
    #[test_log::test(tokio::test)]
    async fn unregister_service() {
        #[derive(Default)]
        struct Me;
        type Svc = TokioActor<Me>;

        let (addr, _) = Svc::new(42).spawn().register().await.unwrap();
        assert_eq!(addr.call(Identify).await, Ok(42));

        let removed = addr.unregister().await;
        assert!(removed.is_some());

        // A second removal finds the slot already empty.
        let removed_again = Svc::unregister().await;
        assert!(removed_again.is_none());

        let detached = removed.unwrap();
        assert_eq!(detached.call(Identify).await, Ok(42));

        // With the slot empty, the registry spawns a default instance.
        let respawned = Svc::from_registry().await;
        assert_eq!(respawned.call(Identify).await, Ok(0));
    }

    // `already_running` yields the `stopped` flag of the registered address.
    #[test_log::test(tokio::test)]
    async fn already_running_service() {
        #[derive(Default)]
        struct Me;
        type Svc = TokioActor<Me>;

        let status = Svc::already_running().await;
        assert!(status.is_none());

        let (mut addr, _) = Svc::new(0).spawn().register().await.unwrap();
        let status = Svc::already_running().await;
        assert_eq!(status, Some(false));

        addr.stop().unwrap();
        addr.await.unwrap();
        let status = Svc::already_running().await;
        assert_eq!(status, Some(true));

        Svc::unregister().await;
        let status = Svc::already_running().await;
        assert!(status.is_none());
    }
}