dvcompute 2.0.0

Discrete event simulation library (sequential simulation)
Documentation
// Copyright (c) 2020-2022  David Sorokin <davsor@mail.ru>, based in Yoshkar-Ola, Russia
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

use std::marker::PhantomData;

use serde::ser::Serialize;
use serde::de::DeserializeOwned;

use bincode;

use crate::simulation;
use crate::simulation::comm::pid::*;
use crate::simulation::internal::input_message_queue::*;
use crate::simulation::internal::output_message_queue::*;
use crate::simulation::point::*;
use crate::simulation::event::*;
use crate::simulation::observable::*;
use crate::simulation::observable::observer::*;
use crate::simulation::observable::disposable::*;
use crate::simulation::error::*;
use crate::simulation::utils::byte_vec::*;

/// Send a message to the specified logical process with the given receive time that should always
/// be greater than the current modeling time.
///
/// The gap between the current time and the receive time can be very small, but it should always be
/// positive. Unlike the conservative methods of distributed discrete event simulation, there is no
/// restriction on the value of this gap. For example, it can be one millionth of the modeling time unit.
pub fn enqueue_message<T>(pid: &LogicalProcessId, time: f64, message: T) -> EnqueueMessage<T>
    where T: Serialize
{
    EnqueueMessage { pid: pid.clone(), time: time, message: message }
}

/// Send a message to the specified logical process with the receive time equaled to the current time,
/// which is very risky (please pay attention to this fact!). Use the `enqueue_message` function!
///
/// For example, this computation won't completely give the desired effect at the start modeling time.
/// Also such a message may potentially lead to infinite loop, when rolling the simulation back in cascade.
/// Therefore, you should always use the `enqueue_message` whenever possible by providing the receive time,
/// which is greater than the current modeling time.
///
/// The gap between the current time and the receive time can be very small, but it should always be
/// positive. Unlike the conservative methods of distributed discrete event simulation, there is no
/// restriction on the value of this gap. For example, it can be one millionth of the modeling time unit.
pub fn send_message<T>(pid: &LogicalProcessId, message: T) -> SendMessage<T>
    where T: Serialize
{
    SendMessage { pid: pid.clone(), message: message }
}

/// An `Observable` triggered when another logical process send a message of the specified type
/// to the current logical process.
///
/// The message is triggered at the specified receive time, which may cause a rollback of the
/// simulation model within the current logical process, if this specified time occurs to be less
/// than the current modeling time of the logical process. Moreover, the rollback can be cascading
/// by involving other logical processes if required.
pub fn message_received<T>() -> MessageReceived<T>
    where T: DeserializeOwned
{
    MessageReceived { _phantom: PhantomData }
}

/// Allows sending messages to the specified remote process with the given receive time.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct EnqueueMessage<T> {

    /// The destination logical process identifier.
    pid: LogicalProcessId,

    /// The time of actuating the message.
    time: f64,

    /// The message to send.
    message: T
}

impl<T> Event for EnqueueMessage<T>
    where T: Serialize
{
    type Item = ();

    #[doc(hidden)]
    #[inline]
    fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
        let EnqueueMessage { pid, time, message } = self;
        assert!(time >= p.time, "The enqueue time cannot be less than the current time");
        match bincode::serialize(&message) {
            Result::Ok(vec) => {
                let vec = ByteVecRepr::into_repr(Box::new(vec));
                unsafe {
                    extern_send(pid.rank(), time, vec, p);
                }
                Result::Ok(())
            },
            Result::Err(_) => {
                let msg = String::from("Could not serialize the message");
                let err = Error::panic(msg);
                Result::Err(err)
            }
        }
    }
}

/// Allows sending messages to the specified remote process with the receive time equaled to the current time.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct SendMessage<T> {

    /// The destination logical process identifier.
    pid: LogicalProcessId,

    /// The message to send.
    message: T
}

impl<T> Event for SendMessage<T>
    where T: Serialize
{
    type Item = ();

    #[doc(hidden)]
    #[inline]
    fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
        let SendMessage { pid, message } = self;
        let comp = EnqueueMessage { pid: pid, time: p.time, message: message };
        comp.call_event(p)
    }
}

/// Allows receiving the messages of the specified type from logical processes.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct MessageReceived<T> {

    /// To keep the type parameter.
    _phantom: PhantomData<T>
}

impl<T> Observable for MessageReceived<T>
    where T: DeserializeOwned
{
    type Message = T;

    #[inline]
    fn subscribe<O>(self, observer: O) -> EventBox<DisposableBox>
        where O: Observer<Message = Self::Message, Item = ()> + 'static
    {
        cons_event(move |p| {
            let r = p.run;
            let observer = cons_observer(move |m: &&[u8], p: &Point| {
                match bincode::deserialize(*m) {
                    Result::Ok(msg) => {
                        observer.call_observer(&msg, p)
                    },
                    Result::Err(_) => {
                        // this is not a message of the type we need
                        Result::Ok(())
                    }
                }
            });
            let observer = observer.into_boxed();
            let observer = ObserverRepr::into_repr(observer);
            let comp = unsafe {
                subscribe_to_extern_input_messages(r.input_messages, observer)
            };
            let h = comp.call_event(p)?;
            let h = h.into_boxed();

            Result::Ok(h)
        }).into_boxed()
    }
}