grex_core/pack_lock.rs

//! Per-pack `.grex-lock` file lock — feat-m6-2.
//!
//! Acquires an exclusive `fd-lock` guard on `<pack_path>/.grex-lock` for the
//! full duration of a pack-type plugin lifecycle method. Prevents two
//! concurrent tasks (in-process) or processes (cross-process) from operating
//! on the same pack at the same time.
//!
//! ## Lock ordering (tier 3 of 5)
//!
//! The spec fixes the global acquire order as:
//!
//! 1. workspace-sync lock (`<workspace>/.grex.sync.lock`)
//! 2. scheduler semaphore permit (feat-m6-1)
//! 3. **per-pack `.grex-lock`** — this module
//! 4. per-repo backend lock (`<dest>.grex-backend.lock`)
//! 5. manifest RW lock (`.grex/events.jsonl` sidecar)
//!
//! Plugins acquire tier 2 (permit) and tier 3 (pack lock) in that order
//! inside every `PackTypePlugin` method. In debug builds, [`with_tier`]
//! enforces strictly-increasing tiers on a task-local stack: a reversed
//! acquire panics, and a mismatched pop logs `tracing::error!`. Release
//! builds skip the check entirely.
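//!
//! A minimal sketch of the intended nesting (illustrative only; real
//! call sites live in the plugin dispatch layer):
//!
//! ```ignore
//! with_tier(Tier::Semaphore, || {
//!     with_tier(Tier::PerPack, || {
//!         // tier 4/5 acquires nest further in; a reversed pair
//!         // (PerPack then Semaphore, say) asserts in debug builds.
//!     });
//! });
//! ```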
//!
//! ## In-process vs cross-process serialisation
//!
//! `fd-lock`'s `write()` is synchronous and blocks the calling OS thread
//! until the kernel flock is free. Calling it directly inside an async
//! plugin method would block a tokio worker thread; with a multi-thread
//! runtime this scales poorly, and recursive re-entry on the same pack
//! (meta-plugin walking into a symlinked child that points back at the
//! parent) hangs the task outright because the second `write()` waits on
//! the first, which is still on-stack.
//!
//! To avoid both problems we layer a process-wide async mutex keyed by
//! canonical pack path **in front of** the fd-lock acquire:
//!
//! * [`PackLock::acquire_async`] first takes the canonical-path mutex
//!   (`tokio::sync::Mutex`), which serialises in-process tasks without
//!   blocking workers. The mutex is non-reentrant, so same-task
//!   re-entry must be prevented upstream by meta-plugin cycle
//!   detection (see the notes on [`PackLock::acquire_async`]).
//! * Inside the async mutex it hops onto the blocking-thread pool
//!   ([`tokio::task::spawn_blocking`]) for the fd-lock `write()` —
//!   fast in practice because the only remaining contention is
//!   cross-process, which is rare.
//! * On `Drop` it releases the fd-lock guard first, then the async
//!   mutex guard — reverse acquire order.
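//!
//! A minimal usage sketch (`pack_root` is a placeholder for the
//! plugin's pack directory; real call sites take a scheduler permit
//! first, per the tier order above):
//!
//! ```ignore
//! let lock = PackLock::open(pack_root)?;   // creates <pack_root>/.grex-lock
//! let hold = lock.acquire_async().await?;  // async mutex, then fd-lock
//! // ... exclusive pack work ...
//! drop(hold);                              // fd guard first, then mutex guard
//! ```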

#![allow(unsafe_code)]

use std::collections::HashMap;
use std::fs::{File, OpenOptions};
use std::io;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex, OnceLock, Weak};

use fd_lock::{RwLock, RwLockWriteGuard};

/// Stable name of the per-pack lock file created inside every pack root.
/// Exported so the managed-gitignore writer can hide it from `git status`.
pub const PACK_LOCK_FILE_NAME: &str = ".grex-lock";
/// Error surfaced by [`PackLock::open`], [`PackLock::acquire`],
/// [`PackLock::try_acquire`], and [`PackLock::acquire_async`].
#[non_exhaustive]
#[derive(Debug, thiserror::Error)]
pub enum PackLockError {
    /// I/O error opening or locking the sidecar file.
    #[error("pack lock i/o on `{}`: {source}", path.display())]
    Io {
        /// Resolved `<pack_path>/.grex-lock` path.
        path: PathBuf,
        /// Underlying OS error.
        #[source]
        source: io::Error,
    },
    /// Non-blocking probe returned busy. Emitted only by
    /// [`PackLock::try_acquire`] on any contention. The blocking paths
    /// never produce this: [`PackLock::acquire`] waits on the kernel
    /// flock, and [`PackLock::acquire_async`] queues on the in-process
    /// async mutex (same-task re-entry is prevented upstream by cycle
    /// detection; see [`PackLock::acquire_async`]).
    #[error("pack lock `{}` is busy", path.display())]
    Busy {
        /// Lock path that was contended.
        path: PathBuf,
    },
}

/// Outcome of [`PackLock::acquire_cancellable`] — either the
/// underlying lock acquire failed, or the supplied cancellation token
/// fired before the guard was returned. Distinct from
/// [`crate::scheduler::Cancelled`]: that ZST signals semaphore-permit
/// cancellation; this one signals pack-lock cancellation. Verb bodies
/// translate either signal into the same `PluginError::Cancelled` at
/// the call site (feat-m7-1 Stages 6-7).
#[non_exhaustive]
#[derive(Debug, thiserror::Error)]
pub enum PackLockErrorOrCancelled {
    /// The cancellation token fired before the lock was acquired.
    /// The spawned blocking thread (if launched) may still be parked
    /// in `fd_lock::write()` — see [`PackLock::acquire_cancellable`]
    /// for the OS-thread leak-window contract.
    #[error("pack lock acquire cancelled")]
    Cancelled,
    /// The underlying lock acquire failed before cancellation fired.
    #[error(transparent)]
    Lock(#[from] PackLockError),
}

fn path_mutex_registry() -> &'static Mutex<HashMap<PathBuf, Weak<tokio::sync::Mutex<()>>>> {
    static REG: OnceLock<Mutex<HashMap<PathBuf, Weak<tokio::sync::Mutex<()>>>>> = OnceLock::new();
    REG.get_or_init(|| Mutex::new(HashMap::new()))
}

/// feat-m6 H3 — prune entries whose only remaining reference was the
/// registry itself. Called opportunistically on every `mutex_for` so
/// long-running processes that open many distinct pack paths do not
/// accumulate unbounded registry entries. Runs under the registry
/// mutex so there is no race against a concurrent `mutex_for`.
fn prune_dead(reg: &mut HashMap<PathBuf, Weak<tokio::sync::Mutex<()>>>) {
    reg.retain(|_, weak| weak.strong_count() > 0);
}

fn mutex_for(canonical: &Path) -> Arc<tokio::sync::Mutex<()>> {
    let mut reg = path_mutex_registry()
        .lock()
        .expect("pack lock path registry poisoned — this indicates a prior panic");
    // Try to reuse an existing live entry. If the Weak is dead
    // (no outstanding holders) fall through to insert a fresh Arc.
    if let Some(weak) = reg.get(canonical) {
        if let Some(existing) = weak.upgrade() {
            return existing;
        }
    }
    prune_dead(&mut reg);
    let m = Arc::new(tokio::sync::Mutex::new(()));
    reg.insert(canonical.to_path_buf(), Arc::downgrade(&m));
    m
}

fn canonical_or_raw(path: &Path) -> PathBuf {
    std::fs::canonicalize(path).unwrap_or_else(|_| path.to_path_buf())
}

/// Per-pack file lock wrapper.
///
/// Construction via [`PackLock::open`] creates (or re-opens) the sidecar
/// `<pack_path>/.grex-lock` but does **not** acquire the lock — call
/// [`PackLock::acquire_async`] for the async-safe blocking path,
/// [`PackLock::acquire`] for the thread-blocking synchronous path, or
/// [`PackLock::try_acquire`] for a fail-fast probe.
pub struct PackLock {
    inner: RwLock<File>,
    path: PathBuf,
    canonical: PathBuf,
}

impl PackLock {
    /// Open (and create if missing) the sidecar `<pack_path>/.grex-lock`.
    /// Does **not** acquire the lock.
    ///
    /// # Errors
    ///
    /// Returns [`PackLockError::Io`] if the sidecar cannot be opened or
    /// its parent directory cannot be created.
    pub fn open(pack_path: &Path) -> Result<Self, PackLockError> {
        let path = pack_path.join(PACK_LOCK_FILE_NAME);
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)
                .map_err(|source| PackLockError::Io { path: path.clone(), source })?;
        }
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(&path)
            .map_err(|source| PackLockError::Io { path: path.clone(), source })?;
        let canonical = canonical_or_raw(pack_path);
        Ok(Self { inner: RwLock::new(file), path, canonical })
    }

    /// Async acquire — pairs a process-wide [`tokio::sync::Mutex`] keyed
    /// by canonical pack path with the sidecar `fd-lock`. Safe to call
    /// from any tokio worker without blocking the runtime. The async
    /// mutex is non-reentrant, so recursive re-entry on the same pack
    /// root (e.g. meta-plugin recursion through a symlinked child that
    /// points back at the parent) would hang here; callers prevent it
    /// upstream via cycle detection (see the body comment).
    ///
    /// The returned [`PackLockHold`] drops the fd-lock guard and the
    /// async mutex guard in reverse acquire order at end-of-scope.
    ///
    /// # Errors
    ///
    /// Returns [`PackLockError::Io`] on any OS-level lock failure
    /// (including a panicked blocking task surfacing as a join error).
    pub async fn acquire_async(self) -> Result<PackLockHold, PackLockError> {
        // Serialise in-process tasks on the canonical path via an
        // async mutex — safe across tokio workers and non-blocking on
        // the async runtime. Same-task re-entry (recursive plugin
        // invocation on the same pack root) is the caller's
        // responsibility to prevent via cycle detection; a same-task
        // re-entry here would hang at `lock_owned().await` because
        // tokio mutexes are non-reentrant.
        //
        // [`crate::plugin::pack_type::MetaPlugin`] threads the
        // `visited_meta` set through recursion and inserts the pack
        // root at every lifecycle entry so cycles halt with
        // [`crate::execute::ExecError::MetaCycle`] before this mutex
        // acquire runs. Other pack-type plugins are leaf by design
        // (declarative, scripted) and cannot re-enter.
        let mtx = mutex_for(&self.canonical);
        let mutex_guard = Arc::clone(&mtx).lock_owned().await;

        // Box `self` so its address is stable for the transmuted
        // `'static` guard lifetime. Take the fd-lock guard from the
        // boxed lock's `inner`.
        let boxed = Box::new(self);
        // Capture the sidecar path before the move into the closure so
        // the JoinError arm below can report it.
        let join_err_path = boxed.path.clone();
        // feat-m6 H1 — `fd_lock::RwLock::write` is a synchronous
        // blocking call that waits on the kernel flock. Running it
        // directly on a tokio worker would park that worker until
        // the kernel released the lock, starving the runtime when
        // the only remaining contention is cross-process. Hop onto
        // the blocking-thread pool so async workers stay free. The
        // acquire happens inside `spawn_blocking` and the guard is
        // transmuted to `'static` before leaving the closure so the
        // box + guard can be returned as a pair.
        let join = tokio::task::spawn_blocking(
            move || -> Result<(Box<PackLock>, RwLockWriteGuard<'static, File>), PackLockError> {
                let mut boxed = boxed;
                // SAFETY: see outer comment block — `boxed` is moved
                // into the returned pair and never freed while the
                // guard is live; field order in `PackLockHold` makes
                // the guard drop first. Transmuting here (inside the
                // closure) lets us return both the box and the guard.
                let guard_ref = boxed
                    .inner
                    .write()
                    .map_err(|source| PackLockError::Io { path: boxed.path.clone(), source })?;
                let guard_static: RwLockWriteGuard<'static, File> = unsafe {
                    std::mem::transmute::<
                        RwLockWriteGuard<'_, File>,
                        RwLockWriteGuard<'static, File>,
                    >(guard_ref)
                };
                Ok((boxed, guard_static))
            },
        )
        .await;
        let (boxed, guard_static) = match join {
            Ok(res) => res?,
            Err(join_err) => {
                return Err(PackLockError::Io {
                    path: join_err_path,
                    source: io::Error::other(join_err.to_string()),
                });
            }
        };

        Ok(PackLockHold {
            _fd_guard: Some(guard_static),
            _mutex_guard: Some(mutex_guard),
            _lock: boxed,
        })
    }

    /// Cancellable async acquire — same semantics as
    /// [`PackLock::acquire_async`] but races the acquire against a
    /// [`tokio_util::sync::CancellationToken`]. Used by the embedded
    /// MCP server (feat-m7-1) so a `notifications/cancelled` from the
    /// client unblocks tool handlers that are parked on a contended
    /// pack lock.
    ///
    /// **Consumes `self`** to mirror [`PackLock::acquire_async`] — the
    /// same boxed-fd + transmute lifetime dance is needed to hand the
    /// guard back across a `spawn_blocking` boundary, and reusing the
    /// consumes-self contract preserves drop ordering against
    /// [`PackLockHold`].
    ///
    /// ## OS-thread leak window — contract
    ///
    /// `fd_lock::write()` is a synchronous syscall that parks the
    /// calling OS thread until the kernel releases the flock. Once the
    /// blocking call has been launched on the
    /// [`tokio::task::spawn_blocking`] pool, **the runtime cannot
    /// interrupt it** — there is no portable way to unpark a thread
    /// blocked in `flock(2)`. When the cancellation token fires we
    /// resolve the outer `select!` with [`PackLockErrorOrCancelled::Cancelled`]
    /// immediately, but the spawned OS thread stays parked until the
    /// holder eventually releases. When that happens, the spawned
    /// thread acquires the guard, the `JoinHandle` resolves to
    /// `Ok((boxed, guard))`, and the tuple is dropped on the spot
    /// (because the `select!` arm has already won) — at which point
    /// the guard's `Drop` releases the kernel flock and a subsequent
    /// acquirer can proceed.
    ///
    /// In other words: **cancellation is observable to the caller
    /// instantly, but the parked OS thread outlives the cancel point
    /// and, when the syscall finally returns, briefly holds the lock
    /// before the tuple drops**. Callers that immediately re-attempt
    /// acquire on the same path may see transient contention until
    /// that thread drains. See `.omne/cfg/mcp.md` §Cancellation.
    ///
    /// # Errors
    ///
    /// * [`PackLockErrorOrCancelled::Cancelled`] — the token fired
    ///   before a guard was returned.
    /// * [`PackLockErrorOrCancelled::Lock`] wrapping
    ///   [`PackLockError::Io`] on any OS-level lock failure.
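    ///
    /// A hedged usage sketch (`token` and the error mapping are
    /// illustrative; the real caller is the MCP tool handler):
    ///
    /// ```ignore
    /// let lock = PackLock::open(pack_root)?;
    /// match lock.acquire_cancellable(&token).await {
    ///     Ok(hold) => { /* exclusive pack work; `hold` releases on drop */ }
    ///     Err(PackLockErrorOrCancelled::Cancelled) => { /* map to PluginError::Cancelled */ }
    ///     Err(PackLockErrorOrCancelled::Lock(e)) => return Err(e.into()),
    /// }
    /// ```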
    pub async fn acquire_cancellable(
        self,
        cancel: &::tokio_util::sync::CancellationToken,
    ) -> Result<PackLockHold, PackLockErrorOrCancelled> {
        // Mirror `acquire_async`: serialise on the canonical-path
        // async mutex first. Race the mutex acquire itself against
        // cancel — same-task re-entry would normally hang here, but
        // the cancel arm gives the caller an out.
        let mtx = mutex_for(&self.canonical);
        let mutex_guard = tokio::select! {
            biased;
            () = cancel.cancelled() => return Err(PackLockErrorOrCancelled::Cancelled),
            g = Arc::clone(&mtx).lock_owned() => g,
        };

        // Box `self` so the address is stable for the transmuted
        // `'static` guard lifetime — same dance as `acquire_async`.
        let boxed = Box::new(self);
        // Capture the sidecar path before the move into the closure
        // so the JoinError arm below can report it (the closure
        // consumes `boxed`, so we cannot read it from there).
        let join_err_path = boxed.path.clone();
        // feat-m7-1 — replicates the `acquire_async` blocking-pool
        // hop. The closure body is intentionally identical (do NOT
        // refactor — see the SAFETY note in `acquire_async`).
        let join = tokio::task::spawn_blocking(
            move || -> Result<(Box<PackLock>, RwLockWriteGuard<'static, File>), PackLockError> {
                let mut boxed = boxed;
                // SAFETY: see `acquire_async` — `boxed` is moved into
                // the returned pair and never freed while the guard
                // is live; field order in `PackLockHold` makes the
                // guard drop first.
                let guard_ref = boxed
                    .inner
                    .write()
                    .map_err(|source| PackLockError::Io { path: boxed.path.clone(), source })?;
                let guard_static: RwLockWriteGuard<'static, File> = unsafe {
                    std::mem::transmute::<RwLockWriteGuard<'_, File>, RwLockWriteGuard<'static, File>>(
                        guard_ref,
                    )
                };
                Ok((boxed, guard_static))
            },
        );

        // Race the blocking acquire against the cancellation token.
        // If cancel wins, the JoinHandle is dropped on the spot — the
        // spawned OS thread stays parked in `fd_lock::write()` until
        // the kernel releases, at which point the returned tuple is
        // dropped (see contract note above) and the flock is freed.
        let join = tokio::select! {
            biased;
            () = cancel.cancelled() => return Err(PackLockErrorOrCancelled::Cancelled),
            res = join => res,
        };

        let (boxed, guard_static) = match join {
            Ok(res) => res.map_err(PackLockErrorOrCancelled::Lock)?,
            Err(join_err) => {
                return Err(PackLockErrorOrCancelled::Lock(PackLockError::Io {
                    path: join_err_path,
                    source: io::Error::other(join_err.to_string()),
                }));
            }
        };

        Ok(PackLockHold {
            _fd_guard: Some(guard_static),
            _mutex_guard: Some(mutex_guard),
            _lock: boxed,
        })
    }

    /// Thread-blocking acquire (no tokio integration). Waits on the
    /// fd-lock synchronously. Suitable for synchronous call sites only
    /// — async plugin methods MUST use [`PackLock::acquire_async`] to
    /// avoid blocking a tokio worker.
    ///
    /// Returns a borrowed [`RwLockWriteGuard`]; the caller keeps both
    /// the outer [`PackLock`] and the guard in scope. Mirrors the
    /// [`crate::fs::ScopedLock`] shape.
    ///
    /// # Errors
    ///
    /// Returns [`PackLockError::Io`] if the OS lock call fails.
    pub fn acquire(&mut self) -> Result<RwLockWriteGuard<'_, File>, PackLockError> {
        self.inner.write().map_err(|source| PackLockError::Io { path: self.path.clone(), source })
    }

    /// Non-blocking probe: return [`PackLockError::Busy`] instead of
    /// waiting when another holder has the lock. Does not engage the
    /// async mutex — purely a fail-fast diagnostics hook.
    ///
    /// # Errors
    ///
    /// * [`PackLockError::Busy`] when a concurrent holder owns the lock.
    /// * [`PackLockError::Io`] on any other OS-level lock failure.
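    ///
    /// A minimal probe sketch (illustrative):
    ///
    /// ```ignore
    /// match lock.try_acquire() {
    ///     Ok(_guard) => { /* held until `_guard` drops */ }
    ///     Err(PackLockError::Busy { path }) => {
    ///         tracing::warn!("pack busy: {}", path.display());
    ///     }
    ///     Err(e) => return Err(e),
    /// }
    /// ```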
    pub fn try_acquire(&mut self) -> Result<RwLockWriteGuard<'_, File>, PackLockError> {
        match self.inner.try_write() {
            Ok(g) => Ok(g),
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                Err(PackLockError::Busy { path: self.path.clone() })
            }
            Err(source) => Err(PackLockError::Io { path: self.path.clone(), source }),
        }
    }

    /// Sidecar path — `<pack_path>/.grex-lock`.
    #[must_use]
    pub fn path(&self) -> &Path {
        &self.path
    }
}

impl std::fmt::Debug for PackLock {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PackLock").field("path", &self.path).finish()
    }
}

/// RAII guard returned by [`PackLock::acquire_async`] and
/// [`PackLock::acquire_cancellable`]. Holds the sidecar-file `fd-lock`
/// guard plus the process-wide async mutex guard. Drops in reverse
/// acquire order.
#[repr(C)]
pub struct PackLockHold {
    // Field order is load-bearing: `_fd_guard` drops first (releasing
    // the kernel flock), then `_mutex_guard` (releasing the async
    // serialisation slot), then `_lock` (closing the file handle).
    // `#[repr(C)]` pins source order to layout order so the
    // `offset_of!` assertions below stay meaningful.
    _fd_guard: Option<RwLockWriteGuard<'static, File>>,
    _mutex_guard: Option<tokio::sync::OwnedMutexGuard<()>>,
    _lock: Box<PackLock>,
}

impl PackLockHold {
    /// Sidecar path for diagnostics.
    #[must_use]
    pub fn path(&self) -> &Path {
        self._lock.path()
    }
}

impl std::fmt::Debug for PackLockHold {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PackLockHold").field("path", &self._lock.path()).finish()
    }
}

// Field-order static assertion (feat-m6 B3) — the safety argument for the
// unsafe lifetime extension in `acquire_async` depends on `_fd_guard`
// dropping before `_lock`. Rust drops struct fields in declaration order,
// so `_fd_guard` must sit at the lowest offset, then `_mutex_guard`, then
// `_lock`. A refactor that reorders these fields would silently break the
// drop ordering and the transmuted `'static` borrow would outlive its box.
const _: () = {
    assert!(
        std::mem::offset_of!(PackLockHold, _fd_guard)
            < std::mem::offset_of!(PackLockHold, _mutex_guard),
        "PackLockHold field order: _fd_guard must precede _mutex_guard"
    );
    assert!(
        std::mem::offset_of!(PackLockHold, _mutex_guard)
            < std::mem::offset_of!(PackLockHold, _lock),
        "PackLockHold field order: _mutex_guard must precede _lock"
    );
};

impl Drop for PackLockHold {
    fn drop(&mut self) {
        // Explicit take() on the fd-lock guard first — the transmuted
        // `'static` lifetime must expire before `_lock` drops.
        self._fd_guard.take();
        self._mutex_guard.take();
        // `_lock` drops last when the struct itself drops, closing the
        // underlying file handle.
    }
}

// ---------------------------------------------------------------------------
// Lock-ordering enforcement (debug builds).
// ---------------------------------------------------------------------------

/// Lock tier ordinals matching `.omne/cfg/concurrency.md`. Acquisitions
/// must strictly increase; reversed order risks the deadlock class the
/// feat-m6-3 Lean proof rules out.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum Tier {
    /// Workspace sync lock — `<workspace>/.grex.sync.lock`.
    WorkspaceSync = 0,
    /// Scheduler semaphore permit — feat-m6-1.
    Semaphore = 1,
    /// Per-pack `.grex-lock` — feat-m6-2 (this module).
    PerPack = 2,
    /// Per-repo backend lock — `<dest>.grex-backend.lock`.
    Backend = 3,
    /// Manifest RW lock — `.grex/events.jsonl` sidecar.
    Manifest = 4,
}

/// Run `f` while `tier` is pushed on the current task's tier stack.
/// Debug builds enforce strictly-increasing order across nested calls;
/// release builds skip the check entirely.
#[cfg(debug_assertions)]
pub fn with_tier<R>(tier: Tier, f: impl FnOnce() -> R) -> R {
    tier::push(tier);
    let out = f();
    tier::pop_if_top(tier);
    out
}

/// Release-build no-op mirror of [`with_tier`].
#[cfg(not(debug_assertions))]
#[inline]
pub fn with_tier<R>(_tier: Tier, f: impl FnOnce() -> R) -> R {
    f()
}

/// Wrap an async future in a task-local tier-stack scope so any
/// [`TierGuard`] pushes inside it land in the correct frame even when
/// the task migrates across tokio workers after `.await`. Release
/// builds compile this down to the raw future — no scope, no cost.
///
/// Callers should wrap every top-level async dispatch (e.g. the
/// per-pack plugin lifecycle calls driven by `rt.block_on(...)`) so
/// the tier check can operate on a fresh stack per dispatch.
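///
/// A sketch of the intended wrapping (`dispatch_sync` and `ctx` are
/// illustrative names, not real APIs in this crate):
///
/// ```ignore
/// with_tier_scope(async {
///     let _tier = TierGuard::push(Tier::PerPack);
///     dispatch_sync(ctx).await
/// })
/// .await
/// ```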
#[cfg(debug_assertions)]
pub async fn with_tier_scope<F: std::future::Future>(f: F) -> F::Output {
    tier::TIER_STACK.scope(std::cell::RefCell::new(Vec::new()), f).await
}

/// Release-build no-op mirror of [`with_tier_scope`].
#[cfg(not(debug_assertions))]
#[inline]
pub async fn with_tier_scope<F: std::future::Future>(f: F) -> F::Output {
    f.await
}

/// RAII guard — pushes a tier onto the current task's stack on
/// construction, pops on drop. Lets lifecycle prologues enforce
/// tier ordering across `.await` points without nesting the rest of
/// the body inside a `with_tier` closure. Debug builds carry the
/// ordering check; release builds compile to a zero-sized no-op.
///
/// Declaration-order note: callers must declare the guard
/// **before** the permit/hold it is scoping. Rust drops locals in
/// reverse declaration order, so `_tier` declared first drops last —
/// after the lock/permit releases — matching `with_tier` semantics.
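///
/// A declaration-order sketch (illustrative):
///
/// ```ignore
/// let _tier = TierGuard::push(Tier::PerPack); // declared first, drops last
/// let hold = lock.acquire_async().await?;     // drops before `_tier` pops
/// // ... work under the pack lock ...
/// ```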
#[must_use]
pub struct TierGuard {
    #[cfg(debug_assertions)]
    tier: Tier,
    // Sized placeholder for release so the type is still `Sized` and
    // `must_use`-meaningful. Zero-sized once inlined.
    #[cfg(not(debug_assertions))]
    _private: (),
}

impl TierGuard {
    /// Push `tier` onto the current task's tier stack. Debug builds
    /// assert strictly-increasing order against the existing top.
    #[cfg(debug_assertions)]
    pub fn push(tier: Tier) -> Self {
        tier::push(tier);
        TierGuard { tier }
    }

    /// Release-build no-op constructor.
    #[cfg(not(debug_assertions))]
    #[inline]
    pub fn push(_tier: Tier) -> Self {
        TierGuard { _private: () }
    }
}

#[cfg(debug_assertions)]
impl Drop for TierGuard {
    fn drop(&mut self) {
        tier::pop_if_top(self.tier);
    }
}

#[cfg(debug_assertions)]
pub(crate) mod tier {
    use super::Tier;
    use std::cell::RefCell;

    // feat-m6 CI fix — previously this used `thread_local!`, but under a
    // tokio multi-thread runtime a task can resume on a different worker
    // after `.await`. A push on worker A followed by a yield and a pop on
    // worker B left A's stack polluted and tripped the tier-ordering
    // assertion on the next acquire. Migrating to `tokio::task_local!`
    // pins the stack to the *task*, not the worker, so nested
    // `TierGuard` bookkeeping follows the task across workers.
    //
    // `try_with` silently no-ops outside a `TIER_STACK.scope(...)`
    // frame — that makes the module safe to use from synchronous
    // (non-tokio) test harnesses and the module's own unit tests at
    // the cost of debug-only tier enforcement being disabled there.
    // Production dispatch wraps every pack-type plugin call in a scope
    // (see `sync::dispatch_*`), so real runs retain enforcement.
    tokio::task_local! {
        pub(crate) static TIER_STACK: RefCell<Vec<Tier>>;
    }

    pub fn push(next: Tier) {
        let _ = TIER_STACK.try_with(|s| {
            let mut s = s.borrow_mut();
            if let Some(&top) = s.last() {
                assert!(
                    next > top,
                    "lock tier violation: trying to acquire {next:?} while already holding {top:?} \
                     (tiers must be strictly increasing — see .omne/cfg/concurrency.md)"
                );
            }
            s.push(next);
        });
    }

    pub fn pop_if_top(expected: Tier) {
        let _ = TIER_STACK.try_with(|s| {
            let mut s = s.borrow_mut();
            if s.last().copied() == Some(expected) {
                s.pop();
            } else {
                tracing::error!(
                    target: "grex::concurrency",
                    "tier pop mismatch: expected {:?} at top, stack = {:?}",
                    expected,
                    *s
                );
            }
        });
    }
}

// ---------------------------------------------------------------------------
// Tests.
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Arc, Barrier};
    use std::thread;
    use std::time::{Duration, Instant};
    use tempfile::tempdir;
    use tokio_util::sync::CancellationToken;

    #[test]
    fn pack_lock_acquires_creates_file() {
        let dir = tempdir().unwrap();
        let mut plock = PackLock::open(dir.path()).unwrap();
        let expected = plock.path().to_path_buf();
        let _guard = plock.acquire().unwrap();
        assert!(expected.exists(), "open must create the sidecar file");
        assert_eq!(expected, dir.path().join(PACK_LOCK_FILE_NAME));
    }

    #[test]
    fn pack_lock_second_try_acquire_reports_busy_while_held() {
        let dir = tempdir().unwrap();
        let mut first = PackLock::open(dir.path()).unwrap();
        let _held = first.acquire().unwrap();
        let mut second = PackLock::open(dir.path()).unwrap();
        let err = second.try_acquire().unwrap_err();
        match err {
            PackLockError::Busy { path } => {
                assert_eq!(path, dir.path().join(PACK_LOCK_FILE_NAME));
            }
            other => panic!("expected Busy, got {other:?}"),
        }
    }

    #[test]
    fn pack_lock_release_on_drop() {
        let dir = tempdir().unwrap();
        {
            let mut first = PackLock::open(dir.path()).unwrap();
            let _g = first.acquire().unwrap();
        }
        let mut second = PackLock::open(dir.path()).unwrap();
        let _g = second.acquire().unwrap();
    }

    #[test]
    fn pack_lock_path_contains_pack_path() {
        let dir = tempdir().unwrap();
        let plock = PackLock::open(dir.path()).unwrap();
        let p = plock.path();
        assert!(p.starts_with(dir.path()));
        assert_eq!(p.file_name().and_then(|s| s.to_str()), Some(PACK_LOCK_FILE_NAME));
    }

    #[test]
    fn pack_lock_blocking_acquire_waits_for_holder() {
        let dir = tempdir().unwrap();
        let path = dir.path().to_path_buf();
        let barrier = Arc::new(Barrier::new(2));
        let holder_barrier = Arc::clone(&barrier);
        let holder_path = path.clone();

        let holder = thread::spawn(move || {
            let mut lock = PackLock::open(&holder_path).unwrap();
            let _g = lock.acquire().unwrap();
            holder_barrier.wait();
            thread::sleep(Duration::from_millis(100));
        });

        barrier.wait();
        let start = Instant::now();
        let mut second = PackLock::open(&path).unwrap();
        let _g = second.acquire().unwrap();
        let waited = start.elapsed();
        holder.join().unwrap();
        assert!(
            waited >= Duration::from_millis(40),
            "blocking acquire must have waited (observed {waited:?})"
        );
    }

    #[test]
    fn pack_lock_distinct_paths_do_not_contend() {
        let a = tempdir().unwrap();
        let b = tempdir().unwrap();
        let mut la = PackLock::open(a.path()).unwrap();
        let _ga = la.acquire().unwrap();
        let mut lb = PackLock::open(b.path()).unwrap();
        let _gb = lb.try_acquire().unwrap();
    }

    #[tokio::test]
    async fn async_acquire_serialises_in_process() {
        // Two concurrent acquire_async calls on the same pack path
        // must serialise cleanly (no hang).
        let dir = tempdir().unwrap();
        let path = dir.path().to_path_buf();
        let path_clone = path.clone();
        let h1 = tokio::spawn(async move {
            let lock = PackLock::open(&path).unwrap();
            let _hold = lock.acquire_async().await.unwrap();
            tokio::time::sleep(Duration::from_millis(30)).await;
        });
        let h2 = tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(5)).await;
            let lock = PackLock::open(&path_clone).unwrap();
            let _hold = lock.acquire_async().await.unwrap();
        });
        h1.await.unwrap();
        h2.await.unwrap();
    }
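
    /// Registry-reuse sketch: the canonical-path registry hands back
    /// the same `Arc` while a holder is live, and a fresh one once
    /// every strong ref has dropped and the `Weak` entry died.
    #[test]
    fn mutex_registry_reuses_live_entry_and_replaces_dead_one() {
        let dir = tempdir().unwrap();
        let canonical = canonical_or_raw(dir.path());
        let m1 = mutex_for(&canonical);
        let m2 = mutex_for(&canonical);
        assert!(Arc::ptr_eq(&m1, &m2), "live registry entry must be reused");
        drop(m2);
        drop(m1);
        // Both strong refs are gone: the registry's Weak is dead, so the
        // next call mints a fresh mutex (pruning the stale entry lazily).
        let m3 = mutex_for(&canonical);
        assert_eq!(Arc::strong_count(&m3), 1, "expected a fresh Arc");
    }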

    // --- tier ordering (debug-only) -----------------------------------------

    // feat-m6 CI fix — tier enforcement now lives in a `tokio::task_local!`
    // stack, so these tests drive the check through a scoped task to
    // establish the frame. `try_with` outside a scope silently no-ops.

    #[cfg(debug_assertions)]
    #[tokio::test]
    async fn tier_strictly_increasing_ok() {
        tier::TIER_STACK
            .scope(std::cell::RefCell::new(Vec::new()), async {
                with_tier(Tier::Semaphore, || {
                    with_tier(Tier::PerPack, || {
                        with_tier(Tier::Backend, || {
                            with_tier(Tier::Manifest, || {});
                        });
                    });
                });
            })
            .await;
    }

    #[cfg(debug_assertions)]
    #[tokio::test]
    async fn tier_reversed_panics_in_debug() {
        use std::panic::{catch_unwind, AssertUnwindSafe};
        let result = tier::TIER_STACK
            .scope(std::cell::RefCell::new(Vec::new()), async {
                catch_unwind(AssertUnwindSafe(|| {
                    with_tier(Tier::PerPack, || {
                        with_tier(Tier::Semaphore, || {});
                    });
                }))
            })
            .await;
        assert!(result.is_err(), "reversed tier order must panic in debug builds");
    }

    // --- acquire_cancellable (feat-m7-1 Stage 4) -----------------------------

    /// 4.T1 — uncontended path returns Ok(PackLockHold).
    #[tokio::test]
    async fn acquire_cancellable_happy_path() {
        let dir = tempdir().unwrap();
        let lock = PackLock::open(dir.path()).unwrap();
        let token = CancellationToken::new();
        let result = lock.acquire_cancellable(&token).await;
        assert!(result.is_ok(), "expected Ok(PackLockHold) on uncontended pack");
    }

    /// 4.T2 — a holder owns the fd-lock via the synchronous path (so
    /// the in-process async mutex stays free and B reaches the blocking
    /// `fd_lock::write()` hop); B's token fires after 10 ms; B must
    /// surface `Err(Cancelled)` within 50 ms. An `acquire_async` holder
    /// would also hold the async mutex and B would never leave
    /// `lock_owned()` (that arm is covered by 4.T4).
    #[tokio::test]
    async fn acquire_cancellable_cancel_during_blocking_fd_lock_returns_cancelled() {
        let dir = tempdir().unwrap();
        let path = dir.path().to_path_buf();
        let path_b = path.clone();

        // Holder thread: take only the fd-lock (sync `acquire`), not
        // the async mutex.
        let (held_tx, held_rx) = std::sync::mpsc::channel::<()>();
        let (release_tx, release_rx) = std::sync::mpsc::channel::<()>();
        let holder = thread::spawn(move || {
            let mut lock = PackLock::open(&path).unwrap();
            let _g = lock.acquire().unwrap();
            held_tx.send(()).unwrap();
            let _ = release_rx.recv();
        });
        held_rx.recv().unwrap();

        let token = CancellationToken::new();
        let cancel_handle = token.clone();
        let canceller = tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            cancel_handle.cancel();
        });

        let started = Instant::now();
        let lock_b = PackLock::open(&path_b).unwrap();
        let result =
            tokio::time::timeout(Duration::from_millis(50), lock_b.acquire_cancellable(&token))
                .await
                .expect("acquire_cancellable must return within 50 ms after cancel");

        let waited = started.elapsed();
        assert!(
            matches!(result, Err(PackLockErrorOrCancelled::Cancelled)),
            "expected Err(Cancelled), got {result:?} after {waited:?}"
        );

        canceller.await.unwrap();
        release_tx.send(()).unwrap();
        holder.join().unwrap();
    }

    // --- helpers for 4.T3 / 4.T4 -------------------------------------------

    /// Spawn a "holder" task that acquires `path` and releases on
    /// signal. Returns `(JoinHandle, started_notify, release_notify)`.
    /// Caller awaits `started` to know the lock is held, then fires
    /// `release` when ready to let the holder drop its guard.
    fn spawn_holder(
        path: PathBuf,
    ) -> (tokio::task::JoinHandle<()>, Arc<tokio::sync::Notify>, Arc<tokio::sync::Notify>) {
        let started = Arc::new(tokio::sync::Notify::new());
        let release = Arc::new(tokio::sync::Notify::new());
        let started_c = Arc::clone(&started);
        let release_c = Arc::clone(&release);
        let h = tokio::spawn(async move {
            let lock = PackLock::open(&path).unwrap();
            let _hold = lock.acquire_async().await.unwrap();
            started_c.notify_one();
            release_c.notified().await;
        });
        (h, started, release)
    }

    /// Poll `acquire_async` against `path` until it succeeds or the
    /// outer deadline expires. Returns `Ok(())` on success, `Err(())`
    /// on timeout.
    async fn poll_acquire_until_free(path: PathBuf, deadline: Duration) -> Result<(), ()> {
        tokio::time::timeout(deadline, async move {
            loop {
                let lock = PackLock::open(&path).unwrap();
                if let Ok(Ok(_hold)) =
                    tokio::time::timeout(Duration::from_millis(100), lock.acquire_async()).await
                {
                    return;
                }
                tokio::time::sleep(Duration::from_millis(20)).await;
            }
        })
        .await
        .map_err(|_| ())
    }

    /// 4.T3 — regression for the documented OS-thread leak window. The
    /// holder owns only the fd-lock (sync path), so B's `spawn_blocking`
    /// acquire actually launches and parks in `fd_lock::write()`. After
    /// B is cancelled the holder releases; the blocking thread B kicked
    /// off must eventually unblock and drop its guard. We observe this
    /// by polling from a third task C — if B's blocking thread leaked
    /// its guard, C would wait forever.
    #[tokio::test]
    async fn acquire_cancellable_spawn_blocking_thread_releases_guard_when_it_finally_unblocks() {
        let dir = tempdir().unwrap();
        let path = dir.path().to_path_buf();

        // Holder thread: fd-lock only — an `acquire_async` holder
        // (`spawn_holder`) would also hold the async mutex and B's
        // blocking thread would never launch.
        let (held_tx, held_rx) = std::sync::mpsc::channel::<()>();
        let (release_tx, release_rx) = std::sync::mpsc::channel::<()>();
        let holder_path = path.clone();
        let holder = thread::spawn(move || {
            let mut lock = PackLock::open(&holder_path).unwrap();
            let _g = lock.acquire().unwrap();
            held_tx.send(()).unwrap();
            let _ = release_rx.recv();
        });
        held_rx.recv().unwrap();

        // Task B: race against cancel while the holder owns the fd-lock.
        let token = CancellationToken::new();
        let cancel_handle = token.clone();
        let path_b = path.clone();
        let b = tokio::spawn(async move {
            let lock = PackLock::open(&path_b).unwrap();
            lock.acquire_cancellable(&token).await
        });
        tokio::time::sleep(Duration::from_millis(20)).await;
        cancel_handle.cancel();
        let b_result = tokio::time::timeout(Duration::from_millis(100), b)
            .await
            .expect("B must resolve quickly after cancel")
            .expect("B task panicked");
        assert!(
            matches!(b_result, Err(PackLockErrorOrCancelled::Cancelled)),
            "expected B to see Cancelled, got {b_result:?}"
        );

        // Release the holder — B's parked OS thread should drain.
        release_tx.send(()).unwrap();
        holder.join().unwrap();

        assert!(
            poll_acquire_until_free(path, Duration::from_millis(2_000)).await.is_ok(),
            "task C never acquired — spawn_blocking thread leaked its fd-lock guard"
        );
    }

    /// 4.T4 — covers the outer-mutex cancel arm of the `select!` in
    /// `acquire_cancellable`. Task A holds the in-process async mutex
    /// (via `acquire_async`, which takes both the async mutex and the
    /// fd-lock); task B calls `acquire_cancellable` and is parked on
    /// `lock_owned()` (NOT yet at the fd-lock blocking call).
    /// Cancelling B's token must short-circuit the mutex wait and
    /// return `Err(Cancelled)`.
    #[tokio::test]
    async fn acquire_cancellable_cancel_during_async_mutex_wait_returns_cancelled() {
        let dir = tempdir().unwrap();
        let path = dir.path().to_path_buf();
        let (a, a_started, release_a) = spawn_holder(path.clone());
        a_started.notified().await;

        let token = CancellationToken::new();
        let cancel_handle = token.clone();
        let canceller = tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            cancel_handle.cancel();
        });

        let lock_b = PackLock::open(&path).unwrap();
        let result =
            tokio::time::timeout(Duration::from_millis(50), lock_b.acquire_cancellable(&token))
                .await
                .expect("acquire_cancellable must return within 50 ms after cancel");

        assert!(
            matches!(result, Err(PackLockErrorOrCancelled::Cancelled)),
            "expected Err(Cancelled) from outer-mutex cancel arm, got {result:?}"
        );

        canceller.await.unwrap();
        release_a.notify_one();
        a.await.unwrap();
    }
}