yaar_lock/sync/
reset_event.rs

1use super::WaitNode;
2use crate::ThreadEvent;
3use core::{
4    fmt,
5    marker::PhantomData,
6    sync::atomic::{fence, AtomicUsize, Ordering},
7};
8
// Re-export everything from `if_os` at this module's level, but only when the
// `os` feature is enabled.
#[cfg(feature = "os")]
pub use self::if_os::*;
// Groups the items that depend on `crate::OsThreadEvent` behind the `os` feature.
#[cfg(feature = "os")]
mod if_os {
    use super::*;
    use crate::OsThreadEvent;

    /// A [`WordEvent`] backed by [`OsThreadEvent`] for thread blocking.
    ///
    /// Only available when the `os` feature is enabled.
    #[cfg_attr(feature = "nightly", doc(cfg(feature = "os")))]
    pub type ResetEvent = WordEvent<OsThreadEvent>;
}
20
/// Value of the `WordEvent` state word when the event is set.
///
/// The state word otherwise holds `0` (unset, no waiters) or a pointer to the
/// head `WaitNode` of the waiter queue; this bit is masked off before the word
/// is reinterpreted as a pointer.
const IS_SET: usize = 0b1;
22
/// A word (`usize`) sized [`ThreadEvent`] implementation.
///
/// `E` is the event type used by each waiting thread's `WaitNode<E>`; no value
/// of type `E` is stored in this struct itself.
pub struct WordEvent<E> {
    /// Either `0` (unset), `IS_SET`, or the address of the head `WaitNode<E>`
    /// of the queue of waiting threads.
    state: AtomicUsize,
    /// Ties the type parameter `E` to the struct without storing an `E`.
    phantom: PhantomData<E>,
}
28
// SAFETY: NOTE(review): the only stored data is an `AtomicUsize`, but that word
// can hold a pointer to a `WaitNode<E>` owned by another thread, so the bound
// on `E` is presumably what makes moving the event across threads sound —
// confirm against `WaitNode`'s definition.
unsafe impl<E: Send> Send for WordEvent<E> {}

// SAFETY: NOTE(review): shared access lets several threads enqueue and dequeue
// `WaitNode<E>`s through `state`; presumably `E: Sync` is sufficient for that —
// confirm against `WaitNode`'s definition.
unsafe impl<E: Sync> Sync for WordEvent<E> {}
32
33impl<E> Default for WordEvent<E> {
34    fn default() -> Self {
35        Self::new(false)
36    }
37}
38
39impl<E> WordEvent<E> {
40    #[inline]
41    pub fn new(is_set: bool) -> Self {
42        Self {
43            state: AtomicUsize::new(if is_set { IS_SET } else { 0 }),
44            phantom: PhantomData,
45        }
46    }
47}
48
49impl<E: ThreadEvent> fmt::Debug for WordEvent<E> {
50    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
51        f.debug_struct("WordEvent")
52            .field("is_set", &self.is_set())
53            .finish()
54    }
55}
56
57impl<E: ThreadEvent> ThreadEvent for WordEvent<E> {
58    #[inline]
59    fn is_set(&self) -> bool {
60        self.state.load(Ordering::Acquire) == IS_SET
61    }
62
63    #[inline]
64    fn reset(&self) {
65        self.state.store(0, Ordering::Relaxed);
66    }
67
68    #[inline]
69    fn set(&self) {
70        let state = self.state.swap(IS_SET, Ordering::Release);
71        let head = (state & !IS_SET) as *const WaitNode<E>;
72        if !head.is_null() {
73            self.wake_slow(unsafe { &*head });
74        }
75    }
76
77    #[inline]
78    fn wait(&self) {
79        if !self.is_set() {
80            self.wait_slow();
81        }
82    }
83}
84
85impl<E: ThreadEvent> WordEvent<E> {
86    #[cold]
87    fn wake_slow(&self, head: &WaitNode<E>) {
88        loop {
89            let (new_tail, tail) = head.dequeue();
90            tail.notify(false);
91            if new_tail.is_null() {
92                break;
93            }
94        }
95    }
96
97    #[cold]
98    fn wait_slow(&self) {
99        let wait_node = WaitNode::<E>::default();
100        let mut state = self.state.load(Ordering::Acquire);
101
102        loop {
103            if state == IS_SET {
104                return;
105            }
106
107            let head = (state & !IS_SET) as *const WaitNode<E>;
108            if let Err(s) = self.state.compare_exchange_weak(
109                state,
110                wait_node.enqueue(head) as usize,
111                Ordering::Relaxed,
112                Ordering::Relaxed,
113            ) {
114                fence(Ordering::Acquire);
115                state = s;
116                continue;
117            }
118
119            let _ = wait_node.wait();
120            wait_node.reset();
121            state = self.state.load(Ordering::Acquire);
122            continue;
123        }
124    }
125}
126
// `ResetEvent` only exists when the `os` feature is enabled, so the test must
// be gated on it as well or the crate fails to compile its tests without it.
#[cfg(all(test, feature = "os"))]
#[test]
fn test_reset_event() {
    use std::{cell::Cell, sync::Arc, thread};

    // Single-threaded sanity check of the set/reset state transitions.
    let event = ResetEvent::default();
    assert!(!event.is_set());

    event.set();
    assert!(event.is_set());

    event.reset();
    assert!(!event.is_set());

    // Shared state for a two-thread ping-pong: `value` is handed back and
    // forth, with `input`/`output` signalling whose turn it is.
    struct Context {
        value: Cell<u128>,
        input: ResetEvent,
        output: ResetEvent,
    }

    // SAFETY: `value` is only accessed by the thread whose turn it is; the
    // hand-off between threads is ordered by the `input`/`output` events.
    unsafe impl Sync for Context {}

    let context = Arc::new(Context {
        value: Cell::new(0),
        input: ResetEvent::default(),
        output: ResetEvent::default(),
    });

    let receiver = {
        let context = context.clone();
        thread::spawn(move || {
            // wait for sender to update value and signal input
            context.input.wait();
            assert_eq!(context.value.get(), 1);

            // update value and signal output
            context.input.reset();
            context.value.set(2);
            context.output.set();

            // wait for sender to update value and signal final input
            context.input.wait();
            assert_eq!(context.value.get(), 3);
        })
    };

    let sender = move || {
        // update value and signal input
        assert_eq!(context.value.get(), 0);
        context.value.set(1);
        context.input.set();

        // wait for receiver to update value and signal output
        context.output.wait();
        assert_eq!(context.value.get(), 2);

        // update value and signal final input
        context.value.set(3);
        context.input.set();
    };

    sender();
    receiver.join().unwrap();
}