use std::{
any,
collections::HashMap,
str,
sync::{Arc, LazyLock},
};
#[cfg(all(feature = "serde-codec", not(feature = "rkyv-codec")))]
use std::error;
use futures::StreamExt;
use libp2p::{PeerId, Swarm, swarm::NetworkBehaviour};
use tokio::sync::Mutex;
use crate::{
Actor,
actor::{ActorId, ActorRef, Links, WeakActorRef},
error::{RegistryError, RemoteSendError},
mailbox::SignalMailbox,
};
// Fail the build early, with an explicit message, when neither the `serde-codec` nor the
// `rkyv-codec` feature is enabled.
#[cfg(not(any(feature = "serde-codec", feature = "rkyv-codec")))]
compile_error!("The `remote` feature requires either `serde-codec` or `rkyv-codec`");
// Submodules. `_internal` is public but hidden from rustdoc; `behaviour` and `swarm` are private
// and are exposed only through the glob re-exports further down.
#[doc(hidden)]
pub mod _internal;
mod behaviour;
pub mod codec;
#[allow(missing_docs)] pub mod messaging;
pub mod registry;
pub mod session;
mod swarm;
pub mod wire;
// Re-exports: every public item of `behaviour` and `swarm`, plus `session::applied_protocol`.
pub use behaviour::*;
pub use session::applied_protocol;
pub use swarm::*;
/// Crate-wide table of registered actors, keyed by [`ActorId`].
///
/// The map is created lazily on first access and is guarded by an async `tokio::sync::Mutex`,
/// so it can be locked with `.lock().await` from async code or probed with `try_lock()` from
/// synchronous code.
pub(crate) static REMOTE_REGISTRY: LazyLock<Mutex<HashMap<ActorId, RemoteRegistryActorRef>>> =
LazyLock::new(|| Mutex::new(HashMap::new()));
/// Stores a strong, unnamed registry entry for `actor_ref` under `id`.
///
/// The entry is built from a clone of `actor_ref` before the registry lock is awaited. Any
/// entry already stored under `id` is replaced.
pub async fn register_actor_local<A: Actor>(actor_ref: &ActorRef<A>, id: ActorId) {
    let registered = RemoteRegistryActorRef::new(actor_ref.clone(), None);
    let mut registry = REMOTE_REGISTRY.lock().await;
    registry.insert(id, registered);
}
/// Removes the registry entry stored under `id` without blocking.
///
/// Returns `true` only when the registry lock could be taken immediately *and* an entry for
/// `id` was present. If the lock is currently held elsewhere, nothing is removed and `false`
/// is returned, so `false` does not prove that `id` is unregistered.
pub fn unregister_actor_local(id: &ActorId) -> bool {
    REMOTE_REGISTRY
        .try_lock()
        .is_ok_and(|mut registry| registry.remove(id).is_some())
}
/// A single entry of [`REMOTE_REGISTRY`]: a type-erased actor reference plus the data captured
/// from it at registration time.
pub(crate) struct RemoteRegistryActorRef {
// Type-erased strong or weak actor reference; recover the concrete type with `downcast`.
actor_ref: BoxRegisteredActorRef,
// Optional name supplied at construction; the id-based local registration helpers pass `None`.
pub(crate) name: Option<Arc<str>>,
// Result of `weak_signal_mailbox()` on the reference, taken when the entry was built.
pub(crate) signal_mailbox: Box<dyn SignalMailbox>,
// Clone of the reference's `links` field, taken when the entry was built.
pub(crate) links: Links,
}
impl RemoteRegistryActorRef {
    /// Builds an entry that owns a strong [`ActorRef`].
    ///
    /// The signal mailbox and links are read from `actor_ref` first; the reference itself is
    /// then boxed and type-erased.
    pub(crate) fn new<A: Actor>(actor_ref: ActorRef<A>, name: Option<Arc<str>>) -> Self {
        // Struct fields are evaluated in the order written, so the two borrows below happen
        // before `actor_ref` is moved into the box.
        Self {
            signal_mailbox: actor_ref.weak_signal_mailbox(),
            links: actor_ref.links.clone(),
            actor_ref: BoxRegisteredActorRef::Strong(Box::new(actor_ref)),
            name,
        }
    }

    /// Builds an entry that holds only a [`WeakActorRef`].
    ///
    /// Mirrors [`Self::new`], but the stored reference must be upgraded before use, which
    /// [`Self::downcast`] does.
    pub(crate) fn new_weak<A: Actor>(actor_ref: WeakActorRef<A>, name: Option<Arc<str>>) -> Self {
        // Same ordering requirement as in `new`: borrow first, move last.
        Self {
            signal_mailbox: actor_ref.weak_signal_mailbox(),
            links: actor_ref.links.clone(),
            actor_ref: BoxRegisteredActorRef::Weak(Box::new(actor_ref)),
            name,
        }
    }

    /// Recovers a typed, strong [`ActorRef`] from the type-erased entry.
    ///
    /// # Errors
    ///
    /// * `BadActorType` - the stored reference is not an `ActorRef<A>` / `WeakActorRef<A>`.
    /// * `ActorNotRunning` - the entry is weak and `upgrade()` returned `None`.
    pub(crate) fn downcast<A: Actor>(
        &self,
    ) -> Result<ActorRef<A>, DowncastRegsiteredActorRefError> {
        match &self.actor_ref {
            BoxRegisteredActorRef::Strong(boxed) => match boxed.downcast_ref::<ActorRef<A>>() {
                Some(strong) => Ok(strong.clone()),
                None => Err(DowncastRegsiteredActorRefError::BadActorType),
            },
            BoxRegisteredActorRef::Weak(boxed) => {
                let Some(weak) = boxed.downcast_ref::<WeakActorRef<A>>() else {
                    return Err(DowncastRegsiteredActorRefError::BadActorType);
                };
                weak.upgrade()
                    .ok_or(DowncastRegsiteredActorRefError::ActorNotRunning)
            }
        }
    }
}
/// Reasons why `RemoteRegistryActorRef::downcast` can fail.
///
/// NOTE(review): the type name contains a typo ("Regsitered"). Renaming it would require
/// updating every use inside the crate, so it is left as is here.
pub(crate) enum DowncastRegsiteredActorRefError {
// The stored reference is not of the requested actor type.
BadActorType,
// The stored reference is weak and could not be upgraded.
ActorNotRunning,
}
/// Maps each downcast failure onto the [`RemoteSendError`] variant of the same name.
impl<E> From<DowncastRegsiteredActorRefError> for RemoteSendError<E> {
    fn from(cause: DowncastRegsiteredActorRefError) -> Self {
        match cause {
            DowncastRegsiteredActorRefError::ActorNotRunning => Self::ActorNotRunning,
            DowncastRegsiteredActorRefError::BadActorType => Self::BadActorType,
        }
    }
}
/// Type-erased actor reference stored inside a `RemoteRegistryActorRef`.
///
/// Both variants hold a boxed `Any`; the variant records whether the boxed value was created
/// from an `ActorRef<A>` or a `WeakActorRef<A>`, so `downcast` knows which type to try.
pub(crate) enum BoxRegisteredActorRef {
// Boxed `ActorRef<A>` (see `RemoteRegistryActorRef::new`).
Strong(Box<dyn any::Any + Send + Sync>),
// Boxed `WeakActorRef<A>` (see `RemoteRegistryActorRef::new_weak`).
Weak(Box<dyn any::Any + Send + Sync>),
}
/// Associates a static string identifier with an actor type.
///
/// NOTE(review): presumably this id names the actor type across peers and must therefore be
/// unique per type and identical on every node - confirm against the messaging code.
pub trait RemoteActor {
/// The identifier string for the implementing type.
const REMOTE_ID: &'static str;
}
/// Associates a static string identifier with the pairing of an implementing type and a
/// message type `M`.
///
/// NOTE(review): presumably implemented by actors for each message type they accept remotely,
/// with the id used to select the handler on the receiving side - confirm against the
/// messaging code.
pub trait RemoteMessage<M> {
/// The identifier string for this (implementor, `M`) pairing.
const REMOTE_ID: &'static str;
}
/// Bootstraps the swarm on the default listen address.
///
/// Convenience wrapper around [`bootstrap_on`] that listens on every IPv4 interface with TCP
/// port `0` (conventionally: let the operating system pick a free port).
///
/// # Errors
///
/// Propagates whatever [`bootstrap_on`] returns.
#[cfg(all(feature = "serde-codec", not(feature = "rkyv-codec")))]
pub fn bootstrap() -> Result<PeerId, Box<dyn error::Error>> {
    const DEFAULT_LISTEN_ADDR: &str = "/ip4/0.0.0.0/tcp/0";
    bootstrap_on(DEFAULT_LISTEN_ADDR)
}
/// Builds a libp2p swarm, starts listening on `addr`, and drives the swarm on a spawned Tokio
/// task.
///
/// The swarm uses a freshly generated identity, the Tokio runtime, a TCP transport configured
/// with noise and yamux, and QUIC. Its behaviour combines this crate's [`Behaviour`] with mDNS.
/// Only compiled when `serde-codec` is enabled and `rkyv-codec` is not.
///
/// `addr` is parsed as the listen address (for example `"/ip4/0.0.0.0/tcp/0"`).
///
/// Returns the local [`PeerId`] once listening has been requested; the swarm itself lives on
/// in the background task, which loops forever.
///
/// # Errors
///
/// Returns an error if the TCP transport or the behaviour cannot be constructed, if
/// `try_init_global` fails, if `addr` does not parse, or if `listen_on` rejects the address.
///
/// # Panics
///
/// `tokio::spawn` panics when called outside of a Tokio runtime.
#[cfg(all(feature = "serde-codec", not(feature = "rkyv-codec")))]
pub fn bootstrap_on(addr: &str) -> Result<PeerId, Box<dyn error::Error>> {
use libp2p::{SwarmBuilder, mdns, noise, swarm::SwarmEvent, tcp, yamux};
// Combined behaviour. The derive generates the `BootstrapBehaviourEvent` enum matched on
// in the event loop below, so the struct name must stay in sync with those patterns.
#[derive(NetworkBehaviour)]
struct BootstrapBehaviour {
piying: Behaviour,
mdns: mdns::tokio::Behaviour,
}
let mut swarm = SwarmBuilder::with_new_identity()
.with_tokio()
.with_tcp(
tcp::Config::default(),
noise::Config::new,
yamux::Config::default,
)?
.with_quic()
.with_behaviour(|key| {
// Both sub-behaviours are created for the peer id derived from the swarm's key.
let local_peer_id = key.public().to_peer_id();
let piying = Behaviour::new(local_peer_id, messaging::Config::default());
let mdns = mdns::tokio::Behaviour::new(mdns::Config::default(), local_peer_id)?;
Ok(BootstrapBehaviour { piying, mdns })
})?
.build();
// NOTE(review): presumably installs this behaviour as the process-wide instance and fails
// if one is already set - confirm in `Behaviour::try_init_global`.
swarm.behaviour().piying.try_init_global()?;
swarm.listen_on(addr.parse()?)?;
// Copy the peer id out before the swarm is moved into the background task.
let local_peer_id = *swarm.local_peer_id();
tokio::spawn(async move {
loop {
match swarm.select_next_some().await {
// mDNS found peers: record each discovered address with the swarm.
SwarmEvent::Behaviour(BootstrapBehaviourEvent::Mdns(mdns::Event::Discovered(
list,
))) => {
for (peer_id, multiaddr) in list {
#[cfg(feature = "tracing")]
tracing::info!("mDNS discovered a new peer: {peer_id}");
swarm.add_peer_address(peer_id, multiaddr);
}
}
// mDNS records expired: disconnect the peer; the result of the disconnect attempt is
// deliberately ignored.
SwarmEvent::Behaviour(BootstrapBehaviourEvent::Mdns(mdns::Event::Expired(
list,
))) => {
for (peer_id, _multiaddr) in list {
#[cfg(feature = "tracing")]
tracing::warn!("mDNS discover peer has expired: {peer_id}");
let _ = swarm.disconnect_peer_id(peer_id);
}
}
// This arm exists only with the `tracing` feature; without it the event falls through
// to the catch-all below.
#[cfg(feature = "tracing")]
SwarmEvent::NewListenAddr { address, .. } => {
tracing::info!("ActorSwarm listening on {address}");
}
// Every other swarm event is polled and discarded.
_ => {}
}
}
});
Ok(local_peer_id)
}
/// Moves `swarm` onto a spawned Tokio task that polls it forever and returns its local
/// [`PeerId`].
///
/// Every event the swarm produces is discarded; the task exists only to keep the swarm
/// making progress.
///
/// # Panics
///
/// `tokio::spawn` panics when called outside of a Tokio runtime.
pub fn run_swarm<B>(mut swarm: Swarm<B>) -> PeerId
where
    B: NetworkBehaviour + Send + 'static,
    <B as NetworkBehaviour>::ToSwarm: Send,
{
    // Read the id first: the swarm is moved into the task below.
    let peer_id = *swarm.local_peer_id();
    tokio::spawn(async move {
        loop {
            let _ = swarm.select_next_some().await;
        }
    });
    peer_id
}
/// Removes every entry from the local registry, waiting asynchronously for the registry lock.
pub async fn clear_registry() {
    let mut registry = REMOTE_REGISTRY.lock().await;
    registry.clear();
}
/// Synchronous counterpart of [`clear_registry`].
///
/// Busy-waits for the registry lock, calling `std::thread::yield_now` between failed
/// `try_lock` attempts, then clears the map. It does not return while the lock stays held
/// elsewhere.
pub fn clear_registry_sync() {
    let mut registry = loop {
        if let Ok(guard) = REMOTE_REGISTRY.try_lock() {
            break guard;
        }
        std::thread::yield_now();
    };
    registry.clear();
}
/// Synchronous counterpart of [`register_actor_local`].
///
/// Busy-waits for the registry lock, yielding the thread between failed `try_lock` attempts.
/// Once the lock is held it stores a strong, unnamed entry for a clone of `actor_ref` under
/// `well_known_id`, replacing any entry already stored under that id.
pub fn register_actor_local_sync<A: Actor>(actor_ref: &ActorRef<A>, well_known_id: ActorId) {
    let mut registry = loop {
        match REMOTE_REGISTRY.try_lock() {
            Ok(guard) => break guard,
            Err(_) => std::thread::yield_now(),
        }
    };
    // The entry is built only after the lock has been acquired.
    let registered = RemoteRegistryActorRef::new(actor_ref.clone(), None);
    registry.insert(well_known_id, registered);
}
/// Reports whether the local registry currently holds an entry for `actor_id`, without
/// blocking.
///
/// If the registry lock is held elsewhere at the time of the call, this returns `false`
/// without looking, so `false` can mean either "not registered" or "could not check".
pub fn is_registered_locally(actor_id: ActorId) -> bool {
    REMOTE_REGISTRY
        .try_lock()
        .is_ok_and(|registry| registry.contains_key(&actor_id))
}
/// Asks the global [`ActorSwarm`] to unregister `name`.
///
/// # Errors
///
/// Returns [`RegistryError::SwarmNotBootstrapped`] when `ActorSwarm::get()` yields `None`.
/// In that case `name` is never converted and no unregistration is attempted.
pub async fn unregister(name: impl Into<Arc<str>>) -> Result<(), RegistryError> {
    let Some(actor_swarm) = ActorSwarm::get() else {
        return Err(RegistryError::SwarmNotBootstrapped);
    };
    actor_swarm.unregister(name.into()).await;
    Ok(())
}