dvcompute_gpss_branch/simulation/
queue.rs1use std::hash::{Hash, Hasher};
8
9use dvcompute_dist::simulation;
10use dvcompute_dist::simulation::ref_comp::RefComp;
11use dvcompute_dist::simulation::Point;
12use dvcompute_dist::simulation::event::*;
13use dvcompute_dist::simulation::observable::*;
14use dvcompute_dist::simulation::observable::source::*;
15
16use dvcompute_utils::simulation::stats::*;
17use dvcompute_utils::grc::Grc;
18
19use crate::simulation::transact::*;
20
21pub struct Queue {
23
24 pub sequence_no: u64,
26
27 content: RefComp<isize>,
29
30 content_stats: RefComp<TimingStats<isize>>,
32
33 enqueue_count: RefComp<isize>,
35
36 enqueue_zero_entry_count: RefComp<isize>,
38
39 wait_time: RefComp<SamplingStats<f64>>,
41
42 non_zero_entry_wait_time: RefComp<SamplingStats<f64>>,
44
45 enqueued: ObservableSource<()>,
47
48 dequeued: ObservableSource<()>
50}
51
52#[derive(Clone)]
54pub struct QueueEntry {
55
56 pub queue: Grc<Queue>,
58
59 pub enqueue_time: f64
61}
62
63impl PartialEq for Queue {
64
65 fn eq(&self, other: &Self) -> bool {
66 self.content == other.content
67 }
68}
69
70impl Eq for Queue {}
71
72impl Hash for Queue {
73
74 fn hash<H: Hasher>(&self, state: &mut H) {
75 self.sequence_no.hash(state)
76 }
77}
78
79impl Queue {
80
81 #[inline]
83 pub fn new() -> NewQueue {
84 NewQueue {}
85 }
86
87 #[inline]
89 pub fn is_empty(queue: Grc<Queue>) -> impl Event<Item = bool> + Clone {
90 cons_event(move |p| {
91 let n = queue.content.read_at(p);
92 Result::Ok(n == 0)
93 })
94 }
95
96 #[inline]
98 pub fn content(queue: Grc<Queue>) -> impl Event<Item = isize> + Clone {
99 cons_event(move |p| {
100 let n = queue.content.read_at(p);
101 Result::Ok(n)
102 })
103 }
104
105 #[inline]
107 pub fn content_stats(queue: Grc<Queue>) -> impl Event<Item = TimingStats<isize>> + Clone {
108 cons_event(move |p| {
109 let stats = queue.content_stats.read_at(p);
110 Result::Ok(stats)
111 })
112 }
113
114 #[inline]
116 pub fn content_changed(queue: Grc<Queue>) -> impl Observable<Message = isize> + Clone {
117 Queue::content_changed_(&queue)
118 .mapc(move |()| { Queue::content(queue.clone()) })
119 }
120
121 #[inline]
123 pub fn content_changed_(&self) -> impl Observable<Message = ()> + Clone {
124 self.enqueued().merge(self.dequeued())
125 }
126
127 #[inline]
129 pub fn enqueue_count(queue: Grc<Queue>) -> impl Event<Item = isize> + Clone {
130 cons_event(move |p| {
131 let n = queue.enqueue_count.read_at(p);
132 Result::Ok(n)
133 })
134 }
135
136 #[inline]
138 pub fn enqueue_count_changed(queue: Grc<Queue>) -> impl Observable<Message = isize> + Clone {
139 Queue::enqueue_count_changed_(&queue)
140 .mapc(move |()| { Queue::enqueue_count(queue.clone()) })
141 }
142
143 #[inline]
145 pub fn enqueue_count_changed_(&self) -> impl Observable<Message = ()> + Clone {
146 self.enqueued()
147 }
148
149 #[inline]
151 pub fn enqueue_zero_entry_count(queue: Grc<Queue>) -> impl Event<Item = isize> + Clone {
152 cons_event(move |p| {
153 let n = queue.enqueue_zero_entry_count.read_at(p);
154 Result::Ok(n)
155 })
156 }
157
158 #[inline]
160 pub fn enqueue_zero_entry_count_changed(queue: Grc<Queue>) -> impl Observable<Message = isize> + Clone {
161 Queue::enqueue_zero_entry_count_changed_(&queue)
162 .mapc(move |()| { Queue::enqueue_zero_entry_count(queue.clone()) })
163 }
164
165 #[inline]
167 pub fn enqueue_zero_entry_count_changed_(&self) -> impl Observable<Message = ()> + Clone {
168 self.dequeued()
169 }
170
171 #[inline]
173 pub fn wait_time(queue: Grc<Queue>) -> impl Event<Item = SamplingStats<f64>> + Clone {
174 cons_event(move |p| {
175 let stats = queue.wait_time.read_at(p);
176 Result::Ok(stats)
177 })
178 }
179
180 #[inline]
182 pub fn wait_time_changed(queue: Grc<Queue>) -> impl Observable<Message = SamplingStats<f64>> + Clone {
183 Queue::wait_time_changed_(&queue)
184 .mapc(move |()| { Queue::wait_time(queue.clone()) })
185 }
186
187 #[inline]
189 pub fn wait_time_changed_(&self) -> impl Observable<Message = ()> + Clone {
190 self.dequeued()
191 }
192
193 #[inline]
195 pub fn non_zero_entry_wait_time(queue: Grc<Queue>) -> impl Event<Item = SamplingStats<f64>> + Clone {
196 cons_event(move |p| {
197 let stats = queue.non_zero_entry_wait_time.read_at(p);
198 Result::Ok(stats)
199 })
200 }
201
202 #[inline]
204 pub fn non_zero_entry_wait_time_changed(queue: Grc<Queue>) -> impl Observable<Message = SamplingStats<f64>> + Clone {
205 Queue::non_zero_entry_wait_time_changed_(&queue)
206 .mapc(move |()| { Queue::non_zero_entry_wait_time(queue.clone()) })
207 }
208
209 #[inline]
211 pub fn non_zero_entry_wait_time_changed_(&self) -> impl Observable<Message = ()> + Clone {
212 self.dequeued()
213 }
214
215 #[inline]
218 pub fn rate(queue: Grc<Queue>) -> impl Event<Item = f64> + Clone {
219 cons_event(move |p| {
220 let x = queue.content_stats.read_at(p);
221 let y = queue.wait_time.read_at(p);
222 Result::Ok(x.mean() / y.mean)
223 })
224 }
225
226 #[inline]
228 pub fn rate_changed(queue: Grc<Queue>) -> impl Observable<Message = f64> + Clone {
229 Queue::rate_changed_(&queue)
230 .mapc(move |()| { Queue::rate(queue.clone()) })
231 }
232
233 #[inline]
235 pub fn rate_changed_(&self) -> impl Observable<Message = ()> + Clone {
236 self.enqueued().merge(self.dequeued())
237 }
238
239 #[inline]
241 pub fn enqueued(&self) -> impl Observable<Message = ()> + Clone {
242 self.enqueued.publish()
243 }
244
245 #[inline]
247 pub fn dequeued(&self) -> impl Observable<Message = ()> + Clone {
248 self.dequeued.publish()
249 }
250
251 #[inline]
253 pub fn enqueue(queue: Grc<Queue>, transact_id: Grc<TransactId>, increment: isize) -> Enqueue {
254 Enqueue { queue: queue, transact_id: transact_id, increment: increment }
255 }
256
257 #[inline]
259 pub fn dequeue(queue: Grc<Queue>, transact_id: Grc<TransactId>, decrement: isize) -> Dequeue {
260 Dequeue { queue: queue, transact_id: transact_id, decrement: decrement }
261 }
262
263 #[inline]
265 pub fn changed_(&self) -> impl Observable<Message = ()> + Clone {
266 self.enqueued().merge(self.dequeued())
267 }
268
269 pub fn reset(queue: Grc<Queue>) -> impl Event<Item = ()> + Clone {
271 cons_event(move |p| {
272 let content = queue.content.read_at(p);
273 queue.content_stats.write_at(TimingStats::from_sample(p.time, content), p);
274 queue.enqueue_count.write_at(0, p);
275 queue.enqueue_zero_entry_count.write_at(0, p);
276 queue.wait_time.write_at(SamplingStats::empty(), p);
277 queue.non_zero_entry_wait_time.write_at(SamplingStats::empty(), p);
278 Result::Ok(())
279 })
280 }
281}
282
/// Computation returned by `Queue::new`; running it as an `Event` yields a
/// freshly initialised `Queue`.
#[derive(Clone)]
pub struct NewQueue {}
286
287impl Event for NewQueue {
288
289 type Item = Queue;
290
291 #[doc(hidden)]
292 #[inline]
293 fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
294 let t = p.time;
295 let gen = &p.run.generator;
296 let sequence_no = gen.random_sequence_no();
297 Result::Ok(Queue {
298 sequence_no: sequence_no,
299 content: RefComp::new(0),
300 content_stats: RefComp::new(TimingStats::from_sample(t, 0)),
301 enqueue_count: RefComp::new(0),
302 enqueue_zero_entry_count: RefComp::new(0),
303 wait_time: RefComp::new(SamplingStats::empty()),
304 non_zero_entry_wait_time: RefComp::new(SamplingStats::empty()),
305 enqueued: ObservableSource::new(),
306 dequeued: ObservableSource::new()
307 })
308 }
309}
310
311#[derive(Clone)]
313pub struct Enqueue {
314
315 queue: Grc<Queue>,
317
318 transact_id: Grc<TransactId>,
320
321 increment: isize
323}
324
325impl Event for Enqueue {
326
327 type Item = ();
328
329 #[doc(hidden)]
330 fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
331 let Enqueue { queue, transact_id, increment } = self;
332 let t = p.time;
333 let e = QueueEntry { queue: queue.clone(), enqueue_time: t };
334 let n = queue.enqueue_count.read_at(p);
335 let c = queue.content.read_at(p);
336 let stats = queue.content_stats.read_at(p);
337 queue.enqueue_count.write_at(n + 1, p);
338 queue.content.write_at(c + increment, p);
339 queue.content_stats.write_at(stats.add(t, c + increment), p);
340 match transact_id.register_queue_entry(e, p) {
341 Result::Err(e) => Result::Err(e),
342 Result::Ok(()) => queue.enqueued.trigger_at(&(), p)
343 }
344 }
345}
346
347#[derive(Clone)]
349pub struct Dequeue {
350
351 queue: Grc<Queue>,
353
354 transact_id: Grc<TransactId>,
356
357 decrement: isize
359}
360
361impl Event for Dequeue {
362
363 type Item = ();
364
365 #[doc(hidden)]
366 fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
367 let Dequeue { queue, transact_id, decrement } = self;
368 match transact_id.unregister_queue_entry(&queue, p) {
369 Result::Err(e) => Result::Err(e),
370 Result::Ok(e) => {
371 let t = p.time;
372 let t0 = e.enqueue_time;
373 let dt = t - t0;
374 let c = queue.content.read_at(p);
375 let stats = queue.content_stats.read_at(p);
376 let wait_time = queue.wait_time.read_at(p);
377 queue.content.write_at(c - decrement, p);
378 queue.content_stats.write_at(stats.add(t, c - decrement), p);
379 queue.wait_time.write_at(wait_time.add(dt), p);
380 if t == t0 {
381 let c2 = queue.enqueue_zero_entry_count.read_at(p);
382 queue.enqueue_zero_entry_count.write_at(c2 + 1, p);
383 } else {
384 let wait_time2 = queue.non_zero_entry_wait_time.read_at(p);
385 queue.non_zero_entry_wait_time.write_at(wait_time2.add(dt), p);
386 }
387 queue.dequeued.trigger_at(&(), p)
388 }
389 }
390 }
391}