dvcompute_cons/simulation/queue/stats/
mod.rs

1// Copyright (c) 2020-2022  David Sorokin <davsor@mail.ru>, based in Yoshkar-Ola, Russia
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at https://mozilla.org/MPL/2.0/.
6
7use std::marker::PhantomData;
8
9use crate::simulation;
10use crate::simulation::error::*;
11use crate::simulation::Point;
12use crate::simulation::ref_comp::RefComp;
13use crate::simulation::observable::*;
14use crate::simulation::observable::source::*;
15use crate::simulation::simulation::*;
16use crate::simulation::event::*;
17use crate::simulation::process::*;
18use crate::simulation::strategy::*;
19use crate::simulation::resource::*;
20
21use dvcompute_utils::simulation::stats::*;
22use dvcompute_utils::grc::Grc;
23
/// A type synonym for the ordinary FIFO queue, also known as the FCFS
/// (First Come - First Served) queue.
///
/// All three strategies (enqueueing, storing and dequeueing) are `FCFSStrategy`.
pub type FCFSQueue<T> = Queue<FCFSStrategy, FCFSStrategy, FCFSStrategy, T>;
27
/// A type synonym for the ordinary LIFO queue, also known as the LCFS
/// (Last Come - First Served) queue.
///
/// Only the storing strategy is `LCFSStrategy`; the enqueueing and dequeueing
/// strategies remain `FCFSStrategy`.
pub type LCFSQueue<T> = Queue<FCFSStrategy, LCFSStrategy, FCFSStrategy, T>;
31
/// Represents a bounded queue by using the specified strategies for enqueueing (input), `SI`,
/// internal storing (in memory), `SM`, and dequeueing (output), `SO`, where `T` denotes
/// the type of items stored in the queue.
///
/// Besides the items themselves, the queue keeps counters, wait time statistics and
/// observable sources that are exposed through the corresponding accessor functions.
pub struct Queue<SI, SM, SO, T>
    where SI: QueueStrategy + 'static,
          SM: QueueStrategy,
          SO: QueueStrategy + 'static,
          T: 'static
{
    /// The queue capacity, i.e. the maximum queue size.
    max_count: isize,

    /// The enqueue resource that is requested by the enqueue operations
    /// before an item is stored.
    enqueue_resource: Grc<Resource<SI>>,

    /// The queue store that holds the items together with their time stamps.
    queue_store: QueueStorageBox<QueueItem<T>, SM::Priority>,

    /// The dequeue resource that is requested by the dequeue operations
    /// before an item is extracted.
    dequeue_resource: Grc<Resource<SO>>,

    /// The current queue size.
    count: RefComp<isize>,

    /// The timing statistics for the queue size.
    count_stats: RefComp<TimingStats<isize>>,

    /// The total number of enqueue operations, including those ones
    /// that have failed due to full capacity.
    enqueue_count: RefComp<isize>,

    /// The count of lost items, i.e. items that could not be enqueued
    /// due to full capacity.
    enqueue_lost_count: RefComp<isize>,

    /// The count of input items that were stored.
    enqueue_store_count: RefComp<isize>,

    /// The total number of requests to dequeue the items.
    dequeue_count: RefComp<isize>,

    /// The count of items that were extracted by the dequeue operations.
    dequeue_extract_count: RefComp<isize>,

    /// The wait time from the time at which the item was stored
    /// to the time at which it was dequeued.
    wait_time: RefComp<SamplingStats<f64>>,

    /// The total wait time from the time at which the enqueue operation
    /// was initiated to the time at which the item was dequeued.
    total_wait_time: RefComp<SamplingStats<f64>>,

    /// The enqueue wait time from the time at which the enqueue operation
    /// was initiated to the time at which the item was stored.
    enqueue_wait_time: RefComp<SamplingStats<f64>>,

    /// The dequeue wait time from the time at which the dequeue request was made
    /// to the time at which the corresponding item was actually dequeued.
    dequeue_wait_time: RefComp<SamplingStats<f64>>,

    /// The observable source when the enqueue is initiated.
    enqueue_initiated_source: ObservableSource<T>,

    /// The observable source when the item is lost.
    enqueue_lost_source: ObservableSource<T>,

    /// The observable source when the item is stored.
    enqueue_stored_source: ObservableSource<T>,

    /// The observable source when the item is requested for.
    dequeue_requested_source: ObservableSource<()>,

    /// The observable source when the item is extracted.
    dequeue_extracted_source: ObservableSource<T>
}
101
/// Stores the item and a time of its enqueueing.
///
/// This is the element type actually kept in the queue store; the predicates
/// supplied by the user are applied to the `value` field only.
#[derive(Clone)]
struct QueueItem<T> {

    /// The item value.
    value: T,

    /// The time of enqueueing the item.
    // NOTE(review): presumably the time at which the enqueue operation was initiated -- confirm.
    input_time: f64,

    /// The time of storing the item.
    storing_time: f64
}
115
116/// Create a new bounded FCFS (a.k.a FIFO) queue by the specified capacity.
117#[inline]
118pub fn new_fcfs_queue<T>(max_count: isize) -> NewQueue<FCFSStrategy, FCFSStrategy, FCFSStrategy, T>
119    where T: 'static
120{
121    NewQueue {
122        enqueue_strategy: FCFSStrategy::Instance,
123        storing_strategy: FCFSStrategy::Instance,
124        dequeue_strategy: FCFSStrategy::Instance,
125        max_count: max_count,
126        _phantom: PhantomData
127    }
128}
129
130/// Create a new bounded LCFS (a.k.a LIFO) queue by the specified capacity.
131#[inline]
132pub fn new_lcfs_queue<T>(max_count: isize) -> NewQueue<FCFSStrategy, LCFSStrategy, FCFSStrategy, T>
133    where T: 'static
134{
135    NewQueue {
136        enqueue_strategy: FCFSStrategy::Instance,
137        storing_strategy: LCFSStrategy::Instance,
138        dequeue_strategy: FCFSStrategy::Instance,
139        max_count: max_count,
140        _phantom: PhantomData
141    }
142}
143
144impl<SI, SM, SO, T> Queue<SI, SM, SO, T>
145    where SI: QueueStrategy + 'static,
146          SM: QueueStrategy + 'static,
147          SO: QueueStrategy + 'static,
148          T: Clone + 'static
149{
150    /// Create a new bounded queue by the specified strategies and capacity.
151    #[inline]
152    pub fn new(enqueue_strategy: SI,
153        storing_strategy: SM,
154        dequeue_strategy: SO,
155        max_count: isize) -> NewQueue<SI, SM, SO, T>
156    {
157        NewQueue {
158            enqueue_strategy: enqueue_strategy,
159            storing_strategy: storing_strategy,
160            dequeue_strategy: dequeue_strategy,
161            max_count: max_count,
162            _phantom: PhantomData
163        }
164    }
165
166    /// Return the queue capacity, i.e. its maximum size.
167    #[inline]
168    pub fn max_count(&self) -> isize {
169        self.max_count
170    }
171
172    /// Test whether the queue is empty.
173    #[inline]
174    pub fn is_empty(queue: Grc<Self>) -> impl Event<Item = bool> + Clone {
175        cons_event(move |p| {
176            Result::Ok(queue.count.read_at(p) == 0)
177        })
178    }
179
180    /// Notifies when the `is_empty` property changes.
181    #[inline]
182    pub fn is_empty_changed(queue: Grc<Self>) -> impl Observable<Message = bool> + Clone {
183        queue.is_empty_changed_()
184            .mapc(move |()| {
185                Queue::is_empty(queue.clone())
186            })
187    }
188
189    /// Notifies when the `is_empty` property changes.
190    #[inline]
191    pub fn is_empty_changed_(&self) -> impl Observable<Message = ()> + Clone {
192        self.count_changed_()
193    }
194
195    /// Test whether the queue is full.
196    #[inline]
197    pub fn is_full(queue: Grc<Self>) -> impl Event<Item = bool> + Clone {
198        cons_event(move |p| {
199            Result::Ok(queue.count.read_at(p) == queue.max_count)
200        })
201    }
202
203    /// Notifies when the `is_full` property changes.
204    #[inline]
205    pub fn is_full_changed(queue: Grc<Self>) -> impl Observable<Message = bool> + Clone {
206        queue.is_full_changed_()
207            .mapc(move |()| {
208                Queue::is_full(queue.clone())
209            })
210    }
211
212    /// Notifies when the `is_full` property changes.
213    #[inline]
214    pub fn is_full_changed_(&self) -> impl Observable<Message = ()> + Clone {
215        self.count_changed_()
216    }
217
218    /// Return the current queue size.
219    #[inline]
220    pub fn count(queue: Grc<Self>) -> impl Event<Item = isize> + Clone {
221        cons_event(move |p| {
222            Result::Ok(queue.count.read_at(p))
223        })
224    }
225
226    /// Return the statistics for the queue size.
227    #[inline]
228    pub fn count_stats(queue: Grc<Self>) -> impl Event<Item = TimingStats<isize>> + Clone {
229        cons_event(move |p| {
230            Result::Ok(queue.count_stats.read_at(p))
231        })
232    }
233
234    /// Notifies when the `count` property changes.
235    #[inline]
236    pub fn count_changed(queue: Grc<Self>) -> impl Observable<Message = isize> + Clone {
237        queue.count_changed_()
238            .mapc(move |()| {
239                Queue::count(queue.clone())
240            })
241    }
242
243    /// Notifies when the `count` property changes.
244    #[inline]
245    pub fn count_changed_(&self) -> impl Observable<Message = ()> + Clone {
246        self.enqueue_stored().map(|_| {})
247            .merge(self.dequeue_extracted().map(|_| {}))
248    }
249
250    /// Return the total number of enqueue operations, including those ones that have failed due to full capacity.
251    #[inline]
252    pub fn enqueue_count(queue: Grc<Self>) -> impl Event<Item = isize> + Clone {
253        cons_event(move |p| {
254            Result::Ok(queue.enqueue_count.read_at(p))
255        })
256    }
257
258    /// Notifies when the `enqueue_count` property changes.
259    #[inline]
260    pub fn enqueue_count_changed(queue: Grc<Self>) -> impl Observable<Message = isize> + Clone {
261        queue.enqueue_count_changed_()
262            .mapc(move |()| {
263                Queue::enqueue_count(queue.clone())
264            })
265    }
266
267    /// Notifies when the `enqueue_count` property changes.
268    #[inline]
269    pub fn enqueue_count_changed_(&self) -> impl Observable<Message = ()> + Clone {
270        self.enqueue_initiated().map(|_| {})
271    }
272
273    /// Return the total number of items that could not be enqueued due to full capacity.
274    #[inline]
275    pub fn enqueue_lost_count(queue: Grc<Self>) -> impl Event<Item = isize> + Clone {
276        cons_event(move |p| {
277            Result::Ok(queue.enqueue_lost_count.read_at(p))
278        })
279    }
280
281    /// Notifies when the `enqueue_lost_count` property changes.
282    #[inline]
283    pub fn enqueue_lost_count_changed(queue: Grc<Self>) -> impl Observable<Message = isize> + Clone {
284        queue.enqueue_lost_count_changed_()
285            .mapc(move |()| {
286                Queue::enqueue_lost_count(queue.clone())
287            })
288    }
289
290    /// Notifies when the `enqueue_lost_count` property changes.
291    #[inline]
292    pub fn enqueue_lost_count_changed_(&self) -> impl Observable<Message = ()> + Clone {
293        self.enqueue_lost().map(|_| {})
294    }
295
296    /// Return the total number of input items that were stored.
297    #[inline]
298    pub fn enqueue_store_count(queue: Grc<Self>) -> impl Event<Item = isize> + Clone {
299        cons_event(move |p| {
300            Result::Ok(queue.enqueue_store_count.read_at(p))
301        })
302    }
303
304    /// Notifies when the `enqueue_store_count` property changes.
305    #[inline]
306    pub fn enqueue_store_count_changed(queue: Grc<Self>) -> impl Observable<Message = isize> + Clone {
307        queue.enqueue_store_count_changed_()
308            .mapc(move |()| {
309                Queue::enqueue_store_count(queue.clone())
310            })
311    }
312
313    /// Notifies when the `enqueue_store_count` property changes.
314    #[inline]
315    pub fn enqueue_store_count_changed_(&self) -> impl Observable<Message = ()> + Clone {
316        self.enqueue_stored().map(|_| {})
317    }
318
319    /// Return the total number of requests to dequeue the items.
320    #[inline]
321    pub fn dequeue_count(queue: Grc<Self>) -> impl Event<Item = isize> + Clone {
322        cons_event(move |p| {
323            Result::Ok(queue.dequeue_count.read_at(p))
324        })
325    }
326
327    /// Notifies when the `dequeue_count` property changes.
328    #[inline]
329    pub fn dequeue_count_changed(queue: Grc<Self>) -> impl Observable<Message = isize> + Clone {
330        queue.dequeue_count_changed_()
331            .mapc(move |()| {
332                Queue::dequeue_count(queue.clone())
333            })
334    }
335
336    /// Notifies when the `dequeue_count` property changes.
337    #[inline]
338    pub fn dequeue_count_changed_(&self) -> impl Observable<Message = ()> + Clone {
339        self.dequeue_requested()
340    }
341
342    /// Return the total number of items that were extracted from the queue with help of dequeue operations.
343    #[inline]
344    pub fn dequeue_extract_count(queue: Grc<Self>) -> impl Event<Item = isize> + Clone {
345        cons_event(move |p| {
346            Result::Ok(queue.dequeue_extract_count.read_at(p))
347        })
348    }
349
350    /// Notifies when the `dequeue_extract_count` property changes.
351    #[inline]
352    pub fn dequeue_extract_count_changed(queue: Grc<Self>) -> impl Observable<Message = isize> + Clone {
353        queue.dequeue_extract_count_changed_()
354            .mapc(move |()| {
355                Queue::dequeue_extract_count(queue.clone())
356            })
357    }
358
359    /// Notifies when the `dequeue_extract_count` property changes.
360    #[inline]
361    pub fn dequeue_extract_count_changed_(&self) -> impl Observable<Message = ()> + Clone {
362        self.dequeue_extracted().map(|_| {})
363    }
364
365    /// Return the load factor: the queue size divided by its capacity, i.e. maximum size.
366    #[inline]
367    pub fn load_factor(queue: Grc<Self>) -> impl Event<Item = f64> + Clone {
368        cons_event(move |p| {
369            Result::Ok({
370                let x = queue.count.read_at(p);
371                let y = queue.max_count;
372                (x as f64) / (y as f64)
373            })
374        })
375    }
376
377    /// Notifies when the `load_factor` property changes.
378    #[inline]
379    pub fn load_factor_changed(queue: Grc<Self>) -> impl Observable<Message = f64> + Clone {
380        queue.load_factor_changed_()
381            .mapc(move |()| {
382                Queue::load_factor(queue.clone())
383            })
384    }
385
386    /// Notifies when the `load_factor` property changes.
387    #[inline]
388    pub fn load_factor_changed_(&self) -> impl Observable<Message = ()> + Clone {
389        self.count_changed_()
390    }
391
392    /// Return the rate of input items that were enqueued: how many items per time.
393    #[inline]
394    pub fn enqueue_rate(queue: Grc<Self>) -> impl Event<Item = f64> + Clone {
395        cons_event(move |p| {
396            Result::Ok({
397                let x  = queue.enqueue_count.read_at(p);
398                let t0 = p.run.specs.start_time;
399                let t  = p.time;
400                (x as f64) / (t - t0)
401            })
402        })
403    }
404
405    /// Return the rate of input items that were stored: how many items per time.
406    #[inline]
407    pub fn store_rate(queue: Grc<Self>) -> impl Event<Item = f64> + Clone {
408        cons_event(move |p| {
409            Result::Ok({
410                let x  = queue.enqueue_store_count.read_at(p);
411                let t0 = p.run.specs.start_time;
412                let t  = p.time;
413                (x as f64) / (t - t0)
414            })
415        })
416    }
417
418    /// Return the rate of requests for dequeueing the items: how many items per time.
419    /// It does not include the failed attempts to dequeue immediately without suspension.
420    #[inline]
421    pub fn dequeue_rate(queue: Grc<Self>) -> impl Event<Item = f64> + Clone {
422        cons_event(move |p| {
423            Result::Ok({
424                let x  = queue.dequeue_count.read_at(p);
425                let t0 = p.run.specs.start_time;
426                let t  = p.time;
427                (x as f64) / (t - t0)
428            })
429        })
430    }
431
432    /// Return the rate of output items that were actually extracted from the queue: how many items per time.
433    #[inline]
434    pub fn dequeue_extract_rate(queue: Grc<Self>) -> impl Event<Item = f64> + Clone {
435        cons_event(move |p| {
436            Result::Ok({
437                let x  = queue.dequeue_extract_count.read_at(p);
438                let t0 = p.run.specs.start_time;
439                let t  = p.time;
440                (x as f64) / (t - t0)
441            })
442        })
443    }
444
445    /// Return the wait time from the time at which the item was stored in the queue to
446    /// the time at which it was dequeued.
447    #[inline]
448    pub fn wait_time(queue: Grc<Self>) -> impl Event<Item = SamplingStats<f64>> + Clone {
449        cons_event(move |p| {
450            Result::Ok(queue.wait_time.read_at(p))
451        })
452    }
453
454    /// Notifies when the `wait_time` property changes.
455    #[inline]
456    pub fn wait_time_changed(queue: Grc<Self>) -> impl Observable<Message = SamplingStats<f64>> + Clone {
457        queue.wait_time_changed_()
458            .mapc(move |()| {
459                Queue::wait_time(queue.clone())
460            })
461    }
462
463    /// Notifies when the `wait_time` property changes.
464    #[inline]
465    pub fn wait_time_changed_(&self) -> impl Observable<Message = ()> + Clone {
466        self.dequeue_extracted().map(|_| {})
467    }
468
469    /// Return the total wait time from the time at which the enqueue operation
470    /// was initiated to the time at which the item was dequeued.
471    ///
472    /// In some sense, `total_wait_time` == `enqueue_wait_time` + `wait_time`.
473    #[inline]
474    pub fn total_wait_time(queue: Grc<Self>) -> impl Event<Item = SamplingStats<f64>> + Clone {
475        cons_event(move |p| {
476            Result::Ok(queue.total_wait_time.read_at(p))
477        })
478    }
479
480    /// Notifies when the `total_wait_time` property changes.
481    #[inline]
482    pub fn total_wait_time_changed(queue: Grc<Self>) -> impl Observable<Message = SamplingStats<f64>> + Clone {
483        queue.total_wait_time_changed_()
484            .mapc(move |()| {
485                Queue::total_wait_time(queue.clone())
486            })
487    }
488
489    /// Notifies when the `total_wait_time` property changes.
490    #[inline]
491    pub fn total_wait_time_changed_(&self) -> impl Observable<Message = ()> + Clone {
492        self.dequeue_extracted().map(|_| {})
493    }
494
495    /// Return the enqueue wait time from the time at which the enqueue operation
496    /// was initiated to the time at which the item was stored in the queue.
497    #[inline]
498    pub fn enqueue_wait_time(queue: Grc<Self>) -> impl Event<Item = SamplingStats<f64>> + Clone {
499        cons_event(move |p| {
500            Result::Ok(queue.enqueue_wait_time.read_at(p))
501        })
502    }
503
504    /// Notifies when the `enqueue_wait_time` property changes.
505    #[inline]
506    pub fn enqueue_wait_time_changed(queue: Grc<Self>) -> impl Observable<Message = SamplingStats<f64>> + Clone {
507        queue.enqueue_wait_time_changed_()
508            .mapc(move |()| {
509                Queue::enqueue_wait_time(queue.clone())
510            })
511    }
512
513    /// Notifies when the `enqueue_wait_time` property changes.
514    #[inline]
515    pub fn enqueue_wait_time_changed_(&self) -> impl Observable<Message = ()> + Clone {
516        self.enqueue_stored().map(|_| {})
517    }
518
519    /// Return the dequeue wait time from the time at which the dequeue request was made
520    /// to the time at which the corresponding item was actually dequeued.
521    #[inline]
522    pub fn dequeue_wait_time(queue: Grc<Self>) -> impl Event<Item = SamplingStats<f64>> + Clone {
523        cons_event(move |p| {
524            Result::Ok(queue.dequeue_wait_time.read_at(p))
525        })
526    }
527
528    /// Notifies when the `dequeue_wait_time` property changes.
529    #[inline]
530    pub fn dequeue_wait_time_changed(queue: Grc<Self>) -> impl Observable<Message = SamplingStats<f64>> + Clone {
531        queue.dequeue_wait_time_changed_()
532            .mapc(move |()| {
533                Queue::dequeue_wait_time(queue.clone())
534            })
535    }
536
537    /// Notifies when the `dequeue_wait_time` property changes.
538    #[inline]
539    pub fn dequeue_wait_time_changed_(&self) -> impl Observable<Message = ()> + Clone {
540        self.dequeue_extracted().map(|_| {})
541    }
542
543    /// Return a long-term average queue rate calculated as
544    /// the average queue size divided by the average wait time.
545    ///
546    /// This value may be less than the actual arrival rate as the queue is
547    /// bounded and new arrivals may be blocked while the queue remains full.
548    #[inline]
549    pub fn rate(queue: Grc<Self>) -> impl Event<Item = f64> + Clone {
550        cons_event(move |p| {
551            Result::Ok({
552                let x = queue.count_stats.read_at(p);
553                let y = queue.wait_time.read_at(p);
554                x.mean() / y.mean
555            })
556        })
557    }
558
559    /// Notifies when the `rate` property changes.
560    #[inline]
561    pub fn rate_changed(queue: Grc<Self>) -> impl Observable<Message = f64> + Clone {
562        queue.rate_changed_()
563            .mapc(move |()| {
564                Queue::rate(queue.clone())
565            })
566    }
567
568    /// Notifies when the `rate` property changes.
569    #[inline]
570    pub fn rate_changed_(&self) -> impl Observable<Message = ()> + Clone {
571        self.enqueue_stored().map(|_| {})
572            .merge(self.dequeue_extracted().map(|_| {}))
573    }
574
575    /// Dequeue by suspending the process if the queue is empty.
576    pub fn dequeue(queue: Grc<Self>) -> impl Process<Item = T> {
577        cons_event({
578            let queue = queue.clone();
579            move |p| {
580                queue.dequeue_request(p)
581            }
582        })
583        .into_process()
584        .and_then(move |t| {
585            request_resource(queue.dequeue_resource.clone())
586                .and_then(move |()| {
587                    cons_event(move |p| {
588                        queue.dequeue_extract(t, p)
589                    })
590                    .into_process()
591                })
592        })
593    }
594
595    /// Dequeue with output prioerity by suspending the process if the queue is empty.
596    pub fn dequeue_with_output_priority(queue: Grc<Self>, po: SO::Priority) -> impl Process<Item = T>
597        where SO::Priority: Clone
598    {
599        cons_event({
600            let queue = queue.clone();
601            move |p| {
602                queue.dequeue_request(p)
603            }
604        })
605        .into_process()
606        .and_then(move |t| {
607            request_resource_with_priority(queue.dequeue_resource.clone(), po)
608                .and_then(move |()| {
609                    cons_event(move |p| {
610                        queue.dequeue_extract(t, p)
611                    })
612                    .into_process()
613                })
614        })
615    }
616
617    /// Try to dequeue immediately.
618    pub fn try_dequeue(queue: Grc<Self>) -> impl Event<Item = Option<T>> {
619        try_request_resource_within_event(queue.dequeue_resource.clone())
620            .and_then(move |f| {
621                if f {
622                    cons_event(move |p| {
623                        let t = queue.dequeue_request(p)?;
624                        let x = queue.dequeue_extract(t, p)?;
625                        Result::Ok(Some(x))
626                    }).into_boxed()
627                } else {
628                    return_event(None)
629                        .into_boxed()
630                }
631            })
632    }
633
634    /// Remove the item from the queue and return a flag indicating
635    /// whether the item was found and actually removed.
636    pub fn delete(queue: Grc<Self>, item: T) -> impl Event<Item = bool>
637        where T: PartialEq
638    {
639        let pred = move |x: &T| { *x == item };
640        Queue::delete_by(queue, pred)
641            .map(|x| { x.is_some() })
642    }
643
644    /// Remove the specified item from the queue.
645    pub fn delete_(queue: Grc<Self>, item: T) -> impl Event<Item = ()>
646        where T: PartialEq
647    {
648        let pred = move |x: &T| { *x == item };
649        Queue::delete_by(queue, pred)
650            .map(|_| ())
651    }
652
653    /// Remove an item satisfying the specified predicate and return the item if found.
654    pub fn delete_by<F>(queue: Grc<Self>, pred: F) -> impl Event<Item = Option<T>>
655        where F: Fn(&T) -> bool + 'static
656    {
657        try_request_resource_within_event(queue.dequeue_resource.clone())
658            .and_then(move |f| {
659                if f {
660                    cons_event(move |p| {
661                        let pred = move |x: &QueueItem<T>| { pred(&x.value) };
662                        let pred = Box::new(pred);
663                        match queue.queue_store.remove_boxed_by(pred, p) {
664                            None => {
665                                release_resource_within_event(queue.dequeue_resource.clone())
666                                    .call_event(p)?;
667                                Result::Ok(None)
668                            },
669                            Some(i) => {
670                                let t = queue.dequeue_request(p)?;
671                                let x = queue.dequeue_post_extract(t, i, p)?;
672                                Result::Ok(Some(x))
673                            }
674                        }
675                    }).into_boxed()
676                } else {
677                    return_event(None)
678                        .into_boxed()
679                }
680            })
681    }
682
683    /// Test whether there is an item satisfying the specified predicate.
684    pub fn exists<F>(queue: Grc<Self>, pred: F) -> impl Event<Item = bool>
685        where F: Fn(&T) -> bool + 'static
686    {
687        cons_event(move |p| {
688            let pred = move |x: &QueueItem<T>| { pred(&x.value) };
689            let pred = Box::new(pred);
690            Result::Ok(queue.queue_store.exists_boxed(pred, p))
691        })
692    }
693
694    /// Find an item satisfying the specified predicate.
695    pub fn find<F>(queue: Grc<Self>, pred: F) -> impl Event<Item = Option<T>>
696        where F: Fn(&T) -> bool + 'static,
697              T: Clone
698    {
699        cons_event(move |p| {
700            let pred = move |x: &QueueItem<T>| { pred(&x.value) };
701            let pred = Box::new(pred);
702            Result::Ok(queue.queue_store.find_boxed(pred, p).map(|x| { x.value.clone() }))
703        })
704    }
705
706    /// Clear the queue.
707    pub fn clear(queue: Grc<Self>) -> impl Event<Item = ()> {
708        cons_event(move |p| {
709            loop {
710                let x = Queue::try_dequeue(queue.clone()).call_event(p)?;
711                match x {
712                    None => return Result::Ok(()),
713                    Some(_) => {}
714                }
715            }
716        })
717    }
718
719    /// Enqueue the item by suspending the process if the queue is full.
720    pub fn enqueue(queue: Grc<Self>, item: T) -> impl Process<Item = ()> {
721        cons_event({
722            let queue = queue.clone();
723            move |p| {
724                queue.enqueue_initiate(item, p)
725            }
726        })
727        .into_process()
728        .and_then(move |i| {
729            request_resource(queue.enqueue_resource.clone())
730                .and_then(move |()| {
731                    cons_event(move |p| {
732                        queue.enqueue_store(i, p)
733                    })
734                    .into_process()
735                })
736        })
737    }
738
739    /// Enqueue the item with input priority by suspending the process
740    /// if the queue is full.
741    pub fn enqueue_with_input_priority(queue: Grc<Self>, pi: SI::Priority, item: T) -> impl Process<Item = ()>
742        where SI::Priority: Clone
743    {
744        cons_event({
745            let queue = queue.clone();
746            move |p| {
747                queue.enqueue_initiate(item, p)
748            }
749        })
750        .into_process()
751        .and_then(move |i| {
752            request_resource_with_priority(queue.enqueue_resource.clone(), pi)
753                .and_then(move |()| {
754                    cons_event(move |p| {
755                        queue.enqueue_store(i, p)
756                    })
757                    .into_process()
758                })
759        })
760    }
761
762    /// Enqueue the item with storing priority by suspending the process
763    /// if the queue is full.
764    pub fn enqueue_with_storing_priority(queue: Grc<Self>, pm: SM::Priority, item: T) -> impl Process<Item = ()>
765        where SM::Priority: Clone
766    {
767        cons_event({
768            let queue = queue.clone();
769            move |p| {
770                queue.enqueue_initiate(item, p)
771            }
772        })
773        .into_process()
774        .and_then(move |i| {
775            request_resource(queue.enqueue_resource.clone())
776                .and_then(move |()| {
777                    cons_event(move |p| {
778                        queue.enqueue_store_with_priority(pm, i, p)
779                    })
780                    .into_process()
781                })
782        })
783    }
784
785    /// Enqueue the item with input and storing priorities by suspending the process
786    /// if the queue is full.
787    pub fn enqueue_with_input_and_storing_priorities(queue: Grc<Self>, pi: SI::Priority, pm: SM::Priority, item: T) -> impl Process<Item = ()>
788        where SI::Priority: Clone,
789              SM::Priority: Clone
790    {
791        cons_event({
792            let queue = queue.clone();
793            move |p| {
794                queue.enqueue_initiate(item, p)
795            }
796        })
797        .into_process()
798        .and_then(move |i| {
799            request_resource_with_priority(queue.enqueue_resource.clone(), pi)
800                .and_then(move |()| {
801                    cons_event(move |p| {
802                        queue.enqueue_store_with_priority(pm, i, p)
803                    })
804                    .into_process()
805                })
806        })
807    }
808
809    /// Try to enqueue the item. Return `false` within `Event` computation if the queue is full.
810    pub fn try_enqueue(queue: Grc<Self>, item: T) -> impl Event<Item = bool> {
811        cons_event(move |p| {
812            let x = {
813                try_request_resource_within_event(queue.enqueue_resource.clone())
814                    .call_event(p)
815            }?;
816            if x {
817                let i = queue.enqueue_initiate(item, p)?;
818                queue.enqueue_store(i, p)?;
819                Result::Ok(true)
820            } else {
821                Result::Ok(false)
822            }
823        })
824    }
825
826    /// Try to enqueue the item with storing priority. Return `false`
827    /// within `Event` computation if the queue is full.
828    pub fn try_enqueue_with_storing_priority(queue: Grc<Self>, pm: SM::Priority, item: T) -> impl Event<Item = bool> {
829        cons_event(move |p| {
830            let x = {
831                try_request_resource_within_event(queue.enqueue_resource.clone())
832                    .call_event(p)
833            }?;
834            if x {
835                let i = queue.enqueue_initiate(item, p)?;
836                queue.enqueue_store_with_priority(pm, i, p)?;
837                Result::Ok(true)
838            } else {
839                Result::Ok(false)
840            }
841        })
842    }
843
844    /// Try to enqueue the item. If the queue is full then
845    /// the item will be counted as lost and `false` will be returned.
846    pub fn enqueue_or_lose(queue: Grc<Self>, item: T) -> impl Event<Item = bool> {
847        cons_event(move |p| {
848            let x = {
849                try_request_resource_within_event(queue.enqueue_resource.clone())
850                    .call_event(p)
851            }?;
852            if x {
853                let i = queue.enqueue_initiate(item, p)?;
854                queue.enqueue_store(i, p)?;
855                Result::Ok(true)
856            } else {
857                queue.enqueue_deny(item, p)?;
858                Result::Ok(false)
859            }
860        })
861    }
862
863    /// Try to enqueue the item with storing piority. If the queue is full then
864    /// the item will be counted as lost and `false` will be returned.
865    pub fn enqueue_with_storing_priority_or_lose(queue: Grc<Self>, pm: SM::Priority, item: T) -> impl Event<Item = bool> {
866        cons_event(move |p| {
867            let x = {
868                try_request_resource_within_event(queue.enqueue_resource.clone())
869                    .call_event(p)
870            }?;
871            if x {
872                let i = queue.enqueue_initiate(item, p)?;
873                queue.enqueue_store_with_priority(pm, i, p)?;
874                Result::Ok(true)
875            } else {
876                queue.enqueue_deny(item, p)?;
877                Result::Ok(false)
878            }
879        })
880    }
881
882    /// Try to enqueue the item. If the queue is full then the item will be counted as lost.
883    pub fn enqueue_or_lose_(queue: Grc<Self>, item: T) -> impl Event<Item = ()> {
884        Queue::enqueue_or_lose(queue, item)
885            .map(|_| {})
886    }
887
888    /// Try to enqueue the item with storing priority. If the queue is full then
889    /// the item will be counted as lost.
890    pub fn enqueue_with_storing_priority_or_lose_(queue: Grc<Self>, pm: SM::Priority, item: T) -> impl Event<Item = ()> {
891        Queue::enqueue_with_storing_priority_or_lose(queue, pm, item)
892            .map(|_| {})
893    }
894
895    /// Notifies when the enqueue operation is initiated.
896    #[inline]
897    pub fn enqueue_initiated(&self) -> impl Observable<Message = T> + Clone {
898        self.enqueue_initiated_source.publish()
899    }
900
901    /// Notifies when the item to be enqueued is stored.
902    #[inline]
903    pub fn enqueue_stored(&self) -> impl Observable<Message = T> + Clone {
904        self.enqueue_stored_source.publish()
905    }
906
907    /// Notifies when the item that would have to be enqueued is lost.
908    #[inline]
909    pub fn enqueue_lost(&self) -> impl Observable<Message = T> + Clone {
910        self.enqueue_lost_source.publish()
911    }
912
913    /// Notifies when the dequeue operation is requested for.
914    #[inline]
915    pub fn dequeue_requested(&self) -> impl Observable<Message = ()> + Clone {
916        self.dequeue_requested_source.publish()
917    }
918
919    /// Notifies when the item is dequeued.
920    #[inline]
921    pub fn dequeue_extracted(&self) -> impl Observable<Message = T> + Clone {
922        self.dequeue_extracted_source.publish()
923    }
924
925    /// Notifies whenever any property changes.
926    #[inline]
927    pub fn changed_(&self) -> impl Observable<Message = ()> + Clone {
928        self.enqueue_initiated().map(|_| {})
929            .merge(self.enqueue_stored().map(|_| {}))
930            .merge(self.enqueue_lost().map(|_| {}))
931            .merge(self.dequeue_requested())
932            .merge(self.dequeue_extracted().map(|_| {}))
933    }
934
935    /// Accept the dequeue request and return the current simulation time.
936    fn dequeue_request(&self, p: &Point) -> simulation::Result<f64> {
937        let c  = self.dequeue_count.read_at(p);
938        let c2 = c + 1;
939        self.dequeue_count.write_at(c2, p);
940        self.dequeue_requested_source.trigger_at(&(), p)?;
941        Result::Ok(p.time)
942    }
943
944    /// Extract an item by the dequeue request.
945    fn dequeue_extract(&self, t_r: f64, p: &Point) -> simulation::Result<T> {
946        let i = self.queue_store.pop(p).unwrap();
947        self.dequeue_post_extract(t_r, i, p)
948    }
949
950    /// A post action after extracting the item by the dequeue request.
951    fn dequeue_post_extract(&self, t_r: f64, i: QueueItem<T>, p: &Point) -> simulation::Result<T> {
952        let t  = p.time;
953        let c  = self.count.read_at(p);
954        let c2 = c - 1;
955        let stats  = self.count_stats.read_at(p);
956        let stats2 = stats.add(t, c2);
957        let ec  = self.dequeue_extract_count.read_at(p);
958        let ec2 = ec + 1;
959        self.count.write_at(c2, p);
960        self.count_stats.write_at(stats2, p);
961        self.dequeue_extract_count.write_at(ec2, p);
962        self.dequeue_stat(t_r, &i, p);
963        release_resource_within_event(self.enqueue_resource.clone())
964            .call_event(p)?;
965        self.dequeue_extracted_source
966            .trigger_at(&i.value, p)?;
967        Result::Ok(i.value)
968    }
969
970    /// Update the statistics for the output wait time of the dequeue operation
971    /// and the wait time of storing in the queue.
972    fn dequeue_stat(&self, t_r: f64, i: &QueueItem<T>, p: &Point) {
973        let t0 = i.input_time;
974        let t1 = i.storing_time;
975        let t  = p.time;
976        let stats  = self.dequeue_wait_time.read_at(p);
977        let stats2 = stats.add(t - t_r);
978        self.dequeue_wait_time.write_at(stats2, p);
979        let stats  = self.total_wait_time.read_at(p);
980        let stats2 = stats.add(t - t0);
981        self.total_wait_time.write_at(stats2, p);
982        let stats  = self.wait_time.read_at(p);
983        let stats2 = stats.add(t - t1);
984        self.wait_time.write_at(stats2, p);
985    }
986
987    /// Initiate the process of enqueueing the item.
988    fn enqueue_initiate(&self, item: T, p: &Point) -> simulation::Result<QueueItem<T>> {
989        let t = p.time;
990        let c = self.enqueue_count.read_at(p);
991        self.enqueue_count.write_at(c + 1, p);
992        self.enqueue_initiated_source
993            .trigger_at(&item, p)?;
994        Result::Ok(QueueItem {
995            value: item,
996            input_time: t,
997            storing_time: t
998        })
999    }
1000
1001    /// Store the item.
1002    fn enqueue_store(&self, item: QueueItem<T>, p: &Point) -> simulation::Result<()> {
1003        let t  = p.time;
1004        let i2 = QueueItem {
1005            value: item.value,
1006            input_time: item.input_time,
1007            storing_time: t
1008        };
1009        self.queue_store.push(i2.clone(), p);
1010        let c  = self.count.read_at(p);
1011        let c2 = c + 1;
1012        self.count.write_at(c2, p);
1013        let stats  = self.count_stats.read_at(p);
1014        let stats2 = stats.add(t, c2);
1015        self.count_stats.write_at(stats2, p);
1016        let sc  = self.enqueue_store_count.read_at(p);
1017        let sc2 = sc + 1;
1018        self.enqueue_store_count.write_at(sc2, p);
1019        self.enqueue_stat(&i2, p);
1020        release_resource_within_event(self.dequeue_resource.clone())
1021            .call_event(p)?;
1022        self.enqueue_stored_source
1023            .trigger_at(&i2.value, p)
1024    }
1025
1026    /// Store the item with priority.
1027    fn enqueue_store_with_priority(&self, pm: SM::Priority, item: QueueItem<T>, p: &Point) -> simulation::Result<()> {
1028        let t  = p.time;
1029        let i2 = QueueItem {
1030            value: item.value,
1031            input_time: item.input_time,
1032            storing_time: t
1033        };
1034        self.queue_store.push_with_priority(pm, i2.clone(), p);
1035        let c  = self.count.read_at(p);
1036        let c2 = c + 1;
1037        self.count.write_at(c2, p);
1038        let stats  = self.count_stats.read_at(p);
1039        let stats2 = stats.add(t, c2);
1040        self.count_stats.write_at(stats2, p);
1041        let sc  = self.enqueue_store_count.read_at(p);
1042        let sc2 = sc + 1;
1043        self.enqueue_store_count.write_at(sc2, p);
1044        self.enqueue_stat(&i2, p);
1045        release_resource_within_event(self.dequeue_resource.clone())
1046            .call_event(p)?;
1047        self.enqueue_stored_source
1048            .trigger_at(&i2.value, p)
1049    }
1050
1051    /// Deny the enqueue operation.
1052    fn enqueue_deny(&self, item: T, p: &Point) -> simulation::Result<()> {
1053        let c  = self.enqueue_lost_count.read_at(p);
1054        let c2 = c + 1;
1055        self.enqueue_lost_count.write_at(c2, p);
1056        self.enqueue_lost_source
1057            .trigger_at(&item, p)
1058    }
1059
1060    /// Update the statistics for the input wait time of the enqueue operation.
1061    fn enqueue_stat(&self, i: &QueueItem<T>, p: &Point) {
1062        let t0 = i.input_time;
1063        let t1 = i.storing_time;
1064        let stats  = self.enqueue_wait_time.read_at(p);
1065        let stats2 = stats.add(t1 - t0);
1066        self.enqueue_wait_time.write_at(stats2, p);
1067    }
1068
1069    /// Reset the statistics.
1070    pub fn reset(queue: Grc<Self>) -> impl Event<Item = ()> + Clone {
1071        cons_event(move |p| {
1072            let t = p.time;
1073            let count = queue.count.read_at(p);
1074            queue.count_stats.write_at(TimingStats::from_sample(t, count), p);
1075            queue.enqueue_count.write_at(0, p);
1076            queue.enqueue_lost_count.write_at(0, p);
1077            queue.enqueue_store_count.write_at(0, p);
1078            queue.dequeue_count.write_at(0, p);
1079            queue.dequeue_extract_count.write_at(0, p);
1080            queue.wait_time.write_at(SamplingStats::empty(), p);
1081            queue.total_wait_time.write_at(SamplingStats::empty(), p);
1082            queue.enqueue_wait_time.write_at(SamplingStats::empty(), p);
1083            queue.dequeue_wait_time.write_at(SamplingStats::empty(), p);
1084            Result::Ok(())
1085        })
1086    }
1087
1088    /// Wait while the queue is full.
1089    pub fn wait_while_full(queue: Grc<Self>) -> impl Process<Item = ()> {
1090        Queue::is_full(queue.clone())
1091            .into_process()
1092            .and_then(move |x| {
1093                if x {
1094                    process_await(queue.dequeue_extracted())
1095                        .and_then(move |_| {
1096                            Queue::wait_while_full(queue)
1097                        })
1098                        .into_boxed()
1099                } else {
1100                    return_process(())
1101                        .into_boxed()
1102                }
1103            })
1104    }
1105}
1106
/// A deferred constructor: an `Event` computation that builds a new `Queue`
/// from the three strategies and the capacity kept here.
#[derive(Clone)]
pub struct NewQueue<SI, SM, SO, T> {
    /// Strategy applied to the enqueueing (input) side.
    enqueue_strategy: SI,

    /// Strategy applied to the internal storing of items.
    storing_strategy: SM,

    /// Strategy applied to the dequeueing (output) side.
    dequeue_strategy: SO,

    /// The queue capacity.
    max_count: isize,

    /// Ties the item type `T` to the constructor without storing a `T`.
    _phantom: PhantomData<T>,
}
1126
1127impl<SI, SM, SO, T> Event for NewQueue<SI, SM, SO, T>
1128    where SI: QueueStrategy + 'static,
1129          SM: QueueStrategy,
1130          SO: QueueStrategy + 'static,
1131          T: 'static
1132{
1133    type Item = Queue<SI, SM, SO, T>;
1134
1135    #[doc(hidden)]
1136    #[inline]
1137    fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
1138        let NewQueue { enqueue_strategy, storing_strategy, dequeue_strategy, max_count, _phantom } = self;
1139        if max_count < 0 {
1140            let msg = String::from("The queue capacity cannot be actually negative");
1141            let err = Error::retry(msg);
1142            Result::Err(err)
1143        } else {
1144            let t = p.time;
1145            let enqueue_resource = {
1146                Resource::<SI>::new_with_max_count(enqueue_strategy, max_count, Some(max_count))
1147                    .call_simulation(p.run)?
1148            };
1149            let queue_store = storing_strategy.new_storage();
1150            let dequeue_resource = {
1151                Resource::<SO>::new_with_max_count(dequeue_strategy, 0, Some(max_count))
1152                    .call_simulation(p.run)?
1153            };
1154            Result::Ok(Queue {
1155                max_count: max_count,
1156                enqueue_resource: Grc::new(enqueue_resource),
1157                queue_store: queue_store,
1158                dequeue_resource: Grc::new(dequeue_resource),
1159                count: RefComp::new(0),
1160                count_stats: RefComp::new(TimingStats::from_sample(t, 0)),
1161                enqueue_count: RefComp::new(0),
1162                enqueue_lost_count: RefComp::new(0),
1163                enqueue_store_count: RefComp::new(0),
1164                dequeue_count: RefComp::new(0),
1165                dequeue_extract_count: RefComp::new(0),
1166                wait_time: RefComp::new(SamplingStats::empty()),
1167                total_wait_time: RefComp::new(SamplingStats::empty()),
1168                enqueue_wait_time: RefComp::new(SamplingStats::empty()),
1169                dequeue_wait_time: RefComp::new(SamplingStats::empty()),
1170                enqueue_initiated_source: ObservableSource::new(),
1171                enqueue_lost_source: ObservableSource::new(),
1172                enqueue_stored_source: ObservableSource::new(),
1173                dequeue_requested_source: ObservableSource::new(),
1174                dequeue_extracted_source: ObservableSource::new()
1175            })
1176        }
1177    }
1178}