use std::marker::PhantomData;
use serde::ser::Serialize;
use serde::de::DeserializeOwned;
use bincode;
use crate::simulation;
use crate::simulation::comm::pid::*;
use crate::simulation::internal::input_message_queue::*;
use crate::simulation::internal::output_message_queue::*;
use crate::simulation::point::*;
use crate::simulation::event::*;
use crate::simulation::observable::*;
use crate::simulation::observable::observer::*;
use crate::simulation::observable::disposable::*;
use crate::simulation::error::*;
use crate::simulation::utils::byte_vec::*;
/// Builds a lazy computation that enqueues `message` for the logical process `pid`
/// with the receive time `time`.
///
/// The process identifier is cloned into the returned value; nothing is serialized
/// or sent until the resulting [`EnqueueMessage`] is run as an event.
pub fn enqueue_message<T>(pid: &LogicalProcessId, time: f64, message: T) -> EnqueueMessage<T>
where
    T: Serialize,
{
    EnqueueMessage {
        pid: pid.clone(),
        time,
        message,
    }
}
/// Builds a lazy computation that sends `message` to the logical process `pid`.
///
/// The process identifier is cloned into the returned value. When the resulting
/// [`SendMessage`] is run, the message is enqueued at the time of the point the
/// event is run in.
pub fn send_message<T>(pid: &LogicalProcessId, message: T) -> SendMessage<T>
where
    T: Serialize,
{
    SendMessage {
        pid: pid.clone(),
        message,
    }
}
/// Creates an observable of incoming messages that can be decoded as `T`.
///
/// The returned [`MessageReceived`] carries no data of its own; the payload type is
/// tracked only at the type level.
pub fn message_received<T>() -> MessageReceived<T>
where
    T: DeserializeOwned,
{
    MessageReceived {
        _phantom: PhantomData,
    }
}
/// A lazy computation that enqueues a message for another logical process.
///
/// Created by `enqueue_message`; running it as an event performs the actual send.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct EnqueueMessage<T> {
    /// The identifier of the destination logical process.
    pid: LogicalProcessId,

    /// The time with which the message is enqueued.
    time: f64,

    /// The payload, serialized only when the computation is run.
    message: T,
}
impl<T> Event for EnqueueMessage<T>
where T: Serialize
{
type Item = ();
#[doc(hidden)]
#[inline]
fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
let EnqueueMessage { pid, time, message } = self;
assert!(time >= p.time, "The enqueue time cannot be less than the current time");
match bincode::serialize(&message) {
Result::Ok(vec) => {
let vec = ByteVecRepr::into_repr(Box::new(vec));
unsafe {
extern_send(pid.rank(), time, vec, p);
}
Result::Ok(())
},
Result::Err(_) => {
let msg = String::from("Could not serialize the message");
let err = Error::panic(msg);
Result::Err(err)
}
}
}
}
/// A lazy computation that sends a message to another logical process.
///
/// Created by `send_message`; running it as an event performs the actual send.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct SendMessage<T> {
    /// The identifier of the destination logical process.
    pid: LogicalProcessId,

    /// The payload, serialized only when the computation is run.
    message: T,
}
/// Running a `SendMessage` is the same as running an `EnqueueMessage` whose time is
/// the time of the current point.
impl<T> Event for SendMessage<T>
where
    T: Serialize,
{
    type Item = ();

    #[doc(hidden)]
    #[inline]
    fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
        let SendMessage { pid, message } = self;

        // Delegate to the enqueue computation, stamped with the current time.
        EnqueueMessage {
            pid,
            time: p.time,
            message,
        }
        .call_event(p)
    }
}
/// An observable of incoming messages decoded as values of type `T`.
///
/// Created by `message_received`. The type parameter is tracked only through
/// `PhantomData`; no value of `T` is ever stored in this struct.
#[must_use = "computations are lazy and do nothing unless to be run"]
pub struct MessageReceived<T> {
    /// Marker tying the observable to its payload type.
    _phantom: PhantomData<T>
}

// `Clone` is implemented by hand rather than derived: the derive would add a
// `T: Clone` bound even though the struct holds no `T`, which would needlessly
// prevent cloning the observable for payload types that are not `Clone`.
impl<T> Clone for MessageReceived<T> {
    #[inline]
    fn clone(&self) -> Self {
        MessageReceived { _phantom: PhantomData }
    }
}
/// Subscribing to `MessageReceived<T>` registers a byte-level observer on the
/// external input messages of the current run and forwards every payload that
/// decodes as `T` to the supplied observer.
impl<T> Observable for MessageReceived<T>
where
    T: DeserializeOwned,
{
    type Message = T;

    #[inline]
    fn subscribe<O>(self, observer: O) -> EventBox<DisposableBox>
    where
        O: Observer<Message = Self::Message, Item = ()> + 'static,
    {
        cons_event(move |p| {
            let run = p.run;

            // Wrap the typed observer in one that works on raw bytes. Payloads that
            // fail to decode as `T` are skipped without reporting an error.
            let decoding = cons_observer(move |bytes: &&[u8], point: &Point| {
                if let Result::Ok(msg) = bincode::deserialize(*bytes) {
                    observer.call_observer(&msg, point)
                } else {
                    Result::Ok(())
                }
            });
            let decoding = ObserverRepr::into_repr(decoding.into_boxed());

            // NOTE(review): `subscribe_to_extern_input_messages` is defined outside
            // this view; its safety contract should be confirmed against its declaration.
            let subscription = unsafe {
                subscribe_to_extern_input_messages(run.input_messages, decoding)
            };

            // Run the subscription now and return its disposable handle in boxed form.
            Result::Ok(subscription.call_event(p)?.into_boxed())
        })
        .into_boxed()
    }
}