dvcompute_branch/simulation/internal/
event_queue.rs

1// Copyright (c) 2020-2022  David Sorokin <davsor@mail.ru>, based in Yoshkar-Ola, Russia
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at https://mozilla.org/MPL/2.0/.
6
7#[cfg(any(feature="branch_mode", feature="branch_wasm_mode"))]
8use std::cell::UnsafeCell;
9
10#[cfg(any(feature="branch_mode", feature="branch_wasm_mode"))]
11use std::ops::Deref;
12
13#[cfg(any(feature="branch_mode", feature="branch_wasm_mode"))]
14use std::rc::Rc;
15
16#[cfg(feature="dist_mode")]
17use libc::*;
18
19use crate::simulation::specs::SpecsRepr;
20use crate::simulation::point::Point;
21use crate::simulation::event::EventRepr;
22
23#[cfg(any(feature="branch_mode", feature="branch_wasm_mode"))]
24use crate::simulation;
25
26#[cfg(any(feature="branch_mode", feature="branch_wasm_mode"))]
27use crate::simulation::error::*;
28
29#[cfg(any(feature="branch_mode", feature="branch_wasm_mode"))]
30use crate::simulation::event::*;
31
32#[cfg(any(feature="branch_mode", feature="branch_wasm_mode"))]
33use crate::simulation::utils::priority_queue::PriorityQueue;
34
/// The event queue for the optimistic Time Warp method.
///
/// Both fields live in `UnsafeCell`s because the queue is updated through
/// shared (`&self`) references by its methods.
#[cfg(any(feature = "branch_mode", feature = "branch_wasm_mode"))]
pub struct EventQueue {
    /// The current event time, i.e. the activation time of the event that was
    /// most recently taken from the queue (initially the simulation start time).
    time: UnsafeCell<f64>,

    /// The pending events, keyed by `(activation time, negated priority)`.
    pq: UnsafeCell<PriorityQueue<(f64, isize), Rc<EventRepr>>>,
}
45
#[cfg(any(feature = "branch_mode", feature = "branch_wasm_mode"))]
impl EventQueue {

    /// Create a new event queue.
    ///
    /// The current event time is initialized with `specs.start_time` and the
    /// set of pending events is empty.
    pub fn new(specs: &SpecsRepr) -> EventQueue {
        EventQueue {
            time: UnsafeCell::new(specs.start_time),
            pq: UnsafeCell::new(PriorityQueue::new()),
        }
    }

    /// Clone the event queue at the specified modeling time point.
    ///
    /// The returned queue carries the same current event time and a clone of
    /// the pending events. The point argument is not used by this implementation.
    pub fn clone_at(&self, _p: &Point) -> EventQueue {
        // SAFETY: only reads through the cell pointers for the duration of this
        // expression. Assumes no other code is mutating the cells at the same
        // time — NOTE(review): relies on the queue being used from one thread
        // and not re-entered while a write is in progress; confirm.
        let (time, pending) = unsafe {
            (*self.time.get(), (*self.pq.get()).clone())
        };
        EventQueue {
            time: UnsafeCell::new(time),
            pq: UnsafeCell::new(pending),
        }
    }

    /// Enqueue the event by the specified activation time and handler generator.
    ///
    /// The event is stored under the key `(event_time, -priority)`; the
    /// priority is negated back when the event is later run by
    /// [`run_events`](EventQueue::run_events).
    ///
    /// # Panics
    ///
    /// Panics if `event_time` is less than the time of the point `p`.
    #[inline]
    pub fn enqueue_event(&self, event_time: f64, priority: isize, comp: EventRepr, p: &Point) {
        assert!(event_time >= p.time, "The event time cannot be less than the current modeling time");
        let key = (event_time, -priority);
        let pq = self.pq.get();
        // SAFETY: the pointer comes from a live `UnsafeCell` owned by `self`.
        // Assumes no reference into the queue is held across this write —
        // NOTE(review): confirm for handlers that enqueue while events run.
        unsafe {
            *pq = (*pq).enqueue(key, Rc::new(comp));
        }
    }

    /// Enqueue the IO-based event by the specified activation time and handler generator.
    ///
    /// In this implementation the call is forwarded unchanged to
    /// [`enqueue_event`](EventQueue::enqueue_event), including its panic condition.
    #[inline]
    pub fn enqueue_io_event(&self, event_time: f64, priority: isize, comp: EventRepr, p: &Point) {
        self.enqueue_event(event_time, priority, comp, p)
    }

    /// Run the pending events.
    ///
    /// Repeatedly takes the front event while its activation time is strictly
    /// less than `p.time`, or equal to `p.time` when `including_current` is
    /// `true`. For every such event the current event time is advanced, the
    /// event is removed from the queue and then its handler is called at a
    /// point built from the event time and priority.
    ///
    /// # Errors
    ///
    /// A handler result of `Error::Cancel` is ignored. Any `Error::Other` is
    /// returned to the caller immediately; the failed event has already been
    /// removed from the queue at that moment.
    ///
    /// # Panics
    ///
    /// Panics if the front event has an activation time less than the current
    /// event time of the queue.
    pub fn run_events(&self, including_current: bool, p: &Point) -> simulation::Result<()> {
        // The cell pointers do not change between iterations, so take them once.
        let t0 = self.time.get();
        let pq = self.pq.get();
        loop {
            // SAFETY: both pointers come from live `UnsafeCell`s owned by `self`;
            // the references created here do not outlive this block.
            unsafe {
                match (*pq).front_key() {
                    Some((t2, _)) if *t2 < *t0 => {
                        panic!("t={}: The time value ({}) is too small. The event queue is desynchronized", *t0, *t2);
                    }
                    Some((t2, _)) if *t2 < p.time || (including_current && *t2 == p.time) => {
                        *t0 = *t2;
                    }
                    _ => break,
                }
            }
            // SAFETY: same pointer as above. The queue cannot be empty here,
            // because `front_key` has just returned `Some` and nothing ran in between.
            let ((t2, pri2), c2) = unsafe {
                (*pq).front().unwrap()
            };
            // Copy everything needed out of the front entry before it is dequeued.
            let t2 = *t2;
            let c2 = c2.deref().clone();
            let p2 = p.run.point_at(t2, -(*pri2), p.minimal_priority);
            // SAFETY: same pointer as above; the values borrowed from the front
            // entry have been copied out and are not used past this point.
            unsafe {
                *pq = (*pq).dequeue()
            }
            // The event is removed before its handler runs, so the handler
            // observes a queue without the event being processed.
            match c2.call_event(&p2) {
                Result::Ok(()) | Result::Err(Error::Cancel) => (),
                Result::Err(Error::Other(x)) => {
                    // Every kind of other error is propagated as is. The variants
                    // are listed explicitly to keep the match exhaustive, so that
                    // a newly added variant requires a decision here.
                    match x.deref() {
                        &OtherError::Retry(_) | &OtherError::Panic(_) | &OtherError::IO(_) => {
                            return Result::Err(Error::Other(x))
                        }
                    }
                }
            }
        }
        Result::Ok(())
    }
}
136
/// Represents the event queue when the `dist_mode` feature is enabled.
///
/// The type is an alias of `c_void`: in this mode the queue is only handled
/// through raw pointers passed to the `extern` functions declared in this file.
#[cfg(feature = "dist_mode")]
pub type EventQueue = c_void;
140
// Foreign functions for the event queue, linked from the `dvcompute_core_dist`
// library when `dist_mode` is enabled without `dist_core_mode`.
//
// The ABI is spelled out as "C"; this is the same ABI that a bare `extern`
// block selects by default.
// NOTE(review): `EventRepr`, `SpecsRepr` and `Point` cross the FFI boundary;
// their layout must match the native side — confirm against the core library.
#[cfg(all(feature = "dist_mode", not(feature = "dist_core_mode")))]
#[cfg_attr(windows, link(name = "dvcompute_core_dist.dll"))]
#[cfg_attr(not(windows), link(name = "dvcompute_core_dist"))]
extern "C" {

    /// Create a new event queue by the simulation specs and return a raw
    /// pointer to it.
    #[doc(hidden)]
    pub fn create_extern_event_queue(specs: *const SpecsRepr) -> *mut EventQueue;

    /// Delete the event queue.
    ///
    /// Presumably expects a pointer returned by `create_extern_event_queue` —
    /// TODO confirm.
    #[doc(hidden)]
    pub fn delete_extern_event_queue(queue: *mut EventQueue);

    /// Enqueue a new event with the given activation time, priority and
    /// handler at the specified point.
    #[doc(hidden)]
    pub fn enqueue_extern_event(queue: *mut EventQueue, event_time: f64, priority: isize, comp: EventRepr, p: *const Point);

    /// Enqueue a new IO event with the given activation time, priority and
    /// handler at the specified point.
    #[doc(hidden)]
    pub fn enqueue_extern_io_event(queue: *mut EventQueue, event_time: f64, priority: isize, comp: EventRepr, p: *const Point);

    /// Synchronize the events at the specified point.
    ///
    /// `including_current` is passed as an integer; presumably a non-zero
    /// value means `true` — TODO confirm against the native library.
    #[doc(hidden)]
    pub fn sync_extern_events(including_current: isize, p: *const Point);

    /// Leave the simulation.
    #[doc(hidden)]
    pub fn leave_extern_simulation(queue: *mut EventQueue, p: *const Point);
}
170
// Foreign functions for the event queue, declared without a `link` attribute
// when both `dist_mode` and `dist_core_mode` are enabled.
//
// The ABI is spelled out as "C"; this is the same ABI that a bare `extern`
// block selects by default.
// NOTE(review): `EventRepr`, `SpecsRepr` and `Point` cross the FFI boundary;
// their layout must match the side that defines these symbols — confirm.
#[cfg(all(feature = "dist_mode", feature = "dist_core_mode"))]
extern "C" {

    /// Create a new event queue by the simulation specs and return a raw
    /// pointer to it.
    #[doc(hidden)]
    pub fn create_extern_event_queue(specs: *const SpecsRepr) -> *mut EventQueue;

    /// Delete the event queue.
    ///
    /// Presumably expects a pointer returned by `create_extern_event_queue` —
    /// TODO confirm.
    #[doc(hidden)]
    pub fn delete_extern_event_queue(queue: *mut EventQueue);

    /// Enqueue a new event with the given activation time, priority and
    /// handler at the specified point.
    #[doc(hidden)]
    pub fn enqueue_extern_event(queue: *mut EventQueue, event_time: f64, priority: isize, comp: EventRepr, p: *const Point);

    /// Enqueue a new IO event with the given activation time, priority and
    /// handler at the specified point.
    #[doc(hidden)]
    pub fn enqueue_extern_io_event(queue: *mut EventQueue, event_time: f64, priority: isize, comp: EventRepr, p: *const Point);

    /// Synchronize the events at the specified point.
    ///
    /// `including_current` is passed as an integer; presumably a non-zero
    /// value means `true` — TODO confirm against the defining side.
    #[doc(hidden)]
    pub fn sync_extern_events(including_current: isize, p: *const Point);

    /// Leave the simulation.
    #[doc(hidden)]
    pub fn leave_extern_simulation(queue: *mut EventQueue, p: *const Point);
}