1use std::collections::{HashMap, VecDeque};
4use std::time::Duration;
5
6use frp_domain::{EdgeSchedule, HyperEdge};
7use frp_plexus::{EdgeId, PortId};
8
9#[derive(Default)]
12pub struct Scheduler {
13 on_change_edges: Vec<(EdgeId, Vec<PortId>)>,
15 on_tick_edges: Vec<(EdgeId, Duration, Duration)>,
17 on_event_edges: HashMap<String, Vec<EdgeId>>,
19 pending: VecDeque<EdgeId>,
21}
22
23impl Scheduler {
24 pub fn new() -> Self {
26 Scheduler::default()
27 }
28
29 pub fn register(&mut self, edge: &HyperEdge) {
33 match &edge.schedule {
34 EdgeSchedule::OnChange => {
35 self.on_change_edges
36 .push((edge.id, edge.sources.clone()));
37 }
38 EdgeSchedule::OnTick(interval) => {
39 self.on_tick_edges.push((edge.id, *interval, Duration::ZERO));
40 }
41 EdgeSchedule::OnEvent(name) => {
42 self.on_event_edges
43 .entry(name.clone())
44 .or_default()
45 .push(edge.id);
46 }
47 }
48 }
49
50 pub fn notify_change(&mut self, port: PortId) {
54 let pending = &mut self.pending;
55 for (id, sources) in &self.on_change_edges {
56 if sources.contains(&port) {
57 pending.push_back(*id);
58 }
59 }
60 }
61
62 pub fn tick(&mut self, delta: Duration) {
65 for (id, interval, elapsed) in &mut self.on_tick_edges {
66 *elapsed += delta;
67 while *elapsed >= *interval {
68 *elapsed -= *interval;
69 self.pending.push_back(*id);
70 }
71 }
72 }
73
74 pub fn fire_event(&mut self, name: &str) {
76 if let Some(ids) = self.on_event_edges.get(name) {
77 for &id in ids {
78 self.pending.push_back(id);
79 }
80 }
81 }
82
83 pub fn drain_pending(&mut self) -> Vec<EdgeId> {
87 let mut seen = std::collections::HashSet::new();
88 let mut result = Vec::new();
89 while let Some(id) = self.pending.pop_front() {
90 if seen.insert(id) {
91 result.push(id);
92 }
93 }
94 result
95 }
96}
97
#[cfg(test)]
mod tests {
    use super::*;
    use frp_domain::{EdgeSchedule, EdgeTransform, HyperEdge};
    use frp_plexus::{EdgeId, PortId};

    /// Builds a pass-through, change-triggered edge watching the given ports.
    fn on_change_edge(id: u64, sources: &[u64]) -> HyperEdge {
        let source_ports = sources.iter().map(|raw| PortId::new(*raw)).collect();
        HyperEdge::new(
            EdgeId::new(id),
            source_ports,
            vec![],
            EdgeTransform::PassThrough,
            EdgeSchedule::OnChange,
        )
    }

    /// Builds a pass-through edge that is triggered every `ms` milliseconds.
    fn on_tick_edge(id: u64, ms: u64) -> HyperEdge {
        let interval = Duration::from_millis(ms);
        HyperEdge::new(
            EdgeId::new(id),
            vec![],
            vec![],
            EdgeTransform::PassThrough,
            EdgeSchedule::OnTick(interval),
        )
    }

    /// Builds a pass-through edge that is triggered by the event `name`.
    fn on_event_edge(id: u64, name: &str) -> HyperEdge {
        let event_name = name.to_string();
        HyperEdge::new(
            EdgeId::new(id),
            vec![],
            vec![],
            EdgeTransform::PassThrough,
            EdgeSchedule::OnEvent(event_name),
        )
    }

    #[test]
    fn on_change_fires_when_source_changes() {
        let mut scheduler = Scheduler::new();
        let edge = on_change_edge(1, &[10, 20]);
        scheduler.register(&edge);

        scheduler.notify_change(PortId::new(10));

        assert_eq!(scheduler.drain_pending(), vec![EdgeId::new(1)]);
    }

    #[test]
    fn on_change_does_not_fire_for_unrelated_port() {
        let mut scheduler = Scheduler::new();
        let edge = on_change_edge(1, &[10]);
        scheduler.register(&edge);

        scheduler.notify_change(PortId::new(99));

        let fired = scheduler.drain_pending();
        assert!(fired.is_empty());
    }

    #[test]
    fn on_tick_fires_after_interval() {
        let mut scheduler = Scheduler::new();
        let edge = on_tick_edge(2, 100);
        scheduler.register(&edge);

        // Half an interval: nothing is due yet.
        scheduler.tick(Duration::from_millis(50));
        let fired_early = scheduler.drain_pending();
        assert!(fired_early.is_empty());

        // 50ms + 60ms crosses the 100ms interval.
        scheduler.tick(Duration::from_millis(60));
        assert_eq!(scheduler.drain_pending(), vec![EdgeId::new(2)]);
    }

    #[test]
    fn on_tick_fires_multiple_times_if_overshot() {
        let mut scheduler = Scheduler::new();
        let edge = on_tick_edge(3, 100);
        scheduler.register(&edge);

        scheduler.tick(Duration::from_millis(250));

        // Two intervals elapsed, but draining reports the edge only once.
        let fired = scheduler.drain_pending();
        assert_eq!(fired.len(), 1);
        assert_eq!(fired[0], EdgeId::new(3));
    }

    #[test]
    fn on_event_fires_on_matching_name() {
        let mut scheduler = Scheduler::new();
        let edge = on_event_edge(4, "click");
        scheduler.register(&edge);

        scheduler.fire_event("click");

        assert_eq!(scheduler.drain_pending(), vec![EdgeId::new(4)]);
    }

    #[test]
    fn on_event_does_not_fire_for_wrong_name() {
        let mut scheduler = Scheduler::new();
        let edge = on_event_edge(4, "click");
        scheduler.register(&edge);

        scheduler.fire_event("hover");

        let fired = scheduler.drain_pending();
        assert!(fired.is_empty());
    }

    #[test]
    fn drain_deduplicates() {
        let mut scheduler = Scheduler::new();
        let edge = on_change_edge(1, &[10]);
        scheduler.register(&edge);

        // The same port changes twice before the queue is drained.
        for _ in 0..2 {
            scheduler.notify_change(PortId::new(10));
        }

        assert_eq!(scheduler.drain_pending(), vec![EdgeId::new(1)]);
    }
}