Skip to main content

frp_engine/
scheduler.rs

1//! Edge scheduler — tracks which edges should fire and when.
2
3use std::collections::{HashMap, VecDeque};
4use std::time::Duration;
5
6use frp_domain::{EdgeSchedule, HyperEdge};
7use frp_plexus::{EdgeId, PortId};
8
9/// Tracks scheduling state for all registered edges and accumulates a queue of
10/// edge ids that are ready to execute.
11#[derive(Default)]
12pub struct Scheduler {
13    /// Edges that fire whenever one of their source ports changes.
14    on_change_edges: Vec<(EdgeId, Vec<PortId>)>,
15    /// Edges that fire on a timer: `(id, interval, elapsed_since_last_fire)`.
16    on_tick_edges: Vec<(EdgeId, Duration, Duration)>,
17    /// Edges that fire on a named event.
18    on_event_edges: HashMap<String, Vec<EdgeId>>,
19    /// Queue of edge ids ready to run (may contain duplicates; deduped on drain).
20    pending: VecDeque<EdgeId>,
21}
22
23impl Scheduler {
24    /// Create an empty scheduler.
25    pub fn new() -> Self {
26        Scheduler::default()
27    }
28
29    /// Register an edge with the scheduler.
30    ///
31    /// The scheduling policy is taken from the edge's [`EdgeSchedule`].
32    pub fn register(&mut self, edge: &HyperEdge) {
33        match &edge.schedule {
34            EdgeSchedule::OnChange => {
35                self.on_change_edges
36                    .push((edge.id, edge.sources.clone()));
37            }
38            EdgeSchedule::OnTick(interval) => {
39                self.on_tick_edges.push((edge.id, *interval, Duration::ZERO));
40            }
41            EdgeSchedule::OnEvent(name) => {
42                self.on_event_edges
43                    .entry(name.clone())
44                    .or_default()
45                    .push(edge.id);
46            }
47        }
48    }
49
50    /// Notify the scheduler that `port` has a new value.
51    ///
52    /// All `OnChange` edges whose source list includes `port` are enqueued.
53    pub fn notify_change(&mut self, port: PortId) {
54        let pending = &mut self.pending;
55        for (id, sources) in &self.on_change_edges {
56            if sources.contains(&port) {
57                pending.push_back(*id);
58            }
59        }
60    }
61
62    /// Advance time by `delta`.  All `OnTick` edges whose interval has elapsed
63    /// are enqueued (and their elapsed counter resets to zero).
64    pub fn tick(&mut self, delta: Duration) {
65        for (id, interval, elapsed) in &mut self.on_tick_edges {
66            *elapsed += delta;
67            while *elapsed >= *interval {
68                *elapsed -= *interval;
69                self.pending.push_back(*id);
70            }
71        }
72    }
73
74    /// Fire a named event, enqueuing all edges registered for that event name.
75    pub fn fire_event(&mut self, name: &str) {
76        if let Some(ids) = self.on_event_edges.get(name) {
77            for &id in ids {
78                self.pending.push_back(id);
79            }
80        }
81    }
82
83    /// Drain the pending queue, returning edge ids in FIFO order.
84    ///
85    /// Duplicate ids are deduplicated while preserving first-occurrence order.
86    pub fn drain_pending(&mut self) -> Vec<EdgeId> {
87        let mut seen = std::collections::HashSet::new();
88        let mut result = Vec::new();
89        while let Some(id) = self.pending.pop_front() {
90            if seen.insert(id) {
91                result.push(id);
92            }
93        }
94        result
95    }
96}
97
#[cfg(test)]
mod tests {
    use super::*;
    use frp_domain::{EdgeSchedule, EdgeTransform, HyperEdge};
    use frp_plexus::{EdgeId, PortId};

    /// Pass-through `OnChange` edge watching the given raw port ids.
    fn on_change_edge(id: u64, sources: &[u64]) -> HyperEdge {
        HyperEdge::new(
            EdgeId::new(id),
            sources.iter().map(|&raw| PortId::new(raw)).collect(),
            vec![],
            EdgeTransform::PassThrough,
            EdgeSchedule::OnChange,
        )
    }

    /// Pass-through `OnTick` edge firing every `ms` milliseconds.
    fn on_tick_edge(id: u64, ms: u64) -> HyperEdge {
        let period = Duration::from_millis(ms);
        HyperEdge::new(
            EdgeId::new(id),
            vec![],
            vec![],
            EdgeTransform::PassThrough,
            EdgeSchedule::OnTick(period),
        )
    }

    /// Pass-through `OnEvent` edge listening for the event called `name`.
    fn on_event_edge(id: u64, name: &str) -> HyperEdge {
        HyperEdge::new(
            EdgeId::new(id),
            vec![],
            vec![],
            EdgeTransform::PassThrough,
            EdgeSchedule::OnEvent(name.to_string()),
        )
    }

    /// Fresh scheduler with exactly one edge registered.
    fn scheduler_with(edge: HyperEdge) -> Scheduler {
        let mut scheduler = Scheduler::new();
        scheduler.register(&edge);
        scheduler
    }

    #[test]
    fn on_change_fires_when_source_changes() {
        let mut scheduler = scheduler_with(on_change_edge(1, &[10, 20]));

        scheduler.notify_change(PortId::new(10));

        assert_eq!(scheduler.drain_pending(), vec![EdgeId::new(1)]);
    }

    #[test]
    fn on_change_does_not_fire_for_unrelated_port() {
        let mut scheduler = scheduler_with(on_change_edge(1, &[10]));

        scheduler.notify_change(PortId::new(99));

        assert!(scheduler.drain_pending().is_empty());
    }

    #[test]
    fn on_tick_fires_after_interval() {
        let mut scheduler = scheduler_with(on_tick_edge(2, 100));

        // 50ms is short of the 100ms interval: nothing is due yet.
        scheduler.tick(Duration::from_millis(50));
        assert!(scheduler.drain_pending().is_empty());

        // 50ms + 60ms crosses the interval.
        scheduler.tick(Duration::from_millis(60));
        assert_eq!(scheduler.drain_pending(), vec![EdgeId::new(2)]);
    }

    #[test]
    fn on_tick_fires_multiple_times_if_overshot() {
        let mut scheduler = scheduler_with(on_tick_edge(3, 100));

        scheduler.tick(Duration::from_millis(250));
        let drained = scheduler.drain_pending();

        // 250ms spans two whole 100ms intervals, yet the drained result lists
        // the edge only once.
        assert_eq!(drained.len(), 1);
        assert_eq!(drained[0], EdgeId::new(3));
    }

    #[test]
    fn on_event_fires_on_matching_name() {
        let mut scheduler = scheduler_with(on_event_edge(4, "click"));

        scheduler.fire_event("click");

        assert_eq!(scheduler.drain_pending(), vec![EdgeId::new(4)]);
    }

    #[test]
    fn on_event_does_not_fire_for_wrong_name() {
        let mut scheduler = scheduler_with(on_event_edge(4, "click"));

        scheduler.fire_event("hover");

        assert!(scheduler.drain_pending().is_empty());
    }

    #[test]
    fn drain_deduplicates() {
        let mut scheduler = scheduler_with(on_change_edge(1, &[10]));

        // Two notifications for the same port queue the edge twice.
        for _ in 0..2 {
            scheduler.notify_change(PortId::new(10));
        }

        assert_eq!(scheduler.drain_pending(), vec![EdgeId::new(1)]);
    }
}