boomerang_runtime/
sched.rs

1use crossbeam_channel::{Receiver, RecvTimeoutError};
2use std::{collections::BinaryHeap, pin::Pin, time::Duration};
3
4use crate::{
5    build_reaction_contexts,
6    event::{PhysicalEvent, ScheduledEvent},
7    keepalive,
8    key_set::KeySetView,
9    store::{ReactionTriggerCtx, Store},
10    Env, Instant, Level, ReactionGraph, ReactionKey, ReactionSet, ReactionSetLimits, Tag,
11};
12
13#[derive(Debug)]
14struct EventQueue {
15    /// Current event queue
16    event_queue: BinaryHeap<ScheduledEvent>,
17    /// Recycled ReactionSets to avoid allocations
18    free_reaction_sets: Vec<ReactionSet>,
19    /// Limits for the reaction sets
20    reaction_set_limits: ReactionSetLimits,
21}
22
23impl EventQueue {
24    fn new(reaction_set_limits: ReactionSetLimits) -> Self {
25        Self {
26            event_queue: BinaryHeap::new(),
27            free_reaction_sets: Vec::new(),
28            reaction_set_limits,
29        }
30    }
31
32    /// Push an event into the event queue
33    ///
34    /// A free event is pulled from the `free_events` vector and then modified with the provided function.
35    fn push_event<I>(&mut self, tag: Tag, reactions: I, terminal: bool)
36    where
37        I: IntoIterator<Item = (Level, ReactionKey)>,
38    {
39        let mut reaction_set = self.next_reaction_set();
40        reaction_set.extend_above(reactions);
41        let event = ScheduledEvent {
42            tag,
43            reactions: reaction_set,
44            terminal,
45        };
46        self.event_queue.push(event);
47    }
48
49    /// Get a free [`ReactionSet`] or create a new one if none are available.
50    fn next_reaction_set(&mut self) -> ReactionSet {
51        self.free_reaction_sets
52            .pop()
53            .map(|mut reaction_set| {
54                reaction_set.clear();
55                reaction_set
56            })
57            .unwrap_or_else(|| ReactionSet::new(&self.reaction_set_limits))
58    }
59
60    /// If the event queue still has events on it, report that.
61    fn shutdown(&mut self) {
62        if !self.event_queue.is_empty() {
63            tracing::warn!(
64                "---- There are {} unprocessed future events on the event queue.",
65                self.event_queue.len()
66            );
67            let event = self.event_queue.peek().unwrap();
68            tracing::warn!(
69                "---- The first future event has timestamp {:?} after start time.",
70                event.tag.get_offset()
71            );
72        }
73    }
74}
75
76#[derive(Debug)]
77pub struct Scheduler {
78    /// The reactor runtime store
79    store: Pin<Box<Store>>,
80    /// The reaction graph containing all static dependency and relationship information
81    reaction_graph: ReactionGraph,
82    /// Whether to skip wall-clock synchronization
83    fast_forward: bool,
84    /// Whether to keep the scheduler alive for any possible asynchronous events
85    keep_alive: bool,
86    /// Asynchronous events receiver
87    event_rx: Receiver<PhysicalEvent>,
88    /// Event queue
89    events: EventQueue,
90    /// Initial wall-clock time.
91    start_time: Instant,
92    /// A shutdown has been scheduled at this time.
93    shutdown_tag: Option<Tag>,
94    /// Shutdown channel
95    shutdown_tx: keepalive::Sender,
96}
97
98impl Scheduler {
99    /// Create a new Scheduler instance.
100    ///
101    /// The Scheduler will be initialized with the provided environment and reaction graph.
102    ///
103    /// # Arguments
104    ///
105    /// * `env` - The environment containing all the runtime data structures.
106    /// * `reaction_graph` - The reaction graph containing all static dependency and relationship information.
107    /// * `fast_forward` - Whether to skip wall-clock synchronization.
108    /// * `keep_alive` - Whether to keep the scheduler alive for any possible asynchronous events. If `false`, the
109    ///    scheduler will terminate when there are no more events to process.
110    pub fn new(
111        env: Env,
112        reaction_graph: ReactionGraph,
113        fast_forward: bool,
114        keep_alive: bool,
115    ) -> Self {
116        let (event_tx, event_rx) = crossbeam_channel::unbounded();
117        let (shutdown_tx, shutdown_rx) = keepalive::channel();
118        let start_time = Instant::now();
119
120        // Build contexts for each reaction
121        let contexts = build_reaction_contexts(&reaction_graph, start_time, event_tx, shutdown_rx);
122
123        let store = Store::new(env, contexts, &reaction_graph);
124        let events = EventQueue::new(reaction_graph.reaction_set_limits.clone());
125        Self {
126            store,
127            reaction_graph,
128            fast_forward,
129            keep_alive,
130            event_rx,
131            events,
132            start_time,
133            shutdown_tag: None,
134            shutdown_tx,
135        }
136    }
137
138    /// Execute startup of the Scheduler.
139    #[tracing::instrument(skip(self))]
140    fn startup(&mut self) {
141        //#[cfg(feature = "parallel")]
142        //rayon::ThreadPoolBuilder::new()
143        //    .num_threads(4)
144        //    .build_global()
145        //    .unwrap();
146
147        self.start_time = Instant::now();
148
149        let tag = Tag::new(Duration::ZERO, 0);
150
151        // For all Timers, pump later events onto the queue and create an initial ReactionSet to process.
152        let mut reaction_set = self.events.next_reaction_set();
153        reaction_set.extend_above(self.reaction_graph.startup_reactions.iter().copied());
154
155        tracing::info!(tag = %tag, "Starting the execution.");
156        self.process_tag(tag, reaction_set.view());
157    }
158
159    /// Final shutdown of the Scheduler. The last tag has already been processed.
160    #[tracing::instrument(skip(self))]
161    fn shutdown(&mut self) {
162        tracing::info!("Shutting down.");
163
164        // Signal to any waiting threads that the scheduler is shutting down.
165        self.shutdown_tx.shutdown();
166        self.events.shutdown();
167
168        tracing::info!(
169            "---- Elapsed logical time: {:?}",
170            self.shutdown_tag.unwrap().get_offset()
171        );
172        // If physical_start_time is 0, then execution didn't get far enough along to initialize this.
173        let physical_elapsed = Instant::now().checked_duration_since(self.start_time);
174        tracing::info!("---- Elapsed physical time: {:?}", physical_elapsed);
175
176        tracing::info!("Scheduler has been shut down.");
177    }
178
179    /// Try to receive an asynchronous event
180    #[tracing::instrument(skip(self))]
181    fn receive_event(&mut self) -> Option<PhysicalEvent> {
182        if let Some(shutdown) = self.shutdown_tag {
183            let abs = shutdown.to_logical_time(self.start_time);
184            if let Some(timeout) = abs.checked_duration_since(Instant::now()) {
185                tracing::debug!(timeout = ?timeout, "Waiting for async event.");
186                self.event_rx.recv_timeout(timeout).ok()
187            } else {
188                tracing::debug!("Cannot wait, already past programmed shutdown time...");
189                None
190            }
191        } else if self.keep_alive {
192            tracing::debug!("Waiting indefinitely for async event.");
193            self.event_rx.recv().ok()
194        } else {
195            None
196        }
197    }
198
199    #[tracing::instrument(skip(self))]
200    pub fn event_loop(&mut self) {
201        self.startup();
202        loop {
203            // Push pending events into the queue
204            for physical_event in self.event_rx.try_iter() {
205                self.events.push_event(
206                    physical_event.tag,
207                    physical_event.downstream_reactions(&self.reaction_graph),
208                    physical_event.terminal,
209                );
210            }
211
212            if let Some(mut event) = self.events.event_queue.pop() {
213                tracing::debug!(event = %event, "Handling event");
214
215                if !self.fast_forward {
216                    let target = event.tag.to_logical_time(self.start_time);
217                    if let Some(async_event) = self.synchronize_wall_clock(target) {
218                        // Woken up by async event
219                        if async_event.tag < event.tag {
220                            // Re-insert both events to order them
221                            self.events.event_queue.push(event);
222                            self.events.event_queue.push(async_event);
223                            continue;
224                        } else {
225                            self.events.event_queue.push(async_event);
226                        }
227                    }
228                }
229
230                self.process_tag(event.tag, event.reactions.view());
231
232                // Return the ReactionSet to the free pool
233                self.events.free_reaction_sets.push(event.reactions);
234
235                if event.terminal {
236                    // Break out of the event loop;
237                    break;
238                }
239            } else if let Some(event) = self.receive_event() {
240                self.events.push_event(
241                    event.tag,
242                    event.downstream_reactions(&self.reaction_graph),
243                    event.terminal,
244                );
245            } else {
246                tracing::debug!("No more events in queue. -> Terminate!");
247                // Shutdown event will be processed at the next event loop iteration
248                let tag = Tag::now(self.start_time);
249                self.shutdown_tag = Some(tag);
250                self.events.push_event(
251                    tag,
252                    self.reaction_graph.shutdown_reactions.iter().copied(),
253                    true,
254                );
255            }
256        } // loop
257
258        self.shutdown();
259    }
260
261    // Wait until the wall-clock time is reached
262    #[tracing::instrument(skip(self), fields(target = ?target))]
263    fn synchronize_wall_clock(&mut self, target: Instant) -> Option<ScheduledEvent> {
264        let now = Instant::now();
265
266        if now < target {
267            let advance = target - now;
268            tracing::debug!(advance = ?advance, "Need to sleep");
269
270            match self.event_rx.recv_timeout(advance) {
271                Ok(event) => {
272                    tracing::debug!(event = %event, "Sleep interrupted by async event");
273                    let mut reactions = self.events.next_reaction_set();
274                    reactions.extend_above(event.downstream_reactions(&self.reaction_graph));
275                    return Some(ScheduledEvent {
276                        tag: event.tag,
277                        reactions,
278                        terminal: event.terminal,
279                    });
280                }
281                Err(RecvTimeoutError::Disconnected) => {
282                    let remaining: Option<Duration> = target.checked_duration_since(Instant::now());
283                    if let Some(remaining) = remaining {
284                        tracing::debug!(remaining = ?remaining,
285                            "Sleep interrupted disconnect, sleeping for remaining",
286                        );
287                        std::thread::sleep(remaining);
288                    }
289                }
290                Err(RecvTimeoutError::Timeout) => {}
291            }
292        }
293
294        if now > target {
295            let delay = now - target;
296            tracing::warn!(delay = ?delay, "running late");
297        }
298
299        None
300    }
301
302    /// Process the reactions at this tag in increasing order of level.
303    ///
304    /// Reactions at a level N may trigger further reactions at levels M>N
305    #[tracing::instrument(skip(self, reaction_view), fields(tag = %tag))]
306    pub fn process_tag(&mut self, tag: Tag, reaction_view: KeySetView<ReactionKey>) {
307        reaction_view.for_each_level(|level, reaction_keys, next_levels| {
308            tracing::trace!(level=?level, "Iter");
309
310            // Safety: reaction_keys in the same level are guaranteed to be independent of each other.
311            let iter_ctx = unsafe { self.store.iter_borrow_storage(reaction_keys) };
312
313            #[cfg(feature = "parallel")]
314            use rayon::prelude::ParallelIterator;
315
316            #[cfg(feature = "parallel")]
317            let iter_ctx = rayon::prelude::ParallelBridge::par_bridge(iter_ctx);
318
319            let iter_ctx_res = iter_ctx.map(|trigger_ctx| {
320                let ReactionTriggerCtx {
321                    context,
322                    reaction,
323                    reactor,
324                    actions,
325                    ref_ports,
326                    mut_ports,
327                } = trigger_ctx;
328
329                tracing::trace!(
330                    "    Executing {reactor_name}/{reaction_name}.",
331                    reaction_name = reaction.get_name(),
332                    reactor_name = reactor.get_name()
333                );
334
335                context.reset_for_reaction(tag);
336
337                reaction.trigger(context, reactor, actions, ref_ports, mut_ports);
338
339                &context.trigger_res
340            });
341
342            #[cfg(feature = "parallel")]
343            let iter_ctx_res = iter_ctx_res.collect::<Vec<_>>();
344
345            for res in iter_ctx_res {
346                if let Some(shutdown_tag) = res.scheduled_shutdown {
347                    // if the new shutdown tag is earlier than the current shutdown tag, update the shutdown tag and
348                    // schedule a shutdown event
349                    if self.shutdown_tag.map(|t| shutdown_tag < t).unwrap_or(true) {
350                        self.shutdown_tag = Some(shutdown_tag);
351                        self.events.push_event(
352                            shutdown_tag,
353                            self.reaction_graph.shutdown_reactions.iter().copied(),
354                            true,
355                        );
356                    }
357                }
358
359                // Submit events to the event queue for all scheduled actions
360                for &(action_key, tag) in res.scheduled_actions.iter() {
361                    let downstream = self.reaction_graph.action_triggers[action_key]
362                        .iter()
363                        .copied();
364                    self.events.push_event(tag, downstream, false);
365                }
366            }
367
368            // Collect all the reactions that are triggered by the ports
369            let downstream = self
370                .store
371                .iter_set_port_keys()
372                .flat_map(|port_key| self.reaction_graph.port_triggers[port_key].iter());
373
374            if let Some(mut next_levels) = next_levels {
375                next_levels.extend_above(downstream.copied());
376            }
377        });
378
379        self.store.reset_ports();
380    }
381
382    /// Consume the scheduler and return the `Env` instance.
383    ///
384    /// This method is useful for testing purposes, as it allows the caller to inspect reactor states after the
385    /// scheduler has been run.
386    pub fn into_env(self) -> Env {
387        self.store.into_env()
388    }
389}