impl ActorSwarm {
pub fn get() -> Option<Arc<Self>> {
THREAD_LOCAL_SWARM
.with(|tls| tls.borrow().clone())
.or_else(|| ACTOR_SWARM.load_full())
}
pub fn with<R>(f: impl FnOnce(&Self) -> R) -> Option<R> {
let tls = THREAD_LOCAL_SWARM.with(|tls| tls.borrow().clone());
if let Some(ref swarm) = tls {
return Some(f(swarm));
}
let guard = ACTOR_SWARM.load();
(*guard).as_ref().map(|arc| f(arc))
}
pub(crate) fn set(
swarm_tx: mpsc::UnboundedSender<SwarmCommand>,
local_peer_id: PeerId,
) -> Result<(), Self> {
let new = Self {
swarm_tx: SwarmSender(swarm_tx),
local_peer_id,
};
ACTOR_SWARM.store(Some(Arc::new(new)));
Ok(())
}
#[allow(dead_code)]
pub fn set_thread_local(
swarm_tx: mpsc::UnboundedSender<SwarmCommand>,
local_peer_id: PeerId,
) -> SwarmGuard {
let swarm = Arc::new(Self {
swarm_tx: SwarmSender(swarm_tx),
local_peer_id,
});
THREAD_LOCAL_SWARM.with(|tls| *tls.borrow_mut() = Some(swarm));
SwarmGuard {
_not_send: PhantomData,
}
}
pub fn local_peer_id(&self) -> PeerId {
self.local_peer_id
}
pub(crate) fn lookup_local<A: Actor + RemoteActor + 'static>(
&self,
name: Arc<str>,
) -> impl Future<Output = Result<Option<ActorRef<A>>, RegistryError>> {
let reply_rx = self
.swarm_tx
.send_with_reply(|reply| SwarmCommand::LookupLocal { name, reply });
async move {
let Some(ActorRegistration {
actor_id,
remote_id,
}) = reply_rx.await?
else {
return Ok(None);
};
if A::REMOTE_ID != remote_id {
return Err(RegistryError::BadActorType);
}
let registry = REMOTE_REGISTRY.lock().await;
let Some(actor_ref_any) = registry.get(&actor_id) else {
return Ok(None);
};
match actor_ref_any.downcast() {
Ok(actor_ref) => Ok(Some(actor_ref)),
Err(DowncastRegsiteredActorRefError::BadActorType) => {
Err(RegistryError::BadActorType)
}
Err(DowncastRegsiteredActorRefError::ActorNotRunning) => Ok(None),
}
}
}
pub(crate) async fn lookup<A: Actor + RemoteActor>(
&self,
name: Arc<str>,
) -> Result<Option<RemoteActorRef<A>>, RegistryError> {
#[cfg(all(debug_assertions, feature = "tracing"))]
let name_clone = name.clone();
let mut stream = self.lookup_all(name);
let first = stream.next().await.transpose()?;
#[cfg(all(debug_assertions, feature = "tracing"))]
if first.is_some() {
tokio::spawn(async move {
if let Ok(Some(_)) = stream.next().await.transpose() {
tracing::warn!(
"Multiple actors found for '{name_clone}'. Consider using lookup_all() for deterministic behavior when multiple actors may exist."
);
}
});
}
Ok(first)
}
pub(crate) fn lookup_all<A: Actor + RemoteActor>(&self, name: Arc<str>) -> LookupStream<A> {
let (reply_tx, reply_rx) = mpsc::unbounded_channel();
let cmd = SwarmCommand::Lookup {
name,
reply: reply_tx,
};
self.swarm_tx.send(cmd);
let swarm_tx = self.swarm_tx.clone();
LookupStream::new(swarm_tx, reply_rx)
}
pub(crate) fn register<A: Actor + RemoteActor + 'static>(
&self,
actor_ref: ActorRef<A>,
name: Arc<str>,
) -> impl Future<Output = Result<(), RegistryError>> {
let registration = ActorRegistration::new(actor_ref.id(), Cow::Borrowed(A::REMOTE_ID));
let reply_rx = self
.swarm_tx
.send_with_reply(|reply| SwarmCommand::Register {
name: name.clone(),
registration,
reply,
});
async move {
let res = reply_rx.await;
match res {
Ok(()) | Err(RegistryError::QuorumFailed { .. }) => {
REMOTE_REGISTRY.lock().await.insert(
actor_ref.id(),
RemoteRegistryActorRef::new(actor_ref, Some(name)),
);
Ok(())
}
Err(err) => Err(err),
}
}
}
pub fn unregister(&self, name: Arc<str>) -> impl Future<Output = ()> {
let reply_rx = self
.swarm_tx
.send_with_reply(|reply| SwarmCommand::Unregister { name, reply });
async move {
reply_rx.await;
}
}
pub(crate) fn link<A: Actor + RemoteActor, B: Actor + RemoteActor>(
&self,
actor_id: ActorId,
sibling_id: ActorId,
) -> impl Future<Output = Result<(), RemoteSendError<Infallible>>> {
let reply_rx = self.swarm_tx.send_with_reply(|reply| SwarmCommand::Link {
actor_id,
actor_remote_id: Cow::Borrowed(A::REMOTE_ID),
sibling_id,
sibling_remote_id: Cow::Borrowed(B::REMOTE_ID),
reply,
});
async move {
match reply_rx.await {
SwarmResponse::Link(result) => result.map_err(|e| e.into_infallible()),
SwarmResponse::OutboundFailure(err) => Err(err.into_infallible()),
_ => panic!("got an unexpected swarm response"),
}
}
}
pub(crate) fn unlink<B: Actor + RemoteActor>(
&self,
actor_id: ActorId,
sibling_id: ActorId,
) -> impl Future<Output = Result<(), RemoteSendError<Infallible>>> {
let reply_rx = self.swarm_tx.send_with_reply(|reply| SwarmCommand::Unlink {
actor_id,
sibling_id,
sibling_remote_id: Cow::Borrowed(B::REMOTE_ID),
reply,
});
async move {
match reply_rx.await {
SwarmResponse::Unlink(result) => result.map_err(|e| e.into_infallible()),
SwarmResponse::OutboundFailure(err) => Err(err.into_infallible()),
_ => panic!("got an unexpected swarm response"),
}
}
}
pub(crate) fn signal_link_died(
&self,
dead_actor_id: ActorId,
notified_actor_id: ActorId,
notified_actor_remote_id: Cow<'static, str>,
stop_reason: ActorStopReason,
) -> impl Future<Output = Result<(), RemoteSendError<Infallible>>> {
let reply_rx = self
.swarm_tx
.send_with_reply(|reply| SwarmCommand::SignalLinkDied {
dead_actor_id,
notified_actor_id,
notified_actor_remote_id,
stop_reason,
reply,
});
async move {
match reply_rx.await {
SwarmResponse::SignalLinkDied(result) => result.map_err(|e| e.into_infallible()),
SwarmResponse::OutboundFailure(err) => Err(err.into_infallible()),
_ => panic!("got an unexpected swarm response"),
}
}
}
pub fn sender(&self) -> &SwarmSender {
&self.swarm_tx
}
}