dvcompute/simulation/queue/unbounded/stats/
mod.rs1use std::marker::PhantomData;
8
9use crate::simulation;
10use crate::simulation::Point;
11use crate::simulation::ref_comp::RefComp;
12use crate::simulation::observable::*;
13use crate::simulation::observable::source::*;
14use crate::simulation::simulation::*;
15use crate::simulation::event::*;
16use crate::simulation::process::*;
17use crate::simulation::strategy::*;
18use crate::simulation::resource::*;
19
20use dvcompute_utils::simulation::stats::*;
21use dvcompute_utils::grc::Grc;
22
23pub type FCFSQueue<T> = Queue<FCFSStrategy, FCFSStrategy, T>;
26
27pub type LCFSQueue<T> = Queue<LCFSStrategy, FCFSStrategy, T>;
30
31pub struct Queue<SM, SO, T>
34 where SM: QueueStrategy,
35 SO: QueueStrategy + 'static,
36 T: 'static
37{
38 queue_store: QueueStorageBox<QueueItem<T>, SM::Priority>,
40
41 dequeue_resource: Grc<Resource<SO>>,
43
44 count: RefComp<isize>,
46
47 count_stats: RefComp<TimingStats<isize>>,
49
50 enqueue_store_count: RefComp<isize>,
52
53 dequeue_count: RefComp<isize>,
55
56 dequeue_extract_count: RefComp<isize>,
58
59 wait_time: RefComp<SamplingStats<f64>>,
61
62 dequeue_wait_time: RefComp<SamplingStats<f64>>,
64
65 enqueue_stored_source: ObservableSource<T>,
67
68 dequeue_requested_source: ObservableSource<()>,
70
71 dequeue_extracted_source: ObservableSource<T>
73}
74
/// An entry of the queue storage: the queued value together with the
/// simulation time at which it was stored.
#[derive(Clone)]
struct QueueItem<T> {
    /// The value passed to the enqueue operation.
    value: T,

    /// The simulation time at which the value was put into the storage;
    /// used later to compute the wait time of the item.
    storing_time: f64,
}
85
86#[inline]
88pub fn new_fcfs_queue<T>() -> NewQueue<FCFSStrategy, FCFSStrategy, T>
89 where T: 'static
90{
91 NewQueue {
92 storing_strategy: FCFSStrategy::Instance,
93 dequeue_strategy: FCFSStrategy::Instance,
94 _phantom: PhantomData
95 }
96}
97
98#[inline]
100pub fn new_lcfs_queue<T>() -> NewQueue<LCFSStrategy, FCFSStrategy, T>
101 where T: 'static
102{
103 NewQueue {
104 storing_strategy: LCFSStrategy::Instance,
105 dequeue_strategy: FCFSStrategy::Instance,
106 _phantom: PhantomData
107 }
108}
109
110impl<SM, SO, T> Queue<SM, SO, T>
111 where SM: QueueStrategy + 'static,
112 SO: QueueStrategy + 'static,
113 T: Clone + 'static
114{
115 #[inline]
117 pub fn new(storing_strategy: SM, dequeue_strategy: SO) -> NewQueue<SM, SO, T> {
118 NewQueue {
119 storing_strategy: storing_strategy,
120 dequeue_strategy: dequeue_strategy,
121 _phantom: PhantomData
122 }
123 }
124
125 #[inline]
127 pub fn is_empty(queue: Grc<Self>) -> impl Event<Item = bool> + Clone {
128 cons_event(move |p| {
129 Result::Ok(queue.count.read_at(p) == 0)
130 })
131 }
132
133 #[inline]
135 pub fn is_empty_changed(queue: Grc<Self>) -> impl Observable<Message = bool> + Clone {
136 queue.is_empty_changed_()
137 .mapc(move |()| {
138 Queue::is_empty(queue.clone())
139 })
140 }
141
142 #[inline]
144 pub fn is_empty_changed_(&self) -> impl Observable<Message = ()> + Clone {
145 self.count_changed_()
146 }
147
148 #[inline]
150 pub fn count(queue: Grc<Self>) -> impl Event<Item = isize> + Clone {
151 cons_event(move |p| {
152 Result::Ok(queue.count.read_at(p))
153 })
154 }
155
156 #[inline]
158 pub fn count_stats(queue: Grc<Self>) -> impl Event<Item = TimingStats<isize>> + Clone {
159 cons_event(move |p| {
160 Result::Ok(queue.count_stats.read_at(p))
161 })
162 }
163
164 #[inline]
166 pub fn count_changed(queue: Grc<Self>) -> impl Observable<Message = isize> + Clone {
167 queue.count_changed_()
168 .mapc(move |()| {
169 Queue::count(queue.clone())
170 })
171 }
172
173 #[inline]
175 pub fn count_changed_(&self) -> impl Observable<Message = ()> + Clone {
176 self.enqueue_stored().map(|_| {})
177 .merge(self.dequeue_extracted().map(|_| {}))
178 }
179
180 #[inline]
182 pub fn enqueue_store_count(queue: Grc<Self>) -> impl Event<Item = isize> + Clone {
183 cons_event(move |p| {
184 Result::Ok(queue.enqueue_store_count.read_at(p))
185 })
186 }
187
188 #[inline]
190 pub fn enqueue_store_count_changed(queue: Grc<Self>) -> impl Observable<Message = isize> + Clone {
191 queue.enqueue_store_count_changed_()
192 .mapc(move |()| {
193 Queue::enqueue_store_count(queue.clone())
194 })
195 }
196
197 #[inline]
199 pub fn enqueue_store_count_changed_(&self) -> impl Observable<Message = ()> + Clone {
200 self.enqueue_stored().map(|_| {})
201 }
202
203 #[inline]
205 pub fn dequeue_count(queue: Grc<Self>) -> impl Event<Item = isize> + Clone {
206 cons_event(move |p| {
207 Result::Ok(queue.dequeue_count.read_at(p))
208 })
209 }
210
211 #[inline]
213 pub fn dequeue_count_changed(queue: Grc<Self>) -> impl Observable<Message = isize> + Clone {
214 queue.dequeue_count_changed_()
215 .mapc(move |()| {
216 Queue::dequeue_count(queue.clone())
217 })
218 }
219
220 #[inline]
222 pub fn dequeue_count_changed_(&self) -> impl Observable<Message = ()> + Clone {
223 self.dequeue_requested()
224 }
225
226 #[inline]
228 pub fn dequeue_extract_count(queue: Grc<Self>) -> impl Event<Item = isize> + Clone {
229 cons_event(move |p| {
230 Result::Ok(queue.dequeue_extract_count.read_at(p))
231 })
232 }
233
234 #[inline]
236 pub fn dequeue_extract_count_changed(queue: Grc<Self>) -> impl Observable<Message = isize> + Clone {
237 queue.dequeue_extract_count_changed_()
238 .mapc(move |()| {
239 Queue::dequeue_extract_count(queue.clone())
240 })
241 }
242
243 #[inline]
245 pub fn dequeue_extract_count_changed_(&self) -> impl Observable<Message = ()> + Clone {
246 self.dequeue_extracted().map(|_| {})
247 }
248
249 #[inline]
251 pub fn store_rate(queue: Grc<Self>) -> impl Event<Item = f64> + Clone {
252 cons_event(move |p| {
253 Result::Ok({
254 let x = queue.enqueue_store_count.read_at(p);
255 let t0 = p.run.specs.start_time;
256 let t = p.time;
257 (x as f64) / (t - t0)
258 })
259 })
260 }
261
262 #[inline]
265 pub fn dequeue_rate(queue: Grc<Self>) -> impl Event<Item = f64> + Clone {
266 cons_event(move |p| {
267 Result::Ok({
268 let x = queue.dequeue_count.read_at(p);
269 let t0 = p.run.specs.start_time;
270 let t = p.time;
271 (x as f64) / (t - t0)
272 })
273 })
274 }
275
276 #[inline]
278 pub fn dequeue_extract_rate(queue: Grc<Self>) -> impl Event<Item = f64> + Clone {
279 cons_event(move |p| {
280 Result::Ok({
281 let x = queue.dequeue_extract_count.read_at(p);
282 let t0 = p.run.specs.start_time;
283 let t = p.time;
284 (x as f64) / (t - t0)
285 })
286 })
287 }
288
289 #[inline]
292 pub fn wait_time(queue: Grc<Self>) -> impl Event<Item = SamplingStats<f64>> + Clone {
293 cons_event(move |p| {
294 Result::Ok(queue.wait_time.read_at(p))
295 })
296 }
297
298 #[inline]
300 pub fn wait_time_changed(queue: Grc<Self>) -> impl Observable<Message = SamplingStats<f64>> + Clone {
301 queue.wait_time_changed_()
302 .mapc(move |()| {
303 Queue::wait_time(queue.clone())
304 })
305 }
306
307 #[inline]
309 pub fn wait_time_changed_(&self) -> impl Observable<Message = ()> + Clone {
310 self.dequeue_extracted().map(|_| {})
311 }
312
313 #[inline]
316 pub fn dequeue_wait_time(queue: Grc<Self>) -> impl Event<Item = SamplingStats<f64>> + Clone {
317 cons_event(move |p| {
318 Result::Ok(queue.dequeue_wait_time.read_at(p))
319 })
320 }
321
322 #[inline]
324 pub fn dequeue_wait_time_changed(queue: Grc<Self>) -> impl Observable<Message = SamplingStats<f64>> + Clone {
325 queue.dequeue_wait_time_changed_()
326 .mapc(move |()| {
327 Queue::dequeue_wait_time(queue.clone())
328 })
329 }
330
331 #[inline]
333 pub fn dequeue_wait_time_changed_(&self) -> impl Observable<Message = ()> + Clone {
334 self.dequeue_extracted().map(|_| {})
335 }
336
337 #[inline]
340 pub fn rate(queue: Grc<Self>) -> impl Event<Item = f64> + Clone {
341 cons_event(move |p| {
342 Result::Ok({
343 let x = queue.count_stats.read_at(p);
344 let y = queue.wait_time.read_at(p);
345 x.mean() / y.mean
346 })
347 })
348 }
349
350 #[inline]
352 pub fn rate_changed(queue: Grc<Self>) -> impl Observable<Message = f64> + Clone {
353 queue.rate_changed_()
354 .mapc(move |()| {
355 Queue::rate(queue.clone())
356 })
357 }
358
359 #[inline]
361 pub fn rate_changed_(&self) -> impl Observable<Message = ()> + Clone {
362 self.enqueue_stored().map(|_| {})
363 .merge(self.dequeue_extracted().map(|_| {}))
364 }
365
366 pub fn dequeue(queue: Grc<Self>) -> impl Process<Item = T> {
368 cons_event({
369 let queue = queue.clone();
370 move |p| {
371 queue.dequeue_request(p)
372 }
373 })
374 .into_process()
375 .and_then(move |t| {
376 request_resource(queue.dequeue_resource.clone())
377 .and_then(move |()| {
378 cons_event(move |p| {
379 queue.dequeue_extract(t, p)
380 })
381 .into_process()
382 })
383 })
384 }
385
386 pub fn dequeue_with_output_priority(queue: Grc<Self>, po: SO::Priority) -> impl Process<Item = T>
388 where SO::Priority: Clone
389 {
390 cons_event({
391 let queue = queue.clone();
392 move |p| {
393 queue.dequeue_request(p)
394 }
395 })
396 .into_process()
397 .and_then(move |t| {
398 request_resource_with_priority(queue.dequeue_resource.clone(), po)
399 .and_then(move |()| {
400 cons_event(move |p| {
401 queue.dequeue_extract(t, p)
402 })
403 .into_process()
404 })
405 })
406 }
407
408 pub fn try_dequeue(queue: Grc<Self>) -> impl Event<Item = Option<T>> {
410 try_request_resource_within_event(queue.dequeue_resource.clone())
411 .and_then(move |f| {
412 if f {
413 cons_event(move |p| {
414 let t = queue.dequeue_request(p)?;
415 let x = queue.dequeue_extract(t, p)?;
416 Result::Ok(Some(x))
417 }).into_boxed()
418 } else {
419 return_event(None)
420 .into_boxed()
421 }
422 })
423 }
424
425 pub fn delete(queue: Grc<Self>, item: T) -> impl Event<Item = bool>
428 where T: PartialEq
429 {
430 let pred = move |x: &T| { *x == item };
431 Queue::delete_by(queue, pred)
432 .map(|x| { x.is_some() })
433 }
434
435 pub fn delete_(queue: Grc<Self>, item: T) -> impl Event<Item = ()>
437 where T: PartialEq
438 {
439 let pred = move |x: &T| { *x == item };
440 Queue::delete_by(queue, pred)
441 .map(|_| ())
442 }
443
444 pub fn delete_by<F>(queue: Grc<Self>, pred: F) -> impl Event<Item = Option<T>>
446 where F: Fn(&T) -> bool + 'static
447 {
448 try_request_resource_within_event(queue.dequeue_resource.clone())
449 .and_then(move |f| {
450 if f {
451 cons_event(move |p| {
452 let pred = move |x: &QueueItem<T>| { pred(&x.value) };
453 let pred = Box::new(pred);
454 match queue.queue_store.remove_boxed_by(pred, p) {
455 None => {
456 release_resource_within_event(queue.dequeue_resource.clone())
457 .call_event(p)?;
458 Result::Ok(None)
459 },
460 Some(i) => {
461 let t = queue.dequeue_request(p)?;
462 let x = queue.dequeue_post_extract(t, i, p)?;
463 Result::Ok(Some(x))
464 }
465 }
466 }).into_boxed()
467 } else {
468 return_event(None)
469 .into_boxed()
470 }
471 })
472 }
473
474 pub fn exists<F>(queue: Grc<Self>, pred: F) -> impl Event<Item = bool>
476 where F: Fn(&T) -> bool + 'static
477 {
478 cons_event(move |p| {
479 let pred = move |x: &QueueItem<T>| { pred(&x.value) };
480 let pred = Box::new(pred);
481 Result::Ok(queue.queue_store.exists_boxed(pred, p))
482 })
483 }
484
485 pub fn find<F>(queue: Grc<Self>, pred: F) -> impl Event<Item = Option<T>>
487 where F: Fn(&T) -> bool + 'static,
488 T: Clone
489 {
490 cons_event(move |p| {
491 let pred = move |x: &QueueItem<T>| { pred(&x.value) };
492 let pred = Box::new(pred);
493 Result::Ok(queue.queue_store.find_boxed(pred, p).map(|x| { x.value.clone() }))
494 })
495 }
496
497 pub fn clear(queue: Grc<Self>) -> impl Event<Item = ()> {
499 cons_event(move |p| {
500 loop {
501 let x = Queue::try_dequeue(queue.clone()).call_event(p)?;
502 match x {
503 None => return Result::Ok(()),
504 Some(_) => {}
505 }
506 }
507 })
508 }
509
510 #[inline]
512 pub fn enqueue(queue: Grc<Self>, item: T) -> impl Event<Item = ()> {
513 cons_event(move |p| {
514 queue.enqueue_store(item, p)
515 })
516 }
517
518 #[inline]
520 pub fn enqueue_with_storing_priority(queue: Grc<Self>, pm: SM::Priority, item: T) -> impl Event<Item = ()>
521 where SM::Priority: Clone
522 {
523 cons_event(move |p| {
524 queue.enqueue_store_with_priority(pm, item, p)
525 })
526 }
527
528 #[inline]
530 pub fn enqueue_stored(&self) -> impl Observable<Message = T> + Clone {
531 self.enqueue_stored_source.publish()
532 }
533
534 #[inline]
536 pub fn dequeue_requested(&self) -> impl Observable<Message = ()> + Clone {
537 self.dequeue_requested_source.publish()
538 }
539
540 #[inline]
542 pub fn dequeue_extracted(&self) -> impl Observable<Message = T> + Clone {
543 self.dequeue_extracted_source.publish()
544 }
545
546 #[inline]
548 pub fn changed_(&self) -> impl Observable<Message = ()> + Clone {
549 self.enqueue_stored().map(|_| {})
550 .merge(self.dequeue_requested())
551 .merge(self.dequeue_extracted().map(|_| {}))
552 }
553
554 fn dequeue_request(&self, p: &Point) -> simulation::Result<f64> {
556 let c = self.dequeue_count.read_at(p);
557 let c2 = c + 1;
558 self.dequeue_count.write_at(c2, p);
559 self.dequeue_requested_source.trigger_at(&(), p)?;
560 Result::Ok(p.time)
561 }
562
563 fn dequeue_extract(&self, t_r: f64, p: &Point) -> simulation::Result<T> {
565 let i = self.queue_store.pop(p).unwrap();
566 self.dequeue_post_extract(t_r, i, p)
567 }
568
569 fn dequeue_post_extract(&self, t_r: f64, i: QueueItem<T>, p: &Point) -> simulation::Result<T> {
571 let t = p.time;
572 let c = self.count.read_at(p);
573 let c2 = c - 1;
574 let stats = self.count_stats.read_at(p);
575 let stats2 = stats.add(t, c2);
576 let ec = self.dequeue_extract_count.read_at(p);
577 let ec2 = ec + 1;
578 self.count.write_at(c2, p);
579 self.count_stats.write_at(stats2, p);
580 self.dequeue_extract_count.write_at(ec2, p);
581 self.dequeue_stat(t_r, &i, p);
582 self.dequeue_extracted_source
583 .trigger_at(&i.value, p)?;
584 Result::Ok(i.value)
585 }
586
587 fn dequeue_stat(&self, t_r: f64, i: &QueueItem<T>, p: &Point) {
590 let t1 = i.storing_time;
591 let t = p.time;
592 let stats = self.dequeue_wait_time.read_at(p);
593 let stats2 = stats.add(t - t_r);
594 self.dequeue_wait_time.write_at(stats2, p);
595 let stats = self.wait_time.read_at(p);
596 let stats2 = stats.add(t - t1);
597 self.wait_time.write_at(stats2, p);
598 }
599
600 fn enqueue_store(&self, item: T, p: &Point) -> simulation::Result<()> {
602 let t = p.time;
603 let i2 = QueueItem {
604 value: item,
605 storing_time: t
606 };
607 self.queue_store.push(i2.clone(), p);
608 let c = self.count.read_at(p);
609 let c2 = c + 1;
610 self.count.write_at(c2, p);
611 let stats = self.count_stats.read_at(p);
612 let stats2 = stats.add(t, c2);
613 self.count_stats.write_at(stats2, p);
614 let sc = self.enqueue_store_count.read_at(p);
615 let sc2 = sc + 1;
616 self.enqueue_store_count.write_at(sc2, p);
617 release_resource_within_event(self.dequeue_resource.clone())
618 .call_event(p)?;
619 self.enqueue_stored_source
620 .trigger_at(&i2.value, p)
621 }
622
623 fn enqueue_store_with_priority(&self, pm: SM::Priority, item: T, p: &Point) -> simulation::Result<()> {
625 let t = p.time;
626 let i2 = QueueItem {
627 value: item,
628 storing_time: t
629 };
630 self.queue_store.push_with_priority(pm, i2.clone(), p);
631 let c = self.count.read_at(p);
632 let c2 = c + 1;
633 self.count.write_at(c2, p);
634 let stats = self.count_stats.read_at(p);
635 let stats2 = stats.add(t, c2);
636 self.count_stats.write_at(stats2, p);
637 let sc = self.enqueue_store_count.read_at(p);
638 let sc2 = sc + 1;
639 self.enqueue_store_count.write_at(sc2, p);
640 release_resource_within_event(self.dequeue_resource.clone())
641 .call_event(p)?;
642 self.enqueue_stored_source
643 .trigger_at(&i2.value, p)
644 }
645
646 pub fn reset(queue: Grc<Self>) -> impl Event<Item = ()> + Clone {
648 cons_event(move |p| {
649 let t = p.time;
650 let count = queue.count.read_at(p);
651 queue.count_stats.write_at(TimingStats::from_sample(t, count), p);
652 queue.enqueue_store_count.write_at(0, p);
653 queue.dequeue_count.write_at(0, p);
654 queue.dequeue_extract_count.write_at(0, p);
655 queue.wait_time.write_at(SamplingStats::empty(), p);
656 queue.dequeue_wait_time.write_at(SamplingStats::empty(), p);
657 Result::Ok(())
658 })
659 }
660}
661
/// The computation that creates a [`Queue`]; it only records the two
/// strategies until it is run as an event.
#[derive(Clone)]
pub struct NewQueue<SM, SO, T> {
    /// The strategy from which the item storage is created.
    storing_strategy: SM,

    /// The strategy passed to the dequeue resource.
    dequeue_strategy: SO,

    /// Ties the item type `T` to the computation without storing a value.
    _phantom: PhantomData<T>,
}
675
676impl<SM, SO, T> Event for NewQueue<SM, SO, T>
677 where SM: QueueStrategy,
678 SO: QueueStrategy + 'static,
679 T: 'static
680{
681 type Item = Queue<SM, SO, T>;
682
683 #[doc(hidden)]
684 #[inline]
685 fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
686 let NewQueue { storing_strategy, dequeue_strategy, _phantom } = self;
687 let t = p.time;
688 let queue_store = storing_strategy.new_storage();
689 let dequeue_resource = {
690 Resource::<SO>::new_with_max_count(dequeue_strategy, 0, None)
691 .call_simulation(p.run)?
692 };
693 Result::Ok(Queue {
694 queue_store: queue_store,
695 dequeue_resource: Grc::new(dequeue_resource),
696 count: RefComp::new(0),
697 count_stats: RefComp::new(TimingStats::from_sample(t, 0)),
698 enqueue_store_count: RefComp::new(0),
699 dequeue_count: RefComp::new(0),
700 dequeue_extract_count: RefComp::new(0),
701 wait_time: RefComp::new(SamplingStats::empty()),
702 dequeue_wait_time: RefComp::new(SamplingStats::empty()),
703 enqueue_stored_source: ObservableSource::new(),
704 dequeue_requested_source: ObservableSource::new(),
705 dequeue_extracted_source: ObservableSource::new()
706 })
707 }
708}