1use std::{
3 borrow::BorrowMut,
4 mem,
5 sync::{
6 atomic::{AtomicBool, Ordering},
7 Arc,
8 },
9 thread::{sleep, spawn},
10 time::Duration,
11};
12
13use crossbeam_channel::{bounded, Receiver, Sender};
14use parking_lot::Mutex;
15
16use crate::{Installer, Status, ThreadStatuses};
17
18const WAIT_TIME: Duration = Duration::from_millis(100);
19
20#[derive(Debug)]
23pub struct MockNotifier<'notifier> {
24 threadable_statuses: &'notifier ThreadStatuses,
25}
26
27impl<'notifier> MockNotifier<'notifier> {
28 #[inline]
30 #[must_use]
31 pub const fn new(threadable_statuses: &'notifier ThreadStatuses) -> Self {
32 Self { threadable_statuses }
33 }
34
35 #[inline]
37 pub fn register_thread(&mut self, thread_name: &str, status: Status) {
38 self.threadable_statuses.register_thread(thread_name, status);
39 }
40}
41
42#[derive(Clone, Debug)]
44pub struct ThreadableTester {
45 receiver: Receiver<(String, Status)>,
46 sender: Sender<(String, Status)>,
47 statuses: Arc<Mutex<Vec<Status>>>,
48 ended: Arc<AtomicBool>,
49}
50
51impl ThreadableTester {
52 #[inline]
54 #[must_use]
55 pub fn new() -> Self {
56 let (sender, receiver) = bounded(0);
57
58 Self {
59 receiver,
60 sender,
61 statuses: Arc::new(Mutex::new(vec![Status::New])),
62 ended: Arc::new(AtomicBool::new(true)),
63 }
64 }
65
66 #[inline]
68 #[must_use]
69 pub fn take_statuses(&self) -> Vec<Status> {
70 mem::take(self.statuses.lock().borrow_mut())
71 }
72
73 #[inline]
75 #[allow(clippy::missing_panics_doc)]
76 pub fn start_threadable<Threadable: crate::Threadable>(&self, theadable: &Threadable, thread_name: &str) {
77 self.ended.store(false, Ordering::Release);
78 let installer = Installer::new(ThreadStatuses::new(), self.sender.clone());
79 theadable.install(&installer);
80 let mut ops = installer.into_ops();
81 let op = ops.remove(thread_name).expect("Expected to find thead");
82
83 let statuses = Arc::clone(&self.statuses);
84 let receiver = self.receiver.clone();
85
86 let _status_thread_id = spawn(move || {
87 for (_, status) in &receiver {
88 let mut statuses_lock = statuses.lock();
89 let last_status = statuses_lock.last().unwrap();
90 if !matches!(*last_status, Status::Error(_)) && last_status != &status {
91 statuses_lock.push(status);
92 }
93 }
94 });
95 let _op_id = spawn(op);
96 self.ended.store(true, Ordering::Release);
97 }
98
99 #[inline]
105 pub fn wait_for_status(&self, status: &Status) {
106 let mut attempt = 0;
107
108 loop {
109 let statuses_lock = self.statuses.lock();
110 let current_status = statuses_lock.last().unwrap();
111
112 if current_status == status {
113 break;
114 }
115 assert!(
116 attempt <= 100,
117 "Timeout waited for status change to '{status:?}' on thread.\n Status is: {current_status:?}",
118 );
119
120 sleep(WAIT_TIME);
121 attempt += 1;
122 }
123 }
124
125 #[inline]
131 pub fn wait_for_error_status(&self) {
132 let mut attempt = 0;
133
134 loop {
135 let statuses_lock = self.statuses.lock();
136 let current_status = statuses_lock.last().unwrap();
137
138 if matches!(current_status, &Status::Error(_)) {
139 break;
140 }
141 assert!(
142 attempt <= 100,
143 "Timeout waited for status change to 'Status::Error(_)' on thread.\n Status is: {current_status:?}"
144 );
145
146 sleep(WAIT_TIME);
147 attempt += 1;
148 }
149 }
150
151 #[inline]
157 pub fn wait_for_finished(&self) {
158 let mut attempt = 0;
159
160 loop {
161 if self.ended.load(Ordering::Acquire) {
162 break;
163 }
164
165 sleep(WAIT_TIME);
166 attempt += 1;
167 assert!(attempt <= 100, "Timeout waited for thread to finish");
168 }
169 }
170}