use std::{
cell::Cell,
cmp, fmt,
hash::{Hash, Hasher},
sync::Arc,
time::Duration,
};
use dyn_clone::DynClone;
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, future::BoxFuture, stream::AbortHandle};
use tokio::{sync::SetOnce, task::JoinHandle, task_local};
#[cfg(feature = "remote")]
use std::marker::PhantomData;
#[cfg(feature = "remote")]
use crate::remote;
#[cfg(feature = "remote")]
use crate::request;
use crate::{
Actor, Reply,
error::{self, ActorStopReason, HookError, Infallible, PanicError, SendError},
links::{ErasedChildSpec, Link, Links},
mailbox::{MailboxSender, Signal, SignalMailbox, WeakMailboxSender},
message::{Message, StreamMessage},
reply::ReplyError,
request::{
AskRequest, RecipientTellRequest, ReplyRecipientAskRequest, ReplyRecipientTellRequest,
TellRequest, WithoutRequestTimeout,
},
};
use super::id::ActorId;
task_local! {
pub(crate) static CURRENT_ACTOR_ID: ActorId;
}
thread_local! {
pub(crate) static CURRENT_THREAD_ACTOR_ID: Cell<Option<ActorId>> = const { Cell::new(None) };
}
pub struct ActorRef<A: Actor> {
id: ActorId,
mailbox_sender: MailboxSender<A>,
abort_handle: AbortHandle,
pub(crate) links: Links,
pub(crate) startup_result: Arc<SetOnce<Result<(), PanicError>>>,
pub(crate) shutdown_result: Arc<SetOnce<Result<ActorStopReason, PanicError>>>,
}
impl<A> ActorRef<A>
where
A: Actor,
{
#[inline]
pub(crate) fn new(
id: ActorId,
mailbox: MailboxSender<A>,
abort_handle: AbortHandle,
links: Links,
startup_result: Arc<SetOnce<Result<(), PanicError>>>,
shutdown_result: Arc<SetOnce<Result<ActorStopReason, PanicError>>>,
) -> Self {
ActorRef {
id,
mailbox_sender: mailbox,
abort_handle,
links,
startup_result,
shutdown_result,
}
}
#[inline]
pub fn id(&self) -> ActorId {
self.id
}
#[inline]
pub fn is_alive(&self) -> bool {
!self.mailbox_sender.is_closed()
}
#[cfg(not(feature = "remote"))]
pub fn register(
&self,
name: impl Into<std::borrow::Cow<'static, str>>,
) -> Result<(), error::RegistryError> {
let was_inserted = crate::registry::ACTOR_REGISTRY
.lock()
.unwrap()
.insert(name, self.clone());
if was_inserted {
Ok(())
} else {
Err(error::RegistryError::NameAlreadyRegistered)
}
}
#[cfg(feature = "remote")]
pub async fn register(&self, name: impl Into<Arc<str>>) -> Result<(), error::RegistryError>
where
A: remote::RemoteActor + 'static,
{
remote::ActorSwarm::get()
.ok_or(error::RegistryError::SwarmNotBootstrapped)?
.register(self.clone(), name.into())
.await
}
#[cfg(not(feature = "remote"))]
pub fn lookup<Q>(name: &Q) -> Result<Option<Self>, error::RegistryError>
where
Q: std::hash::Hash + Eq + ?Sized,
std::borrow::Cow<'static, str>: std::borrow::Borrow<Q>,
{
crate::registry::ACTOR_REGISTRY.lock().unwrap().get(name)
}
#[cfg(feature = "remote")]
pub async fn lookup(name: impl Into<Arc<str>>) -> Result<Option<Self>, error::RegistryError>
where
A: remote::RemoteActor + 'static,
{
remote::ActorSwarm::get()
.ok_or(error::RegistryError::SwarmNotBootstrapped)?
.lookup_local(name.into())
.await
}
#[must_use]
pub fn recipient<M>(self) -> Recipient<M>
where
A: Message<M>,
M: Send + 'static,
{
Recipient::new(self)
}
#[must_use]
pub fn reply_recipient<M>(
self,
) -> ReplyRecipient<M, <A::Reply as Reply>::Ok, <A::Reply as Reply>::Error>
where
A: Message<M>,
M: Send + 'static,
{
ReplyRecipient::new(self)
}
#[must_use = "Downgrade creates a WeakActorRef without destroying the original non-weak actor ref."]
#[inline]
pub fn downgrade(&self) -> WeakActorRef<A> {
WeakActorRef {
id: self.id,
mailbox_sender: self.mailbox_sender.downgrade(),
abort_handle: self.abort_handle.clone(),
links: self.links.clone(),
startup_result: self.startup_result.clone(),
shutdown_result: self.shutdown_result.clone(),
}
}
pub(crate) fn into_downgrade(self) -> WeakActorRef<A> {
WeakActorRef {
id: self.id,
mailbox_sender: self.mailbox_sender.downgrade(),
abort_handle: self.abort_handle,
links: self.links,
startup_result: self.startup_result,
shutdown_result: self.shutdown_result,
}
}
#[inline]
pub fn strong_count(&self) -> usize {
self.mailbox_sender.strong_count()
}
#[inline]
pub fn weak_count(&self) -> usize {
self.mailbox_sender.weak_count()
}
#[inline]
pub fn is_current(&self) -> bool {
CURRENT_ACTOR_ID
.try_with(Clone::clone)
.map(|current_actor_id| current_actor_id == self.id)
.unwrap_or(false)
}
#[inline]
pub async fn stop_gracefully(&self) -> Result<(), SendError> {
self.mailbox_sender
.send(Signal::Stop)
.await
.map_err(|_| SendError::ActorNotRunning(()))
}
#[inline]
pub fn kill(&self) {
self.abort_handle.abort()
}
pub fn get_startup_result(&self) -> Option<Result<(), HookError<A::Error>>>
where
A::Error: Clone,
{
match self.startup_result.get()? {
Ok(()) => Some(Ok(())),
Err(err) => Some(Err(err
.with_downcast_ref(|err: &A::Error| HookError::Error(err.clone()))
.unwrap_or_else(|| HookError::Panicked(err.clone())))),
}
}
pub fn with_startup_result<F, R>(&self, f: F) -> Option<R>
where
F: FnOnce(Result<(), HookError<&A::Error>>) -> R,
{
match self.startup_result.get()? {
Ok(()) => Some(f(Ok(()))),
Err(err) => match err.err.lock() {
Ok(lock) => match lock.downcast_ref() {
Some(err) => Some(f(Err(HookError::Error(err)))),
None => Some(f(Err(HookError::Panicked(err.clone())))),
},
Err(poison_err) => match poison_err.get_ref().downcast_ref() {
Some(err) => Some(f(Err(HookError::Error(err)))),
None => Some(f(Err(HookError::Panicked(err.clone())))),
},
},
}
}
pub fn get_shutdown_result(&self) -> Option<Result<ActorStopReason, HookError<A::Error>>>
where
A::Error: Clone,
{
match self.shutdown_result.get()? {
Ok(reason) => Some(Ok(reason.clone())),
Err(err) => Some(Err(err
.with_downcast_ref(|err: &A::Error| HookError::Error(err.clone()))
.unwrap_or_else(|| HookError::Panicked(err.clone())))),
}
}
pub fn with_shutdown_result<F, R>(&self, f: F) -> Option<R>
where
F: FnOnce(Result<&ActorStopReason, HookError<&A::Error>>) -> R,
{
match self.shutdown_result.get()? {
Ok(reason) => Some(f(Ok(reason))),
Err(err) => match err.err.lock() {
Ok(lock) => match lock.downcast_ref() {
Some(err) => Some(f(Err(HookError::Error(err)))),
None => Some(f(Err(HookError::Panicked(err.clone())))),
},
Err(poison_err) => match poison_err.get_ref().downcast_ref() {
Some(err) => Some(f(Err(HookError::Error(err)))),
None => Some(f(Err(HookError::Panicked(err.clone())))),
},
},
}
}
#[inline]
pub async fn wait_for_startup(&self) {
self.startup_result.wait().await;
}
pub async fn wait_for_startup_result(&self) -> Result<(), HookError<A::Error>>
where
A::Error: Clone,
{
match self.startup_result.wait().await {
Ok(()) => Ok(()),
Err(err) => Err(err
.with_downcast_ref(|err: &A::Error| HookError::Error(err.clone()))
.unwrap_or_else(|| HookError::Panicked(err.clone()))),
}
}
pub async fn wait_for_startup_with_result<F, R>(&self, f: F) -> R
where
F: FnOnce(Result<(), HookError<&A::Error>>) -> R,
{
match self.startup_result.wait().await {
Ok(()) => f(Ok(())),
Err(err) => match err.err.lock() {
Ok(lock) => match lock.downcast_ref() {
Some(err) => f(Err(HookError::Error(err))),
None => f(Err(HookError::Panicked(err.clone()))),
},
Err(poison_err) => match poison_err.get_ref().downcast_ref() {
Some(err) => f(Err(HookError::Error(err))),
None => f(Err(HookError::Panicked(err.clone()))),
},
},
}
}
#[inline]
pub async fn wait_for_shutdown(&self) {
self.mailbox_sender.closed().await
}
pub async fn wait_for_shutdown_result(&self) -> Result<ActorStopReason, HookError<A::Error>>
where
A::Error: Clone,
{
self.mailbox_sender.closed().await;
match self.shutdown_result.wait().await {
Ok(reason) => Ok(reason.clone()),
Err(err) => Err(err
.with_downcast_ref(|err: &A::Error| HookError::Error(err.clone()))
.unwrap_or_else(|| HookError::Panicked(err.clone()))),
}
}
pub async fn wait_for_shutdown_with_result<F, R>(&self, f: F) -> R
where
F: FnOnce(Result<&ActorStopReason, HookError<&A::Error>>) -> R,
{
self.mailbox_sender.closed().await;
match self.shutdown_result.wait().await {
Ok(reason) => f(Ok(reason)),
Err(err) => match err.err.lock() {
Ok(lock) => match lock.downcast_ref() {
Some(err) => f(Err(HookError::Error(err))),
None => f(Err(HookError::Panicked(err.clone()))),
},
Err(poison_err) => match poison_err.get_ref().downcast_ref() {
Some(err) => f(Err(HookError::Error(err))),
None => f(Err(HookError::Panicked(err.clone()))),
},
},
}
}
#[inline]
#[track_caller]
#[doc(alias = "send")]
pub fn ask<M>(
&self,
msg: M,
) -> AskRequest<'_, A, M, WithoutRequestTimeout, WithoutRequestTimeout>
where
A: Message<M>,
M: Send + 'static,
{
AskRequest::new(
self,
msg,
#[cfg(all(debug_assertions, feature = "tracing"))]
std::panic::Location::caller(),
)
}
#[inline]
#[track_caller]
#[doc(alias = "send_async")]
pub fn tell<M>(&self, msg: M) -> TellRequest<'_, A, M, WithoutRequestTimeout>
where
A: Message<M>,
M: Send + 'static,
{
TellRequest::new(
self,
msg,
#[cfg(all(debug_assertions, feature = "tracing"))]
std::panic::Location::caller(),
)
}
#[inline]
pub async fn link<B: Actor>(&self, sibling_ref: &ActorRef<B>) {
if self.id == sibling_ref.id {
return;
}
if self.id < sibling_ref.id {
let mut this_links = self.links.lock().await;
let mut sibling_links = sibling_ref.links.lock().await;
this_links.sibblings.insert(
sibling_ref.id,
Link::Local(sibling_ref.weak_signal_mailbox()),
);
sibling_links
.sibblings
.insert(self.id, Link::Local(self.weak_signal_mailbox()));
} else {
let mut sibling_links = sibling_ref.links.lock().await;
let mut this_links = self.links.lock().await;
this_links.sibblings.insert(
sibling_ref.id,
Link::Local(sibling_ref.weak_signal_mailbox()),
);
sibling_links
.sibblings
.insert(self.id, Link::Local(self.weak_signal_mailbox()));
}
}
#[inline]
pub fn blocking_link<B: Actor>(&self, sibling_ref: &ActorRef<B>) {
if self.id == sibling_ref.id {
return;
}
if self.id < sibling_ref.id {
let mut this_links = self.links.blocking_lock();
let mut sibling_links = sibling_ref.links.blocking_lock();
this_links.sibblings.insert(
sibling_ref.id,
Link::Local(sibling_ref.weak_signal_mailbox()),
);
sibling_links
.sibblings
.insert(self.id, Link::Local(self.weak_signal_mailbox()));
} else {
let mut sibling_links = sibling_ref.links.blocking_lock();
let mut this_links = self.links.blocking_lock();
this_links.sibblings.insert(
sibling_ref.id,
Link::Local(sibling_ref.weak_signal_mailbox()),
);
sibling_links
.sibblings
.insert(self.id, Link::Local(self.weak_signal_mailbox()));
}
}
pub(crate) async fn link_child(
&self,
child_id: ActorId,
child_links: &Links,
spec: ErasedChildSpec,
) {
if self.id == child_id {
return;
}
if self.id < child_id {
let mut this_links = self.links.lock().await;
let mut child_links = child_links.lock().await;
this_links.children.insert(child_id, spec);
child_links.parent = Some((self.id, Link::Local(self.weak_signal_mailbox())));
} else {
let mut child_links = child_links.lock().await;
let mut this_links = self.links.lock().await;
this_links.children.insert(child_id, spec);
child_links.parent = Some((self.id, Link::Local(self.weak_signal_mailbox())));
}
}
#[cfg(feature = "remote")]
pub async fn link_remote<B>(
&self,
sibling_ref: &RemoteActorRef<B>,
) -> Result<(), error::RemoteSendError<error::Infallible>>
where
A: remote::RemoteActor,
B: Actor + remote::RemoteActor,
{
if self.id == sibling_ref.id {
return Ok(());
}
remote::REMOTE_REGISTRY
.lock()
.await
.entry(self.id)
.or_insert_with(|| remote::RemoteRegistryActorRef::new(self.clone(), None));
self.links.lock().await.sibblings.insert(
sibling_ref.id,
Link::Remote(std::borrow::Cow::Borrowed(B::REMOTE_ID)),
);
remote::ActorSwarm::get()
.ok_or(error::RemoteSendError::SwarmNotBootstrapped)?
.link::<A, B>(self.id, sibling_ref.id)
.await
}
#[inline]
pub async fn unlink<B: Actor>(&self, sibling_ref: &ActorRef<B>) {
if self.id == sibling_ref.id {
return;
}
if self.id < sibling_ref.id {
let mut this_links = self.links.lock().await;
let mut sibling_links = sibling_ref.links.lock().await;
this_links.sibblings.remove(&sibling_ref.id);
sibling_links.sibblings.remove(&self.id);
} else {
let mut sibling_links = sibling_ref.links.lock().await;
let mut this_links = self.links.lock().await;
this_links.sibblings.remove(&sibling_ref.id);
sibling_links.sibblings.remove(&self.id);
}
}
#[inline]
pub fn blocking_unlink<B: Actor>(&self, sibling_ref: &ActorRef<B>) {
if self.id == sibling_ref.id {
return;
}
if self.id < sibling_ref.id {
let mut this_links = self.links.blocking_lock();
let mut sibling_links = sibling_ref.links.blocking_lock();
this_links.sibblings.remove(&sibling_ref.id);
sibling_links.sibblings.remove(&self.id);
} else {
let mut sibling_links = sibling_ref.links.blocking_lock();
let mut this_links = self.links.blocking_lock();
this_links.sibblings.remove(&sibling_ref.id);
sibling_links.sibblings.remove(&self.id);
}
}
#[cfg(feature = "remote")]
pub async fn unlink_remote<B>(
&self,
sibling_ref: &RemoteActorRef<B>,
) -> Result<(), error::RemoteSendError<error::Infallible>>
where
A: remote::RemoteActor,
B: Actor + remote::RemoteActor,
{
if self.id == sibling_ref.id {
return Ok(());
}
self.links.lock().await.sibblings.remove(&sibling_ref.id);
remote::ActorSwarm::get()
.ok_or(error::RemoteSendError::SwarmNotBootstrapped)?
.unlink::<B>(self.id, sibling_ref.id)
.await
}
#[allow(clippy::type_complexity)]
pub fn attach_stream<M, S, T, F>(
&self,
mut stream: S,
start_value: T,
finish_value: F,
) -> JoinHandle<Result<S, SendError<StreamMessage<M, T, F>>>>
where
A: Message<StreamMessage<M, T, F>>,
S: Stream<Item = M> + Send + Unpin + 'static,
M: Send + 'static,
T: Send + 'static,
F: Send + 'static,
{
let actor_ref = self.clone();
tokio::spawn(async move {
actor_ref
.tell(StreamMessage::Started(start_value))
.send()
.await?;
loop {
tokio::select! {
msg = stream.next() => {
match msg {
Some(msg) => {
actor_ref.tell(StreamMessage::Next(msg)).send().await?;
}
None => break,
}
}
_ = actor_ref.wait_for_shutdown() => {
return Ok(stream);
}
}
}
actor_ref
.tell(StreamMessage::Finished(finish_value))
.send()
.await?;
Ok(stream)
})
}
pub fn mailbox_sender(&self) -> &MailboxSender<A> {
&self.mailbox_sender
}
#[cfg(feature = "remote")]
pub async fn into_remote_ref(&self) -> RemoteActorRef<A>
where
A: remote::RemoteActor,
{
let remote_ref = RemoteActorRef::new(
self.id(),
remote::ActorSwarm::get().unwrap().sender().clone(),
);
remote::REMOTE_REGISTRY
.lock()
.await
.entry(self.id())
.or_insert_with(|| remote::RemoteRegistryActorRef::new_weak(self.downgrade(), None));
remote_ref
}
#[cfg(feature = "remote")]
pub fn into_remote_ref_blocking(&self) -> RemoteActorRef<A>
where
A: remote::RemoteActor,
{
let remote_ref = RemoteActorRef::new(
self.id(),
remote::ActorSwarm::get().unwrap().sender().clone(),
);
remote::REMOTE_REGISTRY
.blocking_lock()
.entry(self.id())
.or_insert_with(|| remote::RemoteRegistryActorRef::new_weak(self.downgrade(), None));
remote_ref
}
#[inline]
pub(crate) fn weak_signal_mailbox(&self) -> Box<dyn SignalMailbox> {
Box::new(self.mailbox_sender.downgrade())
}
}
impl<A: Actor> Clone for ActorRef<A> {
fn clone(&self) -> Self {
ActorRef {
id: self.id,
mailbox_sender: self.mailbox_sender.clone(),
abort_handle: self.abort_handle.clone(),
links: self.links.clone(),
startup_result: self.startup_result.clone(),
shutdown_result: self.shutdown_result.clone(),
}
}
}
impl<A: Actor> fmt::Debug for ActorRef<A> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut d = f.debug_struct("ActorRef");
d.field("id", &self.id);
match self.links.try_lock() {
Ok(guard) => {
d.field(
"parent",
&guard.parent.as_ref().map(|(parent_id, _)| parent_id),
)
.field("links", &guard.sibblings.keys());
}
Err(_) => {
d.field("parent", &format_args!("<locked>"))
.field("links", &format_args!("<locked>"));
}
}
d.finish()
}
}
impl<A: Actor> PartialEq for ActorRef<A> {
fn eq(&self, other: &Self) -> bool {
self.id == other.id
}
}
impl<A: Actor> Eq for ActorRef<A> {}
impl<A: Actor> PartialOrd for ActorRef<A> {
fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
Some(self.cmp(other))
}
}
impl<A: Actor> Ord for ActorRef<A> {
fn cmp(&self, other: &Self) -> cmp::Ordering {
self.id.cmp(&other.id)
}
}
impl<A: Actor> Hash for ActorRef<A> {
fn hash<H: Hasher>(&self, state: &mut H) {
self.id.hash(state);
}
}
pub struct ReplyRecipient<M: Send + 'static, Ok: Send + 'static, Err: ReplyError = Infallible> {
pub(crate) handler: Box<dyn ReplyMessageHandler<M, Ok, Err>>,
}
impl<M: Send + 'static, Ok: Send + 'static, Err: ReplyError> ReplyRecipient<M, Ok, Err> {
fn new<A, AR>(actor_ref: ActorRef<A>) -> Self
where
AR: Reply<Ok = Ok, Error = Err>,
A: Actor + Message<M, Reply = AR>,
{
ReplyRecipient {
handler: Box::new(actor_ref),
}
}
#[must_use]
pub fn erase_reply(self) -> Recipient<M> {
Recipient {
handler: self.handler.upcast(),
}
}
#[inline]
pub fn id(&self) -> ActorId {
self.handler.id()
}
#[inline]
pub fn is_alive(&self) -> bool {
self.handler.is_alive()
}
#[must_use = "Downgrade creates a WeakReplyRecipient without destroying the original non-weak recipient."]
#[inline]
pub fn downgrade(&self) -> WeakReplyRecipient<M, Ok, Err> {
self.handler.reply_downgrade()
}
#[inline]
pub fn strong_count(&self) -> usize {
self.handler.strong_count()
}
#[inline]
pub fn weak_count(&self) -> usize {
self.handler.weak_count()
}
#[inline]
pub fn is_current(&self) -> bool {
self.handler.is_current()
}
#[inline]
pub async fn stop_gracefully(&self) -> Result<(), SendError> {
self.handler.stop_gracefully().await
}
#[inline]
pub fn kill(&self) {
self.handler.kill()
}
#[inline]
pub async fn wait_for_startup(&self) {
self.handler.wait_for_startup().await
}
#[inline]
pub async fn wait_for_shutdown(&self) {
self.handler.wait_for_shutdown().await
}
#[track_caller]
pub fn tell(&self, msg: M) -> ReplyRecipientTellRequest<'_, M, Ok, Err, WithoutRequestTimeout> {
ReplyRecipientTellRequest::new(
self,
msg,
#[cfg(all(debug_assertions, feature = "tracing"))]
std::panic::Location::caller(),
)
}
#[track_caller]
pub fn ask(&self, msg: M) -> ReplyRecipientAskRequest<'_, M, Ok, Err, WithoutRequestTimeout> {
ReplyRecipientAskRequest::new(
self,
msg,
#[cfg(all(debug_assertions, feature = "tracing"))]
std::panic::Location::caller(),
)
}
}
impl<M: Send + 'static, Ok: Send + 'static, Err: ReplyError> Clone for ReplyRecipient<M, Ok, Err> {
fn clone(&self) -> Self {
ReplyRecipient {
handler: dyn_clone::clone_box(&*self.handler),
}
}
}
impl<M: Send + 'static, Ok: Send + 'static, Err: ReplyError> fmt::Debug
for ReplyRecipient<M, Ok, Err>
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut d = f.debug_struct("ReplyRecipient");
d.field("id", &self.handler.id());
d.finish()
}
}
impl<M: Send + 'static, Ok: Send + 'static, Err: ReplyError> PartialEq
for ReplyRecipient<M, Ok, Err>
{
fn eq(&self, other: &Self) -> bool {
self.handler.id() == other.handler.id()
}
}
impl<M: Send + 'static, Ok: Send + 'static, Err: ReplyError> Eq for ReplyRecipient<M, Ok, Err> {}
impl<M: Send + 'static, Ok: Send + 'static, Err: ReplyError> PartialOrd
for ReplyRecipient<M, Ok, Err>
{
fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
Some(self.cmp(other))
}
}
impl<M: Send + 'static, Ok: Send + 'static, Err: ReplyError> Ord for ReplyRecipient<M, Ok, Err> {
fn cmp(&self, other: &Self) -> cmp::Ordering {
self.handler.id().cmp(&other.handler.id())
}
}
impl<M: Send + 'static, Ok: Send + 'static, Err: ReplyError> Hash for ReplyRecipient<M, Ok, Err> {
fn hash<H: Hasher>(&self, state: &mut H) {
self.handler.id().hash(state);
}
}
pub struct Recipient<M: Send + 'static> {
pub(crate) handler: Box<dyn MessageHandler<M>>,
}
impl<M: Send + 'static> Recipient<M> {
fn new<A>(actor_ref: ActorRef<A>) -> Self
where
A: Actor + Message<M>,
{
Recipient {
handler: Box::new(actor_ref),
}
}
#[inline]
pub fn id(&self) -> ActorId {
self.handler.id()
}
#[inline]
pub fn is_alive(&self) -> bool {
self.handler.is_alive()
}
#[must_use = "Downgrade creates a WeakRecipient without destroying the original non-weak recipient."]
#[inline]
pub fn downgrade(&self) -> WeakRecipient<M> {
self.handler.downgrade()
}
#[inline]
pub fn strong_count(&self) -> usize {
self.handler.strong_count()
}
#[inline]
pub fn weak_count(&self) -> usize {
self.handler.weak_count()
}
#[inline]
pub fn is_current(&self) -> bool {
self.handler.is_current()
}
#[inline]
pub async fn stop_gracefully(&self) -> Result<(), SendError> {
self.handler.stop_gracefully().await
}
#[inline]
pub fn kill(&self) {
self.handler.kill()
}
#[inline]
pub async fn wait_for_startup(&self) {
self.handler.wait_for_startup().await
}
#[inline]
pub async fn wait_for_shutdown(&self) {
self.handler.wait_for_shutdown().await
}
#[track_caller]
pub fn tell(&self, msg: M) -> RecipientTellRequest<'_, M, WithoutRequestTimeout> {
RecipientTellRequest::new(
self,
msg,
#[cfg(all(debug_assertions, feature = "tracing"))]
std::panic::Location::caller(),
)
}
}
impl<M: Send + 'static> Clone for Recipient<M> {
fn clone(&self) -> Self {
Recipient {
handler: dyn_clone::clone_box(&*self.handler),
}
}
}
impl<M: Send + 'static> fmt::Debug for Recipient<M> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut d = f.debug_struct("Recipient");
d.field("id", &self.handler.id());
d.finish()
}
}
impl<M: Send + 'static> PartialEq for Recipient<M> {
fn eq(&self, other: &Self) -> bool {
self.handler.id() == other.handler.id()
}
}
impl<M: Send + 'static> Eq for Recipient<M> {}
impl<M: Send + 'static> PartialOrd for Recipient<M> {
fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
Some(self.cmp(other))
}
}
impl<M: Send + 'static> Ord for Recipient<M> {
fn cmp(&self, other: &Self) -> cmp::Ordering {
self.handler.id().cmp(&other.handler.id())
}
}
impl<M: Send + 'static> Hash for Recipient<M> {
fn hash<H: Hasher>(&self, state: &mut H) {
self.handler.id().hash(state);
}
}
#[cfg(feature = "remote")]
pub struct RemoteActorRef<A: Actor> {
id: ActorId,
swarm_tx: remote::SwarmSender,
phantom: PhantomData<fn(&mut A)>,
}
#[cfg(feature = "remote")]
impl<A> RemoteActorRef<A>
where
A: Actor + remote::RemoteActor,
{
pub(crate) fn new(id: ActorId, swarm_tx: remote::SwarmSender) -> Self {
RemoteActorRef {
id,
swarm_tx,
phantom: PhantomData,
}
}
pub fn id(&self) -> ActorId {
self.id
}
pub async fn lookup(name: impl Into<Arc<str>>) -> Result<Option<Self>, error::RegistryError>
where
A: remote::RemoteActor + 'static,
{
remote::ActorSwarm::get()
.ok_or(error::RegistryError::SwarmNotBootstrapped)?
.lookup(name.into())
.await
}
pub fn lookup_all(name: impl Into<Arc<str>>) -> remote::LookupStream<A>
where
A: remote::RemoteActor + 'static,
{
match remote::ActorSwarm::get() {
Some(swarm) => swarm.lookup_all(name.into()),
None => remote::LookupStream::new_err(),
}
}
#[inline]
#[track_caller]
#[doc(alias = "send")]
pub fn ask<'a, M>(
&'a self,
msg: &'a M,
) -> request::RemoteAskRequest<'a, A, M, WithoutRequestTimeout, WithoutRequestTimeout>
where
A: remote::RemoteActor + Message<M> + remote::RemoteMessage<M>,
M: serde::Serialize + Send + 'static,
{
request::RemoteAskRequest::new(
self,
msg,
#[cfg(all(debug_assertions, feature = "tracing"))]
std::panic::Location::caller(),
)
}
#[inline]
#[track_caller]
#[doc(alias = "send_async")]
pub fn tell<'a, M>(
&'a self,
msg: &'a M,
) -> request::RemoteTellRequest<'a, A, M, WithoutRequestTimeout>
where
A: Message<M> + remote::RemoteMessage<M>,
M: Send + 'static,
{
request::RemoteTellRequest::new(
self,
msg,
#[cfg(all(debug_assertions, feature = "tracing"))]
std::panic::Location::caller(),
)
}
pub async fn link_remote<B>(
&self,
sibling_ref: &RemoteActorRef<B>,
) -> Result<(), error::RemoteSendError<error::Infallible>>
where
A: remote::RemoteActor,
B: Actor + remote::RemoteActor,
{
if self.id == sibling_ref.id {
return Ok(());
}
let fut_a = remote::ActorSwarm::get()
.ok_or(error::RemoteSendError::SwarmNotBootstrapped)?
.link::<A, B>(self.id, sibling_ref.id);
let fut_b = remote::ActorSwarm::get()
.ok_or(error::RemoteSendError::SwarmNotBootstrapped)?
.link::<B, A>(sibling_ref.id, self.id);
tokio::try_join!(fut_a, fut_b)?;
Ok(())
}
pub async fn unlink_remote<B>(
&self,
sibling_ref: &RemoteActorRef<B>,
) -> Result<(), error::RemoteSendError<error::Infallible>>
where
A: remote::RemoteActor,
B: Actor + remote::RemoteActor,
{
if self.id == sibling_ref.id {
return Ok(());
}
let fut_a = remote::ActorSwarm::get()
.ok_or(error::RemoteSendError::SwarmNotBootstrapped)?
.unlink::<B>(self.id, sibling_ref.id);
let fut_b = remote::ActorSwarm::get()
.ok_or(error::RemoteSendError::SwarmNotBootstrapped)?
.unlink::<A>(sibling_ref.id, self.id);
tokio::try_join!(fut_a, fut_b)?;
Ok(())
}
pub(crate) fn send_to_swarm(&self, msg: remote::SwarmCommand) {
self.swarm_tx.send(msg)
}
}
#[cfg(feature = "remote")]
impl<A: Actor> Clone for RemoteActorRef<A> {
fn clone(&self) -> Self {
RemoteActorRef {
id: self.id,
swarm_tx: self.swarm_tx.clone(),
phantom: PhantomData,
}
}
}
#[cfg(feature = "remote")]
impl<A: Actor> fmt::Debug for RemoteActorRef<A> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut d = f.debug_struct("RemoteActorRef");
d.field("id", &self.id);
d.finish()
}
}
#[cfg(feature = "remote")]
impl<A: Actor> PartialEq for RemoteActorRef<A> {
fn eq(&self, other: &Self) -> bool {
self.id == other.id
}
}
#[cfg(feature = "remote")]
impl<A: Actor> Eq for RemoteActorRef<A> {}
#[cfg(feature = "remote")]
impl<A: Actor> PartialOrd for RemoteActorRef<A> {
fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
Some(self.cmp(other))
}
}
#[cfg(feature = "remote")]
impl<A: Actor> Ord for RemoteActorRef<A> {
fn cmp(&self, other: &Self) -> cmp::Ordering {
self.id.cmp(&other.id)
}
}
#[cfg(feature = "remote")]
impl<A: Actor> Hash for RemoteActorRef<A> {
fn hash<H: Hasher>(&self, state: &mut H) {
self.id.hash(state);
}
}
#[cfg(feature = "remote")]
impl<A: Actor> serde::Serialize for RemoteActorRef<A> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeStruct;
let mut ser = serializer.serialize_struct("RemoteActorRef", 1)?;
ser.serialize_field("id", &self.id)?;
ser.end()
}
}
#[cfg(feature = "remote")]
impl<'de, A: Actor> serde::Deserialize<'de> for RemoteActorRef<A> {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct IdVisitor<A>(std::marker::PhantomData<A>);
impl<'de, A: Actor> serde::de::Visitor<'de> for IdVisitor<A> {
type Value = RemoteActorRef<A>;
fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
formatter.write_str("struct RemoteActorRef")
}
fn visit_map<V>(self, mut map: V) -> Result<Self::Value, V::Error>
where
V: serde::de::MapAccess<'de>,
{
let mut id = None;
while let Some(key) = map.next_key()? {
match key {
"id" => {
if id.is_some() {
return Err(serde::de::Error::duplicate_field("id"));
}
id = Some(map.next_value()?);
}
_ => {
let _ = map.next_value::<serde::de::IgnoredAny>()?;
}
}
}
let id = id.ok_or_else(|| serde::de::Error::missing_field("id"))?;
let swarm = remote::ActorSwarm::get()
.ok_or_else(|| serde::de::Error::custom("actor swarm not bootstrapped"))?;
Ok(RemoteActorRef {
id,
swarm_tx: swarm.sender().clone(),
phantom: PhantomData,
})
}
fn visit_seq<S>(self, mut seq: S) -> Result<Self::Value, S::Error>
where
S: serde::de::SeqAccess<'de>,
{
let id: Option<ActorId> = seq.next_element()?;
let id = id.ok_or_else(|| serde::de::Error::missing_field("id"))?;
let swarm = remote::ActorSwarm::get()
.ok_or_else(|| serde::de::Error::custom("actor swarm not bootstrapped"))?;
Ok(RemoteActorRef {
id,
swarm_tx: swarm.sender().clone(),
phantom: PhantomData,
})
}
}
let visitor = IdVisitor(std::marker::PhantomData);
deserializer.deserialize_struct("RemoteActorRef", &["id"], visitor)
}
}
pub struct WeakActorRef<A: Actor> {
id: ActorId,
mailbox_sender: WeakMailboxSender<A>,
abort_handle: AbortHandle,
pub(crate) links: Links,
pub(crate) startup_result: Arc<SetOnce<Result<(), PanicError>>>,
pub(crate) shutdown_result: Arc<SetOnce<Result<ActorStopReason, PanicError>>>,
}
impl<A: Actor> WeakActorRef<A> {
pub fn id(&self) -> ActorId {
self.id
}
#[inline]
pub fn is_alive(&self) -> bool {
!self.shutdown_result.initialized()
}
#[must_use]
pub fn upgrade(&self) -> Option<ActorRef<A>> {
self.mailbox_sender.upgrade().map(|mailbox| ActorRef {
id: self.id,
mailbox_sender: mailbox,
abort_handle: self.abort_handle.clone(),
links: self.links.clone(),
startup_result: self.startup_result.clone(),
shutdown_result: self.shutdown_result.clone(),
})
}
pub fn strong_count(&self) -> usize {
self.mailbox_sender.strong_count()
}
pub fn weak_count(&self) -> usize {
self.mailbox_sender.weak_count()
}
#[inline]
pub fn is_current(&self) -> bool {
CURRENT_ACTOR_ID
.try_with(Clone::clone)
.map(|current_actor_id| current_actor_id == self.id)
.unwrap_or(false)
}
#[inline]
pub fn kill(&self) {
self.abort_handle.abort()
}
#[inline]
pub async fn wait_for_startup(&self) {
self.startup_result.wait().await;
}
pub async fn wait_for_startup_result(&self) -> Result<(), HookError<A::Error>>
where
A::Error: Clone,
{
match self.startup_result.wait().await {
Ok(()) => Ok(()),
Err(err) => Err(err
.with_downcast_ref(|err: &A::Error| HookError::Error(err.clone()))
.unwrap_or_else(|| HookError::Panicked(err.clone()))),
}
}
pub async fn wait_for_startup_with_result<F, R>(&self, f: F) -> R
where
F: FnOnce(Result<(), HookError<&A::Error>>) -> R,
{
match self.startup_result.wait().await {
Ok(()) => f(Ok(())),
Err(err) => match err.err.lock() {
Ok(lock) => match lock.downcast_ref() {
Some(err) => f(Err(HookError::Error(err))),
None => f(Err(HookError::Panicked(err.clone()))),
},
Err(poison_err) => match poison_err.get_ref().downcast_ref() {
Some(err) => f(Err(HookError::Error(err))),
None => f(Err(HookError::Panicked(err.clone()))),
},
},
}
}
pub fn get_startup_result(&self) -> Option<Result<(), HookError<A::Error>>>
where
A::Error: Clone,
{
match self.startup_result.get()? {
Ok(()) => Some(Ok(())),
Err(err) => Some(Err(err
.with_downcast_ref(|err: &A::Error| HookError::Error(err.clone()))
.unwrap_or_else(|| HookError::Panicked(err.clone())))),
}
}
pub fn with_startup_result<F, R>(&self, f: F) -> Option<R>
where
F: FnOnce(Result<(), HookError<&A::Error>>) -> R,
{
match self.startup_result.get()? {
Ok(()) => Some(f(Ok(()))),
Err(err) => match err.err.lock() {
Ok(lock) => match lock.downcast_ref() {
Some(err) => Some(f(Err(HookError::Error(err)))),
None => Some(f(Err(HookError::Panicked(err.clone())))),
},
Err(poison_err) => match poison_err.get_ref().downcast_ref() {
Some(err) => Some(f(Err(HookError::Error(err)))),
None => Some(f(Err(HookError::Panicked(err.clone())))),
},
},
}
}
#[inline]
pub async fn wait_for_shutdown(&self) {
self.shutdown_result.wait().await;
}
pub async fn wait_for_shutdown_result(&self) -> Result<ActorStopReason, HookError<A::Error>>
where
A::Error: Clone,
{
match self.shutdown_result.wait().await {
Ok(reason) => Ok(reason.clone()),
Err(err) => Err(err
.with_downcast_ref(|err: &A::Error| HookError::Error(err.clone()))
.unwrap_or_else(|| HookError::Panicked(err.clone()))),
}
}
pub async fn wait_for_shutdown_with_result<F, R>(&self, f: F) -> R
where
F: FnOnce(Result<&ActorStopReason, HookError<&A::Error>>) -> R,
{
match self.shutdown_result.wait().await {
Ok(reason) => f(Ok(reason)),
Err(err) => match err.err.lock() {
Ok(lock) => match lock.downcast_ref() {
Some(err) => f(Err(HookError::Error(err))),
None => f(Err(HookError::Panicked(err.clone()))),
},
Err(poison_err) => match poison_err.get_ref().downcast_ref() {
Some(err) => f(Err(HookError::Error(err))),
None => f(Err(HookError::Panicked(err.clone()))),
},
},
}
}
pub fn get_shutdown_result(&self) -> Option<Result<ActorStopReason, HookError<A::Error>>>
where
A::Error: Clone,
{
match self.shutdown_result.get()? {
Ok(reason) => Some(Ok(reason.clone())),
Err(err) => Some(Err(err
.with_downcast_ref(|err: &A::Error| HookError::Error(err.clone()))
.unwrap_or_else(|| HookError::Panicked(err.clone())))),
}
}
pub fn with_shutdown_result<F, R>(&self, f: F) -> Option<R>
where
F: FnOnce(Result<&ActorStopReason, HookError<&A::Error>>) -> R,
{
match self.shutdown_result.get()? {
Ok(reason) => Some(f(Ok(reason))),
Err(err) => match err.err.lock() {
Ok(lock) => match lock.downcast_ref() {
Some(err) => Some(f(Err(HookError::Error(err)))),
None => Some(f(Err(HookError::Panicked(err.clone())))),
},
Err(poison_err) => match poison_err.get_ref().downcast_ref() {
Some(err) => Some(f(Err(HookError::Error(err)))),
None => Some(f(Err(HookError::Panicked(err.clone())))),
},
},
}
}
#[inline]
pub async fn unlink<B: Actor>(&self, sibling_ref: &ActorRef<B>) {
if self.id == sibling_ref.id {
return;
}
if self.id < sibling_ref.id {
let mut this_links = self.links.lock().await;
let mut sibling_links = sibling_ref.links.lock().await;
this_links.sibblings.remove(&sibling_ref.id);
sibling_links.sibblings.remove(&self.id);
} else {
let mut sibling_links = sibling_ref.links.lock().await;
let mut this_links = self.links.lock().await;
this_links.sibblings.remove(&sibling_ref.id);
sibling_links.sibblings.remove(&self.id);
}
}
#[inline]
pub fn blocking_unlink<B: Actor>(&self, sibling_ref: &ActorRef<B>) {
if self.id == sibling_ref.id {
return;
}
if self.id < sibling_ref.id {
let mut this_links = self.links.blocking_lock();
let mut sibling_links = sibling_ref.links.blocking_lock();
this_links.sibblings.remove(&sibling_ref.id);
sibling_links.sibblings.remove(&self.id);
} else {
let mut sibling_links = sibling_ref.links.blocking_lock();
let mut this_links = self.links.blocking_lock();
this_links.sibblings.remove(&sibling_ref.id);
sibling_links.sibblings.remove(&self.id);
}
}
#[cfg(feature = "remote")]
pub async fn unlink_remote<B>(
&self,
sibling_ref: &RemoteActorRef<B>,
) -> Result<(), error::RemoteSendError<error::Infallible>>
where
A: remote::RemoteActor,
B: Actor + remote::RemoteActor,
{
if self.id == sibling_ref.id {
return Ok(());
}
self.links.lock().await.sibblings.remove(&sibling_ref.id);
remote::ActorSwarm::get()
.ok_or(error::RemoteSendError::SwarmNotBootstrapped)?
.unlink::<B>(self.id, sibling_ref.id)
.await
}
#[inline]
pub fn mailbox_sender(&self) -> &WeakMailboxSender<A> {
&self.mailbox_sender
}
#[cfg(feature = "remote")]
#[inline]
pub(crate) fn weak_signal_mailbox(&self) -> Box<dyn SignalMailbox> {
Box::new(self.mailbox_sender.clone())
}
}
impl<A: Actor> Clone for WeakActorRef<A> {
fn clone(&self) -> Self {
WeakActorRef {
id: self.id,
mailbox_sender: self.mailbox_sender.clone(),
abort_handle: self.abort_handle.clone(),
links: self.links.clone(),
startup_result: self.startup_result.clone(),
shutdown_result: self.shutdown_result.clone(),
}
}
}
impl<A: Actor> fmt::Debug for WeakActorRef<A> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut d = f.debug_struct("WeakActorRef");
d.field("id", &self.id);
match self.links.try_lock() {
Ok(guard) => {
d.field(
"parent",
&guard.parent.as_ref().map(|(parent_id, _)| parent_id),
)
.field("links", &guard.sibblings.keys());
}
Err(_) => {
d.field("parent", &format_args!("<locked>"))
.field("links", &format_args!("<locked>"));
}
}
d.finish()
}
}
impl<A: Actor> PartialEq for WeakActorRef<A> {
fn eq(&self, other: &Self) -> bool {
self.id == other.id
}
}
impl<A: Actor> Eq for WeakActorRef<A> {}
impl<A: Actor> PartialOrd for WeakActorRef<A> {
fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
Some(self.cmp(other))
}
}
impl<A: Actor> Ord for WeakActorRef<A> {
fn cmp(&self, other: &Self) -> cmp::Ordering {
self.id.cmp(&other.id)
}
}
impl<A: Actor> Hash for WeakActorRef<A> {
fn hash<H: Hasher>(&self, state: &mut H) {
self.id.hash(state);
}
}
pub struct WeakRecipient<M: Send + 'static> {
handler: Box<dyn WeakMessageHandler<M>>,
}
impl<M: Send + 'static> WeakRecipient<M> {
fn new<A>(weak_actor_ref: WeakActorRef<A>) -> Self
where
A: Actor + Message<M>,
{
WeakRecipient {
handler: Box::new(weak_actor_ref),
}
}
pub fn id(&self) -> ActorId {
self.handler.id()
}
#[must_use]
pub fn upgrade(&self) -> Option<Recipient<M>> {
self.handler.upgrade()
}
pub fn strong_count(&self) -> usize {
self.handler.strong_count()
}
pub fn weak_count(&self) -> usize {
self.handler.weak_count()
}
}
impl<M: Send + 'static> Clone for WeakRecipient<M> {
fn clone(&self) -> Self {
WeakRecipient {
handler: dyn_clone::clone_box(&*self.handler),
}
}
}
impl<M: Send + 'static> fmt::Debug for WeakRecipient<M> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut d = f.debug_struct("WeakRecipient");
d.field("id", &self.handler.id());
d.finish()
}
}
impl<M: Send + 'static> PartialEq for WeakRecipient<M> {
fn eq(&self, other: &Self) -> bool {
self.handler.id() == other.handler.id()
}
}
impl<M: Send + 'static> Eq for WeakRecipient<M> {}
impl<M: Send + 'static> PartialOrd for WeakRecipient<M> {
fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
Some(self.cmp(other))
}
}
impl<M: Send + 'static> Ord for WeakRecipient<M> {
fn cmp(&self, other: &Self) -> cmp::Ordering {
self.handler.id().cmp(&other.handler.id())
}
}
impl<M: Send + 'static> Hash for WeakRecipient<M> {
fn hash<H: Hasher>(&self, state: &mut H) {
self.handler.id().hash(state);
}
}
pub struct WeakReplyRecipient<M: Send + 'static, Ok: Send + 'static, Err: ReplyError> {
handler: Box<dyn WeakReplyMessageHandler<M, Ok, Err>>,
}
impl<M: Send + 'static, Ok: Send + 'static, Err: ReplyError> WeakReplyRecipient<M, Ok, Err> {
fn new<A, AR>(weak_actor_ref: WeakActorRef<A>) -> Self
where
AR: Reply<Ok = Ok, Error = Err>,
A: Actor + Message<M, Reply = AR>,
{
WeakReplyRecipient {
handler: Box::new(weak_actor_ref),
}
}
pub fn id(&self) -> ActorId {
self.handler.id()
}
#[must_use]
pub fn upgrade(&self) -> Option<ReplyRecipient<M, Ok, Err>> {
self.handler.reply_upgrade()
}
pub fn strong_count(&self) -> usize {
self.handler.strong_count()
}
pub fn weak_count(&self) -> usize {
self.handler.weak_count()
}
}
impl<M: Send + 'static, Ok: Send + 'static, Err: ReplyError> Clone
for WeakReplyRecipient<M, Ok, Err>
{
fn clone(&self) -> Self {
WeakReplyRecipient {
handler: dyn_clone::clone_box(&*self.handler),
}
}
}
impl<M: Send + 'static, Ok: Send + 'static, Err: ReplyError> fmt::Debug
for WeakReplyRecipient<M, Ok, Err>
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut d = f.debug_struct("WeakReplyRecipient");
d.field("id", &self.handler.id());
d.finish()
}
}
impl<M: Send + 'static, Ok: Send + 'static, Err: ReplyError> PartialEq
for WeakReplyRecipient<M, Ok, Err>
{
fn eq(&self, other: &Self) -> bool {
self.handler.id() == other.handler.id()
}
}
impl<M: Send + 'static, Ok: Send + 'static, Err: ReplyError> Eq for WeakReplyRecipient<M, Ok, Err> {}
impl<M: Send + 'static, Ok: Send + 'static, Err: ReplyError> PartialOrd
for WeakReplyRecipient<M, Ok, Err>
{
fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
Some(self.cmp(other))
}
}
impl<M: Send + 'static, Ok: Send + 'static, Err: ReplyError> Ord
for WeakReplyRecipient<M, Ok, Err>
{
fn cmp(&self, other: &Self) -> cmp::Ordering {
self.handler.id().cmp(&other.handler.id())
}
}
impl<M: Send + 'static, Ok: Send + 'static, Err: ReplyError> Hash
for WeakReplyRecipient<M, Ok, Err>
{
fn hash<H: Hasher>(&self, state: &mut H) {
self.handler.id().hash(state);
}
}
pub(crate) trait MessageHandler<M: Send + 'static>:
DynClone + Send + Sync + 'static
{
fn id(&self) -> ActorId;
fn is_alive(&self) -> bool;
fn downgrade(&self) -> WeakRecipient<M>;
fn strong_count(&self) -> usize;
fn weak_count(&self) -> usize;
fn is_current(&self) -> bool;
fn stop_gracefully(&self) -> BoxFuture<'_, Result<(), SendError>>;
fn kill(&self);
fn wait_for_startup(&self) -> BoxFuture<'_, ()>;
fn wait_for_shutdown(&self) -> BoxFuture<'_, ()>;
#[allow(clippy::type_complexity)]
fn tell(
&self,
msg: M,
mailbox_timeout: Option<Duration>,
) -> BoxFuture<'_, Result<(), SendError<M>>>;
fn try_tell(&self, msg: M) -> Result<(), SendError<M>>;
fn blocking_tell(&self, msg: M) -> Result<(), SendError<M>>;
}
impl<A, M> MessageHandler<M> for ActorRef<A>
where
A: Actor + Message<M>,
M: Send + 'static,
{
#[inline]
fn id(&self) -> ActorId {
self.id
}
#[inline]
fn is_alive(&self) -> bool {
self.is_alive()
}
#[inline]
fn downgrade(&self) -> WeakRecipient<M> {
WeakRecipient::new(self.downgrade())
}
#[inline]
fn strong_count(&self) -> usize {
self.strong_count()
}
#[inline]
fn weak_count(&self) -> usize {
self.weak_count()
}
#[inline]
fn is_current(&self) -> bool {
self.is_current()
}
#[inline]
fn stop_gracefully(&self) -> BoxFuture<'_, Result<(), SendError>> {
self.stop_gracefully().boxed()
}
#[inline]
fn kill(&self) {
self.kill()
}
#[inline]
fn wait_for_startup(&self) -> BoxFuture<'_, ()> {
self.wait_for_startup().boxed()
}
#[inline]
fn wait_for_shutdown(&self) -> BoxFuture<'_, ()> {
self.wait_for_shutdown().boxed()
}
fn tell(
&self,
msg: M,
mailbox_timeout: Option<Duration>,
) -> BoxFuture<'_, Result<(), SendError<M>>> {
self.tell(msg)
.mailbox_timeout_opt(mailbox_timeout)
.send()
.map_err(|err| {
err.map_err(|_| unreachable!("tell requests don't handle the reply errors"))
})
.boxed()
}
fn try_tell(&self, msg: M) -> Result<(), SendError<M>> {
self.tell(msg).try_send().map_err(|err| {
err.map_err(|_| unreachable!("tell requests don't handle the reply errors"))
})
}
fn blocking_tell(&self, msg: M) -> Result<(), SendError<M>> {
self.tell(msg).blocking_send().map_err(|err| {
err.map_err(|_| unreachable!("tell requests don't handle the reply errors"))
})
}
}
pub(crate) trait ReplyMessageHandler<M: Send + 'static, Ok: Send + 'static, Err: ReplyError>:
MessageHandler<M>
{
#[allow(clippy::type_complexity)]
fn ask(
&self,
msg: M,
mailbox_timeout: Option<Duration>,
) -> BoxFuture<'_, Result<Ok, SendError<M, Err>>>;
fn try_ask(&self, msg: M) -> BoxFuture<'_, Result<Ok, SendError<M, Err>>>;
fn blocking_ask(&self, msg: M) -> Result<Ok, SendError<M, Err>>;
fn reply_downgrade(&self) -> WeakReplyRecipient<M, Ok, Err>;
fn upcast(self: Box<Self>) -> Box<dyn MessageHandler<M>>;
}
impl<A, M, AR, Ok, Err> ReplyMessageHandler<M, Ok, Err> for ActorRef<A>
where
AR: Reply<Ok = Ok, Error = Err>,
A: Actor + Message<M, Reply = AR>,
M: Send + 'static,
Ok: Send + 'static,
Err: ReplyError,
{
#[allow(clippy::type_complexity)]
fn ask(
&self,
msg: M,
mailbox_timeout: Option<Duration>,
) -> BoxFuture<'_, Result<Ok, SendError<M, Err>>> {
self.ask(msg)
.mailbox_timeout_opt(mailbox_timeout)
.send()
.boxed()
}
fn try_ask(&self, msg: M) -> BoxFuture<'_, Result<Ok, SendError<M, Err>>> {
Box::pin(self.ask(msg).try_send())
}
fn blocking_ask(&self, msg: M) -> Result<Ok, SendError<M, Err>> {
self.ask(msg).blocking_send()
}
#[inline]
fn reply_downgrade(&self) -> WeakReplyRecipient<M, Ok, Err> {
WeakReplyRecipient::new(self.downgrade())
}
fn upcast(self: Box<Self>) -> Box<dyn MessageHandler<M>> {
self
}
}
trait WeakMessageHandler<M: Send + 'static>: DynClone + Send + Sync + 'static {
fn id(&self) -> ActorId;
fn upgrade(&self) -> Option<Recipient<M>>;
fn strong_count(&self) -> usize;
fn weak_count(&self) -> usize;
}
impl<A, M> WeakMessageHandler<M> for WeakActorRef<A>
where
A: Actor + Message<M>,
M: Send + 'static,
{
#[inline]
fn id(&self) -> ActorId {
self.id
}
#[inline]
fn upgrade(&self) -> Option<Recipient<M>> {
self.upgrade().map(Recipient::new)
}
#[inline]
fn strong_count(&self) -> usize {
self.strong_count()
}
#[inline]
fn weak_count(&self) -> usize {
self.weak_count()
}
}
trait WeakReplyMessageHandler<M: Send + 'static, Ok: Send + 'static, Err: ReplyError>:
WeakMessageHandler<M>
{
fn reply_upgrade(&self) -> Option<ReplyRecipient<M, Ok, Err>>;
}
impl<A, M, AR, Ok, Err> WeakReplyMessageHandler<M, Ok, Err> for WeakActorRef<A>
where
AR: Reply<Ok = Ok, Error = Err>,
A: Actor + Message<M, Reply = AR>,
M: Send + 'static,
Ok: Send + 'static,
Err: ReplyError,
{
#[inline]
fn reply_upgrade(&self) -> Option<ReplyRecipient<M, Ok, Err>> {
self.upgrade().map(ReplyRecipient::new)
}
}