// dvcompute_dist/simulation/resource/mod.rs

use crate::simulation;
use crate::simulation::error::*;
use crate::simulation::Run;
use crate::simulation::Point;
use crate::simulation::ref_comp::RefComp;
use crate::simulation::simulation::*;
use crate::simulation::event::*;
use crate::simulation::process::*;
use crate::simulation::strategy::*;

use dvcompute_utils::grc::Grc;

pub mod stats;
21
22#[inline]
24pub fn request_resource<S>(resource: Grc<Resource<S>>) -> Request<S>
25 where S: QueueStrategy + Clone + 'static
26{
27 Request { resource: resource }
28}
29
30#[inline]
32pub fn request_resource_with_priority<S>(resource: Grc<Resource<S>>, priority: S::Priority) -> RequestWithPriority<S>
33 where S: QueueStrategy + Clone + 'static,
34 S::Priority: Clone
35{
36 RequestWithPriority { resource: resource, priority: priority }
37}
38
39#[inline]
41pub fn release_resource<S>(resource: Grc<Resource<S>>) -> Release<S>
42 where S: QueueStrategy
43{
44 Release { resource: resource }
45}
46
47#[inline]
49pub fn release_resource_within_event<S>(resource: Grc<Resource<S>>) -> ReleaseWithinEvent<S>
50 where S: QueueStrategy
51{
52 ReleaseWithinEvent { resource: resource }
53}
54
55#[inline]
58pub fn try_request_resource_within_event<S>(resource: Grc<Resource<S>>) -> TryRequestWithinEvent<S>
59 where S: QueueStrategy
60{
61 TryRequestWithinEvent { resource: resource }
62}
63
64#[inline]
66pub fn new_fcfs_resource(count: isize) -> NewResource<FCFSStrategy> {
67 NewResource { strategy: FCFSStrategy::Instance, count: count, max_count: Some(count) }
68}
69
70#[inline]
72pub fn new_fcfs_resource_with_max_count(count: isize, max_count: Option<isize>) -> NewResource<FCFSStrategy> {
73 NewResource { strategy: FCFSStrategy::Instance, count: count, max_count: max_count }
74}
75
76#[inline]
78pub fn new_lcfs_resource(count: isize) -> NewResource<LCFSStrategy> {
79 NewResource { strategy: LCFSStrategy::Instance, count: count, max_count: Some(count) }
80}
81
82#[inline]
84pub fn new_lcfs_resource_with_max_count(count: isize, max_count: Option<isize>) -> NewResource<LCFSStrategy> {
85 NewResource { strategy: LCFSStrategy::Instance, count: count, max_count: max_count }
86}
87
88pub type FCFSResource = Resource<FCFSStrategy>;
90
91pub type LCFSResource = Resource<LCFSStrategy>;
93
94pub struct Resource<S> where S: QueueStrategy {
96
97 max_count: Option<isize>,
99
100 count: RefComp<isize>,
102
103 wait_list: QueueStorageBox<ResourceItem, S::Priority>
105}
106
107impl<S> PartialEq for Resource<S> where S: QueueStrategy {
108
109 fn eq(&self, other: &Self) -> bool {
110 self.count == other.count
111 }
112}
113
114impl<S> Eq for Resource<S> where S: QueueStrategy {}
115
116#[derive(Clone)]
118struct ResourceItem {
119
120 pid: Grc<ProcessId>,
122
123 cont: FrozenProcess<()>
125}
126
127impl<S> Resource<S> where S: QueueStrategy {
128
129 #[inline]
132 pub fn new(strategy: S, count: isize) -> NewResource<S> {
133 NewResource { strategy: strategy, count: count, max_count: Some(count) }
134 }
135
136 #[inline]
139 pub fn new_with_max_count(strategy: S, count: isize, max_count: Option<isize>) -> NewResource<S> {
140 NewResource { strategy: strategy, count: count, max_count: max_count }
141 }
142
143 #[inline]
145 pub fn count(resource: Grc<Self>) -> impl Event<Item = isize> + Clone {
146 cons_event(move |p| {
147 Result::Ok(resource.count.read_at(p))
148 })
149 }
150}
151
/// A simulation computation that creates a [`Resource`] when run.
#[derive(Clone)]
pub struct NewResource<S> {
    /// The queue strategy used to create the wait list storage.
    strategy: S,

    /// The initial count of the resource.
    count: isize,

    /// The optional upper bound of the count.
    max_count: Option<isize>,
}
165
166impl<S> Simulation for NewResource<S> where S: QueueStrategy {
167
168 type Item = Resource<S>;
169
170 #[doc(hidden)]
171 #[inline]
172 fn call_simulation(self, _r: &Run) -> simulation::Result<Self::Item> {
173 let NewResource { strategy, count, max_count } = self;
174 if count < 0 {
175 let msg = String::from("The resource count cannot be actually negative");
176 let err = Error::retry(msg);
177 Result::Err(err)
178 } else if count > max_count.unwrap_or(count) {
179 let msg = String::from("The resource count cannot be greater than its upper bound");
180 let err = Error::retry(msg);
181 Result::Err(err)
182 } else {
183 let wait_list = strategy.new_storage();
184 Result::Ok(Resource {
185 max_count: max_count,
186 count: RefComp::new(count),
187 wait_list: wait_list
188 })
189 }
190 }
191}
192
193#[must_use = "computations are lazy and do nothing unless to be run"]
195#[derive(Clone)]
196pub struct Request<S>
197 where S: QueueStrategy + Clone + 'static
198{
199 resource: Grc<Resource<S>>
201}
202
203impl<S> Process for Request<S>
204 where S: QueueStrategy + Clone + 'static
205{
206 type Item = ();
207
208 #[doc(hidden)]
209 #[inline]
210 fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
211 where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + Clone + 'static
212 {
213 let Request { resource } = self;
214 let a = resource.count.read_at(p);
215 if a > 0 {
216 resource.count.write_at(a - 1, p);
217 resume_process(cont, pid, (), p)
218 } else {
219 let comp = Request { resource: resource.clone() };
220 let cont = ProcessBoxCont::new(cont);
221 let cont = FrozenProcess::with_reentering(cont, pid.clone(), (), comp, p)?;
222 let item = ResourceItem {
223 pid: pid,
224 cont: cont
225 };
226 resource.wait_list.push(item, p);
227 Result::Ok(())
228 }
229 }
230
231 #[doc(hidden)]
232 fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
233 let Request { resource } = self;
234 let a = resource.count.read_at(p);
235 if a > 0 {
236 resource.count.write_at(a - 1, p);
237 resume_process_boxed(cont, pid, (), p)
238 } else {
239 let comp = Request { resource: resource.clone() };
240 let cont = FrozenProcess::with_reentering(cont, pid.clone(), (), comp, p)?;
241 let item = ResourceItem {
242 pid: pid,
243 cont: cont
244 };
245 resource.wait_list.push(item, p);
246 Result::Ok(())
247 }
248 }
249}
250
251#[must_use = "computations are lazy and do nothing unless to be run"]
253#[derive(Clone)]
254pub struct RequestWithPriority<S>
255 where S: QueueStrategy + Clone + 'static
256{
257 resource: Grc<Resource<S>>,
259
260 priority: S::Priority
262}
263
264impl<S> Process for RequestWithPriority<S>
265 where S: QueueStrategy + Clone + 'static,
266 S::Priority: Clone
267{
268 type Item = ();
269
270 #[doc(hidden)]
271 #[inline]
272 fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
273 where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + Clone + 'static
274 {
275 let RequestWithPriority { resource, priority } = self;
276 let a = resource.count.read_at(p);
277 if a > 0 {
278 resource.count.write_at(a - 1, p);
279 resume_process(cont, pid, (), p)
280 } else {
281 let comp = RequestWithPriority { resource: resource.clone(), priority: priority.clone() };
282 let cont = ProcessBoxCont::new(cont);
283 let cont = FrozenProcess::with_reentering(cont, pid.clone(), (), comp, p)?;
284 let item = ResourceItem {
285 pid: pid,
286 cont: cont
287 };
288 resource.wait_list.push_with_priority(priority, item, p);
289 Result::Ok(())
290 }
291 }
292
293 #[doc(hidden)]
294 fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
295 let RequestWithPriority { resource, priority } = self;
296 let a = resource.count.read_at(p);
297 if a > 0 {
298 resource.count.write_at(a - 1, p);
299 resume_process_boxed(cont, pid, (), p)
300 } else {
301 let comp = RequestWithPriority { resource: resource.clone(), priority: priority.clone() };
302 let cont = FrozenProcess::with_reentering(cont, pid.clone(), (), comp, p)?;
303 let item = ResourceItem {
304 pid: pid,
305 cont: cont
306 };
307 resource.wait_list.push_with_priority(priority, item, p);
308 Result::Ok(())
309 }
310 }
311}
312
313#[must_use = "computations are lazy and do nothing unless to be run"]
315#[derive(Clone)]
316pub struct Release<S> where S: QueueStrategy + 'static {
317
318 resource: Grc<Resource<S>>
320}
321
322impl<S> Process for Release<S>
323 where S: QueueStrategy
324{
325 type Item = ();
326
327 #[doc(hidden)]
328 #[inline]
329 fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
330 where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + Clone + 'static
331 {
332 let Release { resource } = self;
333 let comp = ReleaseWithinEvent { resource: resource };
334 comp.call_event(p)?;
335 resume_process(cont, pid, (), p)
336 }
337
338 #[doc(hidden)]
339 fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
340 let Release { resource } = self;
341 let comp = ReleaseWithinEvent { resource: resource };
342 comp.call_event(p)?;
343 resume_process_boxed(cont, pid, (), p)
344 }
345}
346
347#[must_use = "computations are lazy and do nothing unless to be run"]
349#[derive(Clone)]
350pub struct ReleaseWithinEvent<S> where S: QueueStrategy + 'static {
351
352 resource: Grc<Resource<S>>
354}
355
356impl<S> Event for ReleaseWithinEvent<S>
357 where S: QueueStrategy
358{
359 type Item = ();
360
361 #[doc(hidden)]
362 fn call_event(self, p: &Point) -> simulation::Result<()> {
363 let ReleaseWithinEvent { resource } = self;
364 let a = resource.count.read_at(p);
365 let a2 = a + 1;
366 if a2 > resource.max_count.unwrap_or(a2) {
367 let msg = String::from("The resource count cannot be greater than its upper bound");
368 let err = Error::retry(msg);
369 Result::Err(err)
370 } else {
371 let t = p.time;
372 loop {
373 if resource.wait_list.is_empty(p) {
374 resource.count.write_at(a2, p);
375 break;
376 } else {
377 let ResourceItem { pid, cont: cont0 } = {
378 resource.wait_list.pop(p).unwrap()
379 };
380 match cont0.unfreeze(p)? {
381 None => continue,
382 Some(cont) => {
383 enqueue_event(t, {
384 cons_event(move |p| {
385 resume_process_boxed(cont, pid, (), p)
386 }).into_boxed()
387 }).call_event(p)?;
388 break;
389 }
390 }
391 }
392 }
393 Result::Ok(())
394 }
395 }
396}
397
398#[must_use = "computations are lazy and do nothing unless to be run"]
401#[derive(Clone)]
402pub struct TryRequestWithinEvent<S>
403 where S: QueueStrategy + 'static
404{
405 resource: Grc<Resource<S>>
407}
408
409impl<S> Event for TryRequestWithinEvent<S>
410 where S: QueueStrategy
411{
412 type Item = bool;
413
414 #[doc(hidden)]
415 #[inline]
416 fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
417 let TryRequestWithinEvent { resource } = self;
418 let a = resource.count.read_at(p);
419 if a > 0 {
420 resource.count.write_at(a - 1, p);
421 Result::Ok(true)
422 } else {
423 Result::Ok(false)
424 }
425 }
426}