dora_node_api/event_stream/scheduler.rs
1use std::collections::{HashMap, VecDeque};
2
3use dora_message::{daemon_to_node::NodeEvent, id::DataId};
4
5use super::thread::EventItem;
/// Queue ID under which the [`Scheduler`] stores every event that is not an input event.
///
/// Events queued under this ID are yielded before any input event.
pub(crate) const NON_INPUT_EVENT: &str = "dora/non_input_event";
7
8/// This scheduler will make sure that there is fairness between inputs.
9///
10/// The scheduler reorders events in the following way:
11///
12/// - **Non-input events are prioritized**
13///
14/// If the node received any events that are not input events, they are returned first. The
15/// intention of this reordering is that the nodes can react quickly to dataflow-related events
16/// even when their input queues are very full.
17///
18/// This reordering has some side effects that might be unexpected:
19/// - An [`InputClosed`][super::Event::InputClosed] event might be yielded before the last
20/// input events of that ID.
21///
22/// Usually, an `InputClosed` event indicates that there won't be any subsequent inputs
23/// of a certain ID. This invariant does not hold anymore for a scheduled event stream.
24/// - The [`Stop`][super::Event::Stop] event might not be the last event of the stream anymore.
25///
26/// Usually, the `Stop` event is the last event that is sent to a node before the event stream
27/// is closed. Because of the reordering, the stream might return more events after a `Stop`
28/// event.
29/// - **Input events are grouped by ID** and yielded in a **least-recently used order (by ID)**.
30///
31/// The scheduler keeps a separate queue for each input ID, where the incoming input events are
32/// placed in their chronological order. When yielding the next event, the scheduler iterates over
33/// these queues in least-recently used order. This means that the queue corresponding to the
34/// last yielded event will be checked last. The scheduler will return the oldest event from the
35/// first non-empty queue.
36///
37/// The side effect of this change is that inputs events of different IDs are no longer in their
38/// chronological order. This might lead to unexpected results for input events that are caused by
39/// each other.
40///
41/// ## Example 1
42/// Consider the case that one input has a very high frequency and another one with a very slow
43/// frequency. The event stream will always alternate between the two inputs when each input is
44/// available.
45/// Without the scheduling, the high-frequency input would be returned much more often.
46///
47/// ## Example 2
48/// Again, let's consider the case that one input has a very high frequency and the other has a
49/// very slow frequency. This time, we define a small maximum queue sizes for the low-frequency
50/// input, but a large queue size for the high-frequency one.
51/// Using the scheduler, the event stream will always alternate between high and low-frequency
52/// inputs as long as inputs of both types are available.
53///
54/// Without scheduling, the low-frequency input might never be yielded before
55/// it's dropped because there is almost always an older high-frequency input available that is
56/// yielded first. Once the low-frequency input would be the next one chronologically, it might
57/// have been dropped already because the node received newer low-frequency inputs in the
58/// meantime (the queue length is small). At this point, the next-oldest input is a high-frequency
59/// input again.
60///
61/// ## Example 3
62/// Consider a high-frequency camera input and a low-frequency bounding box input, which is based
63/// on the latest camera image. The dataflow YAML file specifies a large queue size for the camera
64/// input and a small queue size for the bounding box input.
65///
66/// With scheduling, the number of
67/// buffered camera inputs might grow over time. As a result the camera inputs yielded from the
68/// stream (in oldest-first order) are not synchronized with the bounding box inputs anymore. So
69/// the node receives an up-to-date bounding box, but a considerably outdated image.
70///
71/// Without scheduling, the events are returned in chronological order. This time, the bounding
72/// box might be slightly outdated if the camera sent new images before the bounding box was
73/// ready. However, the time difference between the two input types is independent of the
74/// queue size this time.
75///
76/// (If a perfect matching bounding box is required, we recommend to forward the input image as
77/// part of the bounding box output. This way, the receiving node only needs to subscribe to one
78/// input so no mismatches can happen.)
79#[derive(Debug)]
80pub struct Scheduler {
81 /// Tracks the last-used event ID
82 last_used: VecDeque<DataId>,
83 /// Tracks events per ID
84 event_queues: HashMap<DataId, (usize, VecDeque<EventItem>)>,
85}
86
87impl Scheduler {
88 pub(crate) fn new(event_queues: HashMap<DataId, (usize, VecDeque<EventItem>)>) -> Self {
89 let topic = VecDeque::from_iter(
90 event_queues
91 .keys()
92 .filter(|t| **t != DataId::from(NON_INPUT_EVENT.to_string()))
93 .cloned(),
94 );
95 Self {
96 last_used: topic,
97 event_queues,
98 }
99 }
100
101 pub(crate) fn add_event(&mut self, event: EventItem) {
102 let event_id = match &event {
103 EventItem::NodeEvent {
104 event:
105 NodeEvent::Input {
106 id,
107 metadata: _,
108 data: _,
109 },
110 ack_channel: _,
111 } => id,
112 _ => &DataId::from(NON_INPUT_EVENT.to_string()),
113 };
114
115 // Enforce queue size limit
116 let (size, queue) = self
117 .event_queues
118 .entry(event_id.clone())
119 .or_insert_with(|| {
120 self.last_used.push_back(event_id.clone());
121 (1, Default::default())
122 });
123
124 // Remove the oldest event if at limit
125 if &queue.len() >= size {
126 tracing::debug!("Discarding event for input `{event_id}` due to queue size limit");
127 queue.pop_front();
128 }
129 queue.push_back(event);
130 }
131
132 pub(crate) fn next(&mut self) -> Option<EventItem> {
133 // Retrieve message from the non input event first that have priority over input message.
134 if let Some((_size, queue)) = self
135 .event_queues
136 .get_mut(&DataId::from(NON_INPUT_EVENT.to_string()))
137 {
138 if let Some(event) = queue.pop_front() {
139 return Some(event);
140 }
141 }
142
143 // Process the ID with the oldest timestamp using BTreeMap Ordering
144 for (index, id) in self.last_used.clone().iter().enumerate() {
145 if let Some((_size, queue)) = self.event_queues.get_mut(id) {
146 if let Some(event) = queue.pop_front() {
147 // Put last used at last
148 self.last_used.remove(index);
149 self.last_used.push_back(id.clone());
150 return Some(event);
151 }
152 }
153 }
154
155 None
156 }
157
158 pub(crate) fn is_empty(&self) -> bool {
159 self.event_queues
160 .iter()
161 .all(|(_id, (_size, queue))| queue.is_empty())
162 }
163}