input/thread/
state.rs

1use std::{
2	collections::VecDeque,
3	mem,
4	sync::{
5		atomic::{AtomicBool, Ordering},
6		Arc,
7	},
8	time::Duration,
9};
10
11use parking_lot::Mutex;
12
13use crate::Event;
14
/// Upper bound on the number of events kept in the queue; `enqueue_event` and `push_event`
/// both check the queue length against this value.
const MAXIMUM_EVENTS: usize = 100;
/// How long `read_event` waits for a queue update notification before returning `Event::None`.
const EVENT_POLL_TIMEOUT: Duration = Duration::from_secs(1);
17
/// Input thread state.
///
/// Holds the pause/end flags, the pending event queue and a notification channel that is
/// signalled whenever an event is added. The flags and the queue are stored behind `Arc`s, so a
/// clone of this struct refers to the same flags and queue as the original.
#[derive(Clone, Debug)]
pub struct State<CustomEvent: crate::CustomEvent> {
	/// Set to `true` by `end`; reported by `is_ended`.
	ended: Arc<AtomicBool>,
	/// Pending events, consumed from the front by `read_event`.
	event_queue: Arc<Mutex<VecDeque<Event<CustomEvent>>>>,
	/// Set by `pause`, cleared by `resume`; reported by `is_paused`.
	paused: Arc<AtomicBool>,
	/// Receiving side of the notification channel; `read_event` waits on it.
	update_receiver: crossbeam_channel::Receiver<()>,
	/// Sending side of the notification channel; signalled after every enqueue or push.
	update_sender: crossbeam_channel::Sender<()>,
}
27
28impl<CustomEvent: crate::CustomEvent> State<CustomEvent> {
29	pub(crate) fn new() -> Self {
30		let (update_sender, update_receiver) = crossbeam_channel::unbounded();
31		Self {
32			ended: Arc::new(AtomicBool::from(false)),
33			event_queue: Arc::new(Mutex::new(VecDeque::new())),
34			paused: Arc::new(AtomicBool::from(false)),
35			update_receiver,
36			update_sender,
37		}
38	}
39
40	fn send_update(&self) {
41		let _result = self.update_sender.send(());
42	}
43
44	pub(crate) fn is_paused(&self) -> bool {
45		self.paused.load(Ordering::Acquire)
46	}
47
48	pub(crate) fn is_ended(&self) -> bool {
49		self.ended.load(Ordering::Acquire)
50	}
51
52	/// Pause the event read thread.
53	#[inline]
54	pub fn pause(&self) {
55		self.paused.store(true, Ordering::Release);
56	}
57
58	/// Resume the event read thread.
59	#[inline]
60	pub fn resume(&self) {
61		self.paused.store(false, Ordering::Release);
62	}
63
64	/// Permanently End the event read thread.
65	#[inline]
66	pub fn end(&self) {
67		self.ended.store(true, Ordering::Release);
68	}
69
70	/// Add an event after existing events.
71	#[inline]
72	pub fn enqueue_event(&self, event: Event<CustomEvent>) {
73		let mut events = self.event_queue.lock();
74		let last_resize_event_maybe = matches!(event, Event::Resize(..))
75			.then(|| events.back_mut().filter(|e| matches!(*e, &mut Event::Resize(..))))
76			.flatten();
77
78		if let Some(last_resize_event) = last_resize_event_maybe {
79			let _old = mem::replace(last_resize_event, event);
80		}
81		else if events.len() < MAXIMUM_EVENTS {
82			events.push_back(event);
83		}
84		self.send_update();
85	}
86
87	/// Add an event before existing events.
88	#[inline]
89	pub fn push_event(&self, event: Event<CustomEvent>) {
90		let mut events = self.event_queue.lock();
91		if events.len() >= MAXIMUM_EVENTS {
92			_ = events.pop_back();
93		}
94		events.push_front(event);
95		self.send_update();
96	}
97
98	/// Read an event from the queue. This function will block for a while until an event is
99	/// available. And if no event is available, it will return `Event::None`.
100	#[inline]
101	#[must_use]
102	pub fn read_event(&self) -> Event<CustomEvent> {
103		// clear existing message since last read
104		while self.update_receiver.try_recv().is_ok() {}
105		loop {
106			if let Some(event) = self.event_queue.lock().pop_front() {
107				return event;
108			}
109
110			// if there is no event available on the queue, instead of returning early, we can wait
111			// for the new event message and try again.
112			if self.update_receiver.recv_timeout(EVENT_POLL_TIMEOUT).is_ok() {
113				continue;
114			}
115
116			// We always return if the above recv call times out, to ensure this does not block
117			// forever
118			return Event::None;
119		}
120	}
121}
122
#[cfg(test)]
mod tests {
	use std::{
		sync::atomic::AtomicUsize,
		thread::{sleep, spawn},
	};

	use super::*;
	use crate::testutil::local::{Event, TestEvent};

	/// Build a fresh state using the test event type.
	fn create_state() -> State<TestEvent> {
		State::new()
	}

	#[test]
	fn paused() {
		let state = create_state();
		state.pause();
		assert!(state.is_paused());
	}

	#[test]
	fn resumed() {
		let state = create_state();
		state.resume();
		assert!(!state.is_paused());
	}

	#[test]
	fn ended() {
		let state = create_state();
		state.end();
		assert!(state.is_ended());
	}

	#[test]
	fn enqueue_event() {
		let state = create_state();
		state.enqueue_event(Event::from('a'));
		state.enqueue_event(Event::from('b'));

		assert_eq!(state.read_event(), Event::from('a'));
		assert_eq!(state.read_event(), Event::from('b'));
	}

	#[test]
	fn enqueue_event_resize_last_follow_by_non_resize() {
		let state = create_state();
		state.enqueue_event(Event::Resize(1, 1));
		state.enqueue_event(Event::from('a'));

		assert_eq!(state.read_event(), Event::Resize(1, 1));
		assert_eq!(state.read_event(), Event::from('a'));
	}

	#[test]
	fn enqueue_event_resize_last_follow_by_new_resize() {
		let state = create_state();
		state.enqueue_event(Event::Resize(1, 1));
		state.enqueue_event(Event::Resize(2, 2));

		// the second resize replaces the first, leaving a single queued event
		assert_eq!(state.read_event(), Event::Resize(2, 2));
		assert_eq!(state.read_event(), Event::None);
	}

	#[test]
	fn enqueue_event_overflow() {
		let state = create_state();
		// fill queue
		for _ in 0..MAXIMUM_EVENTS {
			state.enqueue_event(Event::from('a'));
		}
		// the queue is full, so this event is dropped
		state.enqueue_event(Event::from('b'));

		let mut events_received = vec![];
		loop {
			let event = state.read_event();
			if event == Event::None {
				break;
			}
			events_received.push(event);
		}

		assert_eq!(state.read_event(), Event::None);
		assert_eq!(events_received.len(), MAXIMUM_EVENTS);
		assert_eq!(events_received.first().unwrap(), &Event::from('a'));
		assert_eq!(events_received.last().unwrap(), &Event::from('a'));
	}

	#[test]
	fn push_event() {
		let state = create_state();
		state.push_event(Event::from('a'));
		state.push_event(Event::from('b'));

		assert_eq!(state.read_event(), Event::from('b'));
		assert_eq!(state.read_event(), Event::from('a'));
	}

	#[test]
	fn push_event_overflow() {
		let state = create_state();
		// fill queue
		for _ in 0..MAXIMUM_EVENTS {
			state.push_event(Event::from('a'));
		}
		// the queue is full, so the event at the back is discarded to make room
		state.push_event(Event::from('b'));

		let mut events_received = vec![];
		loop {
			let event = state.read_event();
			if event == Event::None {
				break;
			}
			events_received.push(event);
		}

		assert_eq!(state.read_event(), Event::None);
		assert_eq!(events_received.len(), MAXIMUM_EVENTS);
		assert_eq!(events_received.first().unwrap(), &Event::from('b'));
		assert_eq!(events_received.last().unwrap(), &Event::from('a'));
	}

	#[test]
	fn read_event() {
		// STEPS:
		// 0 -> thread: initial event read with timeout, returns None (moves to step 1)
		//      test: waits for the initial event read to complete
		// 1 -> thread: idles until the test moves on
		//      test: enqueues new event (moves to step 2)
		// 2 -> thread: reads enqueued event (moves to step 3) and exits
		//      test: joins the thread
		// 3 -> thread: ended, no action
		//      test: assert events read match

		let state = create_state();

		// The step counter is a bare atomic and the results vector is only locked while pushing,
		// so neither thread ever holds a lock across a blocking read or a sleep.
		let step = Arc::new(AtomicUsize::new(0));
		let events_read: Arc<Mutex<Vec<Event>>> = Arc::new(Mutex::new(vec![]));

		let thread_state = state.clone();
		let thread_step = Arc::clone(&step);
		let thread_events_read = Arc::clone(&events_read);
		let handle = spawn(move || {
			loop {
				match thread_step.load(Ordering::Acquire) {
					0 => {
						let event = thread_state.read_event();
						thread_events_read.lock().push(event);
						thread_step.store(1, Ordering::Release);
					},
					1 => {
						sleep(Duration::from_millis(10));
					},
					2 => {
						let event = thread_state.read_event();
						thread_events_read.lock().push(event);
						thread_step.store(3, Ordering::Release);
						break;
					},
					_ => unreachable!(),
				}
			}
		});

		while step.load(Ordering::Acquire) != 1 {
			sleep(Duration::from_millis(10));
		}
		state.enqueue_event(Event::from('a'));
		step.store(2, Ordering::Release);

		// Joining both waits for step 2 to finish and surfaces a panic from the reader thread.
		handle.join().unwrap();
		assert_eq!(step.load(Ordering::Acquire), 3);

		let mut events_read_lock = events_read.lock();
		assert_eq!(events_read_lock.pop().unwrap(), Event::from('a'));
		assert_eq!(events_read_lock.pop().unwrap(), Event::None);
	}
}