1use crate::GpuDevice;
7use parking_lot::{Condvar, Mutex};
8use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
9use std::sync::Arc;
10use std::time::{Duration, Instant};
11
12pub struct Fence {
16 device: Arc<wgpu::Device>,
17 signaled: Arc<AtomicBool>,
18 timestamp: Arc<AtomicU64>,
19}
20
21impl Fence {
22 #[must_use]
24 pub fn new(device: &GpuDevice) -> Self {
25 Self {
26 device: Arc::clone(device.device()),
27 signaled: Arc::new(AtomicBool::new(false)),
28 timestamp: Arc::new(AtomicU64::new(0)),
29 }
30 }
31
32 pub fn signal(&self) {
36 let _ = self.device.poll(wgpu::PollType::wait_indefinitely());
37 self.signaled.store(true, Ordering::Release);
38 self.timestamp.store(
39 Instant::now().elapsed().as_nanos() as u64,
40 Ordering::Relaxed,
41 );
42 }
43
44 pub fn wait(&self) {
49 while !self.signaled.load(Ordering::Acquire) {
50 std::thread::yield_now();
51 }
52 }
53
54 #[must_use]
64 pub fn wait_timeout(&self, timeout: Duration) -> bool {
65 let start = Instant::now();
66 while !self.signaled.load(Ordering::Acquire) {
67 if start.elapsed() > timeout {
68 return false;
69 }
70 std::thread::yield_now();
71 }
72 true
73 }
74
75 #[must_use]
77 pub fn is_signaled(&self) -> bool {
78 self.signaled.load(Ordering::Acquire)
79 }
80
81 pub fn reset(&self) {
83 self.signaled.store(false, Ordering::Release);
84 }
85
86 #[must_use]
88 pub fn timestamp(&self) -> Option<u64> {
89 if self.is_signaled() {
90 Some(self.timestamp.load(Ordering::Relaxed))
91 } else {
92 None
93 }
94 }
95}
96
97impl Clone for Fence {
98 fn clone(&self) -> Self {
99 Self {
100 device: Arc::clone(&self.device),
101 signaled: Arc::clone(&self.signaled),
102 timestamp: Arc::clone(&self.timestamp),
103 }
104 }
105}
106
107pub struct Semaphore {
111 value: Arc<AtomicU64>,
112 condvar: Arc<Condvar>,
113 mutex: Arc<Mutex<()>>,
114}
115
116impl Semaphore {
117 #[must_use]
119 pub fn new(initial_value: u64) -> Self {
120 Self {
121 value: Arc::new(AtomicU64::new(initial_value)),
122 condvar: Arc::new(Condvar::new()),
123 mutex: Arc::new(Mutex::new(())),
124 }
125 }
126
127 pub fn signal(&self) {
129 self.value.fetch_add(1, Ordering::Release);
130 self.condvar.notify_all();
131 }
132
133 pub fn wait(&self) {
135 loop {
136 let current = self.value.load(Ordering::Acquire);
137 if current > 0 {
138 if self
139 .value
140 .compare_exchange_weak(
141 current,
142 current - 1,
143 Ordering::AcqRel,
144 Ordering::Acquire,
145 )
146 .is_ok()
147 {
148 return;
149 }
150 } else {
151 let mut guard = self.mutex.lock();
152 self.condvar.wait(&mut guard);
153 }
154 }
155 }
156
157 #[must_use]
163 pub fn try_wait(&self) -> bool {
164 loop {
165 let current = self.value.load(Ordering::Acquire);
166 if current > 0 {
167 match self.value.compare_exchange_weak(
168 current,
169 current - 1,
170 Ordering::AcqRel,
171 Ordering::Acquire,
172 ) {
173 Ok(_) => return true,
174 Err(_) => continue,
175 }
176 }
177 return false;
178 }
179 }
180
181 #[must_use]
183 pub fn value(&self) -> u64 {
184 self.value.load(Ordering::Acquire)
185 }
186
187 pub fn reset(&self, value: u64) {
189 self.value.store(value, Ordering::Release);
190 self.condvar.notify_all();
191 }
192}
193
194impl Clone for Semaphore {
195 fn clone(&self) -> Self {
196 Self {
197 value: Arc::clone(&self.value),
198 condvar: Arc::clone(&self.condvar),
199 mutex: Arc::clone(&self.mutex),
200 }
201 }
202}
203
204pub struct Event {
206 signaled: Arc<AtomicBool>,
207 condvar: Arc<Condvar>,
208 mutex: Arc<Mutex<()>>,
209}
210
211impl Event {
212 #[must_use]
214 pub fn new() -> Self {
215 Self {
216 signaled: Arc::new(AtomicBool::new(false)),
217 condvar: Arc::new(Condvar::new()),
218 mutex: Arc::new(Mutex::new(())),
219 }
220 }
221
222 pub fn signal(&self) {
224 self.signaled.store(true, Ordering::Release);
225 self.condvar.notify_all();
226 }
227
228 pub fn wait(&self) {
230 while !self.signaled.load(Ordering::Acquire) {
231 let mut guard = self.mutex.lock();
232 self.condvar.wait(&mut guard);
233 }
234 }
235
236 #[must_use]
246 pub fn wait_timeout(&self, timeout: Duration) -> bool {
247 let start = Instant::now();
248 while !self.signaled.load(Ordering::Acquire) {
249 if start.elapsed() > timeout {
250 return false;
251 }
252 let mut guard = self.mutex.lock();
253 let remaining = timeout.saturating_sub(start.elapsed());
254 if remaining.is_zero() {
255 return false;
256 }
257 self.condvar.wait_for(&mut guard, remaining);
258 }
259 true
260 }
261
262 #[must_use]
264 pub fn is_signaled(&self) -> bool {
265 self.signaled.load(Ordering::Acquire)
266 }
267
268 pub fn reset(&self) {
270 self.signaled.store(false, Ordering::Release);
271 }
272}
273
274impl Default for Event {
275 fn default() -> Self {
276 Self::new()
277 }
278}
279
280impl Clone for Event {
281 fn clone(&self) -> Self {
282 Self {
283 signaled: Arc::clone(&self.signaled),
284 condvar: Arc::clone(&self.condvar),
285 mutex: Arc::clone(&self.mutex),
286 }
287 }
288}
289
290pub struct Barrier {
292 total_count: usize,
293 current_count: Arc<AtomicU64>,
294 generation: Arc<AtomicU64>,
295 condvar: Arc<Condvar>,
296 mutex: Arc<Mutex<()>>,
297}
298
299impl Barrier {
300 #[must_use]
306 pub fn new(count: usize) -> Self {
307 Self {
308 total_count: count,
309 current_count: Arc::new(AtomicU64::new(0)),
310 generation: Arc::new(AtomicU64::new(0)),
311 condvar: Arc::new(Condvar::new()),
312 mutex: Arc::new(Mutex::new(())),
313 }
314 }
315
316 pub fn wait(&self) {
320 let gen = self.generation.load(Ordering::Acquire);
321 let count = self.current_count.fetch_add(1, Ordering::AcqRel) + 1;
322
323 if count >= self.total_count as u64 {
324 self.current_count.store(0, Ordering::Release);
326 self.generation.fetch_add(1, Ordering::Release);
327 self.condvar.notify_all();
328 } else {
329 let mut guard = self.mutex.lock();
331 while gen == self.generation.load(Ordering::Acquire) {
332 self.condvar.wait(&mut guard);
333 }
334 }
335 }
336
337 #[must_use]
339 pub fn count(&self) -> usize {
340 self.total_count
341 }
342
343 #[must_use]
345 pub fn waiting(&self) -> u64 {
346 self.current_count.load(Ordering::Acquire)
347 }
348}
349
350impl Clone for Barrier {
351 fn clone(&self) -> Self {
352 Self {
353 total_count: self.total_count,
354 current_count: Arc::clone(&self.current_count),
355 generation: Arc::clone(&self.generation),
356 condvar: Arc::clone(&self.condvar),
357 mutex: Arc::clone(&self.mutex),
358 }
359 }
360}
361
362#[cfg(test)]
363mod tests {
364 use super::*;
365
366 #[test]
367 fn test_semaphore() {
368 let sem = Semaphore::new(1);
369 assert_eq!(sem.value(), 1);
370
371 assert!(sem.try_wait());
372 assert_eq!(sem.value(), 0);
373
374 assert!(!sem.try_wait());
375 assert_eq!(sem.value(), 0);
376
377 sem.signal();
378 assert_eq!(sem.value(), 1);
379
380 assert!(sem.try_wait());
381 assert_eq!(sem.value(), 0);
382 }
383
384 #[test]
385 fn test_event() {
386 let event = Event::new();
387 assert!(!event.is_signaled());
388
389 event.signal();
390 assert!(event.is_signaled());
391
392 event.reset();
393 assert!(!event.is_signaled());
394 }
395
396 #[test]
397 fn test_barrier() {
398 let barrier = Barrier::new(3);
399 assert_eq!(barrier.count(), 3);
400 assert_eq!(barrier.waiting(), 0);
401 }
402}