// grex_core/pack_lock.rs

//! Per-pack `.grex-lock` file lock — feat-m6-2.
//!
//! Acquires an exclusive `fd-lock` guard on `<pack_path>/.grex-lock` for the
//! full duration of a pack-type plugin lifecycle method. Prevents two
//! concurrent tasks (in-process) or processes (cross-process) from operating
//! on the same pack at the same time.
//!
//! ## Lock ordering (tier 3 of 5)
//!
//! The spec fixes the global acquire order as:
//!
//! 1. workspace-sync lock (`<workspace>/.grex.sync.lock`)
//! 2. scheduler semaphore permit (feat-m6-1)
//! 3. **per-pack `.grex-lock`** — this module
//! 4. per-repo backend lock (`<dest>.grex-backend.lock`)
//! 5. manifest RW lock (`.grex/events.jsonl` sidecar)
//!
//! Plugins acquire tier 2 (permit) and tier 3 (pack lock) in that order
//! inside every `PackTypePlugin` method. In debug builds, [`with_tier`]
//! enforces strictly-increasing tiers on a task-local stack; a reversal
//! panics in debug and logs `tracing::error!` in release.
//!
//! ## In-process vs cross-process serialisation
//!
//! `fd-lock`'s `write()` is synchronous and blocks the calling OS thread
//! until the kernel flock is free. Calling it directly inside an async
//! plugin method would block a tokio worker thread; with a multi-thread
//! runtime this scales poorly, and recursive re-entry on the same pack
//! (a meta-plugin walking into a symlinked child that points back at the
//! parent) hangs the task outright because the second `write()` waits on
//! the first, which is still on-stack.
//!
//! To avoid both problems we layer a process-wide async mutex keyed by
//! canonical pack path **in front of** the fd-lock acquire:
//!
//! * [`PackLock::acquire_async`] first takes the canonical-path mutex
//!   (`tokio::sync::Mutex`), which serialises in-process tasks without
//!   blocking workers. The mutex is non-reentrant, so same-task
//!   re-entry must be prevented upstream by cycle detection — see the
//!   body comments in [`PackLock::acquire_async`].
//! * Inside the async mutex it calls the blocking fd-lock `write()` on
//!   the blocking-thread pool — fast because the only remaining
//!   contention is cross-process, which is rare.
//! * On `Drop` it releases the fd-lock guard first, then the async
//!   mutex guard — reverse acquire order.
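//!
//! A minimal usage sketch (hypothetical pack path; fenced `ignore`
//! because it needs a tokio runtime and a real directory on disk):
//!
//! ```ignore
//! use std::path::Path;
//!
//! let lock = PackLock::open(Path::new("/work/packs/demo"))?; // creates `.grex-lock`
//! let hold = lock.acquire_async().await?; // async mutex first, then fd-lock
//! // ... run the plugin lifecycle method while `hold` is live ...
//! drop(hold); // fd-lock guard releases first, then the async mutex
//! ```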

#![allow(unsafe_code)]

use std::collections::HashMap;
use std::fs::{File, OpenOptions};
use std::io;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex, OnceLock, Weak};

use fd_lock::{RwLock, RwLockWriteGuard};

// feat-v1.2.5 A3 — pool deadlock guard (debug-only, cross-thread).
//
// `HELD_PACK_LOCKS` is a *process-global* set of `(canonical_pack_path,
// thread_id)` tuples currently held. The set is consulted from rayon
// worker threads inside `phase3_recurse` (walker.rs) at the entry of
// every parallel-iter closure to assert the lock-acquisition order
// documented in `.omne/cfg/concurrency.md` §"Five cooperating
// mechanisms" — namely that a `PackLock` is never held by a thread
// while that same thread enters a nested `pool.install` closure. The
// v1.2.4 design used a `thread_local!` here, but rayon's work-stealing
// scheduler routes the assertion onto worker threads whose TL would
// always be empty (the lock was acquired on the *outer* tokio worker,
// not on the rayon worker). The global tuple-set fixes that gap so
// the assertion actually fires when the Coffman cycle is reproducible.
// Release builds compile this out entirely.
//
// Pairs with `POOL_INSTALL_DEPTH` (per-OS-thread, owned by
// `scheduler.rs`): each `PackLock` acquire asserts the *current
// thread* is not inside a `pool.install` closure, so a worker thread
// inside a `pool.install` cannot newly acquire a `PackLock` during the
// closure body either. See feat-v1.2.5 design.md §A3.
#[cfg(debug_assertions)]
use std::collections::HashSet;
#[cfg(debug_assertions)]
use std::thread::ThreadId;

// W3 (feat-v1.2.5) landed `crate::scheduler::POOL_INSTALL_DEPTH` as a
// per-thread `RefCell<usize>` driven by `PoolInstallDepthGuard` around
// every `pool.install` boundary in `phase3_recurse` (see scheduler.rs
// §A3). We import it here so the deadlock assertion in
// `register_held_lock` can verify, at every `PackLock` acquire, that
// the current OS thread is NOT inside a rayon `pool.install` closure.
#[cfg(debug_assertions)]
use crate::scheduler::POOL_INSTALL_DEPTH;

/// Process-global set of `(canonical_pack_path, owning_thread_id)`
/// tuples currently held by `PackLock` acquires anywhere in the
/// process. Inspected by rayon worker threads inside `phase3_recurse`
/// (walker.rs) before each per-child closure body to assert the
/// lock-acquisition order. See feat-v1.2.5 design.md §A3.
///
/// Storage shape: `OnceLock<Mutex<HashSet<...>>>` rather than
/// `LazyLock<...>` because the workspace MSRV is 1.79 and `LazyLock`
/// stabilised in 1.80. The behaviour is identical — the inner
/// `HashSet` is constructed on first `held_pack_locks_storage()`
/// access via `OnceLock::get_or_init` and shared from then on.
///
/// Contract: a tuple is inserted in `register_held_lock` after the
/// pool-install-depth-zero precondition is asserted, and removed in
/// `unregister_held_lock` from `PackLock::Drop`. The thread-id half
/// of the tuple identifies which thread is responsible for the
/// outstanding hold so the assertion can target "current thread holds
/// any pack lock" without false positives from other concurrent acquires.
#[cfg(debug_assertions)]
fn held_pack_locks_storage() -> &'static Mutex<HashSet<(PathBuf, ThreadId)>> {
    static REG: OnceLock<Mutex<HashSet<(PathBuf, ThreadId)>>> = OnceLock::new();
    REG.get_or_init(|| Mutex::new(HashSet::new()))
}

/// Returns true iff the current OS thread is NOT inside any rayon
/// `pool.install` boundary (i.e. `POOL_INSTALL_DEPTH == 0`). Used by
/// `register_held_lock` to enforce the design.md §A3 lock-acquisition
/// order: a `PackLock` may never be acquired from inside a
/// `pool.install` closure (otherwise the worker can deadlock against
/// itself when a nested `pool.install` re-enters the same thread).
#[cfg(debug_assertions)]
#[inline]
fn pool_install_depth_is_zero() -> bool {
    POOL_INSTALL_DEPTH.with(|d| *d.borrow() == 0)
}

/// Register the canonical pack-lock path against the current OS
/// thread in the global `HELD_PACK_LOCKS` set. Asserts the
/// pool-install depth invariant before insertion. Debug-only.
#[cfg(debug_assertions)]
fn register_held_lock(path: &Path) {
    assert!(
        pool_install_depth_is_zero(),
        "pool deadlock guard: attempted to acquire PackLock({}) while \
         inside a `pool.install` boundary — this risks the rayon \
         re-entrancy deadlock pinned by feat-v1.2.5 design.md §A3 \
         (see .omne/cfg/concurrency.md §\"Five cooperating mechanisms\")",
        path.display()
    );
    let tid = std::thread::current().id();
    held_pack_locks_storage()
        .lock()
        .expect("HELD_PACK_LOCKS poisoned — prior panic in deadlock-guard bookkeeping")
        .insert((path.to_path_buf(), tid));
}

/// Remove the canonical pack-lock path for the current OS thread from
/// the global held-pack-locks registry. Called from `PackLock::Drop`.
/// Debug-only. Idempotent — extra calls (e.g. from a `PackLock` that
/// was opened but never acquired) are no-ops.
#[cfg(debug_assertions)]
fn unregister_held_lock(path: &Path) {
    let tid = std::thread::current().id();
    if let Ok(mut held) = held_pack_locks_storage().lock() {
        held.remove(&(path.to_path_buf(), tid));
    }
}

/// Debug-only inspector: snapshot of pack lock paths currently held by
/// the *current* OS thread. Used by the per-`pool.install`-closure
/// assertion in `phase3_recurse` (walker.rs §A3) and by the
/// deadlock-guard tests. Available under `#[cfg(debug_assertions)]`
/// (not just `#[cfg(test)]`) because the walker production code path
/// itself reads it inside a `debug_assert!` block.
#[cfg(debug_assertions)]
pub(crate) fn held_pack_locks_for_test() -> Vec<PathBuf> {
    let tid = std::thread::current().id();
    held_pack_locks_storage()
        .lock()
        .expect("HELD_PACK_LOCKS poisoned — prior panic in deadlock-guard bookkeeping")
        .iter()
        .filter(|(_, t)| *t == tid)
        .map(|(p, _)| p.clone())
        .collect()
}

/// Test-only reset: clear the held-pack-locks set entries owned by the
/// current OS thread. Used by tests that want to start from a clean
/// slate without constructing/dropping real `PackLock` guards. Gated on
/// `debug_assertions` because the underlying global is itself
/// debug-only.
#[cfg(all(test, debug_assertions))]
#[allow(dead_code)]
pub(crate) fn clear_held_pack_locks_for_test() {
    let tid = std::thread::current().id();
    if let Ok(mut held) = held_pack_locks_storage().lock() {
        held.retain(|(_, t)| *t != tid);
    }
}

/// Test-only injector: simulate having acquired a `PackLock` on the
/// given path WITHOUT going through the real lock-file machinery (so
/// the deadlock-guard tests in `walker.rs` can stage a "lock held"
/// precondition without touching the filesystem). The current
/// thread's id is recorded as the owner. Bypasses the
/// `pool_install_depth_is_zero` assertion in `register_held_lock` —
/// callers MUST run this BEFORE entering any `pool.install` boundary.
/// Gated on `debug_assertions` because the underlying global is
/// itself debug-only.
#[cfg(all(test, debug_assertions))]
pub(crate) fn register_pack_lock_for_test(path: &Path) {
    let tid = std::thread::current().id();
    held_pack_locks_storage()
        .lock()
        .expect("HELD_PACK_LOCKS poisoned — prior panic in deadlock-guard bookkeeping")
        .insert((path.to_path_buf(), tid));
}

/// Test-only remover: undo a `register_pack_lock_for_test` injection
/// on the current OS thread without going through the real
/// `PackLock::Drop` path. Used by the F1 deadlock-guard test in
/// `walker.rs` to keep the global held-pack-locks registry clean
/// across cargo-test thread reuse so a retry on the same OS thread
/// starts fresh. Gated on `debug_assertions` because the underlying
/// global is itself debug-only.
#[cfg(all(test, debug_assertions))]
pub(crate) fn unregister_pack_lock_for_test(path: &Path) {
    let tid = std::thread::current().id();
    if let Ok(mut held) = held_pack_locks_storage().lock() {
        held.remove(&(path.to_path_buf(), tid));
    }
}

/// Stable name of the per-pack lock file created inside every pack root.
/// Exported so the managed-gitignore writer can hide it from `git status`.
pub const PACK_LOCK_FILE_NAME: &str = ".grex-lock";

/// Error surfaced by [`PackLock::open`], [`PackLock::acquire`], and
/// [`PackLock::try_acquire`].
///
/// Note: [`PackLock::acquire`] is deprecated since v1.2.4; prefer
/// [`PackLock::acquire_async`] or [`PackLock::try_acquire`].
#[non_exhaustive]
#[derive(Debug, thiserror::Error)]
pub enum PackLockError {
    /// I/O error opening or locking the sidecar file.
    #[error("pack lock i/o on `{}`: {source}", path.display())]
    Io {
        /// Resolved `<pack_path>/.grex-lock` path.
        path: PathBuf,
        /// Underlying OS error.
        #[source]
        source: io::Error,
    },
    /// Non-blocking probe returned busy. Emitted only by
    /// [`PackLock::try_acquire`] on contention. The blocking paths
    /// ([`PackLock::acquire_async`], [`PackLock::acquire_cancellable`])
    /// never produce this — in-process contention queues on the async
    /// mutex and cross-process contention parks on the kernel flock.
    #[error("pack lock `{}` is busy", path.display())]
    Busy {
        /// Lock path that was contended.
        path: PathBuf,
    },
}

/// Outcome of [`PackLock::acquire_cancellable`] — either the
/// underlying lock acquire failed, or the supplied cancellation token
/// fired before the guard was returned. Distinct from
/// [`crate::scheduler::Cancelled`]: that ZST signals semaphore-permit
/// cancellation; this variant signals pack-lock cancellation. Verb
/// bodies translate both into the same `PluginError::Cancelled` at
/// the call site (feat-m7-1 Stages 6-7).
#[non_exhaustive]
#[derive(Debug, thiserror::Error)]
pub enum PackLockErrorOrCancelled {
    /// The cancellation token fired before the lock was acquired.
    /// The spawned blocking thread (if launched) may still be parked
    /// in `fd_lock::write()` — see [`PackLock::acquire_cancellable`]
    /// for the OS-thread leak-window contract.
    #[error("pack lock acquire cancelled")]
    Cancelled,
    /// The underlying lock acquire failed before cancellation fired.
    #[error(transparent)]
    Lock(#[from] PackLockError),
}

fn path_mutex_registry() -> &'static Mutex<HashMap<PathBuf, Weak<tokio::sync::Mutex<()>>>> {
    static REG: OnceLock<Mutex<HashMap<PathBuf, Weak<tokio::sync::Mutex<()>>>>> = OnceLock::new();
    REG.get_or_init(|| Mutex::new(HashMap::new()))
}

/// feat-m6 H3 — prune entries whose only remaining reference was the
/// registry itself. Called opportunistically on every `mutex_for` so
/// long-running processes that open many distinct pack paths do not
/// accumulate unbounded registry entries. Runs under the registry
/// mutex so there is no race against a concurrent `mutex_for`.
fn prune_dead(reg: &mut HashMap<PathBuf, Weak<tokio::sync::Mutex<()>>>) {
    reg.retain(|_, weak| weak.strong_count() > 0);
}

fn mutex_for(canonical: &Path) -> Arc<tokio::sync::Mutex<()>> {
    let mut reg = path_mutex_registry()
        .lock()
        .expect("pack lock path registry poisoned — this indicates a prior panic");
    // Try to reuse an existing live entry. If the Weak is dead
    // (no outstanding holders) fall through to insert a fresh Arc.
    if let Some(weak) = reg.get(canonical) {
        if let Some(existing) = weak.upgrade() {
            return existing;
        }
    }
    prune_dead(&mut reg);
    let m = Arc::new(tokio::sync::Mutex::new(()));
    reg.insert(canonical.to_path_buf(), Arc::downgrade(&m));
    m
}

fn canonical_or_raw(path: &Path) -> PathBuf {
    std::fs::canonicalize(path).unwrap_or_else(|_| path.to_path_buf())
}

/// Per-pack file lock wrapper.
///
/// Construction via [`PackLock::open`] creates (or re-opens) the sidecar
/// `<pack_path>/.grex-lock` but does **not** acquire the lock — call
/// [`PackLock::acquire_async`] for the async-safe blocking path or
/// [`PackLock::try_acquire`] for a fail-fast probe.
///
/// Note: [`PackLock::acquire`] is deprecated since v1.2.4; prefer
/// [`PackLock::acquire_async`] or [`PackLock::try_acquire`]. The sync
/// blocking variant will be removed in v1.3.0.
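///
/// Two-phase sketch (hypothetical path; fenced `ignore` because it
/// needs a tokio runtime):
///
/// ```ignore
/// let lock = PackLock::open(Path::new("/work/packs/demo"))?; // sidecar created, not locked
/// let hold = lock.acquire_async().await?;                    // now exclusively held
/// ```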
pub struct PackLock {
    inner: RwLock<File>,
    path: PathBuf,
    canonical: PathBuf,
}

impl PackLock {
    /// Open (and create if missing) the sidecar `<pack_path>/.grex-lock`.
    /// Does **not** acquire the lock.
    ///
    /// # Errors
    ///
    /// Returns [`PackLockError::Io`] if the sidecar cannot be opened or
    /// its parent directory cannot be created.
    pub fn open(pack_path: &Path) -> Result<Self, PackLockError> {
        let path = pack_path.join(PACK_LOCK_FILE_NAME);
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)
                .map_err(|source| PackLockError::Io { path: path.clone(), source })?;
        }
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(&path)
            .map_err(|source| PackLockError::Io { path: path.clone(), source })?;
        let canonical = canonical_or_raw(pack_path);
        Ok(Self { inner: RwLock::new(file), path, canonical })
    }

    /// Async acquire — pairs a process-wide [`tokio::sync::Mutex`] keyed
    /// by canonical pack path with the sidecar `fd-lock`. Safe to call
    /// from any tokio worker without blocking the runtime. In-process
    /// contention queues on the async mutex; cross-process contention
    /// parks a blocking-pool thread on the kernel flock. The mutex is
    /// non-reentrant, so same-task re-entry (e.g. meta-plugin recursion
    /// through a symlinked child that points back at the parent) must
    /// be stopped upstream by cycle detection — see the body comments.
    ///
    /// The returned [`PackLockHold`] drops the fd-lock guard and the
    /// async mutex guard in reverse acquire order at end-of-scope.
    ///
    /// # Errors
    ///
    /// Returns [`PackLockError::Io`] on any OS-level lock failure, or if
    /// the blocking-pool task that performs the acquire fails to join.
    pub async fn acquire_async(self) -> Result<PackLockHold, PackLockError> {
        // Serialise in-process tasks on the canonical path via an
        // async mutex — safe across tokio workers and non-blocking on
        // the async runtime. Distinct tasks contending on the same
        // pack root simply queue on `lock_owned().await`. Same-task
        // re-entry (recursive plugin invocation on the same pack
        // root) is the caller's responsibility to prevent via cycle
        // detection; a same-task re-entry here would hang at
        // `lock_owned().await` because tokio mutexes are
        // non-reentrant.
        //
        // [`crate::plugin::pack_type::MetaPlugin`] threads the
        // `visited_meta` set through recursion and inserts the pack
        // root at every lifecycle entry so cycles halt with
        // [`crate::execute::ExecError::MetaCycle`] before this mutex
        // acquire runs. Other pack-type plugins are leaf by design
        // (declarative, scripted) and cannot re-enter.
        let mtx = mutex_for(&self.canonical);
        let mutex_guard = Arc::clone(&mtx).lock_owned().await;

        // Box `self` so its address is stable for the transmuted
        // `'static` guard lifetime. Take the fd-lock guard from the
        // boxed lock's `inner`.
        let boxed = Box::new(self);
        // Capture the sidecar path before the move into the closure
        // so the JoinError arm below can report it (the closure
        // consumes `boxed`, so we cannot read it from there).
        let join_err_path = boxed.path.clone();
        // feat-m6 H1 — `fd_lock::RwLock::write` is a synchronous
        // blocking call that waits on the kernel flock. Running it
        // directly on a tokio worker would park that worker until
        // the kernel released the lock, starving the runtime when
        // the only remaining contention is cross-process. Hop onto
        // the blocking-thread pool so async workers stay free. The
        // acquire happens inside `spawn_blocking` and the guard is
        // transmuted to `'static` before leaving the closure so the
        // box + guard can be returned as a pair.
        let join = tokio::task::spawn_blocking(
            move || -> Result<(Box<PackLock>, RwLockWriteGuard<'static, File>), PackLockError> {
                let mut boxed = boxed;
                // SAFETY: see outer comment block — `boxed` is moved
                // into the returned pair and never freed while the
                // guard is live; field order in `PackLockHold` makes
                // the guard drop first. Transmuting here (inside the
                // closure) lets us return both the box and the guard.
                let guard_ref = boxed
                    .inner
                    .write()
                    .map_err(|source| PackLockError::Io { path: boxed.path.clone(), source })?;
                let guard_static: RwLockWriteGuard<'static, File> = unsafe {
                    std::mem::transmute::<
                        RwLockWriteGuard<'_, File>,
                        RwLockWriteGuard<'static, File>,
                    >(guard_ref)
                };
                Ok((boxed, guard_static))
            },
        )
        .await;
        let (boxed, guard_static) = match join {
            Ok(res) => res?,
            Err(join_err) => {
                return Err(PackLockError::Io {
                    path: join_err_path,
                    source: io::Error::other(join_err.to_string()),
                });
            }
        };

        // feat-v1.2.5 A3 — debug-only deadlock guard registration.
        // Asserts no `pool.install` is on this OS thread's stack and
        // records the canonical pack path in `HELD_PACK_LOCKS`. The
        // matching `unregister_held_lock` runs in `PackLock::Drop`
        // which fires from `PackLockHold::Drop` (the boxed `_lock`
        // field is the last to drop after the fd-lock guard releases).
        #[cfg(debug_assertions)]
        register_held_lock(&boxed.canonical);

        Ok(PackLockHold {
            _fd_guard: Some(guard_static),
            _mutex_guard: Some(mutex_guard),
            _lock: boxed,
        })
    }

    /// Cancellable async acquire — same semantics as
    /// [`PackLock::acquire_async`] but races the acquire against a
    /// [`tokio_util::sync::CancellationToken`]. Used by the embedded
    /// MCP server (feat-m7-1) so a `notifications/cancelled` from the
    /// client unblocks tool handlers that are parked on a contended
    /// pack lock.
    ///
    /// **Consumes `self`** to mirror [`PackLock::acquire_async`] — the
    /// same boxed-fd + transmute lifetime dance is needed to hand the
    /// guard back across a `spawn_blocking` boundary, and reusing the
    /// consumes-self contract preserves drop ordering against
    /// [`PackLockHold`].
    ///
    /// ## OS-thread leak window — contract
    ///
    /// `fd_lock::write()` is a synchronous syscall that parks the
    /// calling OS thread until the kernel releases the flock. Once the
    /// blocking call has been launched on the
    /// [`tokio::task::spawn_blocking`] pool, **the runtime cannot
    /// interrupt it** — there is no portable way to unpark a thread
    /// blocked in `flock(2)`. When the cancellation token fires we
    /// resolve the outer `select!` with [`PackLockErrorOrCancelled::Cancelled`]
    /// immediately, but the spawned OS thread stays parked until the
    /// holder eventually releases. When that happens, the spawned
    /// thread acquires the guard, the `JoinHandle` resolves to
    /// `Ok((boxed, guard))`, and the tuple is dropped on the spot
    /// (because the `select!` arm has already won) — at which point
    /// the guard's `Drop` releases the kernel flock and a subsequent
    /// acquirer can proceed.
    ///
    /// In other words: **cancellation is observable to the caller
    /// instantly, but the underlying OS thread holds the lock briefly
    /// past the cancel point, until the syscall returns**. Callers
    /// that immediately re-attempt acquire on the same path may see
    /// transient contention until that thread drains. See
    /// `.omne/cfg/mcp.md` §Cancellation.
    ///
    /// # Errors
    ///
    /// * [`PackLockErrorOrCancelled::Cancelled`] — the token fired
    ///   before a guard was returned.
    /// * [`PackLockErrorOrCancelled::Lock`] wrapping
    ///   [`PackLockError::Io`] on any OS-level lock failure (or a
    ///   blocking-pool join failure).
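    ///
    /// Usage sketch (hypothetical `pack_path` and error plumbing;
    /// fenced `ignore` because it needs a tokio runtime):
    ///
    /// ```ignore
    /// use tokio_util::sync::CancellationToken;
    ///
    /// let token = CancellationToken::new();
    /// let lock = PackLock::open(pack_path)?;
    /// match lock.acquire_cancellable(&token).await {
    ///     Ok(hold) => { /* lifecycle method runs while `hold` is live */ }
    ///     Err(PackLockErrorOrCancelled::Cancelled) => { /* client cancelled */ }
    ///     Err(PackLockErrorOrCancelled::Lock(e)) => return Err(e.into()),
    /// }
    /// ```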
    pub async fn acquire_cancellable(
        self,
        cancel: &::tokio_util::sync::CancellationToken,
    ) -> Result<PackLockHold, PackLockErrorOrCancelled> {
        // Mirror `acquire_async`: serialise on the canonical-path
        // async mutex first. Race the mutex acquire itself against
        // cancel — same-task re-entry would normally hang here, but
        // the cancel arm gives the caller an out.
        let mtx = mutex_for(&self.canonical);
        let mutex_guard = tokio::select! {
            biased;
            () = cancel.cancelled() => return Err(PackLockErrorOrCancelled::Cancelled),
            g = Arc::clone(&mtx).lock_owned() => g,
        };

        // Box `self` so the address is stable for the transmuted
        // `'static` guard lifetime — same dance as `acquire_async`.
        let boxed = Box::new(self);
        // Capture the sidecar path before the move into the closure
        // so the JoinError arm below can report it (the closure
        // consumes `boxed`, so we cannot read it from there).
        let join_err_path = boxed.path.clone();
        // feat-m7-1 — replicates the `acquire_async` blocking-pool
        // hop. The closure body is intentionally identical (do NOT
        // refactor — see the SAFETY note in `acquire_async`).
        let join = tokio::task::spawn_blocking(
            move || -> Result<(Box<PackLock>, RwLockWriteGuard<'static, File>), PackLockError> {
                let mut boxed = boxed;
                // SAFETY: see `acquire_async` — `boxed` is moved into
                // the returned pair and never freed while the guard
                // is live; field order in `PackLockHold` makes the
                // guard drop first.
                let guard_ref = boxed
                    .inner
                    .write()
                    .map_err(|source| PackLockError::Io { path: boxed.path.clone(), source })?;
                let guard_static: RwLockWriteGuard<'static, File> = unsafe {
                    std::mem::transmute::<RwLockWriteGuard<'_, File>, RwLockWriteGuard<'static, File>>(
                        guard_ref,
                    )
                };
                Ok((boxed, guard_static))
            },
        );

        // Race the blocking acquire against the cancellation token.
        // If cancel wins, the JoinHandle is dropped on the spot — the
        // spawned OS thread stays parked in `fd_lock::write()` until
        // the kernel releases, at which point the returned tuple is
        // dropped (see contract note above) and the flock is freed.
        let join = tokio::select! {
            biased;
            () = cancel.cancelled() => return Err(PackLockErrorOrCancelled::Cancelled),
            res = join => res,
        };

        let (boxed, guard_static) = match join {
            Ok(res) => res.map_err(PackLockErrorOrCancelled::Lock)?,
            Err(join_err) => {
                return Err(PackLockErrorOrCancelled::Lock(PackLockError::Io {
                    path: join_err_path,
                    source: io::Error::other(join_err.to_string()),
                }));
            }
        };

        // feat-v1.2.5 A3 — debug-only deadlock guard registration
        // (same contract as `acquire_async`; see comment there).
        #[cfg(debug_assertions)]
        register_held_lock(&boxed.canonical);

        Ok(PackLockHold {
            _fd_guard: Some(guard_static),
            _mutex_guard: Some(mutex_guard),
            _lock: boxed,
        })
    }

    /// Thread-blocking acquire (no tokio integration). Waits on the
    /// fd-lock synchronously. Suitable for synchronous call sites only
    /// — async plugin methods MUST use [`PackLock::acquire_async`] to
    /// avoid blocking a tokio worker.
    ///
    /// Returns a borrowed [`RwLockWriteGuard`]; the caller owns both
    /// the outer [`PackLock`] and the guard in scope. Mirrors the
    /// [`crate::fs::ScopedLock`] shape.
    ///
    /// # Errors
    ///
    /// Returns [`PackLockError::Io`] if the OS lock call fails.
    ///
    /// # Implementation note
    ///
    /// Restored as the original direct blocking `fd_lock::write()`
    /// call (single-line body, no behaviour change vs v1.2.3) —
    /// simpler and safer than a busy-wait shim, since `fd-lock`
    /// already parks the calling OS thread on the kernel flock and a
    /// busy-wait would burn CPU without engaging the kernel waiter
    /// queue.
    #[deprecated(
        since = "1.2.4",
        note = "use `acquire_async` for async contexts or `try_acquire` for non-blocking; sync `acquire` will be removed in v1.3.0"
    )]
    pub fn acquire(&mut self) -> Result<RwLockWriteGuard<'_, File>, PackLockError> {
        let guard = self
            .inner
            .write()
            .map_err(|source| PackLockError::Io { path: self.path.clone(), source })?;
        // feat-v1.2.5 A3 — debug-only deadlock guard registration.
        // Borrowed-guard fns (`acquire`, `try_acquire`) cannot
        // intercept their guard's `Drop` (it's an `fd_lock` type), so
        // the unregister hook lives in `PackLock::Drop` below. The
        // typical usage pattern (`let mut p = PackLock::open(); let
        // _g = p.acquire();`) drops `_g` first (releasing the OS
        // flock) then `p` (closing the file + unregistering on this
        // OS thread), which keeps `HELD_PACK_LOCKS` accurate for the
        // duration the OS-level lock is held.
        #[cfg(debug_assertions)]
        register_held_lock(&self.canonical);
        Ok(guard)
    }

    /// Non-blocking probe: return [`PackLockError::Busy`] instead of
    /// waiting when another holder has the lock. Does not engage the
    /// async mutex — purely a fail-fast diagnostics hook.
    ///
    /// # Errors
    ///
    /// * [`PackLockError::Busy`] when a concurrent holder owns the lock.
    /// * [`PackLockError::Io`] on any other OS-level lock failure.
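    ///
    /// Fail-fast sketch (hypothetical call site; fenced `ignore`):
    ///
    /// ```ignore
    /// let mut lock = PackLock::open(pack_path)?;
    /// match lock.try_acquire() {
    ///     Ok(_guard) => { /* exclusive while `_guard` is in scope */ }
    ///     Err(PackLockError::Busy { path }) => {
    ///         eprintln!("pack busy: {}", path.display());
    ///     }
    ///     Err(other) => return Err(other),
    /// }
    /// ```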
    pub fn try_acquire(&mut self) -> Result<RwLockWriteGuard<'_, File>, PackLockError> {
        match self.inner.try_write() {
            Ok(g) => {
                // feat-v1.2.5 A3 — debug-only deadlock guard
                // registration. Same borrowed-guard caveat as
                // `acquire`: unregister fires from `PackLock::Drop`.
                #[cfg(debug_assertions)]
                register_held_lock(&self.canonical);
                Ok(g)
            }
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                Err(PackLockError::Busy { path: self.path.clone() })
            }
            Err(source) => Err(PackLockError::Io { path: self.path.clone(), source }),
        }
    }

    /// Sidecar path — `<pack_path>/.grex-lock`.
    #[must_use]
    pub fn path(&self) -> &Path {
        &self.path
    }
}

impl std::fmt::Debug for PackLock {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PackLock").field("path", &self.path).finish()
    }
}

// feat-v1.2.5 A3 — debug-only deadlock guard unregistration. Fires
// when a `PackLock` is dropped, whether directly (sync `acquire` /
// `try_acquire` call sites that own the `PackLock` in scope) or
// transitively from `PackLockHold::Drop` (the boxed `_lock` field
// drops last, after the fd-lock guard releases). The unregister is
// idempotent and safe even if the lock was never acquired (e.g. an
// `open` that never called any `acquire*`).
#[cfg(debug_assertions)]
impl Drop for PackLock {
    fn drop(&mut self) {
        unregister_held_lock(&self.canonical);
    }
}

/// RAII guard returned by [`PackLock::acquire_async`]. Holds the
/// sidecar-file `fd-lock` guard plus the process-wide async mutex
/// guard. Drops in reverse acquire order.
#[repr(C)]
pub struct PackLockHold {
    // Field order is load-bearing: `_fd_guard` drops first (releasing
    // the kernel flock), then `_mutex_guard` (releasing the async
    // serialisation slot), then `_lock` (closing the file handle).
    // `#[repr(C)]` pins source order to layout order so `offset_of!`
    // assertions below stay meaningful.
    _fd_guard: Option<RwLockWriteGuard<'static, File>>,
    _mutex_guard: Option<tokio::sync::OwnedMutexGuard<()>>,
    _lock: Box<PackLock>,
}

impl PackLockHold {
    /// Sidecar path for diagnostics.
    #[must_use]
    pub fn path(&self) -> &Path {
        self._lock.path()
    }
}

impl std::fmt::Debug for PackLockHold {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PackLockHold").field("path", &self._lock.path()).finish()
    }
}

// Field-order static assertion (feat-m6 B3) — the safety argument for the
// unsafe lifetime extension in `acquire_async` depends on `_fd_guard`
// dropping before `_lock`. Rust drops struct fields in declaration order,
// so `_fd_guard` must sit at the lowest offset, then `_mutex_guard`, then
// `_lock`. A refactor that reorders these fields would silently break the
// Drop ordering and the transmuted `'static` borrow would outlive its box.
const _: () = {
    assert!(
        std::mem::offset_of!(PackLockHold, _fd_guard)
            < std::mem::offset_of!(PackLockHold, _mutex_guard),
        "PackLockHold field order: _fd_guard must precede _mutex_guard"
    );
    assert!(
        std::mem::offset_of!(PackLockHold, _mutex_guard)
            < std::mem::offset_of!(PackLockHold, _lock),
        "PackLockHold field order: _mutex_guard must precede _lock"
    );
};

impl Drop for PackLockHold {
    fn drop(&mut self) {
        // Explicit take() on fd-lock guard first — the transmuted
        // `'static` lifetime must expire before `_lock` drops.
        self._fd_guard.take();
        self._mutex_guard.take();
        // `_lock` drops last when the struct itself drops, closing the
        // underlying file handle.
    }
}

// ---------------------------------------------------------------------------
// Lock-ordering enforcement (debug builds).
// ---------------------------------------------------------------------------

/// Lock tier ordinals matching `.omne/cfg/concurrency.md`. Acquisitions
/// must strictly increase; reversed order risks the deadlock class the
/// feat-m6-3 Lean proof rules out.
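///
/// The derived `Ord` follows the declared ordinals, so the ordering can
/// be asserted directly (fenced `ignore` since the module's public path
/// may differ):
///
/// ```ignore
/// assert!(Tier::WorkspaceSync < Tier::Semaphore);
/// assert!(Tier::PerPack < Tier::Backend && Tier::Backend < Tier::Manifest);
/// ```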
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum Tier {
    /// Workspace sync lock — `<workspace>/.grex.sync.lock`.
    WorkspaceSync = 0,
    /// Scheduler semaphore permit — feat-m6-1.
    Semaphore = 1,
    /// Per-pack `.grex-lock` — feat-m6-2 (this module).
    PerPack = 2,
    /// Per-repo backend lock — `<dest>.grex-backend.lock`.
    Backend = 3,
    /// Manifest RW lock — `.grex/events.jsonl` sidecar.
    Manifest = 4,
}

/// Run `f` while `tier` is pushed on the current task's tier stack.
/// Debug builds enforce strictly-increasing order across nested calls;
/// release builds skip the check entirely.
#[cfg(debug_assertions)]
pub fn with_tier<R>(tier: Tier, f: impl FnOnce() -> R) -> R {
    tier::push(tier);
    let out = f();
    tier::pop_if_top(tier);
    out
}

/// Release-build no-op mirror of [`with_tier`].
#[cfg(not(debug_assertions))]
#[inline]
pub fn with_tier<R>(_tier: Tier, f: impl FnOnce() -> R) -> R {
    f()
}

/// Wrap an async future in a task-local tier-stack scope so any
/// [`TierGuard`] pushes inside it land in the correct frame even when
/// the task migrates across tokio workers after `.await`. Release
/// builds compile this down to the raw future — no scope, no cost.
///
/// Callers should wrap every top-level async dispatch (e.g. the
/// per-pack plugin lifecycle calls driven by `rt.block_on(...)`) so
/// the tier check can operate on a fresh stack per dispatch.
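///
/// Dispatch-wrapper sketch (hypothetical `run_lifecycle`; fenced
/// `ignore` because it needs a tokio runtime):
///
/// ```ignore
/// let out = with_tier_scope(async {
///     // every `with_tier`/`TierGuard` inside sees a fresh task-local stack
///     run_lifecycle().await
/// })
/// .await;
/// ```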
#[cfg(debug_assertions)]
pub async fn with_tier_scope<F: std::future::Future>(f: F) -> F::Output {
    tier::TIER_STACK.scope(std::cell::RefCell::new(Vec::new()), f).await
}

/// Release-build no-op mirror of [`with_tier_scope`].
#[cfg(not(debug_assertions))]
#[inline]
pub async fn with_tier_scope<F: std::future::Future>(f: F) -> F::Output {
    f.await
}

/// RAII guard — pushes a tier onto the current task's tier stack on
/// construction, pops on drop. Lets lifecycle prologues enforce
/// tier ordering across `.await` points without nesting the rest of
/// the body inside a `with_tier` closure. Debug builds carry the
/// ordering check; release builds compile to a zero-sized no-op.
///
/// Declaration-order note: callers must declare the guard **before**
/// the permit/hold it is scoping. Rust drops locals in reverse
/// declaration order, so `_tier` declared first drops last — after
/// the lock/permit releases — matching `with_tier` semantics.
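///
/// Declaration-order sketch (hypothetical `lock` in scope; fenced
/// `ignore`):
///
/// ```ignore
/// let _tier = TierGuard::push(Tier::PerPack); // declared first — drops last
/// let hold = lock.acquire_async().await?;     // drops before `_tier` pops
/// ```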
#[must_use]
pub struct TierGuard {
    #[cfg(debug_assertions)]
    tier: Tier,
    // Sized placeholder for release so the type is still `Sized` and
    // `must_use`-meaningful. Zero-sized once inlined.
    #[cfg(not(debug_assertions))]
    _private: (),
}

impl TierGuard {
    /// Push `tier` onto the current task's tier stack. Debug builds
    /// assert strictly-increasing order against the existing top.
    #[cfg(debug_assertions)]
    pub fn push(tier: Tier) -> Self {
        tier::push(tier);
        TierGuard { tier }
    }

    /// Release-build no-op constructor.
    #[cfg(not(debug_assertions))]
    #[inline]
    pub fn push(_tier: Tier) -> Self {
        TierGuard { _private: () }
    }
}

#[cfg(debug_assertions)]
impl Drop for TierGuard {
    fn drop(&mut self) {
        tier::pop_if_top(self.tier);
    }
}

#[cfg(debug_assertions)]
pub(crate) mod tier {
    use super::Tier;
    use std::cell::RefCell;

    // feat-m6 CI fix — previously this used `thread_local!`, but under a
    // tokio multi-thread runtime a task can resume on a different worker
    // after `.await`. A push on worker A followed by a yield and a pop on
    // worker B left A's stack polluted and tripped the tier-ordering
    // assertion on the next acquire. Migrating to `tokio::task_local!`
    // pins the stack to the *task*, not the worker, so nested
    // `TierGuard` bookkeeping follows the task across workers.
    //
    // `try_with` silently no-ops outside a `TIER_STACK.scope(...)`
    // frame — that makes the module safe to use from synchronous
    // (non-tokio) test harnesses and the module's own unit tests at
    // the cost of debug-only tier enforcement being disabled there.
    // Production dispatch wraps every pack-type plugin call in a scope
    // (see `sync::dispatch_*`), so real runs retain enforcement.
    tokio::task_local! {
        pub(crate) static TIER_STACK: RefCell<Vec<Tier>>;
    }

    pub fn push(next: Tier) {
        let _ = TIER_STACK.try_with(|s| {
            let mut s = s.borrow_mut();
            if let Some(&top) = s.last() {
                assert!(
                    next > top,
                    "lock tier violation: trying to acquire {next:?} while already holding {top:?} \
                     (tiers must be strictly increasing — see .omne/cfg/concurrency.md)"
                );
            }
            s.push(next);
        });
    }

    pub fn pop_if_top(expected: Tier) {
        let _ = TIER_STACK.try_with(|s| {
            let mut s = s.borrow_mut();
            if s.last().copied() == Some(expected) {
                s.pop();
            } else {
                tracing::error!(
                    target: "grex::concurrency",
                    "tier pop mismatch: expected {:?} at top, stack = {:?}",
                    expected,
                    *s
                );
            }
        });
    }
}

// ---------------------------------------------------------------------------
// Tests.
// ---------------------------------------------------------------------------

#[cfg(test)]
#[allow(deprecated)] // exercising deprecated `PackLock::acquire` for back-compat
mod tests {
    use super::*;
    use std::sync::{Arc, Barrier};
    use std::thread;
    use std::time::{Duration, Instant};
    use tempfile::tempdir;
    use tokio_util::sync::CancellationToken;

    #[test]
    fn pack_lock_acquires_creates_file() {
        let dir = tempdir().unwrap();
        let mut plock = PackLock::open(dir.path()).unwrap();
        let expected = plock.path().to_path_buf();
        let _guard = plock.acquire().unwrap();
        assert!(expected.exists(), "open must create the sidecar file");
        assert_eq!(expected, dir.path().join(PACK_LOCK_FILE_NAME));
    }

    #[test]
    fn pack_lock_second_try_acquire_reports_busy_while_held() {
        let dir = tempdir().unwrap();
        let mut first = PackLock::open(dir.path()).unwrap();
        let _held = first.acquire().unwrap();
        let mut second = PackLock::open(dir.path()).unwrap();
        let err = second.try_acquire().unwrap_err();
        match err {
            PackLockError::Busy { path } => {
                assert_eq!(path, dir.path().join(PACK_LOCK_FILE_NAME));
            }
            other => panic!("expected Busy, got {other:?}"),
        }
    }

    #[test]
    fn pack_lock_release_on_drop() {
        let dir = tempdir().unwrap();
        {
            let mut first = PackLock::open(dir.path()).unwrap();
            let _g = first.acquire().unwrap();
        }
        let mut second = PackLock::open(dir.path()).unwrap();
        let _g = second.acquire().unwrap();
    }

    #[test]
    fn pack_lock_path_contains_pack_path() {
        let dir = tempdir().unwrap();
        let plock = PackLock::open(dir.path()).unwrap();
        let p = plock.path();
        assert!(p.starts_with(dir.path()));
        assert_eq!(p.file_name().and_then(|s| s.to_str()), Some(PACK_LOCK_FILE_NAME));
    }

    #[test]
    fn pack_lock_blocking_acquire_waits_for_holder() {
        let dir = tempdir().unwrap();
        let path = dir.path().to_path_buf();
        let barrier = Arc::new(Barrier::new(2));
        let holder_barrier = Arc::clone(&barrier);
        let holder_path = path.clone();

        let holder = thread::spawn(move || {
            let mut lock = PackLock::open(&holder_path).unwrap();
            let _g = lock.acquire().unwrap();
            holder_barrier.wait();
            thread::sleep(Duration::from_millis(100));
        });

        barrier.wait();
        let start = Instant::now();
        let mut second = PackLock::open(&path).unwrap();
        let _g = second.acquire().unwrap();
        let waited = start.elapsed();
        holder.join().unwrap();
        assert!(
            waited >= Duration::from_millis(40),
            "blocking acquire must have waited (observed {waited:?})"
        );
    }

    #[test]
    fn pack_lock_distinct_paths_do_not_contend() {
        let a = tempdir().unwrap();
        let b = tempdir().unwrap();
        let mut la = PackLock::open(a.path()).unwrap();
        let _ga = la.acquire().unwrap();
        let mut lb = PackLock::open(b.path()).unwrap();
        let _gb = lb.try_acquire().unwrap();
    }

    #[tokio::test]
    async fn async_acquire_serialises_in_process() {
        // Two concurrent acquire_async calls on the same pack path
        // must serialise cleanly (no hang).
        let dir = tempdir().unwrap();
        let path = dir.path().to_path_buf();
        let path_clone = path.clone();
        let h1 = tokio::spawn(async move {
            let lock = PackLock::open(&path).unwrap();
            let _hold = lock.acquire_async().await.unwrap();
            tokio::time::sleep(Duration::from_millis(30)).await;
        });
        let h2 = tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(5)).await;
            let lock = PackLock::open(&path_clone).unwrap();
            let _hold = lock.acquire_async().await.unwrap();
        });
        h1.await.unwrap();
        h2.await.unwrap();
    }

    // --- tier ordering (debug-only) -----------------------------------------

    // feat-m6 CI fix — tier enforcement now lives in a `tokio::task_local!`
    // stack, so these tests drive the check through a scoped task to
    // establish the frame. `try_with` outside a scope silently no-ops.

    #[cfg(debug_assertions)]
    #[tokio::test]
    async fn tier_strictly_increasing_ok() {
        tier::TIER_STACK
            .scope(std::cell::RefCell::new(Vec::new()), async {
                with_tier(Tier::Semaphore, || {
                    with_tier(Tier::PerPack, || {
                        with_tier(Tier::Backend, || {
                            with_tier(Tier::Manifest, || {});
                        });
                    });
                });
            })
            .await;
    }

    #[cfg(debug_assertions)]
    #[tokio::test]
    async fn tier_reversed_panics_in_debug() {
        use std::panic::{catch_unwind, AssertUnwindSafe};
        let result = tier::TIER_STACK
            .scope(std::cell::RefCell::new(Vec::new()), async {
                catch_unwind(AssertUnwindSafe(|| {
                    with_tier(Tier::PerPack, || {
                        with_tier(Tier::Semaphore, || {});
                    });
                }))
            })
            .await;
        assert!(result.is_err(), "reversed tier order must panic in debug builds");
    }

    // --- acquire_cancellable (feat-m7-1 Stage 4) -----------------------------

    /// 4.T1 — uncontended path returns Ok(PackLockHold).
    #[tokio::test]
    async fn acquire_cancellable_happy_path() {
        let dir = tempdir().unwrap();
        let lock = PackLock::open(dir.path()).unwrap();
        let token = CancellationToken::new();
        let result = lock.acquire_cancellable(&token).await;
        assert!(result.is_ok(), "expected Ok(PackLockHold) on uncontended pack");
    }

    /// 4.T2 — task A holds the lock; task B's token fires after 10 ms;
    /// B must surface `Err(Cancelled)` within 50 ms.
    #[tokio::test]
    async fn acquire_cancellable_cancel_during_blocking_fd_lock_returns_cancelled() {
        let dir = tempdir().unwrap();
        let path = dir.path().to_path_buf();
        let path_b = path.clone();

        // Task A: acquire and hold for 500 ms (long enough to cover B's window).
        let a_started = Arc::new(tokio::sync::Notify::new());
        let a_started_clone = Arc::clone(&a_started);
        let a = tokio::spawn(async move {
            let lock = PackLock::open(&path).unwrap();
            let _hold = lock.acquire_async().await.unwrap();
            a_started_clone.notify_one();
            tokio::time::sleep(Duration::from_millis(500)).await;
        });
        a_started.notified().await;

        let token = CancellationToken::new();
        let cancel_handle = token.clone();
        let canceller = tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            cancel_handle.cancel();
        });

        let started = Instant::now();
        let lock_b = PackLock::open(&path_b).unwrap();
        let result =
            tokio::time::timeout(Duration::from_millis(50), lock_b.acquire_cancellable(&token))
                .await
                .expect("acquire_cancellable must return within 50 ms after cancel");

        let waited = started.elapsed();
        assert!(
            matches!(result, Err(PackLockErrorOrCancelled::Cancelled)),
            "expected Err(Cancelled), got {result:?} after {waited:?}"
        );

        canceller.await.unwrap();
        a.abort();
        let _ = a.await;
    }

    // --- helpers for 4.T3 / 4.T4 -------------------------------------------

    /// Spawn a "holder" task that acquires `path` and releases on
    /// signal. Returns `(JoinHandle, started_notify, release_notify)`.
    /// Caller awaits `started` to know the lock is held, then fires
    /// `release` when ready to let the holder drop its guard.
    fn spawn_holder(
        path: PathBuf,
    ) -> (tokio::task::JoinHandle<()>, Arc<tokio::sync::Notify>, Arc<tokio::sync::Notify>) {
        let started = Arc::new(tokio::sync::Notify::new());
        let release = Arc::new(tokio::sync::Notify::new());
        let started_c = Arc::clone(&started);
        let release_c = Arc::clone(&release);
        let h = tokio::spawn(async move {
            let lock = PackLock::open(&path).unwrap();
            let _hold = lock.acquire_async().await.unwrap();
            started_c.notify_one();
            release_c.notified().await;
        });
        (h, started, release)
    }

    /// Poll `acquire_async` against `path` until it succeeds or the
    /// outer deadline expires. Returns `Ok(())` on success, `Err(())`
    /// on timeout.
    async fn poll_acquire_until_free(path: PathBuf, deadline: Duration) -> Result<(), ()> {
        tokio::time::timeout(deadline, async move {
            loop {
                let lock = PackLock::open(&path).unwrap();
                if let Ok(Ok(_hold)) =
                    tokio::time::timeout(Duration::from_millis(100), lock.acquire_async()).await
                {
                    return;
                }
                tokio::time::sleep(Duration::from_millis(20)).await;
            }
        })
        .await
        .map_err(|_| ())
    }

    /// 4.T3 — regression for the documented OS-thread leak window:
    /// after B is cancelled, A releases its lock; the spawn_blocking
    /// thread that B kicked off must eventually unblock and drop its
    /// guard. We observe this by polling from a third task C — if B's
    /// blocking thread leaked its guard, C would wait forever.
    #[tokio::test]
    async fn acquire_cancellable_spawn_blocking_thread_releases_guard_when_it_finally_unblocks() {
        let dir = tempdir().unwrap();
        let path = dir.path().to_path_buf();
        let (a, a_started, release_a) = spawn_holder(path.clone());
        a_started.notified().await;

        // Task B: race against cancel while A holds the fd-lock.
        let token = CancellationToken::new();
        let cancel_handle = token.clone();
        let path_b = path.clone();
        let b = tokio::spawn(async move {
            let lock = PackLock::open(&path_b).unwrap();
            lock.acquire_cancellable(&token).await
        });
        tokio::time::sleep(Duration::from_millis(20)).await;
        cancel_handle.cancel();
        let b_result = tokio::time::timeout(Duration::from_millis(100), b)
            .await
            .expect("B must resolve quickly after cancel")
            .expect("B task panicked");
        assert!(
            matches!(b_result, Err(PackLockErrorOrCancelled::Cancelled)),
            "expected B to see Cancelled, got {b_result:?}"
        );

        // Release A — B's parked OS thread should drain.
        release_a.notify_one();
        a.await.unwrap();

        assert!(
            poll_acquire_until_free(path, Duration::from_millis(2_000)).await.is_ok(),
            "task C never acquired — spawn_blocking thread leaked its fd-lock guard"
        );
    }

    /// 4.T4 — covers the outer-mutex cancel arm of the `select!` in
    /// `acquire_cancellable`. Task A holds the in-process async mutex
    /// (via `acquire_async`, which acquires both tiers); task B calls
    /// `acquire_cancellable` and is parked on `lock_owned()` (NOT yet
    /// at the fd-lock blocking call). Cancelling B's token must
    /// short-circuit the mutex wait and return `Err(Cancelled)`.
    #[tokio::test]
    async fn acquire_cancellable_cancel_during_async_mutex_wait_returns_cancelled() {
        let dir = tempdir().unwrap();
        let path = dir.path().to_path_buf();
        let (a, a_started, release_a) = spawn_holder(path.clone());
        a_started.notified().await;

        let token = CancellationToken::new();
        let cancel_handle = token.clone();
        let canceller = tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            cancel_handle.cancel();
        });

        let lock_b = PackLock::open(&path).unwrap();
        let result =
            tokio::time::timeout(Duration::from_millis(50), lock_b.acquire_cancellable(&token))
                .await
                .expect("acquire_cancellable must return within 50 ms after cancel");

        assert!(
            matches!(result, Err(PackLockErrorOrCancelled::Cancelled)),
            "expected Err(Cancelled) from outer-mutex cancel arm, got {result:?}"
        );

        canceller.await.unwrap();
        release_a.notify_one();
        a.await.unwrap();
    }
}