use std::{
borrow::Cow,
collections::{HashMap, HashSet},
fmt,
future::Future,
sync::LazyLock,
task,
time::Duration,
};
use futures::FutureExt;
use libp2p::{
PeerId, StreamProtocol, request_response,
swarm::{
ConnectionDenied, ConnectionId, DialFailure, FromSwarm, NetworkBehaviour, THandler,
THandlerInEvent, THandlerOutEvent, ToSwarm,
},
};
use serde::{Deserialize, Serialize};
use tokio::{sync::oneshot, task::JoinSet};
use crate::{
actor::ActorId,
error::{ActorStopReason, Infallible, RemoteSendError},
};
use super::_internal::{
REMOTE_ACTORS, REMOTE_MESSAGES, RemoteActorFns, RemoteMessageFns, RemoteMessageRegistrationID,
};
const PROTO_NAME: StreamProtocol = StreamProtocol::new("/kameo/messaging/1.0.0");
static REMOTE_ACTORS_MAP: LazyLock<HashMap<&'static str, RemoteActorFns>> = LazyLock::new(|| {
let mut existing_ids = HashSet::new();
for (id, _) in REMOTE_ACTORS {
if !existing_ids.insert(id) {
panic!("duplicate remote actor detected for actor '{id}'");
}
}
REMOTE_ACTORS.iter().copied().collect()
});
static REMOTE_MESSAGES_MAP: LazyLock<
HashMap<RemoteMessageRegistrationID<'static>, RemoteMessageFns>,
> = LazyLock::new(|| {
let mut existing_ids = HashSet::new();
for (id, _) in REMOTE_MESSAGES {
if !existing_ids.insert(id) {
panic!(
"duplicate remote message detected for actor '{}' and message '{}'",
id.actor_remote_id, id.message_remote_id
);
}
}
REMOTE_MESSAGES.iter().copied().collect()
});
type AskResult = Result<Vec<u8>, RemoteSendError<Vec<u8>>>;
type TellResult = Result<(), RemoteSendError>;
type LinkResult = Result<(), RemoteSendError>;
type UnlinkResult = Result<(), RemoteSendError>;
type SignalLinkDiedResult = Result<(), RemoteSendError>;
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum RequestId {
Local(u64),
Outbound(request_response::OutboundRequestId),
}
impl RequestId {
fn unwrap_outbound(self) -> request_response::OutboundRequestId {
match self {
RequestId::Local(_) => panic!("called unwrap_outbound on a local request id"),
RequestId::Outbound(id) => id,
}
}
}
impl PartialEq<request_response::OutboundRequestId> for RequestId {
fn eq(&self, other: &request_response::OutboundRequestId) -> bool {
match self {
RequestId::Local(_) => false,
RequestId::Outbound(id) => id.eq(other),
}
}
}
impl PartialEq<RequestId> for request_response::OutboundRequestId {
fn eq(&self, other: &RequestId) -> bool {
match other {
RequestId::Local(_) => false,
RequestId::Outbound(other) => self.eq(other),
}
}
}
impl fmt::Display for RequestId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
RequestId::Local(id) => write!(f, "{id}"),
RequestId::Outbound(id) => id.fmt(f),
}
}
}
enum ReplyChannel {
Event(RequestId),
Local(oneshot::Sender<SwarmResponse>),
Remote(request_response::ResponseChannel<SwarmResponse>),
}
#[derive(Debug, Serialize, Deserialize)]
pub enum SwarmRequest {
Ask {
actor_id: ActorId,
actor_remote_id: Cow<'static, str>,
message_remote_id: Cow<'static, str>,
payload: Vec<u8>,
mailbox_timeout: Option<Duration>,
reply_timeout: Option<Duration>,
immediate: bool,
},
Tell {
actor_id: ActorId,
actor_remote_id: Cow<'static, str>,
message_remote_id: Cow<'static, str>,
payload: Vec<u8>,
mailbox_timeout: Option<Duration>,
immediate: bool,
},
Link {
actor_id: ActorId,
actor_remote_id: Cow<'static, str>,
sibling_id: ActorId,
sibling_remote_id: Cow<'static, str>,
},
Unlink {
actor_id: ActorId,
actor_remote_id: Cow<'static, str>,
sibling_id: ActorId,
},
SignalLinkDied {
dead_actor_id: ActorId,
notified_actor_id: ActorId,
notified_actor_remote_id: Cow<'static, str>,
stop_reason: ActorStopReason,
},
}
#[derive(Debug, Serialize, Deserialize)]
pub enum SwarmResponse {
Ask(Result<Vec<u8>, RemoteSendError<Vec<u8>>>),
Tell(Result<(), RemoteSendError>),
Link(Result<(), RemoteSendError>),
Unlink(Result<(), RemoteSendError>),
SignalLinkDied(Result<(), RemoteSendError>),
OutboundFailure(RemoteSendError),
}
#[derive(Debug)]
pub enum Event {
AskResult {
peer: PeerId,
connection_id: Option<ConnectionId>,
request_id: RequestId,
result: AskResult,
},
TellResult {
peer: PeerId,
connection_id: Option<ConnectionId>,
request_id: RequestId,
result: TellResult,
},
LinkResult {
peer: PeerId,
connection_id: Option<ConnectionId>,
request_id: RequestId,
result: LinkResult,
},
UnlinkResult {
peer: PeerId,
connection_id: Option<ConnectionId>,
request_id: RequestId,
result: UnlinkResult,
},
SignalLinkDiedResult {
peer: PeerId,
connection_id: Option<ConnectionId>,
request_id: RequestId,
result: SignalLinkDiedResult,
},
OutboundFailure {
peer: PeerId,
connection_id: ConnectionId,
request_id: request_response::OutboundRequestId,
error: RemoteSendError,
},
InboundFailure {
peer: PeerId,
connection_id: ConnectionId,
request_id: request_response::InboundRequestId,
error: request_response::InboundFailure,
},
ResponseSent {
peer: PeerId,
connection_id: ConnectionId,
request_id: request_response::InboundRequestId,
},
}
#[derive(Debug, Clone, Copy)]
pub struct Config {
request_timeout: Duration,
max_concurrent_streams: usize,
request_size_maximum: u64,
response_size_maximum: u64,
}
impl Default for Config {
fn default() -> Self {
Self {
request_timeout: Duration::from_secs(10),
max_concurrent_streams: 100,
request_size_maximum: 1024 * 1024,
response_size_maximum: 10 * 1024 * 1024,
}
}
}
impl Config {
pub fn with_request_timeout(mut self, v: Duration) -> Self {
self.request_timeout = v;
self
}
pub fn with_max_concurrent_streams(mut self, num_streams: usize) -> Self {
self.max_concurrent_streams = num_streams;
self
}
pub fn with_request_size_maximum(mut self, bytes: u64) -> Self {
self.request_size_maximum = bytes;
self
}
pub fn with_response_size_maximum(mut self, bytes: u64) -> Self {
self.response_size_maximum = bytes;
self
}
}
impl From<Config> for request_response::Config {
fn from(config: Config) -> Self {
request_response::Config::default()
.with_request_timeout(config.request_timeout)
.with_max_concurrent_streams(config.max_concurrent_streams)
}
}
impl<Req, Resp> From<Config> for request_response::cbor::codec::Codec<Req, Resp> {
fn from(config: Config) -> Self {
request_response::cbor::codec::Codec::default()
.set_request_size_maximum(config.request_size_maximum)
.set_response_size_maximum(config.response_size_maximum)
}
}
#[allow(missing_debug_implementations)]
pub struct Behaviour {
request_response: request_response::cbor::Behaviour<SwarmRequest, SwarmResponse>,
local_peer_id: PeerId,
next_id: u64,
requests: HashMap<RequestId, (PeerId, Option<oneshot::Sender<SwarmResponse>>)>,
join_set: JoinSet<(ReplyChannel, SwarmResponse)>,
}
impl Behaviour {
pub fn new(local_peer_id: PeerId, config: Config) -> Self {
let request_response = request_response::cbor::Behaviour::with_codec(
config.into(),
[(PROTO_NAME, request_response::ProtocolSupport::Full)],
config.into(),
);
Behaviour {
request_response,
local_peer_id,
next_id: 0,
requests: HashMap::new(),
join_set: JoinSet::new(),
}
}
#[allow(clippy::too_many_arguments)]
pub fn ask(
&mut self,
actor_id: ActorId,
actor_remote_id: Cow<'static, str>,
message_remote_id: Cow<'static, str>,
payload: Vec<u8>,
mailbox_timeout: Option<Duration>,
reply_timeout: Option<Duration>,
immediate: bool,
) -> RequestId {
self.ask_with_reply(
actor_id,
actor_remote_id,
message_remote_id,
payload,
mailbox_timeout,
reply_timeout,
immediate,
None,
)
.unwrap()
}
pub fn tell(
&mut self,
actor_id: ActorId,
actor_remote_id: Cow<'static, str>,
message_remote_id: Cow<'static, str>,
payload: Vec<u8>,
mailbox_timeout: Option<Duration>,
immediate: bool,
) -> RequestId {
self.tell_with_reply(
actor_id,
actor_remote_id,
message_remote_id,
payload,
mailbox_timeout,
immediate,
None,
)
.unwrap()
}
pub fn link(
&mut self,
actor_id: ActorId,
actor_remote_id: Cow<'static, str>,
sibling_id: ActorId,
sibling_remote_id: Cow<'static, str>,
) -> RequestId {
self.link_with_reply(
actor_id,
actor_remote_id,
sibling_id,
sibling_remote_id,
None,
)
.unwrap()
}
pub fn unlink(
&mut self,
actor_id: ActorId,
actor_remote_id: Cow<'static, str>,
sibling_id: ActorId,
) -> RequestId {
self.unlink_with_reply(actor_id, actor_remote_id, sibling_id, None)
.unwrap()
}
pub fn signal_link_died(
&mut self,
dead_actor_id: ActorId,
notified_actor_id: ActorId,
notified_actor_remote_id: Cow<'static, str>,
stop_reason: ActorStopReason,
) -> RequestId {
self.signal_link_died_with_reply(
dead_actor_id,
notified_actor_id,
notified_actor_remote_id,
stop_reason,
None,
)
.unwrap()
}
#[allow(clippy::too_many_arguments)]
pub(super) fn ask_with_reply(
&mut self,
actor_id: ActorId,
actor_remote_id: Cow<'static, str>,
message_remote_id: Cow<'static, str>,
payload: Vec<u8>,
mailbox_timeout: Option<Duration>,
reply_timeout: Option<Duration>,
immediate: bool,
reply: Option<oneshot::Sender<SwarmResponse>>,
) -> Option<RequestId> {
let peer_id = actor_id.peer_id().expect("swarm should be bootstrapped");
self.request_with_reply(
peer_id,
reply,
(
actor_id,
actor_remote_id,
message_remote_id,
payload,
mailbox_timeout,
reply_timeout,
immediate,
),
|(
actor_id,
actor_remote_id,
message_remote_id,
payload,
mailbox_timeout,
reply_timeout,
immediate,
)| {
ask(
actor_id,
actor_remote_id,
message_remote_id,
payload,
mailbox_timeout,
reply_timeout,
immediate,
)
.map(SwarmResponse::Ask)
},
move |(
actor_id,
actor_remote_id,
message_remote_id,
payload,
mailbox_timeout,
reply_timeout,
immediate,
)| SwarmRequest::Ask {
actor_id,
actor_remote_id,
message_remote_id,
payload,
mailbox_timeout,
reply_timeout,
immediate,
},
)
}
#[allow(clippy::too_many_arguments)]
pub(super) fn tell_with_reply(
&mut self,
actor_id: ActorId,
actor_remote_id: Cow<'static, str>,
message_remote_id: Cow<'static, str>,
payload: Vec<u8>,
mailbox_timeout: Option<Duration>,
immediate: bool,
reply: Option<oneshot::Sender<SwarmResponse>>,
) -> Option<RequestId> {
let peer_id = actor_id.peer_id().expect("swarm should be bootstrapped");
self.request_with_reply(
peer_id,
reply,
(
actor_id,
actor_remote_id,
message_remote_id,
payload,
mailbox_timeout,
immediate,
),
|(
actor_id,
actor_remote_id,
message_remote_id,
payload,
mailbox_timeout,
immediate,
)| {
tell(
actor_id,
actor_remote_id,
message_remote_id,
payload,
mailbox_timeout,
immediate,
)
.map(SwarmResponse::Tell)
},
move |(
actor_id,
actor_remote_id,
message_remote_id,
payload,
mailbox_timeout,
immediate,
)| SwarmRequest::Tell {
actor_id,
actor_remote_id,
message_remote_id,
payload,
mailbox_timeout,
immediate,
},
)
}
pub(super) fn link_with_reply(
&mut self,
actor_id: ActorId,
actor_remote_id: Cow<'static, str>,
sibling_id: ActorId,
sibling_remote_id: Cow<'static, str>,
reply: Option<oneshot::Sender<SwarmResponse>>,
) -> Option<RequestId> {
let peer_id = actor_id.peer_id().expect("swarm should be bootstrapped");
self.request_with_reply(
peer_id,
reply,
(actor_id, actor_remote_id, sibling_id, sibling_remote_id),
|(actor_id, actor_remote_id, sibling_id, sibling_remote_id)| {
link(actor_id, actor_remote_id, sibling_id, sibling_remote_id)
.map(SwarmResponse::Link)
},
move |(actor_id, actor_remote_id, sibling_id, sibling_remote_id)| SwarmRequest::Link {
actor_id,
actor_remote_id,
sibling_id,
sibling_remote_id,
},
)
}
pub(super) fn unlink_with_reply(
&mut self,
actor_id: ActorId,
actor_remote_id: Cow<'static, str>,
sibling_id: ActorId,
reply: Option<oneshot::Sender<SwarmResponse>>,
) -> Option<RequestId> {
let peer_id = actor_id.peer_id().expect("swarm should be bootstrapped");
self.request_with_reply(
peer_id,
reply,
(actor_id, actor_remote_id, sibling_id),
|(actor_id, actor_remote_id, sibling_id)| {
unlink(actor_id, actor_remote_id, sibling_id).map(SwarmResponse::Unlink)
},
move |(actor_id, actor_remote_id, sibling_id)| SwarmRequest::Unlink {
actor_id,
actor_remote_id,
sibling_id,
},
)
}
pub(super) fn signal_link_died_with_reply(
&mut self,
dead_actor_id: ActorId,
notified_actor_id: ActorId,
notified_actor_remote_id: Cow<'static, str>,
stop_reason: ActorStopReason,
reply: Option<oneshot::Sender<SwarmResponse>>,
) -> Option<RequestId> {
let peer_id = notified_actor_id
.peer_id()
.expect("swarm should be bootstrapped");
self.request_with_reply(
peer_id,
reply,
(
dead_actor_id,
notified_actor_id,
notified_actor_remote_id,
stop_reason,
),
|(dead_actor_id, notified_actor_id, notified_actor_remote_id, stop_reason)| {
signal_link_died(
dead_actor_id,
notified_actor_id,
notified_actor_remote_id,
stop_reason,
)
.map(SwarmResponse::SignalLinkDied)
},
move |(dead_actor_id, notified_actor_id, notified_actor_remote_id, stop_reason)| {
SwarmRequest::SignalLinkDied {
dead_actor_id,
notified_actor_id,
notified_actor_remote_id,
stop_reason,
}
},
)
}
fn new_local_request_id(&mut self) -> RequestId {
let id = RequestId::Local(self.next_id);
self.next_id += 1;
id
}
fn request_with_reply<L, LF, R, T>(
&mut self,
peer_id: &PeerId,
reply: Option<oneshot::Sender<SwarmResponse>>,
shared_data: T,
local: L,
remote: R,
) -> Option<RequestId>
where
L: FnOnce(T) -> LF,
LF: Future<Output = SwarmResponse> + Send + 'static,
R: FnOnce(T) -> SwarmRequest,
{
if peer_id == &self.local_peer_id {
let (request_id, channel) = match reply {
Some(tx) => (None, ReplyChannel::Local(tx)),
None => {
let request_id = self.new_local_request_id();
(Some(request_id), ReplyChannel::Event(request_id))
}
};
self.join_set
.spawn(local(shared_data).map(|resp| (channel, resp)));
request_id
} else {
let request_id = RequestId::Outbound(
self.request_response
.send_request(peer_id, remote(shared_data)),
);
self.requests.insert(request_id, (*peer_id, reply));
Some(request_id)
}
}
fn handle_request_response_event(
&mut self,
ev: request_response::Event<SwarmRequest, SwarmResponse>,
) -> (bool, Option<Event>) {
match ev {
request_response::Event::Message {
peer,
connection_id,
message,
} => match message {
request_response::Message::Request {
request, channel, ..
} => {
self.handle_incoming_request(request, channel);
(true, None)
}
request_response::Message::Response {
request_id,
response,
} => {
let ev =
self.handle_incoming_response(peer, connection_id, request_id, response);
(false, ev)
}
},
request_response::Event::OutboundFailure {
peer,
connection_id,
request_id,
error,
} => match self.requests.remove(&RequestId::Outbound(request_id)) {
Some((_, Some(tx))) => {
let _ = tx.send(SwarmResponse::OutboundFailure(error.into()));
(false, None)
}
Some((_, None)) | None => (
false,
Some(Event::OutboundFailure {
peer,
connection_id,
request_id,
error: error.into(),
}),
),
},
request_response::Event::InboundFailure {
peer,
connection_id,
request_id,
error,
} => (
false,
Some(Event::InboundFailure {
peer,
connection_id,
request_id,
error,
}),
),
request_response::Event::ResponseSent {
peer,
connection_id,
request_id,
} => (
false,
Some(Event::ResponseSent {
peer,
connection_id,
request_id,
}),
),
}
}
fn handle_incoming_request(
&mut self,
req: SwarmRequest,
channel: request_response::ResponseChannel<SwarmResponse>,
) {
let channel = ReplyChannel::Remote(channel);
match req {
SwarmRequest::Ask {
actor_id,
actor_remote_id,
message_remote_id,
payload,
mailbox_timeout,
reply_timeout,
immediate,
} => {
self.join_set.spawn(
ask(
actor_id,
actor_remote_id,
message_remote_id,
payload,
mailbox_timeout,
reply_timeout,
immediate,
)
.map(|res| (channel, SwarmResponse::Ask(res))),
);
}
SwarmRequest::Tell {
actor_id,
actor_remote_id,
message_remote_id,
payload,
mailbox_timeout,
immediate,
} => {
self.join_set.spawn(
tell(
actor_id,
actor_remote_id,
message_remote_id,
payload,
mailbox_timeout,
immediate,
)
.map(|res| (channel, SwarmResponse::Tell(res))),
);
}
SwarmRequest::Link {
actor_id,
actor_remote_id,
sibling_id,
sibling_remote_id,
} => {
self.join_set.spawn(
link(actor_id, actor_remote_id, sibling_id, sibling_remote_id)
.map(|res| (channel, SwarmResponse::Link(res))),
);
}
SwarmRequest::Unlink {
actor_id,
actor_remote_id,
sibling_id,
} => {
self.join_set.spawn(
unlink(actor_id, actor_remote_id, sibling_id)
.map(|res| (channel, SwarmResponse::Unlink(res))),
);
}
SwarmRequest::SignalLinkDied {
dead_actor_id,
notified_actor_id,
notified_actor_remote_id,
stop_reason,
} => {
self.join_set.spawn(
signal_link_died(
dead_actor_id,
notified_actor_id,
notified_actor_remote_id,
stop_reason,
)
.map(|res| (channel, SwarmResponse::SignalLinkDied(res))),
);
}
}
}
fn handle_incoming_response(
&mut self,
peer: PeerId,
connection_id: ConnectionId,
req_id: request_response::OutboundRequestId,
res: SwarmResponse,
) -> Option<Event> {
match self.requests.remove(&RequestId::Outbound(req_id)) {
Some((_, Some(tx))) => {
let _ = tx.send(res);
None
}
Some((_, None)) => {
Some(Event::from_swarm_resp(
res,
peer,
Some(connection_id),
RequestId::Outbound(req_id),
))
}
None => {
#[cfg(feature = "tracing")]
tracing::warn!(%peer, %connection_id, %req_id, ?res, "unrecognised request id for response");
None
}
}
}
}
impl NetworkBehaviour for Behaviour {
type ConnectionHandler =
THandler<request_response::cbor::Behaviour<SwarmRequest, SwarmResponse>>;
type ToSwarm = Event;
fn handle_established_inbound_connection(
&mut self,
connection_id: ConnectionId,
peer: libp2p::PeerId,
local_addr: &libp2p::Multiaddr,
remote_addr: &libp2p::Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
self.request_response.handle_established_inbound_connection(
connection_id,
peer,
local_addr,
remote_addr,
)
}
fn handle_established_outbound_connection(
&mut self,
connection_id: ConnectionId,
peer: libp2p::PeerId,
addr: &libp2p::Multiaddr,
role_override: libp2p::core::Endpoint,
port_use: libp2p::core::transport::PortUse,
) -> Result<THandler<Self>, ConnectionDenied> {
self.request_response
.handle_established_outbound_connection(
connection_id,
peer,
addr,
role_override,
port_use,
)
}
fn on_swarm_event(&mut self, event: FromSwarm<'_>) {
if let FromSwarm::DialFailure(DialFailure {
peer_id: Some(peer_id),
..
}) = event
{
let dead_requests = self
.requests
.extract_if(|_, (req_peer_id, _)| req_peer_id == &peer_id);
for (_, (_, reply)) in dead_requests {
if let Some(tx) = reply {
let _ = tx.send(SwarmResponse::OutboundFailure(RemoteSendError::DialFailure));
}
}
}
self.request_response.on_swarm_event(event)
}
fn on_connection_handler_event(
&mut self,
peer_id: libp2p::PeerId,
connection_id: ConnectionId,
event: THandlerOutEvent<Self>,
) {
self.request_response
.on_connection_handler_event(peer_id, connection_id, event)
}
fn poll(
&mut self,
cx: &mut task::Context<'_>,
) -> task::Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
loop {
match self.join_set.poll_join_next(cx) {
task::Poll::Ready(Some(Ok((ch, res)))) => {
match ch {
ReplyChannel::Event(request_id) => {
return task::Poll::Ready(ToSwarm::GenerateEvent(
Event::from_swarm_resp(res, self.local_peer_id, None, request_id),
));
}
ReplyChannel::Local(tx) => {
let _ = tx.send(res);
continue;
}
ReplyChannel::Remote(ch) => {
let _ = self.request_response.send_response(ch, res);
continue;
}
}
}
task::Poll::Ready(Some(Err(err))) => {
panic!("ask request futures should never fail: {err}");
}
task::Poll::Ready(None) => {
}
task::Poll::Pending => {
}
}
match self.request_response.poll(cx) {
task::Poll::Ready(ToSwarm::GenerateEvent(ev)) => {
let (wake, ev) = self.handle_request_response_event(ev);
if let Some(ev) = ev {
if wake {
cx.waker().wake_by_ref();
}
return task::Poll::Ready(ToSwarm::GenerateEvent(ev));
}
if wake {
continue;
}
continue;
}
task::Poll::Ready(other_ev) => {
return task::Poll::Ready(
other_ev.map_out(|_| unreachable!("we handled GenerateEvent above")),
);
}
task::Poll::Pending => {
match self.join_set.poll_join_next(cx) {
task::Poll::Ready(Some(Ok((ch, res)))) => match ch {
ReplyChannel::Event(request_id) => {
return task::Poll::Ready(ToSwarm::GenerateEvent(
Event::from_swarm_resp(
res,
self.local_peer_id,
None,
request_id,
),
));
}
ReplyChannel::Local(tx) => {
let _ = tx.send(res);
continue; }
ReplyChannel::Remote(ch) => {
let _ = self.request_response.send_response(ch, res);
continue; }
},
task::Poll::Ready(Some(Err(err))) => {
panic!("ask request futures should never fail: {err}");
}
task::Poll::Ready(None) | task::Poll::Pending => {
return task::Poll::Pending;
}
}
}
}
}
}
}
impl Event {
fn from_swarm_resp(
resp: SwarmResponse,
peer: PeerId,
connection_id: Option<ConnectionId>,
request_id: RequestId,
) -> Self {
match resp {
SwarmResponse::Ask(result) => Event::AskResult {
peer,
connection_id,
request_id,
result,
},
SwarmResponse::Tell(result) => Event::TellResult {
peer,
connection_id,
request_id,
result,
},
SwarmResponse::Link(result) => Event::LinkResult {
peer,
connection_id,
request_id,
result,
},
SwarmResponse::Unlink(result) => Event::UnlinkResult {
peer,
connection_id,
request_id,
result,
},
SwarmResponse::SignalLinkDied(result) => Event::SignalLinkDiedResult {
peer,
connection_id,
request_id,
result,
},
SwarmResponse::OutboundFailure(error) => Event::OutboundFailure {
peer,
connection_id: connection_id.unwrap(),
request_id: request_id.unwrap_outbound(),
error,
},
}
}
}
async fn ask(
actor_id: ActorId,
actor_remote_id: Cow<'static, str>,
message_remote_id: Cow<'static, str>,
payload: Vec<u8>,
mailbox_timeout: Option<Duration>,
reply_timeout: Option<Duration>,
immediate: bool,
) -> Result<Vec<u8>, RemoteSendError<Vec<u8>>> {
let Some(fns) = REMOTE_MESSAGES_MAP.get(&RemoteMessageRegistrationID {
actor_remote_id: &actor_remote_id,
message_remote_id: &message_remote_id,
}) else {
return Err(RemoteSendError::UnknownMessage {
actor_remote_id,
message_remote_id,
});
};
if immediate {
(fns.try_ask)(actor_id, payload, reply_timeout).await
} else {
(fns.ask)(actor_id, payload, mailbox_timeout, reply_timeout).await
}
}
async fn tell(
actor_id: ActorId,
actor_remote_id: Cow<'static, str>,
message_remote_id: Cow<'static, str>,
payload: Vec<u8>,
mailbox_timeout: Option<Duration>,
immediate: bool,
) -> Result<(), RemoteSendError> {
let Some(fns) = REMOTE_MESSAGES_MAP.get(&RemoteMessageRegistrationID {
actor_remote_id: &actor_remote_id,
message_remote_id: &message_remote_id,
}) else {
return Err(RemoteSendError::UnknownMessage {
actor_remote_id,
message_remote_id,
});
};
if immediate {
(fns.try_tell)(actor_id, payload).await
} else {
(fns.tell)(actor_id, payload, mailbox_timeout).await
}
}
async fn link(
actor_id: ActorId,
actor_remote_id: Cow<'static, str>,
sibling_id: ActorId,
sibling_remote_id: Cow<'static, str>,
) -> Result<(), RemoteSendError<Infallible>> {
let Some(fns) = REMOTE_ACTORS_MAP.get(&*actor_remote_id) else {
return Err(RemoteSendError::UnknownActor { actor_remote_id });
};
(fns.link)(actor_id, sibling_id, sibling_remote_id).await
}
async fn unlink(
actor_id: ActorId,
actor_remote_id: Cow<'static, str>,
sibling_id: ActorId,
) -> Result<(), RemoteSendError<Infallible>> {
let Some(fns) = REMOTE_ACTORS_MAP.get(&*actor_remote_id) else {
return Err(RemoteSendError::UnknownActor { actor_remote_id });
};
(fns.unlink)(actor_id, sibling_id).await
}
async fn signal_link_died(
dead_actor_id: ActorId,
notified_actor_id: ActorId,
notified_actor_remote_id: Cow<'static, str>,
stop_reason: ActorStopReason,
) -> Result<(), RemoteSendError<Infallible>> {
let Some(fns) = REMOTE_ACTORS_MAP.get(&*notified_actor_remote_id) else {
return Err(RemoteSendError::UnknownActor {
actor_remote_id: notified_actor_remote_id,
});
};
(fns.signal_link_died)(dead_actor_id, notified_actor_id, stop_reason).await
}