use std::{any::Any, fmt::Debug, future::Future};
use futures::channel::oneshot;
use theta_flume::{Receiver, Sender, WeakSender};
use crate::{actor::Actor, actor_ref::ActorHdl, context::Context};
#[cfg(feature = "monitor")]
use crate::monitor::AnyUpdateTx;
#[cfg(feature = "remote")]
use {
crate::remote::{
base::Tag,
peer::{PEER, Peer},
},
serde::{Deserialize, Serialize},
};
#[cfg(not(feature = "remote"))]
use std::panic::UnwindSafe;
pub(crate) type SigAck = oneshot::Sender<()>;
pub type MsgPack<A> = (<A as Actor>::Msg, Continuation);
pub type OneShotAny = oneshot::Sender<Box<dyn Any + Send>>;
pub type OneShotBytes = oneshot::Sender<Vec<u8>>;
pub type MsgTx<A> = Sender<MsgPack<A>>;
pub type WeakMsgTx<A> = WeakSender<MsgPack<A>>;
pub type MsgRx<A> = Receiver<MsgPack<A>>;
pub type SigTx = Sender<RawSignal>;
pub type WeakSigTx = WeakSender<RawSignal>;
pub type SigRx = Receiver<RawSignal>;
pub trait Message<A: Actor>: Debug + Send + Into<A::Msg> + 'static {
#[cfg(not(feature = "remote"))]
type Return: Send + UnwindSafe + 'static;
#[cfg(feature = "remote")]
type Return: Debug + Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static;
#[cfg(feature = "remote")]
const TAG: Tag;
fn process(
state: &mut A,
ctx: &Context<A>,
msg: Self,
) -> impl Future<Output = Self::Return> + Send;
fn process_to_any(
state: &mut A,
ctx: &Context<A>,
msg: Self,
) -> impl Future<Output = Box<dyn Any + Send>> + Send {
async move { Box::new(Self::process(state, ctx, msg).await) as Box<dyn Any + Send> }
}
#[cfg(feature = "remote")]
fn process_to_bytes(
state: &mut A,
ctx: &Context<A>,
peer: Peer,
msg: Self,
) -> impl Future<Output = Result<Vec<u8>, postcard::Error>> + Send {
async move {
let ret = Self::process(state, ctx, msg).await;
PEER.sync_scope(peer, || postcard::to_stdvec(&ret))
}
}
}
#[derive(Debug)]
pub enum Continuation {
Nil,
Reply(OneShotAny), Forward(OneShotAny),
#[cfg(feature = "remote")]
BinReply {
peer: Peer,
reply_tx: oneshot::Sender<Vec<u8>>,
},
#[cfg(feature = "remote")]
LocalBinForward {
peer: Peer,
tx: OneShotBytes, },
#[cfg(feature = "remote")]
RemoteBinForward {
peer: Peer,
tx: OneShotBytes, },
}
#[derive(Debug, Clone, Copy)]
pub enum Signal {
Restart,
Terminate,
}
#[derive(Debug, Clone)]
#[cfg_attr(feature = "remote", derive(Serialize, Deserialize))]
pub enum Escalation {
Initialize(String),
ProcessMsg(String),
Supervise(String),
}
#[derive(Debug)]
pub enum RawSignal {
#[cfg(feature = "monitor")]
Monitor(AnyUpdateTx),
Escalation(ActorHdl, Escalation),
ChildDropped,
Pause(Option<SigAck>),
Resume(Option<SigAck>),
Restart(Option<SigAck>),
Terminate(Option<SigAck>),
}
#[derive(Debug, Clone, Copy)]
pub(crate) enum InternalSignal {
Pause,
Resume,
Restart,
Terminate,
}
impl Continuation {
pub fn reply(tx: OneShotAny) -> Self {
Continuation::Reply(tx)
}
pub fn forward(tx: OneShotAny) -> Self {
Continuation::Forward(tx)
}
pub fn is_nil(&self) -> bool {
matches!(self, Continuation::Nil)
}
}
impl InternalSignal {
pub(crate) fn into_raw(self, k: Option<SigAck>) -> RawSignal {
match self {
InternalSignal::Pause => RawSignal::Pause(k),
InternalSignal::Resume => RawSignal::Resume(k),
InternalSignal::Restart => RawSignal::Restart(k),
InternalSignal::Terminate => RawSignal::Terminate(k),
}
}
}
impl From<Signal> for InternalSignal {
fn from(signal: Signal) -> Self {
match signal {
Signal::Restart => InternalSignal::Restart,
Signal::Terminate => InternalSignal::Terminate,
}
}
}