use std::fmt;
use std::sync::{Arc, Weak};
use std::time::Duration;
use thiserror::Error;
use tokio::sync::{mpsc, oneshot};
use super::actor_cell::SystemMsg;
use super::actor_system::ActorSystemInner;
use super::path::ActorPath;
use super::remote::{RemoteRef, RemoteSystemMsg, SerializedMessage};
use super::sender::Sender;
use super::traits::MessageEnvelope;
type RemoteSerializerFn<M> = Arc<dyn Fn(M, Option<ActorPath>) -> SerializedMessage + Send + Sync>;
enum RefImpl<M: Send + 'static> {
Local {
path: ActorPath,
user: mpsc::UnboundedSender<MessageEnvelope<M>>,
system: mpsc::UnboundedSender<SystemMsg>,
system_ref: Weak<ActorSystemInner>,
},
Remote {
path: ActorPath,
handle: Arc<dyn RemoteRef>,
serialize: RemoteSerializerFn<M>,
},
}
pub struct ActorRef<M: Send + 'static> {
inner: Arc<RefImpl<M>>,
}
impl<M: Send + 'static> Clone for ActorRef<M> {
fn clone(&self) -> Self {
Self { inner: self.inner.clone() }
}
}
impl<M: Send + 'static> fmt::Debug for ActorRef<M> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ActorRef").field("path", &self.path().to_string()).finish()
}
}
impl<M: Send + 'static> ActorRef<M> {
pub(crate) fn new(
path: ActorPath,
user: mpsc::UnboundedSender<MessageEnvelope<M>>,
system: mpsc::UnboundedSender<SystemMsg>,
system_ref: Weak<ActorSystemInner>,
) -> Self {
Self { inner: Arc::new(RefImpl::Local { path, user, system, system_ref }) }
}
pub fn from_remote(handle: Arc<dyn RemoteRef>, serialize: RemoteSerializerFn<M>) -> Self {
let path = handle.path().clone();
Self { inner: Arc::new(RefImpl::Remote { path, handle, serialize }) }
}
pub fn path(&self) -> &ActorPath {
match &*self.inner {
RefImpl::Local { path, .. } => path,
RefImpl::Remote { path, .. } => path,
}
}
pub fn is_remote(&self) -> bool {
matches!(&*self.inner, RefImpl::Remote { .. })
}
pub fn tell(&self, msg: M) {
match &*self.inner {
RefImpl::Local { user, path, system_ref, .. } => {
if user.send(MessageEnvelope::new(msg)).is_err() {
notify_dead_letter::<M>(path, system_ref);
}
}
RefImpl::Remote { handle, serialize, .. } => {
handle.tell_serialized(serialize(msg, None));
}
}
}
pub fn tell_from(&self, msg: M, sender: Sender) {
match &*self.inner {
RefImpl::Local { user, path, system_ref, .. } => {
let env = MessageEnvelope::with_typed_sender(msg, sender);
if user.send(env).is_err() {
notify_dead_letter::<M>(path, system_ref);
}
}
RefImpl::Remote { handle, serialize, .. } => {
let sender_path = sender.path().cloned();
handle.tell_serialized(serialize(msg, sender_path));
}
}
}
pub fn stop(&self) {
match &*self.inner {
RefImpl::Local { system, .. } => {
let _ = system.send(SystemMsg::Stop);
}
RefImpl::Remote { handle, .. } => {
handle.tell_system(RemoteSystemMsg::Stop);
}
}
}
pub async fn ask_with<R, F>(&self, build: F, timeout: Duration) -> Result<R, AskError>
where
R: Send + 'static,
F: FnOnce(oneshot::Sender<R>) -> M,
{
let (tx, rx) = oneshot::channel();
let msg = build(tx);
self.tell(msg);
tokio::time::timeout(timeout, rx)
.await
.map_err(|_| AskError::Timeout)?
.map_err(|_| AskError::TargetDropped)
}
pub fn as_untyped(&self) -> UntypedActorRef {
match &*self.inner {
RefImpl::Local { path, system, .. } => UntypedActorRef {
inner: Arc::new(UntypedImpl::Local { path: path.clone(), system: system.clone() }),
},
RefImpl::Remote { path, handle, .. } => UntypedActorRef {
inner: Arc::new(UntypedImpl::Remote { path: path.clone(), handle: handle.clone() }),
},
}
}
pub(crate) fn system_sender(&self) -> mpsc::UnboundedSender<SystemMsg> {
match &*self.inner {
RefImpl::Local { system, .. } => system.clone(),
RefImpl::Remote { .. } => {
let (tx, _rx) = mpsc::unbounded_channel();
tx
}
}
}
}
fn notify_dead_letter<M: 'static>(path: &ActorPath, system_ref: &Weak<ActorSystemInner>) {
if let Some(system) = system_ref.upgrade() {
if let Some(obs) = system.dead_letter_observer.read().as_ref() {
obs.on_dead_letter(path, None, std::any::type_name::<M>());
}
}
}
enum UntypedImpl {
Local { path: ActorPath, system: mpsc::UnboundedSender<SystemMsg> },
Remote { path: ActorPath, handle: Arc<dyn RemoteRef> },
}
#[derive(Clone)]
pub struct UntypedActorRef {
inner: Arc<UntypedImpl>,
}
impl fmt::Debug for UntypedActorRef {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("UntypedActorRef").field("path", &self.path().to_string()).finish()
}
}
impl UntypedActorRef {
pub fn from_remote(handle: Arc<dyn RemoteRef>) -> Self {
let path = handle.path().clone();
Self { inner: Arc::new(UntypedImpl::Remote { path, handle }) }
}
pub fn path(&self) -> &ActorPath {
match &*self.inner {
UntypedImpl::Local { path, .. } => path,
UntypedImpl::Remote { path, .. } => path,
}
}
pub fn is_remote(&self) -> bool {
matches!(&*self.inner, UntypedImpl::Remote { .. })
}
pub fn stop(&self) {
match &*self.inner {
UntypedImpl::Local { system, .. } => {
let _ = system.send(SystemMsg::Stop);
}
UntypedImpl::Remote { handle, .. } => {
handle.tell_system(RemoteSystemMsg::Stop);
}
}
}
pub fn notify_watchers(&self, sender: ActorPath) {
match &*self.inner {
UntypedImpl::Local { system, .. } => {
let _ = system.send(SystemMsg::Terminated(sender));
}
UntypedImpl::Remote { handle, .. } => {
handle.tell_system(RemoteSystemMsg::Terminated { actor: sender });
}
}
}
}
impl PartialEq for UntypedActorRef {
fn eq(&self, other: &Self) -> bool {
self.path() == other.path()
}
}
impl Eq for UntypedActorRef {}
impl std::hash::Hash for UntypedActorRef {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.path().hash(state);
}
}
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum AskError {
#[error("ask timed out")]
Timeout,
#[error("target actor was dropped before replying")]
TargetDropped,
}