grex_core/pack_lock.rs
//! Per-pack `.grex-lock` file lock — feat-m6-2.
//!
//! Acquires an exclusive `fd-lock` guard on `<pack_path>/.grex-lock` for the
//! full duration of a pack-type plugin lifecycle method. Prevents two
//! concurrent tasks (in-process) or processes (cross-process) from operating
//! on the same pack at the same time.
//!
//! ## Lock ordering (tier 3 of 5)
//!
//! The spec fixes the global acquire order as:
//!
//! 1. workspace-sync lock (`<workspace>/.grex.sync.lock`)
//! 2. scheduler semaphore permit (feat-m6-1)
//! 3. **per-pack `.grex-lock`** — this module
//! 4. per-repo backend lock (`<dest>.grex-backend.lock`)
//! 5. manifest RW lock (`.grex/events.jsonl` sidecar)
//!
//! Plugins acquire tier 2 (permit) and tier 3 (pack lock) in that order
//! inside every `PackTypePlugin` method. In debug builds, [`with_tier`]
//! enforces strictly-increasing tiers on a task-local stack; a reversal
//! panics in debug and logs `tracing::error!` in release.
//!
//! ## In-process vs cross-process serialisation
//!
//! `fd-lock`'s `write()` is synchronous and blocks the calling OS thread
//! until the kernel flock is free. Calling it directly inside an async
//! plugin method would block a tokio worker thread; with a multi-thread
//! runtime this scales poorly, and recursive re-entry on the same pack
//! (meta-plugin walking into a symlinked child that points back at the
//! parent) hangs the task outright because the second `write()` waits on
//! the first, which is still on-stack.
//!
//! To avoid both problems we layer a process-wide async mutex keyed by
//! canonical pack path **in front of** the fd-lock acquire:
//!
//! * [`PackLock::acquire_async`] first takes the canonical-path mutex
//!   (`tokio::sync::Mutex`), which serialises in-process tasks without
//!   blocking workers. The mutex is non-reentrant, so same-task re-entry
//!   on the same pack root must be prevented upstream — the meta-plugin's
//!   cycle detection does this; see [`PackLock::acquire_async`].
//! * Inside the async mutex it calls the blocking fd-lock `write()` —
//!   fast because the only remaining contention is cross-process, which
//!   is rare.
//! * On `Drop` it releases the fd-lock guard first, then the async
//!   mutex guard — reverse acquire order.
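//!
//! A minimal usage sketch — the crate path and the pack directory below
//! are illustrative assumptions, not part of the spec:
//!
//! ```no_run
//! use std::path::Path;
//!
//! use grex_core::pack_lock::PackLock;
//!
//! # async fn demo() -> Result<(), grex_core::pack_lock::PackLockError> {
//! // Open (or create) `<pack>/.grex-lock`, then acquire: in-process
//! // tasks queue on the async mutex; a cross-process holder parks the
//! // fd-lock call on the blocking pool, not on an async worker.
//! let lock = PackLock::open(Path::new("packs/example"))?;
//! let hold = lock.acquire_async().await?;
//! // ... run the plugin lifecycle method under the lock ...
//! drop(hold); // fd-lock guard releases first, then the async mutex
//! # Ok(())
//! # }
//! ```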

#![allow(unsafe_code)]

use std::collections::HashMap;
use std::fs::{File, OpenOptions};
use std::io;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex, OnceLock, Weak};

use fd_lock::{RwLock, RwLockWriteGuard};

/// Stable name of the per-pack lock file created inside every pack root.
/// Exported so the managed-gitignore writer can hide it from `git status`.
pub const PACK_LOCK_FILE_NAME: &str = ".grex-lock";

/// Error surfaced by [`PackLock::open`], [`PackLock::acquire_async`],
/// [`PackLock::acquire`], and [`PackLock::try_acquire`].
///
/// Note: [`PackLock::acquire`] is deprecated since v1.2.4; prefer
/// [`PackLock::acquire_async`] or [`PackLock::try_acquire`].
#[non_exhaustive]
#[derive(Debug, thiserror::Error)]
pub enum PackLockError {
    /// I/O error opening or locking the sidecar file.
    #[error("pack lock i/o on `{}`: {source}", path.display())]
    Io {
        /// Resolved `<pack_path>/.grex-lock` path.
        path: PathBuf,
        /// Underlying OS error.
        #[source]
        source: io::Error,
    },
    /// Non-blocking probe returned busy. Emitted only by
    /// [`PackLock::try_acquire`]. The blocking paths
    /// ([`PackLock::acquire_async`], [`PackLock::acquire_cancellable`])
    /// never produce this: in-process contention queues on the async
    /// mutex and cross-process contention waits on the kernel flock.
    #[error("pack lock `{}` is busy", path.display())]
    Busy {
        /// Lock path that was contended.
        path: PathBuf,
    },
}

/// Outcome of [`PackLock::acquire_cancellable`] — either the
/// underlying lock acquire failed, or the supplied cancellation token
/// fired before the guard was returned. Distinct from
/// [`crate::scheduler::Cancelled`]: that ZST signals semaphore-permit
/// cancellation; this variant signals pack-lock cancellation. Verb
/// bodies translate either one into the same `PluginError::Cancelled`
/// at the call site (feat-m7-1 Stages 6-7).
#[non_exhaustive]
#[derive(Debug, thiserror::Error)]
pub enum PackLockErrorOrCancelled {
    /// The cancellation token fired before the lock was acquired.
    /// The spawned blocking thread (if launched) may still be parked
    /// in `fd_lock::write()` — see [`PackLock::acquire_cancellable`]
    /// for the OS-thread leak-window contract.
    #[error("pack lock acquire cancelled")]
    Cancelled,
    /// The underlying lock acquire failed before cancellation fired.
    #[error(transparent)]
    Lock(#[from] PackLockError),
}

fn path_mutex_registry() -> &'static Mutex<HashMap<PathBuf, Weak<tokio::sync::Mutex<()>>>> {
    static REG: OnceLock<Mutex<HashMap<PathBuf, Weak<tokio::sync::Mutex<()>>>>> = OnceLock::new();
    REG.get_or_init(|| Mutex::new(HashMap::new()))
}

/// feat-m6 H3 — prune entries whose only remaining reference was the
/// registry itself. Called opportunistically on every `mutex_for` so
/// long-running processes that open many distinct pack paths do not
/// accumulate unbounded registry entries. Runs under the registry
/// mutex so there is no race against a concurrent `mutex_for`.
fn prune_dead(reg: &mut HashMap<PathBuf, Weak<tokio::sync::Mutex<()>>>) {
    reg.retain(|_, weak| weak.strong_count() > 0);
}

fn mutex_for(canonical: &Path) -> Arc<tokio::sync::Mutex<()>> {
    let mut reg = path_mutex_registry()
        .lock()
        .expect("pack lock path registry poisoned — this indicates a prior panic");
    // Try to reuse an existing live entry. If the Weak is dead
    // (no outstanding holders) fall through to insert a fresh Arc.
    if let Some(weak) = reg.get(canonical) {
        if let Some(existing) = weak.upgrade() {
            return existing;
        }
    }
    prune_dead(&mut reg);
    let m = Arc::new(tokio::sync::Mutex::new(()));
    reg.insert(canonical.to_path_buf(), Arc::downgrade(&m));
    m
}

fn canonical_or_raw(path: &Path) -> PathBuf {
    std::fs::canonicalize(path).unwrap_or_else(|_| path.to_path_buf())
}

/// Per-pack file lock wrapper.
///
/// Construction via [`PackLock::open`] creates (or re-opens) the sidecar
/// `<pack_path>/.grex-lock` but does **not** acquire the lock — call
/// [`PackLock::acquire_async`] for the async-safe blocking path or
/// [`PackLock::try_acquire`] for a fail-fast probe.
///
/// Note: [`PackLock::acquire`] is deprecated since v1.2.4; prefer
/// [`PackLock::acquire_async`] or [`PackLock::try_acquire`]. The sync
/// blocking variant will be removed in v1.3.0.
pub struct PackLock {
    inner: RwLock<File>,
    path: PathBuf,
    canonical: PathBuf,
}

impl PackLock {
    /// Open (and create if missing) the sidecar `<pack_path>/.grex-lock`.
    /// Does **not** acquire the lock.
    ///
    /// # Errors
    ///
    /// Returns [`PackLockError::Io`] if the sidecar cannot be opened or
    /// its parent directory cannot be created.
    pub fn open(pack_path: &Path) -> Result<Self, PackLockError> {
        let path = pack_path.join(PACK_LOCK_FILE_NAME);
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)
                .map_err(|source| PackLockError::Io { path: path.clone(), source })?;
        }
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(&path)
            .map_err(|source| PackLockError::Io { path: path.clone(), source })?;
        let canonical = canonical_or_raw(pack_path);
        Ok(Self { inner: RwLock::new(file), path, canonical })
    }

    /// Async acquire — pairs a process-wide [`tokio::sync::Mutex`] keyed
    /// by canonical pack path with the sidecar `fd-lock`. Safe to call
    /// from any tokio worker without blocking the runtime. In-process
    /// contention queues on the async mutex; cross-process contention
    /// parks a blocking-pool thread on the kernel flock.
    ///
    /// The async mutex is non-reentrant, so a task that re-enters the
    /// same pack root (e.g. meta-plugin recursion through a symlinked
    /// child that points back at the parent) would deadlock here.
    /// Callers must prevent such cycles upstream — see the
    /// cycle-detection note in the body.
    ///
    /// The returned [`PackLockHold`] drops the fd-lock guard and the
    /// async mutex guard in reverse acquire order at end-of-scope.
    ///
    /// # Errors
    ///
    /// Returns [`PackLockError::Io`] on any OS-level lock failure.
    pub async fn acquire_async(self) -> Result<PackLockHold, PackLockError> {
        // Serialise in-process tasks on the canonical path via an
        // async mutex — safe across tokio workers and non-blocking on
        // the async runtime. Same-task re-entry (recursive plugin
        // invocation on the same pack root) is the caller's
        // responsibility to prevent via cycle detection; a same-task
        // re-entry here would hang at `lock_owned().await` because
        // tokio mutexes are non-reentrant.
        //
        // [`crate::plugin::pack_type::MetaPlugin`] threads the
        // `visited_meta` set through recursion and inserts the pack
        // root at every lifecycle entry so cycles halt with
        // [`crate::execute::ExecError::MetaCycle`] before this mutex
        // acquire runs. Other pack-type plugins are leaf by design
        // (declarative, scripted) and cannot re-enter.
        let mtx = mutex_for(&self.canonical);
        let mutex_guard = Arc::clone(&mtx).lock_owned().await;

        // Box `self` so its address is stable for the transmuted
        // `'static` guard lifetime. Take the fd-lock guard from the
        // boxed lock's `inner`.
        let boxed = Box::new(self);
        // Capture the sidecar path before the move into the closure so
        // the JoinError arm below can report it (the closure consumes
        // `boxed`, so we cannot read it from there).
        let join_err_path = boxed.path.clone();
        // feat-m6 H1 — `fd_lock::RwLock::write` is a synchronous
        // blocking call that waits on the kernel flock. Running it
        // directly on a tokio worker would park that worker until
        // the kernel released the lock, starving the runtime when
        // the only remaining contention is cross-process. Hop onto
        // the blocking-thread pool so async workers stay free. The
        // acquire happens inside `spawn_blocking` and the guard is
        // transmuted to `'static` before leaving the closure so the
        // box + guard can be returned as a pair.
        let join = tokio::task::spawn_blocking(
            move || -> Result<(Box<PackLock>, RwLockWriteGuard<'static, File>), PackLockError> {
                let mut boxed = boxed;
                // SAFETY: see outer comment block — `boxed` is moved
                // into the returned pair and never freed while the
                // guard is live; field order in `PackLockHold` makes
                // the guard drop first. Transmuting here (inside the
                // closure) lets us return both the box and the guard.
                let guard_ref = boxed
                    .inner
                    .write()
                    .map_err(|source| PackLockError::Io { path: boxed.path.clone(), source })?;
                let guard_static: RwLockWriteGuard<'static, File> = unsafe {
                    std::mem::transmute::<
                        RwLockWriteGuard<'_, File>,
                        RwLockWriteGuard<'static, File>,
                    >(guard_ref)
                };
                Ok((boxed, guard_static))
            },
        )
        .await;
        let (boxed, guard_static) = match join {
            Ok(res) => res?,
            Err(join_err) => {
                return Err(PackLockError::Io {
                    path: join_err_path,
                    source: io::Error::other(join_err.to_string()),
                });
            }
        };

        Ok(PackLockHold {
            _fd_guard: Some(guard_static),
            _mutex_guard: Some(mutex_guard),
            _lock: boxed,
        })
    }

    /// Cancellable async acquire — same semantics as
    /// [`PackLock::acquire_async`] but races the acquire against a
    /// [`tokio_util::sync::CancellationToken`]. Used by the embedded
    /// MCP server (feat-m7-1) so a `notifications/cancelled` from the
    /// client unblocks tool handlers that are parked on a contended
    /// pack lock.
    ///
    /// **Consumes `self`** to mirror [`PackLock::acquire_async`] — the
    /// same boxed-fd + transmute lifetime dance is needed to hand the
    /// guard back across a `spawn_blocking` boundary, and reusing the
    /// consumes-self contract preserves drop ordering against
    /// [`PackLockHold`].
    ///
    /// ## OS-thread leak window — contract
    ///
    /// `fd_lock::write()` is a synchronous syscall that parks the
    /// calling OS thread until the kernel releases the flock. Once the
    /// blocking call has been launched on the
    /// [`tokio::task::spawn_blocking`] pool, **the runtime cannot
    /// interrupt it** — there is no portable way to unpark a thread
    /// blocked in `flock(2)`. When the cancellation token fires we
    /// resolve the outer `select!` with [`PackLockErrorOrCancelled::Cancelled`]
    /// immediately, but the spawned OS thread stays parked until the
    /// holder eventually releases. When that happens, the spawned
    /// thread acquires the guard, the `JoinHandle` resolves to
    /// `Ok((boxed, guard))`, and the tuple is dropped on the spot
    /// (because the `select!` arm has already won) — at which point
    /// the guard's `Drop` releases the kernel flock and a subsequent
    /// acquirer can proceed.
    ///
    /// In other words: **cancellation is observable to the caller
    /// instantly, but the underlying OS thread holds the lock briefly
    /// past the cancel point, until the syscall returns**. Callers
    /// that immediately re-attempt acquire on the same path may see
    /// transient contention until that thread drains. See
    /// `.omne/cfg/mcp.md` §Cancellation.
    ///
    /// # Errors
    ///
    /// * [`PackLockErrorOrCancelled::Cancelled`] — the token fired
    ///   before a guard was returned.
    /// * [`PackLockErrorOrCancelled::Lock`] wrapping
    ///   [`PackLockError::Io`] on any OS-level lock failure.
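    ///
    /// A hedged caller sketch — the error handling shown is
    /// illustrative, not the real MCP handler:
    ///
    /// ```no_run
    /// # async fn demo(lock: grex_core::pack_lock::PackLock) {
    /// use grex_core::pack_lock::PackLockErrorOrCancelled;
    ///
    /// let cancel = tokio_util::sync::CancellationToken::new();
    /// match lock.acquire_cancellable(&cancel).await {
    ///     // Guard held: run the lifecycle method, then drop to release.
    ///     Ok(_hold) => { /* critical section */ }
    ///     // Token fired first; the blocking thread may still be parked.
    ///     Err(PackLockErrorOrCancelled::Cancelled) => { /* report cancel */ }
    ///     // Io failure from the OS lock call.
    ///     Err(PackLockErrorOrCancelled::Lock(e)) => { let _ = e; }
    ///     // `#[non_exhaustive]`: future variants.
    ///     Err(_) => {}
    /// }
    /// # }
    /// ```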
    pub async fn acquire_cancellable(
        self,
        cancel: &tokio_util::sync::CancellationToken,
    ) -> Result<PackLockHold, PackLockErrorOrCancelled> {
        // Mirror `acquire_async`: serialise on the canonical-path
        // async mutex first. Race the mutex acquire itself against
        // cancel — same-task re-entry would normally hang here, but
        // the cancel arm gives the caller an out.
        let mtx = mutex_for(&self.canonical);
        let mutex_guard = tokio::select! {
            biased;
            () = cancel.cancelled() => return Err(PackLockErrorOrCancelled::Cancelled),
            g = Arc::clone(&mtx).lock_owned() => g,
        };

        // Box `self` so the address is stable for the transmuted
        // `'static` guard lifetime — same dance as `acquire_async`.
        let boxed = Box::new(self);
        // Capture the sidecar path before the move into the closure
        // so the JoinError arm below can report it (the closure
        // consumes `boxed`, so we cannot read it from there).
        let join_err_path = boxed.path.clone();
        // feat-m7-1 — replicates the `acquire_async` blocking-pool
        // hop. The closure body is intentionally identical (do NOT
        // refactor — see the SAFETY note in `acquire_async`).
        let join = tokio::task::spawn_blocking(
            move || -> Result<(Box<PackLock>, RwLockWriteGuard<'static, File>), PackLockError> {
                let mut boxed = boxed;
                // SAFETY: see `acquire_async` — `boxed` is moved into
                // the returned pair and never freed while the guard
                // is live; field order in `PackLockHold` makes the
                // guard drop first.
                let guard_ref = boxed
                    .inner
                    .write()
                    .map_err(|source| PackLockError::Io { path: boxed.path.clone(), source })?;
                let guard_static: RwLockWriteGuard<'static, File> = unsafe {
                    std::mem::transmute::<
                        RwLockWriteGuard<'_, File>,
                        RwLockWriteGuard<'static, File>,
                    >(guard_ref)
                };
                Ok((boxed, guard_static))
            },
        );

        // Race the blocking acquire against the cancellation token.
        // If cancel wins, the JoinHandle is dropped on the spot — the
        // spawned OS thread stays parked in `fd_lock::write()` until
        // the kernel releases, at which point the returned tuple is
        // dropped (see contract note above) and the flock is freed.
        let join = tokio::select! {
            biased;
            () = cancel.cancelled() => return Err(PackLockErrorOrCancelled::Cancelled),
            res = join => res,
        };

        let (boxed, guard_static) = match join {
            Ok(res) => res.map_err(PackLockErrorOrCancelled::Lock)?,
            Err(join_err) => {
                return Err(PackLockErrorOrCancelled::Lock(PackLockError::Io {
                    path: join_err_path,
                    source: io::Error::other(join_err.to_string()),
                }));
            }
        };

        Ok(PackLockHold {
            _fd_guard: Some(guard_static),
            _mutex_guard: Some(mutex_guard),
            _lock: boxed,
        })
    }

    /// Thread-blocking acquire (no tokio integration). Waits on the
    /// fd-lock synchronously. Suitable for synchronous call sites only
    /// — async plugin methods MUST use [`PackLock::acquire_async`] to
    /// avoid blocking a tokio worker.
    ///
    /// Returns a borrowed [`RwLockWriteGuard`]; the caller owns both
    /// the outer [`PackLock`] and the guard in scope. Mirrors the
    /// [`crate::fs::ScopedLock`] shape.
    ///
    /// # Errors
    ///
    /// Returns [`PackLockError::Io`] if the OS lock call fails.
    ///
    /// # Implementation note
    ///
    /// Restored as the original direct blocking `fd_lock::write()`
    /// call (single-line body, no behaviour change vs v1.2.3) —
    /// simpler and safer than a busy-wait shim, since `fd-lock`
    /// already parks the calling OS thread on the kernel flock and a
    /// busy-wait would burn CPU without engaging the kernel waiter
    /// queue.
    #[deprecated(
        since = "1.2.4",
        note = "use `acquire_async` for async contexts or `try_acquire` for non-blocking; sync `acquire` will be removed in v1.3.0"
    )]
    pub fn acquire(&mut self) -> Result<RwLockWriteGuard<'_, File>, PackLockError> {
        self.inner.write().map_err(|source| PackLockError::Io { path: self.path.clone(), source })
    }

    /// Non-blocking probe: return [`PackLockError::Busy`] instead of
    /// waiting when another holder has the lock. Does not engage the
    /// async mutex — purely a fail-fast diagnostics hook.
    ///
    /// # Errors
    ///
    /// * [`PackLockError::Busy`] when a concurrent holder owns the lock.
    /// * [`PackLockError::Io`] on any other OS-level lock failure.
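    ///
    /// A fail-fast probe sketch (crate path and diagnostics wording
    /// are illustrative):
    ///
    /// ```no_run
    /// # fn demo(mut lock: grex_core::pack_lock::PackLock) {
    /// use grex_core::pack_lock::PackLockError;
    ///
    /// match lock.try_acquire() {
    ///     Ok(_guard) => println!("pack is free"),
    ///     Err(PackLockError::Busy { path }) => {
    ///         println!("pack busy: {}", path.display());
    ///     }
    ///     Err(e) => eprintln!("probe failed: {e}"),
    /// }
    /// # }
    /// ```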
    pub fn try_acquire(&mut self) -> Result<RwLockWriteGuard<'_, File>, PackLockError> {
        match self.inner.try_write() {
            Ok(g) => Ok(g),
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                Err(PackLockError::Busy { path: self.path.clone() })
            }
            Err(source) => Err(PackLockError::Io { path: self.path.clone(), source }),
        }
    }

    /// Sidecar path — `<pack_path>/.grex-lock`.
    #[must_use]
    pub fn path(&self) -> &Path {
        &self.path
    }
}

impl std::fmt::Debug for PackLock {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PackLock").field("path", &self.path).finish()
    }
}

/// RAII guard returned by [`PackLock::acquire_async`]. Holds the
/// sidecar-file `fd-lock` guard plus the process-wide async mutex
/// guard. Drops in reverse acquire order.
#[repr(C)]
pub struct PackLockHold {
    // Field order is load-bearing: `_fd_guard` drops first (releasing
    // the kernel flock), then `_mutex_guard` (releasing the async
    // serialisation slot), then `_lock` (closing the file handle).
    // `#[repr(C)]` pins source order to layout order so the
    // `offset_of!` assertions below stay meaningful.
    _fd_guard: Option<RwLockWriteGuard<'static, File>>,
    _mutex_guard: Option<tokio::sync::OwnedMutexGuard<()>>,
    _lock: Box<PackLock>,
}

impl PackLockHold {
    /// Sidecar path for diagnostics.
    #[must_use]
    pub fn path(&self) -> &Path {
        self._lock.path()
    }
}

impl std::fmt::Debug for PackLockHold {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PackLockHold").field("path", &self._lock.path()).finish()
    }
}

// Field-order static assertion (feat-m6 B3) — the safety argument for the
// unsafe lifetime extension in `acquire_async` depends on `_fd_guard`
// dropping before `_lock`. Rust drops struct fields in declaration order,
// so `_fd_guard` must sit at the lowest offset, then `_mutex_guard`, then
// `_lock`. A refactor that reorders these fields would silently break the
// Drop ordering and the transmuted `'static` borrow would outlive its box.
const _: () = {
    assert!(
        std::mem::offset_of!(PackLockHold, _fd_guard)
            < std::mem::offset_of!(PackLockHold, _mutex_guard),
        "PackLockHold field order: _fd_guard must precede _mutex_guard"
    );
    assert!(
        std::mem::offset_of!(PackLockHold, _mutex_guard)
            < std::mem::offset_of!(PackLockHold, _lock),
        "PackLockHold field order: _mutex_guard must precede _lock"
    );
};

impl Drop for PackLockHold {
    fn drop(&mut self) {
        // Explicit take() on the fd-lock guard first — the transmuted
        // `'static` lifetime must expire before `_lock` drops.
        self._fd_guard.take();
        self._mutex_guard.take();
        // `_lock` drops last when the struct itself drops, closing the
        // underlying file handle.
    }
}

// ---------------------------------------------------------------------------
// Lock-ordering enforcement (debug builds).
// ---------------------------------------------------------------------------

/// Lock tier ordinals matching `.omne/cfg/concurrency.md`. Acquisitions
/// must strictly increase; reversed order risks the deadlock class the
/// feat-m6-3 Lean proof rules out.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum Tier {
    /// Workspace sync lock — `<workspace>/.grex.sync.lock`.
    WorkspaceSync = 0,
    /// Scheduler semaphore permit — feat-m6-1.
    Semaphore = 1,
    /// Per-pack `.grex-lock` — feat-m6-2 (this module).
    PerPack = 2,
    /// Per-repo backend lock — `<dest>.grex-backend.lock`.
    Backend = 3,
    /// Manifest RW lock — `.grex/events.jsonl` sidecar.
    Manifest = 4,
}

/// Run `f` while `tier` is pushed on the current task's tier stack.
/// Debug builds enforce strictly-increasing order across nested calls;
/// release builds skip the check entirely.
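///
/// A minimal nesting sketch (crate path assumed; the debug check only
/// engages inside a [`with_tier_scope`] frame — outside one, it
/// silently no-ops):
///
/// ```no_run
/// use grex_core::pack_lock::{with_tier, Tier};
///
/// // Tiers climb 1 → 2 → 3, so the debug assertion passes. Swapping
/// // the outer two calls would panic in a debug build.
/// with_tier(Tier::Semaphore, || {
///     with_tier(Tier::PerPack, || {
///         with_tier(Tier::Backend, || { /* hold backend lock */ });
///     });
/// });
/// ```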
#[cfg(debug_assertions)]
pub fn with_tier<R>(tier: Tier, f: impl FnOnce() -> R) -> R {
    tier::push(tier);
    let out = f();
    tier::pop_if_top(tier);
    out
}

/// Release-build no-op mirror of [`with_tier`].
#[cfg(not(debug_assertions))]
#[inline]
pub fn with_tier<R>(_tier: Tier, f: impl FnOnce() -> R) -> R {
    f()
}

/// Wrap an async future in a task-local tier-stack scope so any
/// [`TierGuard`] pushes inside it land in the correct frame even when
/// the task migrates across tokio workers after `.await`. Release
/// builds compile this down to the raw future — no scope, no cost.
///
/// Callers should wrap every top-level async dispatch (e.g. the
/// per-pack plugin lifecycle calls driven by `rt.block_on(...)`) so
/// the tier check can operate on a fresh stack per dispatch.
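///
/// A dispatch-wrapping sketch (`run_lifecycle` is a hypothetical
/// caller, not an API in this crate):
///
/// ```no_run
/// # async fn run_lifecycle() {}
/// # async fn demo() {
/// // One fresh tier stack per dispatch; `TierGuard` pushes inside
/// // `run_lifecycle` land in this frame.
/// grex_core::pack_lock::with_tier_scope(run_lifecycle()).await;
/// # }
/// ```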
#[cfg(debug_assertions)]
pub async fn with_tier_scope<F: std::future::Future>(f: F) -> F::Output {
    tier::TIER_STACK.scope(std::cell::RefCell::new(Vec::new()), f).await
}

/// Release-build no-op mirror of [`with_tier_scope`].
#[cfg(not(debug_assertions))]
#[inline]
pub async fn with_tier_scope<F: std::future::Future>(f: F) -> F::Output {
    f.await
}

/// RAII guard — pushes a tier onto the current task's tier stack on
/// construction, pops on drop. Lets lifecycle prologues enforce
/// tier ordering across `.await` points without nesting the rest of
/// the body inside a `with_tier` closure. Debug builds carry the
/// ordering check; release builds compile to a zero-sized no-op.
///
/// Declaration-order note: callers must declare the guard **before**
/// the permit/hold it is scoping. Rust drops locals in reverse
/// declaration order, so `_tier` declared first drops last — after
/// the lock/permit releases — matching `with_tier` semantics.
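///
/// An ordering sketch (crate path assumed):
///
/// ```no_run
/// # async fn demo(lock: grex_core::pack_lock::PackLock) {
/// use grex_core::pack_lock::{Tier, TierGuard};
///
/// // Guard first, hold second: locals drop in reverse declaration
/// // order, so the tier pops only after the pack lock releases.
/// let _tier = TierGuard::push(Tier::PerPack);
/// let _hold = lock.acquire_async().await.unwrap();
/// # }
/// ```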
#[must_use]
pub struct TierGuard {
    #[cfg(debug_assertions)]
    tier: Tier,
    // Sized placeholder for release so the type is still `Sized` and
    // `must_use`-meaningful. Zero-sized once inlined.
    #[cfg(not(debug_assertions))]
    _private: (),
}

impl TierGuard {
    /// Push `tier` onto the current task's tier stack. Debug builds
    /// assert strictly-increasing order against the existing top.
    #[cfg(debug_assertions)]
    pub fn push(tier: Tier) -> Self {
        tier::push(tier);
        TierGuard { tier }
    }

    /// Release-build no-op constructor.
    #[cfg(not(debug_assertions))]
    #[inline]
    pub fn push(_tier: Tier) -> Self {
        TierGuard { _private: () }
    }
}

#[cfg(debug_assertions)]
impl Drop for TierGuard {
    fn drop(&mut self) {
        tier::pop_if_top(self.tier);
    }
}

#[cfg(debug_assertions)]
pub(crate) mod tier {
    use super::Tier;
    use std::cell::RefCell;

    // feat-m6 CI fix — previously this used `thread_local!`, but under a
    // tokio multi-thread runtime a task can resume on a different worker
    // after `.await`. A push on worker A followed by a yield and a pop on
    // worker B left A's stack polluted and tripped the tier-ordering
    // assertion on the next acquire. Migrating to `tokio::task_local!`
    // pins the stack to the *task*, not the worker, so nested
    // `TierGuard` bookkeeping follows the task across workers.
    //
    // `try_with` silently no-ops outside a `TIER_STACK.scope(...)`
    // frame — that makes the module safe to use from synchronous
    // (non-tokio) test harnesses and the module's own unit tests at
    // the cost of debug-only tier enforcement being disabled there.
    // Production dispatch wraps every pack-type plugin call in a scope
    // (see `sync::dispatch_*`), so real runs retain enforcement.
    tokio::task_local! {
        pub(crate) static TIER_STACK: RefCell<Vec<Tier>>;
    }

    pub fn push(next: Tier) {
        let _ = TIER_STACK.try_with(|s| {
            let mut s = s.borrow_mut();
            if let Some(&top) = s.last() {
                assert!(
                    next > top,
                    "lock tier violation: trying to acquire {next:?} while already holding {top:?} \
                     (tiers must be strictly increasing — see .omne/cfg/concurrency.md)"
                );
            }
            s.push(next);
        });
    }

    pub fn pop_if_top(expected: Tier) {
        let _ = TIER_STACK.try_with(|s| {
            let mut s = s.borrow_mut();
            if s.last().copied() == Some(expected) {
                s.pop();
            } else {
                tracing::error!(
                    target: "grex::concurrency",
                    "tier pop mismatch: expected {:?} at top, stack = {:?}",
                    expected,
                    *s
                );
            }
        });
    }
}

// ---------------------------------------------------------------------------
// Tests.
// ---------------------------------------------------------------------------

#[cfg(test)]
#[allow(deprecated)] // exercising deprecated `PackLock::acquire` for back-compat
mod tests {
    use super::*;
    use std::sync::{Arc, Barrier};
    use std::thread;
    use std::time::{Duration, Instant};
    use tempfile::tempdir;
    use tokio_util::sync::CancellationToken;

    #[test]
    fn pack_lock_acquires_creates_file() {
        let dir = tempdir().unwrap();
        let mut plock = PackLock::open(dir.path()).unwrap();
        let expected = plock.path().to_path_buf();
        let _guard = plock.acquire().unwrap();
        assert!(expected.exists(), "open must create the sidecar file");
        assert_eq!(expected, dir.path().join(PACK_LOCK_FILE_NAME));
    }

    #[test]
    fn pack_lock_second_try_acquire_reports_busy_while_held() {
        let dir = tempdir().unwrap();
        let mut first = PackLock::open(dir.path()).unwrap();
        let _held = first.acquire().unwrap();
        let mut second = PackLock::open(dir.path()).unwrap();
        let err = second.try_acquire().unwrap_err();
        match err {
            PackLockError::Busy { path } => {
                assert_eq!(path, dir.path().join(PACK_LOCK_FILE_NAME));
            }
            other => panic!("expected Busy, got {other:?}"),
        }
    }

    #[test]
    fn pack_lock_release_on_drop() {
        let dir = tempdir().unwrap();
        {
            let mut first = PackLock::open(dir.path()).unwrap();
            let _g = first.acquire().unwrap();
        }
        let mut second = PackLock::open(dir.path()).unwrap();
        let _g = second.acquire().unwrap();
    }

    #[test]
    fn pack_lock_path_contains_pack_path() {
        let dir = tempdir().unwrap();
        let plock = PackLock::open(dir.path()).unwrap();
        let p = plock.path();
        assert!(p.starts_with(dir.path()));
        assert_eq!(p.file_name().and_then(|s| s.to_str()), Some(PACK_LOCK_FILE_NAME));
    }

    #[test]
    fn pack_lock_blocking_acquire_waits_for_holder() {
        let dir = tempdir().unwrap();
        let path = dir.path().to_path_buf();
        let barrier = Arc::new(Barrier::new(2));
        let holder_barrier = Arc::clone(&barrier);
        let holder_path = path.clone();

        let holder = thread::spawn(move || {
            let mut lock = PackLock::open(&holder_path).unwrap();
            let _g = lock.acquire().unwrap();
            holder_barrier.wait();
            thread::sleep(Duration::from_millis(100));
        });

        barrier.wait();
        let start = Instant::now();
        let mut second = PackLock::open(&path).unwrap();
        let _g = second.acquire().unwrap();
        let waited = start.elapsed();
        holder.join().unwrap();
        assert!(
            waited >= Duration::from_millis(40),
            "blocking acquire must have waited (observed {waited:?})"
        );
    }

    #[test]
    fn pack_lock_distinct_paths_do_not_contend() {
        let a = tempdir().unwrap();
        let b = tempdir().unwrap();
        let mut la = PackLock::open(a.path()).unwrap();
        let _ga = la.acquire().unwrap();
        let mut lb = PackLock::open(b.path()).unwrap();
        let _gb = lb.try_acquire().unwrap();
    }

    #[tokio::test]
    async fn async_acquire_serialises_in_process() {
        // Two concurrent acquire_async calls on the same pack path
        // must serialise cleanly (no hang).
        let dir = tempdir().unwrap();
        let path = dir.path().to_path_buf();
        let path_clone = path.clone();
        let h1 = tokio::spawn(async move {
            let lock = PackLock::open(&path).unwrap();
            let _hold = lock.acquire_async().await.unwrap();
            tokio::time::sleep(Duration::from_millis(30)).await;
        });
        let h2 = tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(5)).await;
            let lock = PackLock::open(&path_clone).unwrap();
            let _hold = lock.acquire_async().await.unwrap();
        });
        h1.await.unwrap();
        h2.await.unwrap();
    }

    // --- tier ordering (debug-only) -----------------------------------------

    // feat-m6 CI fix — tier enforcement now lives in a `tokio::task_local!`
    // stack, so these tests drive the check through a scoped task to
    // establish the frame. `try_with` outside a scope silently no-ops.

    #[cfg(debug_assertions)]
    #[tokio::test]
    async fn tier_strictly_increasing_ok() {
        tier::TIER_STACK
            .scope(std::cell::RefCell::new(Vec::new()), async {
                with_tier(Tier::Semaphore, || {
                    with_tier(Tier::PerPack, || {
                        with_tier(Tier::Backend, || {
                            with_tier(Tier::Manifest, || {});
                        });
                    });
                });
            })
            .await;
    }

    #[cfg(debug_assertions)]
    #[tokio::test]
    async fn tier_reversed_panics_in_debug() {
        use std::panic::{catch_unwind, AssertUnwindSafe};
        let result = tier::TIER_STACK
            .scope(std::cell::RefCell::new(Vec::new()), async {
                catch_unwind(AssertUnwindSafe(|| {
                    with_tier(Tier::PerPack, || {
                        with_tier(Tier::Semaphore, || {});
                    });
                }))
            })
            .await;
        assert!(result.is_err(), "reversed tier order must panic in debug builds");
    }

    // --- acquire_cancellable (feat-m7-1 Stage 4) -----------------------------

    /// 4.T1 — uncontended path returns Ok(PackLockHold).
    #[tokio::test]
    async fn acquire_cancellable_happy_path() {
        let dir = tempdir().unwrap();
        let lock = PackLock::open(dir.path()).unwrap();
        let token = CancellationToken::new();
        let result = lock.acquire_cancellable(&token).await;
        assert!(result.is_ok(), "expected Ok(PackLockHold) on uncontended pack");
    }

    /// 4.T2 — task A holds the lock; task B's token fires after 10 ms;
    /// B must surface `Err(Cancelled)` within 50 ms.
    #[tokio::test]
    async fn acquire_cancellable_cancel_during_blocking_fd_lock_returns_cancelled() {
        let dir = tempdir().unwrap();
        let path = dir.path().to_path_buf();
        let path_b = path.clone();

        // Task A: acquire and hold for 500 ms (long enough to cover B's window).
        let a_started = Arc::new(tokio::sync::Notify::new());
        let a_started_clone = Arc::clone(&a_started);
        let a = tokio::spawn(async move {
            let lock = PackLock::open(&path).unwrap();
            let _hold = lock.acquire_async().await.unwrap();
            a_started_clone.notify_one();
            tokio::time::sleep(Duration::from_millis(500)).await;
        });
        a_started.notified().await;

        let token = CancellationToken::new();
        let cancel_handle = token.clone();
        let canceller = tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            cancel_handle.cancel();
        });

        let started = Instant::now();
        let lock_b = PackLock::open(&path_b).unwrap();
        let result =
            tokio::time::timeout(Duration::from_millis(50), lock_b.acquire_cancellable(&token))
                .await
                .expect("acquire_cancellable must return within 50 ms after cancel");

        let waited = started.elapsed();
        assert!(
            matches!(result, Err(PackLockErrorOrCancelled::Cancelled)),
            "expected Err(Cancelled), got {result:?} after {waited:?}"
        );

        canceller.await.unwrap();
        a.abort();
        let _ = a.await;
    }

    // --- helpers for 4.T3 / 4.T4 -------------------------------------------

    /// Spawn a "holder" task that acquires `path` and releases on
    /// signal. Returns `(JoinHandle, started_notify, release_notify)`.
    /// Caller awaits `started` to know the lock is held, then fires
    /// `release` when ready to let the holder drop its guard.
    fn spawn_holder(
        path: PathBuf,
    ) -> (tokio::task::JoinHandle<()>, Arc<tokio::sync::Notify>, Arc<tokio::sync::Notify>) {
        let started = Arc::new(tokio::sync::Notify::new());
        let release = Arc::new(tokio::sync::Notify::new());
        let started_c = Arc::clone(&started);
        let release_c = Arc::clone(&release);
        let h = tokio::spawn(async move {
            let lock = PackLock::open(&path).unwrap();
            let _hold = lock.acquire_async().await.unwrap();
            started_c.notify_one();
            release_c.notified().await;
        });
        (h, started, release)
    }

    /// Poll `acquire_async` against `path` until it succeeds or the
    /// outer deadline expires. Returns `Ok(())` on success, `Err(())`
    /// on timeout.
    async fn poll_acquire_until_free(path: PathBuf, deadline: Duration) -> Result<(), ()> {
        tokio::time::timeout(deadline, async move {
            loop {
                let lock = PackLock::open(&path).unwrap();
                if let Ok(Ok(_hold)) =
                    tokio::time::timeout(Duration::from_millis(100), lock.acquire_async()).await
                {
                    return;
                }
                tokio::time::sleep(Duration::from_millis(20)).await;
            }
        })
        .await
        .map_err(|_| ())
    }

    /// 4.T3 — regression for the documented OS-thread leak window:
    /// after B is cancelled, A releases its lock; the spawn_blocking
    /// thread that B kicked off must eventually unblock and drop its
    /// guard. We observe this by polling from a third task C — if B's
    /// blocking thread leaked its guard, C would wait forever.
    #[tokio::test]
    async fn acquire_cancellable_spawn_blocking_thread_releases_guard_when_it_finally_unblocks() {
        let dir = tempdir().unwrap();
        let path = dir.path().to_path_buf();
        let (a, a_started, release_a) = spawn_holder(path.clone());
        a_started.notified().await;

        // Task B: race against cancel while A holds the fd-lock.
        let token = CancellationToken::new();
        let cancel_handle = token.clone();
        let path_b = path.clone();
        let b = tokio::spawn(async move {
            let lock = PackLock::open(&path_b).unwrap();
            lock.acquire_cancellable(&token).await
        });
        tokio::time::sleep(Duration::from_millis(20)).await;
        cancel_handle.cancel();
        let b_result = tokio::time::timeout(Duration::from_millis(100), b)
            .await
            .expect("B must resolve quickly after cancel")
            .expect("B task panicked");
        assert!(
            matches!(b_result, Err(PackLockErrorOrCancelled::Cancelled)),
            "expected B to see Cancelled, got {b_result:?}"
        );

        // Release A — B's parked OS thread should drain.
        release_a.notify_one();
        a.await.unwrap();

        assert!(
            poll_acquire_until_free(path, Duration::from_millis(2_000)).await.is_ok(),
            "task C never acquired — spawn_blocking thread leaked its fd-lock guard"
        );
    }

    /// 4.T4 — covers the outer-mutex cancel arm of the `select!` in
    /// `acquire_cancellable`. Task A holds the in-process async mutex
    /// (via `acquire_async`, which acquires both layers); task B calls
    /// `acquire_cancellable` and is parked on `lock_owned()` (NOT yet
    /// at the fd-lock blocking call). Cancelling B's token must
    /// short-circuit the mutex wait and return `Err(Cancelled)`.
    #[tokio::test]
    async fn acquire_cancellable_cancel_during_async_mutex_wait_returns_cancelled() {
        let dir = tempdir().unwrap();
        let path = dir.path().to_path_buf();
        let (a, a_started, release_a) = spawn_holder(path.clone());
        a_started.notified().await;

        let token = CancellationToken::new();
        let cancel_handle = token.clone();
        let canceller = tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            cancel_handle.cancel();
        });

        let lock_b = PackLock::open(&path).unwrap();
        let result =
            tokio::time::timeout(Duration::from_millis(50), lock_b.acquire_cancellable(&token))
                .await
                .expect("acquire_cancellable must return within 50 ms after cancel");

        assert!(
            matches!(result, Err(PackLockErrorOrCancelled::Cancelled)),
            "expected Err(Cancelled) from outer-mutex cancel arm, got {result:?}"
        );

        canceller.await.unwrap();
        release_a.notify_one();
        a.await.unwrap();
    }
}
1004}