dvcompute 2.0.0

Discrete event simulation library (sequential simulation)
Documentation
// Copyright (c) 2020-2022  David Sorokin <davsor@mail.ru>, based in Yoshkar-Ola, Russia
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

use std::rc::Rc;

use crate::simulation::error::*;
use crate::simulation::parameter::*;
use crate::simulation::parameter::random::*;
use crate::simulation::event::*;
use crate::simulation::process::*;
use crate::simulation::stream::*;

use dvcompute_utils::simulation::arrival::*;

/// Return a stream of random events that arrive with the specified delay.
pub fn random_stream<T, F, M>(f: F) -> Stream<Arrival<T>>
    where F: Fn() -> M + 'static,
          M: Parameter<Item = (f64, T)> + 'static,
          T: 'static
{
    random_stream_loop(f, None)
}

/// Return a stream of random events that arrive with the specified delay and start time.
fn random_stream_loop<T, F, M>(f: F, t0: Option<f64>) -> Stream<Arrival<T>>
    where F: Fn() -> M + 'static,
          M: Parameter<Item = (f64, T)> + 'static,
          T: 'static
{
    let comp = {
        time_event()
            .and_then(move |t1| {
                cons_event(move |_p| {
                    match t0 {
                        None => Result::Ok(()),
                        Some(t0) if t1 == t0 => Result::Ok(()),
                        Some(_) => {
                            let msg =
                                "The time of requesting for a new random event is different from \
                                 the time when the previous event has arrived. Probably, your model \
                                 contains a logical error. The random events should be requested permanently. \
                                 At least, they can be lost, for example, when trying to enqueue them, but \
                                 the random stream itself must always work.";
                            let msg = String::from(msg);
                            let err = Error::retry(msg);
                            Result::Err(err)
                        }
                    }
                })
            })
            .into_process()
            .and_then(move |()| {
                let p = f();
                p.into_process()
                    .and_then(move |(delay, a)| {
                        let delay = delay.max(0.0);
                        hold_process(delay)
                            .and_then(move |()| {
                                time_event()
                                    .into_process()
                                    .and_then(move |t2| {
                                        let arrival = Arrival {
                                            value: a,
                                            time: t2,
                                            delay: {
                                                match t0 {
                                                    None => None,
                                                    Some(_) => Some(delay)
                                                }
                                            }
                                        };
                                        return_process((arrival, random_stream_loop(f, Some(t2))))
                                    })
                            })
                    })
            })
            .into_boxed()
    };
    Stream::Cons(comp)
}

/// A stream with random time delays distributed uniformly.
#[inline]
pub fn random_uniform_stream(min: f64, max: f64) -> Stream<Arrival<f64>> {
    random_stream(move || {
        random_uniform_parameter(min, max)
            .map(|x| (x, x))
    })
}

/// A stream with random integer time delays distributed uniformly.
#[inline]
pub fn random_int_uniform_stream(min: isize, max: isize) -> Stream<Arrival<isize>> {
    random_stream(move || {
        random_int_uniform_parameter(min, max)
            .map(|x| (x as f64, x))
    })
}

/// A stream with random time delays from the triangular distribution.
#[inline]
pub fn random_triangular_stream(min: f64, median: f64, max: f64) -> Stream<Arrival<f64>> {
    random_stream(move || {
        random_triangular_parameter(min, median, max)
            .map(|x| (x, x))
    })
}

/// A stream with random time delays distributed normally.
#[inline]
pub fn random_normal_stream(mu: f64, nu: f64) -> Stream<Arrival<f64>> {
    random_stream(move || {
        random_normal_parameter(mu, nu)
            .map(|x| {
                let x = x.max(0.0);
                (x, x)
            })
    })
}

/// A stream with random time delays distributed lognormally.
#[inline]
pub fn random_log_normal_stream(mu: f64, nu: f64) -> Stream<Arrival<f64>> {
    random_stream(move || {
        random_log_normal_parameter(mu, nu)
            .map(|x| (x, x))
    })
}

/// A stream with exponential random time delays with the specified mean.
#[inline]
pub fn random_exponential_stream(mu: f64) -> Stream<Arrival<f64>> {
    random_stream(move || {
        random_exponential_parameter(mu)
            .map(|x| (x, x))
    })
}

/// A stream with random time delays from the Erlang distribution with the specified scale
/// (the reciprocal of the rate) and integer shape.
#[inline]
pub fn random_erlang_stream(scale: f64, shape: isize) -> Stream<Arrival<f64>> {
    random_stream(move || {
        random_erlang_parameter(scale, shape)
            .map(|x| (x, x))
    })
}

/// A stream with the Poisson random time delays with the specified mean.
#[inline]
pub fn random_poisson_stream(mu: f64) -> Stream<Arrival<isize>> {
    random_stream(move || {
        random_poisson_parameter(mu)
            .map(|x| (x as f64, x))
    })
}

/// A stream with binomial random time delays with the specified probability and trials.
#[inline]
pub fn random_binomial_stream(prob: f64, trials: isize) -> Stream<Arrival<isize>> {
    random_stream(move || {
        random_binomial_parameter(prob, trials)
            .map(|x| (x as f64, x))
    })
}

/// A stream with random time delays from the Gamma distribution.
#[inline]
pub fn random_gamma_stream(kappa: f64, theta: f64) -> Stream<Arrival<f64>> {
    random_stream(move || {
        random_gamma_parameter(kappa, theta)
            .map(|x| (x, x))
    })
}

/// A stream with random time delays from the Beta distribution.
#[inline]
pub fn random_beta_stream(alpha: f64, beta: f64) -> Stream<Arrival<f64>> {
    random_stream(move || {
        random_beta_parameter(alpha, beta)
            .map(|x| (x, x))
    })
}

/// A stream with random time delays from the Weibull distribution.
#[inline]
pub fn random_weibull_stream(alpha: f64, beta: f64) -> Stream<Arrival<f64>> {
    random_stream(move || {
        random_weibull_parameter(alpha, beta)
            .map(|x| (x, x))
    })
}

/// A stream with random time delays from the specified discrete distribution.
#[inline]
pub fn random_discrete_stream(dpdf: Rc<Vec<(f64, f64)>>) -> Stream<Arrival<f64>> {
    random_stream(move || {
        random_discrete_parameter(dpdf.clone())
            .map(|x| (x, x))
    })
}