dvcompute_branch/simulation/queue/
mod.rs

1// Copyright (c) 2020-2022  David Sorokin <davsor@mail.ru>, based in Yoshkar-Ola, Russia
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at https://mozilla.org/MPL/2.0/.
6
7use std::rc::Rc;
8use std::marker::PhantomData;
9
10use crate::simulation;
11use crate::simulation::error::*;
12use crate::simulation::Point;
13use crate::simulation::ref_comp::RefComp;
14use crate::simulation::simulation::*;
15use crate::simulation::event::*;
16use crate::simulation::process::*;
17use crate::simulation::strategy::*;
18use crate::simulation::resource::*;
19
20use dvcompute_utils::grc::Grc;
21
22/// The queues that gather their statistics when simulating.
23pub mod stats;
24
25/// Represents unbounded queues.
26pub mod unbounded;
27
28/// A type synonym for the ordinary FIFO queue, also known as the FCFS
29/// (First Come - First Serviced) queue.
30pub type FCFSQueue<T> = Queue<FCFSStrategy, FCFSStrategy, FCFSStrategy, T>;
31
32/// A type synonym for the ordinary LIFO queue, also known as the LCFS
33/// (Last Come - First Serviced) queue.
34pub type LCFSQueue<T> = Queue<FCFSStrategy, LCFSStrategy, FCFSStrategy, T>;
35
36/// Represents an optimized bounded queue by using the specified strategies for enqueueing (input), `SI`,
37/// internal storing (in memory), `SM`, and dequeueing (output), `SO`, where `T` denotes
38/// the type of items stored in the queue.
39pub struct Queue<SI, SM, SO, T>
40    where SI: QueueStrategy + 'static,
41          SM: QueueStrategy,
42          SO: QueueStrategy + 'static
43{
44    /// The queue capacity.
45    max_count: isize,
46
47    /// The enqueue resource.
48    enqueue_resource: Grc<Resource<SI>>,
49
50    /// The queue store.
51    queue_store: QueueStorageBox<T, SM::Priority>,
52
53    /// The dequeue resource.
54    dequeue_resource: Grc<Resource<SO>>,
55
56    /// The queue size.
57    count: RefComp<isize>
58}
59
60/// Create a new bounded FCFS (a.k.a FIFO) queue by the specified capacity.
61#[inline]
62pub fn new_fcfs_queue<T>(max_count: isize) -> NewQueue<FCFSStrategy, FCFSStrategy, FCFSStrategy, T>
63    where T: 'static
64{
65    NewQueue {
66        enqueue_strategy: FCFSStrategy::Instance,
67        storing_strategy: FCFSStrategy::Instance,
68        dequeue_strategy: FCFSStrategy::Instance,
69        max_count: max_count,
70        _phantom: PhantomData
71    }
72}
73
74/// Create a new bounded LCFS (a.k.a LIFO) queue by the specified capacity.
75#[inline]
76pub fn new_lcfs_queue<T>(max_count: isize) -> NewQueue<FCFSStrategy, LCFSStrategy, FCFSStrategy, T>
77    where T: 'static
78{
79    NewQueue {
80        enqueue_strategy: FCFSStrategy::Instance,
81        storing_strategy: LCFSStrategy::Instance,
82        dequeue_strategy: FCFSStrategy::Instance,
83        max_count: max_count,
84        _phantom: PhantomData
85    }
86}
87
88impl<SI, SM, SO, T> Queue<SI, SM, SO, T>
89    where SI: QueueStrategy + Clone + 'static,
90          SM: QueueStrategy + Clone + 'static,
91          SO: QueueStrategy + Clone + 'static,
92          T: Clone + 'static
93{
94    /// Create a new bounded queue by the specified strategies and capacity.
95    #[inline]
96    pub fn new(enqueue_strategy: SI,
97        storing_strategy: SM,
98        dequeue_strategy: SO,
99        max_count: isize) -> NewQueue<SI, SM, SO, T>
100    {
101        NewQueue {
102            enqueue_strategy: enqueue_strategy,
103            storing_strategy: storing_strategy,
104            dequeue_strategy: dequeue_strategy,
105            max_count: max_count,
106            _phantom: PhantomData
107        }
108    }
109
110    /// Return the queue capacity, i.e. its maximum size.
111    #[inline]
112    pub fn max_count(&self) -> isize {
113        self.max_count
114    }
115
116    /// Test whether the queue is empty.
117    #[inline]
118    pub fn is_empty(queue: Grc<Self>) -> impl Event<Item = bool> + Clone {
119        cons_event(move |p| {
120            Result::Ok(queue.count.read_at(p) == 0)
121        })
122    }
123
124    /// Test whether the queue is full.
125    #[inline]
126    pub fn is_full(queue: Grc<Self>) -> impl Event<Item = bool> + Clone {
127        cons_event(move |p| {
128            Result::Ok(queue.count.read_at(p) == queue.max_count)
129        })
130    }
131
132    /// Return the current queue size.
133    #[inline]
134    pub fn count(queue: Grc<Self>) -> impl Event<Item = isize> + Clone {
135        cons_event(move |p| {
136            Result::Ok(queue.count.read_at(p))
137        })
138    }
139
140    /// Return the load factor: the queue size divided by its capacity, i.e. maximum size.
141    #[inline]
142    pub fn load_factor(queue: Grc<Self>) -> impl Event<Item = f64> + Clone {
143        cons_event(move |p| {
144            Result::Ok({
145                let x = queue.count.read_at(p);
146                let y = queue.max_count;
147                (x as f64) / (y as f64)
148            })
149        })
150    }
151
152    /// Dequeue by suspending the process if the queue is empty.
153    pub fn dequeue(queue: Grc<Self>) -> impl Process<Item = T> + Clone {
154        request_resource(queue.dequeue_resource.clone())
155            .and_then(move |()| {
156                cons_event(move |p| {
157                    queue.dequeue_extract(p)
158                })
159                .into_process()
160            })
161    }
162
163    /// Dequeue with output prioerity by suspending the process if the queue is empty.
164    pub fn dequeue_with_output_priority(queue: Grc<Self>, po: SO::Priority) -> impl Process<Item = T> + Clone
165        where SO::Priority: Clone
166    {
167        request_resource_with_priority(queue.dequeue_resource.clone(), po)
168            .and_then(move |()| {
169                cons_event(move |p| {
170                    queue.dequeue_extract(p)
171                })
172                .into_process()
173            })
174    }
175
176    /// Try to dequeue immediately.
177    pub fn try_dequeue(queue: Grc<Self>) -> impl Event<Item = Option<T>> + Clone {
178        try_request_resource_within_event(queue.dequeue_resource.clone())
179            .and_then(move |f| {
180                if f {
181                    cons_event(move |p| {
182                        let x = queue.dequeue_extract(p)?;
183                        Result::Ok(Some(x))
184                    }).into_boxed()
185                } else {
186                    return_event(None)
187                        .into_boxed()
188                }
189            })
190    }
191
192    /// Remove the item from the queue and return a flag indicating
193    /// whether the item was found and actually removed.
194    pub fn delete(queue: Grc<Self>, item: T) -> impl Event<Item = bool> + Clone
195        where T: PartialEq
196    {
197        let pred = move |x: &T| { *x == item };
198        Queue::delete_by(queue, pred)
199            .map(|x| { x.is_some() })
200    }
201
202    /// Remove the specified item from the queue.
203    pub fn delete_(queue: Grc<Self>, item: T) -> impl Event<Item = ()> + Clone
204        where T: PartialEq
205    {
206        let pred = move |x: &T| { *x == item };
207        Queue::delete_by(queue, pred)
208            .map(|_| ())
209    }
210
211    /// Remove an item satisfying the specified predicate and return the item if found.
212    pub fn delete_by<F>(queue: Grc<Self>, pred: F) -> impl Event<Item = Option<T>> + Clone
213        where F: Fn(&T) -> bool + Clone + 'static
214    {
215        try_request_resource_within_event(queue.dequeue_resource.clone())
216            .and_then(move |f| {
217                if f {
218                    cons_event(move |p| {
219                        let pred = move |x: &T| { pred(x) };
220                        let pred = Rc::new(pred);
221                        match queue.queue_store.remove_boxed_by(pred, p) {
222                            None => {
223                                release_resource_within_event(queue.dequeue_resource.clone())
224                                    .call_event(p)?;
225                                Result::Ok(None)
226                            },
227                            Some(i) => {
228                                let x = queue.dequeue_post_extract(i, p)?;
229                                Result::Ok(Some(x))
230                            }
231                        }
232                    }).into_boxed()
233                } else {
234                    return_event(None)
235                        .into_boxed()
236                }
237            })
238    }
239
240    /// Test whether there is an item satisfying the specified predicate.
241    pub fn exists<F>(queue: Grc<Self>, pred: F) -> impl Event<Item = bool> + Clone
242        where F: Fn(&T) -> bool + Clone + 'static
243    {
244        cons_event(move |p| {
245            let pred = move |x: &T| { pred(x) };
246            let pred = Rc::new(pred);
247            Result::Ok(queue.queue_store.exists_boxed(pred, p))
248        })
249    }
250
251    /// Find an item satisfying the specified predicate.
252    pub fn find<F>(queue: Grc<Self>, pred: F) -> impl Event<Item = Option<T>> + Clone
253        where F: Fn(&T) -> bool + Clone + 'static,
254              T: Clone
255    {
256        cons_event(move |p| {
257            let pred = move |x: &T| { pred(x) };
258            let pred = Rc::new(pred);
259            Result::Ok(queue.queue_store.find_boxed(pred, p).map(|x| { x.clone() }))
260        })
261    }
262
263    /// Clear the queue.
264    pub fn clear(queue: Grc<Self>) -> impl Event<Item = ()> + Clone {
265        cons_event(move |p| {
266            loop {
267                let x = Queue::try_dequeue(queue.clone()).call_event(p)?;
268                match x {
269                    None => return Result::Ok(()),
270                    Some(_) => {}
271                }
272            }
273        })
274    }
275
276    /// Enqueue the item by suspending the process if the queue is full.
277    pub fn enqueue(queue: Grc<Self>, item: T) -> impl Process<Item = ()> + Clone {
278        request_resource(queue.enqueue_resource.clone())
279            .and_then(move |()| {
280                cons_event(move |p| {
281                    queue.enqueue_store(item, p)
282                })
283                .into_process()
284            })
285    }
286
287    /// Enqueue the item with input priority by suspending the process
288    /// if the queue is full.
289    pub fn enqueue_with_input_priority(queue: Grc<Self>, pi: SI::Priority, item: T) -> impl Process<Item = ()> + Clone
290        where SI::Priority: Clone
291    {
292        request_resource_with_priority(queue.enqueue_resource.clone(), pi)
293            .and_then(move |()| {
294                cons_event(move |p| {
295                    queue.enqueue_store(item, p)
296                })
297                .into_process()
298            })
299    }
300
301    /// Enqueue the item with storing priority by suspending the process
302    /// if the queue is full.
303    pub fn enqueue_with_storing_priority(queue: Grc<Self>, pm: SM::Priority, item: T) -> impl Process<Item = ()> + Clone
304        where SM::Priority: Clone
305    {
306        request_resource(queue.enqueue_resource.clone())
307            .and_then(move |()| {
308                cons_event(move |p| {
309                    queue.enqueue_store_with_priority(pm, item, p)
310                })
311                .into_process()
312            })
313    }
314
315    /// Enqueue the item with input and storing priorities by suspending the process
316    /// if the queue is full.
317    pub fn enqueue_with_input_and_storing_priorities(queue: Grc<Self>, pi: SI::Priority, pm: SM::Priority, item: T) -> impl Process<Item = ()> + Clone
318        where SI::Priority: Clone,
319              SM::Priority: Clone
320    {
321        request_resource_with_priority(queue.enqueue_resource.clone(), pi)
322            .and_then(move |()| {
323                cons_event(move |p| {
324                    queue.enqueue_store_with_priority(pm, item, p)
325                })
326                .into_process()
327            })
328    }
329
330    /// Try to enqueue the item. Return `false` within `Event` computation if the queue is full.
331    pub fn try_enqueue(queue: Grc<Self>, item: T) -> impl Event<Item = bool> + Clone {
332        cons_event(move |p| {
333            let x = {
334                try_request_resource_within_event(queue.enqueue_resource.clone())
335                    .call_event(p)
336            }?;
337            if x {
338                queue.enqueue_store(item, p)?;
339                Result::Ok(true)
340            } else {
341                Result::Ok(false)
342            }
343        })
344    }
345
346    /// Try to enqueue the item with storing priority. Return `false`
347    /// within `Event` computation if the queue is full.
348    pub fn try_enqueue_with_storing_priority(queue: Grc<Self>, pm: SM::Priority, item: T) -> impl Event<Item = bool> + Clone
349        where SM::Priority: Clone
350    {
351        cons_event(move |p| {
352            let x = {
353                try_request_resource_within_event(queue.enqueue_resource.clone())
354                    .call_event(p)
355            }?;
356            if x {
357                queue.enqueue_store_with_priority(pm, item, p)?;
358                Result::Ok(true)
359            } else {
360                Result::Ok(false)
361            }
362        })
363    }
364
365    /// Extract an item by the dequeue request.
366    fn dequeue_extract(&self, p: &Point) -> simulation::Result<T> {
367        let i = self.queue_store.pop(p).unwrap();
368        self.dequeue_post_extract(i, p)
369    }
370
371    /// A post action after extracting the item by the dequeue request.
372    fn dequeue_post_extract(&self, i: T, p: &Point) -> simulation::Result<T> {
373        let c  = self.count.read_at(p);
374        let c2 = c - 1;
375        self.count.write_at(c2, p);
376        release_resource_within_event(self.enqueue_resource.clone())
377            .call_event(p)?;
378        Result::Ok(i)
379    }
380
381    /// Store the item.
382    fn enqueue_store(&self, item: T, p: &Point) -> simulation::Result<()> {
383        self.queue_store.push(item, p);
384        let c  = self.count.read_at(p);
385        let c2 = c + 1;
386        self.count.write_at(c2, p);
387        release_resource_within_event(self.dequeue_resource.clone())
388            .call_event(p)
389    }
390
391    /// Store the item with priority.
392    fn enqueue_store_with_priority(&self, pm: SM::Priority, item: T, p: &Point) -> simulation::Result<()> {
393        self.queue_store.push_with_priority(pm, item, p);
394        let c  = self.count.read_at(p);
395        let c2 = c + 1;
396        self.count.write_at(c2, p);
397        release_resource_within_event(self.dequeue_resource.clone())
398            .call_event(p)
399    }
400}
401
/// The computation that creates a new `Queue` when it is run as an event.
#[derive(Clone)]
pub struct NewQueue<SI, SM, SO, T> {

    /// The strategy for the enqueue resource.
    enqueue_strategy: SI,

    /// The strategy used to create the item storage.
    storing_strategy: SM,

    /// The strategy for the dequeue resource.
    dequeue_strategy: SO,

    /// The requested queue capacity.
    max_count: isize,

    /// Marker that ties the item type `T` to this computation.
    _phantom: PhantomData<T>
}
421
422impl<SI, SM, SO, T> Event for NewQueue<SI, SM, SO, T>
423    where SI: QueueStrategy + 'static,
424          SM: QueueStrategy,
425          SO: QueueStrategy + 'static,
426          T: Clone + 'static
427{
428    type Item = Queue<SI, SM, SO, T>;
429
430    #[doc(hidden)]
431    #[inline]
432    fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
433        let NewQueue { enqueue_strategy, storing_strategy, dequeue_strategy, max_count, _phantom } = self;
434        if max_count < 0 {
435            let msg = String::from("The queue capacity cannot be actually negative");
436            let err = Error::retry(msg);
437            Result::Err(err)
438        } else {
439            let enqueue_resource = {
440                Resource::<SI>::new_with_max_count(enqueue_strategy, max_count, Some(max_count))
441                    .call_simulation(p.run)?
442            };
443            let queue_store = storing_strategy.new_storage();
444            let dequeue_resource = {
445                Resource::<SO>::new_with_max_count(dequeue_strategy, 0, Some(max_count))
446                    .call_simulation(p.run)?
447            };
448            Result::Ok(Queue {
449                max_count: max_count,
450                enqueue_resource: Grc::new(enqueue_resource),
451                queue_store: queue_store,
452                dequeue_resource: Grc::new(dequeue_resource),
453                count: RefComp::new(0)
454            })
455        }
456    }
457}