desim/
lib.rs

1/* Copyright © 2018 Gianmarco Garrisi
2
3This program is free software: you can redistribute it and/or modify
4it under the terms of the GNU General Public License as published by
5the Free Software Foundation, either version 3 of the License, or
6(at your option) any later version.
7
8This program is distributed in the hope that it will be useful,
9but WITHOUT ANY WARRANTY; without even the implied warranty of
10MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11GNU General Public License for more details.
12
13You should have received a copy of the GNU General Public License
14along with this program.  If not, see <http://www.gnu.org/licenses/>. */
15
16//! This crate implements a discrete time event simulation framework
17//! inspired by the SimPy library for Python. It uses the coroutine
18//! feature that is nightly. Once the feature is stabilized, also this
19//! crate will use stable. Coroutines will be the only nightly feature
20//! used in this crate.
21//!
22//! The examples directory in this repository contains full usage examples
23//! of the desim crate as a simulation framework.
24//!
25//! # Simulation
//! A simulation is performed by scheduling one or more processes that
//! model the environment you are going to simulate. Your model may
28//! consider some kind of finite resource that must be shared among
29//! the processes, e.g. a bunch of servers in a simulation on queues.
30//!
31//! After setting up the simulation, it can be run step-by-step, using
//! the `step()` method, or all at once, with `run()`, until an ending
33//! condition is met.
34//!
35//! The simulation will generate a log of all the events with a state that
36//! returns `true` to `should_log`.
37//!
38/*
39//! `nonblocking_run` lets you run the simulation in another thread
40//! so that your program can go on without waiting for the simulation
41//! to finish.
42//!
43*/
44//! # Process
45//! A process is implemented using the rust coroutines syntax.
//! This lets us avoid the overhead of spawning a new thread for each
47//! process, while still keeping the use of this framework quite simple.
48//!
49//! When a new process is created in the simulation, an identifier, of type
50//! `ProcessId` is assigned to it. That id can be used to schedule an event that
51//! resumes the process.
52//!
53//! A process can be stopped and resumed later on. To stop the process, the
//! coroutine yields an `Effect` that specifies what the simulator should do.
55//! For example, a coroutine can set a timeout after which it is executed again.
56//! The process may also return. In that case it can not be resumed anymore.
57//!
58//!
59//! # Resource
//! A resource is a finite amount of entities, each of which can be used by one process
//! at a time. When the process does not need the resource anymore, it must release it.
62//!
63//! A resource can be created in the simulation using the `create_resource`
64//! method, which requires the resource to add to the simulation and returns an identifier
65//! for that resource that can be used to require and release it.
66//!
//! A resource can be required and released by a process yielding
68//! the corresponding `Effect`. There is no check on the fact that a process
69//! yielding `Release` was holding a resource with that ID.
70//!
71//! For more information about the `Resource` trait and the `SimpleResource` implementation,
72//! see the [`resources`](crate::resources) module.
73
74#![feature(coroutines, coroutine_trait)]
75use std::cmp::{Ordering, Reverse};
76use std::collections::BinaryHeap;
77use std::ops::{Coroutine, CoroutineState};
78use std::pin::Pin;
79
80pub mod prelude;
81pub mod resources;
82use resources::{Resource, Store};
83
84/// Data structures implementing this trait can be yielded from the coroutine
85/// associated with a `Process`. This allows attaching application-specific data
/// to `Effect`s. This data is then carried around by the Simulation, passed
87/// into user callbacks for context or simply logged for later.
88///
89/// As a simple example, implementing SimState for a type (as shown for
90/// ItemState below) allows users to track item stages.
91///
92/// A process can then yield `ItemState` instead of `Effect` types:
93///
94/// ```
95/// #![feature (coroutines, coroutine_trait)]
96/// use desim::{Effect, SimState, Simulation};
97///
98/// // enum used as part of state logged during simulation
99/// #[derive(Clone)]
100/// enum StageType {
101///     FirstPass,
102///     SecondPass,
103/// }
104///
105/// // structure yielded from processes of the simulation
106/// #[derive(Clone)]
107/// struct ItemState {
108///     stage: StageType,
109///     effect: Effect,
110///     log: bool,
111/// }
112///
113/// impl SimState for ItemState {
114///     fn get_effect(&self) -> Effect { self.effect }
115///     fn set_effect(&mut self, e: Effect) { self.effect = e; }
116///     fn should_log(&self) -> bool { self.log }
117/// }
118///
119/// let mut sim = Simulation::new();
120/// sim.create_process(Box::new(move |_| {
121///     yield ItemState { stage: StageType::FirstPass,
122///                       effect: Effect::TimeOut(10.0),
123///                       log: true,
124///                     };
125/// }));
126/// ```
127///
/// Calling `sim.processed_events()` then returns a slice of (Event, ItemState)
/// pairs, one for each yielded value where should_log() returned true.
130///
131/// For a full example, see examples/monitoring-state.rs
132///
pub trait SimState {
    /// Returns the `Effect` carried by this state. The simulation reads it
    /// after a process yields to decide what to schedule next.
    fn get_effect(&self) -> Effect;
    /// Replaces the `Effect` carried by this state.
    fn set_effect(&mut self, effect: Effect);
    /// Returns `true` if an event that yielded this state must be recorded
    /// in the log of processed events.
    fn should_log(&self) -> bool;
}
138
/// The effect is yielded by a process coroutine to
/// interact with the simulation environment.
#[derive(Debug, Copy, Clone)]
#[non_exhaustive]
pub enum Effect {
    /// The process that yields this effect will be resumed
    /// after the specified time
    TimeOut(f64),
    /// Yielding this effect it is possible to schedule the specified event
    Event {
        /// Time interval between the current simulation time and the event schedule
        time: f64,
        /// Process to execute when the event occurs
        process: ProcessId,
    },
    /// This effect is yielded to request a resource
    Request(ResourceId),
    /// This effect is yielded to release a resource that is not needed anymore.
    Release(ResourceId),
    /// This effect is yielded to push into a store
    Push(StoreId),
    /// This effect is yielded to pull out of a store
    Pull(StoreId),
    /// Keep the process' state until it is resumed by another event.
    /// The simulation schedules nothing for the yielding process.
    Wait,
    /// Only used for tracing: the process is rescheduled at the current
    /// simulation time, so it resumes without any delay.
    Trace,
}
167
/// Identifies a process. Can be used to resume it from another one and to schedule it.
/// It is the value returned by `Simulation::create_process`.
pub type ProcessId = usize;
/// Identifies a resource. Can be used to request and release it.
/// It is the value returned by `Simulation::create_resource`.
pub type ResourceId = usize;
/// Identifies a store. Can be used to push into and pull out of it.
/// It is the value returned by `Simulation::create_store`.
pub type StoreId = usize;
/// The type of each `Process` coroutine: it is resumed with a `SimContext<T>`,
/// yields values of type `T` and returns nothing.
pub type Process<T> = dyn Coroutine<SimContext<T>, Yield = T, Return = ()> + Unpin;
176
/// This struct provides the methods to create and run the simulation
/// in a single thread.
///
/// It provides methods to create processes and finite resources that
/// must be shared among them.
///
/// See the crate-level documentation for more information about how the
/// simulation framework works
pub struct Simulation<T: SimState + Clone> {
    /// Current simulation time; updated to the time of each event popped by `step`.
    time: f64,
    /// Number of times `step` has been called.
    steps: usize,
    /// Process table indexed by `ProcessId`; a slot becomes `None` once its
    /// coroutine has completed.
    processes: Vec<Option<Box<Process<T>>>>,
    /// Pending events; wrapped in `Reverse` so the earliest event is popped first.
    future_events: BinaryHeap<Reverse<Event<T>>>,
    /// Log of processed events paired with the state the process yielded.
    processed_events: Vec<(Event<T>, T)>,
    /// Resources indexed by `ResourceId`.
    resources: Vec<Box<dyn Resource<T>>>,
    /// Stores indexed by `StoreId`.
    stores: Vec<Box<dyn Store<T>>>,
    /// Scratch buffer handed to stores to collect the events they schedule;
    /// it is drained into `future_events` right after each store call.
    future_events_buffer: Vec<Event<T>>,
}
195
/// The Simulation Context is the argument used to resume the coroutine.
/// It can be used to retrieve the simulation time and the state carried by
/// the event that caused the process' wake up.
#[derive(Debug, Clone)]
pub struct SimContext<T> {
    /// Simulation time at which the process is resumed.
    time: f64,
    /// Clone of the state carried by the event that resumed the process.
    state: T,
}
203
204/*
205pub struct ParallelSimulation {
206    processes: Vec<Box<Coroutine<Yield = Effect, Return = ()>>>
207}
208 */
209
/// An event that can be scheduled by a process, yielding the `Event` `Effect`
/// or by the owner of a `Simulation` through the `schedule_event` method
#[derive(Debug, Copy, Clone)]
pub struct Event<T> {
    /// Simulation time at which the event fires; the simulation clock is set
    /// to this value when the event is processed.
    time: f64,
    /// Process to execute when the event occurs
    process: ProcessId,
    /// State associated with the event; it is handed to the process when it is resumed
    state: T,
}
221
/// Specify which condition must be met for the simulation to stop.
pub enum EndCondition {
    /// Run the simulation until a certain point in time is reached.
    Time(f64),
    /// Run the simulation until there are no more events scheduled.
    NoEvents,
    /// Execute exactly N steps of the simulation. Every call to `step`
    /// counts, including a call that finds no event to process.
    NSteps(usize),
}
231
232impl<T: 'static + SimState + Clone> Simulation<T> {
233    /// Create a new `Simulation` environment.
234    pub fn new() -> Simulation<T> {
235        Simulation::<T>::default()
236    }
237
    /// Returns the current simulation time, i.e. the time of the last event
    /// processed by `step` (or `0.0` if no event was processed yet).
    pub fn time(&self) -> f64 {
        self.time
    }
242
243    /// Returns the log of processed events
244    pub fn processed_events(&self) -> &[(Event<T>, T)] {
245        self.processed_events.as_slice()
246    }
247
248    /// Create a process.
249    ///
250    /// For more information about a process, see the crate level documentation
251    ///
252    /// Returns the identifier of the process.
253    pub fn create_process(
254        &mut self,
255        process: Box<dyn Coroutine<SimContext<T>, Yield = T, Return = ()> + Unpin>,
256    ) -> ProcessId {
257        let id = self.processes.len();
258        self.processes.push(Some(process));
259        id
260    }
261
262    /// Create a new resource.
263    ///
264    /// For more information about a resource, see the crate level documentation
265    /// and the documentation of the [`resources`](crate::resources) module.
266    ///
267    /// Returns the identifier of the resource
268    pub fn create_resource(&mut self, resource: Box<dyn Resource<T>>) -> ResourceId {
269        let id = self.resources.len();
270        self.resources.push(resource);
271        id
272    }
273
274    /// Create a new store.
275    ///
276    /// For more information about a store, see the crate level documentation
277    /// and the documentation of the [`resources`](crate::resources) module.
278    ///
279    /// Returns the identifier of the store
280    pub fn create_store(&mut self, store: Box<dyn Store<T>>) -> StoreId {
281        let id = self.stores.len();
282        self.stores.push(store);
283        id
284    }
285
286    /// Schedule a process to be executed after `time` time instants.
287    /// Another way to schedule events is
288    /// yielding `Effect::Event` from a process during the simulation.
289    // TODO: Review this API
290    pub fn schedule_event(&mut self, time: f64, process: ProcessId, state: T) {
291        self.future_events
292            .push(Reverse(Event::new(time, process, state)));
293    }
294
295    fn log_processed_event(&mut self, event: &Event<T>, sim_state: T) {
296        if sim_state.should_log() {
297            self.processed_events.push((event.clone(), sim_state));
298        }
299    }
300
301    /// Proceed in the simulation by 1 step
302    pub fn step(&mut self) {
303        self.steps += 1;
304        if let Some(Reverse(event)) = self.future_events.pop() {
305            self.time = event.time();
306            let gstatepin = Pin::new(
307                self.processes[event.process]
308                    .as_mut()
309                    .expect("ERROR. Tried to resume a completed process."),
310            )
311            .resume(SimContext {
312                time: self.time,
313                state: event.state().clone(),
314            });
315            // log event
316            // logging needs to happen before the processing because processing
317            // can add further events (such as resource acquired/released) and
318            // it becomes confusing if you first get a resource acquired event
319            // and only log the request for it afterwards.
320            match gstatepin.clone() {
321                CoroutineState::Yielded(y) => {
322                    self.log_processed_event(&event, y);
323                }
324                CoroutineState::Complete(_) => {}
325            }
326            // process event
327            match gstatepin {
328                CoroutineState::Yielded(y) => {
329                    let effect = y.get_effect();
330                    match effect {
331                        Effect::TimeOut(t) => self.future_events.push(Reverse(Event {
332                            time: self.time + t,
333                            process: event.process(),
334                            state: y,
335                        })),
336                        Effect::Event { time, process } => {
337                            let e = Event::new(time + self.time, process, y);
338                            self.future_events.push(Reverse(e))
339                        }
340                        Effect::Request(r) => {
341                            let res = &mut self.resources[r];
342                            let request_event = Event::new(self.time, event.process(), y);
343                            if let Some(e) = res.allocate_or_enqueue(request_event) {
344                                self.future_events.push(Reverse(e))
345                            }
346                        }
347                        Effect::Release(r) => {
348                            let res = &mut self.resources[r];
349                            let release_event = Event::new(self.time, event.process(), y);
350                            if let Some(e) = res.release_and_schedule_next(release_event.clone()) {
351                                self.future_events.push(Reverse(e));
352                            }
353                            // after releasing the resource the process
354                            // can be resumed
355                            self.future_events.push(Reverse(release_event));
356                        }
357                        Effect::Wait => {}
358                        Effect::Trace => {
359                            // this event is only for tracing, reschedule
360                            // immediately'
361                            let e = Event::new(self.time, event.process(), y);
362                            self.future_events.push(Reverse(e));
363                        }
364                        Effect::Push(s) => {
365                            let store = &mut self.stores[s];
366                            let request_event = Event::new(self.time, event.process(), y);
367                            store.push_or_enqueue_and_schedule_next(
368                                request_event,
369                                &mut self.future_events_buffer,
370                            );
371                            self.future_events
372                                .extend(self.future_events_buffer.drain(..).map(Reverse));
373                        }
374                        Effect::Pull(s) => {
375                            let store = &mut self.stores[s];
376                            let request_event = Event::new(self.time, event.process(), y);
377                            store.pull_or_enqueue_and_schedule_next(
378                                request_event,
379                                &mut self.future_events_buffer,
380                            );
381                            self.future_events
382                                .extend(self.future_events_buffer.drain(..).map(Reverse));
383                        }
384                    }
385                }
386                CoroutineState::Complete(_) => {
387                    // FIXME: removing the process from the vector would invalidate
388                    // all existing `ProcessId`s, but keeping it would be a
389                    // waste of space since it is completed.
390                    // May be worth to use another data structure.
391                    // At least let's remove the coroutine itself.
392                    self.processes[event.process()].take();
393                }
394            }
395        }
396    }
397
398    /// Run the simulation until and ending condition is met.
399    pub fn run(mut self, until: EndCondition) -> Simulation<T> {
400        while !self.check_ending_condition(&until) {
401            self.step();
402        }
403        self
404    }
405    /*
406        pub fn nonblocking_run(mut self, until: EndCondition) -> thread::JoinHandle<Simulation> {
407            thread::spawn(move || {
408                self.run(until)
409            })
410        }
411    */
412
413    /// Return `true` if the ending condition was met, `false` otherwise.
414    fn check_ending_condition(&self, ending_condition: &EndCondition) -> bool {
415        match &ending_condition {
416            EndCondition::Time(t) => self.time >= *t,
417            EndCondition::NoEvents => self.future_events.is_empty(),
418            EndCondition::NSteps(n) => self.steps == *n,
419        }
420    }
421}
422
impl<T> SimContext<T> {
    /// Returns the simulation time at which the process was resumed.
    pub fn time(&self) -> f64 {
        self.time
    }

    /// Returns the state carried by the event that caused the process to wake up
    pub fn state(&self) -> &T {
        &self.state
    }
}
434
435impl<T> Event<T> {
436    pub fn new(time: f64, process: ProcessId, state: T) -> Event<T> {
437        Event {
438            time,
439            process,
440            state,
441        }
442    }
443    pub fn time(&self) -> f64 {
444        self.time
445    }
446    pub fn set_time(&mut self, time: f64) {
447        self.time = time;
448    }
449    pub fn process(&self) -> ProcessId {
450        self.process
451    }
452    pub fn set_process(&mut self, process: ProcessId) {
453        self.process = process;
454    }
455    pub fn state(&self) -> &T {
456        &self.state
457    }
458    pub fn state_mut(&mut self) -> &mut T {
459        &mut self.state
460    }
461    pub fn set_state(&mut self, state: T) {
462        self.state = state;
463    }
464}
465
impl<T: SimState> Event<T> {
    /// Returns the `Effect` stored in the state carried by the event.
    pub fn effect(&self) -> Effect {
        self.state.get_effect()
    }
    /// Replaces the `Effect` stored in the state carried by the event.
    pub fn set_effect(&mut self, effect: Effect) {
        self.state.set_effect(effect)
    }
}
474
475impl<T: SimState + Clone> Default for Simulation<T> {
476    fn default() -> Self {
477        Simulation::<T> {
478            time: 0.0,
479            steps: 0,
480            processes: Vec::default(),
481            future_events: BinaryHeap::default(),
482            processed_events: Vec::default(),
483            resources: Vec::default(),
484            stores: Vec::default(),
485            future_events_buffer: Vec::default(),
486        }
487    }
488}
489
// Events are compared by firing time only: the process and the state are
// ignored, so two distinct events scheduled for the same time are "equal".
impl<T> PartialEq for Event<T> {
    fn eq(&self, other: &Event<T>) -> bool {
        self.time == other.time
    }
}
495
// Marker impl required by `Ord`. NOTE(review): equality is based on an `f64`,
// so an event whose time is NaN is not equal to itself.
impl<T> Eq for Event<T> {}
497
// Events are ordered by firing time only.
impl<T> PartialOrd for Event<T> {
    // The canonical `Some(self.cmp(other))` is not used on purpose: `cmp`
    // panics when the times are not comparable, while this returns `None`.
    #[allow(clippy::incorrect_partial_ord_impl_on_ord_type)]
    fn partial_cmp(&self, other: &Event<T>) -> Option<Ordering> {
        self.time.partial_cmp(&other.time)
    }
}
504
505impl<T> Ord for Event<T> {
506    fn cmp(&self, other: &Event<T>) -> Ordering {
507        match self.time.partial_cmp(&other.time) {
508            Some(o) => o,
509            None => panic!("Event time was uncomparable. Maybe a NaN"),
510        }
511    }
512}
513
// A bare `Effect` can be used directly as the simulation state: it is its own
// effect and every event is logged.
impl SimState for Effect {
    /// Returns a copy of the effect itself.
    fn get_effect(&self) -> Effect {
        *self
    }
    /// Overwrites the effect with `e`.
    fn set_effect(&mut self, e: Effect) {
        *self = e;
    }
    /// Always `true`: every event is recorded in the log.
    fn should_log(&self) -> bool {
        true
    }
}
525
526#[cfg(test)]
527mod tests {
    // Steps a single process by hand and checks how the clock advances.
    #[test]
    fn it_works() {
        use crate::{Effect, Simulation};

        let mut s = Simulation::new();
        // process that sleeps for 1, 2, 3, ... time units, forever
        let p = s.create_process(Box::new(|_| {
            let mut a = 0.0;
            loop {
                a += 1.0;

                yield Effect::TimeOut(a);
            }
        }));
        s.schedule_event(0.0, p, Effect::TimeOut(0.));
        // first step: the initial event at time 0 starts the process
        s.step();
        // second step: the 1.0 timeout expires
        s.step();
        assert_eq!(s.time(), 1.0);
        // third step: the 2.0 timeout expires (1.0 + 2.0)
        s.step();
        assert_eq!(s.time(), 3.0);
        // fourth step: the 3.0 timeout expires (3.0 + 3.0)
        s.step();
        assert_eq!(s.time(), 6.0);
    }
550
    // Runs a periodic process until the time-based ending condition is met.
    #[test]
    fn run() {
        use crate::{Effect, EndCondition, Simulation};

        let mut s = Simulation::new();
        // process that ticks every 0.7 time units, forever
        let p = s.create_process(Box::new(|_| {
            let tik = 0.7;
            loop {
                println!("tik");
                yield Effect::TimeOut(tik);
            }
        }));
        s.schedule_event(0.0, p, Effect::TimeOut(0.));
        let s = s.run(EndCondition::Time(10.0));
        println!("{}", s.time());
        // the run stops at the first event whose time is not below 10.0
        assert!(s.time() >= 10.0);
    }
568
    // Two processes contend for a resource created with `SimpleResource::new(1)`.
    #[test]
    fn resource() {
        use crate::resources::SimpleResource;
        use crate::{Effect, EndCondition::NoEvents, Simulation};

        let mut s = Simulation::new();
        let r = s.create_resource(Box::new(SimpleResource::new(1)));

        // simple process that holds the resource for 7 time units
        let p1 = s.create_process(Box::new(move |_| {
            yield Effect::Request(r);
            yield Effect::TimeOut(7.0);
            yield Effect::Release(r);
        }));
        // simple process that holds the resource for 3 time units
        let p2 = s.create_process(Box::new(move |_| {
            yield Effect::Request(r);
            yield Effect::TimeOut(3.0);
            yield Effect::Release(r);
        }));

        // let p1 start immediately...
        s.schedule_event(0.0, p1, Effect::TimeOut(0.));
        // let p2 start after 2 t.u., when r is not available
        s.schedule_event(2.0, p2, Effect::TimeOut(2.));
        // p2 will wait for r to be free (time 7.0) and then for its own
        // timeout of 3.0 t.u. The simulation will end at time 10.0

        let s = s.run(NoEvents);
        println!("{:?}", s.processed_events());
        assert_eq!(s.time(), 10.0);
    }
601
    // One process pulls from a store created with `SimpleStore::new(1)` while
    // another one pushes into it.
    #[test]
    fn store() {
        use crate::resources::SimpleStore;
        use crate::{Effect, EndCondition::NoEvents, Simulation};

        let mut sim = Simulation::new();
        let store = sim.create_store(Box::new(SimpleStore::new(1)));

        // simple process that pulls out of the store immediately and after 7 time units
        let p1 = sim.create_process(Box::new(move |_| {
            yield Effect::Pull(store);
            yield Effect::TimeOut(7.0);
            yield Effect::Pull(store);
        }));
        // simple process that pushes into the store immediately and after 3 time units
        let p2 = sim.create_process(Box::new(move |_| {
            yield Effect::Push(store);
            yield Effect::TimeOut(3.0);
            yield Effect::Push(store);
        }));

        // let p1 start immediately...
        sim.schedule_event(0.0, p1, Effect::TimeOut(0.));
        // let p2 start after 2 t.u.
        // NOTE(review): presumably p1's first pull is still pending at that
        // point because nothing was pushed yet; confirm against `SimpleStore`.
        sim.schedule_event(2.0, p2, Effect::TimeOut(2.));
        // The assertion below expects the last event to be processed at time
        // 9.0. NOTE(review): presumably p1's first pull is served at time 2.0
        // and its 7.0 timeout ends at 9.0; confirm against `SimpleStore`.

        let s = sim.run(NoEvents);
        println!("{:?}", s.processed_events());
        assert_eq!(s.time(), 9.0);
    }
634}