use std::{
any,
collections::HashMap,
error, str,
sync::{Arc, LazyLock},
};
use futures::StreamExt;
use libp2p::{
PeerId, SwarmBuilder, mdns, noise,
swarm::{NetworkBehaviour, SwarmEvent},
tcp, yamux,
};
use tokio::sync::Mutex;
use crate::{
Actor,
actor::{ActorId, ActorRef, WeakActorRef},
error::{RegistryError, RemoteSendError},
links::Links,
mailbox::SignalMailbox,
};
#[doc(hidden)]
pub mod _internal;
mod behaviour;
pub mod messaging;
pub mod registry;
mod swarm;
pub use behaviour::*;
pub use swarm::*;
pub(crate) static REMOTE_REGISTRY: LazyLock<Mutex<HashMap<ActorId, RemoteRegistryActorRef>>> =
LazyLock::new(|| Mutex::new(HashMap::new()));
pub(crate) struct RemoteRegistryActorRef {
actor_ref: BoxRegisteredActorRef,
pub(crate) name: Option<Arc<str>>,
pub(crate) signal_mailbox: Box<dyn SignalMailbox>,
pub(crate) links: Links,
}
impl RemoteRegistryActorRef {
pub(crate) fn new<A: Actor>(actor_ref: ActorRef<A>, name: Option<Arc<str>>) -> Self {
let signal_mailbox = actor_ref.weak_signal_mailbox();
let links = actor_ref.links.clone();
RemoteRegistryActorRef {
actor_ref: BoxRegisteredActorRef::Strong(Box::new(actor_ref)),
name,
signal_mailbox,
links,
}
}
pub(crate) fn new_weak<A: Actor>(actor_ref: WeakActorRef<A>, name: Option<Arc<str>>) -> Self {
let signal_mailbox = actor_ref.weak_signal_mailbox();
let links = actor_ref.links.clone();
RemoteRegistryActorRef {
actor_ref: BoxRegisteredActorRef::Weak(Box::new(actor_ref)),
name,
signal_mailbox,
links,
}
}
pub(crate) fn downcast<A: Actor>(
&self,
) -> Result<ActorRef<A>, DowncastRegsiteredActorRefError> {
match &self.actor_ref {
BoxRegisteredActorRef::Strong(any) => any
.downcast_ref::<ActorRef<A>>()
.ok_or(DowncastRegsiteredActorRefError::BadActorType)
.cloned(),
BoxRegisteredActorRef::Weak(any) => any
.downcast_ref::<WeakActorRef<A>>()
.ok_or(DowncastRegsiteredActorRefError::BadActorType)?
.upgrade()
.ok_or(DowncastRegsiteredActorRefError::ActorNotRunning),
}
}
}
pub(crate) enum DowncastRegsiteredActorRefError {
BadActorType,
ActorNotRunning,
}
impl<E> From<DowncastRegsiteredActorRefError> for RemoteSendError<E> {
fn from(err: DowncastRegsiteredActorRefError) -> Self {
match err {
DowncastRegsiteredActorRefError::BadActorType => RemoteSendError::BadActorType,
DowncastRegsiteredActorRefError::ActorNotRunning => RemoteSendError::ActorNotRunning,
}
}
}
pub(crate) enum BoxRegisteredActorRef {
Strong(Box<dyn any::Any + Send + Sync>),
Weak(Box<dyn any::Any + Send + Sync>),
}
pub trait RemoteActor {
const REMOTE_ID: &'static str;
}
pub trait RemoteMessage<M> {
const REMOTE_ID: &'static str;
}
pub fn bootstrap() -> Result<PeerId, Box<dyn error::Error>> {
bootstrap_on("/ip4/0.0.0.0/tcp/0")
}
pub fn bootstrap_on(addr: &str) -> Result<PeerId, Box<dyn error::Error>> {
#[derive(NetworkBehaviour)]
struct BootstrapBehaviour {
kameo: Behaviour,
mdns: mdns::tokio::Behaviour,
}
let mut swarm = SwarmBuilder::with_new_identity()
.with_tokio()
.with_tcp(
tcp::Config::default(),
noise::Config::new,
yamux::Config::default,
)?
.with_quic()
.with_behaviour(|key| {
let local_peer_id = key.public().to_peer_id();
let kameo = Behaviour::new(local_peer_id, messaging::Config::default());
let mdns = mdns::tokio::Behaviour::new(mdns::Config::default(), local_peer_id)?;
Ok(BootstrapBehaviour { kameo, mdns })
})?
.build();
swarm.behaviour().kameo.try_init_global()?;
swarm.listen_on(addr.parse()?)?;
let local_peer_id = *swarm.local_peer_id();
tokio::spawn(async move {
loop {
match swarm.select_next_some().await {
SwarmEvent::Behaviour(BootstrapBehaviourEvent::Mdns(mdns::Event::Discovered(
list,
))) => {
for (peer_id, multiaddr) in list {
#[cfg(feature = "tracing")]
tracing::info!("mDNS discovered a new peer: {peer_id}");
swarm.add_peer_address(peer_id, multiaddr);
}
}
SwarmEvent::Behaviour(BootstrapBehaviourEvent::Mdns(mdns::Event::Expired(
list,
))) => {
for (peer_id, _multiaddr) in list {
#[cfg(feature = "tracing")]
tracing::warn!("mDNS discover peer has expired: {peer_id}");
let _ = swarm.disconnect_peer_id(peer_id);
}
}
#[cfg(feature = "tracing")]
SwarmEvent::NewListenAddr { address, .. } => {
tracing::info!("ActorSwarm listening on {address}");
}
_ => {}
}
}
});
Ok(local_peer_id)
}
pub async fn unregister(name: impl Into<Arc<str>>) -> Result<(), RegistryError> {
ActorSwarm::get()
.ok_or(RegistryError::SwarmNotBootstrapped)?
.unregister(name.into())
.await;
Ok(())
}