dvcompute_cons/simulation/stream/
random.rs

1// Copyright (c) 2020-2022  David Sorokin <davsor@mail.ru>, based in Yoshkar-Ola, Russia
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at https://mozilla.org/MPL/2.0/.
6
7use std::rc::Rc;
8
9use crate::simulation::error::*;
10use crate::simulation::parameter::*;
11use crate::simulation::parameter::random::*;
12use crate::simulation::event::*;
13use crate::simulation::process::*;
14use crate::simulation::stream::*;
15
16use dvcompute_utils::simulation::arrival::*;
17
18/// Return a stream of random events that arrive with the specified delay.
19pub fn random_stream<T, F, M>(f: F) -> Stream<Arrival<T>>
20    where F: Fn() -> M + 'static,
21          M: Parameter<Item = (f64, T)> + 'static,
22          T: 'static
23{
24    random_stream_loop(f, None)
25}
26
27/// Return a stream of random events that arrive with the specified delay and start time.
28fn random_stream_loop<T, F, M>(f: F, t0: Option<f64>) -> Stream<Arrival<T>>
29    where F: Fn() -> M + 'static,
30          M: Parameter<Item = (f64, T)> + 'static,
31          T: 'static
32{
33    let comp = {
34        time_event()
35            .and_then(move |t1| {
36                cons_event(move |_p| {
37                    match t0 {
38                        None => Result::Ok(()),
39                        Some(t0) if t1 == t0 => Result::Ok(()),
40                        Some(_) => {
41                            let msg =
42                                "The time of requesting for a new random event is different from \
43                                 the time when the previous event has arrived. Probably, your model \
44                                 contains a logical error. The random events should be requested permanently. \
45                                 At least, they can be lost, for example, when trying to enqueue them, but \
46                                 the random stream itself must always work.";
47                            let msg = String::from(msg);
48                            let err = Error::retry(msg);
49                            Result::Err(err)
50                        }
51                    }
52                })
53            })
54            .into_process()
55            .and_then(move |()| {
56                let p = f();
57                p.into_process()
58                    .and_then(move |(delay, a)| {
59                        let delay = delay.max(0.0);
60                        hold_process(delay)
61                            .and_then(move |()| {
62                                time_event()
63                                    .into_process()
64                                    .and_then(move |t2| {
65                                        let arrival = Arrival {
66                                            value: a,
67                                            time: t2,
68                                            delay: {
69                                                match t0 {
70                                                    None => None,
71                                                    Some(_) => Some(delay)
72                                                }
73                                            }
74                                        };
75                                        return_process((arrival, random_stream_loop(f, Some(t2))))
76                                    })
77                            })
78                    })
79            })
80            .into_boxed()
81    };
82    Stream::Cons(comp)
83}
84
85/// A stream with random time delays distributed uniformly.
86#[inline]
87pub fn random_uniform_stream(min: f64, max: f64) -> Stream<Arrival<f64>> {
88    random_stream(move || {
89        random_uniform_parameter(min, max)
90            .map(|x| (x, x))
91    })
92}
93
94/// A stream with random integer time delays distributed uniformly.
95#[inline]
96pub fn random_int_uniform_stream(min: isize, max: isize) -> Stream<Arrival<isize>> {
97    random_stream(move || {
98        random_int_uniform_parameter(min, max)
99            .map(|x| (x as f64, x))
100    })
101}
102
103/// A stream with random time delays from the triangular distribution.
104#[inline]
105pub fn random_triangular_stream(min: f64, median: f64, max: f64) -> Stream<Arrival<f64>> {
106    random_stream(move || {
107        random_triangular_parameter(min, median, max)
108            .map(|x| (x, x))
109    })
110}
111
112/// A stream with random time delays distributed normally.
113#[inline]
114pub fn random_normal_stream(mu: f64, nu: f64) -> Stream<Arrival<f64>> {
115    random_stream(move || {
116        random_normal_parameter(mu, nu)
117            .map(|x| {
118                let x = x.max(0.0);
119                (x, x)
120            })
121    })
122}
123
124/// A stream with random time delays distributed lognormally.
125#[inline]
126pub fn random_log_normal_stream(mu: f64, nu: f64) -> Stream<Arrival<f64>> {
127    random_stream(move || {
128        random_log_normal_parameter(mu, nu)
129            .map(|x| (x, x))
130    })
131}
132
133/// A stream with exponential random time delays with the specified mean.
134#[inline]
135pub fn random_exponential_stream(mu: f64) -> Stream<Arrival<f64>> {
136    random_stream(move || {
137        random_exponential_parameter(mu)
138            .map(|x| (x, x))
139    })
140}
141
142/// A stream with random time delays from the Erlang distribution with the specified scale
143/// (the reciprocal of the rate) and integer shape.
144#[inline]
145pub fn random_erlang_stream(scale: f64, shape: isize) -> Stream<Arrival<f64>> {
146    random_stream(move || {
147        random_erlang_parameter(scale, shape)
148            .map(|x| (x, x))
149    })
150}
151
152/// A stream with the Poisson random time delays with the specified mean.
153#[inline]
154pub fn random_poisson_stream(mu: f64) -> Stream<Arrival<isize>> {
155    random_stream(move || {
156        random_poisson_parameter(mu)
157            .map(|x| (x as f64, x))
158    })
159}
160
161/// A stream with binomial random time delays with the specified probability and trials.
162#[inline]
163pub fn random_binomial_stream(prob: f64, trials: isize) -> Stream<Arrival<isize>> {
164    random_stream(move || {
165        random_binomial_parameter(prob, trials)
166            .map(|x| (x as f64, x))
167    })
168}
169
170/// A stream with random time delays from the Gamma distribution.
171#[inline]
172pub fn random_gamma_stream(kappa: f64, theta: f64) -> Stream<Arrival<f64>> {
173    random_stream(move || {
174        random_gamma_parameter(kappa, theta)
175            .map(|x| (x, x))
176    })
177}
178
179/// A stream with random time delays from the Beta distribution.
180#[inline]
181pub fn random_beta_stream(alpha: f64, beta: f64) -> Stream<Arrival<f64>> {
182    random_stream(move || {
183        random_beta_parameter(alpha, beta)
184            .map(|x| (x, x))
185    })
186}
187
188/// A stream with random time delays from the Weibull distribution.
189#[inline]
190pub fn random_weibull_stream(alpha: f64, beta: f64) -> Stream<Arrival<f64>> {
191    random_stream(move || {
192        random_weibull_parameter(alpha, beta)
193            .map(|x| (x, x))
194    })
195}
196
197/// A stream with random time delays from the specified discrete distribution.
198#[inline]
199pub fn random_discrete_stream(dpdf: Rc<Vec<(f64, f64)>>) -> Stream<Arrival<f64>> {
200    random_stream(move || {
201        random_discrete_parameter(dpdf.clone())
202            .map(|x| (x, x))
203    })
204}