use crate::import::*;
use crate::process::registry::{Dispatch, ProcessRegistry, RegisterProcess, UnregisterProcess};
use crate::node::{NodeControl, SendToNode};
use crate::util::Service;
use actix::dev::{ContextParts, Mailbox, ContextFut, AsyncContextParts, ToEnvelope, Envelope, RecipientRequest};
use actix::Handler;
use std::pin::Pin;
pub mod registry;
#[derive(Debug)]
pub enum DispatchError {
TimeoutLocal,
TimeoutRemote,
DispatchRemote,
DispatchLocal,
MailboxRemote,
Format,
}
pub trait Dispatcher: Send + 'static {
fn dispatch(&self, method: u64, data: Bytes) -> BoxFuture<'static, Result<Bytes, DispatchError>>;
}
pub trait ProcessDispatch: Actor<Context=Process<Self>> {
fn make_dispatcher(addr: WeakAddr<Self>) -> Box<dyn Dispatcher>;
}
pub struct Process<A: Actor<Context=Self>>
{
id: Uuid,
parts: ContextParts<A>,
mb: Option<Mailbox<A>>,
}
impl<A: Actor<Context=Self>> Process<A> where A: ProcessDispatch
{
pub fn start(a: A) -> Pid<A> {
Self::start_with(|_| a)
}
pub fn start_with(f: impl FnOnce(&mut Self) -> A) -> Pid<A> {
let (tx, rx) = actix::dev::channel::channel(2);
let id = Uuid::new_v4();
let parts = ContextParts::new(rx.sender_producer());
let mut proc = Process {
id,
parts,
mb: Some(actix::dev::Mailbox::new(rx)),
};
let act = f(&mut proc);
proc.run(act)
}
pub fn pid(&self) -> Pid<A> {
return Pid::Local {
id: self.id.clone(),
addr: self.parts.address(),
};
}
fn run(mut self, act: A) -> Pid<A> {
let id = self.id;
let pid = self.pid();
let fut = self.into_fut(act);
ProcessRegistry::from_registry().do_send(RegisterProcess::new(pid.clone()));
let fut = fut.map(move |_| {
ProcessRegistry::from_registry().do_send(UnregisterProcess { id });
});
actix_rt::spawn(fut);
pid
}
fn into_fut(mut self, act: A) -> ContextFut<A, Self> {
let mb = self.mb.take().unwrap();
ContextFut::new(self, act, mb)
}
}
impl<A: Actor<Context=Self>> AsyncContextParts<A> for Process<A>
{
fn parts(&mut self) -> &mut ContextParts<A> {
&mut self.parts
}
}
impl<A: Actor<Context=Self>> ActorContext for Process<A>
{
fn stop(&mut self) {
self.parts.stop()
}
fn terminate(&mut self) {
self.parts.terminate()
}
fn state(&self) -> ActorState {
self.parts.state()
}
}
impl<A: Actor<Context=Self>> AsyncContext<A> for Process<A>
{
fn address(&self) -> Addr<A> {
self.parts.address()
}
fn spawn<F>(&mut self, fut: F) -> SpawnHandle where
F: ActorFuture<Output=(), Actor=A> + 'static {
self.parts.spawn(fut)
}
fn wait<F>(&mut self, fut: F) where
F: ActorFuture<Output=(), Actor=A> + 'static {
self.parts.wait(fut)
}
fn waiting(&self) -> bool {
self.parts.waiting()
}
fn cancel_future(&mut self, handle: SpawnHandle) -> bool {
self.parts.cancel_future(handle)
}
}
impl<A: Actor<Context=Self>, M: Message> ToEnvelope<A, M> for Process<A>
where A: Handler<M>,
M: Send + 'static,
M::Result: Send
{
fn pack(msg: M, tx: Option<Sender<<M as Message>::Result>>) -> Envelope<A> {
Envelope::new(msg, tx)
}
}
pub enum Pid<A: Actor + ProcessDispatch> {
Local {
id: Uuid,
addr: Addr<A>,
},
Remote(Uuid),
}
impl<A: Actor + ProcessDispatch> Clone for Pid<A> {
fn clone(&self) -> Self {
match self {
Pid::Local {
id, addr
} => Pid::Local { id: id.clone(), addr: addr.clone() },
Pid::Remote(r) => Pid::Remote(*r),
}
}
}
impl<A: Actor + ProcessDispatch> Pid<A> {
pub fn local_addr(&self) -> Option<Addr<A>> {
match self {
Pid::Local { addr, .. } => Some(addr.clone()),
_ => None
}
}
pub fn id(&self) -> Uuid {
match self {
Pid::Local { id, .. } => id.clone(),
Pid::Remote(id) => id.clone()
}
}
pub fn send<M>(&self, m: M) -> PidRequest<A, M>
where A: Handler<M>,
A::Context: ToEnvelope<A, M>,
M: Message + Service + Send,
M::Result: Send,
{
match self {
Pid::Local { addr, .. } => PidRequest::Local(addr.send(m)),
Pid::Remote(id) => {
let dispatch = m.make_call_dispatch(id.clone()).unwrap();
PidRequest::Remote(ProcessRegistry::from_registry().send(dispatch))
}
}
}
pub fn do_send<M>(&self, m: M)
where A: Handler<M>,
A::Context: ToEnvelope<A, M>,
M: Message + Service + Send,
M::Result: Send,
{
match self {
Self::Local { addr, .. } => addr.do_send(m),
Self::Remote(id) => {
let dispatch = m.make_ann_dispatch(id.clone()).unwrap();
ProcessRegistry::from_registry().do_send(dispatch)
}
}
}
pub fn recipient<M>(&self) -> PidRecipient<M>
where A: Handler<M>,
A::Context: ToEnvelope<A, M>,
M: Message + Service + Send + 'static,
M::Result: Send,
{
match self {
Self::Local { addr, id } => PidRecipient {
id: id.clone(),
local: Some(addr.clone().recipient()),
},
Self::Remote(id) => PidRecipient {
id: id.clone(),
local: None,
}
}
}
}
pub enum PidRequest<A, M>
where A: Actor + Handler<M>,
A::Context: ToEnvelope<A, M>,
M: Message
{
Local(Request<A, M>),
Remote(Request<ProcessRegistry, Dispatch>),
}
impl<A: Actor, M: Message> Future for PidRequest<A, M>
where A: Actor + Handler<M>,
A::Context: ToEnvelope<A, M>,
M: Message + Service + Unpin + Send,
M::Result: Send
{
type Output = Result<M::Result, DispatchError>;
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
match self.get_mut() {
PidRequest::Local(r) => {
r.poll_unpin(cx).map_err(|e| DispatchError::DispatchLocal)
}
PidRequest::Remote(r) => {
match futures::ready!(r.poll_unpin(cx)) {
Ok(Ok(res)) => {
Poll::Ready(<M as Service>::read_result(res).map_err(|e| DispatchError::Format))
}
Ok(Err(err)) => Poll::Ready(Err(err)),
Err(mailbox) => Poll::Ready(Err(DispatchError::DispatchRemote)),
}
}
}
}
}
pub struct PidRecipient<M>
where M: Message + Send,
M::Result: Send,
{
id: Uuid,
local: Option<Recipient<M>>,
}
impl<M> PidRecipient<M>
where M: Message + Send,
M::Result: Send {
pub fn from_id(id: Uuid) -> Self {
Self {
id,
local: None,
}
}
}
impl<M> Clone for PidRecipient<M>
where M: Message + Send,
M::Result: Send {
fn clone(&self) -> Self {
Self {
id: self.id,
local: self.local.clone(),
}
}
}
impl<M> PidRecipient<M>
where M: Message + Service + Send,
M::Result: Send,
{
pub fn send(&self, m: M) -> PidRecipientRequest<M> {
if let Some(ref local) = self.local {
return PidRecipientRequest::Local(local.send(m));
} else {
let dispatch = m.make_call_dispatch(self.id.clone()).unwrap();
PidRecipientRequest::Remote(ProcessRegistry::from_registry().send(dispatch))
}
}
pub fn do_send(&self, m: M) -> Result<(), SendError<M>> {
if let Some(ref local) = self.local {
local.do_send(m)
} else {
let dispatch = m.make_ann_dispatch(self.id.clone()).unwrap();
Ok(ProcessRegistry::from_registry().do_send(dispatch))
}
}
}
pub enum PidRecipientRequest<M>
where M: Message + Send + 'static,
M::Result: Send {
Local(RecipientRequest<M>),
Remote(Request<ProcessRegistry, Dispatch>),
}
impl<M: Message> Future for PidRecipientRequest<M>
where M: Message + Service + Unpin + Send,
M::Result: Send
{
type Output = Result<M::Result, DispatchError>;
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
match self.get_mut() {
Self::Local(r) => {
r.poll_unpin(cx).map_err(|e| DispatchError::DispatchLocal)
}
Self::Remote(r) => {
match futures::ready!(r.poll_unpin(cx)) {
Ok(Ok(res)) => {
Poll::Ready(<M as Service>::read_result(res).map_err(|e| DispatchError::Format))
}
Ok(Err(err)) => Poll::Ready(Err(err)),
Err(mailbox) => Poll::Ready(Err(DispatchError::DispatchRemote)),
}
}
}
}
}