1use std::marker::PhantomData;
8
9use crate::simulation;
10use crate::simulation::error::*;
11use crate::simulation::Point;
12use crate::simulation::ref_comp::RefComp;
13use crate::simulation::observable::*;
14use crate::simulation::observable::source::*;
15use crate::simulation::simulation::*;
16use crate::simulation::event::*;
17use crate::simulation::process::*;
18use crate::simulation::strategy::*;
19use crate::simulation::resource::*;
20
21use dvcompute_utils::simulation::stats::*;
22use dvcompute_utils::grc::Grc;
23
/// A queue whose enqueue, storing and dequeue stages all use `FCFSStrategy`.
pub type FCFSQueue<T> = Queue<FCFSStrategy, FCFSStrategy, FCFSStrategy, T>;
27
/// A queue that stores items with `LCFSStrategy` while the enqueue and
/// dequeue stages use `FCFSStrategy`.
pub type LCFSQueue<T> = Queue<FCFSStrategy, LCFSStrategy, FCFSStrategy, T>;
31
/// A bounded queue parameterised by three strategies: `SI` for the enqueue
/// stage, `SM` for the internal storage and `SO` for the dequeue stage.
///
/// Besides the stored items it keeps counters, statistics and observable
/// sources that are updated by the enqueue/dequeue operations.
pub struct Queue<SI, SM, SO, T>
    where SI: QueueStrategy + 'static,
          SM: QueueStrategy,
          SO: QueueStrategy + 'static,
          T: 'static
{
    /// The queue capacity, fixed at construction.
    max_count: isize,

    /// Resource requested before an item is stored and released after an
    /// item is extracted.
    enqueue_resource: Grc<Resource<SI>>,

    /// Storage of the items, created by the storing strategy.
    queue_store: QueueStorageBox<QueueItem<T>, SM::Priority>,

    /// Resource requested before an item is extracted and released after an
    /// item is stored.
    dequeue_resource: Grc<Resource<SO>>,

    /// Current number of stored items.
    count: RefComp<isize>,

    /// Timing statistics of `count`, updated on every store and extraction.
    count_stats: RefComp<TimingStats<isize>>,

    /// Number of initiated enqueue operations.
    enqueue_count: RefComp<isize>,

    /// Number of items that were denied (lost) on enqueue.
    enqueue_lost_count: RefComp<isize>,

    /// Number of items actually stored.
    enqueue_store_count: RefComp<isize>,

    /// Number of dequeue requests.
    dequeue_count: RefComp<isize>,

    /// Number of items actually extracted.
    dequeue_extract_count: RefComp<isize>,

    /// Statistics of the time between storing an item and extracting it.
    wait_time: RefComp<SamplingStats<f64>>,

    /// Statistics of the time between initiating the enqueue of an item and
    /// extracting it.
    total_wait_time: RefComp<SamplingStats<f64>>,

    /// Statistics of the time between initiating the enqueue of an item and
    /// storing it.
    enqueue_wait_time: RefComp<SamplingStats<f64>>,

    /// Statistics of the time between a dequeue request and the extraction.
    dequeue_wait_time: RefComp<SamplingStats<f64>>,

    /// Triggered with the item when an enqueue operation is initiated.
    enqueue_initiated_source: ObservableSource<T>,

    /// Triggered with the item when it is denied on enqueue.
    enqueue_lost_source: ObservableSource<T>,

    /// Triggered with the item when it has been stored.
    enqueue_stored_source: ObservableSource<T>,

    /// Triggered when a dequeue is requested.
    dequeue_requested_source: ObservableSource<()>,

    /// Triggered with the item when it has been extracted.
    dequeue_extracted_source: ObservableSource<T>
}
101
/// An item together with the time stamps used for the wait-time statistics.
#[derive(Clone)]
struct QueueItem<T> {

    /// The payload.
    value: T,

    /// The time at which the enqueue operation was initiated.
    input_time: f64,

    /// The time at which the item was stored; it equals `input_time` until
    /// the item is actually stored.
    storing_time: f64
}
115
116#[inline]
118pub fn new_fcfs_queue<T>(max_count: isize) -> NewQueue<FCFSStrategy, FCFSStrategy, FCFSStrategy, T>
119 where T: 'static
120{
121 NewQueue {
122 enqueue_strategy: FCFSStrategy::Instance,
123 storing_strategy: FCFSStrategy::Instance,
124 dequeue_strategy: FCFSStrategy::Instance,
125 max_count: max_count,
126 _phantom: PhantomData
127 }
128}
129
130#[inline]
132pub fn new_lcfs_queue<T>(max_count: isize) -> NewQueue<FCFSStrategy, LCFSStrategy, FCFSStrategy, T>
133 where T: 'static
134{
135 NewQueue {
136 enqueue_strategy: FCFSStrategy::Instance,
137 storing_strategy: LCFSStrategy::Instance,
138 dequeue_strategy: FCFSStrategy::Instance,
139 max_count: max_count,
140 _phantom: PhantomData
141 }
142}
143
144impl<SI, SM, SO, T> Queue<SI, SM, SO, T>
145 where SI: QueueStrategy + 'static,
146 SM: QueueStrategy + 'static,
147 SO: QueueStrategy + 'static,
148 T: Clone + 'static
149{
150 #[inline]
152 pub fn new(enqueue_strategy: SI,
153 storing_strategy: SM,
154 dequeue_strategy: SO,
155 max_count: isize) -> NewQueue<SI, SM, SO, T>
156 {
157 NewQueue {
158 enqueue_strategy: enqueue_strategy,
159 storing_strategy: storing_strategy,
160 dequeue_strategy: dequeue_strategy,
161 max_count: max_count,
162 _phantom: PhantomData
163 }
164 }
165
    /// Return the queue capacity.
    #[inline]
    pub fn max_count(&self) -> isize {
        self.max_count
    }
171
172 #[inline]
174 pub fn is_empty(queue: Grc<Self>) -> impl Event<Item = bool> + Clone {
175 cons_event(move |p| {
176 Result::Ok(queue.count.read_at(p) == 0)
177 })
178 }
179
180 #[inline]
182 pub fn is_empty_changed(queue: Grc<Self>) -> impl Observable<Message = bool> + Clone {
183 queue.is_empty_changed_()
184 .mapc(move |()| {
185 Queue::is_empty(queue.clone())
186 })
187 }
188
    /// Notification that the emptiness may have changed; this is the same
    /// observable as `count_changed_`.
    #[inline]
    pub fn is_empty_changed_(&self) -> impl Observable<Message = ()> + Clone {
        self.count_changed_()
    }
194
195 #[inline]
197 pub fn is_full(queue: Grc<Self>) -> impl Event<Item = bool> + Clone {
198 cons_event(move |p| {
199 Result::Ok(queue.count.read_at(p) == queue.max_count)
200 })
201 }
202
203 #[inline]
205 pub fn is_full_changed(queue: Grc<Self>) -> impl Observable<Message = bool> + Clone {
206 queue.is_full_changed_()
207 .mapc(move |()| {
208 Queue::is_full(queue.clone())
209 })
210 }
211
    /// Notification that the fullness may have changed; this is the same
    /// observable as `count_changed_`.
    #[inline]
    pub fn is_full_changed_(&self) -> impl Observable<Message = ()> + Clone {
        self.count_changed_()
    }
217
218 #[inline]
220 pub fn count(queue: Grc<Self>) -> impl Event<Item = isize> + Clone {
221 cons_event(move |p| {
222 Result::Ok(queue.count.read_at(p))
223 })
224 }
225
226 #[inline]
228 pub fn count_stats(queue: Grc<Self>) -> impl Event<Item = TimingStats<isize>> + Clone {
229 cons_event(move |p| {
230 Result::Ok(queue.count_stats.read_at(p))
231 })
232 }
233
234 #[inline]
236 pub fn count_changed(queue: Grc<Self>) -> impl Observable<Message = isize> + Clone {
237 queue.count_changed_()
238 .mapc(move |()| {
239 Queue::count(queue.clone())
240 })
241 }
242
243 #[inline]
245 pub fn count_changed_(&self) -> impl Observable<Message = ()> + Clone {
246 self.enqueue_stored().map(|_| {})
247 .merge(self.dequeue_extracted().map(|_| {}))
248 }
249
250 #[inline]
252 pub fn enqueue_count(queue: Grc<Self>) -> impl Event<Item = isize> + Clone {
253 cons_event(move |p| {
254 Result::Ok(queue.enqueue_count.read_at(p))
255 })
256 }
257
258 #[inline]
260 pub fn enqueue_count_changed(queue: Grc<Self>) -> impl Observable<Message = isize> + Clone {
261 queue.enqueue_count_changed_()
262 .mapc(move |()| {
263 Queue::enqueue_count(queue.clone())
264 })
265 }
266
267 #[inline]
269 pub fn enqueue_count_changed_(&self) -> impl Observable<Message = ()> + Clone {
270 self.enqueue_initiated().map(|_| {})
271 }
272
273 #[inline]
275 pub fn enqueue_lost_count(queue: Grc<Self>) -> impl Event<Item = isize> + Clone {
276 cons_event(move |p| {
277 Result::Ok(queue.enqueue_lost_count.read_at(p))
278 })
279 }
280
281 #[inline]
283 pub fn enqueue_lost_count_changed(queue: Grc<Self>) -> impl Observable<Message = isize> + Clone {
284 queue.enqueue_lost_count_changed_()
285 .mapc(move |()| {
286 Queue::enqueue_lost_count(queue.clone())
287 })
288 }
289
290 #[inline]
292 pub fn enqueue_lost_count_changed_(&self) -> impl Observable<Message = ()> + Clone {
293 self.enqueue_lost().map(|_| {})
294 }
295
296 #[inline]
298 pub fn enqueue_store_count(queue: Grc<Self>) -> impl Event<Item = isize> + Clone {
299 cons_event(move |p| {
300 Result::Ok(queue.enqueue_store_count.read_at(p))
301 })
302 }
303
304 #[inline]
306 pub fn enqueue_store_count_changed(queue: Grc<Self>) -> impl Observable<Message = isize> + Clone {
307 queue.enqueue_store_count_changed_()
308 .mapc(move |()| {
309 Queue::enqueue_store_count(queue.clone())
310 })
311 }
312
313 #[inline]
315 pub fn enqueue_store_count_changed_(&self) -> impl Observable<Message = ()> + Clone {
316 self.enqueue_stored().map(|_| {})
317 }
318
319 #[inline]
321 pub fn dequeue_count(queue: Grc<Self>) -> impl Event<Item = isize> + Clone {
322 cons_event(move |p| {
323 Result::Ok(queue.dequeue_count.read_at(p))
324 })
325 }
326
327 #[inline]
329 pub fn dequeue_count_changed(queue: Grc<Self>) -> impl Observable<Message = isize> + Clone {
330 queue.dequeue_count_changed_()
331 .mapc(move |()| {
332 Queue::dequeue_count(queue.clone())
333 })
334 }
335
    /// Notification fired when a dequeue is requested; this is the same
    /// observable as `dequeue_requested`.
    #[inline]
    pub fn dequeue_count_changed_(&self) -> impl Observable<Message = ()> + Clone {
        self.dequeue_requested()
    }
341
342 #[inline]
344 pub fn dequeue_extract_count(queue: Grc<Self>) -> impl Event<Item = isize> + Clone {
345 cons_event(move |p| {
346 Result::Ok(queue.dequeue_extract_count.read_at(p))
347 })
348 }
349
350 #[inline]
352 pub fn dequeue_extract_count_changed(queue: Grc<Self>) -> impl Observable<Message = isize> + Clone {
353 queue.dequeue_extract_count_changed_()
354 .mapc(move |()| {
355 Queue::dequeue_extract_count(queue.clone())
356 })
357 }
358
359 #[inline]
361 pub fn dequeue_extract_count_changed_(&self) -> impl Observable<Message = ()> + Clone {
362 self.dequeue_extracted().map(|_| {})
363 }
364
365 #[inline]
367 pub fn load_factor(queue: Grc<Self>) -> impl Event<Item = f64> + Clone {
368 cons_event(move |p| {
369 Result::Ok({
370 let x = queue.count.read_at(p);
371 let y = queue.max_count;
372 (x as f64) / (y as f64)
373 })
374 })
375 }
376
377 #[inline]
379 pub fn load_factor_changed(queue: Grc<Self>) -> impl Observable<Message = f64> + Clone {
380 queue.load_factor_changed_()
381 .mapc(move |()| {
382 Queue::load_factor(queue.clone())
383 })
384 }
385
    /// Notification that the load factor may have changed; this is the same
    /// observable as `count_changed_`.
    #[inline]
    pub fn load_factor_changed_(&self) -> impl Observable<Message = ()> + Clone {
        self.count_changed_()
    }
391
392 #[inline]
394 pub fn enqueue_rate(queue: Grc<Self>) -> impl Event<Item = f64> + Clone {
395 cons_event(move |p| {
396 Result::Ok({
397 let x = queue.enqueue_count.read_at(p);
398 let t0 = p.run.specs.start_time;
399 let t = p.time;
400 (x as f64) / (t - t0)
401 })
402 })
403 }
404
405 #[inline]
407 pub fn store_rate(queue: Grc<Self>) -> impl Event<Item = f64> + Clone {
408 cons_event(move |p| {
409 Result::Ok({
410 let x = queue.enqueue_store_count.read_at(p);
411 let t0 = p.run.specs.start_time;
412 let t = p.time;
413 (x as f64) / (t - t0)
414 })
415 })
416 }
417
418 #[inline]
421 pub fn dequeue_rate(queue: Grc<Self>) -> impl Event<Item = f64> + Clone {
422 cons_event(move |p| {
423 Result::Ok({
424 let x = queue.dequeue_count.read_at(p);
425 let t0 = p.run.specs.start_time;
426 let t = p.time;
427 (x as f64) / (t - t0)
428 })
429 })
430 }
431
432 #[inline]
434 pub fn dequeue_extract_rate(queue: Grc<Self>) -> impl Event<Item = f64> + Clone {
435 cons_event(move |p| {
436 Result::Ok({
437 let x = queue.dequeue_extract_count.read_at(p);
438 let t0 = p.run.specs.start_time;
439 let t = p.time;
440 (x as f64) / (t - t0)
441 })
442 })
443 }
444
445 #[inline]
448 pub fn wait_time(queue: Grc<Self>) -> impl Event<Item = SamplingStats<f64>> + Clone {
449 cons_event(move |p| {
450 Result::Ok(queue.wait_time.read_at(p))
451 })
452 }
453
454 #[inline]
456 pub fn wait_time_changed(queue: Grc<Self>) -> impl Observable<Message = SamplingStats<f64>> + Clone {
457 queue.wait_time_changed_()
458 .mapc(move |()| {
459 Queue::wait_time(queue.clone())
460 })
461 }
462
463 #[inline]
465 pub fn wait_time_changed_(&self) -> impl Observable<Message = ()> + Clone {
466 self.dequeue_extracted().map(|_| {})
467 }
468
469 #[inline]
474 pub fn total_wait_time(queue: Grc<Self>) -> impl Event<Item = SamplingStats<f64>> + Clone {
475 cons_event(move |p| {
476 Result::Ok(queue.total_wait_time.read_at(p))
477 })
478 }
479
480 #[inline]
482 pub fn total_wait_time_changed(queue: Grc<Self>) -> impl Observable<Message = SamplingStats<f64>> + Clone {
483 queue.total_wait_time_changed_()
484 .mapc(move |()| {
485 Queue::total_wait_time(queue.clone())
486 })
487 }
488
489 #[inline]
491 pub fn total_wait_time_changed_(&self) -> impl Observable<Message = ()> + Clone {
492 self.dequeue_extracted().map(|_| {})
493 }
494
495 #[inline]
498 pub fn enqueue_wait_time(queue: Grc<Self>) -> impl Event<Item = SamplingStats<f64>> + Clone {
499 cons_event(move |p| {
500 Result::Ok(queue.enqueue_wait_time.read_at(p))
501 })
502 }
503
504 #[inline]
506 pub fn enqueue_wait_time_changed(queue: Grc<Self>) -> impl Observable<Message = SamplingStats<f64>> + Clone {
507 queue.enqueue_wait_time_changed_()
508 .mapc(move |()| {
509 Queue::enqueue_wait_time(queue.clone())
510 })
511 }
512
513 #[inline]
515 pub fn enqueue_wait_time_changed_(&self) -> impl Observable<Message = ()> + Clone {
516 self.enqueue_stored().map(|_| {})
517 }
518
519 #[inline]
522 pub fn dequeue_wait_time(queue: Grc<Self>) -> impl Event<Item = SamplingStats<f64>> + Clone {
523 cons_event(move |p| {
524 Result::Ok(queue.dequeue_wait_time.read_at(p))
525 })
526 }
527
528 #[inline]
530 pub fn dequeue_wait_time_changed(queue: Grc<Self>) -> impl Observable<Message = SamplingStats<f64>> + Clone {
531 queue.dequeue_wait_time_changed_()
532 .mapc(move |()| {
533 Queue::dequeue_wait_time(queue.clone())
534 })
535 }
536
537 #[inline]
539 pub fn dequeue_wait_time_changed_(&self) -> impl Observable<Message = ()> + Clone {
540 self.dequeue_extracted().map(|_| {})
541 }
542
543 #[inline]
549 pub fn rate(queue: Grc<Self>) -> impl Event<Item = f64> + Clone {
550 cons_event(move |p| {
551 Result::Ok({
552 let x = queue.count_stats.read_at(p);
553 let y = queue.wait_time.read_at(p);
554 x.mean() / y.mean
555 })
556 })
557 }
558
559 #[inline]
561 pub fn rate_changed(queue: Grc<Self>) -> impl Observable<Message = f64> + Clone {
562 queue.rate_changed_()
563 .mapc(move |()| {
564 Queue::rate(queue.clone())
565 })
566 }
567
568 #[inline]
570 pub fn rate_changed_(&self) -> impl Observable<Message = ()> + Clone {
571 self.enqueue_stored().map(|_| {})
572 .merge(self.dequeue_extracted().map(|_| {}))
573 }
574
575 pub fn dequeue(queue: Grc<Self>) -> impl Process<Item = T> {
577 cons_event({
578 let queue = queue.clone();
579 move |p| {
580 queue.dequeue_request(p)
581 }
582 })
583 .into_process()
584 .and_then(move |t| {
585 request_resource(queue.dequeue_resource.clone())
586 .and_then(move |()| {
587 cons_event(move |p| {
588 queue.dequeue_extract(t, p)
589 })
590 .into_process()
591 })
592 })
593 }
594
595 pub fn dequeue_with_output_priority(queue: Grc<Self>, po: SO::Priority) -> impl Process<Item = T>
597 where SO::Priority: Clone
598 {
599 cons_event({
600 let queue = queue.clone();
601 move |p| {
602 queue.dequeue_request(p)
603 }
604 })
605 .into_process()
606 .and_then(move |t| {
607 request_resource_with_priority(queue.dequeue_resource.clone(), po)
608 .and_then(move |()| {
609 cons_event(move |p| {
610 queue.dequeue_extract(t, p)
611 })
612 .into_process()
613 })
614 })
615 }
616
617 pub fn try_dequeue(queue: Grc<Self>) -> impl Event<Item = Option<T>> {
619 try_request_resource_within_event(queue.dequeue_resource.clone())
620 .and_then(move |f| {
621 if f {
622 cons_event(move |p| {
623 let t = queue.dequeue_request(p)?;
624 let x = queue.dequeue_extract(t, p)?;
625 Result::Ok(Some(x))
626 }).into_boxed()
627 } else {
628 return_event(None)
629 .into_boxed()
630 }
631 })
632 }
633
634 pub fn delete(queue: Grc<Self>, item: T) -> impl Event<Item = bool>
637 where T: PartialEq
638 {
639 let pred = move |x: &T| { *x == item };
640 Queue::delete_by(queue, pred)
641 .map(|x| { x.is_some() })
642 }
643
644 pub fn delete_(queue: Grc<Self>, item: T) -> impl Event<Item = ()>
646 where T: PartialEq
647 {
648 let pred = move |x: &T| { *x == item };
649 Queue::delete_by(queue, pred)
650 .map(|_| ())
651 }
652
653 pub fn delete_by<F>(queue: Grc<Self>, pred: F) -> impl Event<Item = Option<T>>
655 where F: Fn(&T) -> bool + 'static
656 {
657 try_request_resource_within_event(queue.dequeue_resource.clone())
658 .and_then(move |f| {
659 if f {
660 cons_event(move |p| {
661 let pred = move |x: &QueueItem<T>| { pred(&x.value) };
662 let pred = Box::new(pred);
663 match queue.queue_store.remove_boxed_by(pred, p) {
664 None => {
665 release_resource_within_event(queue.dequeue_resource.clone())
666 .call_event(p)?;
667 Result::Ok(None)
668 },
669 Some(i) => {
670 let t = queue.dequeue_request(p)?;
671 let x = queue.dequeue_post_extract(t, i, p)?;
672 Result::Ok(Some(x))
673 }
674 }
675 }).into_boxed()
676 } else {
677 return_event(None)
678 .into_boxed()
679 }
680 })
681 }
682
683 pub fn exists<F>(queue: Grc<Self>, pred: F) -> impl Event<Item = bool>
685 where F: Fn(&T) -> bool + 'static
686 {
687 cons_event(move |p| {
688 let pred = move |x: &QueueItem<T>| { pred(&x.value) };
689 let pred = Box::new(pred);
690 Result::Ok(queue.queue_store.exists_boxed(pred, p))
691 })
692 }
693
694 pub fn find<F>(queue: Grc<Self>, pred: F) -> impl Event<Item = Option<T>>
696 where F: Fn(&T) -> bool + 'static,
697 T: Clone
698 {
699 cons_event(move |p| {
700 let pred = move |x: &QueueItem<T>| { pred(&x.value) };
701 let pred = Box::new(pred);
702 Result::Ok(queue.queue_store.find_boxed(pred, p).map(|x| { x.value.clone() }))
703 })
704 }
705
706 pub fn clear(queue: Grc<Self>) -> impl Event<Item = ()> {
708 cons_event(move |p| {
709 loop {
710 let x = Queue::try_dequeue(queue.clone()).call_event(p)?;
711 match x {
712 None => return Result::Ok(()),
713 Some(_) => {}
714 }
715 }
716 })
717 }
718
719 pub fn enqueue(queue: Grc<Self>, item: T) -> impl Process<Item = ()> {
721 cons_event({
722 let queue = queue.clone();
723 move |p| {
724 queue.enqueue_initiate(item, p)
725 }
726 })
727 .into_process()
728 .and_then(move |i| {
729 request_resource(queue.enqueue_resource.clone())
730 .and_then(move |()| {
731 cons_event(move |p| {
732 queue.enqueue_store(i, p)
733 })
734 .into_process()
735 })
736 })
737 }
738
739 pub fn enqueue_with_input_priority(queue: Grc<Self>, pi: SI::Priority, item: T) -> impl Process<Item = ()>
742 where SI::Priority: Clone
743 {
744 cons_event({
745 let queue = queue.clone();
746 move |p| {
747 queue.enqueue_initiate(item, p)
748 }
749 })
750 .into_process()
751 .and_then(move |i| {
752 request_resource_with_priority(queue.enqueue_resource.clone(), pi)
753 .and_then(move |()| {
754 cons_event(move |p| {
755 queue.enqueue_store(i, p)
756 })
757 .into_process()
758 })
759 })
760 }
761
762 pub fn enqueue_with_storing_priority(queue: Grc<Self>, pm: SM::Priority, item: T) -> impl Process<Item = ()>
765 where SM::Priority: Clone
766 {
767 cons_event({
768 let queue = queue.clone();
769 move |p| {
770 queue.enqueue_initiate(item, p)
771 }
772 })
773 .into_process()
774 .and_then(move |i| {
775 request_resource(queue.enqueue_resource.clone())
776 .and_then(move |()| {
777 cons_event(move |p| {
778 queue.enqueue_store_with_priority(pm, i, p)
779 })
780 .into_process()
781 })
782 })
783 }
784
785 pub fn enqueue_with_input_and_storing_priorities(queue: Grc<Self>, pi: SI::Priority, pm: SM::Priority, item: T) -> impl Process<Item = ()>
788 where SI::Priority: Clone,
789 SM::Priority: Clone
790 {
791 cons_event({
792 let queue = queue.clone();
793 move |p| {
794 queue.enqueue_initiate(item, p)
795 }
796 })
797 .into_process()
798 .and_then(move |i| {
799 request_resource_with_priority(queue.enqueue_resource.clone(), pi)
800 .and_then(move |()| {
801 cons_event(move |p| {
802 queue.enqueue_store_with_priority(pm, i, p)
803 })
804 .into_process()
805 })
806 })
807 }
808
809 pub fn try_enqueue(queue: Grc<Self>, item: T) -> impl Event<Item = bool> {
811 cons_event(move |p| {
812 let x = {
813 try_request_resource_within_event(queue.enqueue_resource.clone())
814 .call_event(p)
815 }?;
816 if x {
817 let i = queue.enqueue_initiate(item, p)?;
818 queue.enqueue_store(i, p)?;
819 Result::Ok(true)
820 } else {
821 Result::Ok(false)
822 }
823 })
824 }
825
826 pub fn try_enqueue_with_storing_priority(queue: Grc<Self>, pm: SM::Priority, item: T) -> impl Event<Item = bool> {
829 cons_event(move |p| {
830 let x = {
831 try_request_resource_within_event(queue.enqueue_resource.clone())
832 .call_event(p)
833 }?;
834 if x {
835 let i = queue.enqueue_initiate(item, p)?;
836 queue.enqueue_store_with_priority(pm, i, p)?;
837 Result::Ok(true)
838 } else {
839 Result::Ok(false)
840 }
841 })
842 }
843
844 pub fn enqueue_or_lose(queue: Grc<Self>, item: T) -> impl Event<Item = bool> {
847 cons_event(move |p| {
848 let x = {
849 try_request_resource_within_event(queue.enqueue_resource.clone())
850 .call_event(p)
851 }?;
852 if x {
853 let i = queue.enqueue_initiate(item, p)?;
854 queue.enqueue_store(i, p)?;
855 Result::Ok(true)
856 } else {
857 queue.enqueue_deny(item, p)?;
858 Result::Ok(false)
859 }
860 })
861 }
862
863 pub fn enqueue_with_storing_priority_or_lose(queue: Grc<Self>, pm: SM::Priority, item: T) -> impl Event<Item = bool> {
866 cons_event(move |p| {
867 let x = {
868 try_request_resource_within_event(queue.enqueue_resource.clone())
869 .call_event(p)
870 }?;
871 if x {
872 let i = queue.enqueue_initiate(item, p)?;
873 queue.enqueue_store_with_priority(pm, i, p)?;
874 Result::Ok(true)
875 } else {
876 queue.enqueue_deny(item, p)?;
877 Result::Ok(false)
878 }
879 })
880 }
881
882 pub fn enqueue_or_lose_(queue: Grc<Self>, item: T) -> impl Event<Item = ()> {
884 Queue::enqueue_or_lose(queue, item)
885 .map(|_| {})
886 }
887
888 pub fn enqueue_with_storing_priority_or_lose_(queue: Grc<Self>, pm: SM::Priority, item: T) -> impl Event<Item = ()> {
891 Queue::enqueue_with_storing_priority_or_lose(queue, pm, item)
892 .map(|_| {})
893 }
894
    /// Observable published by the source that is triggered, with the item,
    /// when an enqueue operation is initiated.
    #[inline]
    pub fn enqueue_initiated(&self) -> impl Observable<Message = T> + Clone {
        self.enqueue_initiated_source.publish()
    }
900
    /// Observable published by the source that is triggered, with the item,
    /// when an item has been stored.
    #[inline]
    pub fn enqueue_stored(&self) -> impl Observable<Message = T> + Clone {
        self.enqueue_stored_source.publish()
    }
906
    /// Observable published by the source that is triggered, with the item,
    /// when an item is denied on enqueue.
    #[inline]
    pub fn enqueue_lost(&self) -> impl Observable<Message = T> + Clone {
        self.enqueue_lost_source.publish()
    }
912
    /// Observable published by the source that is triggered when a dequeue
    /// is requested.
    #[inline]
    pub fn dequeue_requested(&self) -> impl Observable<Message = ()> + Clone {
        self.dequeue_requested_source.publish()
    }
918
    /// Observable published by the source that is triggered, with the item,
    /// when an item has been extracted.
    #[inline]
    pub fn dequeue_extracted(&self) -> impl Observable<Message = T> + Clone {
        self.dequeue_extracted_source.publish()
    }
924
925 #[inline]
927 pub fn changed_(&self) -> impl Observable<Message = ()> + Clone {
928 self.enqueue_initiated().map(|_| {})
929 .merge(self.enqueue_stored().map(|_| {}))
930 .merge(self.enqueue_lost().map(|_| {}))
931 .merge(self.dequeue_requested())
932 .merge(self.dequeue_extracted().map(|_| {}))
933 }
934
935 fn dequeue_request(&self, p: &Point) -> simulation::Result<f64> {
937 let c = self.dequeue_count.read_at(p);
938 let c2 = c + 1;
939 self.dequeue_count.write_at(c2, p);
940 self.dequeue_requested_source.trigger_at(&(), p)?;
941 Result::Ok(p.time)
942 }
943
944 fn dequeue_extract(&self, t_r: f64, p: &Point) -> simulation::Result<T> {
946 let i = self.queue_store.pop(p).unwrap();
947 self.dequeue_post_extract(t_r, i, p)
948 }
949
950 fn dequeue_post_extract(&self, t_r: f64, i: QueueItem<T>, p: &Point) -> simulation::Result<T> {
952 let t = p.time;
953 let c = self.count.read_at(p);
954 let c2 = c - 1;
955 let stats = self.count_stats.read_at(p);
956 let stats2 = stats.add(t, c2);
957 let ec = self.dequeue_extract_count.read_at(p);
958 let ec2 = ec + 1;
959 self.count.write_at(c2, p);
960 self.count_stats.write_at(stats2, p);
961 self.dequeue_extract_count.write_at(ec2, p);
962 self.dequeue_stat(t_r, &i, p);
963 release_resource_within_event(self.enqueue_resource.clone())
964 .call_event(p)?;
965 self.dequeue_extracted_source
966 .trigger_at(&i.value, p)?;
967 Result::Ok(i.value)
968 }
969
970 fn dequeue_stat(&self, t_r: f64, i: &QueueItem<T>, p: &Point) {
973 let t0 = i.input_time;
974 let t1 = i.storing_time;
975 let t = p.time;
976 let stats = self.dequeue_wait_time.read_at(p);
977 let stats2 = stats.add(t - t_r);
978 self.dequeue_wait_time.write_at(stats2, p);
979 let stats = self.total_wait_time.read_at(p);
980 let stats2 = stats.add(t - t0);
981 self.total_wait_time.write_at(stats2, p);
982 let stats = self.wait_time.read_at(p);
983 let stats2 = stats.add(t - t1);
984 self.wait_time.write_at(stats2, p);
985 }
986
987 fn enqueue_initiate(&self, item: T, p: &Point) -> simulation::Result<QueueItem<T>> {
989 let t = p.time;
990 let c = self.enqueue_count.read_at(p);
991 self.enqueue_count.write_at(c + 1, p);
992 self.enqueue_initiated_source
993 .trigger_at(&item, p)?;
994 Result::Ok(QueueItem {
995 value: item,
996 input_time: t,
997 storing_time: t
998 })
999 }
1000
1001 fn enqueue_store(&self, item: QueueItem<T>, p: &Point) -> simulation::Result<()> {
1003 let t = p.time;
1004 let i2 = QueueItem {
1005 value: item.value,
1006 input_time: item.input_time,
1007 storing_time: t
1008 };
1009 self.queue_store.push(i2.clone(), p);
1010 let c = self.count.read_at(p);
1011 let c2 = c + 1;
1012 self.count.write_at(c2, p);
1013 let stats = self.count_stats.read_at(p);
1014 let stats2 = stats.add(t, c2);
1015 self.count_stats.write_at(stats2, p);
1016 let sc = self.enqueue_store_count.read_at(p);
1017 let sc2 = sc + 1;
1018 self.enqueue_store_count.write_at(sc2, p);
1019 self.enqueue_stat(&i2, p);
1020 release_resource_within_event(self.dequeue_resource.clone())
1021 .call_event(p)?;
1022 self.enqueue_stored_source
1023 .trigger_at(&i2.value, p)
1024 }
1025
1026 fn enqueue_store_with_priority(&self, pm: SM::Priority, item: QueueItem<T>, p: &Point) -> simulation::Result<()> {
1028 let t = p.time;
1029 let i2 = QueueItem {
1030 value: item.value,
1031 input_time: item.input_time,
1032 storing_time: t
1033 };
1034 self.queue_store.push_with_priority(pm, i2.clone(), p);
1035 let c = self.count.read_at(p);
1036 let c2 = c + 1;
1037 self.count.write_at(c2, p);
1038 let stats = self.count_stats.read_at(p);
1039 let stats2 = stats.add(t, c2);
1040 self.count_stats.write_at(stats2, p);
1041 let sc = self.enqueue_store_count.read_at(p);
1042 let sc2 = sc + 1;
1043 self.enqueue_store_count.write_at(sc2, p);
1044 self.enqueue_stat(&i2, p);
1045 release_resource_within_event(self.dequeue_resource.clone())
1046 .call_event(p)?;
1047 self.enqueue_stored_source
1048 .trigger_at(&i2.value, p)
1049 }
1050
1051 fn enqueue_deny(&self, item: T, p: &Point) -> simulation::Result<()> {
1053 let c = self.enqueue_lost_count.read_at(p);
1054 let c2 = c + 1;
1055 self.enqueue_lost_count.write_at(c2, p);
1056 self.enqueue_lost_source
1057 .trigger_at(&item, p)
1058 }
1059
1060 fn enqueue_stat(&self, i: &QueueItem<T>, p: &Point) {
1062 let t0 = i.input_time;
1063 let t1 = i.storing_time;
1064 let stats = self.enqueue_wait_time.read_at(p);
1065 let stats2 = stats.add(t1 - t0);
1066 self.enqueue_wait_time.write_at(stats2, p);
1067 }
1068
1069 pub fn reset(queue: Grc<Self>) -> impl Event<Item = ()> + Clone {
1071 cons_event(move |p| {
1072 let t = p.time;
1073 let count = queue.count.read_at(p);
1074 queue.count_stats.write_at(TimingStats::from_sample(t, count), p);
1075 queue.enqueue_count.write_at(0, p);
1076 queue.enqueue_lost_count.write_at(0, p);
1077 queue.enqueue_store_count.write_at(0, p);
1078 queue.dequeue_count.write_at(0, p);
1079 queue.dequeue_extract_count.write_at(0, p);
1080 queue.wait_time.write_at(SamplingStats::empty(), p);
1081 queue.total_wait_time.write_at(SamplingStats::empty(), p);
1082 queue.enqueue_wait_time.write_at(SamplingStats::empty(), p);
1083 queue.dequeue_wait_time.write_at(SamplingStats::empty(), p);
1084 Result::Ok(())
1085 })
1086 }
1087
1088 pub fn wait_while_full(queue: Grc<Self>) -> impl Process<Item = ()> {
1090 Queue::is_full(queue.clone())
1091 .into_process()
1092 .and_then(move |x| {
1093 if x {
1094 process_await(queue.dequeue_extracted())
1095 .and_then(move |_| {
1096 Queue::wait_while_full(queue)
1097 })
1098 .into_boxed()
1099 } else {
1100 return_process(())
1101 .into_boxed()
1102 }
1103 })
1104 }
1105}
1106
/// The parameters of a queue that has not been created yet; running this
/// value as an event creates the `Queue`.
#[derive(Clone)]
pub struct NewQueue<SI, SM, SO, T> {

    /// The strategy for the enqueue resource.
    enqueue_strategy: SI,

    /// The strategy that creates the item storage.
    storing_strategy: SM,

    /// The strategy for the dequeue resource.
    dequeue_strategy: SO,

    /// The queue capacity.
    max_count: isize,

    /// Marks the item type, which is not stored in any field.
    _phantom: PhantomData<T>
}
1126
1127impl<SI, SM, SO, T> Event for NewQueue<SI, SM, SO, T>
1128 where SI: QueueStrategy + 'static,
1129 SM: QueueStrategy,
1130 SO: QueueStrategy + 'static,
1131 T: 'static
1132{
1133 type Item = Queue<SI, SM, SO, T>;
1134
1135 #[doc(hidden)]
1136 #[inline]
1137 fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
1138 let NewQueue { enqueue_strategy, storing_strategy, dequeue_strategy, max_count, _phantom } = self;
1139 if max_count < 0 {
1140 let msg = String::from("The queue capacity cannot be actually negative");
1141 let err = Error::retry(msg);
1142 Result::Err(err)
1143 } else {
1144 let t = p.time;
1145 let enqueue_resource = {
1146 Resource::<SI>::new_with_max_count(enqueue_strategy, max_count, Some(max_count))
1147 .call_simulation(p.run)?
1148 };
1149 let queue_store = storing_strategy.new_storage();
1150 let dequeue_resource = {
1151 Resource::<SO>::new_with_max_count(dequeue_strategy, 0, Some(max_count))
1152 .call_simulation(p.run)?
1153 };
1154 Result::Ok(Queue {
1155 max_count: max_count,
1156 enqueue_resource: Grc::new(enqueue_resource),
1157 queue_store: queue_store,
1158 dequeue_resource: Grc::new(dequeue_resource),
1159 count: RefComp::new(0),
1160 count_stats: RefComp::new(TimingStats::from_sample(t, 0)),
1161 enqueue_count: RefComp::new(0),
1162 enqueue_lost_count: RefComp::new(0),
1163 enqueue_store_count: RefComp::new(0),
1164 dequeue_count: RefComp::new(0),
1165 dequeue_extract_count: RefComp::new(0),
1166 wait_time: RefComp::new(SamplingStats::empty()),
1167 total_wait_time: RefComp::new(SamplingStats::empty()),
1168 enqueue_wait_time: RefComp::new(SamplingStats::empty()),
1169 dequeue_wait_time: RefComp::new(SamplingStats::empty()),
1170 enqueue_initiated_source: ObservableSource::new(),
1171 enqueue_lost_source: ObservableSource::new(),
1172 enqueue_stored_source: ObservableSource::new(),
1173 dequeue_requested_source: ObservableSource::new(),
1174 dequeue_extracted_source: ObservableSource::new()
1175 })
1176 }
1177 }
1178}