grex_core/pack_lock.rs
//! Per-pack `.grex-lock` file lock — feat-m6-2.
//!
//! Acquires an exclusive `fd-lock` guard on `<pack_path>/.grex-lock` for the
//! full duration of a pack-type plugin lifecycle method. Prevents two
//! concurrent tasks (in-process) or processes (cross-process) from operating
//! on the same pack at the same time.
//!
//! ## Lock ordering (tier 3 of 5)
//!
//! The spec fixes the global acquire order as:
//!
//! 1. workspace-sync lock (`<workspace>/.grex.sync.lock`)
//! 2. scheduler semaphore permit (feat-m6-1)
//! 3. **per-pack `.grex-lock`** — this module
//! 4. per-repo backend lock (`<dest>.grex-backend.lock`)
//! 5. manifest RW lock (`.grex/events.jsonl` sidecar)
//!
//! Plugins acquire tier 2 (permit) and tier 3 (pack lock) in that order
//! inside every `PackTypePlugin` method. In debug builds, [`with_tier`]
//! enforces strictly-increasing tiers on a task-local stack; a reversal
//! panics and a mismatched pop logs `tracing::error!`. Release builds
//! skip the check entirely.
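//!
//! A minimal sketch of the intended nesting (hedged: real call sites live
//! in the verb prologues, and `do_pack_work` is a placeholder):
//!
//! ```ignore
//! with_tier(Tier::Semaphore, || {
//!     with_tier(Tier::PerPack, || {
//!         do_pack_work(); // tiers 2 then 3: strictly increasing, so OK
//!     });
//! });
//! ```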
//!
//! ## In-process vs cross-process serialisation
//!
//! `fd-lock`'s `write()` is synchronous and blocks the calling OS thread
//! until the kernel flock is free. Calling it directly inside an async
//! plugin method would block a tokio worker thread; with a multi-thread
//! runtime this scales poorly, and recursive re-entry on the same pack
//! (a meta-plugin walking into a symlinked child that points back at the
//! parent) hangs the task outright because the second `write()` waits on
//! the first, which is still on-stack.
//!
//! To keep async workers free we layer a process-wide async mutex keyed
//! by canonical pack path **in front of** the fd-lock acquire:
//!
//! * [`PackLock::acquire_async`] first takes the canonical-path mutex
//!   (`tokio::sync::Mutex`), which serialises in-process tasks without
//!   blocking workers. The mutex is non-reentrant, so same-task re-entry
//!   must be prevented upstream; meta-plugin cycle detection halts
//!   recursion before this acquire runs.
//! * Inside the async mutex it hops to the blocking-thread pool for the
//!   fd-lock `write()`, which is fast because the only remaining
//!   contention is cross-process, and that is rare.
//! * On `Drop` it releases the fd-lock guard first, then the async
//!   mutex guard — reverse acquire order.
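//!
//! A hedged usage sketch (assumes a tokio context; `pack_path` is a
//! placeholder and error handling is elided):
//!
//! ```ignore
//! let hold = PackLock::open(pack_path)?.acquire_async().await?;
//! // ... run the plugin lifecycle method while the pack is held ...
//! drop(hold); // fd-lock guard first, then the async mutex guard
//! ```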

#![allow(unsafe_code)]

use std::collections::HashMap;
use std::fs::{File, OpenOptions};
use std::io;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex, OnceLock, Weak};

use fd_lock::{RwLock, RwLockWriteGuard};

/// Stable name of the per-pack lock file created inside every pack root.
/// Exported so the managed-gitignore writer can hide it from `git status`.
pub const PACK_LOCK_FILE_NAME: &str = ".grex-lock";

/// Error surfaced by [`PackLock::open`], [`PackLock::acquire`],
/// [`PackLock::try_acquire`], and [`PackLock::acquire_async`].
#[non_exhaustive]
#[derive(Debug, thiserror::Error)]
pub enum PackLockError {
    /// I/O error opening or locking the sidecar file.
    #[error("pack lock i/o on `{}`: {source}", path.display())]
    Io {
        /// Resolved `<pack_path>/.grex-lock` path.
        path: PathBuf,
        /// Underlying OS error.
        #[source]
        source: io::Error,
    },
    /// Non-blocking probe returned busy. Emitted only by
    /// [`PackLock::try_acquire`], on any contention. The blocking paths
    /// ([`PackLock::acquire`], [`PackLock::acquire_async`]) never
    /// produce this variant — they wait for the holder instead.
    #[error("pack lock `{}` is busy", path.display())]
    Busy {
        /// Lock path that was contended.
        path: PathBuf,
    },
}

/// Outcome of [`PackLock::acquire_cancellable`] — either the
/// underlying lock acquire failed, or the supplied cancellation token
/// fired before the guard was returned. Distinct from
/// [`crate::scheduler::Cancelled`]: that ZST signals semaphore-permit
/// cancellation; this variant signals pack-lock cancellation. Verb
/// bodies translate either signal into the same `PluginError::Cancelled`
/// at the call site (feat-m7-1 Stages 6-7).
#[non_exhaustive]
#[derive(Debug, thiserror::Error)]
pub enum PackLockErrorOrCancelled {
    /// The cancellation token fired before the lock was acquired.
    /// The spawned blocking thread (if launched) may still be parked
    /// in `fd_lock::write()` — see [`PackLock::acquire_cancellable`]
    /// for the OS-thread leak-window contract.
    #[error("pack lock acquire cancelled")]
    Cancelled,
    /// The underlying lock acquire failed before cancellation fired.
    #[error(transparent)]
    Lock(#[from] PackLockError),
}

fn path_mutex_registry() -> &'static Mutex<HashMap<PathBuf, Weak<tokio::sync::Mutex<()>>>> {
    static REG: OnceLock<Mutex<HashMap<PathBuf, Weak<tokio::sync::Mutex<()>>>>> = OnceLock::new();
    REG.get_or_init(|| Mutex::new(HashMap::new()))
}

/// feat-m6 H3 — prune entries whose only remaining reference was the
/// registry itself. Called opportunistically on every `mutex_for` so
/// long-running processes that open many distinct pack paths do not
/// accumulate unbounded registry entries. Runs under the registry
/// mutex so there is no race against a concurrent `mutex_for`.
fn prune_dead(reg: &mut HashMap<PathBuf, Weak<tokio::sync::Mutex<()>>>) {
    reg.retain(|_, weak| weak.strong_count() > 0);
}

fn mutex_for(canonical: &Path) -> Arc<tokio::sync::Mutex<()>> {
    let mut reg = path_mutex_registry()
        .lock()
        .expect("pack lock path registry poisoned — this indicates a prior panic");
    // Try to reuse an existing live entry. If the Weak is dead
    // (no outstanding holders) fall through to insert a fresh Arc.
    if let Some(weak) = reg.get(canonical) {
        if let Some(existing) = weak.upgrade() {
            return existing;
        }
    }
    prune_dead(&mut reg);
    let m = Arc::new(tokio::sync::Mutex::new(()));
    reg.insert(canonical.to_path_buf(), Arc::downgrade(&m));
    m
}

fn canonical_or_raw(path: &Path) -> PathBuf {
    std::fs::canonicalize(path).unwrap_or_else(|_| path.to_path_buf())
}

/// Per-pack file lock wrapper.
///
/// Construction via [`PackLock::open`] creates (or re-opens) the sidecar
/// `<pack_path>/.grex-lock` but does **not** acquire the lock — call
/// [`PackLock::acquire_async`] for the async-safe blocking path,
/// [`PackLock::acquire`] for the thread-blocking synchronous path, or
/// [`PackLock::try_acquire`] for a fail-fast probe.
pub struct PackLock {
    inner: RwLock<File>,
    path: PathBuf,
    canonical: PathBuf,
}

impl PackLock {
    /// Open (and create if missing) the sidecar `<pack_path>/.grex-lock`.
    /// Does **not** acquire the lock.
    ///
    /// # Errors
    ///
    /// Returns [`PackLockError::Io`] if the sidecar cannot be opened or
    /// its parent directory cannot be created.
    pub fn open(pack_path: &Path) -> Result<Self, PackLockError> {
        let path = pack_path.join(PACK_LOCK_FILE_NAME);
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)
                .map_err(|source| PackLockError::Io { path: path.clone(), source })?;
        }
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(&path)
            .map_err(|source| PackLockError::Io { path: path.clone(), source })?;
        let canonical = canonical_or_raw(pack_path);
        Ok(Self { inner: RwLock::new(file), path, canonical })
    }

    /// Async acquire — pairs a process-wide [`tokio::sync::Mutex`] keyed
    /// by canonical pack path with the sidecar `fd-lock`. Safe to call
    /// from any tokio worker without blocking the runtime: the in-process
    /// wait happens on the async mutex, and the kernel flock wait happens
    /// on the blocking-thread pool.
    ///
    /// The async mutex is non-reentrant, so a nested call chain that
    /// re-enters the same pack root (e.g. meta-plugin recursion through a
    /// child that resolves back to the parent) would deadlock at the
    /// mutex acquire. Callers must break such cycles upstream; see the
    /// cycle-detection note in the method body.
    ///
    /// The returned [`PackLockHold`] drops the fd-lock guard and the
    /// async mutex guard in reverse acquire order at end-of-scope.
    ///
    /// # Errors
    ///
    /// Returns [`PackLockError::Io`] on any OS-level lock failure.
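    ///
    /// A hedged usage sketch (`pack_path` is a placeholder):
    ///
    /// ```ignore
    /// let hold = PackLock::open(pack_path)?.acquire_async().await?;
    /// // ... exclusive pack work ...
    /// drop(hold); // releases the fd-lock, then the async mutex
    /// ```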
    pub async fn acquire_async(self) -> Result<PackLockHold, PackLockError> {
        // Serialise in-process tasks on the canonical path via an
        // async mutex — safe across tokio workers and non-blocking on
        // the async runtime. Tokio mutexes are non-reentrant, so a
        // same-task re-entry (recursive plugin invocation on the same
        // pack root) would hang at `lock_owned().await`; preventing
        // that is the caller's responsibility via cycle detection.
        //
        // [`crate::plugin::pack_type::MetaPlugin`] threads the
        // `visited_meta` set through recursion and inserts the pack
        // root at every lifecycle entry so cycles halt with
        // [`crate::execute::ExecError::MetaCycle`] before this mutex
        // acquire runs. Other pack-type plugins are leaf by design
        // (declarative, scripted) and cannot re-enter.
        let mtx = mutex_for(&self.canonical);
        let mutex_guard = Arc::clone(&mtx).lock_owned().await;

        // Box `self` so its address is stable for the transmuted
        // `'static` guard lifetime. Take the fd-lock guard from the
        // boxed lock's `inner`.
        let boxed = Box::new(self);
        // Capture the sidecar path before the move into the closure so
        // the JoinError arm below can report it (the closure consumes
        // `boxed`).
        let join_err_path = boxed.path.clone();
        // feat-m6 H1 — `fd_lock::RwLock::write` is a synchronous
        // blocking call that waits on the kernel flock. Running it
        // directly on a tokio worker would park that worker until
        // the kernel released the lock, starving the runtime when
        // the only remaining contention is cross-process. Hop onto
        // the blocking-thread pool so async workers stay free. The
        // acquire happens inside `spawn_blocking` and the guard is
        // transmuted to `'static` before leaving the closure so the
        // box + guard can be returned as a pair.
        let join = tokio::task::spawn_blocking(
            move || -> Result<(Box<PackLock>, RwLockWriteGuard<'static, File>), PackLockError> {
                let mut boxed = boxed;
                // SAFETY: see outer comment block — `boxed` is moved
                // into the returned pair and never freed while the
                // guard is live; field order in `PackLockHold` makes
                // the guard drop first. Transmuting here (inside the
                // closure) lets us return both the box and the guard.
                let guard_ref = boxed
                    .inner
                    .write()
                    .map_err(|source| PackLockError::Io { path: boxed.path.clone(), source })?;
                let guard_static: RwLockWriteGuard<'static, File> = unsafe {
                    std::mem::transmute::<
                        RwLockWriteGuard<'_, File>,
                        RwLockWriteGuard<'static, File>,
                    >(guard_ref)
                };
                Ok((boxed, guard_static))
            },
        )
        .await;
        let (boxed, guard_static) = match join {
            Ok(res) => res?,
            Err(join_err) => {
                return Err(PackLockError::Io {
                    path: join_err_path,
                    source: io::Error::other(join_err.to_string()),
                });
            }
        };

        Ok(PackLockHold {
            _fd_guard: Some(guard_static),
            _mutex_guard: Some(mutex_guard),
            _lock: boxed,
        })
    }

    /// Cancellable async acquire — same semantics as
    /// [`PackLock::acquire_async`] but races the acquire against a
    /// [`tokio_util::sync::CancellationToken`]. Used by the embedded
    /// MCP server (feat-m7-1) so a `notifications/cancelled` from the
    /// client unblocks tool handlers that are parked on a contended
    /// pack lock.
    ///
    /// **Consumes `self`** to mirror [`PackLock::acquire_async`] — the
    /// same boxed-self + transmute lifetime dance is needed to hand the
    /// guard back across a `spawn_blocking` boundary, and reusing the
    /// consumes-self contract preserves drop ordering against
    /// [`PackLockHold`].
    ///
    /// ## OS-thread leak window — contract
    ///
    /// `fd_lock::write()` is a synchronous syscall that parks the
    /// calling OS thread until the kernel releases the flock. Once the
    /// blocking call has been launched on the
    /// [`tokio::task::spawn_blocking`] pool, **the runtime cannot
    /// interrupt it** — there is no portable way to unpark a thread
    /// blocked in `flock(2)`. When the cancellation token fires we
    /// resolve the outer `select!` with [`PackLockErrorOrCancelled::Cancelled`]
    /// immediately, but the spawned OS thread stays parked until the
    /// holder eventually releases. When that happens, the spawned
    /// thread acquires the guard, the `JoinHandle` resolves to
    /// `Ok((boxed, guard))`, and the tuple is dropped on the spot
    /// (because the `select!` arm has already won) — at which point
    /// the guard's `Drop` releases the kernel flock and a subsequent
    /// acquirer can proceed.
    ///
    /// In other words: **cancellation is observable to the caller
    /// instantly, but the underlying OS thread holds the lock briefly
    /// past the cancel point, until the syscall returns**. Callers
    /// that immediately re-attempt acquire on the same path may see
    /// transient contention until that thread drains. See
    /// `.omne/cfg/mcp.md` §Cancellation.
    ///
    /// # Errors
    ///
    /// * [`PackLockErrorOrCancelled::Cancelled`] — the token fired
    ///   before a guard was returned (this is also the escape hatch if
    ///   a same-task re-entry parks on the in-process mutex).
    /// * [`PackLockErrorOrCancelled::Lock`] wrapping
    ///   [`PackLockError::Io`] on any OS-level lock failure.
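    ///
    /// A hedged usage sketch (`pack_path` is a placeholder; the MCP
    /// handler wiring is assumed):
    ///
    /// ```ignore
    /// let token = CancellationToken::new();
    /// match PackLock::open(pack_path)?.acquire_cancellable(&token).await {
    ///     Ok(hold) => { /* pack is exclusively held until `hold` drops */ }
    ///     Err(PackLockErrorOrCancelled::Cancelled) => { /* client cancelled */ }
    ///     Err(PackLockErrorOrCancelled::Lock(e)) => return Err(e.into()),
    /// }
    /// ```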
    pub async fn acquire_cancellable(
        self,
        cancel: &::tokio_util::sync::CancellationToken,
    ) -> Result<PackLockHold, PackLockErrorOrCancelled> {
        // Mirror `acquire_async`: serialise on the canonical-path
        // async mutex first. Race the mutex acquire itself against
        // cancel — same-task re-entry would normally hang here, but
        // the cancel arm gives the caller an out.
        let mtx = mutex_for(&self.canonical);
        let mutex_guard = tokio::select! {
            biased;
            () = cancel.cancelled() => return Err(PackLockErrorOrCancelled::Cancelled),
            g = Arc::clone(&mtx).lock_owned() => g,
        };

        // Box `self` so the address is stable for the transmuted
        // `'static` guard lifetime — same dance as `acquire_async`.
        let boxed = Box::new(self);
        // Capture the sidecar path before the move into the closure
        // so the JoinError arm below can report it (the closure
        // consumes `boxed`, so we cannot read it from there).
        let join_err_path = boxed.path.clone();
        // feat-m7-1 — replicates the `acquire_async` blocking-pool
        // hop. The closure body is intentionally identical (do NOT
        // refactor — see the SAFETY note in `acquire_async`).
        let join = tokio::task::spawn_blocking(
            move || -> Result<(Box<PackLock>, RwLockWriteGuard<'static, File>), PackLockError> {
                let mut boxed = boxed;
                // SAFETY: see `acquire_async` — `boxed` is moved into
                // the returned pair and never freed while the guard
                // is live; field order in `PackLockHold` makes the
                // guard drop first.
                let guard_ref = boxed
                    .inner
                    .write()
                    .map_err(|source| PackLockError::Io { path: boxed.path.clone(), source })?;
                let guard_static: RwLockWriteGuard<'static, File> = unsafe {
                    std::mem::transmute::<
                        RwLockWriteGuard<'_, File>,
                        RwLockWriteGuard<'static, File>,
                    >(guard_ref)
                };
                Ok((boxed, guard_static))
            },
        );

        // Race the blocking acquire against the cancellation token.
        // If cancel wins, the JoinHandle is dropped on the spot — the
        // spawned OS thread stays parked in `fd_lock::write()` until
        // the kernel releases, at which point the returned tuple is
        // dropped (see contract note above) and the flock is freed.
        let join = tokio::select! {
            biased;
            () = cancel.cancelled() => return Err(PackLockErrorOrCancelled::Cancelled),
            res = join => res,
        };

        let (boxed, guard_static) = match join {
            Ok(res) => res.map_err(PackLockErrorOrCancelled::Lock)?,
            Err(join_err) => {
                return Err(PackLockErrorOrCancelled::Lock(PackLockError::Io {
                    path: join_err_path,
                    source: io::Error::other(join_err.to_string()),
                }));
            }
        };

        Ok(PackLockHold {
            _fd_guard: Some(guard_static),
            _mutex_guard: Some(mutex_guard),
            _lock: boxed,
        })
    }

    /// Thread-blocking acquire (no tokio integration). Waits on the
    /// fd-lock synchronously. Suitable for synchronous call sites only
    /// — async plugin methods MUST use [`PackLock::acquire_async`] to
    /// avoid blocking a tokio worker.
    ///
    /// Returns a borrowed [`RwLockWriteGuard`]; the caller owns both
    /// the outer [`PackLock`] and the guard in scope. Mirrors the
    /// [`crate::fs::ScopedLock`] shape.
    ///
    /// # Errors
    ///
    /// Returns [`PackLockError::Io`] if the OS lock call fails.
    pub fn acquire(&mut self) -> Result<RwLockWriteGuard<'_, File>, PackLockError> {
        self.inner.write().map_err(|source| PackLockError::Io { path: self.path.clone(), source })
    }

    /// Non-blocking probe: return [`PackLockError::Busy`] instead of
    /// waiting when another holder has the lock. Does not engage the
    /// async mutex — purely a fail-fast diagnostics hook.
    ///
    /// # Errors
    ///
    /// * [`PackLockError::Busy`] when a concurrent holder owns the lock.
    /// * [`PackLockError::Io`] on any other OS-level lock failure.
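    ///
    /// A hedged probe sketch (diagnostics only; `lock` is a `mut`
    /// [`PackLock`] opened via [`PackLock::open`]):
    ///
    /// ```ignore
    /// match lock.try_acquire() {
    ///     Ok(_guard) => { /* free; guard releases at end of scope */ }
    ///     Err(PackLockError::Busy { path }) => eprintln!("busy: {}", path.display()),
    ///     Err(other) => return Err(other),
    /// }
    /// ```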
    pub fn try_acquire(&mut self) -> Result<RwLockWriteGuard<'_, File>, PackLockError> {
        match self.inner.try_write() {
            Ok(g) => Ok(g),
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                Err(PackLockError::Busy { path: self.path.clone() })
            }
            Err(source) => Err(PackLockError::Io { path: self.path.clone(), source }),
        }
    }

    /// Sidecar path — `<pack_path>/.grex-lock`.
    #[must_use]
    pub fn path(&self) -> &Path {
        &self.path
    }
}

impl std::fmt::Debug for PackLock {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PackLock").field("path", &self.path).finish()
    }
}

/// RAII guard returned by [`PackLock::acquire_async`]. Holds the
/// sidecar-file `fd-lock` guard plus the process-wide async mutex
/// guard. Drops in reverse acquire order.
#[repr(C)]
pub struct PackLockHold {
    // Field order is load-bearing: `_fd_guard` drops first (releasing
    // the kernel flock), then `_mutex_guard` (releasing the async
    // serialisation slot), then `_lock` (closing the file handle).
    // `#[repr(C)]` pins source order to layout order so `offset_of!`
    // assertions below stay meaningful.
    _fd_guard: Option<RwLockWriteGuard<'static, File>>,
    _mutex_guard: Option<tokio::sync::OwnedMutexGuard<()>>,
    _lock: Box<PackLock>,
}

impl PackLockHold {
    /// Sidecar path for diagnostics.
    #[must_use]
    pub fn path(&self) -> &Path {
        self._lock.path()
    }
}

impl std::fmt::Debug for PackLockHold {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PackLockHold").field("path", &self._lock.path()).finish()
    }
}

// Field-order static assertion (feat-m6 B3) — the safety argument for the
// unsafe lifetime extension in `acquire_async` depends on `_fd_guard`
// dropping before `_lock`. Rust drops struct fields in declaration order,
// so `_fd_guard` must sit at the lowest offset, then `_mutex_guard`, then
// `_lock`. A refactor that reorders these fields would silently break the
// Drop ordering and the transmuted `'static` borrow would outlive its box.
const _: () = {
    assert!(
        std::mem::offset_of!(PackLockHold, _fd_guard)
            < std::mem::offset_of!(PackLockHold, _mutex_guard),
        "PackLockHold field order: _fd_guard must precede _mutex_guard"
    );
    assert!(
        std::mem::offset_of!(PackLockHold, _mutex_guard)
            < std::mem::offset_of!(PackLockHold, _lock),
        "PackLockHold field order: _mutex_guard must precede _lock"
    );
};

impl Drop for PackLockHold {
    fn drop(&mut self) {
        // Explicit take() on the fd-lock guard first — the transmuted
        // `'static` lifetime must expire before `_lock` drops.
        self._fd_guard.take();
        self._mutex_guard.take();
        // `_lock` drops last when the struct itself drops, closing the
        // underlying file handle.
    }
}

// ---------------------------------------------------------------------------
// Lock-ordering enforcement (debug builds).
// ---------------------------------------------------------------------------

/// Lock tier ordinals matching `.omne/cfg/concurrency.md`. Acquisitions
/// must strictly increase; reversed order risks the deadlock class the
/// feat-m6-3 Lean proof rules out.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum Tier {
    /// Workspace sync lock — `<workspace>/.grex.sync.lock`.
    WorkspaceSync = 0,
    /// Scheduler semaphore permit — feat-m6-1.
    Semaphore = 1,
    /// Per-pack `.grex-lock` — feat-m6-2 (this module).
    PerPack = 2,
    /// Per-repo backend lock — `<dest>.grex-backend.lock`.
    Backend = 3,
    /// Manifest RW lock — `.grex/events.jsonl` sidecar.
    Manifest = 4,
}

/// Run `f` while `tier` is pushed on the current task's tier stack.
/// Debug builds enforce strictly-increasing order across nested calls;
/// release builds skip the check entirely.
#[cfg(debug_assertions)]
pub fn with_tier<R>(tier: Tier, f: impl FnOnce() -> R) -> R {
    tier::push(tier);
    let out = f();
    tier::pop_if_top(tier);
    out
}

/// Release-build no-op mirror of [`with_tier`].
#[cfg(not(debug_assertions))]
#[inline]
pub fn with_tier<R>(_tier: Tier, f: impl FnOnce() -> R) -> R {
    f()
}

/// Wrap an async future in a task-local tier-stack scope so any
/// [`TierGuard`] pushes inside it land in the correct frame even when
/// the task migrates across tokio workers after `.await`. Release
/// builds compile this down to the raw future — no scope, no cost.
///
/// Callers should wrap every top-level async dispatch (e.g. the
/// per-pack plugin lifecycle calls driven by `rt.block_on(...)`) so
/// the tier check can operate on a fresh stack per dispatch.
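///
/// A hedged dispatch sketch (`plugin.sync(ctx)` stands in for any
/// top-level lifecycle future):
///
/// ```ignore
/// let out = with_tier_scope(async {
///     plugin.sync(ctx).await // TierGuard pushes land in this fresh frame
/// })
/// .await;
/// ```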
#[cfg(debug_assertions)]
pub async fn with_tier_scope<F: std::future::Future>(f: F) -> F::Output {
    tier::TIER_STACK.scope(std::cell::RefCell::new(Vec::new()), f).await
}

/// Release-build no-op mirror of [`with_tier_scope`].
#[cfg(not(debug_assertions))]
#[inline]
pub async fn with_tier_scope<F: std::future::Future>(f: F) -> F::Output {
    f.await
}

/// RAII guard — pushes a tier onto the task-local tier stack on
/// construction, pops on drop. Lets lifecycle prologues enforce
/// tier ordering across `.await` points without nesting the rest of
/// the body inside a `with_tier` closure. Debug builds carry the
/// ordering check; release builds compile to a zero-sized no-op.
///
/// Declaration-order note: callers must declare the guard **before**
/// the permit/hold it is scoping. Rust drops locals in reverse
/// declaration order, so `_tier` declared first drops last —
/// after the lock/permit releases — matching `with_tier` semantics.
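///
/// A hedged prologue sketch (`lock` is an opened [`PackLock`]; the
/// declaration order is the point):
///
/// ```ignore
/// let _tier = TierGuard::push(Tier::PerPack); // declared first, drops last
/// let hold = lock.acquire_async().await?;     // drops before `_tier`
/// ```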
#[must_use]
pub struct TierGuard {
    #[cfg(debug_assertions)]
    tier: Tier,
    // Unit placeholder for release builds so `TierGuard { _private: () }`
    // stays constructible; the type is zero-sized, so `#[must_use]`
    // costs nothing.
    #[cfg(not(debug_assertions))]
    _private: (),
}

impl TierGuard {
    /// Push `tier` onto the current task's tier stack. Debug builds
    /// assert strictly-increasing order against the existing top.
    #[cfg(debug_assertions)]
    pub fn push(tier: Tier) -> Self {
        tier::push(tier);
        TierGuard { tier }
    }

    /// Release-build no-op constructor.
    #[cfg(not(debug_assertions))]
    #[inline]
    pub fn push(_tier: Tier) -> Self {
        TierGuard { _private: () }
    }
}

#[cfg(debug_assertions)]
impl Drop for TierGuard {
    fn drop(&mut self) {
        tier::pop_if_top(self.tier);
    }
}

#[cfg(debug_assertions)]
pub(crate) mod tier {
    use super::Tier;
    use std::cell::RefCell;

    // feat-m6 CI fix — previously this used `thread_local!`, but under a
    // tokio multi-thread runtime a task can resume on a different worker
    // after `.await`. A push on worker A followed by a yield and a pop on
    // worker B left A's stack polluted and tripped the tier-ordering
    // assertion on the next acquire. Migrating to `tokio::task_local!`
    // pins the stack to the *task*, not the worker, so nested
    // `TierGuard` bookkeeping follows the task across workers.
    //
    // `try_with` silently no-ops outside a `TIER_STACK.scope(...)`
    // frame — that makes the module safe to use from synchronous
    // (non-tokio) test harnesses and the module's own unit tests at
    // the cost of debug-only tier enforcement being disabled there.
    // Production dispatch wraps every pack-type plugin call in a scope
    // (see `sync::dispatch_*`), so real runs retain enforcement.
    tokio::task_local! {
        pub(crate) static TIER_STACK: RefCell<Vec<Tier>>;
    }

    pub fn push(next: Tier) {
        let _ = TIER_STACK.try_with(|s| {
            let mut s = s.borrow_mut();
            if let Some(&top) = s.last() {
                assert!(
                    next > top,
                    "lock tier violation: trying to acquire {next:?} while already holding {top:?} \
                     (tiers must be strictly increasing — see .omne/cfg/concurrency.md)"
                );
            }
            s.push(next);
        });
    }

    pub fn pop_if_top(expected: Tier) {
        let _ = TIER_STACK.try_with(|s| {
            let mut s = s.borrow_mut();
            if s.last().copied() == Some(expected) {
                s.pop();
            } else {
                tracing::error!(
                    target: "grex::concurrency",
                    "tier pop mismatch: expected {:?} at top, stack = {:?}",
                    expected,
                    *s
                );
            }
        });
    }
}

// ---------------------------------------------------------------------------
// Tests.
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Arc, Barrier};
    use std::thread;
    use std::time::{Duration, Instant};
    use tempfile::tempdir;
    use tokio_util::sync::CancellationToken;

    #[test]
    fn pack_lock_acquires_creates_file() {
        let dir = tempdir().unwrap();
        let mut plock = PackLock::open(dir.path()).unwrap();
        let expected = plock.path().to_path_buf();
        let _guard = plock.acquire().unwrap();
        assert!(expected.exists(), "open must create the sidecar file");
        assert_eq!(expected, dir.path().join(PACK_LOCK_FILE_NAME));
    }

    #[test]
    fn pack_lock_second_try_acquire_reports_busy_while_held() {
        let dir = tempdir().unwrap();
        let mut first = PackLock::open(dir.path()).unwrap();
        let _held = first.acquire().unwrap();
        let mut second = PackLock::open(dir.path()).unwrap();
        let err = second.try_acquire().unwrap_err();
        match err {
            PackLockError::Busy { path } => {
                assert_eq!(path, dir.path().join(PACK_LOCK_FILE_NAME));
            }
            other => panic!("expected Busy, got {other:?}"),
        }
    }

    #[test]
    fn pack_lock_release_on_drop() {
        let dir = tempdir().unwrap();
        {
            let mut first = PackLock::open(dir.path()).unwrap();
            let _g = first.acquire().unwrap();
        }
        let mut second = PackLock::open(dir.path()).unwrap();
        let _g = second.acquire().unwrap();
    }

    #[test]
    fn pack_lock_path_contains_pack_path() {
        let dir = tempdir().unwrap();
        let plock = PackLock::open(dir.path()).unwrap();
        let p = plock.path();
        assert!(p.starts_with(dir.path()));
        assert_eq!(p.file_name().and_then(|s| s.to_str()), Some(PACK_LOCK_FILE_NAME));
    }

    #[test]
    fn pack_lock_blocking_acquire_waits_for_holder() {
        let dir = tempdir().unwrap();
        let path = dir.path().to_path_buf();
        let barrier = Arc::new(Barrier::new(2));
        let holder_barrier = Arc::clone(&barrier);
        let holder_path = path.clone();

        let holder = thread::spawn(move || {
            let mut lock = PackLock::open(&holder_path).unwrap();
            let _g = lock.acquire().unwrap();
            holder_barrier.wait();
            thread::sleep(Duration::from_millis(100));
        });

        barrier.wait();
        let start = Instant::now();
        let mut second = PackLock::open(&path).unwrap();
        let _g = second.acquire().unwrap();
        let waited = start.elapsed();
        holder.join().unwrap();
        assert!(
            waited >= Duration::from_millis(40),
            "blocking acquire must have waited (observed {waited:?})"
        );
    }

    #[test]
    fn pack_lock_distinct_paths_do_not_contend() {
        let a = tempdir().unwrap();
        let b = tempdir().unwrap();
        let mut la = PackLock::open(a.path()).unwrap();
        let _ga = la.acquire().unwrap();
        let mut lb = PackLock::open(b.path()).unwrap();
        let _gb = lb.try_acquire().unwrap();
    }

    #[tokio::test]
    async fn async_acquire_serialises_in_process() {
        // Two concurrent acquire_async calls on the same pack path
        // must serialise cleanly (no hang).
        let dir = tempdir().unwrap();
        let path = dir.path().to_path_buf();
        let path_clone = path.clone();
        let h1 = tokio::spawn(async move {
            let lock = PackLock::open(&path).unwrap();
            let _hold = lock.acquire_async().await.unwrap();
            tokio::time::sleep(Duration::from_millis(30)).await;
        });
        let h2 = tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(5)).await;
            let lock = PackLock::open(&path_clone).unwrap();
            let _hold = lock.acquire_async().await.unwrap();
        });
        h1.await.unwrap();
        h2.await.unwrap();
    }

    // --- tier ordering (debug-only) -----------------------------------------

    // feat-m6 CI fix — tier enforcement now lives in a `tokio::task_local!`
    // stack, so these tests drive the check through a scoped task to
    // establish the frame. `try_with` outside a scope silently no-ops.

    #[cfg(debug_assertions)]
    #[tokio::test]
    async fn tier_strictly_increasing_ok() {
        tier::TIER_STACK
            .scope(std::cell::RefCell::new(Vec::new()), async {
                with_tier(Tier::Semaphore, || {
                    with_tier(Tier::PerPack, || {
                        with_tier(Tier::Backend, || {
                            with_tier(Tier::Manifest, || {});
                        });
                    });
                });
            })
            .await;
    }

    #[cfg(debug_assertions)]
    #[tokio::test]
    async fn tier_reversed_panics_in_debug() {
        use std::panic::{catch_unwind, AssertUnwindSafe};
        let result = tier::TIER_STACK
            .scope(std::cell::RefCell::new(Vec::new()), async {
                catch_unwind(AssertUnwindSafe(|| {
                    with_tier(Tier::PerPack, || {
                        with_tier(Tier::Semaphore, || {});
                    });
                }))
            })
            .await;
        assert!(result.is_err(), "reversed tier order must panic in debug builds");
    }

    // --- acquire_cancellable (feat-m7-1 Stage 4) -----------------------------

    /// 4.T1 — uncontended path returns Ok(PackLockHold).
    #[tokio::test]
    async fn acquire_cancellable_happy_path() {
        let dir = tempdir().unwrap();
        let lock = PackLock::open(dir.path()).unwrap();
        let token = CancellationToken::new();
        let result = lock.acquire_cancellable(&token).await;
        assert!(result.is_ok(), "expected Ok(PackLockHold) on uncontended pack");
    }

    /// 4.T2 — task A holds the lock; task B's token fires after 10 ms;
    /// B must surface `Err(Cancelled)` within 50 ms.
    #[tokio::test]
    async fn acquire_cancellable_cancel_during_blocking_fd_lock_returns_cancelled() {
        let dir = tempdir().unwrap();
        let path = dir.path().to_path_buf();
        let path_b = path.clone();

        // Task A: acquire and hold for 500 ms (long enough to cover B's window).
        let a_started = Arc::new(tokio::sync::Notify::new());
        let a_started_clone = Arc::clone(&a_started);
        let a = tokio::spawn(async move {
            let lock = PackLock::open(&path).unwrap();
            let _hold = lock.acquire_async().await.unwrap();
            a_started_clone.notify_one();
            tokio::time::sleep(Duration::from_millis(500)).await;
        });
        a_started.notified().await;

        let token = CancellationToken::new();
        let cancel_handle = token.clone();
        let canceller = tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            cancel_handle.cancel();
        });

        let started = Instant::now();
        let lock_b = PackLock::open(&path_b).unwrap();
        let result =
            tokio::time::timeout(Duration::from_millis(50), lock_b.acquire_cancellable(&token))
                .await
                .expect("acquire_cancellable must return within 50 ms after cancel");

        let waited = started.elapsed();
        assert!(
            matches!(result, Err(PackLockErrorOrCancelled::Cancelled)),
            "expected Err(Cancelled), got {result:?} after {waited:?}"
        );

        canceller.await.unwrap();
        a.abort();
        let _ = a.await;
    }

    // --- helpers for 4.T3 / 4.T4 -------------------------------------------

    /// Spawn a "holder" task that acquires `path` and releases on
    /// signal. Returns `(JoinHandle, started_notify, release_notify)`.
    /// Caller awaits `started` to know the lock is held, then fires
    /// `release` when ready to let the holder drop its guard.
    fn spawn_holder(
        path: PathBuf,
    ) -> (tokio::task::JoinHandle<()>, Arc<tokio::sync::Notify>, Arc<tokio::sync::Notify>) {
        let started = Arc::new(tokio::sync::Notify::new());
        let release = Arc::new(tokio::sync::Notify::new());
        let started_c = Arc::clone(&started);
        let release_c = Arc::clone(&release);
        let h = tokio::spawn(async move {
            let lock = PackLock::open(&path).unwrap();
            let _hold = lock.acquire_async().await.unwrap();
            started_c.notify_one();
            release_c.notified().await;
        });
        (h, started, release)
    }

    /// Poll `acquire_async` against `path` until it succeeds or the
    /// outer deadline expires. Returns `Ok(())` on success, `Err(())`
    /// on timeout.
    async fn poll_acquire_until_free(path: PathBuf, deadline: Duration) -> Result<(), ()> {
        tokio::time::timeout(deadline, async move {
            loop {
                let lock = PackLock::open(&path).unwrap();
                if let Ok(Ok(_hold)) =
                    tokio::time::timeout(Duration::from_millis(100), lock.acquire_async()).await
                {
                    return;
                }
                tokio::time::sleep(Duration::from_millis(20)).await;
            }
        })
        .await
        .map_err(|_| ())
    }

    /// 4.T3 — regression for the documented OS-thread leak window:
    /// after B is cancelled, A releases its lock; the spawn_blocking
    /// thread that B kicked off must eventually unblock and drop its
    /// guard. We observe this by polling from a third task C — if B's
    /// blocking thread leaked its guard, C would wait forever.
    #[tokio::test]
    async fn acquire_cancellable_spawn_blocking_thread_releases_guard_when_it_finally_unblocks() {
        let dir = tempdir().unwrap();
        let path = dir.path().to_path_buf();
        let (a, a_started, release_a) = spawn_holder(path.clone());
        a_started.notified().await;

        // Task B: race against cancel while A holds the fd-lock.
        let token = CancellationToken::new();
        let cancel_handle = token.clone();
        let path_b = path.clone();
        let b = tokio::spawn(async move {
            let lock = PackLock::open(&path_b).unwrap();
            lock.acquire_cancellable(&token).await
        });
        tokio::time::sleep(Duration::from_millis(20)).await;
        cancel_handle.cancel();
        let b_result = tokio::time::timeout(Duration::from_millis(100), b)
            .await
            .expect("B must resolve quickly after cancel")
            .expect("B task panicked");
        assert!(
            matches!(b_result, Err(PackLockErrorOrCancelled::Cancelled)),
            "expected B to see Cancelled, got {b_result:?}"
        );

        // Release A — B's parked OS thread should drain.
        release_a.notify_one();
        a.await.unwrap();

        assert!(
            poll_acquire_until_free(path, Duration::from_millis(2_000)).await.is_ok(),
            "task C never acquired — spawn_blocking thread leaked its fd-lock guard"
        );
    }

    /// 4.T4 — covers the outer-mutex cancel arm of the `select!` in
    /// `acquire_cancellable`. Task A holds the in-process async mutex
    /// (via `acquire_async`, which acquires both tiers); task B calls
    /// `acquire_cancellable` and is parked on `lock_owned()` (NOT yet
    /// at the fd-lock blocking call). Cancelling B's token must
    /// short-circuit the mutex wait and return `Err(Cancelled)`.
    #[tokio::test]
    async fn acquire_cancellable_cancel_during_async_mutex_wait_returns_cancelled() {
        let dir = tempdir().unwrap();
        let path = dir.path().to_path_buf();
        let (a, a_started, release_a) = spawn_holder(path.clone());
        a_started.notified().await;

        let token = CancellationToken::new();
        let cancel_handle = token.clone();
        let canceller = tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            cancel_handle.cancel();
        });

        let lock_b = PackLock::open(&path).unwrap();
        let result =
            tokio::time::timeout(Duration::from_millis(50), lock_b.acquire_cancellable(&token))
                .await
                .expect("acquire_cancellable must return within 50 ms after cancel");

        assert!(
            matches!(result, Err(PackLockErrorOrCancelled::Cancelled)),
            "expected Err(Cancelled) from outer-mutex cancel arm, got {result:?}"
        );

        canceller.await.unwrap();
        release_a.notify_one();
        a.await.unwrap();
    }
}