bevy_event_priority/
lib.rs

1use bevy::ecs::system::SystemParam;
2use bevy::prelude::*;
3use std::marker::PhantomData;
4use std::sync::atomic::Ordering::Relaxed;
5use std::{collections::BinaryHeap, sync::atomic::AtomicU32};
6
7pub trait PriorityEvent: Send + Sync + 'static {}
8impl<E: Send + Sync + 'static> PriorityEvent for E {}
9
10#[derive(Debug)]
11struct EventInstance<E> {
12    prio: u32,
13    event_id: u32,
14    event: E,
15}
16
17impl<E> EventInstance<E> {
18    fn new(event: E, prio: u32) -> Self {
19        static COUNTER: AtomicU32 = AtomicU32::new(0);
20
21        Self {
22            prio,
23            event_id: COUNTER.fetch_add(1, Relaxed),
24            event,
25        }
26    }
27}
28
29impl<E> PartialEq for EventInstance<E> {
30    fn eq(&self, other: &Self) -> bool {
31        self.prio == other.prio && self.event_id == other.event_id
32    }
33}
34impl<E> Eq for EventInstance<E> {}
35
36impl<E> Ord for EventInstance<E> {
37    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
38        match self.prio.cmp(&other.prio) {
39            std::cmp::Ordering::Equal => self.event_id.cmp(&other.event_id),
40            v => v,
41        }
42        .reverse()
43    }
44}
45impl<E> PartialOrd for EventInstance<E> {
46    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
47        Some(self.cmp(other))
48    }
49}
50
51impl<E: Clone> Clone for EventInstance<E> {
52    fn clone(&self) -> Self {
53        Self {
54            prio: self.prio,
55            event_id: self.event_id,
56            event: self.event.clone(),
57        }
58    }
59}
60
61/// An event priority queue.
62/// Used when the ordering of events should be influenced by other factors.
63/// This implementation does NOT provide double buffering.
64/// Writers and readers are expected to remove events as soon as they are read,
65/// this implies a one to one mapping between events and event handlers.
66#[derive(Debug, Resource)]
67pub struct PriorityEvents<E> {
68    events: BinaryHeap<EventInstance<E>>,
69}
70
71impl<E> Default for PriorityEvents<E> {
72    fn default() -> Self {
73        Self {
74            events: BinaryHeap::new(),
75        }
76    }
77}
78
79#[derive(SystemParam)]
80pub struct PriorityEventReader<'w, 's, E: PriorityEvent> {
81    events: ResMut<'w, PriorityEvents<E>>,
82    #[system_param(ignore)]
83    marker: PhantomData<&'s usize>,
84}
85
86pub struct PriorityIterator<'w, E: PriorityEvent> {
87    min: u32,
88    max: u32,
89    events: &'w mut PriorityEvents<E>,
90}
91
92impl<E: PriorityEvent> Iterator for PriorityIterator<'_, E> {
93    type Item = E;
94
95    fn next(&mut self) -> Option<Self::Item> {
96        while let Some(e) = self.events.events.peek() {
97            if e.prio > self.min {
98                return None;
99            } else if e.prio < self.max {
100                // discard events which should have already run
101                self.events.events.pop();
102            } else {
103                break;
104            };
105        }
106
107        self.events.events.pop().map(|e| e.event)
108    }
109}
110
111impl<E: PriorityEvent> PriorityEventReader<'_, '_, E> {
112    /// Iterates over events this reader has not seen yet, while also clearing them.
113    /// Will not remove any events of priority lower than min (0 is highest, inf is lowest)
114    /// but will discard events of higher priority
115    /// i.e. will handle events in the priority range [min,max] (inclusive)
116    pub fn iter_prio_range(&mut self, max: u32, min: u32) -> impl Iterator<Item = E> + '_ {
117        PriorityIterator {
118            min,
119            max,
120            events: self.events.as_mut(),
121        }
122    }
123
124    /// Determines the number of events available to be read, without consuming any
125    pub fn len(&self) -> usize {
126        self.events.events.len()
127    }
128
129    /// Determines if there are any events to be read, without consuming any.
130    pub fn is_empty(&self) -> bool {
131        self.len() == 0
132    }
133}
134
135#[derive(SystemParam)]
136pub struct PriorityEventWriter<'w, 's, E: PriorityEvent> {
137    events: ResMut<'w, PriorityEvents<E>>,
138
139    #[system_param(ignore)]
140    marker: PhantomData<&'s usize>,
141}
142
143impl<E: PriorityEvent> PriorityEventWriter<'_, '_, E> {
144    pub fn send(&mut self, event: E, prio: u32) {
145        self.events.events.push(EventInstance::new(event, prio));
146    }
147
148    pub fn send_batch(&mut self, events: impl Iterator<Item = E>, prio: u32) {
149        self.events
150            .events
151            .extend(events.map(|v| EventInstance::new(v, prio)))
152    }
153
154    pub fn send_default(&mut self, prio: u32)
155    where
156        E: Default,
157    {
158        self.events
159            .events
160            .push(EventInstance::new(E::default(), prio))
161    }
162}
163
164/// a convenience for initialising prioritised event types
165pub trait AddPriorityEvent {
166    fn add_priority_event<E: PriorityEvent>(&mut self) -> &mut Self;
167}
168
169impl AddPriorityEvent for App {
170    fn add_priority_event<E: PriorityEvent>(&mut self) -> &mut Self {
171        self.init_resource::<PriorityEvents<E>>();
172
173        self
174    }
175}
176
177#[cfg(test)]
178mod tests {
179    use bevy::{ecs::system::SystemState, prelude::World};
180
181    use super::*;
182
183    #[derive(Copy, Clone, PartialEq, Eq, Debug)]
184    struct TestEvent(usize);
185
186    fn collect_events<E: Copy>(events: BinaryHeap<EventInstance<E>>) -> Vec<E> {
187        events
188            .into_sorted_vec()
189            .iter()
190            .map(|e| e.event)
191            .rev()
192            .collect()
193    }
194
195    #[test]
196    fn test_events() {
197        let mut world = World::new();
198        let mut state_writer: SystemState<PriorityEventWriter<TestEvent>> =
199            SystemState::new(&mut world);
200        let mut state_reader: SystemState<PriorityEventReader<TestEvent>> =
201            SystemState::new(&mut world);
202
203        world.init_resource::<PriorityEvents<TestEvent>>();
204
205        // stage 1
206
207        {
208            let mut w = state_writer.get_mut(&mut world);
209
210            // system writes three events, out of order
211            w.send(TestEvent(0), 5);
212            w.send(TestEvent(1), 1);
213            w.send(TestEvent(2), 0);
214        }
215
216        // events are send and ordered in decreasing priority order
217        assert_eq!(
218            collect_events(world.resource::<PriorityEvents<TestEvent>>().events.clone()),
219            vec![TestEvent(2), TestEvent(1), TestEvent(0)]
220        );
221
222        // stage 2
223
224        {
225            let mut w = state_reader.get_mut(&mut world);
226
227            // system reads only top event
228            w.iter_prio_range(0, 0).for_each(drop);
229        }
230
231        // first event is consumed immediately
232        assert_eq!(
233            collect_events(world.resource::<PriorityEvents<TestEvent>>().events.clone()),
234            vec![TestEvent(1), TestEvent(0)]
235        );
236
237        // stage 3
238
239        {
240            let mut w = state_reader.get_mut(&mut world);
241
242            // system reads all events
243            w.iter_prio_range(1, 5).for_each(drop);
244        }
245
246        // first event is consumed immediately
247        assert_eq!(
248            collect_events(world.resource::<PriorityEvents<TestEvent>>().events.clone()),
249            Vec::default()
250        );
251    }
252
253    #[test]
254    fn test_not_cleared_events() {
255        let mut world = World::new();
256        let mut state_writer: SystemState<PriorityEventWriter<TestEvent>> =
257            SystemState::new(&mut world);
258        let mut state_reader: SystemState<PriorityEventReader<TestEvent>> =
259            SystemState::new(&mut world);
260
261        world.init_resource::<PriorityEvents<TestEvent>>();
262
263        // two systems run at different frequencies, both serve non-overlapping priorities
264
265        // stage 1
266        // system sends events of lower priority than it serves
267        {
268            let mut w = state_writer.get_mut(&mut world);
269
270            w.send(TestEvent(0), 1);
271        }
272        {
273            let mut w = state_reader.get_mut(&mut world);
274
275            w.iter_prio_range(0, 0).for_each(drop);
276        }
277
278        assert_eq!(
279            collect_events(world.resource::<PriorityEvents<TestEvent>>().events.clone()),
280            vec![TestEvent(0)]
281        );
282
283        // stage 2
284        // same system runs writes another of the same event
285
286        {
287            let mut w = state_writer.get_mut(&mut world);
288
289            w.send(TestEvent(0), 1);
290        }
291        {
292            let mut w = state_reader.get_mut(&mut world);
293
294            w.iter_prio_range(0, 0).for_each(drop);
295        }
296
297        assert_eq!(
298            collect_events(world.resource::<PriorityEvents<TestEvent>>().events.clone()),
299            vec![TestEvent(0), TestEvent(0)]
300        );
301
302        // stage 3
303        // this time another system runs clearing those events
304        {
305            let mut w = state_reader.get_mut(&mut world);
306
307            w.iter_prio_range(1, 1).for_each(drop);
308        }
309        assert_eq!(
310            collect_events(world.resource::<PriorityEvents<TestEvent>>().events.clone()),
311            Vec::default()
312        );
313    }
314
315    #[test]
316    fn test_higher_prio_destroyed() {
317        let mut world = World::new();
318        let mut state_writer: SystemState<PriorityEventWriter<TestEvent>> =
319            SystemState::new(&mut world);
320        let mut state_reader: SystemState<PriorityEventReader<TestEvent>> =
321            SystemState::new(&mut world);
322
323        world.init_resource::<PriorityEvents<TestEvent>>();
324
325        // two systems run at different frequencies, both serve non-overlapping priorities
326
327        // stage 1
328        // system sends events of higher priority than another serves
329        {
330            let mut w = state_writer.get_mut(&mut world);
331
332            w.send(TestEvent(0), 0);
333        }
334
335        // stage 2
336        // system receives event of higher priority than it serves
337        {
338            let mut w = state_reader.get_mut(&mut world);
339
340            // event is not read but discarded
341            assert_eq!(
342                w.iter_prio_range(1, 1).collect::<Vec<TestEvent>>(),
343                Vec::default()
344            );
345        }
346
347        // the event is cleared
348        assert_eq!(
349            collect_events(world.resource::<PriorityEvents<TestEvent>>().events.clone()),
350            vec![]
351        );
352    }
353
354    #[test]
355    fn test_prio_range() {
356        let mut world = World::new();
357        let mut state_writer: SystemState<PriorityEventWriter<TestEvent>> =
358            SystemState::new(&mut world);
359        let mut state_reader: SystemState<PriorityEventReader<TestEvent>> =
360            SystemState::new(&mut world);
361
362        world.init_resource::<PriorityEvents<TestEvent>>();
363
364        // two systems run at different frequencies, both serve non-overlapping priorities
365
366        // stage 1
367        // system sends events of various priorities
368        {
369            let mut w = state_writer.get_mut(&mut world);
370
371            w.send(TestEvent(0), 0);
372            w.send(TestEvent(1), 1);
373            w.send(TestEvent(2), 2);
374            w.send(TestEvent(3), 3);
375            w.send(TestEvent(4), 4);
376            w.send(TestEvent(5), 5);
377        }
378
379        // stage 2
380        // multiple systems in order of priority remove them one by one
381        {
382            let mut w = state_reader.get_mut(&mut world);
383
384            assert_eq!(
385                w.iter_prio_range(0, 1).collect::<Vec<TestEvent>>(),
386                vec![TestEvent(0), TestEvent(1)]
387            );
388
389            assert_eq!(
390                w.iter_prio_range(2, 2).collect::<Vec<TestEvent>>(),
391                vec![TestEvent(2)]
392            );
393
394            assert_eq!(
395                w.iter_prio_range(3, 3).collect::<Vec<TestEvent>>(),
396                vec![TestEvent(3)]
397            );
398
399            // 4 is discarded
400            assert_eq!(
401                w.iter_prio_range(5, 5).collect::<Vec<TestEvent>>(),
402                vec![TestEvent(5)]
403            );
404        }
405
406        // the events are all cleared
407        assert_eq!(
408            collect_events(world.resource::<PriorityEvents<TestEvent>>().events.clone()),
409            vec![]
410        );
411    }
412}