use super::active;
use super::reactive;
use std::any::Any;
use std::cmp;
use std::sync::Mutex;
#[cfg(feature = "async-actor")]
use super::async_actor;
/// Address of an actor, used as the destination (or source) of messages.
///
/// The concrete flavour of actor is hidden behind the crate-private `AddrKind`;
/// an address whose kind is `AddrKind::Invalid` refers to no actor at all.
#[derive(Debug, Clone)]
pub struct Addr {
// Which kind of actor (if any) this address refers to.
pub(crate) kind: AddrKind,
}
/// Envelope for the three kinds of message defined in this module.
#[derive(Debug)]
pub enum Message {
/// A request; carries a source address, an id and a payload (see `Request`).
Request(Request),
/// A response; carries a request id and a result (see `Response`).
Response(Response),
/// A notification; carries only a payload (see `Notification`).
Notification(Notification),
}
/// Identifier type used for `Request::id` and `Response::request_id`.
pub type RequestId = u32;
/// A request message.
#[derive(Debug)]
pub struct Request {
/// Address the request came from.
// NOTE(review): presumably the address a response should be delivered to — confirm against callers.
pub src: Addr,
/// Identifier of this request.
pub id: RequestId,
/// Type-erased payload of the request.
pub data: Box<dyn Any + Send>,
}
/// A response message.
#[derive(Debug)]
pub struct Response {
/// Request identifier this response carries (compared by `Response::id_eq`).
pub request_id: RequestId,
/// Either the type-erased success payload or an `ErrorStatus` describing the failure.
pub result: Result<Box<dyn Any + Send>, ErrorStatus>,
}
/// A notification message: a payload with no source address and no request id.
#[derive(Debug)]
pub struct Notification {
/// Type-erased payload of the notification.
pub data: Box<dyn Any + Send>,
}
/// Errors reported by the messaging functions in this module.
#[derive(Debug)]
pub enum Error {
/// The destination address is invalid, i.e. it refers to no actor.
AddrUnreachable,
// NOTE(review): the conditions that produce the variants below are not visible in
// this file; the descriptions are inferred from the names — confirm at the use sites.
/// Presumably: the destination's message queue has no room left.
QueueFull,
/// Presumably: receiving from the underlying queue/channel failed.
BrokenReceive,
/// Presumably: the actor no longer exists.
ActorDisappeared,
/// Presumably: an operation did not complete within its time limit.
Timeout,
/// Presumably: no message was available.
NoMessage,
/// Presumably: a type-erased payload could not be downcast to the expected type.
DowncastFailed,
}
/// Error half of `Response::result`: an error code plus a type-erased payload.
#[derive(Debug)]
pub struct ErrorStatus {
/// What went wrong.
pub error: Error,
/// Type-erased payload.
// NOTE(review): by its name this is the payload of the request that failed — confirm at the construction site.
pub request_data: Box<dyn Any + Send>,
}
/// Crate-internal numeric identifier of an actor.
pub(crate) type ActorId = u64;

/// Identifier reserved for "no actor".
pub(crate) const INVALID_ACTOR_ID: ActorId = 0;

/// Next identifier to hand out; starts at 1, so `INVALID_ACTOR_ID` (0) is not issued.
static NEXT_ACTOR_ID: Mutex<ActorId> = Mutex::new(1);

/// Returns a fresh actor id; successive calls return consecutive values.
///
/// # Panics
///
/// Panics if the mutex guarding the counter is poisoned.
pub(crate) fn generate_actor_id() -> ActorId {
    let mut counter = NEXT_ACTOR_ID.lock().unwrap();
    let issued = *counter;
    *counter = issued + 1;
    issued
}
/// Crate-internal discriminator for the kind of actor an `Addr` refers to.
#[derive(Debug, Clone)]
pub(crate) enum AddrKind {
/// No actor; deliveries to such an address fail with `Error::AddrUnreachable`.
Invalid,
/// Address handled by the `reactive` module.
Reactive(reactive::ReactiveAddr),
/// Address handled by the `active` module.
Active(active::ActiveAddr),
/// Address handled by the `async_actor` module; only compiled with the `async-actor` feature.
#[cfg(feature = "async-actor")]
Async(async_actor::AsyncAddr),
}
/// Error status whose payload is still held as its concrete type `T`
/// (compare `ErrorStatus`, which holds it as `Box<dyn Any + Send>`).
pub(crate) struct NonBoxedErrorStatus<T>
where
T: Send + Sized,
{
/// What went wrong.
pub error: Error,
/// Payload of the request the error refers to.
pub request_data: T,
}
pub fn send_notification<T>(dst_addr: &Addr, data: T) -> Result<(), Error>
where
T: 'static + Send,
{
dst_addr.receive_notification(data)
}
impl Message {
pub fn new() -> Message {
Message::Notification(Notification { data: Box::new(()) })
}
}
impl Default for Message {
fn default() -> Self {
Self::new()
}
}
impl Addr {
pub const INVALID: Addr = Addr {
kind: AddrKind::Invalid,
};
pub fn new() -> Addr {
Addr {
kind: AddrKind::Invalid,
}
}
pub(crate) fn from_kind(kind: AddrKind) -> Self {
Self { kind }
}
pub(crate) fn actor_id(&self) -> ActorId {
match &self.kind {
AddrKind::Invalid => INVALID_ACTOR_ID,
AddrKind::Reactive(reactive_addr) => reactive_addr.actor_id(),
AddrKind::Active(active_addr) => active_addr.actor_id(),
#[cfg(feature = "async-actor")]
AddrKind::Async(async_addr) => async_addr.actor_id(),
}
}
pub fn is_valid(&self) -> bool {
!matches!(self.kind, AddrKind::Invalid)
}
pub(crate) fn receive_notification<T>(&self, data: T) -> Result<(), Error>
where
T: 'static + Send,
{
match &self.kind {
AddrKind::Invalid => Result::Err(Error::AddrUnreachable),
AddrKind::Reactive(reactive_addr) => reactive_addr.receive_notification(data),
AddrKind::Active(active_addr) => active_addr.receive_notification(data),
#[cfg(feature = "async-actor")]
AddrKind::Async(async_addr) => async_addr.receive_notification(data),
}
}
pub(crate) fn receive_request<T>(&self, src: &Addr, id: RequestId, data: T)
where
T: 'static + Send,
{
match &self.kind {
AddrKind::Invalid => {
let _ = src.receive_err_response(
id,
NonBoxedErrorStatus {
error: Error::AddrUnreachable,
request_data: data,
},
);
}
AddrKind::Reactive(reactive_addr) => reactive_addr.receive_request(src, id, data),
AddrKind::Active(active_addr) => active_addr.receive_request(src, id, data),
#[cfg(feature = "async-actor")]
AddrKind::Async(async_addr) => async_addr.receive_request(src, id, data),
}
}
pub(crate) fn receive_ok_response<T>(
&self,
request_id: RequestId,
result: T,
) -> Result<(), Error>
where
T: 'static + Send + Sized,
{
match &self.kind {
AddrKind::Invalid => Result::Err(Error::AddrUnreachable),
AddrKind::Reactive(reactive_addr) => {
reactive_addr.receive_ok_response(request_id, result)
}
AddrKind::Active(active_addr) => active_addr.receive_ok_response(request_id, result),
#[cfg(feature = "async-actor")]
AddrKind::Async(async_addr) => async_addr.receive_ok_response(request_id, result),
}
}
pub(crate) fn receive_err_response<T>(
&self,
request_id: RequestId,
result: NonBoxedErrorStatus<T>,
) -> Result<(), Error>
where
T: 'static + Send + Sized,
{
match &self.kind {
AddrKind::Invalid => Result::Err(Error::AddrUnreachable),
AddrKind::Reactive(reactive_addr) => {
reactive_addr.receive_err_response(request_id, result)
}
AddrKind::Active(active_addr) => active_addr.receive_err_response(request_id, result),
#[cfg(feature = "async-actor")]
AddrKind::Async(async_addr) => async_addr.receive_err_response(request_id, result),
}
}
}
impl Default for Addr {
fn default() -> Self {
Self::new()
}
}
impl cmp::Ord for Addr {
    /// Orders addresses by the numeric id of the actor they refer to.
    fn cmp(&self, other: &Self) -> cmp::Ordering {
        let (lhs, rhs) = (self.actor_id(), other.actor_id());
        lhs.cmp(&rhs)
    }
}
impl cmp::PartialOrd for Addr {
    /// Always returns `Some`: the result is taken from the total order of the `Ord` impl.
    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
        Some(self.cmp(other))
    }
}
impl cmp::PartialEq for Addr {
    /// Two addresses are equal when their actor ids are equal.
    fn eq(&self, other: &Self) -> bool {
        let (lhs, rhs) = (self.actor_id(), other.actor_id());
        lhs == rhs
    }
}
// Equality compares integer actor ids, so it is reflexive, symmetric and transitive.
impl cmp::Eq for Addr {}
impl Response {
    /// Deprecated misspelling of `id_eq`; forwards to it unchanged.
    #[deprecated(note = "The name of this function is a typo, use id_eq() instead")]
    pub fn iq_eq(&self, id: RequestId) -> bool {
        Self::id_eq(self, id)
    }

    /// Returns `true` when this response's `request_id` equals `id`.
    pub fn id_eq(&self, id: RequestId) -> bool {
        id == self.request_id
    }
}