dora_node_api/event_stream/scheduler.rs
1use std::collections::{HashMap, VecDeque};
2
3use dora_message::{daemon_to_node::NodeEvent, id::DataId};
4
5use super::thread::EventItem;
/// Reserved queue key under which the [`Scheduler`] stores every event that is not an input
/// event. It is excluded from the per-input least-recently-used rotation.
pub(crate) const NON_INPUT_EVENT: &str = "dora/non_input_event";
7
/// This scheduler ensures fairness between inputs.
///
/// The scheduler reorders events in the following way:
///
/// - **Non-input events are prioritized**
///
///   If the node received any events that are not input events, they are returned first. The
///   intention of this reordering is that the node can react quickly to dataflow-related events
///   even when its input queues are very full.
///
///   This reordering has some side effects that might be unexpected:
///   - An [`InputClosed`][super::Event::InputClosed] event might be yielded before the last
///     input events of that ID.
///
///     Usually, an `InputClosed` event indicates that there won't be any subsequent inputs
///     of a certain ID. This invariant no longer holds for a scheduled event stream.
///   - The [`Stop`][super::Event::Stop] event might no longer be the last event of the stream.
///
///     Usually, the `Stop` event is the last event that is sent to a node before the event stream
///     is closed. Because of the reordering, the stream might return more events after a `Stop`
///     event.
/// - **Input events are grouped by ID** and yielded in **least-recently-used order (by ID)**.
///
///   The scheduler keeps a separate queue for each input ID, where the incoming input events are
///   placed in chronological order. When yielding the next event, the scheduler iterates over
///   these queues in least-recently-used order. This means that the queue corresponding to the
///   last yielded event will be checked last. The scheduler returns the oldest event from the
///   first non-empty queue.
///
///   The side effect of this change is that input events of different IDs are no longer in
///   chronological order. This might lead to unexpected results for input events that are
///   causally related to each other.
///
/// Each queue has a maximum length. When an event arrives for a queue that is already full, the
/// oldest queued event of that queue is discarded.
///
/// ## Example 1
/// Consider the case where one input has a very high frequency and another has a very low
/// frequency. The event stream will always alternate between the two inputs when inputs of both
/// IDs are available.
/// Without the scheduling, the high-frequency input would be returned much more often.
///
/// ## Example 2
/// Again, consider the case where one input has a very high frequency and the other has a very
/// low frequency. This time, we define a small maximum queue size for the low-frequency input,
/// but a large queue size for the high-frequency one.
/// Using the scheduler, the event stream will always alternate between high- and low-frequency
/// inputs as long as inputs of both types are available.
///
/// Without scheduling, the low-frequency input might never be yielded before it's dropped,
/// because there is almost always an older high-frequency input available that is yielded first.
/// Once the low-frequency input would be the next one chronologically, it might already have
/// been dropped because the node received newer low-frequency inputs in the meantime (the queue
/// length is small). At this point, the next-oldest input is a high-frequency input again.
///
/// ## Example 3
/// Consider a high-frequency camera input and a low-frequency bounding box input, which is based
/// on the latest camera image. The dataflow YAML file specifies a large queue size for the camera
/// input and a small queue size for the bounding box input.
///
/// With scheduling, the number of buffered camera inputs might grow over time. As a result, the
/// camera inputs yielded from the stream (in oldest-first order) are no longer synchronized with
/// the bounding box inputs. So the node receives an up-to-date bounding box, but a considerably
/// outdated image.
///
/// Without scheduling, the events are returned in chronological order. This time, the bounding
/// box might be slightly outdated if the camera sent new images before the bounding box was
/// ready. However, the time difference between the two input types is then independent of the
/// queue size.
///
/// (If a perfectly matching bounding box is required, we recommend forwarding the input image as
/// part of the bounding box output. This way, the receiving node only needs to subscribe to one
/// input, so no mismatches can happen.)
#[derive(Debug)]
pub struct Scheduler {
    /// Input IDs in least-recently-used order. `next` checks them front to back and moves the
    /// ID of each yielded input event to the back.
    last_used: VecDeque<DataId>,
    /// Per-ID event queues, stored as `(maximum queue length, queued events)`. All non-input
    /// events share the queue stored under the `NON_INPUT_EVENT` key.
    event_queues: HashMap<DataId, (usize, VecDeque<EventItem>)>,
}
86
87impl Scheduler {
88 pub(crate) fn new(event_queues: HashMap<DataId, (usize, VecDeque<EventItem>)>) -> Self {
89 let topic = VecDeque::from_iter(
90 event_queues
91 .keys()
92 .filter(|t| **t != DataId::from(NON_INPUT_EVENT.to_string()))
93 .cloned(),
94 );
95 Self {
96 last_used: topic,
97 event_queues,
98 }
99 }
100
101 pub(crate) fn add_event(&mut self, event: EventItem) {
102 let event_id = match &event {
103 EventItem::NodeEvent {
104 event:
105 NodeEvent::Input {
106 id,
107 metadata: _,
108 data: _,
109 },
110 ack_channel: _,
111 } => id,
112 _ => &DataId::from(NON_INPUT_EVENT.to_string()),
113 };
114
115 // Enforce queue size limit
116 if let Some((size, queue)) = self.event_queues.get_mut(event_id) {
117 // Remove the oldest event if at limit
118 if &queue.len() >= size {
119 tracing::debug!("Discarding event for input `{event_id}` due to queue size limit");
120 queue.pop_front();
121 }
122 queue.push_back(event);
123 } else {
124 unimplemented!("Received an event that was not in the definition event id description.")
125 }
126 }
127
128 pub(crate) fn next(&mut self) -> Option<EventItem> {
129 // Retrieve message from the non input event first that have priority over input message.
130 if let Some((_size, queue)) = self
131 .event_queues
132 .get_mut(&DataId::from(NON_INPUT_EVENT.to_string()))
133 {
134 if let Some(event) = queue.pop_front() {
135 return Some(event);
136 }
137 }
138
139 // Process the ID with the oldest timestamp using BTreeMap Ordering
140 for (index, id) in self.last_used.clone().iter().enumerate() {
141 if let Some((_size, queue)) = self.event_queues.get_mut(id) {
142 if let Some(event) = queue.pop_front() {
143 // Put last used at last
144 self.last_used.remove(index);
145 self.last_used.push_back(id.clone());
146 return Some(event);
147 }
148 }
149 }
150
151 None
152 }
153
154 pub(crate) fn is_empty(&self) -> bool {
155 self.event_queues
156 .iter()
157 .all(|(_id, (_size, queue))| queue.is_empty())
158 }
159}