use std::task;
use either::Either;
use futures::{StreamExt, stream::FuturesUnordered};
use libp2p::{
Multiaddr, PeerId,
core::{Endpoint, transport::PortUse},
swarm::{
ConnectionClosed, ConnectionDenied, ConnectionHandler, ConnectionHandlerSelect,
ConnectionId, FromSwarm, NetworkBehaviour, THandler, THandlerInEvent, THandlerOutEvent,
ToSwarm,
},
};
use tokio::sync::mpsc;
use crate::error::{ActorStopReason, SwarmAlreadyBootstrappedError};
use super::{
ActorSwarm, REMOTE_REGISTRY, RemoteRegistryActorRef, SwarmCommand, SwarmSender, messaging,
registry,
};
/// Composite swarm behaviour that bundles a messaging behaviour and a registry
/// behaviour together with an unbounded [`SwarmCommand`] channel.
///
/// This definition is compiled when the `serde-codec` feature is enabled and
/// `rkyv-codec` is not; in that configuration the codec parameter `C` defaults
/// to [`messaging::DefaultCodec`].
#[cfg(all(feature = "serde-codec", not(feature = "rkyv-codec")))]
#[allow(missing_debug_implementations)]
pub struct Behaviour<C = messaging::DefaultCodec>
where
    C: libp2p::request_response::Codec + Clone + Send + 'static,
{
    /// Messaging sub-behaviour, parameterised over the codec `C`.
    pub messaging: messaging::Behaviour<C>,
    /// Registry sub-behaviour.
    pub registry: registry::Behaviour,
    /// Peer id supplied at construction time.
    local_peer_id: PeerId,
    /// Sending half of the command channel; cloned out to external handles.
    cmd_tx: mpsc::UnboundedSender<SwarmCommand>,
    /// Receiving half of the command channel, kept by the behaviour itself.
    cmd_rx: mpsc::UnboundedReceiver<SwarmCommand>,
}
/// Composite swarm behaviour that bundles a messaging behaviour and a registry
/// behaviour together with an unbounded [`SwarmCommand`] channel.
///
/// This definition is compiled when `serde-codec` is disabled or `rkyv-codec`
/// is enabled; in that configuration the codec parameter `C` has no default
/// and must always be named explicitly.
#[cfg(any(not(feature = "serde-codec"), feature = "rkyv-codec"))]
#[allow(missing_debug_implementations)]
pub struct Behaviour<C>
where
    C: libp2p::request_response::Codec + Clone + Send + 'static,
{
    /// Messaging sub-behaviour, parameterised over the codec `C`.
    pub messaging: messaging::Behaviour<C>,
    /// Registry sub-behaviour.
    pub registry: registry::Behaviour,
    /// Peer id supplied at construction time.
    local_peer_id: PeerId,
    /// Sending half of the command channel; cloned out to external handles.
    cmd_tx: mpsc::UnboundedSender<SwarmCommand>,
    /// Receiving half of the command channel, kept by the behaviour itself.
    cmd_rx: mpsc::UnboundedReceiver<SwarmCommand>,
}
#[cfg(all(feature = "serde-codec", not(feature = "rkyv-codec")))]
impl Behaviour<messaging::DefaultCodec> {
    /// Creates a behaviour that uses [`messaging::DefaultCodec`].
    ///
    /// A fresh unbounded command channel is created, and both sub-behaviours
    /// are constructed with `local_peer_id`; `messaging_config` is forwarded
    /// to the messaging sub-behaviour.
    pub fn new(local_peer_id: PeerId, messaging_config: messaging::Config) -> Self {
        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();

        // Field initialisers run in the order written: messaging first, then registry.
        Self {
            messaging: messaging::Behaviour::new(local_peer_id, messaging_config),
            registry: registry::Behaviour::new(local_peer_id),
            local_peer_id,
            cmd_tx,
            cmd_rx,
        }
    }
}
impl<C> Behaviour<C>
where
    C: libp2p::request_response::Codec<
            Protocol = libp2p::StreamProtocol,
            Request = messaging::SwarmRequest,
            Response = messaging::SwarmResponse,
        > + Clone
        + Send
        + 'static,
{
    /// Creates a behaviour whose messaging sub-behaviour uses the supplied `codec`.
    ///
    /// A fresh unbounded command channel is created, and both sub-behaviours
    /// are constructed with `local_peer_id`; `messaging_config` and `codec`
    /// are forwarded to the messaging sub-behaviour.
    pub fn with_codec(
        local_peer_id: PeerId,
        messaging_config: messaging::Config,
        codec: C,
    ) -> Self {
        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();

        // Field initialisers run in the order written: messaging first, then registry.
        Self {
            messaging: messaging::Behaviour::with_codec(local_peer_id, messaging_config, codec),
            registry: registry::Behaviour::new(local_peer_id),
            local_peer_id,
            cmd_tx,
            cmd_rx,
        }
    }

    /// Registers this behaviour globally via [`Self::try_init_global`].
    ///
    /// # Panics
    ///
    /// Panics if [`Self::try_init_global`] returns an error.
    pub fn init_global(&self) {
        self.try_init_global().unwrap();
    }

    /// Passes a clone of the command sender and the local peer id to
    /// `ActorSwarm::set`.
    ///
    /// # Errors
    ///
    /// Any error reported by `ActorSwarm::set` is replaced with
    /// [`SwarmAlreadyBootstrappedError`].
    pub fn try_init_global(&self) -> Result<(), SwarmAlreadyBootstrappedError> {
        match ActorSwarm::set(self.cmd_tx.clone(), self.local_peer_id) {
            Ok(_) => Ok(()),
            Err(_) => Err(SwarmAlreadyBootstrappedError),
        }
    }

    /// Returns a [`SwarmSender`] wrapping a clone of this behaviour's command sender.
    pub fn swarm_sender(&self) -> SwarmSender {
        let tx = self.cmd_tx.clone();
        SwarmSender(tx)
    }

    /// Dispatches one [`SwarmCommand`] to the registry or messaging sub-behaviour.
    ///
    /// Returns `false` when the command was answered right here through its
    /// `reply` channel (`LookupLocal`, `Unregister`, and a `Register` that the
    /// registry rejected), and `true` when it was handed to a sub-behaviour's
    /// `*_with_reply` method.
    ///
    /// NOTE(review): the consumer of the returned flag is outside this chunk;
    /// presumably `true` tells the caller a sub-behaviour has new work to
    /// drive — confirm against the poll loop.
    ///
    /// # Panics
    ///
    /// Panics if the registry rejects a `Register` command without handing
    /// the reply sender back.
    fn handle_command(&mut self, cmd: SwarmCommand) -> bool {
        match cmd {
            // ---- Commands resolved immediately: reply and return `false`. ----
            SwarmCommand::LookupLocal { name, reply } => {
                let found = self.registry.lookup_local(&name);
                // A failed send is ignored on purpose, as in every immediate reply below.
                let _ = reply.send(found);
                return false;
            }
            SwarmCommand::Unregister { name, reply } => {
                self.registry.unregister(&name);
                let _ = reply.send(());
                return false;
            }
            SwarmCommand::Register {
                name,
                registration,
                reply,
            } => {
                let outcome = self
                    .registry
                    .register_with_reply(name, registration, Some(reply));
                if let Err((returned_reply, err)) = outcome {
                    // On rejection the registry hands the reply sender back so
                    // the error can be delivered to the requester.
                    match returned_reply {
                        Some(reply) => {
                            let _ = reply.send(Err(err.into()));
                        }
                        None => unreachable!("we should have the reply type here"),
                    }
                    return false;
                }
            }

            // ---- Commands handed to a sub-behaviour: fall through to `true`. ----
            SwarmCommand::Lookup { name, reply } => {
                self.registry.lookup_with_reply(name, Some(reply));
            }
            SwarmCommand::Ask {
                actor_id,
                actor_remote_id,
                message_remote_id,
                payload,
                mailbox_timeout,
                reply_timeout,
                immediate,
                reply,
            } => {
                self.messaging.ask_with_reply(
                    actor_id,
                    actor_remote_id,
                    message_remote_id,
                    payload,
                    mailbox_timeout,
                    reply_timeout,
                    immediate,
                    Some(reply),
                );
            }
            SwarmCommand::Tell {
                actor_id,
                actor_remote_id,
                message_remote_id,
                payload,
                mailbox_timeout,
                immediate,
                reply,
            } => {
                // Unlike the other commands, `reply` is forwarded as-is here
                // rather than being wrapped in `Some`.
                self.messaging.tell_with_reply(
                    actor_id,
                    actor_remote_id,
                    message_remote_id,
                    payload,
                    mailbox_timeout,
                    immediate,
                    reply,
                );
            }
            SwarmCommand::Link {
                actor_id,
                actor_remote_id,
                sibling_id,
                sibling_remote_id,
                reply,
            } => {
                self.messaging.link_with_reply(
                    actor_id,
                    actor_remote_id,
                    sibling_id,
                    sibling_remote_id,
                    Some(reply),
                );
            }
            SwarmCommand::Unlink {
                actor_id,
                sibling_id,
                sibling_remote_id,
                reply,
            } => {
                // The sibling's id and remote id are passed first and the local
                // actor id last, which differs from the field order above.
                // NOTE(review): presumably the request is phrased from the
                // remote actor's point of view — confirm against
                // `messaging::Behaviour::unlink_with_reply`.
                self.messaging.unlink_with_reply(
                    sibling_id,
                    sibling_remote_id,
                    actor_id,
                    Some(reply),
                );
            }
            SwarmCommand::SignalLinkDied {
                dead_actor_id,
                notified_actor_id,
                notified_actor_remote_id,
                stop_reason,
                reply,
            } => {
                self.messaging.signal_link_died_with_reply(
                    dead_actor_id,
                    notified_actor_id,
                    notified_actor_remote_id,
                    stop_reason,
                    Some(reply),
                );
            }
        }

        true
    }
}
// Textually splices the contents of `behaviour/events_and_network.rs` (path
// resolved relative to this file) into this module at this point.
// NOTE(review): judging by the file name and the otherwise-unused imports at
// the top of this file, it presumably holds the event types and the
// `NetworkBehaviour` implementation — confirm.
include!("behaviour/events_and_network.rs");