dvcompute/simulation/queue/unbounded/stats/
mod.rs

1// Copyright (c) 2020-2022  David Sorokin <davsor@mail.ru>, based in Yoshkar-Ola, Russia
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at https://mozilla.org/MPL/2.0/.
6
7use std::marker::PhantomData;
8
9use crate::simulation;
10use crate::simulation::Point;
11use crate::simulation::ref_comp::RefComp;
12use crate::simulation::observable::*;
13use crate::simulation::observable::source::*;
14use crate::simulation::simulation::*;
15use crate::simulation::event::*;
16use crate::simulation::process::*;
17use crate::simulation::strategy::*;
18use crate::simulation::resource::*;
19
20use dvcompute_utils::simulation::stats::*;
21use dvcompute_utils::grc::Grc;
22
/// A type synonym for the ordinary FIFO queue, also known as the FCFS
/// (First Come - First Served) queue.
///
/// Both the storing strategy and the dequeueing strategy are `FCFSStrategy`.
pub type FCFSQueue<T> = Queue<FCFSStrategy, FCFSStrategy, T>;
26
/// A type synonym for the ordinary LIFO queue, also known as the LCFS
/// (Last Come - First Served) queue.
///
/// Items are stored with `LCFSStrategy`, while the dequeueing (output)
/// strategy is `FCFSStrategy`.
pub type LCFSQueue<T> = Queue<LCFSStrategy, FCFSStrategy, T>;
30
/// Represents an unbounded queue by using the specified strategies for internal storing (in memory), `SM`,
/// and dequeueing (output), `SO`, where `T` denotes the type of items stored in the queue.
///
/// Besides the items themselves, the queue keeps counters and statistics
/// (size, wait times) that are updated on every enqueue and dequeue operation
/// and can be restarted with `Queue::reset`.
pub struct Queue<SM, SO, T>
    where SM: QueueStrategy,
          SO: QueueStrategy + 'static,
          T: 'static
{
    /// The queue store: holds the items together with their storing times.
    queue_store: QueueStorageBox<QueueItem<T>, SM::Priority>,

    /// The dequeue resource. It is released once for every stored item and
    /// requested by every dequeue operation before an item is extracted.
    dequeue_resource: Grc<Resource<SO>>,

    /// The queue size: incremented when an item is stored and
    /// decremented when an item is extracted.
    count: RefComp<isize>,

    /// The size statistics, updated with the time and the new size
    /// whenever the size changes.
    count_stats: RefComp<TimingStats<isize>>,

    /// The count of stored items.
    enqueue_store_count: RefComp<isize>,

    /// The dequeue count, i.e. the number of accepted dequeue requests.
    dequeue_count: RefComp<isize>,

    /// The count of extracted items.
    dequeue_extract_count: RefComp<isize>,

    /// The wait time: from storing an item to extracting it.
    wait_time: RefComp<SamplingStats<f64>>,

    /// The dequeue wait time: from the dequeue request to the extraction.
    dequeue_wait_time: RefComp<SamplingStats<f64>>,

    /// The observable source when the item is stored.
    enqueue_stored_source: ObservableSource<T>,

    /// The observable source when the item is requested for.
    dequeue_requested_source: ObservableSource<()>,

    /// The observable source when the item is extracted.
    dequeue_extracted_source: ObservableSource<T>
}
74
/// Stores the item and the time of its enqueueing.
///
/// The storing time is later used to compute how long the item
/// stayed in the queue before it was extracted.
#[derive(Clone)]
struct QueueItem<T> {

    /// The item value.
    value: T,

    /// The simulation time at which the item was stored.
    storing_time: f64
}
85
86/// Create a new unbounded FCFS (a.k.a FIFO) queue by the specified capacity.
87#[inline]
88pub fn new_fcfs_queue<T>() -> NewQueue<FCFSStrategy, FCFSStrategy, T>
89    where T: 'static
90{
91    NewQueue {
92        storing_strategy: FCFSStrategy::Instance,
93        dequeue_strategy: FCFSStrategy::Instance,
94        _phantom: PhantomData
95    }
96}
97
98/// Create a new unbounded LCFS (a.k.a LIFO) queue by the specified capacity.
99#[inline]
100pub fn new_lcfs_queue<T>() -> NewQueue<LCFSStrategy, FCFSStrategy, T>
101    where T: 'static
102{
103    NewQueue {
104        storing_strategy: LCFSStrategy::Instance,
105        dequeue_strategy: FCFSStrategy::Instance,
106        _phantom: PhantomData
107    }
108}
109
110impl<SM, SO, T> Queue<SM, SO, T>
111    where SM: QueueStrategy + 'static,
112          SO: QueueStrategy + 'static,
113          T: Clone + 'static
114{
115    /// Create a new unbounded queue by the specified strategies.
116    #[inline]
117    pub fn new(storing_strategy: SM, dequeue_strategy: SO) -> NewQueue<SM, SO, T> {
118        NewQueue {
119            storing_strategy: storing_strategy,
120            dequeue_strategy: dequeue_strategy,
121            _phantom: PhantomData
122        }
123    }
124
125    /// Test whether the queue is empty.
126    #[inline]
127    pub fn is_empty(queue: Grc<Self>) -> impl Event<Item = bool> + Clone {
128        cons_event(move |p| {
129            Result::Ok(queue.count.read_at(p) == 0)
130        })
131    }
132
133    /// Notifies when the `is_empty` property changes.
134    #[inline]
135    pub fn is_empty_changed(queue: Grc<Self>) -> impl Observable<Message = bool> + Clone {
136        queue.is_empty_changed_()
137            .mapc(move |()| {
138                Queue::is_empty(queue.clone())
139            })
140    }
141
    /// Notifies when the `is_empty` property may have changed.
    ///
    /// This is the same signal as `count_changed_`: it fires on every change
    /// of the queue size, not only when the queue becomes empty or non-empty.
    #[inline]
    pub fn is_empty_changed_(&self) -> impl Observable<Message = ()> + Clone {
        self.count_changed_()
    }
147
148    /// Return the current queue size.
149    #[inline]
150    pub fn count(queue: Grc<Self>) -> impl Event<Item = isize> + Clone {
151        cons_event(move |p| {
152            Result::Ok(queue.count.read_at(p))
153        })
154    }
155
156    /// Return the statistics for the queue size.
157    #[inline]
158    pub fn count_stats(queue: Grc<Self>) -> impl Event<Item = TimingStats<isize>> + Clone {
159        cons_event(move |p| {
160            Result::Ok(queue.count_stats.read_at(p))
161        })
162    }
163
164    /// Notifies when the `count` property changes.
165    #[inline]
166    pub fn count_changed(queue: Grc<Self>) -> impl Observable<Message = isize> + Clone {
167        queue.count_changed_()
168            .mapc(move |()| {
169                Queue::count(queue.clone())
170            })
171    }
172
173    /// Notifies when the `count` property changes.
174    #[inline]
175    pub fn count_changed_(&self) -> impl Observable<Message = ()> + Clone {
176        self.enqueue_stored().map(|_| {})
177            .merge(self.dequeue_extracted().map(|_| {}))
178    }
179
180    /// Return the total number of input items that were stored.
181    #[inline]
182    pub fn enqueue_store_count(queue: Grc<Self>) -> impl Event<Item = isize> + Clone {
183        cons_event(move |p| {
184            Result::Ok(queue.enqueue_store_count.read_at(p))
185        })
186    }
187
188    /// Notifies when the `enqueue_store_count` property changes.
189    #[inline]
190    pub fn enqueue_store_count_changed(queue: Grc<Self>) -> impl Observable<Message = isize> + Clone {
191        queue.enqueue_store_count_changed_()
192            .mapc(move |()| {
193                Queue::enqueue_store_count(queue.clone())
194            })
195    }
196
197    /// Notifies when the `enqueue_store_count` property changes.
198    #[inline]
199    pub fn enqueue_store_count_changed_(&self) -> impl Observable<Message = ()> + Clone {
200        self.enqueue_stored().map(|_| {})
201    }
202
203    /// Return the total number of requests to dequeue the items.
204    #[inline]
205    pub fn dequeue_count(queue: Grc<Self>) -> impl Event<Item = isize> + Clone {
206        cons_event(move |p| {
207            Result::Ok(queue.dequeue_count.read_at(p))
208        })
209    }
210
211    /// Notifies when the `dequeue_count` property changes.
212    #[inline]
213    pub fn dequeue_count_changed(queue: Grc<Self>) -> impl Observable<Message = isize> + Clone {
214        queue.dequeue_count_changed_()
215            .mapc(move |()| {
216                Queue::dequeue_count(queue.clone())
217            })
218    }
219
    /// Notifies when the `dequeue_count` property changes.
    ///
    /// This is the same signal as `dequeue_requested`.
    #[inline]
    pub fn dequeue_count_changed_(&self) -> impl Observable<Message = ()> + Clone {
        self.dequeue_requested()
    }
225
226    /// Return the total number of items that were extracted from the queue with help of dequeue operations.
227    #[inline]
228    pub fn dequeue_extract_count(queue: Grc<Self>) -> impl Event<Item = isize> + Clone {
229        cons_event(move |p| {
230            Result::Ok(queue.dequeue_extract_count.read_at(p))
231        })
232    }
233
234    /// Notifies when the `dequeue_extract_count` property changes.
235    #[inline]
236    pub fn dequeue_extract_count_changed(queue: Grc<Self>) -> impl Observable<Message = isize> + Clone {
237        queue.dequeue_extract_count_changed_()
238            .mapc(move |()| {
239                Queue::dequeue_extract_count(queue.clone())
240            })
241    }
242
243    /// Notifies when the `dequeue_extract_count` property changes.
244    #[inline]
245    pub fn dequeue_extract_count_changed_(&self) -> impl Observable<Message = ()> + Clone {
246        self.dequeue_extracted().map(|_| {})
247    }
248
249    /// Return the rate of input items that were stored: how many items per time.
250    #[inline]
251    pub fn store_rate(queue: Grc<Self>) -> impl Event<Item = f64> + Clone {
252        cons_event(move |p| {
253            Result::Ok({
254                let x  = queue.enqueue_store_count.read_at(p);
255                let t0 = p.run.specs.start_time;
256                let t  = p.time;
257                (x as f64) / (t - t0)
258            })
259        })
260    }
261
262    /// Return the rate of requests for dequeueing the items: how many items per time.
263    /// It does not include the failed attempts to dequeue immediately without suspension.
264    #[inline]
265    pub fn dequeue_rate(queue: Grc<Self>) -> impl Event<Item = f64> + Clone {
266        cons_event(move |p| {
267            Result::Ok({
268                let x  = queue.dequeue_count.read_at(p);
269                let t0 = p.run.specs.start_time;
270                let t  = p.time;
271                (x as f64) / (t - t0)
272            })
273        })
274    }
275
276    /// Return the rate of output items that were actually extracted from the queue: how many items per time.
277    #[inline]
278    pub fn dequeue_extract_rate(queue: Grc<Self>) -> impl Event<Item = f64> + Clone {
279        cons_event(move |p| {
280            Result::Ok({
281                let x  = queue.dequeue_extract_count.read_at(p);
282                let t0 = p.run.specs.start_time;
283                let t  = p.time;
284                (x as f64) / (t - t0)
285            })
286        })
287    }
288
289    /// Return the wait time from the time at which the item was stored in the queue to
290    /// the time at which it was dequeued.
291    #[inline]
292    pub fn wait_time(queue: Grc<Self>) -> impl Event<Item = SamplingStats<f64>> + Clone {
293        cons_event(move |p| {
294            Result::Ok(queue.wait_time.read_at(p))
295        })
296    }
297
298    /// Notifies when the `wait_time` property changes.
299    #[inline]
300    pub fn wait_time_changed(queue: Grc<Self>) -> impl Observable<Message = SamplingStats<f64>> + Clone {
301        queue.wait_time_changed_()
302            .mapc(move |()| {
303                Queue::wait_time(queue.clone())
304            })
305    }
306
307    /// Notifies when the `wait_time` property changes.
308    #[inline]
309    pub fn wait_time_changed_(&self) -> impl Observable<Message = ()> + Clone {
310        self.dequeue_extracted().map(|_| {})
311    }
312
313    /// Return the dequeue wait time from the time at which the dequeue request was made
314    /// to the time at which the corresponding item was actually dequeued.
315    #[inline]
316    pub fn dequeue_wait_time(queue: Grc<Self>) -> impl Event<Item = SamplingStats<f64>> + Clone {
317        cons_event(move |p| {
318            Result::Ok(queue.dequeue_wait_time.read_at(p))
319        })
320    }
321
322    /// Notifies when the `dequeue_wait_time` property changes.
323    #[inline]
324    pub fn dequeue_wait_time_changed(queue: Grc<Self>) -> impl Observable<Message = SamplingStats<f64>> + Clone {
325        queue.dequeue_wait_time_changed_()
326            .mapc(move |()| {
327                Queue::dequeue_wait_time(queue.clone())
328            })
329    }
330
331    /// Notifies when the `dequeue_wait_time` property changes.
332    #[inline]
333    pub fn dequeue_wait_time_changed_(&self) -> impl Observable<Message = ()> + Clone {
334        self.dequeue_extracted().map(|_| {})
335    }
336
337    /// Return a long-term average queue rate calculated as
338    /// the average queue size divided by the average wait time.
339    #[inline]
340    pub fn rate(queue: Grc<Self>) -> impl Event<Item = f64> + Clone {
341        cons_event(move |p| {
342            Result::Ok({
343                let x = queue.count_stats.read_at(p);
344                let y = queue.wait_time.read_at(p);
345                x.mean() / y.mean
346            })
347        })
348    }
349
350    /// Notifies when the `rate` property changes.
351    #[inline]
352    pub fn rate_changed(queue: Grc<Self>) -> impl Observable<Message = f64> + Clone {
353        queue.rate_changed_()
354            .mapc(move |()| {
355                Queue::rate(queue.clone())
356            })
357    }
358
359    /// Notifies when the `rate` property changes.
360    #[inline]
361    pub fn rate_changed_(&self) -> impl Observable<Message = ()> + Clone {
362        self.enqueue_stored().map(|_| {})
363            .merge(self.dequeue_extracted().map(|_| {}))
364    }
365
366    /// Dequeue by suspending the process if the queue is empty.
367    pub fn dequeue(queue: Grc<Self>) -> impl Process<Item = T> {
368        cons_event({
369            let queue = queue.clone();
370            move |p| {
371                queue.dequeue_request(p)
372            }
373        })
374        .into_process()
375        .and_then(move |t| {
376            request_resource(queue.dequeue_resource.clone())
377                .and_then(move |()| {
378                    cons_event(move |p| {
379                        queue.dequeue_extract(t, p)
380                    })
381                    .into_process()
382                })
383        })
384    }
385
386    /// Dequeue with output prioerity by suspending the process if the queue is empty.
387    pub fn dequeue_with_output_priority(queue: Grc<Self>, po: SO::Priority) -> impl Process<Item = T>
388        where SO::Priority: Clone
389    {
390        cons_event({
391            let queue = queue.clone();
392            move |p| {
393                queue.dequeue_request(p)
394            }
395        })
396        .into_process()
397        .and_then(move |t| {
398            request_resource_with_priority(queue.dequeue_resource.clone(), po)
399                .and_then(move |()| {
400                    cons_event(move |p| {
401                        queue.dequeue_extract(t, p)
402                    })
403                    .into_process()
404                })
405        })
406    }
407
408    /// Try to dequeue immediately.
409    pub fn try_dequeue(queue: Grc<Self>) -> impl Event<Item = Option<T>> {
410        try_request_resource_within_event(queue.dequeue_resource.clone())
411            .and_then(move |f| {
412                if f {
413                    cons_event(move |p| {
414                        let t = queue.dequeue_request(p)?;
415                        let x = queue.dequeue_extract(t, p)?;
416                        Result::Ok(Some(x))
417                    }).into_boxed()
418                } else {
419                    return_event(None)
420                        .into_boxed()
421                }
422            })
423    }
424
425    /// Remove the item from the queue and return a flag indicating
426    /// whether the item was found and actually removed.
427    pub fn delete(queue: Grc<Self>, item: T) -> impl Event<Item = bool>
428        where T: PartialEq
429    {
430        let pred = move |x: &T| { *x == item };
431        Queue::delete_by(queue, pred)
432            .map(|x| { x.is_some() })
433    }
434
435    /// Remove the specified item from the queue.
436    pub fn delete_(queue: Grc<Self>, item: T) -> impl Event<Item = ()>
437        where T: PartialEq
438    {
439        let pred = move |x: &T| { *x == item };
440        Queue::delete_by(queue, pred)
441            .map(|_| ())
442    }
443
444    /// Remove an item satisfying the specified predicate and return the item if found.
445    pub fn delete_by<F>(queue: Grc<Self>, pred: F) -> impl Event<Item = Option<T>>
446        where F: Fn(&T) -> bool + 'static
447    {
448        try_request_resource_within_event(queue.dequeue_resource.clone())
449            .and_then(move |f| {
450                if f {
451                    cons_event(move |p| {
452                        let pred = move |x: &QueueItem<T>| { pred(&x.value) };
453                        let pred = Box::new(pred);
454                        match queue.queue_store.remove_boxed_by(pred, p) {
455                            None => {
456                                release_resource_within_event(queue.dequeue_resource.clone())
457                                    .call_event(p)?;
458                                Result::Ok(None)
459                            },
460                            Some(i) => {
461                                let t = queue.dequeue_request(p)?;
462                                let x = queue.dequeue_post_extract(t, i, p)?;
463                                Result::Ok(Some(x))
464                            }
465                        }
466                    }).into_boxed()
467                } else {
468                    return_event(None)
469                        .into_boxed()
470                }
471            })
472    }
473
474    /// Test whether there is an item satisfying the specified predicate.
475    pub fn exists<F>(queue: Grc<Self>, pred: F) -> impl Event<Item = bool>
476        where F: Fn(&T) -> bool + 'static
477    {
478        cons_event(move |p| {
479            let pred = move |x: &QueueItem<T>| { pred(&x.value) };
480            let pred = Box::new(pred);
481            Result::Ok(queue.queue_store.exists_boxed(pred, p))
482        })
483    }
484
485    /// Find an item satisfying the specified predicate.
486    pub fn find<F>(queue: Grc<Self>, pred: F) -> impl Event<Item = Option<T>>
487        where F: Fn(&T) -> bool + 'static,
488              T: Clone
489    {
490        cons_event(move |p| {
491            let pred = move |x: &QueueItem<T>| { pred(&x.value) };
492            let pred = Box::new(pred);
493            Result::Ok(queue.queue_store.find_boxed(pred, p).map(|x| { x.value.clone() }))
494        })
495    }
496
497    /// Clear the queue.
498    pub fn clear(queue: Grc<Self>) -> impl Event<Item = ()> {
499        cons_event(move |p| {
500            loop {
501                let x = Queue::try_dequeue(queue.clone()).call_event(p)?;
502                match x {
503                    None => return Result::Ok(()),
504                    Some(_) => {}
505                }
506            }
507        })
508    }
509
510    /// Enqueue the item.
511    #[inline]
512    pub fn enqueue(queue: Grc<Self>, item: T) -> impl Event<Item = ()> {
513        cons_event(move |p| {
514            queue.enqueue_store(item, p)
515        })
516    }
517
518    /// Enqueue the item with storing priority.
519    #[inline]
520    pub fn enqueue_with_storing_priority(queue: Grc<Self>, pm: SM::Priority, item: T) -> impl Event<Item = ()>
521        where SM::Priority: Clone
522    {
523        cons_event(move |p| {
524            queue.enqueue_store_with_priority(pm, item, p)
525        })
526    }
527
    /// Notifies when the item to be enqueued is stored.
    ///
    /// The message is the stored item; the signal is triggered at the end
    /// of the storing operation, after the counters were updated.
    #[inline]
    pub fn enqueue_stored(&self) -> impl Observable<Message = T> + Clone {
        self.enqueue_stored_source.publish()
    }
533
    /// Notifies when the dequeue operation is requested for.
    ///
    /// The signal is triggered right after the dequeue count is incremented.
    #[inline]
    pub fn dequeue_requested(&self) -> impl Observable<Message = ()> + Clone {
        self.dequeue_requested_source.publish()
    }
539
    /// Notifies when the item is dequeued.
    ///
    /// The message is the extracted item; the signal is triggered after
    /// the counters and the wait time statistics were updated.
    #[inline]
    pub fn dequeue_extracted(&self) -> impl Observable<Message = T> + Clone {
        self.dequeue_extracted_source.publish()
    }
545
546    /// Notifies whenever any property changes.
547    #[inline]
548    pub fn changed_(&self) -> impl Observable<Message = ()> + Clone {
549        self.enqueue_stored().map(|_| {})
550            .merge(self.dequeue_requested())
551            .merge(self.dequeue_extracted().map(|_| {}))
552    }
553
554    /// Accept the dequeue request and return the current simulation time.
555    fn dequeue_request(&self, p: &Point) -> simulation::Result<f64> {
556        let c  = self.dequeue_count.read_at(p);
557        let c2 = c + 1;
558        self.dequeue_count.write_at(c2, p);
559        self.dequeue_requested_source.trigger_at(&(), p)?;
560        Result::Ok(p.time)
561    }
562
563    /// Extract an item by the dequeue request.
564    fn dequeue_extract(&self, t_r: f64, p: &Point) -> simulation::Result<T> {
565        let i = self.queue_store.pop(p).unwrap();
566        self.dequeue_post_extract(t_r, i, p)
567    }
568
569    /// A post action after extracting the item by the dequeue request.
570    fn dequeue_post_extract(&self, t_r: f64, i: QueueItem<T>, p: &Point) -> simulation::Result<T> {
571        let t  = p.time;
572        let c  = self.count.read_at(p);
573        let c2 = c - 1;
574        let stats  = self.count_stats.read_at(p);
575        let stats2 = stats.add(t, c2);
576        let ec  = self.dequeue_extract_count.read_at(p);
577        let ec2 = ec + 1;
578        self.count.write_at(c2, p);
579        self.count_stats.write_at(stats2, p);
580        self.dequeue_extract_count.write_at(ec2, p);
581        self.dequeue_stat(t_r, &i, p);
582        self.dequeue_extracted_source
583            .trigger_at(&i.value, p)?;
584        Result::Ok(i.value)
585    }
586
587    /// Update the statistics for the output wait time of the dequeue operation
588    /// and the wait time in the queue.
589    fn dequeue_stat(&self, t_r: f64, i: &QueueItem<T>, p: &Point) {
590        let t1 = i.storing_time;
591        let t  = p.time;
592        let stats  = self.dequeue_wait_time.read_at(p);
593        let stats2 = stats.add(t - t_r);
594        self.dequeue_wait_time.write_at(stats2, p);
595        let stats  = self.wait_time.read_at(p);
596        let stats2 = stats.add(t - t1);
597        self.wait_time.write_at(stats2, p);
598    }
599
600    /// Store the item.
601    fn enqueue_store(&self, item: T, p: &Point) -> simulation::Result<()> {
602        let t  = p.time;
603        let i2 = QueueItem {
604            value: item,
605            storing_time: t
606        };
607        self.queue_store.push(i2.clone(), p);
608        let c  = self.count.read_at(p);
609        let c2 = c + 1;
610        self.count.write_at(c2, p);
611        let stats  = self.count_stats.read_at(p);
612        let stats2 = stats.add(t, c2);
613        self.count_stats.write_at(stats2, p);
614        let sc  = self.enqueue_store_count.read_at(p);
615        let sc2 = sc + 1;
616        self.enqueue_store_count.write_at(sc2, p);
617        release_resource_within_event(self.dequeue_resource.clone())
618            .call_event(p)?;
619        self.enqueue_stored_source
620            .trigger_at(&i2.value, p)
621    }
622
623    /// Store the item with priority.
624    fn enqueue_store_with_priority(&self, pm: SM::Priority, item: T, p: &Point) -> simulation::Result<()> {
625        let t  = p.time;
626        let i2 = QueueItem {
627            value: item,
628            storing_time: t
629        };
630        self.queue_store.push_with_priority(pm, i2.clone(), p);
631        let c  = self.count.read_at(p);
632        let c2 = c + 1;
633        self.count.write_at(c2, p);
634        let stats  = self.count_stats.read_at(p);
635        let stats2 = stats.add(t, c2);
636        self.count_stats.write_at(stats2, p);
637        let sc  = self.enqueue_store_count.read_at(p);
638        let sc2 = sc + 1;
639        self.enqueue_store_count.write_at(sc2, p);
640        release_resource_within_event(self.dequeue_resource.clone())
641            .call_event(p)?;
642        self.enqueue_stored_source
643            .trigger_at(&i2.value, p)
644    }
645
646    /// Reset the statistics.
647    pub fn reset(queue: Grc<Self>) -> impl Event<Item = ()> + Clone {
648        cons_event(move |p| {
649            let t = p.time;
650            let count = queue.count.read_at(p);
651            queue.count_stats.write_at(TimingStats::from_sample(t, count), p);
652            queue.enqueue_store_count.write_at(0, p);
653            queue.dequeue_count.write_at(0, p);
654            queue.dequeue_extract_count.write_at(0, p);
655            queue.wait_time.write_at(SamplingStats::empty(), p);
656            queue.dequeue_wait_time.write_at(SamplingStats::empty(), p);
657            Result::Ok(())
658        })
659    }
660}
661
/// Computation that creates a new `Queue`.
///
/// It only remembers the strategies; the queue itself is built when
/// the computation is run as an `Event`.
#[derive(Clone)]
pub struct NewQueue<SM, SO, T> {

    /// The storing strategy.
    storing_strategy: SM,

    /// The output (dequeueing) strategy.
    dequeue_strategy: SO,

    /// To keep the type parameter `T` of the items.
    _phantom: PhantomData<T>
}
675
676impl<SM, SO, T> Event for NewQueue<SM, SO, T>
677    where SM: QueueStrategy,
678          SO: QueueStrategy + 'static,
679          T: 'static
680{
681    type Item = Queue<SM, SO, T>;
682
683    #[doc(hidden)]
684    #[inline]
685    fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
686        let NewQueue { storing_strategy, dequeue_strategy, _phantom } = self;
687        let t = p.time;
688        let queue_store = storing_strategy.new_storage();
689        let dequeue_resource = {
690            Resource::<SO>::new_with_max_count(dequeue_strategy, 0, None)
691                .call_simulation(p.run)?
692        };
693        Result::Ok(Queue {
694            queue_store: queue_store,
695            dequeue_resource: Grc::new(dequeue_resource),
696            count: RefComp::new(0),
697            count_stats: RefComp::new(TimingStats::from_sample(t, 0)),
698            enqueue_store_count: RefComp::new(0),
699            dequeue_count: RefComp::new(0),
700            dequeue_extract_count: RefComp::new(0),
701            wait_time: RefComp::new(SamplingStats::empty()),
702            dequeue_wait_time: RefComp::new(SamplingStats::empty()),
703            enqueue_stored_source: ObservableSource::new(),
704            dequeue_requested_source: ObservableSource::new(),
705            dequeue_extracted_source: ObservableSource::new()
706        })
707    }
708}