1use std::{
2 collections::VecDeque,
3 mem,
4 sync::{
5 atomic::{AtomicBool, Ordering},
6 Arc,
7 },
8 time::Duration,
9};
10
11use parking_lot::Mutex;
12
13use crate::Event;
14
15const MAXIMUM_EVENTS: usize = 100;
16const EVENT_POLL_TIMEOUT: Duration = Duration::from_secs(1);
17
18#[derive(Clone, Debug)]
20pub struct State<CustomEvent: crate::CustomEvent> {
21 ended: Arc<AtomicBool>,
22 event_queue: Arc<Mutex<VecDeque<Event<CustomEvent>>>>,
23 paused: Arc<AtomicBool>,
24 update_receiver: crossbeam_channel::Receiver<()>,
25 update_sender: crossbeam_channel::Sender<()>,
26}
27
28impl<CustomEvent: crate::CustomEvent> State<CustomEvent> {
29 pub(crate) fn new() -> Self {
30 let (update_sender, update_receiver) = crossbeam_channel::unbounded();
31 Self {
32 ended: Arc::new(AtomicBool::from(false)),
33 event_queue: Arc::new(Mutex::new(VecDeque::new())),
34 paused: Arc::new(AtomicBool::from(false)),
35 update_receiver,
36 update_sender,
37 }
38 }
39
40 fn send_update(&self) {
41 let _result = self.update_sender.send(());
42 }
43
44 pub(crate) fn is_paused(&self) -> bool {
45 self.paused.load(Ordering::Acquire)
46 }
47
48 pub(crate) fn is_ended(&self) -> bool {
49 self.ended.load(Ordering::Acquire)
50 }
51
52 #[inline]
54 pub fn pause(&self) {
55 self.paused.store(true, Ordering::Release);
56 }
57
58 #[inline]
60 pub fn resume(&self) {
61 self.paused.store(false, Ordering::Release);
62 }
63
64 #[inline]
66 pub fn end(&self) {
67 self.ended.store(true, Ordering::Release);
68 }
69
70 #[inline]
72 pub fn enqueue_event(&self, event: Event<CustomEvent>) {
73 let mut events = self.event_queue.lock();
74 let last_resize_event_maybe = matches!(event, Event::Resize(..))
75 .then(|| events.back_mut().filter(|e| matches!(*e, &mut Event::Resize(..))))
76 .flatten();
77
78 if let Some(last_resize_event) = last_resize_event_maybe {
79 let _old = mem::replace(last_resize_event, event);
80 }
81 else if events.len() < MAXIMUM_EVENTS {
82 events.push_back(event);
83 }
84 self.send_update();
85 }
86
87 #[inline]
89 pub fn push_event(&self, event: Event<CustomEvent>) {
90 let mut events = self.event_queue.lock();
91 if events.len() >= MAXIMUM_EVENTS {
92 _ = events.pop_back();
93 }
94 events.push_front(event);
95 self.send_update();
96 }
97
98 #[inline]
101 #[must_use]
102 pub fn read_event(&self) -> Event<CustomEvent> {
103 while self.update_receiver.try_recv().is_ok() {}
105 loop {
106 if let Some(event) = self.event_queue.lock().pop_front() {
107 return event;
108 }
109
110 if self.update_receiver.recv_timeout(EVENT_POLL_TIMEOUT).is_ok() {
113 continue;
114 }
115
116 return Event::None;
119 }
120 }
121}
122
123#[cfg(test)]
124mod tests {
125 use std::{
126 sync::atomic::AtomicUsize,
127 thread::{sleep, spawn},
128 };
129
130 use super::*;
131 use crate::testutil::local::{Event, TestEvent};
132
133 fn create_state() -> State<TestEvent> {
134 State::new()
135 }
136
137 #[test]
138 fn paused() {
139 let state = create_state();
140 state.pause();
141 assert!(state.is_paused());
142 }
143
144 #[test]
145 fn resumed() {
146 let state = create_state();
147 state.resume();
148 assert!(!state.is_paused());
149 }
150
151 #[test]
152 fn ended() {
153 let state = create_state();
154 state.end();
155 assert!(state.is_ended());
156 }
157
158 #[test]
159 fn enqueue_event() {
160 let state = create_state();
161 state.enqueue_event(Event::from('a'));
162 state.enqueue_event(Event::from('b'));
163
164 assert_eq!(state.read_event(), Event::from('a'));
165 assert_eq!(state.read_event(), Event::from('b'));
166 }
167
168 #[test]
169 fn enqueue_event_resize_last_follow_by_non_resize() {
170 let state = create_state();
171 state.enqueue_event(Event::Resize(1, 1));
172 state.enqueue_event(Event::from('a'));
173
174 assert_eq!(state.read_event(), Event::Resize(1, 1));
175 assert_eq!(state.read_event(), Event::from('a'));
176 }
177
178 #[test]
179 fn enqueue_event_resize_last_follow_by_new_resize() {
180 let state = create_state();
181 state.enqueue_event(Event::Resize(1, 1));
182 state.enqueue_event(Event::Resize(2, 2));
183
184 assert_eq!(state.read_event(), Event::Resize(2, 2));
185 assert_eq!(state.read_event(), Event::None);
186 }
187
188 #[test]
189 fn enqueue_event_overflow() {
190 let state = create_state();
191 for _ in 0..MAXIMUM_EVENTS {
193 state.enqueue_event(Event::from('a'));
194 }
195 state.enqueue_event(Event::from('b'));
196
197 let mut events_received = vec![];
198 loop {
199 let event = state.read_event();
200 if event == Event::None {
201 break;
202 }
203 events_received.push(event);
204 }
205
206 assert_eq!(state.read_event(), Event::None);
207 assert_eq!(events_received.len(), MAXIMUM_EVENTS);
208 assert_eq!(events_received.first().unwrap(), &Event::from('a'));
209 assert_eq!(events_received.last().unwrap(), &Event::from('a'));
210 }
211
212 #[test]
213 fn push_event() {
214 let state = create_state();
215 state.push_event(Event::from('a'));
216 state.push_event(Event::from('b'));
217
218 assert_eq!(state.read_event(), Event::from('b'));
219 assert_eq!(state.read_event(), Event::from('a'));
220 }
221
222 #[test]
223 fn push_event_overflow() {
224 let state = create_state();
225 for _ in 0..MAXIMUM_EVENTS {
227 state.push_event(Event::from('a'));
228 }
229 state.push_event(Event::from('b'));
230
231 let mut events_received = vec![];
232 loop {
233 let event = state.read_event();
234 if event == Event::None {
235 break;
236 }
237 events_received.push(event);
238 }
239
240 assert_eq!(state.read_event(), Event::None);
241 assert_eq!(events_received.len(), MAXIMUM_EVENTS);
242 assert_eq!(events_received.first().unwrap(), &Event::from('b'));
243 assert_eq!(events_received.last().unwrap(), &Event::from('a'));
244 }
245
246 #[test]
247 fn read_event() {
248 let state = create_state();
259
260 let step: Arc<Mutex<AtomicUsize>> = Arc::new(Mutex::new(AtomicUsize::new(0)));
261 let events_read: Arc<Mutex<Vec<Event>>> = Arc::new(Mutex::new(vec![]));
262
263 let thread_state = state.clone();
264 let thread_step = Arc::clone(&step);
265 let thread_events_read = Arc::clone(&events_read);
266 _ = spawn(move || {
267 loop {
268 let mut thread_events_read_lock = thread_events_read.lock();
269 let thread_step_lock = thread_step.lock();
270 match thread_step_lock.load(Ordering::Acquire) {
271 0 => {
272 let event = thread_state.read_event();
273 thread_events_read_lock.push(event);
274 thread_step_lock.store(1, Ordering::Release);
275 },
276 1 => {
277 sleep(Duration::from_millis(10));
278 },
279 2 => {
280 let event = thread_state.read_event();
281 thread_events_read_lock.push(event);
282 thread_step_lock.store(3, Ordering::Release);
283 break;
284 },
285 _ => unreachable!(),
286 }
287 }
288 });
289
290 while step.lock().load(Ordering::Acquire) != 1 {
291 sleep(Duration::from_millis(10));
292 }
293 state.enqueue_event(Event::from('a'));
294 step.lock().store(2, Ordering::Release);
295
296 while step.lock().load(Ordering::Acquire) == 2 {
297 sleep(Duration::from_millis(10));
298 }
299
300 let mut events_read_lock = events_read.lock();
301 assert_eq!(events_read_lock.pop().unwrap(), Event::from('a'));
302 assert_eq!(events_read_lock.pop().unwrap(), Event::None);
303 }
304}