use std::{
borrow::Cow,
collections::{HashMap, HashSet},
fmt,
future::Future,
sync::LazyLock,
task,
time::Duration,
};
use futures::FutureExt;
use libp2p::{
PeerId, StreamProtocol, request_response,
swarm::{
ConnectionDenied, ConnectionId, DialFailure, FromSwarm, NetworkBehaviour, THandler,
THandlerInEvent, THandlerOutEvent, ToSwarm,
},
};
use tokio::{sync::oneshot, task::JoinSet};
use crate::{
actor::ActorId,
error::{ActorStopReason, Infallible, RemoteSendError},
};
use super::wire::{WireActorId, WireActorStopReason, WireRemoteSendError};
use super::_internal::{
REMOTE_ACTORS, REMOTE_MESSAGES, RemoteActorFns, RemoteMessageFns, RemoteMessageRegistrationID,
};
fn proto_name() -> StreamProtocol {
use std::sync::LazyLock;
static NAME: LazyLock<StreamProtocol> = LazyLock::new(|| {
StreamProtocol::try_from_owned(super::session::applied_protocol("/piying/messaging/1.0.0"))
.expect("messaging protocol id must start with '/'")
});
NAME.clone()
}
static REMOTE_ACTORS_MAP: LazyLock<HashMap<&'static str, RemoteActorFns>> = LazyLock::new(|| {
let mut existing_ids = HashSet::new();
for (id, _) in REMOTE_ACTORS {
if !existing_ids.insert(id) {
panic!("duplicate remote actor detected for actor '{id}'");
}
}
REMOTE_ACTORS.iter().copied().collect()
});
static REMOTE_MESSAGES_MAP: LazyLock<
HashMap<RemoteMessageRegistrationID<'static>, RemoteMessageFns>,
> = LazyLock::new(|| {
let mut existing_ids = HashSet::new();
for (id, _) in REMOTE_MESSAGES {
if !existing_ids.insert(id) {
panic!(
"duplicate remote message detected for actor '{}' and message '{}'",
id.actor_remote_id, id.message_remote_id
);
}
}
REMOTE_MESSAGES.iter().copied().collect()
});
// Result aliases for the remote operations handled by this module. The operation
// each alias belongs to is taken from its name; the payload types are as declared.

/// Result of an ask: raw bytes on success (presumably the serialized reply — confirm),
/// or a `RemoteSendError` carrying a byte payload.
type AskResult = Result<Vec<u8>, RemoteSendError<Vec<u8>>>;
/// Result of a tell: no value on success.
type TellResult = Result<(), RemoteSendError>;
/// Result of a link request: no value on success.
type LinkResult = Result<(), RemoteSendError>;
/// Result of an unlink request: no value on success.
type UnlinkResult = Result<(), RemoteSendError>;
/// Result of signalling that a link died: no value on success.
type SignalLinkDiedResult = Result<(), RemoteSendError>;
/// Identifier of a request, either a plain local number or an id issued by
/// libp2p's request-response machinery.
///
/// Compares equal to a bare `request_response::OutboundRequestId` only when it
/// is the `Outbound` variant wrapping that same id.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum RequestId {
/// Identifier held as a plain `u64`.
/// NOTE(review): presumably used for requests that never leave this node — confirm.
Local(u64),
/// Identifier wrapping a libp2p `OutboundRequestId`.
Outbound(request_response::OutboundRequestId),
}
impl RequestId {
    /// Extracts the wrapped libp2p outbound request id.
    ///
    /// # Panics
    ///
    /// Panics if `self` is [`RequestId::Local`].
    fn unwrap_outbound(self) -> request_response::OutboundRequestId {
        let RequestId::Outbound(outbound) = self else {
            panic!("called unwrap_outbound on a local request id");
        };
        outbound
    }
}
/// Allows `request_id == outbound_id` comparisons.
///
/// A `Local` id never equals an outbound id; an `Outbound` id is equal when
/// the wrapped id matches.
impl PartialEq<request_response::OutboundRequestId> for RequestId {
    fn eq(&self, other: &request_response::OutboundRequestId) -> bool {
        matches!(self, RequestId::Outbound(inner) if *inner == *other)
    }
}
/// Allows `outbound_id == request_id` comparisons, mirroring the impl in the
/// opposite direction.
///
/// The comparison is `false` for `RequestId::Local`, and otherwise compares the
/// wrapped outbound id with `self`.
impl PartialEq<RequestId> for request_response::OutboundRequestId {
    fn eq(&self, other: &RequestId) -> bool {
        if let RequestId::Outbound(inner) = other {
            *self == *inner
        } else {
            false
        }
    }
}
/// Formats the id for display: a `Local` id is written as its integer value,
/// while an `Outbound` id defers to the wrapped id's own `fmt`.
impl fmt::Display for RequestId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
// `write!` with a fresh `{id}` spec: formatter flags on `f` are not forwarded.
RequestId::Local(id) => write!(f, "{id}"),
// NOTE(review): this method call resolves through whichever formatting trait
// is in scope for the module; presumably `Display` — confirm.
RequestId::Outbound(id) => id.fmt(f),
}
}
}
/// Destination through which a reply is delivered.
/// NOTE(review): the routing semantics of each variant are inferred from names
/// and payload types only — confirm against the handlers that consume this enum.
enum ReplyChannel {
/// Carries only a `RequestId`; presumably the reply is surfaced as an event
/// tagged with that id.
Event(RequestId),
/// Carries a tokio oneshot sender that accepts a `SwarmResponse`.
Local(oneshot::Sender<SwarmResponse>),
/// Carries a libp2p request-response channel that accepts a `SwarmResponse`.
Remote(request_response::ResponseChannel<SwarmResponse>),
}
// The remainder of this module is spliced in textually from sibling files, so the
// included items share the imports and private definitions declared above.
// NOTE(review): `tests.rs` is included unconditionally; presumably it gates its
// own contents with `#[cfg(test)]` — confirm.
include!("messaging/protocol.rs");
include!("messaging/events.rs");
include!("messaging/behaviour.rs");
include!("messaging/handlers.rs");
include!("messaging/tests.rs");