use std::any::TypeId;
use std::marker::PhantomData;
use std::mem::ManuallyDrop;
use std::time::Duration;
use std::{any, fmt};
use crate::function::process::IntoProcess;
use crate::mailbox::MailboxError;
use crate::serializer::{Bincode, CanSerialize};
use crate::{host, LunaticError, Mailbox, Process, ProcessConfig, Tag};
#[derive(serde::Serialize, serde::Deserialize, Debug, Hash)]
pub struct ProtocolCapture<C> {
process: Process<()>,
tag: Tag,
capture: C,
}
#[derive(Hash)]
pub struct Protocol<P: 'static, S = Bincode, Z: 'static = ()> {
id: u64,
node_id: u64,
tag: Tag,
phantom: PhantomData<(P, S, Z)>,
}
impl<P: 'static, S, Z: 'static> Drop for Protocol<P, S, Z> {
fn drop(&mut self) {
if TypeId::of::<P>() != TypeId::of::<End>() && TypeId::of::<P>() != TypeId::of::<TaskEnd>()
{
panic!(
"Protocol prematurely dropped, before reaching the `End` or `TaskEnd` state (currently: {}).",
std::any::type_name::<P>()
);
}
}
}
impl<P, S, Z> Protocol<P, S, Z> {
pub fn id(&self) -> u64 {
self.id
}
pub fn node_id(&self) -> u64 {
self.node_id
}
pub fn tag(&self) -> Tag {
self.tag
}
pub fn from_process<M, S2>(process: Process<M, S2>) -> Self {
Self::from_process_with_tag(process, Tag::new())
}
pub fn from_process_with_tag<M, S2>(process: Process<M, S2>, tag: Tag) -> Self {
Self {
id: process.id(),
node_id: process.node_id(),
tag,
phantom: PhantomData,
}
}
fn cast<P2, Z2>(self) -> Protocol<P2, S, Z2> {
let self_ = ManuallyDrop::new(self);
Protocol {
id: self_.id,
node_id: self_.node_id,
tag: self_.tag,
phantom: PhantomData,
}
}
}
impl<P, A, S, Z> Protocol<Send<A, P>, S, Z>
where
S: CanSerialize<A>,
{
#[must_use]
pub fn send(self, message: A) -> Protocol<P, S, Z> {
let self_ = ManuallyDrop::new(self);
let process: Process<A, S> = unsafe { Process::new(self_.node_id, self_.id) };
process.tag_send(self_.tag, message);
Protocol::from_process_with_tag(process, self_.tag)
}
}
impl<P, A, S, Z> Protocol<Recv<A, P>, S, Z>
where
S: CanSerialize<A>,
{
#[must_use]
pub fn receive(self) -> (Protocol<P, S, Z>, A) {
let mailbox: Mailbox<A, S> = unsafe { Mailbox::new() };
let received = mailbox.tag_receive(&[self.tag]);
(self.cast(), received)
}
}
impl<A, S, Z> Protocol<Recv<A, TaskEnd>, S, Z>
where
S: CanSerialize<A>,
{
#[must_use]
pub fn result(self) -> A {
let mailbox: Mailbox<A, S> = unsafe { Mailbox::new() };
let result = mailbox.tag_receive(&[self.tag]);
let _: Protocol<TaskEnd, S, Z> = self.cast(); result
}
pub fn result_timeout(self, duration: Duration) -> Result<A, MailboxError> {
let mailbox: Mailbox<A, S> = unsafe { Mailbox::new() };
let result = mailbox.tag_receive_timeout(&[self.tag], duration);
let _: Protocol<TaskEnd, S, Z> = self.cast(); result
}
}
impl<P, Q, S, Z> Protocol<Choose<P, Q>, S, Z>
where
S: CanSerialize<bool>,
{
#[must_use]
pub fn select_left(self) -> Protocol<P, S, Z> {
let self_ = ManuallyDrop::new(self);
let process: Process<bool, S> = unsafe { Process::new(self_.node_id, self_.id) };
process.tag_send(self_.tag, true);
Protocol::from_process_with_tag(process, self_.tag)
}
#[must_use]
pub fn select_right(self) -> Protocol<Q, S, Z> {
let self_ = ManuallyDrop::new(self);
let process: Process<bool, S> = unsafe { Process::new(self_.node_id, self_.id) };
process.tag_send(self_.tag, false);
Protocol::from_process_with_tag(process, self_.tag)
}
}
impl<P, Q, S, Z> Protocol<Offer<P, Q>, S, Z>
where
S: CanSerialize<bool>,
{
#[must_use]
pub fn offer(self) -> Branch<Protocol<P, S, Z>, Protocol<Q, S, Z>> {
unsafe {
let mailbox: Mailbox<bool, S> = Mailbox::new();
if mailbox.receive() {
Branch::Left(self.cast())
} else {
Branch::Right(self.cast())
}
}
}
}
impl<P, S, Z> Protocol<Rec<P>, S, Z> {
#[must_use]
pub fn repeat(self) -> Protocol<P, S, Protocol<Rec<P>, S, Z>> {
self.cast()
}
}
impl<P2, S, Z> Protocol<Pop, S, Protocol<P2, S, Z>> {
#[must_use]
pub fn pop(self) -> Protocol<P2, S, Z> {
self.cast()
}
}
impl<P, S, Z> From<Protocol<Rec<P>, S, Z>> for Protocol<P, S, Z> {
fn from(p: Protocol<Rec<P>, S, Z>) -> Self {
p.cast()
}
}
impl<P, S, Z> fmt::Debug for Protocol<P, S, Z> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Protocol")
.field("id", &self.id)
.field("node_id", &self.node_id)
.field("tag", &self.tag)
.field("protocol", &any::type_name::<P>())
.field("serializer", &any::type_name::<S>())
.field("Z", &any::type_name::<Z>())
.finish()
}
}
impl<P, M, S, S2> From<Process<M, S>> for Protocol<P, S2> {
fn from(process: Process<M, S>) -> Self {
Protocol::from_process(process)
}
}
pub struct TaskEnd;
pub struct End;
pub struct Recv<A, P>(PhantomData<(A, P)>);
pub struct Send<A, P>(PhantomData<(A, P)>);
pub struct Choose<P, Q>(PhantomData<(P, Q)>);
pub struct Offer<P, Q>(PhantomData<(P, Q)>);
pub struct Rec<P>(PhantomData<P>);
pub struct Pop;
pub trait HasDual: private::Sealed {
type Dual: HasDual;
}
impl HasDual for TaskEnd {
type Dual = TaskEnd;
}
impl HasDual for End {
type Dual = End;
}
impl<A, P: HasDual> HasDual for Send<A, P> {
type Dual = Recv<A, P::Dual>;
}
impl<A, P: HasDual> HasDual for Recv<A, P> {
type Dual = Send<A, P::Dual>;
}
impl<P: HasDual, Q: HasDual> HasDual for Choose<P, Q> {
type Dual = Offer<P::Dual, Q::Dual>;
}
impl<P: HasDual, Q: HasDual> HasDual for Offer<P, Q> {
type Dual = Choose<P::Dual, Q::Dual>;
}
impl<P: HasDual> HasDual for Rec<P> {
type Dual = Rec<P::Dual>;
}
impl HasDual for Pop {
type Dual = Pop;
}
pub enum Branch<L, R> {
Left(L),
Right(R),
}
mod private {
use super::*;
pub trait Sealed {}
impl Sealed for TaskEnd {}
impl Sealed for End {}
impl<A, P> Sealed for Send<A, P> {}
impl<A, P> Sealed for Recv<A, P> {}
impl<P, Q> Sealed for Choose<P, Q> {}
impl<P, Q> Sealed for Offer<P, Q> {}
impl<P> Sealed for Rec<P> {}
impl Sealed for Pop {}
}
impl<P, S, Z> IntoProcess<P, S> for Protocol<P, S, Z>
where
P: HasDual,
{
type Process = Protocol<<P as HasDual>::Dual, S, Z>;
fn spawn<C>(
capture: C,
entry: fn(C, Protocol<P, S, Z>),
name: Option<&str>,
link: Option<Tag>,
config: Option<&ProcessConfig>,
node: Option<u64>,
) -> Result<Self::Process, LunaticError>
where
S: CanSerialize<ProtocolCapture<C>>,
{
let entry = entry as usize as i32;
let node_id = node.unwrap_or_else(host::node_id);
match host::spawn(
name,
node,
config,
link,
type_helper_wrapper::<C, P, S, Z>,
entry,
) {
Ok(id) => {
let tag = Tag::new();
let this = unsafe { Process::<()>::new(host::node_id(), host::process_id()) };
let capture = ProtocolCapture {
process: this,
tag,
capture,
};
let child = unsafe { Process::<ProtocolCapture<C>, S>::new(node_id, id) };
child.send(capture);
Ok(Protocol::from_process_with_tag(child, tag))
}
Err(err) => Err(err),
}
}
}
fn type_helper_wrapper<C, P, S, Z>(function: i32)
where
S: CanSerialize<ProtocolCapture<C>>,
P: HasDual + 'static,
Z: 'static,
{
let p_capture = unsafe { Mailbox::<ProtocolCapture<C>, S>::new() }.receive();
let capture = p_capture.capture;
let protocol = Protocol::from_process_with_tag(p_capture.process, p_capture.tag);
let function: fn(C, Protocol<P, S, Z>) = unsafe { std::mem::transmute(function as usize) };
function(capture, protocol);
}
#[cfg(test)]
mod tests {
use lunatic_test::test;
use super::*;
type AddProtocol = Recv<i32, Recv<i32, Send<i32, End>>>;
#[test]
fn protocol() {
let child = Process::spawn_link(1, |capture: i32, protocol: Protocol<AddProtocol>| {
assert_eq!(capture, 1);
let (protocol, a) = protocol.receive();
let (protocol, b) = protocol.receive();
let _ = protocol.send(capture + a + b);
});
let child = child.send(2);
let child = child.send(2);
let (_, result) = child.receive();
assert_eq!(result, 5);
}
}