my_ecs/ds/signal.rs
1use std::{
2 mem,
3 num::NonZeroU32,
4 sync::atomic::{AtomicU32, Ordering},
5 thread::{self, Thread},
6};
7
8const ASLEEP: u32 = 0;
9const AWAKE: u32 = 1;
10const ALARM: u32 = 2;
11
12/// A data structure that can be used to wake other workers.
13///
14/// To use this struct, you must register worker handles through
15/// [`Signal::set_handles`], then you can make use of sleep/wake methods like
16/// so,
17/// - [`Signal::wait`] : Sleep indefinitely.
18/// - [`Signal::notify`] : Wake a specific worker.
19/// - [`Signal::notify_one`] : Wake a random worker.
20/// - [`Signal::notify_all`] : Wake all workers.
21#[derive(Debug)]
22pub struct Signal {
23 handles: Vec<Thread>,
24 states: Vec<AtomicU32>,
25 rand_gen: Xorshift,
26}
27
28impl Default for Signal {
29 /// Creates a new empty [`Signal`] with default random seed number.
30 ///
31 /// Note that you cannot do anything with empty `Signal` unless you call
32 /// [`Signal::set_handles`] on it.
33 ///
34 /// # Examples
35 ///
36 /// ```
37 /// use my_ecs::ds::Signal;
38 ///
39 /// let signal = Signal::default();
40 /// ```
41 fn default() -> Self {
42 const DEFAULT_SEED: u32 = 0x7A7A_A7A7;
43 // Safety: Infallible.
44 unsafe { Self::new(NonZeroU32::new_unchecked(DEFAULT_SEED)) }
45 }
46}
47
48impl Signal {
49 /// Creates a new empty [`Signal`] with the given random seed number.
50 ///
51 /// Don't forget to call [`Signal::set_handles`] before using the signal.
52 ///
53 /// # Examples
54 ///
55 /// ```
56 /// use my_ecs::ds::Signal;
57 /// use std::num::NonZeroU32;
58 ///
59 /// let signal = Signal::new(NonZeroU32::MIN);
60 /// ```
61 pub fn new(seed: NonZeroU32) -> Self {
62 Self {
63 handles: Vec::new(),
64 states: Vec::new(),
65 rand_gen: Xorshift::new(seed),
66 }
67 }
68
69 /// Sets worker handles to the signal then returns former worker handles.
70 ///
71 /// # Examples
72 ///
73 /// ```
74 /// use my_ecs::ds::Signal;
75 /// use std::thread;
76 ///
77 /// let join = thread::spawn(|| { /* ... */ });
78 /// let handle = join.thread().clone();
79 ///
80 /// let mut signal = Signal::default();
81 /// signal.set_handles(vec![handle]);
82 /// ```
83 pub fn set_handles(&mut self, handles: Vec<Thread>) -> Vec<Thread> {
84 let init_state = if self.states.is_empty() {
85 AWAKE
86 } else if self
87 .states
88 .iter()
89 .all(|state| state.load(Ordering::Relaxed) == ASLEEP)
90 {
91 ASLEEP
92 } else {
93 panic!("cannot replace 'Signal' in awake state");
94 };
95
96 let new_len = handles.len();
97
98 // Replaces 'handles'.
99 let old = mem::replace(&mut self.handles, handles);
100
101 // Replaces 'states'.
102 self.states = (0..new_len).map(|_| AtomicU32::new(init_state)).collect();
103
104 old
105 }
106
107 /// Blocks until another worker signals.
108 ///
109 /// * this_index - Index to the handle of *this worker* in the vector that
110 /// you inserted through [`Signal::set_handles`].
111 ///
112 /// Current worker cannot be woken up unless another worker wakes the
113 /// current worker through [`Signal`] because this method blocks repeatedly
114 /// to ignore spurious wakeup. See [`park`](thread::park) for more details.
115 ///
116 /// Also, note that signaling looks like being buffered in a single slot.
117 /// For example, if another worker woke current worker up and current worker
118 /// didn't sleep at that time, next call to [`Signal::wait`] will consume
119 /// the signal in the single slot buffer then be ignored.
120 ///
121 /// # Note
122 ///
123 /// `index` is an index for the current worker handle in the vector you
124 /// inserted at [`Signal::set_handles`]. By receiving the index from
125 /// caller, this method can avoid unnecessary matching opeartion. But note
126 /// that incorrect index causes panic or undefinitely long sleep.
127 ///
128 /// # Examples
129 ///
130 /// ```ignore
131 /// use my_ecs::ds::Signal;
132 ///
133 /// let mut signal = Signal::default();
134 /// let handle = std::thread::current();
135 /// signal.set_handles(vec![handle, /* other handles */]);
136 ///
137 /// // Current worker will be blocked by this call.
138 /// signal.wait(0); // Current handle index is 0.
139 /// ```
140 pub fn wait(&self, this_index: usize) {
141 #[cfg(debug_assertions)]
142 {
143 let handle = self
144 .handles
145 .get(this_index)
146 .expect("index is out of bounds");
147 assert_eq!(
148 thread::current().id(),
149 handle.id(),
150 "incorrect index was given"
151 );
152 }
153
154 // Skips if ALARM.
155 if self.states[this_index]
156 .compare_exchange(AWAKE, ASLEEP, Ordering::Relaxed, Ordering::Relaxed)
157 .is_ok()
158 {
159 loop {
160 thread::park(); // Acquire op in terms of ordering.
161 if self.states[this_index].load(Ordering::Relaxed) != ASLEEP {
162 break;
163 }
164 }
165 }
166
167 // ALARM -> AWAKE.
168 self.states[this_index].store(AWAKE, Ordering::Relaxed);
169 }
170
171 /// Wakes up a worker for the given target index or another worker.
172 ///
173 /// * target_index - Index to a handle of the vector that you inserted
174 /// through [`Signal::set_handles`].
175 ///
176 /// If the target worker is not blocked at the time, tries to wake another
177 /// worker instead. If failed to wake any worker, then mark *ALARM* on the
178 /// target worker, so that the worker will not be blocked by
179 /// [`Signal::wait`] next time.
180 ///
181 /// # Examples
182 ///
183 /// ```ignore
184 /// use my_ecs::ds::Signal;
185 /// use std::thread;
186 ///
187 /// let mut signal = Signal::default();
188 ///
189 /// // Worker A.
190 /// let join_a = thread::spawn(|| { /* ... */});
191 /// let handle_a = join_a.thread().clone();
192 ///
193 /// // Worker B.
194 /// let join_b = thread::spawn(|| { /* ... */});
195 /// let handle_b = join_b.thread().clone();
196 ///
197 /// signal.set_handles(vec![handle_a, handle_b]);
198 ///
199 /// // Wakes up worker A.
200 /// signal.notify(0);
201 ///
202 /// // Wakes up worker B.
203 /// signal.notify(1);
204 ///
205 /// # join_a.join().unwrap();
206 /// # join_b.join().unwrap();
207 /// ```
208 pub fn notify(&self, target_index: usize) {
209 // Tries to find and wake one ASLEEP worker.
210 const RETRY: usize = 3;
211 let len = self.states.len();
212 for i in (0..len).cycle().skip(target_index).take(len * RETRY) {
213 if self.states[i]
214 .compare_exchange(ASLEEP, ALARM, Ordering::Relaxed, Ordering::Relaxed)
215 .is_ok()
216 {
217 self.handles[i].unpark(); // Release op in terms of ordering.
218 return;
219 }
220 }
221
222 // We've failed to find ASLEEP worker.
223 // Then, make sure one worker to be AWAKE later.
224 self.states[target_index].store(ALARM, Ordering::Relaxed);
225 self.handles[target_index].unpark(); // Release op in terms of ordering.
226 }
227
228 /// Wakes up a random worker.
229 ///
230 /// If failed to wake any worker, then mark *ALARM* on one worker, so that
231 /// the worker will not be blocked by [`Signal::wait`] next time.
232 ///
233 /// # Examples
234 ///
235 /// ```ignore
236 /// use my_ecs::ds::Signal;
237 /// use std::thread;
238 ///
239 /// let mut signal = Signal::default();
240 ///
241 /// // Worker A.
242 /// let join_a = thread::spawn(|| { /* ... */});
243 /// let handle_a = join_a.thread().clone();
244 ///
245 /// // Worker B.
246 /// let join_b = thread::spawn(|| { /* ... */});
247 /// let handle_b = join_b.thread().clone();
248 ///
249 /// signal.set_handles(vec![handle_a, handle_b]);
250 ///
251 /// // Wakes up one random worker.
252 /// signal.notify_one();
253 ///
254 /// # join_a.join().unwrap();
255 /// # join_b.join().unwrap();
256 /// ```
257 pub fn notify_one(&self) {
258 let index = self.rand_gen.next() % self.states.len() as u32;
259 self.notify(index as usize);
260 }
261
262 /// Wakes up all workers.
263 ///
264 /// Each worker is marked *ALARM* when it's not blocked at the time, so that
265 /// the worker will not be blocked by [`Signal::wait`] next time.
266 ///
267 /// # Examples
268 ///
269 /// ```ignore
270 /// use my_ecs::ds::Signal;
271 /// use std::thread;
272 ///
273 /// let mut signal = Signal::default();
274 ///
275 /// // Worker A.
276 /// let join_a = thread::spawn(|| { /* ... */});
277 /// let handle_a = join_a.thread().clone();
278 ///
279 /// // Worker B.
280 /// let join_b = thread::spawn(|| { /* ... */});
281 /// let handle_b = join_b.thread().clone();
282 ///
283 /// signal.set_handles(vec![handle_a, handle_b]);
284 ///
285 /// // Wakes up all workers, Worker A & B.
286 /// signal.notify_all();
287 ///
288 /// # join_a.join().unwrap();
289 /// # join_b.join().unwrap();
290 /// ```
291 pub fn notify_all(&self) {
292 for (state, handle) in self.states.iter().zip(self.handles.iter()) {
293 state.store(ALARM, Ordering::Relaxed);
294 handle.unpark(); // Release op in terms of ordering.
295 }
296 }
297}
298
299/// A random number generator based on 32bits state.
300///
301/// The genrator can be shared and generate random numbers across workers but it
302/// doesn't provide synchronization.
303///
304/// # Reference
305///
306/// <https://en.wikipedia.org/wiki/Xorshift>
307#[derive(Debug)]
308#[repr(transparent)]
309pub struct Xorshift {
310 state: AtomicU32,
311}
312
313impl Xorshift {
314 /// Creates a new [`Xorshift`].
315 ///
316 /// # Examples
317 ///
318 /// ```
319 /// use my_ecs::ds::Xorshift;
320 /// use std::num::NonZeroU32;
321 ///
322 /// let gen = Xorshift::new(NonZeroU32::MIN);
323 /// ```
324 pub const fn new(seed: NonZeroU32) -> Self {
325 Self {
326 state: AtomicU32::new(seed.get()),
327 }
328 }
329
330 /// Generates next random number from the current state.
331 ///
332 /// You can call this method on multiple workers at the same time. The
333 /// generator is using atomic variable under the hood, so that it's
334 /// guaranteed to generate a random number from different states.
335 ///
336 /// # Examples
337 ///
338 /// ```
339 /// use my_ecs::ds::Xorshift;
340 /// use std::{num::NonZeroU32, collections::HashSet};
341 ///
342 /// let gen = Xorshift::new(NonZeroU32::MIN);
343 /// let mut randoms = HashSet::new();
344 /// for _ in 0..100 {
345 /// let is_new = randoms.insert(gen.next());
346 /// assert!(is_new);
347 /// }
348 /// ```
349 pub fn next(&self) -> u32 {
350 let mut cur = self.state.load(Ordering::Relaxed);
351 loop {
352 let mut new = cur;
353 new ^= new << 13;
354 new ^= new >> 17;
355 new ^= new << 5;
356 if let Err(slot) =
357 self.state
358 .compare_exchange_weak(cur, new, Ordering::Relaxed, Ordering::Relaxed)
359 {
360 cur = slot;
361 } else {
362 return new;
363 }
364 }
365 }
366}