dvcompute_cons/simulation/comm/
message.rs

1// Copyright (c) 2020-2022  David Sorokin <davsor@mail.ru>, based in Yoshkar-Ola, Russia
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at https://mozilla.org/MPL/2.0/.
6
7use std::marker::PhantomData;
8
9use serde::ser::Serialize;
10use serde::de::DeserializeOwned;
11
12use bincode;
13
14use crate::simulation;
15use crate::simulation::comm::pid::*;
16use crate::simulation::internal::input_message_queue::*;
17use crate::simulation::internal::output_message_queue::*;
18use crate::simulation::point::*;
19use crate::simulation::event::*;
20use crate::simulation::observable::*;
21use crate::simulation::observable::observer::*;
22use crate::simulation::observable::disposable::*;
23use crate::simulation::error::*;
24use crate::simulation::utils::byte_vec::*;
25
26/// Send a message to the specified logical process with the given receive time that should always
27/// be greater than the current modeling time.
28///
29/// The gap between the current time and the receive time can be very small, but it should always be
30/// positive. Unlike the conservative methods of distributed discrete event simulation, there is no
31/// restriction on the value of this gap. For example, it can be one millionth of the modeling time unit.
32pub fn enqueue_message<T>(pid: &LogicalProcessId, time: f64, message: T) -> EnqueueMessage<T>
33    where T: Serialize
34{
35    EnqueueMessage { pid: pid.clone(), time: time, message: message }
36}
37
38/// Send a message to the specified logical process with the receive time equaled to the current time,
39/// which is very risky (please pay attention to this fact!). Use the `enqueue_message` function!
40///
41/// For example, this computation won't completely give the desired effect at the start modeling time.
42/// Also such a message may potentially lead to infinite loop, when rolling the simulation back in cascade.
43/// Therefore, you should always use the `enqueue_message` whenever possible by providing the receive time,
44/// which is greater than the current modeling time.
45///
46/// The gap between the current time and the receive time can be very small, but it should always be
47/// positive. Unlike the conservative methods of distributed discrete event simulation, there is no
48/// restriction on the value of this gap. For example, it can be one millionth of the modeling time unit.
49pub fn send_message<T>(pid: &LogicalProcessId, message: T) -> SendMessage<T>
50    where T: Serialize
51{
52    SendMessage { pid: pid.clone(), message: message }
53}
54
55/// An `Observable` triggered when another logical process send a message of the specified type
56/// to the current logical process.
57///
58/// The message is triggered at the specified receive time, which may cause a rollback of the
59/// simulation model within the current logical process, if this specified time occurs to be less
60/// than the current modeling time of the logical process. Moreover, the rollback can be cascading
61/// by involving other logical processes if required.
62pub fn message_received<T>() -> MessageReceived<T>
63    where T: DeserializeOwned
64{
65    MessageReceived { _phantom: PhantomData }
66}
67
68/// Allows sending messages to the specified remote process with the given receive time.
69#[must_use = "computations are lazy and do nothing unless to be run"]
70#[derive(Clone)]
71pub struct EnqueueMessage<T> {
72
73    /// The destination logical process identifier.
74    pid: LogicalProcessId,
75
76    /// The time of actuating the message.
77    time: f64,
78
79    /// The message to send.
80    message: T
81}
82
83impl<T> Event for EnqueueMessage<T>
84    where T: Serialize
85{
86    type Item = ();
87
88    #[doc(hidden)]
89    #[inline]
90    fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
91        let EnqueueMessage { pid, time, message } = self;
92        assert!(time >= p.time, "The enqueue time cannot be less than the current time");
93        match bincode::serialize(&message) {
94            Result::Ok(vec) => {
95                let vec = ByteVecRepr::into_repr(Box::new(vec));
96                unsafe {
97                    extern_send(pid.rank(), time, vec, p);
98                }
99                Result::Ok(())
100            },
101            Result::Err(_) => {
102                let msg = String::from("Could not serialize the message");
103                let err = Error::panic(msg);
104                Result::Err(err)
105            }
106        }
107    }
108}
109
110/// Allows sending messages to the specified remote process with the receive time equaled to the current time.
111#[must_use = "computations are lazy and do nothing unless to be run"]
112#[derive(Clone)]
113pub struct SendMessage<T> {
114
115    /// The destination logical process identifier.
116    pid: LogicalProcessId,
117
118    /// The message to send.
119    message: T
120}
121
122impl<T> Event for SendMessage<T>
123    where T: Serialize
124{
125    type Item = ();
126
127    #[doc(hidden)]
128    #[inline]
129    fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
130        let SendMessage { pid, message } = self;
131        let comp = EnqueueMessage { pid: pid, time: p.time, message: message };
132        comp.call_event(p)
133    }
134}
135
/// Allows receiving the messages of the specified type from logical processes.
///
/// The value holds no data; the type parameter only records which message type
/// the subscriber is interested in.
#[must_use = "computations are lazy and do nothing unless to be run"]
pub struct MessageReceived<T> {

    /// To keep the type parameter.
    _phantom: PhantomData<T>
}

// `Clone` is implemented by hand: `#[derive(Clone)]` would add a `T: Clone` bound,
// although no value of type `T` is ever stored or copied here.
impl<T> Clone for MessageReceived<T> {
    #[inline]
    fn clone(&self) -> Self {
        MessageReceived { _phantom: PhantomData }
    }
}
144
145impl<T> Observable for MessageReceived<T>
146    where T: DeserializeOwned
147{
148    type Message = T;
149
150    #[inline]
151    fn subscribe<O>(self, observer: O) -> EventBox<DisposableBox>
152        where O: Observer<Message = Self::Message, Item = ()> + 'static
153    {
154        cons_event(move |p| {
155            let r = p.run;
156            let observer = cons_observer(move |m: &&[u8], p: &Point| {
157                match bincode::deserialize(*m) {
158                    Result::Ok(msg) => {
159                        observer.call_observer(&msg, p)
160                    },
161                    Result::Err(_) => {
162                        // this is not a message of the type we need
163                        Result::Ok(())
164                    }
165                }
166            });
167            let observer = observer.into_boxed();
168            let observer = ObserverRepr::into_repr(observer);
169            let comp = unsafe {
170                subscribe_to_extern_input_messages(r.input_messages, observer)
171            };
172            let h = comp.call_event(p)?;
173            let h = h.into_boxed();
174
175            Result::Ok(h)
176        }).into_boxed()
177    }
178}