my_ecs/ds/
signal.rs

1use std::{
2    mem,
3    num::NonZeroU32,
4    sync::atomic::{AtomicU32, Ordering},
5    thread::{self, Thread},
6};
7
/// Worker state: the worker has entered [`Signal::wait`] and is parked (or is
/// about to park) until somebody signals it.
const ASLEEP: u32 = 0;

/// Worker state: the worker is running and has no pending signal.
const AWAKE: u32 = 1;

/// Worker state: a signal is pending for the worker. [`Signal::wait`] turns it
/// back into [`AWAKE`] instead of blocking.
const ALARM: u32 = 2;
11
12/// A data structure that can be used to wake other workers.
13///
14/// To use this struct, you must register worker handles through
15/// [`Signal::set_handles`], then you can make use of sleep/wake methods like
16/// so,
17/// - [`Signal::wait`] : Sleep indefinitely.
18/// - [`Signal::notify`] : Wake a specific worker.
19/// - [`Signal::notify_one`] : Wake a random worker.
20/// - [`Signal::notify_all`] : Wake all workers.
21#[derive(Debug)]
22pub struct Signal {
23    handles: Vec<Thread>,
24    states: Vec<AtomicU32>,
25    rand_gen: Xorshift,
26}
27
28impl Default for Signal {
29    /// Creates a new empty [`Signal`] with default random seed number.
30    ///
31    /// Note that you cannot do anything with empty `Signal` unless you call
32    /// [`Signal::set_handles`] on it.
33    ///
34    /// # Examples
35    ///
36    /// ```
37    /// use my_ecs::ds::Signal;
38    ///
39    /// let signal = Signal::default();
40    /// ```
41    fn default() -> Self {
42        const DEFAULT_SEED: u32 = 0x7A7A_A7A7;
43        // Safety: Infallible.
44        unsafe { Self::new(NonZeroU32::new_unchecked(DEFAULT_SEED)) }
45    }
46}
47
48impl Signal {
49    /// Creates a new empty [`Signal`] with the given random seed number.
50    ///
51    /// Don't forget to call [`Signal::set_handles`] before using the signal.
52    ///
53    /// # Examples
54    ///
55    /// ```
56    /// use my_ecs::ds::Signal;
57    /// use std::num::NonZeroU32;
58    ///
59    /// let signal = Signal::new(NonZeroU32::MIN);
60    /// ```
61    pub fn new(seed: NonZeroU32) -> Self {
62        Self {
63            handles: Vec::new(),
64            states: Vec::new(),
65            rand_gen: Xorshift::new(seed),
66        }
67    }
68
69    /// Sets worker handles to the signal then returns former worker handles.
70    ///
71    /// # Examples
72    ///
73    /// ```
74    /// use my_ecs::ds::Signal;
75    /// use std::thread;
76    ///
77    /// let join = thread::spawn(|| { /* ... */ });
78    /// let handle = join.thread().clone();
79    ///
80    /// let mut signal = Signal::default();
81    /// signal.set_handles(vec![handle]);
82    /// ```
83    pub fn set_handles(&mut self, handles: Vec<Thread>) -> Vec<Thread> {
84        let init_state = if self.states.is_empty() {
85            AWAKE
86        } else if self
87            .states
88            .iter()
89            .all(|state| state.load(Ordering::Relaxed) == ASLEEP)
90        {
91            ASLEEP
92        } else {
93            panic!("cannot replace 'Signal' in awake state");
94        };
95
96        let new_len = handles.len();
97
98        // Replaces 'handles'.
99        let old = mem::replace(&mut self.handles, handles);
100
101        // Replaces 'states'.
102        self.states = (0..new_len).map(|_| AtomicU32::new(init_state)).collect();
103
104        old
105    }
106
107    /// Blocks until another worker signals.
108    ///
109    /// * this_index - Index to the handle of *this worker* in the vector that
110    ///   you inserted through [`Signal::set_handles`].
111    ///
112    /// Current worker cannot be woken up unless another worker wakes the
113    /// current worker through [`Signal`] because this method blocks repeatedly
114    /// to ignore spurious wakeup. See [`park`](thread::park) for more details.
115    ///
116    /// Also, note that signaling looks like being buffered in a single slot.
117    /// For example, if another worker woke current worker up and current worker
118    /// didn't sleep at that time, next call to [`Signal::wait`] will consume
119    /// the signal in the single slot buffer then be ignored.
120    ///
121    /// # Note
122    ///
123    /// `index` is an index for the current worker handle in the vector you
124    /// inserted at [`Signal::set_handles`]. By receiving the index from
125    /// caller, this method can avoid unnecessary matching opeartion. But note
126    /// that incorrect index causes panic or undefinitely long sleep.
127    ///
128    /// # Examples
129    ///
130    /// ```ignore
131    /// use my_ecs::ds::Signal;
132    ///
133    /// let mut signal = Signal::default();
134    /// let handle = std::thread::current();
135    /// signal.set_handles(vec![handle, /* other handles */]);
136    ///
137    /// // Current worker will be blocked by this call.
138    /// signal.wait(0); // Current handle index is 0.
139    /// ```
140    pub fn wait(&self, this_index: usize) {
141        #[cfg(debug_assertions)]
142        {
143            let handle = self
144                .handles
145                .get(this_index)
146                .expect("index is out of bounds");
147            assert_eq!(
148                thread::current().id(),
149                handle.id(),
150                "incorrect index was given"
151            );
152        }
153
154        // Skips if ALARM.
155        if self.states[this_index]
156            .compare_exchange(AWAKE, ASLEEP, Ordering::Relaxed, Ordering::Relaxed)
157            .is_ok()
158        {
159            loop {
160                thread::park(); // Acquire op in terms of ordering.
161                if self.states[this_index].load(Ordering::Relaxed) != ASLEEP {
162                    break;
163                }
164            }
165        }
166
167        // ALARM -> AWAKE.
168        self.states[this_index].store(AWAKE, Ordering::Relaxed);
169    }
170
171    /// Wakes up a worker for the given target index or another worker.
172    ///
173    /// * target_index - Index to a handle of the vector that you inserted
174    ///   through [`Signal::set_handles`].
175    ///
176    /// If the target worker is not blocked at the time, tries to wake another
177    /// worker instead. If failed to wake any worker, then mark *ALARM* on the
178    /// target worker, so that the worker will not be blocked by
179    /// [`Signal::wait`] next time.
180    ///
181    /// # Examples
182    ///
183    /// ```ignore
184    /// use my_ecs::ds::Signal;
185    /// use std::thread;
186    ///
187    /// let mut signal = Signal::default();
188    ///
189    /// // Worker A.
190    /// let join_a = thread::spawn(|| { /* ... */});
191    /// let handle_a = join_a.thread().clone();
192    ///
193    /// // Worker B.
194    /// let join_b = thread::spawn(|| { /* ... */});
195    /// let handle_b = join_b.thread().clone();
196    ///
197    /// signal.set_handles(vec![handle_a, handle_b]);
198    ///
199    /// // Wakes up worker A.
200    /// signal.notify(0);
201    ///
202    /// // Wakes up worker B.
203    /// signal.notify(1);
204    ///
205    /// # join_a.join().unwrap();
206    /// # join_b.join().unwrap();
207    /// ```
208    pub fn notify(&self, target_index: usize) {
209        // Tries to find and wake one ASLEEP worker.
210        const RETRY: usize = 3;
211        let len = self.states.len();
212        for i in (0..len).cycle().skip(target_index).take(len * RETRY) {
213            if self.states[i]
214                .compare_exchange(ASLEEP, ALARM, Ordering::Relaxed, Ordering::Relaxed)
215                .is_ok()
216            {
217                self.handles[i].unpark(); // Release op in terms of ordering.
218                return;
219            }
220        }
221
222        // We've failed to find ASLEEP worker.
223        // Then, make sure one worker to be AWAKE later.
224        self.states[target_index].store(ALARM, Ordering::Relaxed);
225        self.handles[target_index].unpark(); // Release op in terms of ordering.
226    }
227
228    /// Wakes up a random worker.
229    ///
230    /// If failed to wake any worker, then mark *ALARM* on one worker, so that
231    /// the worker will not be blocked by [`Signal::wait`] next time.
232    ///
233    /// # Examples
234    ///
235    /// ```ignore
236    /// use my_ecs::ds::Signal;
237    /// use std::thread;
238    ///
239    /// let mut signal = Signal::default();
240    ///
241    /// // Worker A.
242    /// let join_a = thread::spawn(|| { /* ... */});
243    /// let handle_a = join_a.thread().clone();
244    ///
245    /// // Worker B.
246    /// let join_b = thread::spawn(|| { /* ... */});
247    /// let handle_b = join_b.thread().clone();
248    ///
249    /// signal.set_handles(vec![handle_a, handle_b]);
250    ///
251    /// // Wakes up one random worker.
252    /// signal.notify_one();
253    ///
254    /// # join_a.join().unwrap();
255    /// # join_b.join().unwrap();
256    /// ```
257    pub fn notify_one(&self) {
258        let index = self.rand_gen.next() % self.states.len() as u32;
259        self.notify(index as usize);
260    }
261
262    /// Wakes up all workers.
263    ///
264    /// Each worker is marked *ALARM* when it's not blocked at the time, so that
265    /// the worker will not be blocked by [`Signal::wait`] next time.
266    ///
267    /// # Examples
268    ///
269    /// ```ignore
270    /// use my_ecs::ds::Signal;
271    /// use std::thread;
272    ///
273    /// let mut signal = Signal::default();
274    ///
275    /// // Worker A.
276    /// let join_a = thread::spawn(|| { /* ... */});
277    /// let handle_a = join_a.thread().clone();
278    ///
279    /// // Worker B.
280    /// let join_b = thread::spawn(|| { /* ... */});
281    /// let handle_b = join_b.thread().clone();
282    ///
283    /// signal.set_handles(vec![handle_a, handle_b]);
284    ///
285    /// // Wakes up all workers, Worker A & B.
286    /// signal.notify_all();
287    ///
288    /// # join_a.join().unwrap();
289    /// # join_b.join().unwrap();
290    /// ```
291    pub fn notify_all(&self) {
292        for (state, handle) in self.states.iter().zip(self.handles.iter()) {
293            state.store(ALARM, Ordering::Relaxed);
294            handle.unpark(); // Release op in terms of ordering.
295        }
296    }
297}
298
/// A pseudo-random number generator built on a 32-bit xorshift state.
///
/// The state lives in an atomic, so one generator can be shared between
/// workers through a plain `&Xorshift`. Every access uses relaxed ordering,
/// which means the generator does not synchronize any other memory.
///
/// # Reference
///
/// <https://en.wikipedia.org/wiki/Xorshift>
#[derive(Debug)]
#[repr(transparent)]
pub struct Xorshift {
    state: AtomicU32,
}

impl Xorshift {
    /// Creates a new [`Xorshift`] whose initial state is `seed`.
    ///
    /// # Examples
    ///
    /// ```
    /// use my_ecs::ds::Xorshift;
    /// use std::num::NonZeroU32;
    ///
    /// let rng = Xorshift::new(NonZeroU32::MIN);
    /// ```
    pub const fn new(seed: NonZeroU32) -> Self {
        let state = AtomicU32::new(seed.get());
        Self { state }
    }

    /// Advances the generator and returns the new state as the next random
    /// number.
    ///
    /// This may be called from several workers at once. The state is replaced
    /// atomically, so each call advances from a state that no other call has
    /// consumed.
    ///
    /// # Examples
    ///
    /// ```
    /// use my_ecs::ds::Xorshift;
    /// use std::{num::NonZeroU32, collections::HashSet};
    ///
    /// let rng = Xorshift::new(NonZeroU32::MIN);
    /// let mut randoms = HashSet::new();
    /// for _ in 0..100 {
    ///     let is_new = randoms.insert(rng.next());
    ///     assert!(is_new);
    /// }
    /// ```
    pub fn next(&self) -> u32 {
        // `fetch_update` retries until the state is swapped atomically and
        // yields the state it replaced; the closure never refuses an update.
        let replaced = self
            .state
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |state| {
                Some(Self::step(state))
            });

        match replaced {
            Ok(prev) | Err(prev) => Self::step(prev),
        }
    }

    /// Applies one xorshift round (shifts 13, 17, 5) to `state`.
    const fn step(state: u32) -> u32 {
        let a = state ^ (state << 13);
        let b = a ^ (a >> 17);
        b ^ (b << 5)
    }
}
366}