yaar_lock/sync/
reset_event.rs1use super::WaitNode;
2use crate::ThreadEvent;
3use core::{
4 fmt,
5 marker::PhantomData,
6 sync::atomic::{fence, AtomicUsize, Ordering},
7};
8
#[cfg(feature = "os")]
pub use self::if_os::ResetEvent;

/// Items that are only compiled when the `os` feature is enabled.
#[cfg(feature = "os")]
mod if_os {
    use super::WordEvent;
    use crate::OsThreadEvent;

    /// `WordEvent` specialised with `OsThreadEvent` as its thread-event type parameter.
    #[cfg_attr(feature = "nightly", doc(cfg(feature = "os")))]
    pub type ResetEvent = WordEvent<OsThreadEvent>;
}
20
/// Value of the state word while the event is set (only the lowest bit is on).
///
/// The bit is masked off before the word is interpreted as a wait-node address.
const IS_SET: usize = 0x1;
22
/// An event that is either set or unset, stored in a single machine word.
///
/// The word is `0` when the event is unset with nobody waiting and `IS_SET`
/// when it is set; any other value is treated as the address of the head
/// `WaitNode<E>` of the threads blocked in `wait`.
pub struct WordEvent<E> {
    /// Ties the event to its thread-event type `E` without storing a value of it.
    phantom: PhantomData<E>,
    /// The set flag or the wait-queue head, as described on the type.
    state: AtomicUsize,
}
28
29unsafe impl<E: Send> Send for WordEvent<E> {}
30
31unsafe impl<E: Sync> Sync for WordEvent<E> {}
32
33impl<E> Default for WordEvent<E> {
34 fn default() -> Self {
35 Self::new(false)
36 }
37}
38
39impl<E> WordEvent<E> {
40 #[inline]
41 pub fn new(is_set: bool) -> Self {
42 Self {
43 state: AtomicUsize::new(if is_set { IS_SET } else { 0 }),
44 phantom: PhantomData,
45 }
46 }
47}
48
49impl<E: ThreadEvent> fmt::Debug for WordEvent<E> {
50 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
51 f.debug_struct("WordEvent")
52 .field("is_set", &self.is_set())
53 .finish()
54 }
55}
56
57impl<E: ThreadEvent> ThreadEvent for WordEvent<E> {
58 #[inline]
59 fn is_set(&self) -> bool {
60 self.state.load(Ordering::Acquire) == IS_SET
61 }
62
63 #[inline]
64 fn reset(&self) {
65 self.state.store(0, Ordering::Relaxed);
66 }
67
68 #[inline]
69 fn set(&self) {
70 let state = self.state.swap(IS_SET, Ordering::Release);
71 let head = (state & !IS_SET) as *const WaitNode<E>;
72 if !head.is_null() {
73 self.wake_slow(unsafe { &*head });
74 }
75 }
76
77 #[inline]
78 fn wait(&self) {
79 if !self.is_set() {
80 self.wait_slow();
81 }
82 }
83}
84
85impl<E: ThreadEvent> WordEvent<E> {
86 #[cold]
87 fn wake_slow(&self, head: &WaitNode<E>) {
88 loop {
89 let (new_tail, tail) = head.dequeue();
90 tail.notify(false);
91 if new_tail.is_null() {
92 break;
93 }
94 }
95 }
96
97 #[cold]
98 fn wait_slow(&self) {
99 let wait_node = WaitNode::<E>::default();
100 let mut state = self.state.load(Ordering::Acquire);
101
102 loop {
103 if state == IS_SET {
104 return;
105 }
106
107 let head = (state & !IS_SET) as *const WaitNode<E>;
108 if let Err(s) = self.state.compare_exchange_weak(
109 state,
110 wait_node.enqueue(head) as usize,
111 Ordering::Relaxed,
112 Ordering::Relaxed,
113 ) {
114 fence(Ordering::Acquire);
115 state = s;
116 continue;
117 }
118
119 let _ = wait_node.wait();
120 wait_node.reset();
121 state = self.state.load(Ordering::Acquire);
122 continue;
123 }
124 }
125}
126
// `ResetEvent` only exists with the `os` feature, so the test is gated on it
// as well; otherwise test builds without that feature fail to compile.
#[cfg(all(test, feature = "os"))]
#[test]
fn test_reset_event() {
    use std::{cell::Cell, sync::Arc, thread};

    // Single-threaded state transitions.
    let event = ResetEvent::default();
    assert!(!event.is_set());

    event.set();
    assert!(event.is_set());

    event.reset();
    assert!(!event.is_set());

    struct Context {
        value: Cell<u128>,
        input: ResetEvent,
        output: ResetEvent,
    }

    // SAFETY: `value` is never touched by both threads at once in this test;
    // every access is separated from the other thread's by a `set()`/`wait()`
    // handoff on `input` or `output`.
    unsafe impl Sync for Context {}

    let context = Arc::new(Context {
        value: Cell::new(0),
        input: ResetEvent::default(),
        output: ResetEvent::default(),
    });

    let receiver = {
        let context = context.clone();
        thread::spawn(move || {
            // Wait for the sender's first value.
            context.input.wait();
            assert_eq!(context.value.get(), 1);

            // Re-arm `input` before answering so the second handoff can block.
            context.input.reset();
            context.value.set(2);
            context.output.set();

            // Wait for the sender's final value.
            context.input.wait();
            assert_eq!(context.value.get(), 3);
        })
    };

    let sender = move || {
        assert_eq!(context.value.get(), 0);
        // First handoff: publish 1 and wake the receiver.
        context.value.set(1);
        context.input.set();

        // Wait for the receiver's answer.
        context.output.wait();
        assert_eq!(context.value.get(), 2);

        // Second handoff: publish 3 and wake the receiver again.
        context.value.set(3);
        context.input.set();
    };

    sender();
    receiver.join().unwrap();
}