1#![cfg_attr(docsrs, feature(doc_cfg))]
42
43mod utils; use async_trait::async_trait;
46use std::{future::Future, sync::Arc, time::Duration};
47use thiserror::Error;
48
49#[derive(Debug, Error, PartialEq, Eq)]
53pub enum LockError {
54 #[error("acquiring lock timed‑out after {0:?}")]
56 Timeout(Duration),
57
58 #[cfg(all(feature = "browser", target_arch = "wasm32"))]
60 #[error("Navigator LockManager rejected: {0}")]
61 Js(String),
62}
63
64#[cfg(not(target_arch = "wasm32"))]
69#[async_trait]
70pub trait GlobalLock: Send + Sync + 'static {
71 async fn with_lock<R, F, Fut>(
72 &self,
73 name: &str,
74 timeout: Duration,
75 op: F,
76 ) -> Result<R, LockError>
77 where
78 F: FnOnce() -> Fut + Send + 'static,
79 Fut: Future<Output=R> + Send + 'static,
80 R: Send + 'static;
81}
82
/// A named lock that runs an asynchronous operation while the lock is held
/// (`wasm32` flavour: the generated future and the operation need not be
/// `Send`).
#[cfg(target_arch = "wasm32")]
#[async_trait(?Send)]
pub trait GlobalLock: Send + Sync + 'static {
    /// Runs `op` under the lock identified by `name`.
    ///
    /// * `name` – identifier of the lock to take.
    /// * `timeout` – budget handed to the implementation for acquiring the
    ///   lock.
    /// * `op` – closure producing the future to run; its output is returned
    ///   in `Ok`.
    ///
    /// # Errors
    ///
    /// Implementations report failures through [`LockError`].
    async fn with_lock<R, F, Fut>(
        &self,
        name: &str,
        timeout: Duration,
        op: F,
    ) -> Result<R, LockError>
    where
        F: FnOnce() -> Fut + 'static,
        Fut: Future<Output = R> + 'static,
        R: 'static;
}
97
/// Lets an `Arc<T>` be used wherever a [`GlobalLock`] is expected by
/// forwarding to the wrapped lock (`wasm32` flavour).
#[cfg(target_arch = "wasm32")]
#[async_trait(?Send)]
impl<T: GlobalLock> GlobalLock for Arc<T> {
    /// Delegates to `T::with_lock` on the value behind the `Arc`.
    async fn with_lock<R, F, Fut>(
        &self,
        name: &str,
        timeout: Duration,
        op: F,
    ) -> Result<R, LockError>
    where
        F: FnOnce() -> Fut + 'static,
        Fut: Future<Output = R> + 'static,
        R: 'static,
    {
        // Name the inner implementation explicitly so the call cannot
        // resolve back to this `Arc<T>` impl.
        T::with_lock(&**self, name, timeout, op).await
    }
}
117
118#[cfg(not(target_arch = "wasm32"))]
119#[async_trait]
120impl<T: GlobalLock> GlobalLock for Arc<T> {
121 async fn with_lock<R, F, Fut>(
122 &self,
123 n: &str,
124 t: Duration,
125 f: F,
126 ) -> Result<R, LockError>
127 where
128 F: FnOnce() -> Fut + Send + 'static,
129 Fut: Future<Output=R> + Send + 'static,
130 R: Send + 'static,
131 {
132 (**self).with_lock(n, t, f).await
133 }
134}
135
/// A [`GlobalLock`] implementation that performs no locking: its
/// `with_lock` simply runs the supplied operation and wraps the output in
/// `Ok`.
#[derive(Debug, Default, Clone)]
pub struct NoopLock;
142
/// `wasm32` flavour of the no-op lock: nothing is acquired or released.
#[cfg(target_arch = "wasm32")]
#[async_trait(?Send)]
impl GlobalLock for NoopLock {
    /// Ignores the lock name and timeout, awaits `op()` and returns its
    /// output in `Ok`. Never returns an error.
    async fn with_lock<R, F, Fut>(
        &self,
        _name: &str,
        _timeout: Duration,
        op: F,
    ) -> Result<R, LockError>
    where
        F: FnOnce() -> Fut + 'static,
        Fut: Future<Output = R> + 'static,
        R: 'static,
    {
        let output = op().await;
        Ok(output)
    }
}
155
156#[cfg(not(target_arch = "wasm32"))]
157#[async_trait]
158impl GlobalLock for NoopLock {
159 async fn with_lock<R, F, Fut>(&self, _: &str, _t: Duration, op: F) -> Result<R, LockError>
160 where
161 F: FnOnce() -> Fut + Send + 'static,
162 Fut: Future<Output=R> + Send + 'static,
163 R: Send + 'static,
164 {
165 Ok(op().await)
166 }
167}
168
/// Process-wide FIFO lock for native targets.
///
/// Every lock name owns a queue of waiters. The entry at the front of the
/// queue is the current holder; when it leaves, the new front is woken.
#[cfg(all(feature = "native", not(target_arch = "wasm32")))]
mod native_fifo {
    use super::*;
    use futures::FutureExt;
    use once_cell::sync::Lazy;
    use std::collections::{HashMap, VecDeque};
    use std::panic::{self, AssertUnwindSafe};
    use std::sync::{Mutex, MutexGuard, PoisonError};
    use tokio::sync::Notify;
    use tokio::time;

    type Queue = VecDeque<Arc<Notify>>;

    /// Waiter queues keyed by lock name.
    ///
    /// A synchronous mutex is used on purpose: the guard is never held
    /// across an `.await`, and it lets [`Ticket`]'s `Drop` clean up without
    /// needing an async context.
    static QUEUES: Lazy<Mutex<HashMap<String, Queue>>> = Lazy::new(|| Mutex::new(HashMap::new()));

    /// Locks the registry, recovering the data if a previous holder of the
    /// mutex panicked (the map is only mutated by short, panic-free
    /// sections, so the contents stay consistent).
    fn queues() -> MutexGuard<'static, HashMap<String, Queue>> {
        QUEUES.lock().unwrap_or_else(PoisonError::into_inner)
    }

    /// A caller's slot in the queue for one lock name.
    ///
    /// Dropping the ticket removes the slot again. This single code path
    /// covers normal release, timeout, panic unwinding and cancellation
    /// (the `with_lock` future being dropped half-way).
    struct Ticket {
        name: String,
        me: Arc<Notify>,
    }

    impl Ticket {
        /// Appends a new slot to the queue for `name`.
        ///
        /// Returns the ticket and whether it is alone in the queue, i.e.
        /// whether the caller owns the lock immediately.
        fn enqueue(name: &str) -> (Self, bool) {
            let name = name.to_owned();
            let me = Arc::new(Notify::new());
            let is_owner = {
                let mut map = queues();
                let queue = map.entry(name.clone()).or_default();
                queue.push_back(Arc::clone(&me));
                queue.len() == 1
            };
            (Self { name, me }, is_owner)
        }
    }

    impl Drop for Ticket {
        fn drop(&mut self) {
            let mut map = queues();
            let now_empty = match map.get_mut(&self.name) {
                Some(queue) => {
                    // Being at the front means we hold the lock, or were
                    // just handed it while timing out / being cancelled.
                    let held = matches!(queue.front(), Some(n) if Arc::ptr_eq(n, &self.me));
                    queue.retain(|n| !Arc::ptr_eq(n, &self.me));
                    if held {
                        // Pass ownership on so the hand-off chain never
                        // breaks. `notify_one` stores a permit if the next
                        // waiter has not started waiting yet.
                        if let Some(next) = queue.front() {
                            next.notify_one();
                        }
                    }
                    queue.is_empty()
                }
                None => false,
            };
            // Do not let the registry grow with every name ever used.
            if now_empty {
                map.remove(&self.name);
            }
        }
    }

    /// FIFO lock shared by the whole process and keyed by name.
    #[derive(Clone, Debug, Default)]
    pub struct FifoLock;

    #[async_trait]
    impl GlobalLock for FifoLock {
        /// Waits for its turn on the queue for `name`, then runs `op`.
        ///
        /// # Errors
        ///
        /// Returns [`LockError::Timeout`] when the caller is not first in
        /// the queue and is not woken within `timeout`.
        ///
        /// # Panics
        ///
        /// A panic raised by `op` (either by the closure or by the future
        /// it returns) is re-raised after the lock has been released.
        async fn with_lock<R, F, Fut>(
            &self,
            name: &str,
            timeout: Duration,
            op: F,
        ) -> Result<R, LockError>
        where
            F: FnOnce() -> Fut + Send + 'static,
            Fut: Future<Output = R> + Send + 'static,
            R: Send + 'static,
        {
            let (ticket, is_owner) = Ticket::enqueue(name);

            if !is_owner && time::timeout(timeout, ticket.me.notified()).await.is_err() {
                // Returning drops `ticket`, which dequeues us and forwards
                // the lock if it was handed over during the timeout race.
                return Err(LockError::Timeout(timeout));
            }

            // Call `op()` inside the guarded future so that a panic in the
            // closure itself is caught as well.
            let outcome = AssertUnwindSafe(async move { op().await })
                .catch_unwind()
                .await;

            // Release before a potential panic continues to unwind.
            drop(ticket);

            match outcome {
                Ok(value) => Ok(value),
                Err(payload) => panic::resume_unwind(payload),
            }
        }
    }

    /// Lock type exposed as the default on native targets.
    pub type DefaultNative = FifoLock;
}
249
// Browser backend; only compiled for `wasm32` with the `browser` feature.
#[cfg(all(feature = "browser", target_arch = "wasm32"))]
mod browser;

/// Default lock on native targets when the `native` feature is enabled.
#[cfg(all(feature = "native", not(target_arch = "wasm32")))]
pub type DefaultLock = native_fifo::DefaultNative;

/// Default lock on `wasm32` when the `browser` feature is enabled.
#[cfg(all(target_arch = "wasm32", feature = "browser"))]
pub type DefaultLock = browser::DefaultBrowser;

/// Default lock on `wasm32` without the `browser` feature: no locking.
#[cfg(all(target_arch = "wasm32", not(feature = "browser")))]
pub type DefaultLock = NoopLock;
266
#[cfg(test)]
mod tests {
    use super::*;

    /// Tests for the native backend (run on a Tokio runtime).
    #[cfg(all(feature = "native", not(target_arch = "wasm32")))]
    mod native {
        use super::*;
        // `FutureExt` provides `.catch_unwind()` used by the panic test.
        use futures::{future::join_all, FutureExt};
        use std::time::Instant;
        use tokio::{task, time::sleep};

        /// Critical sections taken under the same name must never overlap.
        #[tokio::test(flavor = "multi_thread")]
        async fn fifo_is_exclusive() {
            let lock = DefaultLock::default();
            // One shared reference point so spans from different tasks are
            // comparable with each other.
            let epoch = Instant::now();
            let spans = Arc::new(tokio::sync::Mutex::new(Vec::<(u128, u128)>::new()));

            let mut jobs = Vec::new();
            for _ in 0..5 {
                let l = lock.clone();
                let s = spans.clone();
                jobs.push(task::spawn(async move {
                    l.with_lock("cs", Duration::from_secs(1), move || async move {
                        let start = epoch.elapsed().as_micros();
                        sleep(Duration::from_millis(10)).await;
                        let end = epoch.elapsed().as_micros();
                        // Pushed while still inside the critical section,
                        // so the vector order is the execution order.
                        s.lock().await.push((start, end));
                    })
                    .await
                    .unwrap();
                }));
            }

            // Surface failures inside the workers instead of ignoring them.
            for joined in join_all(jobs).await {
                joined.expect("worker task failed");
            }

            let v = spans.lock().await.clone();
            assert_eq!(v.len(), 5, "every worker must have run its section");
            for pair in v.windows(2) {
                assert!(pair[0].1 <= pair[1].0, "critical sections overlapped");
            }
        }

        /// A zero timeout succeeds on a free lock and fails on a busy one.
        #[tokio::test]
        async fn try_lock_semantics() {
            let lock = DefaultLock::default();

            lock.with_lock("demo", Duration::ZERO, || async {})
                .await
                .unwrap();

            let l2 = lock.clone();
            let holder = task::spawn(async move {
                l2.with_lock("demo", Duration::from_secs(1), || async {
                    sleep(Duration::from_millis(50)).await;
                })
                .await
                .unwrap();
            });

            // Give the holder time to take the lock.
            sleep(Duration::from_millis(5)).await;

            let err = lock
                .with_lock("demo", Duration::ZERO, || async {})
                .await
                .unwrap_err();
            assert_eq!(err, LockError::Timeout(Duration::ZERO));

            // Let the holder finish so it is not aborted mid-section when
            // the runtime shuts down.
            holder.await.expect("holder task failed");
        }

        /// A `Timeout` error produced by an implementation reaches the caller.
        #[tokio::test]
        async fn timeout_propagates() {
            #[derive(Clone, Default)]
            struct Stubborn;

            #[async_trait]
            impl GlobalLock for Stubborn {
                async fn with_lock<R, F, Fut>(
                    &self,
                    _: &str,
                    t: Duration,
                    _: F,
                ) -> Result<R, LockError>
                where
                    F: FnOnce() -> Fut + Send + 'static,
                    Fut: Future<Output = R> + Send + 'static,
                    R: Send + 'static,
                {
                    sleep(t + Duration::from_millis(5)).await;
                    Err(LockError::Timeout(t))
                }
            }

            let err = Stubborn::default()
                .with_lock("x", Duration::from_millis(1), || async {})
                .await
                .unwrap_err();
            assert!(matches!(err, LockError::Timeout(_)));
        }

        /// A panic inside the critical section must not leave the lock held.
        #[tokio::test(flavor = "multi_thread")]
        async fn lock_is_released_after_panic() {
            let lock = DefaultLock::default();

            let l = lock.clone();
            let worker = task::spawn(async move {
                let _ = std::panic::AssertUnwindSafe(l.with_lock(
                    "boom",
                    Duration::from_secs(1),
                    || async {
                        sleep(Duration::from_millis(5)).await;
                        panic!("intentional!");
                    },
                ))
                .catch_unwind()
                .await;
            });

            // Wait for the panicking section to be over instead of guessing
            // with a sleep; the panic is caught inside the task.
            worker.await.expect("panic should be caught inside the task");

            let res = lock
                .with_lock("boom", Duration::from_millis(20), || async {})
                .await;

            assert!(
                res.is_ok(),
                "lock remained poisoned – got {res:?} instead of Ok(())"
            );
        }
    }

    /// Tests for the browser backend (run inside a browser).
    #[cfg(all(feature = "browser", target_arch = "wasm32"))]
    mod wasm {
        use super::*;
        use gloo_timers::future::TimeoutFuture;
        use wasm_bindgen_test::*;
        wasm_bindgen_test_configure!(run_in_browser);

        /// Current `performance.now()` reading converted to microseconds.
        fn micros() -> u128 {
            let millis = web_sys::window().unwrap().performance().unwrap().now();
            // Scale before truncating so the sub-millisecond part of the
            // reading is kept.
            (millis * 1_000.0) as u128
        }

        #[wasm_bindgen_test]
        async fn navigator_free_trylock_succeeds() {
            DefaultLock::default()
                .with_lock("free", Duration::ZERO, || async {})
                .await
                .unwrap();
        }

        #[wasm_bindgen_test]
        async fn navigator_trylock_times_out_when_busy() {
            let lock = DefaultLock::default();

            {
                let l = lock.clone();
                wasm_bindgen_futures::spawn_local(async move {
                    l.with_lock("busy", Duration::from_secs(1), || async {
                        TimeoutFuture::new(50).await;
                    })
                    .await
                    .unwrap();
                });
            }

            // Give the spawned holder time to take the lock.
            TimeoutFuture::new(5).await;

            let err = lock
                .with_lock("busy", Duration::ZERO, || async {})
                .await
                .unwrap_err();
            assert_eq!(err, LockError::Timeout(Duration::ZERO));
        }

        #[wasm_bindgen_test]
        async fn navigator_serialises_same_name() {
            let lock = DefaultLock::default();
            let mut ts = vec![];

            for _ in 0..3 {
                let l = lock.clone();
                ts.push(async move {
                    let (start, end) = l
                        .with_lock("fifo", Duration::from_secs(1), || async {
                            let st = micros();
                            TimeoutFuture::new(20).await;
                            let et = micros();
                            (st, et)
                        })
                        .await
                        .unwrap();
                    (start, end)
                });
            }

            let mut spans = futures::future::join_all(ts).await;
            spans.sort_by_key(|s| s.0);
            for pair in spans.windows(2) {
                assert!(
                    pair[1].0 >= pair[0].1,
                    "critical-sections overlapped in the browser backend"
                );
            }
        }

        #[wasm_bindgen_test]
        async fn navigator_distinct_names_overlap() {
            let a = DefaultLock::default();
            let b = a.clone();

            let t1 = a.with_lock("A", Duration::from_secs(1), || async {
                TimeoutFuture::new(30).await;
                micros()
            });

            let t2 = b.with_lock("B", Duration::from_secs(1), || async {
                TimeoutFuture::new(5).await;
                micros()
            });

            // If the two names were serialised, "B" could not finish first.
            let (end_a, end_b) = futures::join!(t1, t2);
            assert!(end_b.unwrap() < end_a.unwrap());
        }
    }
}