1use crossbeam_channel::{Receiver, RecvTimeoutError};
2use std::{collections::BinaryHeap, pin::Pin, time::Duration};
3
4use crate::{
5 build_reaction_contexts,
6 event::{PhysicalEvent, ScheduledEvent},
7 keepalive,
8 key_set::KeySetView,
9 store::{ReactionTriggerCtx, Store},
10 Env, Instant, Level, ReactionGraph, ReactionKey, ReactionSet, ReactionSetLimits, Tag,
11};
12
13#[derive(Debug)]
14struct EventQueue {
15 event_queue: BinaryHeap<ScheduledEvent>,
17 free_reaction_sets: Vec<ReactionSet>,
19 reaction_set_limits: ReactionSetLimits,
21}
22
23impl EventQueue {
24 fn new(reaction_set_limits: ReactionSetLimits) -> Self {
25 Self {
26 event_queue: BinaryHeap::new(),
27 free_reaction_sets: Vec::new(),
28 reaction_set_limits,
29 }
30 }
31
32 fn push_event<I>(&mut self, tag: Tag, reactions: I, terminal: bool)
36 where
37 I: IntoIterator<Item = (Level, ReactionKey)>,
38 {
39 let mut reaction_set = self.next_reaction_set();
40 reaction_set.extend_above(reactions);
41 let event = ScheduledEvent {
42 tag,
43 reactions: reaction_set,
44 terminal,
45 };
46 self.event_queue.push(event);
47 }
48
49 fn next_reaction_set(&mut self) -> ReactionSet {
51 self.free_reaction_sets
52 .pop()
53 .map(|mut reaction_set| {
54 reaction_set.clear();
55 reaction_set
56 })
57 .unwrap_or_else(|| ReactionSet::new(&self.reaction_set_limits))
58 }
59
60 fn shutdown(&mut self) {
62 if !self.event_queue.is_empty() {
63 tracing::warn!(
64 "---- There are {} unprocessed future events on the event queue.",
65 self.event_queue.len()
66 );
67 let event = self.event_queue.peek().unwrap();
68 tracing::warn!(
69 "---- The first future event has timestamp {:?} after start time.",
70 event.tag.get_offset()
71 );
72 }
73 }
74}
75
76#[derive(Debug)]
77pub struct Scheduler {
78 store: Pin<Box<Store>>,
80 reaction_graph: ReactionGraph,
82 fast_forward: bool,
84 keep_alive: bool,
86 event_rx: Receiver<PhysicalEvent>,
88 events: EventQueue,
90 start_time: Instant,
92 shutdown_tag: Option<Tag>,
94 shutdown_tx: keepalive::Sender,
96}
97
98impl Scheduler {
99 pub fn new(
111 env: Env,
112 reaction_graph: ReactionGraph,
113 fast_forward: bool,
114 keep_alive: bool,
115 ) -> Self {
116 let (event_tx, event_rx) = crossbeam_channel::unbounded();
117 let (shutdown_tx, shutdown_rx) = keepalive::channel();
118 let start_time = Instant::now();
119
120 let contexts = build_reaction_contexts(&reaction_graph, start_time, event_tx, shutdown_rx);
122
123 let store = Store::new(env, contexts, &reaction_graph);
124 let events = EventQueue::new(reaction_graph.reaction_set_limits.clone());
125 Self {
126 store,
127 reaction_graph,
128 fast_forward,
129 keep_alive,
130 event_rx,
131 events,
132 start_time,
133 shutdown_tag: None,
134 shutdown_tx,
135 }
136 }
137
138 #[tracing::instrument(skip(self))]
140 fn startup(&mut self) {
141 self.start_time = Instant::now();
148
149 let tag = Tag::new(Duration::ZERO, 0);
150
151 let mut reaction_set = self.events.next_reaction_set();
153 reaction_set.extend_above(self.reaction_graph.startup_reactions.iter().copied());
154
155 tracing::info!(tag = %tag, "Starting the execution.");
156 self.process_tag(tag, reaction_set.view());
157 }
158
159 #[tracing::instrument(skip(self))]
161 fn shutdown(&mut self) {
162 tracing::info!("Shutting down.");
163
164 self.shutdown_tx.shutdown();
166 self.events.shutdown();
167
168 tracing::info!(
169 "---- Elapsed logical time: {:?}",
170 self.shutdown_tag.unwrap().get_offset()
171 );
172 let physical_elapsed = Instant::now().checked_duration_since(self.start_time);
174 tracing::info!("---- Elapsed physical time: {:?}", physical_elapsed);
175
176 tracing::info!("Scheduler has been shut down.");
177 }
178
179 #[tracing::instrument(skip(self))]
181 fn receive_event(&mut self) -> Option<PhysicalEvent> {
182 if let Some(shutdown) = self.shutdown_tag {
183 let abs = shutdown.to_logical_time(self.start_time);
184 if let Some(timeout) = abs.checked_duration_since(Instant::now()) {
185 tracing::debug!(timeout = ?timeout, "Waiting for async event.");
186 self.event_rx.recv_timeout(timeout).ok()
187 } else {
188 tracing::debug!("Cannot wait, already past programmed shutdown time...");
189 None
190 }
191 } else if self.keep_alive {
192 tracing::debug!("Waiting indefinitely for async event.");
193 self.event_rx.recv().ok()
194 } else {
195 None
196 }
197 }
198
199 #[tracing::instrument(skip(self))]
200 pub fn event_loop(&mut self) {
201 self.startup();
202 loop {
203 for physical_event in self.event_rx.try_iter() {
205 self.events.push_event(
206 physical_event.tag,
207 physical_event.downstream_reactions(&self.reaction_graph),
208 physical_event.terminal,
209 );
210 }
211
212 if let Some(mut event) = self.events.event_queue.pop() {
213 tracing::debug!(event = %event, "Handling event");
214
215 if !self.fast_forward {
216 let target = event.tag.to_logical_time(self.start_time);
217 if let Some(async_event) = self.synchronize_wall_clock(target) {
218 if async_event.tag < event.tag {
220 self.events.event_queue.push(event);
222 self.events.event_queue.push(async_event);
223 continue;
224 } else {
225 self.events.event_queue.push(async_event);
226 }
227 }
228 }
229
230 self.process_tag(event.tag, event.reactions.view());
231
232 self.events.free_reaction_sets.push(event.reactions);
234
235 if event.terminal {
236 break;
238 }
239 } else if let Some(event) = self.receive_event() {
240 self.events.push_event(
241 event.tag,
242 event.downstream_reactions(&self.reaction_graph),
243 event.terminal,
244 );
245 } else {
246 tracing::debug!("No more events in queue. -> Terminate!");
247 let tag = Tag::now(self.start_time);
249 self.shutdown_tag = Some(tag);
250 self.events.push_event(
251 tag,
252 self.reaction_graph.shutdown_reactions.iter().copied(),
253 true,
254 );
255 }
256 } self.shutdown();
259 }
260
261 #[tracing::instrument(skip(self), fields(target = ?target))]
263 fn synchronize_wall_clock(&mut self, target: Instant) -> Option<ScheduledEvent> {
264 let now = Instant::now();
265
266 if now < target {
267 let advance = target - now;
268 tracing::debug!(advance = ?advance, "Need to sleep");
269
270 match self.event_rx.recv_timeout(advance) {
271 Ok(event) => {
272 tracing::debug!(event = %event, "Sleep interrupted by async event");
273 let mut reactions = self.events.next_reaction_set();
274 reactions.extend_above(event.downstream_reactions(&self.reaction_graph));
275 return Some(ScheduledEvent {
276 tag: event.tag,
277 reactions,
278 terminal: event.terminal,
279 });
280 }
281 Err(RecvTimeoutError::Disconnected) => {
282 let remaining: Option<Duration> = target.checked_duration_since(Instant::now());
283 if let Some(remaining) = remaining {
284 tracing::debug!(remaining = ?remaining,
285 "Sleep interrupted disconnect, sleeping for remaining",
286 );
287 std::thread::sleep(remaining);
288 }
289 }
290 Err(RecvTimeoutError::Timeout) => {}
291 }
292 }
293
294 if now > target {
295 let delay = now - target;
296 tracing::warn!(delay = ?delay, "running late");
297 }
298
299 None
300 }
301
302 #[tracing::instrument(skip(self, reaction_view), fields(tag = %tag))]
306 pub fn process_tag(&mut self, tag: Tag, reaction_view: KeySetView<ReactionKey>) {
307 reaction_view.for_each_level(|level, reaction_keys, next_levels| {
308 tracing::trace!(level=?level, "Iter");
309
310 let iter_ctx = unsafe { self.store.iter_borrow_storage(reaction_keys) };
312
313 #[cfg(feature = "parallel")]
314 use rayon::prelude::ParallelIterator;
315
316 #[cfg(feature = "parallel")]
317 let iter_ctx = rayon::prelude::ParallelBridge::par_bridge(iter_ctx);
318
319 let iter_ctx_res = iter_ctx.map(|trigger_ctx| {
320 let ReactionTriggerCtx {
321 context,
322 reaction,
323 reactor,
324 actions,
325 ref_ports,
326 mut_ports,
327 } = trigger_ctx;
328
329 tracing::trace!(
330 " Executing {reactor_name}/{reaction_name}.",
331 reaction_name = reaction.get_name(),
332 reactor_name = reactor.get_name()
333 );
334
335 context.reset_for_reaction(tag);
336
337 reaction.trigger(context, reactor, actions, ref_ports, mut_ports);
338
339 &context.trigger_res
340 });
341
342 #[cfg(feature = "parallel")]
343 let iter_ctx_res = iter_ctx_res.collect::<Vec<_>>();
344
345 for res in iter_ctx_res {
346 if let Some(shutdown_tag) = res.scheduled_shutdown {
347 if self.shutdown_tag.map(|t| shutdown_tag < t).unwrap_or(true) {
350 self.shutdown_tag = Some(shutdown_tag);
351 self.events.push_event(
352 shutdown_tag,
353 self.reaction_graph.shutdown_reactions.iter().copied(),
354 true,
355 );
356 }
357 }
358
359 for &(action_key, tag) in res.scheduled_actions.iter() {
361 let downstream = self.reaction_graph.action_triggers[action_key]
362 .iter()
363 .copied();
364 self.events.push_event(tag, downstream, false);
365 }
366 }
367
368 let downstream = self
370 .store
371 .iter_set_port_keys()
372 .flat_map(|port_key| self.reaction_graph.port_triggers[port_key].iter());
373
374 if let Some(mut next_levels) = next_levels {
375 next_levels.extend_above(downstream.copied());
376 }
377 });
378
379 self.store.reset_ports();
380 }
381
382 pub fn into_env(self) -> Env {
387 self.store.into_env()
388 }
389}