dvcompute_branch/simulation/queue/
mod.rs1use std::rc::Rc;
8use std::marker::PhantomData;
9
10use crate::simulation;
11use crate::simulation::error::*;
12use crate::simulation::Point;
13use crate::simulation::ref_comp::RefComp;
14use crate::simulation::simulation::*;
15use crate::simulation::event::*;
16use crate::simulation::process::*;
17use crate::simulation::strategy::*;
18use crate::simulation::resource::*;
19
20use dvcompute_utils::grc::Grc;
21
22pub mod stats;
24
25pub mod unbounded;
27
28pub type FCFSQueue<T> = Queue<FCFSStrategy, FCFSStrategy, FCFSStrategy, T>;
31
32pub type LCFSQueue<T> = Queue<FCFSStrategy, LCFSStrategy, FCFSStrategy, T>;
35
36pub struct Queue<SI, SM, SO, T>
40 where SI: QueueStrategy + 'static,
41 SM: QueueStrategy,
42 SO: QueueStrategy + 'static
43{
44 max_count: isize,
46
47 enqueue_resource: Grc<Resource<SI>>,
49
50 queue_store: QueueStorageBox<T, SM::Priority>,
52
53 dequeue_resource: Grc<Resource<SO>>,
55
56 count: RefComp<isize>
58}
59
60#[inline]
62pub fn new_fcfs_queue<T>(max_count: isize) -> NewQueue<FCFSStrategy, FCFSStrategy, FCFSStrategy, T>
63 where T: 'static
64{
65 NewQueue {
66 enqueue_strategy: FCFSStrategy::Instance,
67 storing_strategy: FCFSStrategy::Instance,
68 dequeue_strategy: FCFSStrategy::Instance,
69 max_count: max_count,
70 _phantom: PhantomData
71 }
72}
73
74#[inline]
76pub fn new_lcfs_queue<T>(max_count: isize) -> NewQueue<FCFSStrategy, LCFSStrategy, FCFSStrategy, T>
77 where T: 'static
78{
79 NewQueue {
80 enqueue_strategy: FCFSStrategy::Instance,
81 storing_strategy: LCFSStrategy::Instance,
82 dequeue_strategy: FCFSStrategy::Instance,
83 max_count: max_count,
84 _phantom: PhantomData
85 }
86}
87
88impl<SI, SM, SO, T> Queue<SI, SM, SO, T>
89 where SI: QueueStrategy + Clone + 'static,
90 SM: QueueStrategy + Clone + 'static,
91 SO: QueueStrategy + Clone + 'static,
92 T: Clone + 'static
93{
94 #[inline]
96 pub fn new(enqueue_strategy: SI,
97 storing_strategy: SM,
98 dequeue_strategy: SO,
99 max_count: isize) -> NewQueue<SI, SM, SO, T>
100 {
101 NewQueue {
102 enqueue_strategy: enqueue_strategy,
103 storing_strategy: storing_strategy,
104 dequeue_strategy: dequeue_strategy,
105 max_count: max_count,
106 _phantom: PhantomData
107 }
108 }
109
110 #[inline]
112 pub fn max_count(&self) -> isize {
113 self.max_count
114 }
115
116 #[inline]
118 pub fn is_empty(queue: Grc<Self>) -> impl Event<Item = bool> + Clone {
119 cons_event(move |p| {
120 Result::Ok(queue.count.read_at(p) == 0)
121 })
122 }
123
124 #[inline]
126 pub fn is_full(queue: Grc<Self>) -> impl Event<Item = bool> + Clone {
127 cons_event(move |p| {
128 Result::Ok(queue.count.read_at(p) == queue.max_count)
129 })
130 }
131
132 #[inline]
134 pub fn count(queue: Grc<Self>) -> impl Event<Item = isize> + Clone {
135 cons_event(move |p| {
136 Result::Ok(queue.count.read_at(p))
137 })
138 }
139
140 #[inline]
142 pub fn load_factor(queue: Grc<Self>) -> impl Event<Item = f64> + Clone {
143 cons_event(move |p| {
144 Result::Ok({
145 let x = queue.count.read_at(p);
146 let y = queue.max_count;
147 (x as f64) / (y as f64)
148 })
149 })
150 }
151
152 pub fn dequeue(queue: Grc<Self>) -> impl Process<Item = T> + Clone {
154 request_resource(queue.dequeue_resource.clone())
155 .and_then(move |()| {
156 cons_event(move |p| {
157 queue.dequeue_extract(p)
158 })
159 .into_process()
160 })
161 }
162
163 pub fn dequeue_with_output_priority(queue: Grc<Self>, po: SO::Priority) -> impl Process<Item = T> + Clone
165 where SO::Priority: Clone
166 {
167 request_resource_with_priority(queue.dequeue_resource.clone(), po)
168 .and_then(move |()| {
169 cons_event(move |p| {
170 queue.dequeue_extract(p)
171 })
172 .into_process()
173 })
174 }
175
176 pub fn try_dequeue(queue: Grc<Self>) -> impl Event<Item = Option<T>> + Clone {
178 try_request_resource_within_event(queue.dequeue_resource.clone())
179 .and_then(move |f| {
180 if f {
181 cons_event(move |p| {
182 let x = queue.dequeue_extract(p)?;
183 Result::Ok(Some(x))
184 }).into_boxed()
185 } else {
186 return_event(None)
187 .into_boxed()
188 }
189 })
190 }
191
192 pub fn delete(queue: Grc<Self>, item: T) -> impl Event<Item = bool> + Clone
195 where T: PartialEq
196 {
197 let pred = move |x: &T| { *x == item };
198 Queue::delete_by(queue, pred)
199 .map(|x| { x.is_some() })
200 }
201
202 pub fn delete_(queue: Grc<Self>, item: T) -> impl Event<Item = ()> + Clone
204 where T: PartialEq
205 {
206 let pred = move |x: &T| { *x == item };
207 Queue::delete_by(queue, pred)
208 .map(|_| ())
209 }
210
211 pub fn delete_by<F>(queue: Grc<Self>, pred: F) -> impl Event<Item = Option<T>> + Clone
213 where F: Fn(&T) -> bool + Clone + 'static
214 {
215 try_request_resource_within_event(queue.dequeue_resource.clone())
216 .and_then(move |f| {
217 if f {
218 cons_event(move |p| {
219 let pred = move |x: &T| { pred(x) };
220 let pred = Rc::new(pred);
221 match queue.queue_store.remove_boxed_by(pred, p) {
222 None => {
223 release_resource_within_event(queue.dequeue_resource.clone())
224 .call_event(p)?;
225 Result::Ok(None)
226 },
227 Some(i) => {
228 let x = queue.dequeue_post_extract(i, p)?;
229 Result::Ok(Some(x))
230 }
231 }
232 }).into_boxed()
233 } else {
234 return_event(None)
235 .into_boxed()
236 }
237 })
238 }
239
240 pub fn exists<F>(queue: Grc<Self>, pred: F) -> impl Event<Item = bool> + Clone
242 where F: Fn(&T) -> bool + Clone + 'static
243 {
244 cons_event(move |p| {
245 let pred = move |x: &T| { pred(x) };
246 let pred = Rc::new(pred);
247 Result::Ok(queue.queue_store.exists_boxed(pred, p))
248 })
249 }
250
251 pub fn find<F>(queue: Grc<Self>, pred: F) -> impl Event<Item = Option<T>> + Clone
253 where F: Fn(&T) -> bool + Clone + 'static,
254 T: Clone
255 {
256 cons_event(move |p| {
257 let pred = move |x: &T| { pred(x) };
258 let pred = Rc::new(pred);
259 Result::Ok(queue.queue_store.find_boxed(pred, p).map(|x| { x.clone() }))
260 })
261 }
262
263 pub fn clear(queue: Grc<Self>) -> impl Event<Item = ()> + Clone {
265 cons_event(move |p| {
266 loop {
267 let x = Queue::try_dequeue(queue.clone()).call_event(p)?;
268 match x {
269 None => return Result::Ok(()),
270 Some(_) => {}
271 }
272 }
273 })
274 }
275
276 pub fn enqueue(queue: Grc<Self>, item: T) -> impl Process<Item = ()> + Clone {
278 request_resource(queue.enqueue_resource.clone())
279 .and_then(move |()| {
280 cons_event(move |p| {
281 queue.enqueue_store(item, p)
282 })
283 .into_process()
284 })
285 }
286
287 pub fn enqueue_with_input_priority(queue: Grc<Self>, pi: SI::Priority, item: T) -> impl Process<Item = ()> + Clone
290 where SI::Priority: Clone
291 {
292 request_resource_with_priority(queue.enqueue_resource.clone(), pi)
293 .and_then(move |()| {
294 cons_event(move |p| {
295 queue.enqueue_store(item, p)
296 })
297 .into_process()
298 })
299 }
300
301 pub fn enqueue_with_storing_priority(queue: Grc<Self>, pm: SM::Priority, item: T) -> impl Process<Item = ()> + Clone
304 where SM::Priority: Clone
305 {
306 request_resource(queue.enqueue_resource.clone())
307 .and_then(move |()| {
308 cons_event(move |p| {
309 queue.enqueue_store_with_priority(pm, item, p)
310 })
311 .into_process()
312 })
313 }
314
315 pub fn enqueue_with_input_and_storing_priorities(queue: Grc<Self>, pi: SI::Priority, pm: SM::Priority, item: T) -> impl Process<Item = ()> + Clone
318 where SI::Priority: Clone,
319 SM::Priority: Clone
320 {
321 request_resource_with_priority(queue.enqueue_resource.clone(), pi)
322 .and_then(move |()| {
323 cons_event(move |p| {
324 queue.enqueue_store_with_priority(pm, item, p)
325 })
326 .into_process()
327 })
328 }
329
330 pub fn try_enqueue(queue: Grc<Self>, item: T) -> impl Event<Item = bool> + Clone {
332 cons_event(move |p| {
333 let x = {
334 try_request_resource_within_event(queue.enqueue_resource.clone())
335 .call_event(p)
336 }?;
337 if x {
338 queue.enqueue_store(item, p)?;
339 Result::Ok(true)
340 } else {
341 Result::Ok(false)
342 }
343 })
344 }
345
346 pub fn try_enqueue_with_storing_priority(queue: Grc<Self>, pm: SM::Priority, item: T) -> impl Event<Item = bool> + Clone
349 where SM::Priority: Clone
350 {
351 cons_event(move |p| {
352 let x = {
353 try_request_resource_within_event(queue.enqueue_resource.clone())
354 .call_event(p)
355 }?;
356 if x {
357 queue.enqueue_store_with_priority(pm, item, p)?;
358 Result::Ok(true)
359 } else {
360 Result::Ok(false)
361 }
362 })
363 }
364
365 fn dequeue_extract(&self, p: &Point) -> simulation::Result<T> {
367 let i = self.queue_store.pop(p).unwrap();
368 self.dequeue_post_extract(i, p)
369 }
370
371 fn dequeue_post_extract(&self, i: T, p: &Point) -> simulation::Result<T> {
373 let c = self.count.read_at(p);
374 let c2 = c - 1;
375 self.count.write_at(c2, p);
376 release_resource_within_event(self.enqueue_resource.clone())
377 .call_event(p)?;
378 Result::Ok(i)
379 }
380
381 fn enqueue_store(&self, item: T, p: &Point) -> simulation::Result<()> {
383 self.queue_store.push(item, p);
384 let c = self.count.read_at(p);
385 let c2 = c + 1;
386 self.count.write_at(c2, p);
387 release_resource_within_event(self.dequeue_resource.clone())
388 .call_event(p)
389 }
390
391 fn enqueue_store_with_priority(&self, pm: SM::Priority, item: T, p: &Point) -> simulation::Result<()> {
393 self.queue_store.push_with_priority(pm, item, p);
394 let c = self.count.read_at(p);
395 let c2 = c + 1;
396 self.count.write_at(c2, p);
397 release_resource_within_event(self.dequeue_resource.clone())
398 .call_event(p)
399 }
400}
401
402#[derive(Clone)]
404pub struct NewQueue<SI, SM, SO, T> {
405
406 enqueue_strategy: SI,
408
409 storing_strategy: SM,
411
412 dequeue_strategy: SO,
414
415 max_count: isize,
417
418 _phantom: PhantomData<T>
420}
421
422impl<SI, SM, SO, T> Event for NewQueue<SI, SM, SO, T>
423 where SI: QueueStrategy + 'static,
424 SM: QueueStrategy,
425 SO: QueueStrategy + 'static,
426 T: Clone + 'static
427{
428 type Item = Queue<SI, SM, SO, T>;
429
430 #[doc(hidden)]
431 #[inline]
432 fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
433 let NewQueue { enqueue_strategy, storing_strategy, dequeue_strategy, max_count, _phantom } = self;
434 if max_count < 0 {
435 let msg = String::from("The queue capacity cannot be actually negative");
436 let err = Error::retry(msg);
437 Result::Err(err)
438 } else {
439 let enqueue_resource = {
440 Resource::<SI>::new_with_max_count(enqueue_strategy, max_count, Some(max_count))
441 .call_simulation(p.run)?
442 };
443 let queue_store = storing_strategy.new_storage();
444 let dequeue_resource = {
445 Resource::<SO>::new_with_max_count(dequeue_strategy, 0, Some(max_count))
446 .call_simulation(p.run)?
447 };
448 Result::Ok(Queue {
449 max_count: max_count,
450 enqueue_resource: Grc::new(enqueue_resource),
451 queue_store: queue_store,
452 dequeue_resource: Grc::new(dequeue_resource),
453 count: RefComp::new(0)
454 })
455 }
456 }
457}