dvcompute_cons/simulation/queue/unbounded/
mod.rs1use std::marker::PhantomData;
8
9use crate::simulation;
10use crate::simulation::Point;
11use crate::simulation::ref_comp::RefComp;
12use crate::simulation::simulation::*;
13use crate::simulation::event::*;
14use crate::simulation::process::*;
15use crate::simulation::strategy::*;
16use crate::simulation::resource::*;
17
18use dvcompute_utils::grc::Grc;
19
20pub mod stats;
22
23pub type FCFSQueue<T> = Queue<FCFSStrategy, FCFSStrategy, T>;
26
27pub type LCFSQueue<T> = Queue<LCFSStrategy, FCFSStrategy, T>;
30
31pub struct Queue<SM, SO, T>
34 where SM: QueueStrategy,
35 SO: QueueStrategy + 'static
36{
37 queue_store: QueueStorageBox<T, SM::Priority>,
39
40 dequeue_resource: Grc<Resource<SO>>,
42
43 count: RefComp<isize>
45}
46
47#[inline]
49pub fn new_fcfs_queue<T>() -> NewQueue<FCFSStrategy, FCFSStrategy, T>
50 where T: 'static
51{
52 NewQueue {
53 storing_strategy: FCFSStrategy::Instance,
54 dequeue_strategy: FCFSStrategy::Instance,
55 _phantom: PhantomData
56 }
57}
58
59#[inline]
61pub fn new_lcfs_queue<T>() -> NewQueue<LCFSStrategy, FCFSStrategy, T>
62 where T: 'static
63{
64 NewQueue {
65 storing_strategy: LCFSStrategy::Instance,
66 dequeue_strategy: FCFSStrategy::Instance,
67 _phantom: PhantomData
68 }
69}
70
71impl<SM, SO, T> Queue<SM, SO, T>
72 where SM: QueueStrategy + 'static,
73 SO: QueueStrategy + 'static,
74 T: Clone + 'static
75{
76 #[inline]
78 pub fn new(storing_strategy: SM, dequeue_strategy: SO) -> NewQueue<SM, SO, T> {
79 NewQueue {
80 storing_strategy: storing_strategy,
81 dequeue_strategy: dequeue_strategy,
82 _phantom: PhantomData
83 }
84 }
85
86 #[inline]
88 pub fn is_empty(queue: Grc<Self>) -> impl Event<Item = bool> + Clone {
89 cons_event(move |p| {
90 Result::Ok(queue.count.read_at(p) == 0)
91 })
92 }
93
94 #[inline]
96 pub fn count(queue: Grc<Self>) -> impl Event<Item = isize> + Clone {
97 cons_event(move |p| {
98 Result::Ok(queue.count.read_at(p))
99 })
100 }
101
102 pub fn dequeue(queue: Grc<Self>) -> impl Process<Item = T> {
104 request_resource(queue.dequeue_resource.clone())
105 .and_then(move |()| {
106 cons_event(move |p| {
107 queue.dequeue_extract(p)
108 })
109 .into_process()
110 })
111 }
112
113 pub fn dequeue_with_output_priority(queue: Grc<Self>, po: SO::Priority) -> impl Process<Item = T>
115 where SO::Priority: Clone
116 {
117 request_resource_with_priority(queue.dequeue_resource.clone(), po)
118 .and_then(move |()| {
119 cons_event(move |p| {
120 queue.dequeue_extract(p)
121 })
122 .into_process()
123 })
124 }
125
126 pub fn try_dequeue(queue: Grc<Self>) -> impl Event<Item = Option<T>> {
128 try_request_resource_within_event(queue.dequeue_resource.clone())
129 .and_then(move |f| {
130 if f {
131 cons_event(move |p| {
132 let x = queue.dequeue_extract(p)?;
133 Result::Ok(Some(x))
134 }).into_boxed()
135 } else {
136 return_event(None)
137 .into_boxed()
138 }
139 })
140 }
141
142 pub fn delete(queue: Grc<Self>, item: T) -> impl Event<Item = bool>
145 where T: PartialEq
146 {
147 let pred = move |x: &T| { *x == item };
148 Queue::delete_by(queue, pred)
149 .map(|x| { x.is_some() })
150 }
151
152 pub fn delete_(queue: Grc<Self>, item: T) -> impl Event<Item = ()>
154 where T: PartialEq
155 {
156 let pred = move |x: &T| { *x == item };
157 Queue::delete_by(queue, pred)
158 .map(|_| ())
159 }
160
161 pub fn delete_by<F>(queue: Grc<Self>, pred: F) -> impl Event<Item = Option<T>>
163 where F: Fn(&T) -> bool + 'static
164 {
165 try_request_resource_within_event(queue.dequeue_resource.clone())
166 .and_then(move |f| {
167 if f {
168 cons_event(move |p| {
169 let pred = move |x: &T| { pred(x) };
170 let pred = Box::new(pred);
171 match queue.queue_store.remove_boxed_by(pred, p) {
172 None => {
173 release_resource_within_event(queue.dequeue_resource.clone())
174 .call_event(p)?;
175 Result::Ok(None)
176 },
177 Some(i) => {
178 let x = queue.dequeue_post_extract(i, p)?;
179 Result::Ok(Some(x))
180 }
181 }
182 }).into_boxed()
183 } else {
184 return_event(None)
185 .into_boxed()
186 }
187 })
188 }
189
190 pub fn exists<F>(queue: Grc<Self>, pred: F) -> impl Event<Item = bool>
192 where F: Fn(&T) -> bool + 'static
193 {
194 cons_event(move |p| {
195 let pred = move |x: &T| { pred(x) };
196 let pred = Box::new(pred);
197 Result::Ok(queue.queue_store.exists_boxed(pred, p))
198 })
199 }
200
201 pub fn find<F>(queue: Grc<Self>, pred: F) -> impl Event<Item = Option<T>>
203 where F: Fn(&T) -> bool + 'static,
204 T: Clone
205 {
206 cons_event(move |p| {
207 let pred = move |x: &T| { pred(x) };
208 let pred = Box::new(pred);
209 Result::Ok(queue.queue_store.find_boxed(pred, p).map(|x| { x.clone() }))
210 })
211 }
212
213 pub fn clear(queue: Grc<Self>) -> impl Event<Item = ()> {
215 cons_event(move |p| {
216 loop {
217 let x = Queue::try_dequeue(queue.clone()).call_event(p)?;
218 match x {
219 None => return Result::Ok(()),
220 Some(_) => {}
221 }
222 }
223 })
224 }
225
226 #[inline]
228 pub fn enqueue(queue: Grc<Self>, item: T) -> impl Event<Item = ()> {
229 cons_event(move |p| {
230 queue.enqueue_store(item, p)
231 })
232 }
233
234 #[inline]
236 pub fn enqueue_with_storing_priority(queue: Grc<Self>, pm: SM::Priority, item: T) -> impl Event<Item = ()>
237 where SM::Priority: Clone
238 {
239 cons_event(move |p| {
240 queue.enqueue_store_with_priority(pm, item, p)
241 })
242 }
243
244 fn dequeue_extract(&self, p: &Point) -> simulation::Result<T> {
246 let i = self.queue_store.pop(p).unwrap();
247 self.dequeue_post_extract(i, p)
248 }
249
250 fn dequeue_post_extract(&self, i: T, p: &Point) -> simulation::Result<T> {
252 let c = self.count.read_at(p);
253 let c2 = c - 1;
254 self.count.write_at(c2, p);
255 Result::Ok(i)
256 }
257
258 fn enqueue_store(&self, item: T, p: &Point) -> simulation::Result<()> {
260 self.queue_store.push(item, p);
261 let c = self.count.read_at(p);
262 let c2 = c + 1;
263 self.count.write_at(c2, p);
264 release_resource_within_event(self.dequeue_resource.clone())
265 .call_event(p)
266 }
267
268 fn enqueue_store_with_priority(&self, pm: SM::Priority, item: T, p: &Point) -> simulation::Result<()> {
270 self.queue_store.push_with_priority(pm, item, p);
271 let c = self.count.read_at(p);
272 let c2 = c + 1;
273 self.count.write_at(c2, p);
274 release_resource_within_event(self.dequeue_resource.clone())
275 .call_event(p)
276 }
277}
278
279#[derive(Clone)]
281pub struct NewQueue<SM, SO, T> {
282
283 storing_strategy: SM,
285
286 dequeue_strategy: SO,
288
289 _phantom: PhantomData<T>
291}
292
293impl<SM, SO, T> Event for NewQueue<SM, SO, T>
294 where SM: QueueStrategy,
295 SO: QueueStrategy + 'static,
296 T: 'static
297{
298 type Item = Queue<SM, SO, T>;
299
300 #[doc(hidden)]
301 #[inline]
302 fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
303 let NewQueue { storing_strategy, dequeue_strategy, _phantom } = self;
304 let queue_store = storing_strategy.new_storage();
305 let dequeue_resource = {
306 Resource::<SO>::new_with_max_count(dequeue_strategy, 0, None)
307 .call_simulation(p.run)?
308 };
309 Result::Ok(Queue {
310 queue_store: queue_store,
311 dequeue_resource: Grc::new(dequeue_resource),
312 count: RefComp::new(0),
313 })
314 }
315}