#[derive(Debug)]
pub struct SwarmGuard {
_not_send: PhantomData<*const ()>,
}
impl Drop for SwarmGuard {
fn drop(&mut self) {
THREAD_LOCAL_SWARM.with(|tls| *tls.borrow_mut() = None);
}
}
#[derive(Clone, Debug)]
pub struct SwarmSender(pub(crate) mpsc::UnboundedSender<SwarmCommand>);
impl SwarmSender {
pub(crate) fn send(&self, cmd: SwarmCommand) {
if self.0.send(cmd).is_err() {
#[cfg(feature = "tracing")]
tracing::warn!("SwarmSender: swarm channel closed (likely test isolation)");
}
}
fn send_with_reply<T>(
&self,
cmd_fn: impl FnOnce(oneshot::Sender<T>) -> SwarmCommand,
) -> SwarmFuture<T> {
let (reply_tx, reply_rx) = oneshot::channel();
let cmd = cmd_fn(reply_tx);
self.send(cmd);
SwarmFuture(reply_rx)
}
}
#[derive(Debug)]
pub(crate) enum SwarmCommand {
Lookup {
name: Arc<str>,
reply: LookupReply,
},
LookupLocal {
name: Arc<str>,
reply: LookupLocalReply,
},
Register {
name: Arc<str>,
registration: ActorRegistration<'static>,
reply: RegisterReply,
},
Unregister {
name: Arc<str>,
reply: UnregisterReply,
},
Ask {
actor_id: ActorId,
actor_remote_id: Cow<'static, str>,
message_remote_id: Cow<'static, str>,
payload: Vec<u8>,
mailbox_timeout: Option<Duration>,
reply_timeout: Option<Duration>,
immediate: bool,
reply: oneshot::Sender<SwarmResponse>,
},
Tell {
actor_id: ActorId,
actor_remote_id: Cow<'static, str>,
message_remote_id: Cow<'static, str>,
payload: Vec<u8>,
mailbox_timeout: Option<Duration>,
immediate: bool,
reply: Option<oneshot::Sender<SwarmResponse>>,
},
Link {
actor_id: ActorId,
actor_remote_id: Cow<'static, str>,
sibling_id: ActorId,
sibling_remote_id: Cow<'static, str>,
reply: oneshot::Sender<SwarmResponse>,
},
Unlink {
actor_id: ActorId,
sibling_id: ActorId,
sibling_remote_id: Cow<'static, str>,
reply: oneshot::Sender<SwarmResponse>,
},
SignalLinkDied {
dead_actor_id: ActorId,
notified_actor_id: ActorId,
notified_actor_remote_id: Cow<'static, str>,
stop_reason: ActorStopReason,
reply: oneshot::Sender<SwarmResponse>,
},
}
#[derive(Debug)]
struct SwarmFuture<T>(oneshot::Receiver<T>);
impl<T> Future for SwarmFuture<T> {
type Output = T;
fn poll(mut self: pin::Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
task::Poll::Ready(
ready!(self.0.poll_unpin(cx))
.expect("the oneshot sender should never be dropped before being sent to"),
)
}
}