dvcompute_gpss_branch/simulation/block/
generator.rs

1// Copyright (c) 2020-2022  David Sorokin <davsor@mail.ru>, based in Yoshkar-Ola, Russia
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at https://mozilla.org/MPL/2.0/.
6
7use std::marker::PhantomData;
8
9use dvcompute_utils::simulation::arrival::*;
10use dvcompute_utils::grc::Grc;
11
12use dvcompute_dist::simulation::simulation::*;
13use dvcompute_dist::simulation::event::*;
14use dvcompute_dist::simulation::observable::*;
15use dvcompute_dist::simulation::observable::observer::*;
16use dvcompute_dist::simulation::observable::disposable::*;
17use dvcompute_dist::simulation::stream::*;
18
19use crate::simulation::block::*;
20use crate::simulation::block::ops::*;
21use crate::simulation::transact::*;
22
23/// It represents a chain of `Block` computations.
24#[must_use = "computations are lazy and do nothing unless to be run"]
25pub struct BlockChain<T> where T: 'static {
26    gen: Grc<Box<dyn Fn() -> BlockBox<T, ()>>>
27}
28
29impl<T> BlockChain<T> {
30
31    /// Create a new chain of computations.
32    #[doc(hidden)]
33    #[inline]
34    pub fn new<F>(f: F) -> Self
35        where F: Fn() -> BlockBox<T, ()> + Clone + 'static
36    {
37        BlockChain {
38            gen: Grc::new(Box::new(move || f.clone()()))
39        }
40    }
41
42    /// Call the chain function.
43    #[doc(hidden)]
44    #[inline]
45    pub fn call_chain(&self) -> BlockBox<T, ()> {
46        (self.gen)()
47    }
48}
49
50impl<T> Clone for BlockChain<T> {
51
52    fn clone(&self) -> Self {
53        let gen = &self.gen;
54        BlockChain {
55            gen: gen.clone()
56        }
57    }
58}
59
60/// The generator block.
61pub trait GeneratorBlock<T>: Block<Input = BlockFn<T, ()>, Output = ()> where T: 'static {}
62
63/// Return a generator block by the specified observable using zero priority.
64#[inline]
65pub fn observable_generator_block0<T, O>(observable: O) -> impl GeneratorBlock<Transact<T>> + Clone
66    where O: Observable<Message = Arrival<T>> + Clone + 'static,
67          T: Clone + 'static
68{
69    observable_generator_block(observable, 0)
70}
71
72/// Return a generator block by the specified observable and priority.
73#[inline]
74pub fn observable_generator_block<T, O>(observable: O, priority: isize) -> impl GeneratorBlock<Transact<T>> + Clone
75    where O: Observable<Message = Arrival<T>> + Clone + 'static,
76          T: Clone + 'static
77{
78    observable_generator_block_c(observable, move || { return_event(priority) })
79}
80
81/// Return a generator block by the specified observable and priority.
82#[inline]
83pub fn observable_generator_block_c<T, O, F, M>(observable: O, priority: F) -> impl GeneratorBlock<Transact<T>> + Clone
84    where O: Observable<Message = Arrival<T>> + Clone + 'static,
85          F: Fn() -> M + Clone + 'static,
86          M: Event<Item = isize> + Clone + 'static,
87          T: Clone + 'static
88{
89    ObservableGenerator { observable: observable, priority: priority, _phantom1: PhantomData, _phantom2: PhantomData }
90}
91
92/// Return a generator block by the specified stream using zero priority.
93#[inline]
94pub fn stream_generator_block0<T>(stream: Stream<Arrival<T>>) -> impl GeneratorBlock<Transact<T>>
95    where T: Clone + 'static
96{
97    stream_generator_block(stream, 0)
98}
99
100/// Return a generator block by the specified stream and priority.
101#[inline]
102pub fn stream_generator_block<T>(stream: Stream<Arrival<T>>, priority: isize) -> impl GeneratorBlock<Transact<T>>
103    where T: Clone + 'static
104{
105    stream_generator_block_c(stream, move || { return_event(priority) })
106}
107
108/// Return a generator block by the specified stream and priority.
109#[inline]
110pub fn stream_generator_block_c<T, F, M>(stream: Stream<Arrival<T>>, priority: F) -> impl GeneratorBlock<Transact<T>>
111    where F: Fn() -> M + Clone + 'static,
112          M: Event<Item = isize> + Clone + 'static,
113          T: Clone + 'static
114{
115    StreamGenerator { stream: stream, priority: priority, _phantom1: PhantomData, _phantom2: PhantomData }
116}
117
/// The generator block backed by an observable and a computation of priorities.
///
/// Values of this type are built by `observable_generator_block_c`.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
struct ObservableGenerator<T, O, F, M> {

    /// The observable whose messages are the arrivals (`Arrival<T>`).
    observable: O,

    /// The function returning the computation of the priority.
    priority: F,

    /// A marker keeping the type parameter `M` in use.
    _phantom1: PhantomData<M>,

    /// A marker keeping the type parameter `T` in use.
    _phantom2: PhantomData<T>
}
135
136impl<T, O, F, M> Block for ObservableGenerator<T, O, F, M>
137    where O: Observable<Message = Arrival<T>> + Clone + 'static,
138          F: Fn() -> M + Clone + 'static,
139          M: Event<Item = isize> + Clone + 'static,
140          T: Clone + 'static
141{
142    type Input  = BlockFn<Transact<T>, ()>;
143    type Output = ();
144
145    #[doc(hidden)]
146    fn call_block<C>(self, a: Self::Input, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
147        where C: FnOnce(simulation::Result<Self::Output>, Grc<ProcessId>, &Point) -> simulation::Result<()> + Clone + 'static
148    {
149        let cont = ProcessBoxCont::new(cont);
150        self.call_block_boxed(a, cont, pid, p)
151    }
152
153    #[doc(hidden)]
154    fn call_block_boxed(self, a: Self::Input, cont: ProcessBoxCont<Self::Output>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
155        if is_process_cancelled(&pid, p) {
156            revoke_process_boxed(cont, pid, p)
157        } else {
158            let ObservableGenerator { observable, priority, _phantom1, _phantom2 } = self;
159            let h = {
160                observable.subscribe({
161                    cons_observer(move |arrival: &Arrival<T>, p| {
162                        let chain = a.clone();
163                        priority()
164                            .and_then(move |priority| {
165                                Transact::new(arrival.clone(), priority)
166                                    .into_event()
167                                    .and_then(move |t| {
168                                        TransactId::take(t.transact_id.clone())
169                                            .and_then(move |()| {
170                                                let block = chain.next();
171                                                block.run(t)
172                                            })
173                                            .run()
174                                    })
175                            })
176                            .call_event(p)
177                    })
178                }).call_event(p)?
179            };
180            never_process()
181                .finally({
182                    h.into_event().into_process()
183                })
184                .call_process_boxed(cont, pid, p)
185        }
186    }
187}
188
189impl<T, O, F, M> GeneratorBlock<Transact<T>> for ObservableGenerator<T, O, F, M>
190    where O: Observable<Message = Arrival<T>> + Clone + 'static,
191          F: Fn() -> M + Clone + 'static,
192          M: Event<Item = isize> + Clone + 'static,
193          T: Clone + 'static
194{}
195
196/// The generator block by the specified stream and priority.
197#[must_use = "computations are lazy and do nothing unless to be run"]
198#[derive(Clone)]
199struct StreamGenerator<T, F, M> where T: 'static {
200
201  /// The stream computation of time delays.
202  stream: Stream<Arrival<T>>,
203
204  /// The computation of priorities.
205  priority: F,
206
207  /// To keep the type parameter.
208  _phantom1: PhantomData<M>,
209
210  /// To keep the type parameter.
211  _phantom2: PhantomData<T>
212}
213
214impl<T, F, M> Block for StreamGenerator<T, F, M>
215    where F: Fn() -> M + Clone + 'static,
216          M: Event<Item = isize> + Clone + 'static,
217          T: Clone + 'static
218{
219    type Input  = BlockFn<Transact<T>, ()>;
220    type Output = ();
221
222    #[doc(hidden)]
223    fn call_block<C>(self, a: Self::Input, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
224        where C: FnOnce(simulation::Result<Self::Output>, Grc<ProcessId>, &Point) -> simulation::Result<()> + Clone + 'static
225    {
226        let cont = ProcessBoxCont::new(cont);
227        self.call_block_boxed(a, cont, pid, p)
228    }
229
230    #[doc(hidden)]
231    fn call_block_boxed(self, a: Self::Input, cont: ProcessBoxCont<Self::Output>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
232        if is_process_cancelled(&pid, p) {
233            revoke_process_boxed(cont, pid, p)
234        } else {
235            let StreamGenerator { stream, priority, _phantom1, _phantom2 } = self;
236            generator_stream_loop(stream, priority, a)
237                .call_process_boxed(cont, pid, p)
238        }
239    }
240}
241
242/// Handle the specified stream.
243fn generator_stream_loop<T, F, M>(stream: Stream<Arrival<T>>, priority: F, a: BlockFn<Transact<T>, ()>) -> ProcessBox<()>
244    where F: Fn() -> M + Clone + 'static,
245          M: Event<Item = isize> + Clone + 'static,
246          T: Clone + 'static
247{
248    let Stream::Cons(comp) = stream;
249    comp.and_then(move |(arrival, xs)| {
250        let chain = a.clone();
251        priority().and_then(move |priority| {
252            Transact::new(arrival, priority)
253                .into_event()
254                .and_then(move |t| {
255                    TransactId::take(t.transact_id.clone())
256                        .and_then(move |()| {
257                            let block = chain.next();
258                            block.run(t)
259                        })
260                        .run()
261                })
262        })
263        .into_process()
264        .and_then(move |()| {
265            generator_stream_loop(xs, priority, a)
266        })
267    }).into_boxed()
268}
269
270impl<T, F, M> GeneratorBlock<Transact<T>> for StreamGenerator<T, F, M>
271    where F: Fn() -> M + Clone + 'static,
272          M: Event<Item = isize> + Clone + 'static,
273          T: Clone + 'static
274{}