dvcompute_dist/simulation/resource/
mod.rs

1// Copyright (c) 2020-2022  David Sorokin <davsor@mail.ru>, based in Yoshkar-Ola, Russia
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at https://mozilla.org/MPL/2.0/.
6
7use crate::simulation;
8use crate::simulation::error::*;
9use crate::simulation::Run;
10use crate::simulation::Point;
11use crate::simulation::ref_comp::RefComp;
12use crate::simulation::simulation::*;
13use crate::simulation::event::*;
14use crate::simulation::process::*;
15use crate::simulation::strategy::*;
16
17use dvcompute_utils::grc::Grc;
18
/// The resources that gather their statistics.
pub mod stats;
21
22/// Request for the resource within `Process` computation.
23#[inline]
24pub fn request_resource<S>(resource: Grc<Resource<S>>) -> Request<S>
25    where S: QueueStrategy + Clone + 'static
26{
27    Request { resource: resource }
28}
29
30/// Request for the resource within `Process` computation with the specified priority.
31#[inline]
32pub fn request_resource_with_priority<S>(resource: Grc<Resource<S>>, priority: S::Priority) -> RequestWithPriority<S>
33    where S: QueueStrategy + Clone + 'static,
34          S::Priority: Clone
35{
36    RequestWithPriority { resource: resource, priority: priority }
37}
38
39/// Release the resource within `Process` computation.
40#[inline]
41pub fn release_resource<S>(resource: Grc<Resource<S>>) -> Release<S>
42    where S: QueueStrategy
43{
44    Release { resource: resource }
45}
46
47/// Release the resource within `Event` computation.
48#[inline]
49pub fn release_resource_within_event<S>(resource: Grc<Resource<S>>) -> ReleaseWithinEvent<S>
50    where S: QueueStrategy
51{
52    ReleaseWithinEvent { resource: resource }
53}
54
55/// Try to request for the resource immediately and return a flag
56/// indicating whether the resource was aquired.
57#[inline]
58pub fn try_request_resource_within_event<S>(resource: Grc<Resource<S>>) -> TryRequestWithinEvent<S>
59    where S: QueueStrategy
60{
61    TryRequestWithinEvent { resource: resource }
62}
63
64/// Create a new FCFS (a.k.a FIFO) resource by the specified initial count that becomes the capacity as well.
65#[inline]
66pub fn new_fcfs_resource(count: isize) -> NewResource<FCFSStrategy> {
67    NewResource { strategy: FCFSStrategy::Instance, count: count, max_count: Some(count) }
68}
69
70/// Create a new FCFS (a.k.a FIFO) resource by the specified initial count and optional maximum count, i.e. capacity.
71#[inline]
72pub fn new_fcfs_resource_with_max_count(count: isize, max_count: Option<isize>) -> NewResource<FCFSStrategy> {
73    NewResource { strategy: FCFSStrategy::Instance, count: count, max_count: max_count }
74}
75
76/// Create a new LCFS (a.k.a LIFO) resource by the specified initial count that becomes the capacity as well.
77#[inline]
78pub fn new_lcfs_resource(count: isize) -> NewResource<LCFSStrategy> {
79    NewResource { strategy: LCFSStrategy::Instance, count: count, max_count: Some(count) }
80}
81
82/// Create a new LCFS (a.k.a LIFO) resource by the specified initial count and optional maximum count, i.e. capacity.
83#[inline]
84pub fn new_lcfs_resource_with_max_count(count: isize, max_count: Option<isize>) -> NewResource<LCFSStrategy> {
85    NewResource { strategy: LCFSStrategy::Instance, count: count, max_count: max_count }
86}
87
/// The `Resource` based on the FCFS (a.k.a. FIFO) strategy (First Come - First Served),
/// i.e. `Resource` parameterized by `FCFSStrategy`.
pub type FCFSResource = Resource<FCFSStrategy>;

/// The `Resource` based on the LCFS (a.k.a. LIFO) strategy (Last Come - First Served),
/// i.e. `Resource` parameterized by `LCFSStrategy`.
pub type LCFSResource = Resource<LCFSStrategy>;
93
94/// Represents a simple resource that gathers its statistics.
95pub struct Resource<S> where S: QueueStrategy {
96
97    /// The maximum count of the resource, where `None` means that there is no upper bound.
98    max_count: Option<isize>,
99
100    /// The count.
101    count: RefComp<isize>,
102
103    /// The Wait list.
104    wait_list: QueueStorageBox<ResourceItem, S::Priority>
105}
106
107impl<S> PartialEq for Resource<S> where S: QueueStrategy {
108
109    fn eq(&self, other: &Self) -> bool {
110        self.count == other.count
111    }
112}
113
114impl<S> Eq for Resource<S> where S: QueueStrategy {}
115
116/// Identifies the resource item.
117#[derive(Clone)]
118struct ResourceItem {
119
120    /// The process identifier.
121    pid: Grc<ProcessId>,
122
123    /// The continuation of the corresponding process.
124    cont: FrozenProcess<()>
125}
126
127impl<S> Resource<S> where S: QueueStrategy {
128
129    /// Create a new resource by the specified queue storage and initial count,
130    /// where the latter becomes the capacity as well.
131    #[inline]
132    pub fn new(strategy: S, count: isize) -> NewResource<S> {
133        NewResource { strategy: strategy, count: count, max_count: Some(count) }
134    }
135
136    /// Create a new resource by the specified queue storage, initial count
137    /// and optional maximum count, i.e. capacity.
138    #[inline]
139    pub fn new_with_max_count(strategy: S, count: isize, max_count: Option<isize>) -> NewResource<S> {
140        NewResource { strategy: strategy, count: count, max_count: max_count }
141    }
142
143    /// Return the current available count of the resource.
144    #[inline]
145    pub fn count(resource: Grc<Self>) -> impl Event<Item = isize> + Clone {
146        cons_event(move |p| {
147            Result::Ok(resource.count.read_at(p))
148        })
149    }
150}
151
/// A `Simulation` computation that creates a new `Resource` when it is run.
#[derive(Clone)]
pub struct NewResource<S> {
    /// The queue strategy used to build the wait list.
    strategy: S,

    /// The initial count of the resource.
    count: isize,

    /// The optional upper bound of the count.
    max_count: Option<isize>,
}
165
166impl<S> Simulation for NewResource<S> where S: QueueStrategy {
167
168    type Item = Resource<S>;
169
170    #[doc(hidden)]
171    #[inline]
172    fn call_simulation(self, _r: &Run) -> simulation::Result<Self::Item> {
173        let NewResource { strategy, count, max_count } = self;
174        if count < 0 {
175            let msg = String::from("The resource count cannot be actually negative");
176            let err = Error::retry(msg);
177            Result::Err(err)
178        } else if count > max_count.unwrap_or(count) {
179            let msg = String::from("The resource count cannot be greater than its upper bound");
180            let err = Error::retry(msg);
181            Result::Err(err)
182        } else {
183            let wait_list = strategy.new_storage();
184            Result::Ok(Resource {
185                max_count: max_count,
186                count: RefComp::new(count),
187                wait_list: wait_list
188            })
189        }
190    }
191}
192
193/// Request for the resource.
194#[must_use = "computations are lazy and do nothing unless to be run"]
195#[derive(Clone)]
196pub struct Request<S>
197    where S: QueueStrategy + Clone + 'static
198{
199    /// The resource.
200    resource: Grc<Resource<S>>
201}
202
203impl<S> Process for Request<S>
204    where S: QueueStrategy + Clone + 'static
205{
206    type Item = ();
207
208    #[doc(hidden)]
209    #[inline]
210    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
211        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + Clone + 'static
212    {
213        let Request { resource } = self;
214        let a = resource.count.read_at(p);
215        if a > 0 {
216            resource.count.write_at(a - 1, p);
217            resume_process(cont, pid, (), p)
218        } else {
219            let comp = Request { resource: resource.clone() };
220            let cont = ProcessBoxCont::new(cont);
221            let cont = FrozenProcess::with_reentering(cont, pid.clone(), (), comp, p)?;
222            let item = ResourceItem {
223                pid: pid,
224                cont: cont
225            };
226            resource.wait_list.push(item, p);
227            Result::Ok(())
228        }
229    }
230
231    #[doc(hidden)]
232    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
233        let Request { resource } = self;
234        let a = resource.count.read_at(p);
235        if a > 0 {
236            resource.count.write_at(a - 1, p);
237            resume_process_boxed(cont, pid, (), p)
238        } else {
239            let comp = Request { resource: resource.clone() };
240            let cont = FrozenProcess::with_reentering(cont, pid.clone(), (), comp, p)?;
241            let item = ResourceItem {
242                pid: pid,
243                cont: cont
244            };
245            resource.wait_list.push(item, p);
246            Result::Ok(())
247        }
248    }
249}
250
251/// Request for the resource with priority.
252#[must_use = "computations are lazy and do nothing unless to be run"]
253#[derive(Clone)]
254pub struct RequestWithPriority<S>
255    where S: QueueStrategy + Clone + 'static
256{
257    /// The resource.
258    resource: Grc<Resource<S>>,
259
260    /// The priority of the request.
261    priority: S::Priority
262}
263
264impl<S> Process for RequestWithPriority<S>
265    where S: QueueStrategy + Clone + 'static,
266          S::Priority: Clone
267{
268    type Item = ();
269
270    #[doc(hidden)]
271    #[inline]
272    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
273        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + Clone + 'static
274    {
275        let RequestWithPriority { resource, priority } = self;
276        let a = resource.count.read_at(p);
277        if a > 0 {
278            resource.count.write_at(a - 1, p);
279            resume_process(cont, pid, (), p)
280        } else {
281            let comp = RequestWithPriority { resource: resource.clone(), priority: priority.clone() };
282            let cont = ProcessBoxCont::new(cont);
283            let cont = FrozenProcess::with_reentering(cont, pid.clone(), (), comp, p)?;
284            let item = ResourceItem {
285                pid: pid,
286                cont: cont
287            };
288            resource.wait_list.push_with_priority(priority, item, p);
289            Result::Ok(())
290        }
291    }
292
293    #[doc(hidden)]
294    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
295        let RequestWithPriority { resource, priority } = self;
296        let a = resource.count.read_at(p);
297        if a > 0 {
298            resource.count.write_at(a - 1, p);
299            resume_process_boxed(cont, pid, (), p)
300        } else {
301            let comp = RequestWithPriority { resource: resource.clone(), priority: priority.clone() };
302            let cont = FrozenProcess::with_reentering(cont, pid.clone(), (), comp, p)?;
303            let item = ResourceItem {
304                pid: pid,
305                cont: cont
306            };
307            resource.wait_list.push_with_priority(priority, item, p);
308            Result::Ok(())
309        }
310    }
311}
312
313/// Release the resource.
314#[must_use = "computations are lazy and do nothing unless to be run"]
315#[derive(Clone)]
316pub struct Release<S> where S: QueueStrategy + 'static {
317
318    /// The resource.
319    resource: Grc<Resource<S>>
320}
321
322impl<S> Process for Release<S>
323    where S: QueueStrategy
324{
325    type Item = ();
326
327    #[doc(hidden)]
328    #[inline]
329    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
330        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + Clone + 'static
331    {
332        let Release { resource } = self;
333        let comp = ReleaseWithinEvent { resource: resource };
334        comp.call_event(p)?;
335        resume_process(cont, pid, (), p)
336    }
337
338    #[doc(hidden)]
339    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
340        let Release { resource } = self;
341        let comp = ReleaseWithinEvent { resource: resource };
342        comp.call_event(p)?;
343        resume_process_boxed(cont, pid, (), p)
344    }
345}
346
347/// Release the resource.
348#[must_use = "computations are lazy and do nothing unless to be run"]
349#[derive(Clone)]
350pub struct ReleaseWithinEvent<S> where S: QueueStrategy + 'static {
351
352    /// The resource.
353    resource: Grc<Resource<S>>
354}
355
356impl<S> Event for ReleaseWithinEvent<S>
357    where S: QueueStrategy
358{
359    type Item = ();
360
361    #[doc(hidden)]
362    fn call_event(self, p: &Point) -> simulation::Result<()> {
363        let ReleaseWithinEvent { resource } = self;
364        let a  = resource.count.read_at(p);
365        let a2 = a + 1;
366        if a2 > resource.max_count.unwrap_or(a2) {
367            let msg = String::from("The resource count cannot be greater than its upper bound");
368            let err = Error::retry(msg);
369            Result::Err(err)
370        } else {
371            let t = p.time;
372            loop {
373                if resource.wait_list.is_empty(p) {
374                    resource.count.write_at(a2, p);
375                    break;
376                } else {
377                    let ResourceItem { pid, cont: cont0 } = {
378                        resource.wait_list.pop(p).unwrap()
379                    };
380                    match cont0.unfreeze(p)? {
381                        None => continue,
382                        Some(cont) => {
383                            enqueue_event(t, {
384                                cons_event(move |p| {
385                                    resume_process_boxed(cont, pid, (), p)
386                                }).into_boxed()
387                            }).call_event(p)?;
388                            break;
389                        }
390                    }
391                }
392            }
393            Result::Ok(())
394        }
395    }
396}
397
398/// Try to request for the resource immediately and return a flag
399/// indicating whether the resource was aquired.
400#[must_use = "computations are lazy and do nothing unless to be run"]
401#[derive(Clone)]
402pub struct TryRequestWithinEvent<S>
403    where S: QueueStrategy + 'static
404{
405    /// The resource.
406    resource: Grc<Resource<S>>
407}
408
409impl<S> Event for TryRequestWithinEvent<S>
410    where S: QueueStrategy
411{
412    type Item = bool;
413
414    #[doc(hidden)]
415    #[inline]
416    fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
417        let TryRequestWithinEvent { resource } = self;
418        let a = resource.count.read_at(p);
419        if a > 0 {
420            resource.count.write_at(a - 1, p);
421            Result::Ok(true)
422        } else {
423            Result::Ok(false)
424        }
425    }
426}