use std::any::TypeId;
use std::convert::TryFrom;
use std::error::Error;
use std::fmt;
use std::future::Future;
use std::iter::FromIterator;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{self, Poll};
use heph_inbox::{self as inbox, Sender};
pub mod rpc;
#[doc(no_inline)]
pub use rpc::{Rpc, RpcError, RpcMessage, RpcResponse};
/// A reference to an actor, used to send it messages of type `M`.
///
/// A reference either delivers directly through an inbox [`Sender`] or goes
/// through a mapping layer that converts the message first; mapped references
/// are created with [`ActorRef::map`], [`ActorRef::try_map`],
/// [`ActorRef::map_fn`] and [`ActorRef::try_map_fn`].
pub struct ActorRef<M> {
/// How messages are delivered: directly or through a mapping layer.
kind: ActorRefKind<M>,
}
/// The two delivery strategies an [`ActorRef`] can use.
enum ActorRefKind<M> {
/// Deliver directly through the inbox `Sender`.
Local(Sender<M>),
/// Deliver through a type-erased mapping layer that accepts `M`, converts
/// it and forwards it to an underlying actor reference.
Mapped(Arc<dyn MappedActorRef<M>>),
}
// SAFETY: NOTE(review) — these impls assert that both variants may be moved to
// and shared between threads whenever `M: Send`. For `Local` that presumably
// follows from `Sender<M>` (defined in `heph_inbox`, not visible here; confirm).
// For `Mapped` it also depends on whatever lives behind the
// `dyn MappedActorRef<M>`: `ActorRef::map_fn`/`ActorRef::try_map_fn` store a
// caller-supplied closure there that is only bounded by `Fn + 'static`, not
// by `Send`/`Sync`. Verify that a closure capturing thread-unsafe state cannot
// cross threads this way.
// The lint is silenced because the `Mapped` field is not `Send` by itself.
#[allow(clippy::non_send_fields_in_send_ty)]
unsafe impl<M: Send> Send for ActorRefKind<M> {}
unsafe impl<M: Send> Sync for ActorRefKind<M> {}
// `Unpin` is declared for every `M`; presumably this is fine because neither
// variant stores an `M` inline (`Mapped` is an `Arc`; `Sender` is assumed to
// be a handle — confirm against `heph_inbox`).
impl<M> Unpin for ActorRefKind<M> {}
impl<M> ActorRef<M> {
#[doc(hidden)] pub const fn local(sender: Sender<M>) -> ActorRef<M> {
ActorRef {
kind: ActorRefKind::Local(sender),
}
}
pub fn send<'r, Msg>(&'r self, msg: Msg) -> SendValue<'r, M>
where
Msg: Into<M>,
{
use ActorRefKind::*;
let msg = msg.into();
SendValue {
kind: match &self.kind {
Local(sender) => SendValueKind::Local(sender.send(msg)),
Mapped(actor_ref) => SendValueKind::Mapped(actor_ref.mapped_send(msg)),
},
}
}
pub fn try_send<Msg>(&self, msg: Msg) -> Result<(), SendError>
where
Msg: Into<M>,
{
use ActorRefKind::*;
#[cfg(any(test, feature = "test"))]
if crate::test::should_lose_msg() {
log::debug!("dropping message on purpose");
return Ok(());
}
let msg = msg.into();
match &self.kind {
Local(sender) => sender.try_send(msg).map_err(|_| SendError),
Mapped(actor_ref) => actor_ref.try_mapped_send(msg),
}
}
pub fn rpc<'r, Req, Res>(&'r self, request: Req) -> Rpc<'r, M, Res>
where
M: From<RpcMessage<Req, Res>>,
{
Rpc::new(self, request)
}
pub fn map<Msg>(self) -> ActorRef<Msg>
where
M: From<Msg> + 'static,
Msg: 'static,
{
self.try_map()
}
pub fn try_map<Msg>(self) -> ActorRef<Msg>
where
M: TryFrom<Msg> + 'static,
Msg: 'static,
{
if TypeId::of::<ActorRef<M>>() == TypeId::of::<ActorRef<Msg>>() {
unsafe { std::mem::transmute(self) }
} else {
ActorRef {
kind: ActorRefKind::Mapped(Arc::new(self)),
}
}
}
pub fn map_fn<Msg, F>(self, map: F) -> ActorRef<Msg>
where
F: Fn(Msg) -> M + 'static,
M: 'static,
{
self.try_map_fn::<Msg, _, !>(move |msg| Ok(map(msg)))
}
pub fn try_map_fn<Msg, F, E>(self, map: F) -> ActorRef<Msg>
where
F: Fn(Msg) -> Result<M, E> + 'static,
M: 'static,
{
let mapped_ref = MappedActorRefFn {
actor_ref: self,
map,
};
ActorRef {
kind: ActorRefKind::Mapped(Arc::new(mapped_ref)),
}
}
pub fn join<'r>(&'r self) -> Join<'r, M> {
use ActorRefKind::*;
Join {
kind: match &self.kind {
Local(sender) => JoinKind::Local(sender.join()),
Mapped(actor_ref) => JoinKind::Mapped(actor_ref.mapped_join()),
},
}
}
pub fn is_connected(&self) -> bool {
use ActorRefKind::*;
match &self.kind {
Local(sender) => sender.is_connected(),
Mapped(actor_ref) => actor_ref.is_connected(),
}
}
pub fn sends_to<Msg>(&self, other: &ActorRef<Msg>) -> bool {
self.id() == other.id()
}
fn id(&self) -> inbox::Id {
use ActorRefKind::*;
match &self.kind {
Local(sender) => sender.id(),
Mapped(actor_ref) => actor_ref.id(),
}
}
}
impl<M> Clone for ActorRef<M> {
fn clone(&self) -> ActorRef<M> {
use ActorRefKind::*;
ActorRef {
kind: match &self.kind {
Local(sender) => Local(sender.clone()),
Mapped(actor_ref) => Mapped(actor_ref.clone()),
},
}
}
}
impl<M> fmt::Debug for ActorRef<M> {
    /// Formats the reference as the bare type name; no internals are shown.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // A field-less debug struct prints just its name.
        f.debug_struct("ActorRef").finish()
    }
}
/// Type-erased delivery layer stored in `ActorRefKind::Mapped`.
///
/// Implementations accept messages of type `M`, convert them and forward them
/// to an underlying actor reference.
trait MappedActorRef<M> {
/// Converts `msg` and attempts to deliver it, returning the outcome
/// directly (not as a future).
fn try_mapped_send(&self, msg: M) -> Result<(), SendError>;
/// Converts `msg` and starts delivering it, returning a future with the
/// outcome of the send.
fn mapped_send<'r>(&'r self, msg: M) -> MappedSendValue<'r>;
/// Returns a future for the `join` of the underlying actor reference.
fn mapped_join<'r>(&'r self) -> MappedJoin<'r>;
/// Returns whether the underlying actor reference is still connected.
fn is_connected(&self) -> bool;
/// Returns the inbox id of the underlying actor reference.
fn id(&self) -> inbox::Id;
}
/// Mapping layer that converts `Msg` into `M` with `M::try_from` before
/// handing the message to the wrapped `ActorRef<M>`.
impl<M, Msg> MappedActorRef<Msg> for ActorRef<M>
where
    M: TryFrom<Msg>,
{
    /// Converts `msg` and tries to send it; a failed conversion is reported as
    /// `SendError`.
    fn try_mapped_send(&self, msg: Msg) -> Result<(), SendError> {
        match M::try_from(msg) {
            Ok(msg) => self.try_send(msg),
            Err(_) => Err(SendError),
        }
    }

    /// Converts `msg` and sends it.
    ///
    /// For a local reference a direct `try_send` is attempted first, so the
    /// boxed future is only allocated when the inbox is full.
    fn mapped_send<'r>(&'r self, msg: Msg) -> MappedSendValue<'r> {
        let msg = match M::try_from(msg) {
            Ok(msg) => msg,
            Err(_) => return MappedSendValue::SendErr,
        };
        let sender = match &self.kind {
            ActorRefKind::Local(sender) => sender,
            // Already a mapping layer: let it handle the converted message.
            ActorRefKind::Mapped(inner) => return inner.mapped_send(msg),
        };
        match sender.try_send(msg) {
            Ok(()) => MappedSendValue::Send,
            // No room right now: fall back to the asynchronous send.
            Err(heph_inbox::SendError::Full(msg)) => {
                MappedSendValue::Sending(Box::pin(self.send(msg)))
            }
            Err(heph_inbox::SendError::Disconnected(_)) => MappedSendValue::SendErr,
        }
    }

    /// Returns a join future, skipping the allocation when a local sender is
    /// already disconnected.
    fn mapped_join<'r>(&'r self) -> MappedJoin<'r> {
        match &self.kind {
            ActorRefKind::Local(sender) => {
                if sender.is_connected() {
                    MappedJoin::Join(Box::pin(self.join()))
                } else {
                    MappedJoin::Disconnected
                }
            }
            ActorRefKind::Mapped(inner) => inner.mapped_join(),
        }
    }

    /// Forwards to the `is_connected` method of `ActorRef` itself.
    fn is_connected(&self) -> bool {
        self.is_connected()
    }

    /// Forwards to the `id` method of `ActorRef` itself.
    fn id(&self) -> inbox::Id {
        self.id()
    }
}
/// Mapping layer that converts messages with a user-supplied function before
/// forwarding them to `actor_ref`; created by `ActorRef::try_map_fn`.
struct MappedActorRefFn<M, F> {
/// The reference that receives the converted messages.
actor_ref: ActorRef<M>,
/// The conversion function applied to every message.
map: F,
}
/// Mapping layer that converts `Msg` into `M` by calling `self.map` before
/// handing the message to `self.actor_ref`.
impl<M, Msg, F, E> MappedActorRef<Msg> for MappedActorRefFn<M, F>
where
    F: Fn(Msg) -> Result<M, E>,
{
    /// Maps `msg` and tries to send it; an error from the mapping function is
    /// reported as `SendError` (the error value is discarded).
    fn try_mapped_send(&self, msg: Msg) -> Result<(), SendError> {
        (self.map)(msg)
            .map_err(|_| SendError)
            .and_then(|msg| self.actor_ref.try_send(msg))
    }

    /// Maps `msg` and sends it.
    ///
    /// For a local reference a direct `try_send` is attempted first, so the
    /// boxed future is only allocated when the inbox is full.
    fn mapped_send<'r>(&'r self, msg: Msg) -> MappedSendValue<'r> {
        let msg = match (self.map)(msg) {
            Ok(msg) => msg,
            Err(_) => return MappedSendValue::SendErr,
        };
        let sender = match &self.actor_ref.kind {
            ActorRefKind::Local(sender) => sender,
            // Nested mapping layer: let it handle the mapped message.
            ActorRefKind::Mapped(inner) => return inner.mapped_send(msg),
        };
        match sender.try_send(msg) {
            Ok(()) => MappedSendValue::Send,
            // No room right now: fall back to the asynchronous send.
            Err(heph_inbox::SendError::Full(msg)) => {
                MappedSendValue::Sending(Box::pin(self.actor_ref.send(msg)))
            }
            Err(heph_inbox::SendError::Disconnected(_)) => MappedSendValue::SendErr,
        }
    }

    /// Returns a join future, skipping the allocation when a local sender is
    /// already disconnected.
    fn mapped_join<'r>(&'r self) -> MappedJoin<'r> {
        match &self.actor_ref.kind {
            ActorRefKind::Local(sender) => {
                if sender.is_connected() {
                    MappedJoin::Join(Box::pin(self.actor_ref.join()))
                } else {
                    MappedJoin::Disconnected
                }
            }
            ActorRefKind::Mapped(inner) => inner.mapped_join(),
        }
    }

    /// Reports the connection state of the wrapped actor reference.
    fn is_connected(&self) -> bool {
        let target = &self.actor_ref;
        target.is_connected()
    }

    /// Reports the inbox id of the wrapped actor reference.
    fn id(&self) -> inbox::Id {
        let target = &self.actor_ref;
        target.id()
    }
}
/// Outcome of sending a message through a mapping layer.
enum MappedSendValue<'r> {
    /// The message was already sent.
    Send,
    /// Sending already failed (conversion error or disconnected inbox).
    SendErr,
    /// The send is still in progress; the boxed future drives it.
    Sending(Pin<Box<dyn Future<Output = Result<(), SendError>> + 'r>>),
}

impl<'r> Future for MappedSendValue<'r> {
    type Output = Result<(), SendError>;

    /// Resolves immediately for the already-decided variants, otherwise polls
    /// the boxed send future.
    #[track_caller]
    fn poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Self::Output> {
        // `get_mut` is fine: the only future inside is boxed, so `Self: Unpin`.
        match self.get_mut() {
            MappedSendValue::Sending(in_flight) => in_flight.as_mut().poll(ctx),
            MappedSendValue::Send => Poll::Ready(Ok(())),
            MappedSendValue::SendErr => Poll::Ready(Err(SendError)),
        }
    }
}
/// Outcome of joining an actor through a mapping layer.
enum MappedJoin<'r> {
    /// The sender was already disconnected; there is nothing to wait for.
    Disconnected,
    /// Still waiting; the boxed future drives the join.
    Join(Pin<Box<dyn Future<Output = ()> + 'r>>),
}

impl<'r> Future for MappedJoin<'r> {
    type Output = ();

    /// Resolves immediately when already disconnected, otherwise polls the
    /// boxed join future.
    #[track_caller]
    fn poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Self::Output> {
        // `get_mut` is fine: the only future inside is boxed, so `Self: Unpin`.
        if let MappedJoin::Join(pending) = self.get_mut() {
            pending.as_mut().poll(ctx)
        } else {
            Poll::Ready(())
        }
    }
}
/// Future returned by `ActorRef::send`.
///
/// Resolves to `Ok(())` when the message was sent and to `Err(SendError)`
/// when it could not be sent.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct SendValue<'r, M> {
/// The send in progress: a direct inbox send or a mapped send.
kind: SendValueKind<'r, M>,
}
/// The two kinds of send a [`SendValue`] can drive, mirroring `ActorRefKind`.
enum SendValueKind<'r, M> {
/// Send future created by the inbox sender of a local reference.
Local(inbox::SendValue<'r, M>),
/// Send routed through a mapping layer.
Mapped(MappedSendValue<'r>),
}
// SAFETY: NOTE(review) — the soundness argument is not visible in this file.
// The `Mapped` variant holds a boxed `dyn Future` without a `Send` bound; the
// impls in this file only ever put a boxed `SendValue` in it, so this
// presumably relies on `inbox::SendValue` (and everything reachable from a
// mapped `ActorRef`) being thread-safe when `M: Send` — confirm.
#[allow(clippy::non_send_fields_in_send_ty)]
unsafe impl<'r, M: Send> Send for SendValueKind<'r, M> {}
unsafe impl<'r, M: Send> Sync for SendValueKind<'r, M> {}
impl<'r, M> Future for SendValue<'r, M> {
type Output = Result<(), SendError>;
#[track_caller]
fn poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Self::Output> {
use SendValueKind::*;
#[cfg(any(test, feature = "test"))]
if crate::test::should_lose_msg() {
log::debug!("dropping message on purpose");
return Poll::Ready(Ok(()));
}
let this = unsafe { self.get_unchecked_mut() };
match &mut this.kind {
Local(fut) => unsafe { Pin::new_unchecked(fut) }
.poll(ctx)
.map_err(|_| SendError),
Mapped(fut) => unsafe { Pin::new_unchecked(fut) }.poll(ctx),
}
}
}
impl<'r, M> fmt::Debug for SendValue<'r, M> {
    /// Formats the future as the bare type name; no internals are shown.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // A field-less debug struct prints just its name.
        f.debug_struct("SendValue").finish()
    }
}
/// Error returned when a message could not be sent to an actor.
///
/// The error carries no further detail: a failed message conversion and a
/// rejected inbox send both map to this value.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct SendError;

impl fmt::Display for SendError {
    /// Writes a fixed, human-readable description of the error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        const DESCRIPTION: &str = "unable to send message";
        f.write_str(DESCRIPTION)
    }
}

// No `source`: the error has no underlying cause to report.
impl Error for SendError {}
/// Future returned by `ActorRef::join`.
///
// NOTE(review): presumably completes once the actor behind the reference has
// finished; the exact semantics come from `heph_inbox` — confirm.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Join<'r, M> {
/// The join in progress: a direct inbox join or a mapped join.
kind: JoinKind<'r, M>,
}
/// The two kinds of join a [`Join`] can drive, mirroring `ActorRefKind`.
enum JoinKind<'r, M> {
/// Join future created by the inbox sender of a local reference.
Local(inbox::Join<'r, M>),
/// Join routed through a mapping layer.
Mapped(MappedJoin<'r>),
}
// SAFETY: NOTE(review) — the soundness argument is not visible in this file.
// The `Mapped` variant holds a boxed `dyn Future` without a `Send` bound; the
// impls in this file only ever put a boxed `Join` in it, so this presumably
// relies on `inbox::Join` (and everything reachable from a mapped `ActorRef`)
// being thread-safe when `M: Send` — confirm.
#[allow(clippy::non_send_fields_in_send_ty)]
unsafe impl<'r, M: Send> Send for JoinKind<'r, M> {}
unsafe impl<'r, M: Send> Sync for JoinKind<'r, M> {}
impl<'r, M> Future for Join<'r, M> {
type Output = ();
#[track_caller]
fn poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Self::Output> {
use JoinKind::*;
let this = unsafe { self.get_unchecked_mut() };
match &mut this.kind {
Local(fut) => unsafe { Pin::new_unchecked(fut) }.poll(ctx),
Mapped(fut) => unsafe { Pin::new_unchecked(fut) }.poll(ctx),
}
}
}
impl<'r, M> fmt::Debug for Join<'r, M> {
    /// Formats the future as the bare type name; no internals are shown.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // A field-less debug struct prints just its name.
        f.debug_struct("Join").finish()
    }
}
/// A group of actor references that all accept messages of type `M`.
///
/// A message can be delivered to every member or to a single member; see
/// [`Delivery`] and `ActorGroup::try_send`.
pub struct ActorGroup<M> {
/// The members of the group; may contain duplicates unless the caller
/// uses `add_unique`/`make_unique`.
actor_refs: Vec<ActorRef<M>>,
/// Counter used to pick the member for `Delivery::ToOne`; it is
/// incremented on every such send and reduced modulo the group size.
send_next: AtomicUsize,
}
/// How `ActorGroup::try_send` distributes a message over the group.
#[derive(Copy, Clone, Debug)]
pub enum Delivery {
/// Deliver a copy of the message to every actor reference in the group.
/// Failures of individual sends are ignored.
ToAll,
/// Deliver the message to a single actor reference, chosen in rotation.
/// The result of that one send is returned.
ToOne,
}
impl<M> ActorGroup<M> {
    /// Creates a group without any members.
    pub const fn empty() -> ActorGroup<M> {
        ActorGroup {
            actor_refs: Vec::new(),
            send_next: AtomicUsize::new(0),
        }
    }

    /// Creates a group from `actor_refs`. Duplicates are kept; use
    /// [`ActorGroup::make_unique`] to remove them.
    pub fn new<I>(actor_refs: I) -> ActorGroup<M>
    where
        I: IntoIterator<Item = ActorRef<M>>,
    {
        ActorGroup {
            actor_refs: actor_refs.into_iter().collect(),
            send_next: AtomicUsize::new(0),
        }
    }

    /// Returns the number of actor references in the group (duplicates are
    /// counted).
    pub fn len(&self) -> usize {
        self.actor_refs.len()
    }

    /// Returns `true` if the group has no members.
    pub fn is_empty(&self) -> bool {
        self.actor_refs.is_empty()
    }

    /// Adds `actor_ref` to the group, even if the group already contains a
    /// reference to the same actor.
    pub fn add(&mut self, actor_ref: ActorRef<M>) {
        self.actor_refs.push(actor_ref)
    }

    /// Adds `actor_ref` to the group unless a reference that delivers to the
    /// same inbox is already a member.
    pub fn add_unique(&mut self, actor_ref: ActorRef<M>) {
        let id = actor_ref.id();
        let already_member = self.actor_refs.iter().any(|member| member.id() == id);
        if !already_member {
            self.actor_refs.push(actor_ref);
        }
    }

    /// Removes every reference that delivers to the same inbox as
    /// `actor_ref`.
    pub fn remove(&mut self, actor_ref: &ActorRef<M>) {
        let id = actor_ref.id();
        self.actor_refs.retain(|a| a.id() != id);
    }

    /// Removes every reference that is no longer connected.
    pub fn remove_disconnected(&mut self) {
        self.actor_refs.retain(ActorRef::is_connected);
    }

    /// Removes duplicate references (references delivering to the same
    /// inbox), keeping the first occurrence of each.
    ///
    /// The relative order of the remaining references is not preserved,
    /// because duplicates are removed with `swap_remove`.
    pub fn make_unique(&mut self) {
        let mut i = 0;
        while let Some(id) = self.actor_refs.get(i).map(ActorRef::id) {
            let mut j = i + 1;
            while let Some(other_id) = self.actor_refs.get(j).map(ActorRef::id) {
                if id == other_id {
                    // Don't advance `j`: `swap_remove` moved the last
                    // element into slot `j` and it hasn't been checked yet.
                    drop(self.actor_refs.swap_remove(j));
                } else {
                    j += 1;
                }
            }
            i += 1;
        }
    }

    /// Attempts to send `msg` to the group according to `delivery`.
    ///
    /// * [`Delivery::ToAll`]: every member is sent the message, in group
    ///   order. Failures of individual sends are ignored and `Ok(())` is
    ///   returned. The message is cloned for every member but the last, which
    ///   receives the original.
    /// * [`Delivery::ToOne`]: a single member, chosen in rotation, is sent the
    ///   message and the result of that send is returned.
    ///
    /// # Errors
    ///
    /// Returns [`SendError`] if the group is empty, or if the send to the
    /// chosen member fails with [`Delivery::ToOne`].
    pub fn try_send<Msg>(&self, msg: Msg, delivery: Delivery) -> Result<(), SendError>
    where
        Msg: Into<M> + Clone,
    {
        // Also guarantees that the modulo below never divides by zero.
        if self.actor_refs.is_empty() {
            return Err(SendError);
        }

        match delivery {
            Delivery::ToAll => {
                if let Some((last, rest)) = self.actor_refs.split_last() {
                    for actor_ref in rest {
                        // Best effort: a failing member must not stop the
                        // delivery to the others.
                        let _ = actor_ref.try_send(msg.clone());
                    }
                    // The last member takes the original, saving one clone.
                    let _ = last.try_send(msg);
                }
                Ok(())
            }
            Delivery::ToOne => {
                // `fetch_add` wraps around on overflow, which merely restarts
                // the rotation.
                let idx = self.send_next.fetch_add(1, Ordering::AcqRel) % self.actor_refs.len();
                let actor_ref = &self.actor_refs[idx];
                actor_ref.try_send(msg)
            }
        }
    }

    /// Returns a future that completes once every member of the group has
    /// been joined; see [`JoinAll`].
    pub fn join_all<'r>(&'r self) -> JoinAll<'r, M> {
        JoinAll {
            actor_refs: &self.actor_refs,
            join: None,
        }
    }
}
impl<M> From<ActorRef<M>> for ActorGroup<M> {
    /// Creates a group whose only member is `actor_ref`.
    fn from(actor_ref: ActorRef<M>) -> ActorGroup<M> {
        let actor_refs = Vec::from([actor_ref]);
        let send_next = AtomicUsize::new(0);
        ActorGroup {
            actor_refs,
            send_next,
        }
    }
}
impl<M> FromIterator<ActorRef<M>> for ActorGroup<M> {
fn from_iter<I>(iter: I) -> Self
where
I: IntoIterator<Item = ActorRef<M>>,
{
ActorGroup::new(iter)
}
}
impl<M> Extend<ActorRef<M>> for ActorGroup<M> {
/// Adds every actor reference produced by `iter` to the group.
/// Duplicates are not filtered out.
fn extend<I>(&mut self, iter: I)
where
I: IntoIterator<Item = ActorRef<M>>,
{
self.actor_refs.extend(iter);
}
}
impl<M> Clone for ActorGroup<M> {
    /// Clones the members of the group. The rotation counter used for
    /// `Delivery::ToOne` is not copied; the clone starts again at zero.
    fn clone(&self) -> ActorGroup<M> {
        let actor_refs = self.actor_refs.clone();
        let send_next = AtomicUsize::new(0);
        ActorGroup {
            actor_refs,
            send_next,
        }
    }

    /// Like `clone`, but reuses the existing member storage of `self` where
    /// possible. The rotation counter is reset to zero.
    fn clone_from(&mut self, source: &Self) {
        self.actor_refs.clone_from(&source.actor_refs);
        self.send_next = AtomicUsize::new(0);
    }
}
impl<M> fmt::Debug for ActorGroup<M> {
    /// Formats the group as the list of its members; the rotation counter is
    /// not shown.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_list().entries(&self.actor_refs).finish()
    }
}
/// Future returned by `ActorGroup::join_all`.
///
/// Joins the members of the group one at a time, in group order, and
/// completes when none are left.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct JoinAll<'r, M> {
/// The members that still have to be joined; the front of the slice is
/// dropped as members are found to be disconnected.
actor_refs: &'r [ActorRef<M>],
/// The join currently being waited on, always for `actor_refs[0]`.
join: Option<Join<'r, M>>,
}
impl<'r, M> Future for JoinAll<'r, M> {
type Output = ();
fn poll(mut self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Self::Output> {
loop {
if let Some(join) = self.join.as_mut() {
match Pin::new(join).poll(ctx) {
Poll::Ready(()) => {
drop(self.join.take());
}
Poll::Pending => return Poll::Pending,
}
}
let mut remove = 0;
for actor_ref in self.actor_refs {
if actor_ref.is_connected() {
break;
}
remove += 1;
}
self.actor_refs = &self.actor_refs[remove..];
if self.actor_refs.is_empty() {
return Poll::Ready(());
}
self.join = Some(self.actor_refs[0].join());
}
}
}
impl<'r, M> fmt::Debug for JoinAll<'r, M> {
    /// Formats the future with the number of members still left to join.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let left = self.actor_refs.len();
        let mut out = f.debug_struct("JoinAll");
        out.field("left", &left);
        out.finish()
    }
}