runtime/testutils/
mod.rs

1//! Utilities for writing tests that interact with the runtime.
2use std::{
3	borrow::BorrowMut,
4	mem,
5	sync::{
6		atomic::{AtomicBool, Ordering},
7		Arc,
8	},
9	thread::{sleep, spawn},
10	time::Duration,
11};
12
13use crossbeam_channel::{bounded, Receiver, Sender};
14use parking_lot::Mutex;
15
16use crate::{Installer, Status, ThreadStatuses};
17
18const WAIT_TIME: Duration = Duration::from_millis(100);
19
20/// A mocked version of the `Notifier`, that will interact directly with a `ThreadStatuses` without the use of a thread
21/// or the `Runtime`.
22#[derive(Debug)]
23pub struct MockNotifier<'notifier> {
24	threadable_statuses: &'notifier ThreadStatuses,
25}
26
27impl<'notifier> MockNotifier<'notifier> {
28	/// Create a new instance of a `MockNotifier`.
29	#[inline]
30	#[must_use]
31	pub const fn new(threadable_statuses: &'notifier ThreadStatuses) -> Self {
32		Self { threadable_statuses }
33	}
34
35	/// Register a thread by name and status. This does not create a thread.
36	#[inline]
37	pub fn register_thread(&mut self, thread_name: &str, status: Status) {
38		self.threadable_statuses.register_thread(thread_name, status);
39	}
40}
41
42/// A tester utility for `Threadable`.
43#[derive(Clone, Debug)]
44pub struct ThreadableTester {
45	receiver: Receiver<(String, Status)>,
46	sender: Sender<(String, Status)>,
47	statuses: Arc<Mutex<Vec<Status>>>,
48	ended: Arc<AtomicBool>,
49}
50
51impl ThreadableTester {
52	/// Create a new instance of the test utility.
53	#[inline]
54	#[must_use]
55	pub fn new() -> Self {
56		let (sender, receiver) = bounded(0);
57
58		Self {
59			receiver,
60			sender,
61			statuses: Arc::new(Mutex::new(vec![Status::New])),
62			ended: Arc::new(AtomicBool::new(true)),
63		}
64	}
65
66	/// Take the current `Status` changes.
67	#[inline]
68	#[must_use]
69	pub fn take_statuses(&self) -> Vec<Status> {
70		mem::take(self.statuses.lock().borrow_mut())
71	}
72
73	/// Start a `Threadable` running the thread specified by the name, to completion in a separate thread.
74	#[inline]
75	#[allow(clippy::missing_panics_doc)]
76	pub fn start_threadable<Threadable: crate::Threadable>(&self, theadable: &Threadable, thread_name: &str) {
77		self.ended.store(false, Ordering::Release);
78		let installer = Installer::new(ThreadStatuses::new(), self.sender.clone());
79		theadable.install(&installer);
80		let mut ops = installer.into_ops();
81		let op = ops.remove(thread_name).expect("Expected to find thead");
82
83		let statuses = Arc::clone(&self.statuses);
84		let receiver = self.receiver.clone();
85
86		let _status_thread_id = spawn(move || {
87			for (_, status) in &receiver {
88				let mut statuses_lock = statuses.lock();
89				let last_status = statuses_lock.last().unwrap();
90				if !matches!(*last_status, Status::Error(_)) && last_status != &status {
91					statuses_lock.push(status);
92				}
93			}
94		});
95		let _op_id = spawn(op);
96		self.ended.store(true, Ordering::Release);
97	}
98
99	/// Wait for a particular status to be reached.
100	///
101	/// # Panics
102	///
103	/// Will panic if the wait takes too long and times out.
104	#[inline]
105	pub fn wait_for_status(&self, status: &Status) {
106		let mut attempt = 0;
107
108		loop {
109			let statuses_lock = self.statuses.lock();
110			let current_status = statuses_lock.last().unwrap();
111
112			if current_status == status {
113				break;
114			}
115			assert!(
116				attempt <= 100,
117				"Timeout waited for status change to '{status:?}' on thread.\n Status is: {current_status:?}",
118			);
119
120			sleep(WAIT_TIME);
121			attempt += 1;
122		}
123	}
124
125	/// Wait for an error status to be reached.
126	///
127	/// # Panics
128	///
129	/// Will panic if the wait takes too long and times out.
130	#[inline]
131	pub fn wait_for_error_status(&self) {
132		let mut attempt = 0;
133
134		loop {
135			let statuses_lock = self.statuses.lock();
136			let current_status = statuses_lock.last().unwrap();
137
138			if matches!(current_status, &Status::Error(_)) {
139				break;
140			}
141			assert!(
142				attempt <= 100,
143				"Timeout waited for status change to 'Status::Error(_)' on thread.\n Status is: {current_status:?}"
144			);
145
146			sleep(WAIT_TIME);
147			attempt += 1;
148		}
149	}
150
151	/// Wait for the thread started in `start_threadable` to finish.
152	///
153	/// # Panics
154	///
155	/// Will panic if the wait takes too long and times out.
156	#[inline]
157	pub fn wait_for_finished(&self) {
158		let mut attempt = 0;
159
160		loop {
161			if self.ended.load(Ordering::Acquire) {
162				break;
163			}
164
165			sleep(WAIT_TIME);
166			attempt += 1;
167			assert!(attempt <= 100, "Timeout waited for thread to finish");
168		}
169	}
170}