1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
// SPDX-License-Identifier: MIT OR Apache-2.0
use super::Instant;
use super::thread;
use super::{ASYNC_WAITER_ID_COUNTER, AsyncWaiter, Condvar, WaitTimeoutResult};
use crate::Guard;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::Ordering;
use std::task::{Context, Poll};
impl Condvar {
/// Asynchronously waits for a notification from this condition variable.
///
/// This method will atomically unlock the mutex specified by the guard and
/// await a notification. When a notification is received, the mutex will be
/// re-acquired before the future resolves.
///
/// This method is non-blocking and works everywhere, including WASM main thread.
///
/// # Examples
///
/// ```
/// # // std::thread::spawn panics on wasm32
/// # if cfg!(target_arch = "wasm32") { return; }
/// # test_executors::spin_on(async {
/// use wasm_safe_mutex::{Mutex, condvar::Condvar};
/// use std::sync::Arc;
/// # use std::thread;
///
/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
/// let pair_clone = Arc::clone(&pair);
///
/// // Spawn a thread that will notify us
/// thread::spawn(move || {
/// # #[cfg(not(target_arch = "wasm32"))]
/// std::thread::sleep(std::time::Duration::from_millis(10));
/// test_executors::spin_on(async {
/// let (mutex, condvar) = &*pair_clone;
/// let mut ready = mutex.lock_async().await;
/// *ready = true;
/// drop(ready);
/// condvar.notify_one();
/// });
/// });
///
/// let (mutex, condvar) = &*pair;
/// let mut ready = mutex.lock_async().await;
/// while !*ready {
/// ready = condvar.wait_async(ready).await;
/// }
/// assert!(*ready);
/// # });
/// ```
pub async fn wait_async<'a, T>(&self, guard: Guard<'a, T>) -> Guard<'a, T> {
let mutex = guard.mutex;
// Create a channel to receive the notification
let receiver = self.waiting_async_threads.with_mut(|waiters| {
let (sender, receiver) = r#continue::continuation();
let id = ASYNC_WAITER_ID_COUNTER.fetch_add(1, Ordering::Relaxed);
waiters.push(AsyncWaiter { id, sender });
receiver
});
// Release the mutex
drop(guard);
// Wait for notification
receiver.await;
// Re-acquire the mutex
mutex.lock_async().await
}
/// Asynchronously waits while the predicate remains `true`.
///
/// This method will atomically unlock the mutex specified by the guard and
/// await a notification as long as the `condition` closure returns `true`.
///
/// # Platform Behavior
///
/// - **All Platforms**: Uses async/await to yield execution. This is non-blocking
/// and works in all environments, including WASM main thread.
///
/// # Predicate
///
/// The `condition` closure is called:
/// 1. Before waiting (if it returns `false`, the method returns immediately)
/// 2. After each notification (to check if we should keep waiting)
/// 3. After spurious wakeups (to ensure we don't return prematurely)
///
/// # Spurious Wakeups
///
/// This method automatically handles spurious wakeups by re-checking the condition.
/// You do not need to loop around this call.
///
/// # Examples
///
/// ```
/// # // std::thread::spawn panics on wasm32
/// # if cfg!(target_arch = "wasm32") { return; }
/// # test_executors::spin_on(async {
/// use wasm_safe_mutex::{Mutex, condvar::Condvar};
/// use std::sync::Arc;
/// # use std::thread;
///
/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
/// let pair_clone = Arc::clone(&pair);
///
/// thread::spawn(move || {
/// # #[cfg(not(target_arch = "wasm32"))]
/// std::thread::sleep(std::time::Duration::from_millis(10));
/// test_executors::spin_on(async {
/// let (mutex, condvar) = &*pair_clone;
/// let mut ready = mutex.lock_async().await;
/// *ready = true;
/// drop(ready);
/// condvar.notify_one();
/// });
/// });
///
/// let (mutex, condvar) = &*pair;
/// let mut ready = mutex.lock_async().await;
/// // Wait until ready becomes true
/// ready = condvar.wait_async_while(ready, |r| !*r).await;
/// assert!(*ready);
/// # });
/// ```
pub async fn wait_async_while<'a, T, F>(
&self,
mut guard: Guard<'a, T>,
mut condition: F,
) -> Guard<'a, T>
where
F: FnMut(&mut T) -> bool,
{
while condition(&mut guard) {
guard = self.wait_async(guard).await;
}
guard
}
/// Asynchronously waits for a notification from this condition variable with a deadline.
///
/// This method will atomically unlock the mutex specified by the guard and
/// await a notification or timeout. When a notification is received or the timeout expires,
/// the mutex will be re-acquired before the future resolves.
///
/// This method is non-blocking and works everywhere, including WASM main thread.
///
/// # Examples
///
/// ```
/// # // std::thread::spawn panics on wasm32
/// # if cfg!(target_arch = "wasm32") { return; }
/// # test_executors::spin_on(async {
/// use wasm_safe_mutex::{Mutex, condvar::Condvar};
/// use std::sync::Arc;
/// # #[cfg(target_arch = "wasm32")]
/// use web_time::{Duration, Instant};
/// # #[cfg(not(target_arch = "wasm32"))]
/// # use std::time::{Duration, Instant};
/// # use std::thread;
///
/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
/// let pair_clone = Arc::clone(&pair);
///
/// // Spawn a thread that will notify us
/// thread::spawn(move || {
/// # #[cfg(not(target_arch = "wasm32"))]
/// std::thread::sleep(std::time::Duration::from_millis(10));
/// test_executors::spin_on(async {
/// let (mutex, condvar) = &*pair_clone;
/// let mut ready = mutex.lock_async().await;
/// *ready = true;
/// drop(ready);
/// condvar.notify_one();
/// });
/// });
///
/// let (mutex, condvar) = &*pair;
/// let mut ready = mutex.lock_async().await;
/// let deadline = Instant::now() + Duration::from_secs(1);
/// while !*ready {
/// let result;
/// (ready, result) = condvar.wait_async_timeout(ready, deadline).await;
/// if result.timed_out() {
/// break;
/// }
/// }
/// assert!(*ready);
/// # });
/// ```
pub async fn wait_async_timeout<'a, T>(
&self,
guard: Guard<'a, T>,
deadline: Instant,
) -> (Guard<'a, T>, WaitTimeoutResult) {
let mutex = guard.mutex;
// Create a unique ID for this waiter
let waiter_id = ASYNC_WAITER_ID_COUNTER.fetch_add(1, Ordering::Relaxed);
// Create two channels - one for normal notification, one for timeout
let (notify_sender, notify_receiver) = r#continue::continuation();
let (timeout_sender, timeout_receiver) = r#continue::continuation();
// Add to waiting list
self.waiting_async_threads.with_mut(|waiters| {
waiters.push(AsyncWaiter {
id: waiter_id,
sender: notify_sender,
});
});
// Spawn a thread to handle the timeout
thread::spawn(move || {
let now = Instant::now();
if deadline > now {
let duration = deadline - now;
thread::sleep(duration);
}
// Send timeout signal
timeout_sender.send(());
});
// Release the mutex
drop(guard);
// Race between notification and timeout
// We'll poll both futures and see which completes first
struct Race<F1, F2> {
notify: Option<F1>,
timeout: Option<F2>,
}
impl<F1: Future + Unpin, F2: Future + Unpin> Future for Race<F1, F2> {
type Output = bool; // true if timed out
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Poll notification future
if let Some(ref mut notify) = self.notify {
if Pin::new(notify).poll(cx).is_ready() {
self.notify = None;
return Poll::Ready(false); // Got notification
}
}
// Poll timeout future
if let Some(ref mut timeout) = self.timeout {
if Pin::new(timeout).poll(cx).is_ready() {
self.timeout = None;
return Poll::Ready(true); // Timed out
}
}
Poll::Pending
}
}
let timed_out = Race {
notify: Some(notify_receiver),
timeout: Some(timeout_receiver),
}
.await;
// If we timed out, remove ourselves from the list
if timed_out {
self.waiting_async_threads.with_mut(|waiters| {
if let Some(pos) = waiters.iter().position(|w| w.id == waiter_id) {
let waiter = waiters.remove(pos);
// Send the notification to complete the receiver
waiter.sender.send(());
}
});
}
// Re-acquire the mutex
let guard = mutex.lock_async().await;
// Return the result
(guard, WaitTimeoutResult(timed_out))
}
/// Asynchronously waits while the predicate remains `true`, bounded by the deadline.
///
/// This method will atomically unlock the mutex specified by the guard and
/// await a notification as long as the `condition` closure returns `true`.
///
/// # Platform Behavior
///
/// - **All Platforms**: Uses async/await to yield execution. This is non-blocking
/// and works in all environments, including WASM main thread.
///
/// # Predicate
///
/// The `condition` closure is called:
/// 1. Before waiting (if it returns `false`, the method returns immediately)
/// 2. After each notification (to check if we should keep waiting)
/// 3. After spurious wakeups (to ensure we don't return prematurely)
///
/// # Spurious Wakeups
///
/// This method automatically handles spurious wakeups by re-checking the condition.
/// You do not need to loop around this call.
///
/// # Examples
///
/// ```
/// # // std::thread::spawn panics on wasm32
/// # if cfg!(target_arch = "wasm32") { return; }
/// # test_executors::spin_on(async {
/// use wasm_safe_mutex::{Mutex, condvar::Condvar};
/// use std::sync::Arc;
/// # #[cfg(target_arch = "wasm32")]
/// use web_time::{Duration, Instant};
/// # #[cfg(not(target_arch = "wasm32"))]
/// # use std::time::{Duration, Instant};
/// # use std::thread;
///
/// let pair = Arc::new((Mutex::new(0), Condvar::new()));
/// let pair_clone = Arc::clone(&pair);
///
/// thread::spawn(move || {
/// # #[cfg(not(target_arch = "wasm32"))]
/// std::thread::sleep(std::time::Duration::from_millis(10));
/// test_executors::spin_on(async {
/// let (mutex, condvar) = &*pair_clone;
/// let mut value = mutex.lock_async().await;
/// *value = 10;
/// drop(value);
/// condvar.notify_one();
/// });
/// });
///
/// let (mutex, condvar) = &*pair;
/// let guard = mutex.lock_async().await;
/// let deadline = Instant::now() + Duration::from_secs(1);
/// let (guard, result) = condvar.wait_async_timeout_while(guard, deadline, |v| *v < 10).await;
/// if !result.timed_out() {
/// assert_eq!(*guard, 10);
/// }
/// # });
/// ```
pub async fn wait_async_timeout_while<'a, T, F>(
&self,
mut guard: Guard<'a, T>,
deadline: Instant,
mut condition: F,
) -> (Guard<'a, T>, WaitTimeoutResult)
where
F: FnMut(&mut T) -> bool,
{
while condition(&mut guard) {
let result;
(guard, result) = self.wait_async_timeout(guard, deadline).await;
if result.timed_out() {
return (guard, result);
}
}
(guard, WaitTimeoutResult(false))
}
}