dvcompute_gpss_branch/simulation/block/
generator.rs1use std::marker::PhantomData;
8
9use dvcompute_utils::simulation::arrival::*;
10use dvcompute_utils::grc::Grc;
11
12use dvcompute_dist::simulation::simulation::*;
13use dvcompute_dist::simulation::event::*;
14use dvcompute_dist::simulation::observable::*;
15use dvcompute_dist::simulation::observable::observer::*;
16use dvcompute_dist::simulation::observable::disposable::*;
17use dvcompute_dist::simulation::stream::*;
18
19use crate::simulation::block::*;
20use crate::simulation::block::ops::*;
21use crate::simulation::transact::*;
22
23#[must_use = "computations are lazy and do nothing unless to be run"]
25pub struct BlockChain<T> where T: 'static {
26 gen: Grc<Box<dyn Fn() -> BlockBox<T, ()>>>
27}
28
29impl<T> BlockChain<T> {
30
31 #[doc(hidden)]
33 #[inline]
34 pub fn new<F>(f: F) -> Self
35 where F: Fn() -> BlockBox<T, ()> + Clone + 'static
36 {
37 BlockChain {
38 gen: Grc::new(Box::new(move || f.clone()()))
39 }
40 }
41
42 #[doc(hidden)]
44 #[inline]
45 pub fn call_chain(&self) -> BlockBox<T, ()> {
46 (self.gen)()
47 }
48}
49
50impl<T> Clone for BlockChain<T> {
51
52 fn clone(&self) -> Self {
53 let gen = &self.gen;
54 BlockChain {
55 gen: gen.clone()
56 }
57 }
58}
59
60pub trait GeneratorBlock<T>: Block<Input = BlockFn<T, ()>, Output = ()> where T: 'static {}
62
63#[inline]
65pub fn observable_generator_block0<T, O>(observable: O) -> impl GeneratorBlock<Transact<T>> + Clone
66 where O: Observable<Message = Arrival<T>> + Clone + 'static,
67 T: Clone + 'static
68{
69 observable_generator_block(observable, 0)
70}
71
72#[inline]
74pub fn observable_generator_block<T, O>(observable: O, priority: isize) -> impl GeneratorBlock<Transact<T>> + Clone
75 where O: Observable<Message = Arrival<T>> + Clone + 'static,
76 T: Clone + 'static
77{
78 observable_generator_block_c(observable, move || { return_event(priority) })
79}
80
81#[inline]
83pub fn observable_generator_block_c<T, O, F, M>(observable: O, priority: F) -> impl GeneratorBlock<Transact<T>> + Clone
84 where O: Observable<Message = Arrival<T>> + Clone + 'static,
85 F: Fn() -> M + Clone + 'static,
86 M: Event<Item = isize> + Clone + 'static,
87 T: Clone + 'static
88{
89 ObservableGenerator { observable: observable, priority: priority, _phantom1: PhantomData, _phantom2: PhantomData }
90}
91
92#[inline]
94pub fn stream_generator_block0<T>(stream: Stream<Arrival<T>>) -> impl GeneratorBlock<Transact<T>>
95 where T: Clone + 'static
96{
97 stream_generator_block(stream, 0)
98}
99
100#[inline]
102pub fn stream_generator_block<T>(stream: Stream<Arrival<T>>, priority: isize) -> impl GeneratorBlock<Transact<T>>
103 where T: Clone + 'static
104{
105 stream_generator_block_c(stream, move || { return_event(priority) })
106}
107
108#[inline]
110pub fn stream_generator_block_c<T, F, M>(stream: Stream<Arrival<T>>, priority: F) -> impl GeneratorBlock<Transact<T>>
111 where F: Fn() -> M + Clone + 'static,
112 M: Event<Item = isize> + Clone + 'static,
113 T: Clone + 'static
114{
115 StreamGenerator { stream: stream, priority: priority, _phantom1: PhantomData, _phantom2: PhantomData }
116}
117
/// The block type returned by `observable_generator_block_c`.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
struct ObservableGenerator<T, O, F, M> {
    /// The source of arrivals that the block subscribes to when it is run.
    observable: O,

    /// Called once per arrival to obtain the computation of the priority.
    priority: F,

    /// Marks the otherwise unused type parameter `M` as used.
    _phantom1: PhantomData<M>,

    /// Marks the otherwise unused type parameter `T` as used.
    _phantom2: PhantomData<T>,
}
135
136impl<T, O, F, M> Block for ObservableGenerator<T, O, F, M>
137 where O: Observable<Message = Arrival<T>> + Clone + 'static,
138 F: Fn() -> M + Clone + 'static,
139 M: Event<Item = isize> + Clone + 'static,
140 T: Clone + 'static
141{
142 type Input = BlockFn<Transact<T>, ()>;
143 type Output = ();
144
145 #[doc(hidden)]
146 fn call_block<C>(self, a: Self::Input, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
147 where C: FnOnce(simulation::Result<Self::Output>, Grc<ProcessId>, &Point) -> simulation::Result<()> + Clone + 'static
148 {
149 let cont = ProcessBoxCont::new(cont);
150 self.call_block_boxed(a, cont, pid, p)
151 }
152
153 #[doc(hidden)]
154 fn call_block_boxed(self, a: Self::Input, cont: ProcessBoxCont<Self::Output>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
155 if is_process_cancelled(&pid, p) {
156 revoke_process_boxed(cont, pid, p)
157 } else {
158 let ObservableGenerator { observable, priority, _phantom1, _phantom2 } = self;
159 let h = {
160 observable.subscribe({
161 cons_observer(move |arrival: &Arrival<T>, p| {
162 let chain = a.clone();
163 priority()
164 .and_then(move |priority| {
165 Transact::new(arrival.clone(), priority)
166 .into_event()
167 .and_then(move |t| {
168 TransactId::take(t.transact_id.clone())
169 .and_then(move |()| {
170 let block = chain.next();
171 block.run(t)
172 })
173 .run()
174 })
175 })
176 .call_event(p)
177 })
178 }).call_event(p)?
179 };
180 never_process()
181 .finally({
182 h.into_event().into_process()
183 })
184 .call_process_boxed(cont, pid, p)
185 }
186 }
187}
188
189impl<T, O, F, M> GeneratorBlock<Transact<T>> for ObservableGenerator<T, O, F, M>
190 where O: Observable<Message = Arrival<T>> + Clone + 'static,
191 F: Fn() -> M + Clone + 'static,
192 M: Event<Item = isize> + Clone + 'static,
193 T: Clone + 'static
194{}
195
196#[must_use = "computations are lazy and do nothing unless to be run"]
198#[derive(Clone)]
199struct StreamGenerator<T, F, M> where T: 'static {
200
201 stream: Stream<Arrival<T>>,
203
204 priority: F,
206
207 _phantom1: PhantomData<M>,
209
210 _phantom2: PhantomData<T>
212}
213
214impl<T, F, M> Block for StreamGenerator<T, F, M>
215 where F: Fn() -> M + Clone + 'static,
216 M: Event<Item = isize> + Clone + 'static,
217 T: Clone + 'static
218{
219 type Input = BlockFn<Transact<T>, ()>;
220 type Output = ();
221
222 #[doc(hidden)]
223 fn call_block<C>(self, a: Self::Input, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
224 where C: FnOnce(simulation::Result<Self::Output>, Grc<ProcessId>, &Point) -> simulation::Result<()> + Clone + 'static
225 {
226 let cont = ProcessBoxCont::new(cont);
227 self.call_block_boxed(a, cont, pid, p)
228 }
229
230 #[doc(hidden)]
231 fn call_block_boxed(self, a: Self::Input, cont: ProcessBoxCont<Self::Output>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
232 if is_process_cancelled(&pid, p) {
233 revoke_process_boxed(cont, pid, p)
234 } else {
235 let StreamGenerator { stream, priority, _phantom1, _phantom2 } = self;
236 generator_stream_loop(stream, priority, a)
237 .call_process_boxed(cont, pid, p)
238 }
239 }
240}
241
242fn generator_stream_loop<T, F, M>(stream: Stream<Arrival<T>>, priority: F, a: BlockFn<Transact<T>, ()>) -> ProcessBox<()>
244 where F: Fn() -> M + Clone + 'static,
245 M: Event<Item = isize> + Clone + 'static,
246 T: Clone + 'static
247{
248 let Stream::Cons(comp) = stream;
249 comp.and_then(move |(arrival, xs)| {
250 let chain = a.clone();
251 priority().and_then(move |priority| {
252 Transact::new(arrival, priority)
253 .into_event()
254 .and_then(move |t| {
255 TransactId::take(t.transact_id.clone())
256 .and_then(move |()| {
257 let block = chain.next();
258 block.run(t)
259 })
260 .run()
261 })
262 })
263 .into_process()
264 .and_then(move |()| {
265 generator_stream_loop(xs, priority, a)
266 })
267 }).into_boxed()
268}
269
270impl<T, F, M> GeneratorBlock<Transact<T>> for StreamGenerator<T, F, M>
271 where F: Fn() -> M + Clone + 'static,
272 M: Event<Item = isize> + Clone + 'static,
273 T: Clone + 'static
274{}