dora_node_api/event_stream/
scheduler.rs

1use std::collections::{HashMap, VecDeque};
2
3use dora_message::{daemon_to_node::NodeEvent, id::DataId};
4
5use super::thread::EventItem;
/// Queue ID under which the [`Scheduler`] stores every event that is not an input event.
///
/// This ID is excluded from the scheduler's per-input rotation. Events queued under it are
/// always yielded before any input event.
pub(crate) const NON_INPUT_EVENT: &str = "dora/non_input_event";
7
8/// This scheduler will make sure that there is fairness between inputs.
9///
10/// The scheduler reorders events in the following way:
11///
12/// - **Non-input events are prioritized**
13///   
14///   If the node received any events that are not input events, they are returned first. The
15///   intention of this reordering is that the nodes can react quickly to dataflow-related events
16///   even when their input queues are very full.
17///   
18///   This reordering has some side effects that might be unexpected:
19///   - An [`InputClosed`][super::Event::InputClosed] event might be yielded before the last
20///     input events of that ID.
21///     
22///     Usually, an `InputClosed` event indicates that there won't be any subsequent inputs
23///     of a certain ID. This invariant does not hold anymore for a scheduled event stream.
24///   - The [`Stop`][super::Event::Stop] event might not be the last event of the stream anymore.
25///     
26///     Usually, the `Stop` event is the last event that is sent to a node before the event stream
27///     is closed. Because of the reordering, the stream might return more events after a `Stop`
28///     event.
29/// - **Input events are grouped by ID** and yielded in a **least-recently used order (by ID)**.
30///
31///   The scheduler keeps a separate queue for each input ID, where the incoming input events are
32///   placed in their chronological order. When yielding the next event, the scheduler iterates over
33///   these queues in least-recently used order. This means that the queue corresponding to the
34///   last yielded event will be checked last. The scheduler will return the oldest event from the
35///   first non-empty queue.
36///
37///   The side effect of this change is that inputs events of different IDs are no longer in their
38///   chronological order. This might lead to unexpected results for input events that are caused by
39///   each other.
40///
41/// ## Example 1
42/// Consider the case that one input has a very high frequency and another one with a very slow
43/// frequency. The event stream will always alternate between the two inputs when each input is
44/// available.
45/// Without the scheduling, the high-frequency input would be returned much more often.
46///
47/// ## Example 2
48/// Again, let's consider the case that one input has a very high frequency and the other has a
49/// very slow frequency. This time, we define a small maximum queue sizes for the low-frequency
50/// input, but a large queue size for the high-frequency one.
51/// Using the scheduler, the event stream will always alternate between high and low-frequency
52/// inputs as long as inputs of both types are available.
53///
54/// Without scheduling, the low-frequency input might never be yielded before
55/// it's dropped because there is almost always an older high-frequency input available that is
56/// yielded first. Once the low-frequency input would be the next one chronologically, it might
57/// have been dropped already because the node received newer low-frequency inputs in the
58/// meantime (the queue length is small). At this point, the next-oldest input is a high-frequency
59/// input again.
60///
61/// ## Example 3
62/// Consider a high-frequency camera input and a low-frequency bounding box input, which is based
63/// on the latest camera image. The dataflow YAML file specifies a large queue size for the camera
64/// input and a small queue size for the bounding box input.
65///
66/// With scheduling, the number of
67/// buffered camera inputs might grow over time. As a result the camera inputs yielded from the
68/// stream (in oldest-first order) are not synchronized with the bounding box inputs anymore. So
69/// the node receives an up-to-date bounding box, but a considerably outdated image.
70///
71/// Without scheduling, the events are returned in chronological order. This time, the bounding
72/// box might be slightly outdated if the camera sent new images before the bounding box was
73/// ready. However, the time difference between the two input types is independent of the
74/// queue size this time.
75///
76/// (If a perfect matching bounding box is required, we recommend to forward the input image as
77/// part of the bounding box output. This way, the receiving node only needs to subscribe to one
78/// input so no mismatches can happen.)
79#[derive(Debug)]
80pub struct Scheduler {
81    /// Tracks the last-used event ID
82    last_used: VecDeque<DataId>,
83    /// Tracks events per ID
84    event_queues: HashMap<DataId, (usize, VecDeque<EventItem>)>,
85}
86
87impl Scheduler {
88    pub(crate) fn new(event_queues: HashMap<DataId, (usize, VecDeque<EventItem>)>) -> Self {
89        let topic = VecDeque::from_iter(
90            event_queues
91                .keys()
92                .filter(|t| **t != DataId::from(NON_INPUT_EVENT.to_string()))
93                .cloned(),
94        );
95        Self {
96            last_used: topic,
97            event_queues,
98        }
99    }
100
101    pub(crate) fn add_event(&mut self, event: EventItem) {
102        let event_id = match &event {
103            EventItem::NodeEvent {
104                event:
105                    NodeEvent::Input {
106                        id,
107                        metadata: _,
108                        data: _,
109                    },
110                ack_channel: _,
111            } => id,
112            _ => &DataId::from(NON_INPUT_EVENT.to_string()),
113        };
114
115        // Enforce queue size limit
116        if let Some((size, queue)) = self.event_queues.get_mut(event_id) {
117            // Remove the oldest event if at limit
118            if &queue.len() >= size {
119                tracing::debug!("Discarding event for input `{event_id}` due to queue size limit");
120                queue.pop_front();
121            }
122            queue.push_back(event);
123        } else {
124            unimplemented!("Received an event that was not in the definition event id description.")
125        }
126    }
127
128    pub(crate) fn next(&mut self) -> Option<EventItem> {
129        // Retrieve message from the non input event first that have priority over input message.
130        if let Some((_size, queue)) = self
131            .event_queues
132            .get_mut(&DataId::from(NON_INPUT_EVENT.to_string()))
133        {
134            if let Some(event) = queue.pop_front() {
135                return Some(event);
136            }
137        }
138
139        // Process the ID with the oldest timestamp using BTreeMap Ordering
140        for (index, id) in self.last_used.clone().iter().enumerate() {
141            if let Some((_size, queue)) = self.event_queues.get_mut(id) {
142                if let Some(event) = queue.pop_front() {
143                    // Put last used at last
144                    self.last_used.remove(index);
145                    self.last_used.push_back(id.clone());
146                    return Some(event);
147                }
148            }
149        }
150
151        None
152    }
153
154    pub(crate) fn is_empty(&self) -> bool {
155        self.event_queues
156            .iter()
157            .all(|(_id, (_size, queue))| queue.is_empty())
158    }
159}