dvcompute_cons/simulation/comm/
message.rs1use std::marker::PhantomData;
8
9use serde::ser::Serialize;
10use serde::de::DeserializeOwned;
11
12use bincode;
13
14use crate::simulation;
15use crate::simulation::comm::pid::*;
16use crate::simulation::internal::input_message_queue::*;
17use crate::simulation::internal::output_message_queue::*;
18use crate::simulation::point::*;
19use crate::simulation::event::*;
20use crate::simulation::observable::*;
21use crate::simulation::observable::observer::*;
22use crate::simulation::observable::disposable::*;
23use crate::simulation::error::*;
24use crate::simulation::utils::byte_vec::*;
25
26pub fn enqueue_message<T>(pid: &LogicalProcessId, time: f64, message: T) -> EnqueueMessage<T>
33 where T: Serialize
34{
35 EnqueueMessage { pid: pid.clone(), time: time, message: message }
36}
37
38pub fn send_message<T>(pid: &LogicalProcessId, message: T) -> SendMessage<T>
50 where T: Serialize
51{
52 SendMessage { pid: pid.clone(), message: message }
53}
54
55pub fn message_received<T>() -> MessageReceived<T>
63 where T: DeserializeOwned
64{
65 MessageReceived { _phantom: PhantomData }
66}
67
68#[must_use = "computations are lazy and do nothing unless to be run"]
70#[derive(Clone)]
71pub struct EnqueueMessage<T> {
72
73 pid: LogicalProcessId,
75
76 time: f64,
78
79 message: T
81}
82
83impl<T> Event for EnqueueMessage<T>
84 where T: Serialize
85{
86 type Item = ();
87
88 #[doc(hidden)]
89 #[inline]
90 fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
91 let EnqueueMessage { pid, time, message } = self;
92 assert!(time >= p.time, "The enqueue time cannot be less than the current time");
93 match bincode::serialize(&message) {
94 Result::Ok(vec) => {
95 let vec = ByteVecRepr::into_repr(Box::new(vec));
96 unsafe {
97 extern_send(pid.rank(), time, vec, p);
98 }
99 Result::Ok(())
100 },
101 Result::Err(_) => {
102 let msg = String::from("Could not serialize the message");
103 let err = Error::panic(msg);
104 Result::Err(err)
105 }
106 }
107 }
108}
109
110#[must_use = "computations are lazy and do nothing unless to be run"]
112#[derive(Clone)]
113pub struct SendMessage<T> {
114
115 pid: LogicalProcessId,
117
118 message: T
120}
121
122impl<T> Event for SendMessage<T>
123 where T: Serialize
124{
125 type Item = ();
126
127 #[doc(hidden)]
128 #[inline]
129 fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
130 let SendMessage { pid, message } = self;
131 let comp = EnqueueMessage { pid: pid, time: p.time, message: message };
132 comp.call_event(p)
133 }
134}
135
136#[must_use = "computations are lazy and do nothing unless to be run"]
138#[derive(Clone)]
139pub struct MessageReceived<T> {
140
141 _phantom: PhantomData<T>
143}
144
145impl<T> Observable for MessageReceived<T>
146 where T: DeserializeOwned
147{
148 type Message = T;
149
150 #[inline]
151 fn subscribe<O>(self, observer: O) -> EventBox<DisposableBox>
152 where O: Observer<Message = Self::Message, Item = ()> + 'static
153 {
154 cons_event(move |p| {
155 let r = p.run;
156 let observer = cons_observer(move |m: &&[u8], p: &Point| {
157 match bincode::deserialize(*m) {
158 Result::Ok(msg) => {
159 observer.call_observer(&msg, p)
160 },
161 Result::Err(_) => {
162 Result::Ok(())
164 }
165 }
166 });
167 let observer = observer.into_boxed();
168 let observer = ObserverRepr::into_repr(observer);
169 let comp = unsafe {
170 subscribe_to_extern_input_messages(r.input_messages, observer)
171 };
172 let h = comp.call_event(p)?;
173 let h = h.into_boxed();
174
175 Result::Ok(h)
176 }).into_boxed()
177 }
178}