dvcompute_cons/simulation/queue/unbounded/
mod.rs

1// Copyright (c) 2020-2022  David Sorokin <davsor@mail.ru>, based in Yoshkar-Ola, Russia
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at https://mozilla.org/MPL/2.0/.
6
7use std::marker::PhantomData;
8
9use crate::simulation;
10use crate::simulation::Point;
11use crate::simulation::ref_comp::RefComp;
12use crate::simulation::simulation::*;
13use crate::simulation::event::*;
14use crate::simulation::process::*;
15use crate::simulation::strategy::*;
16use crate::simulation::resource::*;
17
18use dvcompute_utils::grc::Grc;
19
20/// The unbounded queues that gather their statistics when simulating.
21pub mod stats;
22
23/// A type synonym for the ordinary FIFO queue, also known as the FCFS
24/// (First Come - First Serviced) queue.
25pub type FCFSQueue<T> = Queue<FCFSStrategy, FCFSStrategy, T>;
26
27/// A type synonym for the ordinary LIFO queue, also known as the LCFS
28/// (Last Come - First Serviced) queue.
29pub type LCFSQueue<T> = Queue<LCFSStrategy, FCFSStrategy, T>;
30
31/// Represents an optimized unbounded queue by using the specified strategies for internal storing (in memory), `SM`,
32/// and dequeueing (output), `SO`, where `T` denotes the type of items stored in the queue.
33pub struct Queue<SM, SO, T>
34    where SM: QueueStrategy,
35          SO: QueueStrategy + 'static
36{
37    /// The queue store.
38    queue_store: QueueStorageBox<T, SM::Priority>,
39
40    /// The dequeue resource.
41    dequeue_resource: Grc<Resource<SO>>,
42
43    /// The queue size.
44    count: RefComp<isize>
45}
46
47/// Create a new unbounded FCFS (a.k.a FIFO) queue by the specified capacity.
48#[inline]
49pub fn new_fcfs_queue<T>() -> NewQueue<FCFSStrategy, FCFSStrategy, T>
50    where T: 'static
51{
52    NewQueue {
53        storing_strategy: FCFSStrategy::Instance,
54        dequeue_strategy: FCFSStrategy::Instance,
55        _phantom: PhantomData
56    }
57}
58
59/// Create a new unbounded LCFS (a.k.a LIFO) queue by the specified capacity.
60#[inline]
61pub fn new_lcfs_queue<T>() -> NewQueue<LCFSStrategy, FCFSStrategy, T>
62    where T: 'static
63{
64    NewQueue {
65        storing_strategy: LCFSStrategy::Instance,
66        dequeue_strategy: FCFSStrategy::Instance,
67        _phantom: PhantomData
68    }
69}
70
71impl<SM, SO, T> Queue<SM, SO, T>
72    where SM: QueueStrategy + 'static,
73          SO: QueueStrategy + 'static,
74          T: Clone + 'static
75{
76    /// Create a new unbounded queue by the specified strategies.
77    #[inline]
78    pub fn new(storing_strategy: SM, dequeue_strategy: SO) -> NewQueue<SM, SO, T> {
79        NewQueue {
80            storing_strategy: storing_strategy,
81            dequeue_strategy: dequeue_strategy,
82            _phantom: PhantomData
83        }
84    }
85
86    /// Test whether the queue is empty.
87    #[inline]
88    pub fn is_empty(queue: Grc<Self>) -> impl Event<Item = bool> + Clone {
89        cons_event(move |p| {
90            Result::Ok(queue.count.read_at(p) == 0)
91        })
92    }
93
94    /// Return the current queue size.
95    #[inline]
96    pub fn count(queue: Grc<Self>) -> impl Event<Item = isize> + Clone {
97        cons_event(move |p| {
98            Result::Ok(queue.count.read_at(p))
99        })
100    }
101
102    /// Dequeue by suspending the process if the queue is empty.
103    pub fn dequeue(queue: Grc<Self>) -> impl Process<Item = T> {
104        request_resource(queue.dequeue_resource.clone())
105            .and_then(move |()| {
106                cons_event(move |p| {
107                    queue.dequeue_extract(p)
108                })
109                .into_process()
110            })
111    }
112
113    /// Dequeue with output prioerity by suspending the process if the queue is empty.
114    pub fn dequeue_with_output_priority(queue: Grc<Self>, po: SO::Priority) -> impl Process<Item = T>
115        where SO::Priority: Clone
116    {
117        request_resource_with_priority(queue.dequeue_resource.clone(), po)
118            .and_then(move |()| {
119                cons_event(move |p| {
120                    queue.dequeue_extract(p)
121                })
122                .into_process()
123            })
124    }
125
126    /// Try to dequeue immediately.
127    pub fn try_dequeue(queue: Grc<Self>) -> impl Event<Item = Option<T>> {
128        try_request_resource_within_event(queue.dequeue_resource.clone())
129            .and_then(move |f| {
130                if f {
131                    cons_event(move |p| {
132                        let x = queue.dequeue_extract(p)?;
133                        Result::Ok(Some(x))
134                    }).into_boxed()
135                } else {
136                    return_event(None)
137                        .into_boxed()
138                }
139            })
140    }
141
142    /// Remove the item from the queue and return a flag indicating
143    /// whether the item was found and actually removed.
144    pub fn delete(queue: Grc<Self>, item: T) -> impl Event<Item = bool>
145        where T: PartialEq
146    {
147        let pred = move |x: &T| { *x == item };
148        Queue::delete_by(queue, pred)
149            .map(|x| { x.is_some() })
150    }
151
152    /// Remove the specified item from the queue.
153    pub fn delete_(queue: Grc<Self>, item: T) -> impl Event<Item = ()>
154        where T: PartialEq
155    {
156        let pred = move |x: &T| { *x == item };
157        Queue::delete_by(queue, pred)
158            .map(|_| ())
159    }
160
161    /// Remove an item satisfying the specified predicate and return the item if found.
162    pub fn delete_by<F>(queue: Grc<Self>, pred: F) -> impl Event<Item = Option<T>>
163        where F: Fn(&T) -> bool + 'static
164    {
165        try_request_resource_within_event(queue.dequeue_resource.clone())
166            .and_then(move |f| {
167                if f {
168                    cons_event(move |p| {
169                        let pred = move |x: &T| { pred(x) };
170                        let pred = Box::new(pred);
171                        match queue.queue_store.remove_boxed_by(pred, p) {
172                            None => {
173                                release_resource_within_event(queue.dequeue_resource.clone())
174                                    .call_event(p)?;
175                                Result::Ok(None)
176                            },
177                            Some(i) => {
178                                let x = queue.dequeue_post_extract(i, p)?;
179                                Result::Ok(Some(x))
180                            }
181                        }
182                    }).into_boxed()
183                } else {
184                    return_event(None)
185                        .into_boxed()
186                }
187            })
188    }
189
190    /// Test whether there is an item satisfying the specified predicate.
191    pub fn exists<F>(queue: Grc<Self>, pred: F) -> impl Event<Item = bool>
192        where F: Fn(&T) -> bool + 'static
193    {
194        cons_event(move |p| {
195            let pred = move |x: &T| { pred(x) };
196            let pred = Box::new(pred);
197            Result::Ok(queue.queue_store.exists_boxed(pred, p))
198        })
199    }
200
201    /// Find an item satisfying the specified predicate.
202    pub fn find<F>(queue: Grc<Self>, pred: F) -> impl Event<Item = Option<T>>
203        where F: Fn(&T) -> bool + 'static,
204              T: Clone
205    {
206        cons_event(move |p| {
207            let pred = move |x: &T| { pred(x) };
208            let pred = Box::new(pred);
209            Result::Ok(queue.queue_store.find_boxed(pred, p).map(|x| { x.clone() }))
210        })
211    }
212
213    /// Clear the queue.
214    pub fn clear(queue: Grc<Self>) -> impl Event<Item = ()> {
215        cons_event(move |p| {
216            loop {
217                let x = Queue::try_dequeue(queue.clone()).call_event(p)?;
218                match x {
219                    None => return Result::Ok(()),
220                    Some(_) => {}
221                }
222            }
223        })
224    }
225
226    /// Enqueue the item.
227    #[inline]
228    pub fn enqueue(queue: Grc<Self>, item: T) -> impl Event<Item = ()> {
229        cons_event(move |p| {
230            queue.enqueue_store(item, p)
231        })
232    }
233
234    /// Enqueue the item with storing priority.
235    #[inline]
236    pub fn enqueue_with_storing_priority(queue: Grc<Self>, pm: SM::Priority, item: T) -> impl Event<Item = ()>
237        where SM::Priority: Clone
238    {
239        cons_event(move |p| {
240            queue.enqueue_store_with_priority(pm, item, p)
241        })
242    }
243
244    /// Extract an item by the dequeue request.
245    fn dequeue_extract(&self, p: &Point) -> simulation::Result<T> {
246        let i = self.queue_store.pop(p).unwrap();
247        self.dequeue_post_extract(i, p)
248    }
249
250    /// A post action after extracting the item by the dequeue request.
251    fn dequeue_post_extract(&self, i: T, p: &Point) -> simulation::Result<T> {
252        let c  = self.count.read_at(p);
253        let c2 = c - 1;
254        self.count.write_at(c2, p);
255        Result::Ok(i)
256    }
257
258    /// Store the item.
259    fn enqueue_store(&self, item: T, p: &Point) -> simulation::Result<()> {
260        self.queue_store.push(item, p);
261        let c  = self.count.read_at(p);
262        let c2 = c + 1;
263        self.count.write_at(c2, p);
264        release_resource_within_event(self.dequeue_resource.clone())
265            .call_event(p)
266    }
267
268    /// Store the item with priority.
269    fn enqueue_store_with_priority(&self, pm: SM::Priority, item: T, p: &Point) -> simulation::Result<()> {
270        self.queue_store.push_with_priority(pm, item, p);
271        let c  = self.count.read_at(p);
272        let c2 = c + 1;
273        self.count.write_at(c2, p);
274        release_resource_within_event(self.dequeue_resource.clone())
275            .call_event(p)
276    }
277}
278
279/// Computation that creates a new `Queue`.
280#[derive(Clone)]
281pub struct NewQueue<SM, SO, T> {
282
283    /// The storing strategy.
284    storing_strategy: SM,
285
286    /// The output strategy.
287    dequeue_strategy: SO,
288
289    /// To keep the type parameter.
290    _phantom: PhantomData<T>
291}
292
293impl<SM, SO, T> Event for NewQueue<SM, SO, T>
294    where SM: QueueStrategy,
295          SO: QueueStrategy + 'static,
296          T: 'static
297{
298    type Item = Queue<SM, SO, T>;
299
300    #[doc(hidden)]
301    #[inline]
302    fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
303        let NewQueue { storing_strategy, dequeue_strategy, _phantom } = self;
304        let queue_store = storing_strategy.new_storage();
305        let dequeue_resource = {
306            Resource::<SO>::new_with_max_count(dequeue_strategy, 0, None)
307                .call_simulation(p.run)?
308        };
309        Result::Ok(Queue {
310            queue_store: queue_store,
311            dequeue_resource: Grc::new(dequeue_resource),
312            count: RefComp::new(0),
313        })
314    }
315}