mod builder;
mod lifecycles;
mod tag;
pub mod handlers;
pub(crate) mod messages;
use std::any::type_name;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::mem;
use std::ops::{Deref, DerefMut};
use std::time::Duration;
use self::builder::AbstractProcessBuilder;
use self::handlers::{DeferredRequest, Handlers, Message, Request};
use self::messages::{RequestMessage, ReturnAddress, ShutdownMessage, SHUTDOWN_HANDLER};
use self::tag::AbstractProcessTag;
use crate::function::process::{process_name, ProcessType};
use crate::mailbox::{MailboxError, MessageSignal};
use crate::protocol::ProtocolCapture;
use crate::serializer::CanSerialize;
use crate::time::{Timeout, TimerRef, WithDelay, WithTimeout};
use crate::{host, MailboxResult, Process, ProcessConfig, ProcessName, Tag};
pub trait AbstractProcess: Sized
where
Self::Serializer: CanSerialize<Self::Arg>,
Self::Serializer: CanSerialize<Result<(), StartupError<Self>>>,
Self::Serializer: CanSerialize<ShutdownMessage<Self::Serializer>>,
Self::Serializer: CanSerialize<()>,
Self::Serializer: CanSerialize<(
Process<Result<(), StartupError<Self>>, Self::Serializer>,
Tag,
Self::Arg,
)>,
Self::Serializer: CanSerialize<
ProtocolCapture<(
Process<Result<(), StartupError<Self>>, Self::Serializer>,
Tag,
Self::Arg,
)>,
>,
{
type State;
type Serializer;
type Arg;
type Handlers: Handlers<Self>;
type StartupError: Debug;
fn init(config: Config<Self>, arg: Self::Arg) -> Result<Self::State, Self::StartupError>;
fn terminate(_state: Self::State) {}
fn handle_link_death(_state: State<Self>, _tag: Tag) {}
#[track_caller]
fn start(arg: Self::Arg) -> Result<ProcessRef<Self>, StartupError<Self>> {
AbstractProcessBuilder::<Self>::new().start(arg)
}
#[track_caller]
fn start_as<N: ProcessName>(
name: &N,
arg: Self::Arg,
) -> Result<ProcessRef<Self>, StartupError<Self>> {
AbstractProcessBuilder::<Self>::new().start_as(name, arg)
}
fn link() -> AbstractProcessBuilder<'static, Self> {
AbstractProcessBuilder::new().link()
}
fn link_with(tag: Tag) -> AbstractProcessBuilder<'static, Self> {
AbstractProcessBuilder::new().link_with(tag)
}
fn configure(config: &ProcessConfig) -> AbstractProcessBuilder<Self> {
AbstractProcessBuilder::new().configure(config)
}
fn on_node(node: u64) -> AbstractProcessBuilder<'static, Self> {
AbstractProcessBuilder::new().on_node(node)
}
}
pub struct Config<AP: AbstractProcess> {
phantom: PhantomData<AP>,
}
impl<AP: AbstractProcess> Config<AP> {
pub(crate) fn new() -> Self {
Config {
phantom: PhantomData,
}
}
pub fn die_if_link_dies(&self, die: bool) {
unsafe { host::api::process::die_when_link_dies(die as u32) };
}
pub fn self_ref(&self) -> ProcessRef<AP> {
let process = unsafe { Process::this() };
ProcessRef { process }
}
}
pub trait MessageHandler<Message>: AbstractProcess
where
Self::Serializer: CanSerialize<Message>,
{
fn handle(state: State<Self>, message: Message);
}
pub trait RequestHandler<Request>: AbstractProcess
where
Self::Serializer: CanSerialize<Request>,
Self::Serializer: CanSerialize<Self::Response>,
{
type Response;
fn handle(state: State<Self>, request: Request) -> Self::Response;
}
pub trait DeferredRequestHandler<Request>: AbstractProcess
where
Self::Serializer: CanSerialize<Request>,
Self::Serializer: CanSerialize<Self::Response>,
{
type Response;
fn handle(
state: State<Self>,
request: Request,
deferred_response: DeferredResponse<Self::Response, Self>,
);
}
pub struct State<'a, AP: AbstractProcess> {
state: &'a mut AP::State,
}
impl<'a, AP: AbstractProcess> State<'a, AP> {
pub fn self_ref(&self) -> ProcessRef<AP> {
let process = unsafe { Process::this() };
ProcessRef { process }
}
}
impl<'a, AP: AbstractProcess> Deref for State<'a, AP> {
type Target = AP::State;
fn deref(&self) -> &Self::Target {
self.state
}
}
impl<'a, AP: AbstractProcess> DerefMut for State<'a, AP> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.state
}
}
#[derive(serde::Serialize, serde::Deserialize)]
#[serde(bound = "")]
pub struct DeferredResponse<Response, AP: AbstractProcess> {
tag: Tag,
return_address: ReturnAddress<Response, AP::Serializer>,
}
impl<Response, AP: AbstractProcess> DeferredResponse<Response, AP>
where
AP::Serializer: CanSerialize<Response>,
{
pub fn send_response(self, response: Response) {
self.return_address.send_response(response, self.tag);
}
}
#[derive(serde::Serialize, serde::Deserialize)]
#[serde(bound = "")]
pub struct ProcessRef<T>
where
T: AbstractProcess,
{
process: Process<(), T::Serializer>,
}
impl<T: AbstractProcess> Copy for ProcessRef<T> {}
impl<T> ProcessRef<T>
where
T: AbstractProcess,
{
unsafe fn new(node_id: u64, process_id: u64) -> Self {
let process = Process::new(node_id, process_id);
ProcessRef { process }
}
pub fn id(&self) -> u64 {
self.process.id()
}
pub fn node_id(&self) -> u64 {
self.process.node_id()
}
pub fn lookup<N: ProcessName + ?Sized>(name: &N) -> Option<Self> {
let name = process_name::<T, T::Serializer>(ProcessType::ProcessRef, name.process_name());
let mut id = 0;
let mut node_id = 0;
let result =
unsafe { host::api::registry::get(name.as_ptr(), name.len(), &mut node_id, &mut id) };
if result == 0 {
unsafe { Some(Self::new(node_id, id)) }
} else {
None
}
}
pub fn register<N: ProcessName>(&self, name: &N) {
let name = process_name::<T, T::Serializer>(ProcessType::ProcessRef, name.process_name());
unsafe { host::api::registry::put(name.as_ptr(), name.len(), self.node_id(), self.id()) };
}
#[track_caller]
pub fn is_alive(&self) -> bool {
assert_eq!(
self.process.node_id(),
host::node_id(),
"is_alive() can only be used with local processes"
);
unsafe { host::api::process::exists(self.process.id()) != 0 }
}
pub fn link(&self) {
self.link_with(Tag::new());
}
pub fn link_with(&self, tag: Tag) {
unsafe { host::api::process::link(tag.id(), self.process.id()) };
}
pub fn unlink(&self) {
unsafe { host::api::process::unlink(self.process.id()) };
}
pub fn kill(&self) {
unsafe { host::api::process::kill(self.process.id()) };
}
#[track_caller]
pub fn shutdown(&self)
where
T::Serializer: CanSerialize<ShutdownMessage<T::Serializer>>,
T::Serializer: CanSerialize<()>,
{
self.shutdown_timeout(None).unwrap();
}
#[track_caller]
pub(crate) fn shutdown_timeout(&self, timeout: Option<Duration>) -> Result<(), Timeout>
where
T::Serializer: CanSerialize<ShutdownMessage<T::Serializer>>,
T::Serializer: CanSerialize<()>,
{
let return_address = ReturnAddress::from_self();
let message = ShutdownMessage(return_address);
let send_tag = AbstractProcessTag::from_u6(SHUTDOWN_HANDLER);
let (receive_tag, _) = AbstractProcessTag::extract_u6_data(send_tag);
unsafe {
let process: Process<ShutdownMessage<T::Serializer>, T::Serializer> =
mem::transmute(self.process);
match process.tag_send_receive(send_tag, receive_tag, message, timeout) {
MailboxResult::Ok(MessageSignal::Message(())) => Ok(()),
MailboxResult::Err(MailboxError::TimedOut) => Err(Timeout),
_ => unreachable!("send_receive should panic in case of other errors"),
}
}
}
#[track_caller]
pub fn send<M: 'static>(&self, message: M)
where
T::Serializer: CanSerialize<M>,
{
let handler_id = T::Handlers::handler_id::<Message<M>>();
let tag = AbstractProcessTag::from_u6(handler_id);
let process: Process<M, T::Serializer> = unsafe { std::mem::transmute(self.process) };
process.tag_send(tag, message);
}
#[track_caller]
pub(crate) fn delayed_send<M: 'static>(&self, message: M, duration: Duration) -> TimerRef
where
T::Serializer: CanSerialize<M>,
{
let handler_id = T::Handlers::handler_id::<Message<M>>();
let tag = AbstractProcessTag::from_u6(handler_id);
let process: Process<M, T::Serializer> = unsafe { std::mem::transmute(self.process) };
process.tag_send_after(tag, message, duration)
}
#[track_caller]
pub fn request<R: 'static>(&self, request: R) -> T::Response
where
T: RequestHandler<R>,
T::Serializer: CanSerialize<R>,
T::Serializer: CanSerialize<T::Response>,
T::Serializer: CanSerialize<RequestMessage<R, T::Response, T::Serializer>>,
{
self.request_timeout(request, None).unwrap()
}
#[track_caller]
pub(crate) fn request_timeout<R: 'static>(
&self,
request: R,
timeout: Option<Duration>,
) -> Result<T::Response, Timeout>
where
T: RequestHandler<R>,
T::Serializer: CanSerialize<R>,
T::Serializer: CanSerialize<T::Response>,
T::Serializer: CanSerialize<RequestMessage<R, T::Response, T::Serializer>>,
{
let return_address = ReturnAddress::from_self();
let message = RequestMessage(request, return_address);
let handler_id = T::Handlers::handler_id::<Request<R>>();
let send_tag = AbstractProcessTag::from_u6(handler_id);
let (receive_tag, _) = AbstractProcessTag::extract_u6_data(send_tag);
unsafe {
let process: Process<RequestMessage<R, T::Response, T::Serializer>, T::Serializer> =
mem::transmute(self.process);
match process.tag_send_receive(send_tag, receive_tag, message, timeout) {
MailboxResult::Ok(MessageSignal::Message(message)) => Ok(message),
MailboxResult::Err(MailboxError::TimedOut) => Err(Timeout),
_ => unreachable!("send_receive should panic in case of other errors"),
}
}
}
#[track_caller]
pub fn deferred_request<R: 'static>(&self, request: R) -> T::Response
where
T: DeferredRequestHandler<R>,
T::Serializer: CanSerialize<R>,
T::Serializer: CanSerialize<T::Response>,
T::Serializer: CanSerialize<RequestMessage<R, T::Response, T::Serializer>>,
{
self.deferred_request_timeout(request, None).unwrap()
}
#[track_caller]
pub(crate) fn deferred_request_timeout<R: 'static>(
&self,
request: R,
timeout: Option<Duration>,
) -> Result<T::Response, Timeout>
where
T: DeferredRequestHandler<R>,
T::Serializer: CanSerialize<R>,
T::Serializer: CanSerialize<T::Response>,
T::Serializer: CanSerialize<RequestMessage<R, T::Response, T::Serializer>>,
{
let return_address = ReturnAddress::from_self();
let message = RequestMessage(request, return_address);
let handler_id = T::Handlers::handler_id::<DeferredRequest<R>>();
let send_tag = AbstractProcessTag::from_u6(handler_id);
let (receive_tag, _) = AbstractProcessTag::extract_u6_data(send_tag);
unsafe {
let process: Process<RequestMessage<R, T::Response, T::Serializer>, T::Serializer> =
mem::transmute(self.process);
match process.tag_send_receive(send_tag, receive_tag, message, timeout) {
MailboxResult::Ok(MessageSignal::Message(message)) => Ok(message),
MailboxResult::Err(MailboxError::TimedOut) => Err(Timeout),
_ => unreachable!("send_receive should panic in case of other errors"),
}
}
}
pub fn with_timeout(self, timeout: Duration) -> WithTimeout<ProcessRef<T>> {
WithTimeout::from(timeout, self)
}
pub fn with_delay(self, timeout: Duration) -> WithDelay<ProcessRef<T>> {
WithDelay::from(timeout, self)
}
}
impl<T> Debug for ProcessRef<T>
where
T: AbstractProcess,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let name = format!("ProcessRef<{}>", type_name::<T>());
f.debug_struct(&name)
.field("id", &self.process.id())
.finish()
}
}
impl<T> Clone for ProcessRef<T>
where
T: AbstractProcess,
{
fn clone(&self) -> Self {
ProcessRef {
process: self.process,
}
}
}
impl<T> PartialEq for ProcessRef<T>
where
T: AbstractProcess,
{
fn eq(&self, other: &Self) -> bool {
self.process == other.process
}
}
impl<T> Eq for ProcessRef<T> where T: AbstractProcess {}
#[derive(serde::Serialize, serde::Deserialize)]
pub enum StartupError<AP: AbstractProcess> {
InitPanicked,
#[serde(bound(serialize = "", deserialize = ""))]
NameAlreadyRegistered(ProcessRef<AP>),
Custom(AP::StartupError),
}
impl<AP: AbstractProcess> Debug for StartupError<AP>
where
AP::StartupError: Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::InitPanicked => write!(f, "InitPanicked"),
Self::NameAlreadyRegistered(arg0) => {
f.debug_tuple("NameAlreadyRegistered").field(arg0).finish()
}
Self::Custom(arg0) => f.debug_tuple("Custom").field(arg0).finish(),
}
}
}
impl<AP: AbstractProcess> Clone for StartupError<AP>
where
AP::StartupError: Clone,
{
fn clone(&self) -> Self {
match self {
Self::InitPanicked => Self::InitPanicked,
Self::NameAlreadyRegistered(arg0) => Self::NameAlreadyRegistered(*arg0),
Self::Custom(arg0) => Self::Custom(arg0.clone()),
}
}
}
impl<AP: AbstractProcess> PartialEq for StartupError<AP>
where
AP::StartupError: PartialEq,
{
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(Self::NameAlreadyRegistered(l0), Self::NameAlreadyRegistered(r0)) => l0 == r0,
(Self::Custom(l0), Self::Custom(r0)) => l0 == r0,
_ => core::mem::discriminant(self) == core::mem::discriminant(other),
}
}
}
impl<AP: AbstractProcess> Eq for StartupError<AP> where AP::StartupError: Eq {}