//! Per-pack `.grex-lock` file lock — feat-m6-2.
//!
//! Acquires an exclusive `fd-lock` guard on `<pack_path>/.grex-lock` for the
//! full duration of a pack-type plugin lifecycle method. Prevents two
//! concurrent tasks (in-process) or processes (cross-process) from operating
//! on the same pack at the same time.
//!
//! ## Lock ordering (tier 3 of 5)
//!
//! The spec fixes the global acquire order as:
//!
//! 1. workspace-sync lock (`<workspace>/.grex.sync.lock`)
//! 2. scheduler semaphore permit (feat-m6-1)
//! 3. **per-pack `.grex-lock`** — this module
//! 4. per-repo backend lock (`<dest>.grex-backend.lock`)
//! 5. manifest RW lock (`.grex/events.jsonl` sidecar)
//!
//! Plugins acquire tier 2 (permit) and tier 3 (pack lock) in that order
//! inside every `PackTypePlugin` method. In debug builds, [`with_tier`]
//! enforces strictly-increasing tiers on a task-local stack; a reversal
//! panics in debug, and release builds skip the check entirely.
//!
//! ## In-process vs cross-process serialisation
//!
//! `fd-lock`'s `write()` is synchronous and blocks the calling OS thread
//! until the kernel flock is free. Calling it directly inside an async
//! plugin method would block a tokio worker thread; with a multi-thread
//! runtime this scales poorly, and recursive re-entry on the same pack
//! (meta-plugin walking into a symlinked child that points back at the
//! parent) hangs the task outright because the second `write()` waits on
//! the first, which is still on-stack.
//!
//! To avoid blocking workers we layer a process-wide async mutex keyed by
//! canonical pack path **in front of** the fd-lock acquire:
//!
//! * [`PackLock::acquire_async`] first takes the canonical-path mutex
//!   (`tokio::sync::Mutex`), which serialises in-process tasks without
//!   blocking workers. The mutex is non-reentrant, so same-task re-entry
//!   on the same pack root must be (and is) halted upstream by
//!   meta-plugin cycle detection before the acquire runs.
//! * Inside the async mutex it calls the blocking fd-lock `write()` on
//!   the blocking-thread pool — fast because the only remaining
//!   contention is cross-process, which is rare.
//! * On `Drop` it releases the fd-lock guard first, then the async
//!   mutex guard — reverse acquire order.
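//!
//! ## Usage sketch
//!
//! A minimal sketch of the intended call pattern (the pack path and the
//! surrounding plumbing are illustrative; `do_pack_work` is hypothetical):
//!
//! ```ignore
//! use grex_core::pack_lock::PackLock;
//!
//! async fn with_pack_locked(pack_root: &std::path::Path) -> Result<(), Box<dyn std::error::Error>> {
//!     // `open` creates `<pack_root>/.grex-lock` but does not lock it.
//!     let lock = PackLock::open(pack_root)?;
//!     // Serialises against other tasks and processes on this pack.
//!     let hold = lock.acquire_async().await?;
//!     do_pack_work(pack_root).await?; // hypothetical pack mutation
//!     drop(hold); // fd-lock guard released first, then the async mutex slot
//!     Ok(())
//! }
//! ```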

#![allow(unsafe_code)]

use std::collections::HashMap;
use std::fs::{File, OpenOptions};
use std::io;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex, OnceLock, Weak};

use fd_lock::{RwLock, RwLockWriteGuard};

/// Stable name of the per-pack lock file created inside every pack root.
/// Exported so the managed-gitignore writer can hide it from `git status`.
pub const PACK_LOCK_FILE_NAME: &str = ".grex-lock";

/// Error surfaced by [`PackLock::open`], [`PackLock::acquire_async`],
/// [`PackLock::acquire`], and [`PackLock::try_acquire`].
///
/// Note: [`PackLock::acquire`] is deprecated since v1.2.4; prefer
/// [`PackLock::acquire_async`] or [`PackLock::try_acquire`].
#[non_exhaustive]
#[derive(Debug, thiserror::Error)]
pub enum PackLockError {
    /// I/O error opening or locking the sidecar file.
    #[error("pack lock i/o on `{}`: {source}", path.display())]
    Io {
        /// Resolved `<pack_path>/.grex-lock` path.
        path: PathBuf,
        /// Underlying OS error.
        #[source]
        source: io::Error,
    },
    /// Non-blocking probe returned busy. Emitted only by
    /// [`PackLock::try_acquire`]; the blocking paths ([`PackLock::acquire`]
    /// and [`PackLock::acquire_async`]) wait for the holder instead.
    /// In-process contention inside [`PackLock::acquire_async`] queues on
    /// the canonical-path async mutex and never surfaces here.
    #[error("pack lock `{}` is busy", path.display())]
    Busy {
        /// Lock path that was contended.
        path: PathBuf,
    },
}

/// Outcome of [`PackLock::acquire_cancellable`] — either the
/// underlying lock acquire failed, or the supplied cancellation token
/// fired before the guard was returned. Distinct from
/// [`crate::scheduler::Cancelled`]: that ZST signals semaphore-permit
/// cancellation; this variant signals pack-lock cancellation. Verb
/// bodies translate either into the same `PluginError::Cancelled` at
/// the call site (feat-m7-1 Stages 6-7).
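///
/// A sketch of the intended call-site translation (`run_verb_body` and
/// the surrounding `PluginError` plumbing are illustrative):
///
/// ```ignore
/// match lock.acquire_cancellable(&token).await {
///     Ok(hold) => run_verb_body(hold).await,
///     // Both cancellation sources collapse to one caller-visible error.
///     Err(PackLockErrorOrCancelled::Cancelled) => Err(PluginError::Cancelled),
///     Err(PackLockErrorOrCancelled::Lock(e)) => Err(e.into()),
/// }
/// ```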
#[non_exhaustive]
#[derive(Debug, thiserror::Error)]
pub enum PackLockErrorOrCancelled {
    /// The cancellation token fired before the lock was acquired.
    /// The spawned blocking thread (if launched) may still be parked
    /// in `fd_lock::write()` — see [`PackLock::acquire_cancellable`]
    /// for the OS-thread leak-window contract.
    #[error("pack lock acquire cancelled")]
    Cancelled,
    /// The underlying lock acquire failed before cancellation fired.
    #[error(transparent)]
    Lock(#[from] PackLockError),
}

fn path_mutex_registry() -> &'static Mutex<HashMap<PathBuf, Weak<tokio::sync::Mutex<()>>>> {
    static REG: OnceLock<Mutex<HashMap<PathBuf, Weak<tokio::sync::Mutex<()>>>>> = OnceLock::new();
    REG.get_or_init(|| Mutex::new(HashMap::new()))
}

/// feat-m6 H3 — prune entries whose only remaining reference was the
/// registry itself. Called opportunistically on every `mutex_for` so
/// long-running processes that open many distinct pack paths do not
/// accumulate unbounded registry entries. Runs under the registry
/// mutex so there is no race against a concurrent `mutex_for`.
fn prune_dead(reg: &mut HashMap<PathBuf, Weak<tokio::sync::Mutex<()>>>) {
    reg.retain(|_, weak| weak.strong_count() > 0);
}

fn mutex_for(canonical: &Path) -> Arc<tokio::sync::Mutex<()>> {
    let mut reg = path_mutex_registry()
        .lock()
        .expect("pack lock path registry poisoned — this indicates a prior panic");
    // Try to reuse an existing live entry. If the Weak is dead
    // (no outstanding holders) fall through to insert a fresh Arc.
    if let Some(weak) = reg.get(canonical) {
        if let Some(existing) = weak.upgrade() {
            return existing;
        }
    }
    prune_dead(&mut reg);
    let m = Arc::new(tokio::sync::Mutex::new(()));
    reg.insert(canonical.to_path_buf(), Arc::downgrade(&m));
    m
}

fn canonical_or_raw(path: &Path) -> PathBuf {
    std::fs::canonicalize(path).unwrap_or_else(|_| path.to_path_buf())
}

/// Per-pack file lock wrapper.
///
/// Construction via [`PackLock::open`] creates (or re-opens) the sidecar
/// `<pack_path>/.grex-lock` but does **not** acquire the lock — call
/// [`PackLock::acquire_async`] for the async-safe blocking path or
/// [`PackLock::try_acquire`] for a fail-fast probe.
///
/// Note: [`PackLock::acquire`] is deprecated since v1.2.4; prefer
/// [`PackLock::acquire_async`] or [`PackLock::try_acquire`]. The sync
/// blocking variant will be removed in v1.3.0.
pub struct PackLock {
    inner: RwLock<File>,
    path: PathBuf,
    canonical: PathBuf,
}

impl PackLock {
    /// Open (and create if missing) the sidecar `<pack_path>/.grex-lock`.
    /// Does **not** acquire the lock.
    ///
    /// # Errors
    ///
    /// Returns [`PackLockError::Io`] if the sidecar cannot be opened or
    /// its parent directory cannot be created.
    pub fn open(pack_path: &Path) -> Result<Self, PackLockError> {
        let path = pack_path.join(PACK_LOCK_FILE_NAME);
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)
                .map_err(|source| PackLockError::Io { path: path.clone(), source })?;
        }
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(&path)
            .map_err(|source| PackLockError::Io { path: path.clone(), source })?;
        let canonical = canonical_or_raw(pack_path);
        Ok(Self { inner: RwLock::new(file), path, canonical })
    }

    /// Async acquire — pairs a process-wide [`tokio::sync::Mutex`] keyed
    /// by canonical pack path with the sidecar `fd-lock`. Safe to call
    /// from any tokio worker without blocking the runtime. In-process
    /// contention queues on the async mutex; cross-process contention
    /// parks a blocking-pool thread on the kernel flock. The mutex is
    /// non-reentrant, so same-task re-entry on the same pack root (e.g.
    /// meta-plugin recursion through a symlinked child that points back
    /// at the parent) would deadlock here; upstream cycle detection
    /// halts such recursion first (see the note in the body).
    ///
    /// The returned [`PackLockHold`] drops the fd-lock guard and the
    /// async mutex guard in reverse acquire order at end-of-scope.
    ///
    /// # Errors
    ///
    /// Returns [`PackLockError::Io`] on any OS-level lock failure, or if
    /// the blocking-pool task fails to join.
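    ///
    /// A minimal sketch (error handling elided; `pack_root` illustrative):
    ///
    /// ```ignore
    /// let lock = PackLock::open(pack_root)?;
    /// let hold = lock.acquire_async().await?;
    /// // ... lifecycle work on the pack, exclusive in- and cross-process ...
    /// drop(hold); // fd-lock guard first, then the async mutex slot
    /// ```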
    pub async fn acquire_async(self) -> Result<PackLockHold, PackLockError> {
        // Serialise in-process tasks on the canonical path via an
        // async mutex — safe across tokio workers and non-blocking on
        // the async runtime. Different tasks contending on the same
        // pack root simply queue here. Same-task re-entry (recursive
        // plugin invocation on the same pack root) is the caller's
        // responsibility to prevent via cycle detection; a same-task
        // re-entry here would hang at `lock_owned().await` because
        // tokio mutexes are non-reentrant.
        //
        // [`crate::plugin::pack_type::MetaPlugin`] threads the
        // `visited_meta` set through recursion and inserts the pack
        // root at every lifecycle entry so cycles halt with
        // [`crate::execute::ExecError::MetaCycle`] before this mutex
        // acquire runs. Other pack-type plugins are leaf by design
        // (declarative, scripted) and cannot re-enter.
        let mtx = mutex_for(&self.canonical);
        let mutex_guard = Arc::clone(&mtx).lock_owned().await;

        // Box `self` so its address is stable for the transmuted
        // `'static` guard lifetime. Take the fd-lock guard from the
        // boxed lock's `inner`. Capture the sidecar path first so the
        // JoinError arm below can still report it (the closure
        // consumes `boxed`).
        let boxed = Box::new(self);
        let join_err_path = boxed.path.clone();
        // feat-m6 H1 — `fd_lock::RwLock::write` is a synchronous
        // blocking call that waits on the kernel flock. Running it
        // directly on a tokio worker would park that worker until
        // the kernel released the lock, starving the runtime when
        // the only remaining contention is cross-process. Hop onto
        // the blocking-thread pool so async workers stay free. The
        // acquire happens inside `spawn_blocking` and the guard is
        // transmuted to `'static` before leaving the closure so the
        // box + guard can be returned as a pair.
        let join = tokio::task::spawn_blocking(
            move || -> Result<(Box<PackLock>, RwLockWriteGuard<'static, File>), PackLockError> {
                let mut boxed = boxed;
                // SAFETY: see outer comment block — `boxed` is moved
                // into the returned pair and never freed while the
                // guard is live; field order in `PackLockHold` makes
                // the guard drop first. Transmuting here (inside the
                // closure) lets us return both the box and the guard.
                let guard_ref = boxed
                    .inner
                    .write()
                    .map_err(|source| PackLockError::Io { path: boxed.path.clone(), source })?;
                let guard_static: RwLockWriteGuard<'static, File> = unsafe {
                    std::mem::transmute::<
                        RwLockWriteGuard<'_, File>,
                        RwLockWriteGuard<'static, File>,
                    >(guard_ref)
                };
                Ok((boxed, guard_static))
            },
        )
        .await;
        let (boxed, guard_static) = match join {
            Ok(res) => res?,
            Err(join_err) => {
                return Err(PackLockError::Io {
                    path: join_err_path,
                    source: io::Error::other(join_err.to_string()),
                });
            }
        };

        Ok(PackLockHold {
            _fd_guard: Some(guard_static),
            _mutex_guard: Some(mutex_guard),
            _lock: boxed,
        })
    }

    /// Cancellable async acquire — same semantics as
    /// [`PackLock::acquire_async`] but races the acquire against a
    /// [`tokio_util::sync::CancellationToken`]. Used by the embedded
    /// MCP server (feat-m7-1) so a `notifications/cancelled` from the
    /// client unblocks tool handlers that are parked on a contended
    /// pack lock.
    ///
    /// **Consumes `self`** to mirror [`PackLock::acquire_async`] — the
    /// same boxed-fd + transmute lifetime dance is needed to hand the
    /// guard back across a `spawn_blocking` boundary, and reusing the
    /// consumes-self contract preserves drop ordering against
    /// [`PackLockHold`].
    ///
    /// ## OS-thread leak window — contract
    ///
    /// `fd_lock::write()` is a synchronous syscall that parks the
    /// calling OS thread until the kernel releases the flock. Once the
    /// blocking call has been launched on the
    /// [`tokio::task::spawn_blocking`] pool, **the runtime cannot
    /// interrupt it** — there is no portable way to unpark a thread
    /// blocked in `flock(2)`. When the cancellation token fires we
    /// resolve the outer `select!` with [`PackLockErrorOrCancelled::Cancelled`]
    /// immediately, but the spawned OS thread stays parked until the
    /// holder eventually releases. When that happens, the spawned
    /// thread acquires the guard, the `JoinHandle` resolves to
    /// `Ok((boxed, guard))`, and the tuple is dropped on the spot
    /// (because the `select!` arm has already won) — at which point
    /// the guard's `Drop` releases the kernel flock and a subsequent
    /// acquirer can proceed.
    ///
    /// In other words: **cancellation is observable to the caller
    /// instantly, but the underlying OS thread holds the lock briefly
    /// past the cancel point, until the syscall returns**. Callers
    /// that immediately re-attempt acquire on the same path may see
    /// transient contention until that thread drains. See
    /// `.omne/cfg/mcp.md` §Cancellation.
    ///
    /// # Errors
    ///
    /// * [`PackLockErrorOrCancelled::Cancelled`] — the token fired
    ///   before a guard was returned.
    /// * [`PackLockErrorOrCancelled::Lock`] wrapping
    ///   [`PackLockError::Io`] on any OS-level lock failure.
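    ///
    /// A minimal sketch of the MCP-style wiring (`pack_root` and the
    /// error plumbing are illustrative):
    ///
    /// ```ignore
    /// use tokio_util::sync::CancellationToken;
    ///
    /// let token = CancellationToken::new();
    /// let lock = PackLock::open(pack_root)?;
    /// match lock.acquire_cancellable(&token).await {
    ///     Ok(hold) => { /* locked — do work, then drop `hold` */ }
    ///     Err(PackLockErrorOrCancelled::Cancelled) => { /* client cancelled */ }
    ///     Err(PackLockErrorOrCancelled::Lock(e)) => return Err(e.into()),
    /// }
    /// ```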
    pub async fn acquire_cancellable(
        self,
        cancel: &::tokio_util::sync::CancellationToken,
    ) -> Result<PackLockHold, PackLockErrorOrCancelled> {
        // Mirror `acquire_async`: serialise on the canonical-path
        // async mutex first. Race the mutex acquire itself against
        // cancel — same-task re-entry would normally hang here, but
        // the cancel arm gives the caller an out.
        let mtx = mutex_for(&self.canonical);
        let mutex_guard = tokio::select! {
            biased;
            () = cancel.cancelled() => return Err(PackLockErrorOrCancelled::Cancelled),
            g = Arc::clone(&mtx).lock_owned() => g,
        };

        // Box `self` so the address is stable for the transmuted
        // `'static` guard lifetime — same dance as `acquire_async`.
        let boxed = Box::new(self);
        // Capture the sidecar path before the move into the closure
        // so the JoinError arm below can report it (the closure
        // consumes `boxed`, so we cannot read it from there).
        let join_err_path = boxed.path.clone();
        // feat-m7-1 — replicates the `acquire_async` blocking-pool
        // hop. The closure body is intentionally identical (do NOT
        // refactor — see the SAFETY note in `acquire_async`).
        let join = tokio::task::spawn_blocking(
            move || -> Result<(Box<PackLock>, RwLockWriteGuard<'static, File>), PackLockError> {
                let mut boxed = boxed;
                // SAFETY: see `acquire_async` — `boxed` is moved into
                // the returned pair and never freed while the guard
                // is live; field order in `PackLockHold` makes the
                // guard drop first.
                let guard_ref = boxed
                    .inner
                    .write()
                    .map_err(|source| PackLockError::Io { path: boxed.path.clone(), source })?;
                let guard_static: RwLockWriteGuard<'static, File> = unsafe {
                    std::mem::transmute::<RwLockWriteGuard<'_, File>, RwLockWriteGuard<'static, File>>(
                        guard_ref,
                    )
                };
                Ok((boxed, guard_static))
            },
        );

        // Race the blocking acquire against the cancellation token.
        // If cancel wins, the JoinHandle is dropped on the spot — the
        // spawned OS thread stays parked in `fd_lock::write()` until
        // the kernel releases, at which point the returned tuple is
        // dropped (see contract note above) and the flock is freed.
        let join = tokio::select! {
            biased;
            () = cancel.cancelled() => return Err(PackLockErrorOrCancelled::Cancelled),
            res = join => res,
        };

        let (boxed, guard_static) = match join {
            Ok(res) => res.map_err(PackLockErrorOrCancelled::Lock)?,
            Err(join_err) => {
                return Err(PackLockErrorOrCancelled::Lock(PackLockError::Io {
                    path: join_err_path,
                    source: io::Error::other(join_err.to_string()),
                }));
            }
        };

        Ok(PackLockHold {
            _fd_guard: Some(guard_static),
            _mutex_guard: Some(mutex_guard),
            _lock: boxed,
        })
    }

    /// Thread-blocking acquire (no tokio integration). Waits on the
    /// fd-lock synchronously. Suitable for synchronous call sites only
    /// — async plugin methods MUST use [`PackLock::acquire_async`] to
    /// avoid blocking a tokio worker.
    ///
    /// Returns a borrowed [`RwLockWriteGuard`]; the caller owns both
    /// the outer [`PackLock`] and the guard in scope. Mirrors the
    /// [`crate::fs::ScopedLock`] shape.
    ///
    /// # Errors
    ///
    /// Returns [`PackLockError::Io`] if the OS lock call fails.
    ///
    /// # Implementation note
    ///
    /// Restored as the original direct blocking `fd_lock::write()`
    /// call (single-line body, no behaviour change vs v1.2.3) —
    /// simpler and safer than a busy-wait shim, since `fd-lock`
    /// already parks the calling OS thread on the kernel flock and a
    /// busy-wait would burn CPU without engaging the kernel waiter
    /// queue.
    #[deprecated(
        since = "1.2.4",
        note = "use `acquire_async` for async contexts or `try_acquire` for non-blocking; sync `acquire` will be removed in v1.3.0"
    )]
    pub fn acquire(&mut self) -> Result<RwLockWriteGuard<'_, File>, PackLockError> {
        self.inner.write().map_err(|source| PackLockError::Io { path: self.path.clone(), source })
    }

    /// Non-blocking probe: return [`PackLockError::Busy`] instead of
    /// waiting when another holder has the lock. Does not engage the
    /// async mutex — purely a fail-fast diagnostics hook.
    ///
    /// # Errors
    ///
    /// * [`PackLockError::Busy`] when a concurrent holder owns the lock.
    /// * [`PackLockError::Io`] on any other OS-level lock failure.
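    ///
    /// A minimal sketch of a diagnostics probe (`pack_root` illustrative):
    ///
    /// ```ignore
    /// let mut lock = PackLock::open(pack_root)?;
    /// match lock.try_acquire() {
    ///     Ok(_guard) => tracing::debug!("pack is free"),
    ///     Err(PackLockError::Busy { path }) => tracing::debug!("held: {}", path.display()),
    ///     Err(e) => return Err(e.into()),
    /// }
    /// ```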
    pub fn try_acquire(&mut self) -> Result<RwLockWriteGuard<'_, File>, PackLockError> {
        match self.inner.try_write() {
            Ok(g) => Ok(g),
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                Err(PackLockError::Busy { path: self.path.clone() })
            }
            Err(source) => Err(PackLockError::Io { path: self.path.clone(), source }),
        }
    }

    /// Sidecar path — `<pack_path>/.grex-lock`.
    #[must_use]
    pub fn path(&self) -> &Path {
        &self.path
    }
}

impl std::fmt::Debug for PackLock {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PackLock").field("path", &self.path).finish()
    }
}

/// RAII guard returned by [`PackLock::acquire_async`]. Holds the
/// sidecar-file `fd-lock` guard plus the process-wide async mutex
/// guard. Drops in reverse acquire order.
#[repr(C)]
pub struct PackLockHold {
    // Field order is load-bearing: `_fd_guard` drops first (releasing
    // the kernel flock), then `_mutex_guard` (releasing the async
    // serialisation slot), then `_lock` (closing the file handle).
    // `#[repr(C)]` pins source order to layout order so `offset_of!`
    // assertions below stay meaningful.
    _fd_guard: Option<RwLockWriteGuard<'static, File>>,
    _mutex_guard: Option<tokio::sync::OwnedMutexGuard<()>>,
    _lock: Box<PackLock>,
}

impl PackLockHold {
    /// Sidecar path for diagnostics.
    #[must_use]
    pub fn path(&self) -> &Path {
        self._lock.path()
    }
}

impl std::fmt::Debug for PackLockHold {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PackLockHold").field("path", &self._lock.path()).finish()
    }
}

// Field-order static assertion (feat-m6 B3) — the safety argument for the
// unsafe lifetime extension in `acquire_async` depends on `_fd_guard`
// dropping before `_lock`. Rust drops struct fields in declaration order,
// so `_fd_guard` must sit at the lowest offset, then `_mutex_guard`, then
// `_lock`. A refactor that reorders these fields would silently break the
// Drop ordering and the transmuted `'static` borrow would outlive its box.
const _: () = {
    assert!(
        std::mem::offset_of!(PackLockHold, _fd_guard)
            < std::mem::offset_of!(PackLockHold, _mutex_guard),
        "PackLockHold field order: _fd_guard must precede _mutex_guard"
    );
    assert!(
        std::mem::offset_of!(PackLockHold, _mutex_guard)
            < std::mem::offset_of!(PackLockHold, _lock),
        "PackLockHold field order: _mutex_guard must precede _lock"
    );
};

impl Drop for PackLockHold {
    fn drop(&mut self) {
        // Explicit take() on fd-lock guard first — the transmuted
        // `'static` lifetime must expire before `_lock` drops.
        self._fd_guard.take();
        self._mutex_guard.take();
        // `_lock` drops last when the struct itself drops, closing the
        // underlying file handle.
    }
}

// ---------------------------------------------------------------------------
// Lock-ordering enforcement (debug builds).
// ---------------------------------------------------------------------------

/// Lock tier ordinals matching `.omne/cfg/concurrency.md`. Acquisitions
/// must strictly increase; reversed order risks the deadlock class the
/// feat-m6-3 Lean proof rules out.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum Tier {
    /// Workspace sync lock — `<workspace>/.grex.sync.lock`.
    WorkspaceSync = 0,
    /// Scheduler semaphore permit — feat-m6-1.
    Semaphore = 1,
    /// Per-pack `.grex-lock` — feat-m6-2 (this module).
    PerPack = 2,
    /// Per-repo backend lock — `<dest>.grex-backend.lock`.
    Backend = 3,
    /// Manifest RW lock — `.grex/events.jsonl` sidecar.
    Manifest = 4,
}

/// Run `f` while `tier` is pushed on the current task's tier stack.
/// Debug builds enforce strictly-increasing order across nested calls;
/// release builds skip the check entirely.
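///
/// A minimal sketch of a correctly ordered nesting (tier names from the
/// module-level spec list):
///
/// ```ignore
/// with_tier(Tier::Semaphore, || {
///     with_tier(Tier::PerPack, || {
///         // acquire the pack lock here; Backend/Manifest may nest deeper
///     });
/// });
/// // Reversed nesting (PerPack, then Semaphore inside) panics in debug.
/// ```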
#[cfg(debug_assertions)]
pub fn with_tier<R>(tier: Tier, f: impl FnOnce() -> R) -> R {
    tier::push(tier);
    let out = f();
    tier::pop_if_top(tier);
    out
}

/// Release-build no-op mirror of [`with_tier`].
#[cfg(not(debug_assertions))]
#[inline]
pub fn with_tier<R>(_tier: Tier, f: impl FnOnce() -> R) -> R {
    f()
}

/// Wrap an async future in a task-local tier-stack scope so any
/// [`TierGuard`] pushes inside it land in the correct frame even when
/// the task migrates across tokio workers after `.await`. Release
/// builds compile this down to the raw future — no scope, no cost.
///
/// Callers should wrap every top-level async dispatch (e.g. the
/// per-pack plugin lifecycle calls driven by `rt.block_on(...)`) so
/// the tier check can operate on a fresh stack per dispatch.
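///
/// A minimal sketch of the dispatch-side wiring (`dispatch_lifecycle`
/// is a hypothetical stand-in for the real dispatcher):
///
/// ```ignore
/// rt.block_on(with_tier_scope(async {
///     // Every TierGuard push inside this future lands in one fresh,
///     // task-pinned stack, even if the task migrates across workers.
///     dispatch_lifecycle().await
/// }));
/// ```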
#[cfg(debug_assertions)]
pub async fn with_tier_scope<F: std::future::Future>(f: F) -> F::Output {
    tier::TIER_STACK.scope(std::cell::RefCell::new(Vec::new()), f).await
}

/// Release-build no-op mirror of [`with_tier_scope`].
#[cfg(not(debug_assertions))]
#[inline]
pub async fn with_tier_scope<F: std::future::Future>(f: F) -> F::Output {
    f.await
}

/// RAII guard — pushes a tier onto the current task's stack on
/// construction, pops on drop. Lets lifecycle prologues enforce
/// tier ordering across `.await` points without nesting the rest of
/// the body inside a `with_tier` closure. Debug builds carry the
/// ordering check; release builds compile to a zero-sized no-op.
///
/// Declaration-order note: callers must declare the guard
/// **before** the permit/hold it is scoping. Rust drops locals in
/// reverse declaration order, so `_tier` declared first drops last —
/// after the lock/permit releases — matching `with_tier` semantics.
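///
/// A minimal sketch of the declaration-order contract (`lock` is an
/// already-opened [`PackLock`]):
///
/// ```ignore
/// let _tier = TierGuard::push(Tier::PerPack); // declared first, drops last
/// let hold = lock.acquire_async().await?;     // releases before `_tier` pops
/// ```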
#[must_use]
pub struct TierGuard {
    #[cfg(debug_assertions)]
    tier: Tier,
    // Zero-sized placeholder so the release-build type still has a
    // field and the `#[must_use]` annotation stays meaningful.
    #[cfg(not(debug_assertions))]
    _private: (),
}

impl TierGuard {
    /// Push `tier` onto the current task's tier stack. Debug builds
    /// assert strictly-increasing order against the existing top.
    #[cfg(debug_assertions)]
    pub fn push(tier: Tier) -> Self {
        tier::push(tier);
        TierGuard { tier }
    }

    /// Release-build no-op constructor.
    #[cfg(not(debug_assertions))]
    #[inline]
    pub fn push(_tier: Tier) -> Self {
        TierGuard { _private: () }
    }
}

#[cfg(debug_assertions)]
impl Drop for TierGuard {
    fn drop(&mut self) {
        tier::pop_if_top(self.tier);
    }
}

#[cfg(debug_assertions)]
pub(crate) mod tier {
    use super::Tier;
    use std::cell::RefCell;

    // feat-m6 CI fix — previously this used `thread_local!`, but under a
    // tokio multi-thread runtime a task can resume on a different worker
    // after `.await`. A push on worker A followed by a yield and a pop on
    // worker B left A's stack polluted and tripped the tier-ordering
    // assertion on the next acquire. Migrating to `tokio::task_local!`
    // pins the stack to the *task*, not the worker, so nested
    // `TierGuard` bookkeeping follows the task across workers.
    //
    // `try_with` silently no-ops outside a `TIER_STACK.scope(...)`
    // frame — that makes the module safe to use from synchronous
    // (non-tokio) test harnesses and the module's own unit tests at
    // the cost of debug-only tier enforcement being disabled there.
    // Production dispatch wraps every pack-type plugin call in a scope
    // (see `sync::dispatch_*`), so real runs retain enforcement.
    tokio::task_local! {
        pub(crate) static TIER_STACK: RefCell<Vec<Tier>>;
    }

    pub fn push(next: Tier) {
        let _ = TIER_STACK.try_with(|s| {
            let mut s = s.borrow_mut();
            if let Some(&top) = s.last() {
                assert!(
                    next > top,
                    "lock tier violation: trying to acquire {next:?} while already holding {top:?} \
                     (tiers must be strictly increasing — see .omne/cfg/concurrency.md)"
                );
            }
            s.push(next);
        });
    }

    pub fn pop_if_top(expected: Tier) {
        let _ = TIER_STACK.try_with(|s| {
            let mut s = s.borrow_mut();
            if s.last().copied() == Some(expected) {
                s.pop();
            } else {
                tracing::error!(
                    target: "grex::concurrency",
                    "tier pop mismatch: expected {:?} at top, stack = {:?}",
                    expected,
                    *s
                );
            }
        });
    }
}

// ---------------------------------------------------------------------------
// Tests.
// ---------------------------------------------------------------------------

#[cfg(test)]
#[allow(deprecated)] // exercising deprecated `PackLock::acquire` for back-compat
mod tests {
    use super::*;
    use std::sync::{Arc, Barrier};
    use std::thread;
    use std::time::{Duration, Instant};
    use tempfile::tempdir;
    use tokio_util::sync::CancellationToken;

    #[test]
    fn pack_lock_acquires_creates_file() {
        let dir = tempdir().unwrap();
        let mut plock = PackLock::open(dir.path()).unwrap();
        let expected = plock.path().to_path_buf();
        let _guard = plock.acquire().unwrap();
        assert!(expected.exists(), "open must create the sidecar file");
        assert_eq!(expected, dir.path().join(PACK_LOCK_FILE_NAME));
    }

    #[test]
    fn pack_lock_second_try_acquire_reports_busy_while_held() {
        let dir = tempdir().unwrap();
        let mut first = PackLock::open(dir.path()).unwrap();
        let _held = first.acquire().unwrap();
        let mut second = PackLock::open(dir.path()).unwrap();
        let err = second.try_acquire().unwrap_err();
        match err {
            PackLockError::Busy { path } => {
                assert_eq!(path, dir.path().join(PACK_LOCK_FILE_NAME));
            }
            other => panic!("expected Busy, got {other:?}"),
        }
    }

    #[test]
    fn pack_lock_release_on_drop() {
        let dir = tempdir().unwrap();
        {
            let mut first = PackLock::open(dir.path()).unwrap();
            let _g = first.acquire().unwrap();
        }
        let mut second = PackLock::open(dir.path()).unwrap();
        let _g = second.acquire().unwrap();
    }

    #[test]
    fn pack_lock_path_contains_pack_path() {
        let dir = tempdir().unwrap();
        let plock = PackLock::open(dir.path()).unwrap();
        let p = plock.path();
        assert!(p.starts_with(dir.path()));
        assert_eq!(p.file_name().and_then(|s| s.to_str()), Some(PACK_LOCK_FILE_NAME));
    }

    #[test]
    fn pack_lock_blocking_acquire_waits_for_holder() {
        let dir = tempdir().unwrap();
        let path = dir.path().to_path_buf();
        let barrier = Arc::new(Barrier::new(2));
        let holder_barrier = Arc::clone(&barrier);
        let holder_path = path.clone();

        let holder = thread::spawn(move || {
            let mut lock = PackLock::open(&holder_path).unwrap();
            let _g = lock.acquire().unwrap();
            holder_barrier.wait();
            thread::sleep(Duration::from_millis(100));
        });

        barrier.wait();
        let start = Instant::now();
        let mut second = PackLock::open(&path).unwrap();
        let _g = second.acquire().unwrap();
        let waited = start.elapsed();
        holder.join().unwrap();
        assert!(
            waited >= Duration::from_millis(40),
            "blocking acquire must have waited (observed {waited:?})"
        );
    }

    #[test]
    fn pack_lock_distinct_paths_do_not_contend() {
        let a = tempdir().unwrap();
        let b = tempdir().unwrap();
        let mut la = PackLock::open(a.path()).unwrap();
        let _ga = la.acquire().unwrap();
        let mut lb = PackLock::open(b.path()).unwrap();
        let _gb = lb.try_acquire().unwrap();
    }

    #[tokio::test]
    async fn async_acquire_serialises_in_process() {
        // Two concurrent acquire_async calls on the same pack path
        // must serialise cleanly (no hang).
        let dir = tempdir().unwrap();
        let path = dir.path().to_path_buf();
        let path_clone = path.clone();
        let h1 = tokio::spawn(async move {
            let lock = PackLock::open(&path).unwrap();
            let _hold = lock.acquire_async().await.unwrap();
            tokio::time::sleep(Duration::from_millis(30)).await;
        });
        let h2 = tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(5)).await;
            let lock = PackLock::open(&path_clone).unwrap();
            let _hold = lock.acquire_async().await.unwrap();
        });
        h1.await.unwrap();
        h2.await.unwrap();
    }

    // --- tier ordering (debug-only) -----------------------------------------

    // feat-m6 CI fix — tier enforcement now lives in a `tokio::task_local!`
    // stack, so these tests drive the check through a scoped task to
    // establish the frame. `try_with` outside a scope silently no-ops.

    #[cfg(debug_assertions)]
    #[tokio::test]
    async fn tier_strictly_increasing_ok() {
        tier::TIER_STACK
            .scope(std::cell::RefCell::new(Vec::new()), async {
                with_tier(Tier::Semaphore, || {
                    with_tier(Tier::PerPack, || {
                        with_tier(Tier::Backend, || {
                            with_tier(Tier::Manifest, || {});
                        });
                    });
                });
            })
            .await;
    }

    #[cfg(debug_assertions)]
    #[tokio::test]
    async fn tier_reversed_panics_in_debug() {
        use std::panic::{catch_unwind, AssertUnwindSafe};
        let result = tier::TIER_STACK
            .scope(std::cell::RefCell::new(Vec::new()), async {
                catch_unwind(AssertUnwindSafe(|| {
                    with_tier(Tier::PerPack, || {
                        with_tier(Tier::Semaphore, || {});
                    });
                }))
            })
            .await;
        assert!(result.is_err(), "reversed tier order must panic in debug builds");
    }

    // --- acquire_cancellable (feat-m7-1 Stage 4) -----------------------------

    /// 4.T1 — uncontended path returns Ok(PackLockHold).
    #[tokio::test]
    async fn acquire_cancellable_happy_path() {
        let dir = tempdir().unwrap();
        let lock = PackLock::open(dir.path()).unwrap();
        let token = CancellationToken::new();
        let result = lock.acquire_cancellable(&token).await;
        assert!(result.is_ok(), "expected Ok(PackLockHold) on uncontended pack");
    }

    /// 4.T2 — task A holds the lock; task B's token fires after 10 ms;
    /// B must surface `Err(Cancelled)` within 50 ms.
    #[tokio::test]
    async fn acquire_cancellable_cancel_during_blocking_fd_lock_returns_cancelled() {
        let dir = tempdir().unwrap();
        let path = dir.path().to_path_buf();
        let path_b = path.clone();

        // Task A: acquire and hold for 500 ms (long enough to cover B's window).
        let a_started = Arc::new(tokio::sync::Notify::new());
        let a_started_clone = Arc::clone(&a_started);
        let a = tokio::spawn(async move {
            let lock = PackLock::open(&path).unwrap();
            let _hold = lock.acquire_async().await.unwrap();
            a_started_clone.notify_one();
            tokio::time::sleep(Duration::from_millis(500)).await;
        });
        a_started.notified().await;

        let token = CancellationToken::new();
        let cancel_handle = token.clone();
        let canceller = tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            cancel_handle.cancel();
        });

        let started = Instant::now();
        let lock_b = PackLock::open(&path_b).unwrap();
        let result =
            tokio::time::timeout(Duration::from_millis(50), lock_b.acquire_cancellable(&token))
                .await
                .expect("acquire_cancellable must return within 50 ms after cancel");

        let waited = started.elapsed();
        assert!(
            matches!(result, Err(PackLockErrorOrCancelled::Cancelled)),
            "expected Err(Cancelled), got {result:?} after {waited:?}"
        );

        canceller.await.unwrap();
        a.abort();
        let _ = a.await;
    }

    // --- helpers for 4.T3 / 4.T4 -------------------------------------------

    /// Spawn a "holder" task that acquires `path` and releases on
    /// signal. Returns `(JoinHandle, started_notify, release_notify)`.
    /// Caller awaits `started` to know the lock is held, then fires
    /// `release` when ready to let the holder drop its guard.
    fn spawn_holder(
        path: PathBuf,
    ) -> (tokio::task::JoinHandle<()>, Arc<tokio::sync::Notify>, Arc<tokio::sync::Notify>) {
        let started = Arc::new(tokio::sync::Notify::new());
        let release = Arc::new(tokio::sync::Notify::new());
        let started_c = Arc::clone(&started);
        let release_c = Arc::clone(&release);
        let h = tokio::spawn(async move {
            let lock = PackLock::open(&path).unwrap();
            let _hold = lock.acquire_async().await.unwrap();
            started_c.notify_one();
            release_c.notified().await;
        });
        (h, started, release)
    }

    /// Poll `acquire_async` against `path` until it succeeds or the
    /// outer deadline expires. Returns `Ok(())` on success, `Err(())`
    /// on timeout.
    async fn poll_acquire_until_free(path: PathBuf, deadline: Duration) -> Result<(), ()> {
        tokio::time::timeout(deadline, async move {
            loop {
                let lock = PackLock::open(&path).unwrap();
                if let Ok(Ok(_hold)) =
                    tokio::time::timeout(Duration::from_millis(100), lock.acquire_async()).await
                {
                    return;
                }
                tokio::time::sleep(Duration::from_millis(20)).await;
            }
        })
        .await
        .map_err(|_| ())
    }

    /// 4.T3 — regression for the documented OS-thread leak window:
    /// after B is cancelled, A releases its lock; the spawn_blocking
    /// thread that B kicked off must eventually unblock and drop its
    /// guard. We observe this by polling from a third task C — if B's
    /// blocking thread leaked its guard, C would wait forever.
    #[tokio::test]
    async fn acquire_cancellable_spawn_blocking_thread_releases_guard_when_it_finally_unblocks() {
        let dir = tempdir().unwrap();
        let path = dir.path().to_path_buf();
        let (a, a_started, release_a) = spawn_holder(path.clone());
        a_started.notified().await;

        // Task B: race against cancel while A holds the fd-lock.
        let token = CancellationToken::new();
        let cancel_handle = token.clone();
        let path_b = path.clone();
        let b = tokio::spawn(async move {
            let lock = PackLock::open(&path_b).unwrap();
            lock.acquire_cancellable(&token).await
        });
        tokio::time::sleep(Duration::from_millis(20)).await;
        cancel_handle.cancel();
        let b_result = tokio::time::timeout(Duration::from_millis(100), b)
            .await
            .expect("B must resolve quickly after cancel")
            .expect("B task panicked");
        assert!(
            matches!(b_result, Err(PackLockErrorOrCancelled::Cancelled)),
            "expected B to see Cancelled, got {b_result:?}"
        );

        // Release A — B's parked OS thread should drain.
        release_a.notify_one();
        a.await.unwrap();

        assert!(
            poll_acquire_until_free(path, Duration::from_millis(2_000)).await.is_ok(),
            "task C never acquired — spawn_blocking thread leaked its fd-lock guard"
        );
    }

    /// 4.T4 — covers the outer-mutex cancel arm of the `select!` in
    /// `acquire_cancellable`. Task A holds the in-process async mutex
    /// (via `acquire_async`, which takes both the async mutex and the
    /// fd-lock); task B calls `acquire_cancellable` and is parked on
    /// `lock_owned()` (NOT yet at the fd-lock blocking call).
    /// Cancelling B's token must short-circuit the mutex wait and
    /// return `Err(Cancelled)`.
    #[tokio::test]
    async fn acquire_cancellable_cancel_during_async_mutex_wait_returns_cancelled() {
        let dir = tempdir().unwrap();
        let path = dir.path().to_path_buf();
        let (a, a_started, release_a) = spawn_holder(path.clone());
        a_started.notified().await;

        let token = CancellationToken::new();
        let cancel_handle = token.clone();
        let canceller = tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            cancel_handle.cancel();
        });

        let lock_b = PackLock::open(&path).unwrap();
        let result =
            tokio::time::timeout(Duration::from_millis(50), lock_b.acquire_cancellable(&token))
                .await
                .expect("acquire_cancellable must return within 50 ms after cancel");

        assert!(
            matches!(result, Err(PackLockErrorOrCancelled::Cancelled)),
            "expected Err(Cancelled) from outer-mutex cancel arm, got {result:?}"
        );

        canceller.await.unwrap();
        release_a.notify_one();
        a.await.unwrap();
    }
}