cross_locks/
lib.rs

1//! # cross‑locks
2//!
3//! A **single‑source, zero‑dependency**¹ implementation of Supabase/GoTrue‑style
4//! *global exclusive locks* that work **identically** across every runtime.
5//!
6//! | target / runtime                     | compile‑time feature | backend used by `DefaultLock` |
7//! |--------------------------------------|----------------------|--------------------------------|
8//! | Native (Tokio, multi‑thread)         | **`native`**         | fair FIFO queue (`Notify`)     |
//! | Browser WASM (2022+, `navigator.locks`)\* | **`browser`**     | `navigator.locks.request`      |
10//! | Head‑less WASM (Node / WASI / tests) | **`wasm`**           | no‑op passthrough              |
11//!
12//! \* Safari shipped the LockManager API in 16.4 (2023‑03).
13//!
14//! ## Guarantees
15//! * **FIFO fairness** for every waiter requesting the *same* lock name on
16//!   native.
17//! * **Zero‑timeout try‑lock** (`Duration::ZERO`) mirrors the semantics of the
18//!   JS SDK.
19//! * **Task‑local re‑entrancy** helper demonstrated in the test‑suite.
20//! * `Arc<T>` automatically implements `GlobalLock`, ideal for storing in an
21//!   `Axum` `State`, `OnceCell`, etc.
22//!
23//! ---
//! ¹ Apart from a few small helper crates (`async-trait`, `thiserror`,
//!   `cfg-if`) and the runtime crates required by the selected backend
//!   (Tokio for `native`, wasm‑bindgen for `browser`).
26//!
27//! ```toml
28//! # Cargo.toml – choose exactly ONE backend
29//! [dependencies]
30//! cross-locks = { version = "*", default-features = false, features = ["native"] }
31//! # or  "browser"  |  "wasm"
32//! ```
33//!
34//! ```bash
35//! # run the exhaustive native test‑suite
36//! cargo test --features native
37//!
38//! # browser smoke‑test (headless Firefox)
39//! wasm-pack test --firefox --headless --features browser
40//! ```
41#![cfg_attr(docsrs, feature(doc_cfg))]
42
43mod utils; // misc helpers shared by the native test‑suite
44
45use async_trait::async_trait;
46use std::{future::Future, sync::Arc, time::Duration};
47use thiserror::Error;
48
49/*────────────────────────────── errors ──────────────────────────────*/
50
51/// Errors returned by [`GlobalLock::with_lock`].
52#[derive(Debug, Error, PartialEq, Eq)]
53pub enum LockError {
54    /// Timed‑out while waiting to acquire the lock.
55    #[error("acquiring lock timed‑out after {0:?}")]
56    Timeout(Duration),
57
58    /// Browser only – the JS *Navigator LockManager* rejected the request.
59    #[cfg(all(feature = "browser", target_arch = "wasm32"))]
60    #[error("Navigator LockManager rejected: {0}")]
61    Js(String),
62}
63
64/*────────────────────────── core abstraction ───────────────────────*/
65
66/// Back‑end agnostic trait.
67/// *On `wasm32`* the `Send` bounds are automatically relaxed (`?Send`).
68#[cfg(not(target_arch = "wasm32"))]
69#[async_trait]
70pub trait GlobalLock: Send + Sync + 'static {
71    async fn with_lock<R, F, Fut>(
72        &self,
73        name: &str,
74        timeout: Duration,
75        op: F,
76    ) -> Result<R, LockError>
77    where
78        F: FnOnce() -> Fut + Send + 'static,
79        Fut: Future<Output=R> + Send + 'static,
80        R: Send + 'static;
81}
82
/// `wasm32` flavour of the lock interface.
///
/// Same method shape as the native definition, but generated with
/// `#[async_trait(?Send)]` and without `Send` bounds on the closure, its
/// future or its output. The implementing type itself is still required to be
/// `Send + Sync + 'static`.
#[cfg(target_arch = "wasm32")]
#[async_trait(?Send)]
pub trait GlobalLock: Send + Sync + 'static {
    /// Implementations are expected to run `op` while holding the lock
    /// identified by `name`, waiting at most `timeout` to obtain it, and to
    /// return the value produced by `op`'s future in `Ok`.
    async fn with_lock<R, F, Fut>(
        &self,
        name: &str,
        timeout: Duration,
        op: F,
    ) -> Result<R, LockError>
    where
        R: 'static,
        Fut: Future<Output = R> + 'static,
        F: FnOnce() -> Fut + 'static;
}
97
98/*──────── blanket impl so `Arc<T>` is itself a lock ─────────*/
99
/// `wasm32` blanket impl: an `Arc<T>` is usable as a lock whenever `T` is,
/// simply by forwarding the call to the shared inner value.
#[cfg(target_arch = "wasm32")]
#[async_trait(?Send)]
impl<T: GlobalLock> GlobalLock for Arc<T> {
    async fn with_lock<R, F, Fut>(
        &self,
        name: &str,
        timeout: Duration,
        op: F,
    ) -> Result<R, LockError>
    where
        R: 'static,
        Fut: Future<Output = R> + 'static,
        F: FnOnce() -> Fut + 'static,
    {
        // `&**self` peels `&Arc<T>` down to `&T`, so this names the inner
        // implementation explicitly rather than recursing into this impl.
        T::with_lock(&**self, name, timeout, op).await
    }
}
117
118#[cfg(not(target_arch = "wasm32"))]
119#[async_trait]
120impl<T: GlobalLock> GlobalLock for Arc<T> {
121    async fn with_lock<R, F, Fut>(
122        &self,
123        n: &str,
124        t: Duration,
125        f: F,
126    ) -> Result<R, LockError>
127    where
128        F: FnOnce() -> Fut + Send + 'static,
129        Fut: Future<Output=R> + Send + 'static,
130        R: Send + 'static,
131    {
132        (**self).with_lock(n, t, f).await
133    }
134}
135
136/*────────────────────────── no‑op backend ─────────────────────────*/
137
/// Lock that does not lock: its `with_lock` just runs the closure.
///
/// It is what `DefaultLock` resolves to on `wasm32` builds without the
/// `browser` feature (Node / WASI), and is also handy in single‑thread CLI
/// tests.
#[derive(Debug, Default, Clone)]
pub struct NoopLock;
142
/// `wasm32` passthrough: the lock name and timeout are ignored and the
/// closure is awaited straight away, so this never fails.
#[cfg(target_arch = "wasm32")]
#[async_trait(?Send)]
impl GlobalLock for NoopLock {
    async fn with_lock<R, F, Fut>(
        &self,
        _: &str,
        _timeout: Duration,
        op: F,
    ) -> Result<R, LockError>
    where
        R: 'static,
        Fut: Future<Output = R> + 'static,
        F: FnOnce() -> Fut + 'static,
    {
        let value = op().await;
        Ok(value)
    }
}
155
156#[cfg(not(target_arch = "wasm32"))]
157#[async_trait]
158impl GlobalLock for NoopLock {
159    async fn with_lock<R, F, Fut>(&self, _: &str, _t: Duration, op: F) -> Result<R, LockError>
160    where
161        F: FnOnce() -> Fut + Send + 'static,
162        Fut: Future<Output=R> + Send + 'static,
163        R: Send + 'static,
164    {
165        Ok(op().await)
166    }
167}
168
169/*──────────────────────── native FIFO backend ─────────────────────*/
#[cfg(all(feature = "native", not(target_arch = "wasm32")))]
mod native_fifo {
    use super::*;
    use once_cell::sync::Lazy;
    use std::collections::{HashMap, VecDeque};
    use std::sync::{Mutex, MutexGuard, PoisonError};
    use tokio::sync::Notify;

    /// Waiters for one lock name, oldest first. The entry at the front is the
    /// current owner of the lock; everybody behind it is parked on its own
    /// `Notify`.
    type Queue = VecDeque<Arc<Notify>>;

    /// Process-wide registry mapping lock names to their waiter queues.
    ///
    /// A synchronous mutex is used on purpose: every critical section is a
    /// handful of queue operations and is never held across an `.await`, and
    /// it lets [`Ticket`]'s `Drop` impl clean up without needing to await.
    static QUEUES: Lazy<Mutex<HashMap<String, Queue>>> = Lazy::new(|| Mutex::new(HashMap::new()));

    /// Locks the registry, ignoring poisoning – the map is only mutated by
    /// short queue operations, so it is still usable after a panic elsewhere.
    fn queues() -> MutexGuard<'static, HashMap<String, Queue>> {
        QUEUES.lock().unwrap_or_else(PoisonError::into_inner)
    }

    /// One caller's slot in the queue of a lock name.
    ///
    /// Dropping the ticket removes the slot again, whichever way the caller
    /// leaves `with_lock`: normal completion, timeout, a panic unwinding
    /// through the critical section, or cancellation of the whole future.
    struct Ticket {
        name: String,
        me: Arc<Notify>,
    }

    impl Ticket {
        /// Appends a new slot to the queue for `name`.
        ///
        /// Returns the ticket and whether it is at the head of the queue,
        /// i.e. whether the caller owns the lock right away.
        fn enqueue(name: &str) -> (Self, bool) {
            let ticket = Ticket {
                name: name.to_owned(),
                me: Arc::new(Notify::new()),
            };
            let is_head = {
                let mut map = queues();
                let q = map.entry(ticket.name.clone()).or_default();
                q.push_back(Arc::clone(&ticket.me));
                q.len() == 1
            };
            (ticket, is_head)
        }
    }

    impl Drop for Ticket {
        fn drop(&mut self) {
            let successor = {
                let mut map = queues();
                let q = match map.get_mut(&self.name) {
                    Some(q) => q,
                    None => return,
                };
                let idx = match q.iter().position(|n| Arc::ptr_eq(n, &self.me)) {
                    Some(idx) => idx,
                    None => return,
                };
                q.remove(idx);
                if q.is_empty() {
                    // do not let the map grow with every name ever used
                    map.remove(&self.name);
                    None
                } else if idx == 0 {
                    // We were the owner – or had just been handed ownership
                    // while timing out / being cancelled – so the baton must
                    // be passed on, otherwise the next waiter would stall.
                    q.front().cloned()
                } else {
                    // a waiter from the middle left; ownership is unaffected
                    None
                }
            };
            // Wake outside the registry lock. `notify_one` stores a permit if
            // the successor has not started waiting yet, so no wake-up is lost.
            if let Some(next) = successor {
                next.notify_one();
            }
        }
    }

    /// Tokio **FIFO‑fair** global lock.
    ///
    /// All instances share one process-wide registry, so two `FifoLock`
    /// values asking for the same name contend for the same lock.
    #[derive(Clone, Debug, Default)]
    pub struct FifoLock;

    #[async_trait]
    impl GlobalLock for FifoLock {
        /// Runs `op` once every earlier requester of `name` has finished.
        ///
        /// * `name` – key of the lock; different names never block each other.
        /// * `timeout` – how long to wait for the lock. `Duration::ZERO`
        ///   behaves like a try-lock.
        /// * `op` – closure producing the future to run while the lock is held.
        ///
        /// The lock is released when `op` completes, when it panics (the
        /// panic then continues to unwind into the caller) and when the
        /// returned future is dropped before completion.
        ///
        /// # Errors
        /// [`LockError::Timeout`] if the lock was not handed over within
        /// `timeout`; `op` is not called in that case.
        async fn with_lock<R, F, Fut>(
            &self,
            name: &str,
            timeout: Duration,
            op: F,
        ) -> Result<R, LockError>
        where
            F: FnOnce() -> Fut + Send + 'static,
            Fut: Future<Output = R> + Send + 'static,
            R: Send + 'static,
        {
            // 1. join the queue; from here on the ticket owns the cleanup
            let (ticket, is_head) = Ticket::enqueue(name);

            // 2. unless we are already at the head, wait to be handed the lock
            if !is_head
                && tokio::time::timeout(timeout, ticket.me.notified())
                    .await
                    .is_err()
            {
                // returning drops the ticket, which leaves the queue
                return Err(LockError::Timeout(timeout));
            }

            // 3. critical section
            let value = op().await;

            // 4. release and wake the next waiter before handing back the result
            drop(ticket);
            Ok(value)
        }
    }

    pub type DefaultNative = FifoLock;
}
249
250/*────────────────────── browser backend ───────────────────────────*/
251#[cfg(all(feature = "browser", target_arch = "wasm32"))]
252mod browser;
253
254/*──────── re-exports so user can depend on exactly one name ────────*/
/// Native targets with the `native` feature: the FIFO queue lock.
#[cfg(all(feature = "native", not(target_arch = "wasm32")))]
pub type DefaultLock = native_fifo::DefaultNative;

/// `wasm32` with the `browser` feature: the Navigator LockManager backend.
#[cfg(all(target_arch = "wasm32", feature = "browser"))]
pub type DefaultLock = browser::DefaultBrowser;

/// Any other `wasm32` build: the passthrough lock.
#[cfg(all(target_arch = "wasm32", not(feature = "browser")))]
pub type DefaultLock = NoopLock;

// NOTE(review): no alias is defined for non-wasm32 builds compiled without
// the `native` feature.
266
267// #[cfg(all(feature = "wasm", target_arch = "wasm32"))]
268// pub type DefaultLock = browser::DefaultBrowser;
269
270/*────────────────────────── exhaustive tests ───────────────────────*/
271#[cfg(test)]
272mod tests {
273    use super::*;
274
275    /*===============================================================
276     =                        NATIVE TESTS                           =
277     ===============================================================*/
278
279    #[cfg(all(feature = "native", not(target_arch = "wasm32")))]
280    mod native {
281        use super::*;
282        use futures::future::join_all;
283        use tokio::{task, time::sleep};
284
285        /// critical-sections never overlap (FIFO fairness)
286        #[tokio::test(flavor = "multi_thread")]
287        async fn fifo_is_exclusive() {
288            let lock = DefaultLock::default();
289            let spans = Arc::new(tokio::sync::Mutex::new(Vec::<(u128, u128)>::new()));
290
291            let mut jobs = Vec::new();
292            for _ in 0..5 {
293                let l = lock.clone();
294                let s = spans.clone();
295                jobs.push(task::spawn(async move {
296                    l.with_lock("cs", Duration::from_secs(1), move || async move {
297                        let st = std::time::Instant::now();
298                        sleep(Duration::from_millis(10)).await;
299                        let et = std::time::Instant::now();
300                        s.lock().await.push((st.elapsed().as_micros(), et.elapsed().as_micros()));
301                    }).await.unwrap();
302                }));
303            }
304            join_all(jobs).await;
305            let v = spans.lock().await.clone();
306            for pair in v.windows(2) {
307                assert!(pair[0].1 <= pair[1].0, "critical sections overlapped");
308            }
309        }
310
311        /// zero-timeout succeeds when lock is free, fails when busy.
312        #[tokio::test]
313        async fn try_lock_semantics() {
314            let lock = DefaultLock::default();
315
316            // immediately succeeds
317            lock.with_lock("demo", Duration::ZERO, || async {}).await.unwrap();
318
319            // occupy lock
320            let l2 = lock.clone();
321            let _guard = task::spawn(async move {
322                l2.with_lock("demo", Duration::from_secs(1), || async {
323                    sleep(Duration::from_millis(50)).await;
324                }).await.unwrap();
325            });
326
327            sleep(Duration::from_millis(5)).await;
328
329            // try-lock should now fail
330            let err = lock
331                .with_lock("demo", Duration::ZERO, || async {})
332                .await
333                .unwrap_err();
334            assert_eq!(err, LockError::Timeout(Duration::ZERO));
335        }
336
337        /// timeout error propagates correctly
338        #[tokio::test]
339        async fn timeout_propagates() {
340            #[derive(Clone, Default)]
341            struct Stubborn;
342            #[async_trait]
343            impl GlobalLock for Stubborn {
344                async fn with_lock<R, F, Fut>(
345                    &self,
346                    _: &str,
347                    t: Duration,
348                    _: F,
349                ) -> Result<R, LockError>
350                where
351                    F: FnOnce() -> Fut + Send + 'static,
352                    Fut: Future<Output=R> + Send + 'static,
353                    R: Send + 'static,
354                {
355                    sleep(t + Duration::from_millis(5)).await;
356                    Err(LockError::Timeout(t))
357                }
358            }
359            let err = Stubborn::default()
360                .with_lock("x", Duration::from_millis(1), || async {})
361                .await
362                .unwrap_err();
363            assert!(matches!(err, LockError::Timeout(_)));
364        }
365
366        /// ❺  A panic inside the critical-section must NOT poison the lock.
367        ///
368        /// The first task grabs the lock, sleeps 5 ms, then panics.
369        /// A second task should still be able to obtain the same lock
370        /// (within 20 ms) once the first future unwinds.
371        /// Current implementation keeps the queue entry forever → times-out.
372        #[tokio::test(flavor = "multi_thread")]
373        async fn lock_is_released_after_panic() {
374            use tokio::{task, time::{sleep, Duration}};
375
376            let lock = DefaultLock::default();
377
378            // ── 1️⃣  spawn a task that panics while holding the lock ──────────
379            {
380                let l = lock.clone();
381                task::spawn(async move {
382                    // ignore the JoinError – we *want* this panicking branch
383                    let _ = std::panic::AssertUnwindSafe(
384                        l.with_lock("boom", Duration::from_secs(1), || async {
385                            sleep(Duration::from_millis(5)).await;
386                            panic!("intentional!");
387                        })
388                    )
389                        .catch_unwind()           // keep the test process alive
390                        .await;
391                });
392            }
393
394            // give the first task enough time to enter the CS & panic
395            sleep(Duration::from_millis(10)).await;
396
397            // ── 2️⃣  second attempt must succeed quickly if lock was cleaned up
398            let res = lock
399                .with_lock("boom", Duration::from_millis(20), || async {})
400                .await;
401
402            assert!(
403                res.is_ok(),
404                "lock remained poisoned – got {res:?} instead of Ok(())"
405            );
406        }
407    }
408
409    /*===============================================================
410     =                     WASM (BROWSER) TESTS                      =
411     ===============================================================*/
412
413    #[cfg(all(feature = "browser", target_arch = "wasm32"))]
414    mod wasm {
415        use super::*;
416        use gloo_timers::future::TimeoutFuture;
417        use wasm_bindgen_test::*;
418        wasm_bindgen_test_configure!(run_in_browser);
419
420        // Helper to grab high-resolution µs since page load
421        fn micros() -> u128 {
422            web_sys::window()
423                .unwrap()
424                .performance()
425                .unwrap()
426                .now() as u128 * 1_000
427        }
428
429        /// 1️⃣ Smoke-test: lock can be acquired when free (try-lock path).
430        #[wasm_bindgen_test]
431        async fn navigator_free_trylock_succeeds() {
432            DefaultLock::default()
433                .with_lock("free", Duration::ZERO, || async {})
434                .await
435                .unwrap();
436        }
437
438        /// 2️⃣ Try-lock fails with `Timeout(0)` while another task is holding it.
439        #[wasm_bindgen_test]
440        async fn navigator_trylock_times_out_when_busy() {
441            let lock = DefaultLock::default();
442
443            // Spawn a future that holds the lock for 50 ms
444            {
445                let l = lock.clone();
446                wasm_bindgen_futures::spawn_local(async move {
447                    l.with_lock("busy", Duration::from_secs(1), || async {
448                        TimeoutFuture::new(50).await;
449                    })
450                        .await
451                        .unwrap();
452                });
453            }
454
455            // Yield to let the first future grab the lock
456            TimeoutFuture::new(5).await;
457
458            // Immediate try-lock must fail
459            let err = lock
460                .with_lock("busy", Duration::ZERO, || async {})
461                .await
462                .unwrap_err();
463            assert_eq!(err, LockError::Timeout(Duration::ZERO));
464        }
465
466        /// 3️⃣ Critical-sections on the **same name** never overlap.
467        #[wasm_bindgen_test]
468        async fn navigator_serialises_same_name() {
469            let lock = DefaultLock::default();
470            let mut ts = vec![];
471
472            for _ in 0..3 {
473                let l = lock.clone();
474                ts.push(async move {
475                    let (start, end) = l
476                        .with_lock("fifo", Duration::from_secs(1), || async {
477                            let st = micros();
478                            TimeoutFuture::new(20).await;
479                            let et = micros();
480                            (st, et)
481                        })
482                        .await
483                        .unwrap();
484                    (start, end)
485                });
486            }
487
488            let mut spans = futures::future::join_all(ts).await;
489            spans.sort_by_key(|s| s.0);
490            for pair in spans.windows(2) {
491                assert!(
492                    pair[1].0 >= pair[0].1,
493                    "critical-sections overlapped in the browser backend"
494                );
495            }
496        }
497
498        /// 4️⃣ Locks with **different names** can overlap.
499        #[wasm_bindgen_test]
500        async fn navigator_distinct_names_overlap() {
501            let a = DefaultLock::default();
502            let b = a.clone(); // same backend, different name
503
504            let t1 = a.with_lock("A", Duration::from_secs(1), || async {
505                TimeoutFuture::new(30).await;
506                micros()
507            });
508
509            let t2 = b.with_lock("B", Duration::from_secs(1), || async {
510                TimeoutFuture::new(5).await;
511                micros()
512            });
513
514            let (end_a, end_b) = futures::join!(t1, t2);
515            // If they overlapped, B should finish before A completes its 30 ms sleep
516            assert!(end_b.unwrap() < end_a.unwrap());
517        }
518    }
519}