dvcompute_gpss_branch/simulation/
queue.rs

1// Copyright (c) 2020-2022  David Sorokin <davsor@mail.ru>, based in Yoshkar-Ola, Russia
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at https://mozilla.org/MPL/2.0/.
6
7use std::hash::{Hash, Hasher};
8
9use dvcompute_dist::simulation;
10use dvcompute_dist::simulation::ref_comp::RefComp;
11use dvcompute_dist::simulation::Point;
12use dvcompute_dist::simulation::event::*;
13use dvcompute_dist::simulation::observable::*;
14use dvcompute_dist::simulation::observable::source::*;
15
16use dvcompute_utils::simulation::stats::*;
17use dvcompute_utils::grc::Grc;
18
19use crate::simulation::transact::*;
20
/// Defines a queue entity.
///
/// The queue keeps its current content together with the statistics
/// accumulated by the `Enqueue` and `Dequeue` computations, and it notifies
/// subscribers whenever one of those computations completes.
pub struct Queue {

    /// The sequence number.
    ///
    /// It is requested from the generator of the current run when the queue
    /// is created, and it is the value fed into the hasher by `Hash`.
    pub sequence_no: u64,

    /// The content: raised by the increment of every enqueue and lowered by
    /// the decrement of every dequeue.
    content: RefComp<isize>,

    /// The content statistics: a new sample of the content is added together
    /// with the current time on every enqueue and dequeue.
    content_stats: RefComp<TimingStats<isize>>,

    /// The enqueue count: grows by one per enqueue, whatever the increment is.
    enqueue_count: RefComp<isize>,

    /// The enqueue zero entry count: grows by one on a dequeue that happens
    /// at exactly the time when the entry was enqueued.
    enqueue_zero_entry_count: RefComp<isize>,

    /// The wait time: a sample of the time between enqueue and dequeue is
    /// added on every dequeue.
    wait_time: RefComp<SamplingStats<f64>>,

    /// The non-zero entry wait time: the same sample as for `wait_time`, but
    /// added only when the dequeue time differs from the enqueue time.
    non_zero_entry_wait_time: RefComp<SamplingStats<f64>>,

    /// Triggered when enqueued, after the entry has been registered in the transact.
    enqueued: ObservableSource<()>,

    /// Triggered when dequeued, after all the statistics have been updated.
    dequeued: ObservableSource<()>
}
51
/// The information about the queue entry.
///
/// An entry is created by the `Enqueue` computation and handed over to the
/// transact through `register_queue_entry`; the `Dequeue` computation receives
/// it back from `unregister_queue_entry` to measure the time spent in the queue.
#[derive(Clone)]
pub struct QueueEntry {

    /// The entry queue.
    pub queue: Grc<Queue>,

    /// The time of registering the queue entry, i.e. the time of the
    /// simulation point at which the enqueue computation was run.
    pub enqueue_time: f64
}
62
63impl PartialEq for Queue {
64
65    fn eq(&self, other: &Self) -> bool {
66        self.content == other.content
67    }
68}
69
70impl Eq for Queue {}
71
72impl Hash for Queue {
73
74    fn hash<H: Hasher>(&self, state: &mut H) {
75        self.sequence_no.hash(state)
76    }
77}
78
79impl Queue {
80
81    /// Create a new queue within `Event` computation.
82    #[inline]
83    pub fn new() -> NewQueue {
84        NewQueue {}
85    }
86
87    /// Test whether the queue is empty.
88    #[inline]
89    pub fn is_empty(queue: Grc<Queue>) -> impl Event<Item = bool> + Clone {
90        cons_event(move |p| {
91            let n = queue.content.read_at(p);
92            Result::Ok(n == 0)
93        })
94    }
95
96    /// Return the current queue content.
97    #[inline]
98    pub fn content(queue: Grc<Queue>) -> impl Event<Item = isize> + Clone {
99        cons_event(move |p| {
100            let n = queue.content.read_at(p);
101            Result::Ok(n)
102        })
103    }
104
105    /// Return the queue content statistics.
106    #[inline]
107    pub fn content_stats(queue: Grc<Queue>) -> impl Event<Item = TimingStats<isize>> + Clone {
108        cons_event(move |p| {
109            let stats = queue.content_stats.read_at(p);
110            Result::Ok(stats)
111        })
112    }
113
114    /// Triggered when the `content` property changes.
115    #[inline]
116    pub fn content_changed(queue: Grc<Queue>) -> impl Observable<Message = isize> + Clone {
117        Queue::content_changed_(&queue)
118            .mapc(move |()| { Queue::content(queue.clone()) })
119    }
120
121    /// Triggered when the `content` property changes.
122    #[inline]
123    pub fn content_changed_(&self) -> impl Observable<Message = ()> + Clone {
124        self.enqueued().merge(self.dequeued())
125    }
126
127    /// Return the total number of input items that were enqueued.
128    #[inline]
129    pub fn enqueue_count(queue: Grc<Queue>) -> impl Event<Item = isize> + Clone {
130        cons_event(move |p| {
131            let n = queue.enqueue_count.read_at(p);
132            Result::Ok(n)
133        })
134    }
135
136    /// Triggered when the `enqueue_count` property changes.
137    #[inline]
138    pub fn enqueue_count_changed(queue: Grc<Queue>) -> impl Observable<Message = isize> + Clone {
139        Queue::enqueue_count_changed_(&queue)
140            .mapc(move |()| { Queue::enqueue_count(queue.clone()) })
141    }
142
143    /// Triggered when the `enqueue_count` property changes.
144    #[inline]
145    pub fn enqueue_count_changed_(&self) -> impl Observable<Message = ()> + Clone {
146        self.enqueued()
147    }
148
149    /// Return the total number of zero entry items.
150    #[inline]
151    pub fn enqueue_zero_entry_count(queue: Grc<Queue>) -> impl Event<Item = isize> + Clone {
152        cons_event(move |p| {
153            let n = queue.enqueue_zero_entry_count.read_at(p);
154            Result::Ok(n)
155        })
156    }
157
158    /// Triggered when the `enqueue_zero_entry_count` property changes.
159    #[inline]
160    pub fn enqueue_zero_entry_count_changed(queue: Grc<Queue>) -> impl Observable<Message = isize> + Clone {
161        Queue::enqueue_zero_entry_count_changed_(&queue)
162            .mapc(move |()| { Queue::enqueue_zero_entry_count(queue.clone()) })
163    }
164
165    /// Triggered when the `enqueue_zero_entry_count` property changes.
166    #[inline]
167    pub fn enqueue_zero_entry_count_changed_(&self) -> impl Observable<Message = ()> + Clone {
168        self.dequeued()
169    }
170
171    /// Return the wait (or residence) time.
172    #[inline]
173    pub fn wait_time(queue: Grc<Queue>) -> impl Event<Item = SamplingStats<f64>> + Clone {
174        cons_event(move |p| {
175            let stats = queue.wait_time.read_at(p);
176            Result::Ok(stats)
177        })
178    }
179
180    /// Triggered when the `wait_time` property changes.
181    #[inline]
182    pub fn wait_time_changed(queue: Grc<Queue>) -> impl Observable<Message = SamplingStats<f64>> + Clone {
183        Queue::wait_time_changed_(&queue)
184            .mapc(move |()| { Queue::wait_time(queue.clone()) })
185    }
186
187    /// Triggered when the `wait_time` property changes.
188    #[inline]
189    pub fn wait_time_changed_(&self) -> impl Observable<Message = ()> + Clone {
190        self.dequeued()
191    }
192
193    /// Return the wait (or residence) time by excluding zero entries.
194    #[inline]
195    pub fn non_zero_entry_wait_time(queue: Grc<Queue>) -> impl Event<Item = SamplingStats<f64>> + Clone {
196        cons_event(move |p| {
197            let stats = queue.non_zero_entry_wait_time.read_at(p);
198            Result::Ok(stats)
199        })
200    }
201
202    /// Triggered when the `wait_time` property changes.
203    #[inline]
204    pub fn non_zero_entry_wait_time_changed(queue: Grc<Queue>) -> impl Observable<Message = SamplingStats<f64>> + Clone {
205        Queue::non_zero_entry_wait_time_changed_(&queue)
206            .mapc(move |()| { Queue::non_zero_entry_wait_time(queue.clone()) })
207    }
208
209    /// Triggered when the `non_zero_entry_wait_time` property changes.
210    #[inline]
211    pub fn non_zero_entry_wait_time_changed_(&self) -> impl Observable<Message = ()> + Clone {
212        self.dequeued()
213    }
214
215    /// Return a long-term average queue rate calculated as
216    /// the average queue content divided by the average wait time.
217    #[inline]
218    pub fn rate(queue: Grc<Queue>) -> impl Event<Item = f64> + Clone {
219        cons_event(move |p| {
220            let x = queue.content_stats.read_at(p);
221            let y = queue.wait_time.read_at(p);
222            Result::Ok(x.mean() / y.mean)
223        })
224    }
225
226    /// Triggered when the `rate` property changes.
227    #[inline]
228    pub fn rate_changed(queue: Grc<Queue>) -> impl Observable<Message = f64> + Clone {
229        Queue::rate_changed_(&queue)
230            .mapc(move |()| { Queue::rate(queue.clone()) })
231    }
232
233    /// Triggered when the `rate` property changes.
234    #[inline]
235    pub fn rate_changed_(&self) -> impl Observable<Message = ()> + Clone {
236        self.enqueued().merge(self.dequeued())
237    }
238
239    /// Notifies when enqueuing an item.
240    #[inline]
241    pub fn enqueued(&self) -> impl Observable<Message = ()> + Clone {
242        self.enqueued.publish()
243    }
244
245    /// Notifies when dequeuing the item.
246    #[inline]
247    pub fn dequeued(&self) -> impl Observable<Message = ()> + Clone {
248        self.dequeued.publish()
249    }
250
251    /// Enqueue the item.
252    #[inline]
253    pub fn enqueue(queue: Grc<Queue>, transact_id: Grc<TransactId>, increment: isize) -> Enqueue {
254        Enqueue { queue: queue, transact_id: transact_id, increment: increment }
255    }
256
257    /// Dequeue the item.
258    #[inline]
259    pub fn dequeue(queue: Grc<Queue>, transact_id: Grc<TransactId>, decrement: isize) -> Dequeue {
260        Dequeue { queue: queue, transact_id: transact_id, decrement: decrement }
261    }
262
263    /// Signal whenever any property of the queue changes.
264    #[inline]
265    pub fn changed_(&self) -> impl Observable<Message = ()> + Clone {
266        self.enqueued().merge(self.dequeued())
267    }
268
269    /// Reset the statistics.
270    pub fn reset(queue: Grc<Queue>) -> impl Event<Item = ()> + Clone {
271        cons_event(move |p| {
272            let content = queue.content.read_at(p);
273            queue.content_stats.write_at(TimingStats::from_sample(p.time, content), p);
274            queue.enqueue_count.write_at(0, p);
275            queue.enqueue_zero_entry_count.write_at(0, p);
276            queue.wait_time.write_at(SamplingStats::empty(), p);
277            queue.non_zero_entry_wait_time.write_at(SamplingStats::empty(), p);
278            Result::Ok(())
279        })
280    }
281}
282
/// Computation that creates a new queue.
///
/// It carries no data: it is returned by `Queue::new` and produces
/// the `Queue` itself when run as an `Event`.
#[derive(Clone)]
pub struct NewQueue {}
286
287impl Event for NewQueue {
288
289    type Item = Queue;
290
291    #[doc(hidden)]
292    #[inline]
293    fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
294        let t = p.time;
295        let gen = &p.run.generator;
296        let sequence_no = gen.random_sequence_no();
297        Result::Ok(Queue {
298            sequence_no: sequence_no,
299            content: RefComp::new(0),
300            content_stats: RefComp::new(TimingStats::from_sample(t, 0)),
301            enqueue_count: RefComp::new(0),
302            enqueue_zero_entry_count: RefComp::new(0),
303            wait_time: RefComp::new(SamplingStats::empty()),
304            non_zero_entry_wait_time: RefComp::new(SamplingStats::empty()),
305            enqueued: ObservableSource::new(),
306            dequeued: ObservableSource::new()
307        })
308    }
309}
310
/// Computation that enqueues the item.
///
/// It is returned by `Queue::enqueue` and does the actual work
/// when run as an `Event`.
#[derive(Clone)]
pub struct Enqueue {

    /// The queue.
    queue: Grc<Queue>,

    /// The identifier of the transact to be enqueued; the created queue entry
    /// is registered in it.
    transact_id: Grc<TransactId>,

    /// The content increment: the amount added to the queue content.
    increment: isize
}
324
325impl Event for Enqueue {
326
327    type Item = ();
328
329    #[doc(hidden)]
330    fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
331        let Enqueue { queue, transact_id, increment } = self;
332        let t = p.time;
333        let e = QueueEntry { queue: queue.clone(), enqueue_time: t };
334        let n = queue.enqueue_count.read_at(p);
335        let c = queue.content.read_at(p);
336        let stats = queue.content_stats.read_at(p);
337        queue.enqueue_count.write_at(n + 1, p);
338        queue.content.write_at(c + increment, p);
339        queue.content_stats.write_at(stats.add(t, c + increment), p);
340        match transact_id.register_queue_entry(e, p) {
341            Result::Err(e) => Result::Err(e),
342            Result::Ok(()) => queue.enqueued.trigger_at(&(), p)
343        }
344    }
345}
346
/// Computation that dequeues the item.
///
/// It is returned by `Queue::dequeue` and does the actual work
/// when run as an `Event`.
#[derive(Clone)]
pub struct Dequeue {

    /// The queue.
    queue: Grc<Queue>,

    /// The identifier of the transact to be dequeued; the queue entry
    /// is unregistered from it.
    transact_id: Grc<TransactId>,

    /// The content decrement: the amount subtracted from the queue content.
    decrement: isize
}
360
361impl Event for Dequeue {
362
363    type Item = ();
364
365    #[doc(hidden)]
366    fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
367        let Dequeue { queue, transact_id, decrement } = self;
368        match transact_id.unregister_queue_entry(&queue, p) {
369            Result::Err(e) => Result::Err(e),
370            Result::Ok(e) => {
371                let t  = p.time;
372                let t0 = e.enqueue_time;
373                let dt = t - t0;
374                let c  = queue.content.read_at(p);
375                let stats = queue.content_stats.read_at(p);
376                let wait_time = queue.wait_time.read_at(p);
377                queue.content.write_at(c - decrement, p);
378                queue.content_stats.write_at(stats.add(t, c - decrement), p);
379                queue.wait_time.write_at(wait_time.add(dt), p);
380                if t == t0 {
381                    let c2 = queue.enqueue_zero_entry_count.read_at(p);
382                    queue.enqueue_zero_entry_count.write_at(c2 + 1, p);
383                } else {
384                    let wait_time2 = queue.non_zero_entry_wait_time.read_at(p);
385                    queue.non_zero_entry_wait_time.write_at(wait_time2.add(dt), p);
386                }
387                queue.dequeued.trigger_at(&(), p)
388            }
389        }
390    }
391}