grex_core/pack_lock.rs
1//! Per-pack `.grex-lock` file lock — feat-m6-2.
2//!
3//! Acquires an exclusive `fd-lock` guard on `<pack_path>/.grex-lock` for the
4//! full duration of a pack-type plugin lifecycle method. Prevents two
5//! concurrent tasks (in-process) or processes (cross-process) from operating
6//! on the same pack at the same time.
7//!
8//! ## Lock ordering (tier 3 of 5)
9//!
10//! The spec fixes the global acquire order as:
11//!
12//! 1. workspace-sync lock (`<workspace>/.grex.sync.lock`)
13//! 2. scheduler semaphore permit (feat-m6-1)
14//! 3. **per-pack `.grex-lock`** — this module
15//! 4. per-repo backend lock (`<dest>.grex-backend.lock`)
16//! 5. manifest RW lock (`.grex/events.jsonl` sidecar)
17//!
18//! Plugins acquire tier 2 (permit) and tier 3 (pack lock) in that order
19//! inside every `PackTypePlugin` method. In debug builds, [`with_tier`]
20//! enforces strictly-increasing tiers on a thread-local stack; a reversal
21//! panics in debug and logs `tracing::error!` in release.
22//!
23//! ## In-process vs cross-process serialisation
24//!
25//! `fd-lock`'s `write()` is synchronous and blocks the calling OS thread
26//! until the kernel flock is free. Calling it directly inside an async
27//! plugin method would block a tokio worker thread; with a multi-thread
28//! runtime this scales poorly, and recursive re-entry on the same pack
29//! (meta-plugin walking into a symlinked child that points back at the
30//! parent) hangs the task outright because the second `write()` waits on
31//! the first, which is still on-stack.
32//!
33//! To avoid both problems we layer a process-wide async mutex keyed by
34//! canonical pack path **in front of** the fd-lock acquire:
35//!
//! * [`PackLock::acquire_async`] first takes the canonical-path mutex
//!   (`tokio::sync::Mutex`), which serialises in-process tasks without
//!   blocking workers. The mutex is non-reentrant and performs no
//!   re-entry detection: same-task re-entry on the same pack root is
//!   prevented upstream by meta-plugin cycle detection, and a missed
//!   cycle would hang at the mutex acquire rather than surface as
//!   [`PackLockError::Busy`].
40//! * Inside the async mutex it calls the blocking fd-lock `write()` —
41//! fast because the only remaining contention is cross-process, which
42//! is rare.
43//! * On `Drop` it releases the fd-lock guard first, then the async
44//! mutex guard — reverse acquire order.
45
46#![allow(unsafe_code)]
47
48use std::collections::HashMap;
49use std::fs::{File, OpenOptions};
50use std::io;
51use std::path::{Path, PathBuf};
52use std::sync::{Arc, Mutex, OnceLock};
53
54use fd_lock::{RwLock, RwLockWriteGuard};
55
56// feat-v1.2.5 A3 — pool deadlock guard (debug-only, cross-thread).
57//
58// `HELD_PACK_LOCKS` is a *process-global* set of `(canonical_pack_path,
59// thread_id)` tuples currently held. The set is consulted from rayon
60// worker threads inside `phase3_recurse` (walker.rs) at the entry of
61// every parallel-iter closure to assert the lock-acquisition order
62// documented in `.omne/cfg/concurrency.md` §"Five cooperating
63// mechanisms" — namely that a `PackLock` is never held by a thread
64// while that same thread enters a nested `pool.install` closure. The
65// v1.2.4 design used a `thread_local!` here, but rayon's work-stealing
66// scheduler routes the assertion onto worker threads whose TL would
67// always be empty (the lock was acquired on the *outer* tokio worker,
68// not on the rayon worker). The global tuple-set fixes that gap so
69// the assertion actually fires when the Coffman cycle is reproducible.
70// Release builds compile this out entirely.
71//
72// Pairs with `POOL_INSTALL_DEPTH` (per-OS-thread, owned by
73// `scheduler.rs`): each `PackLock` acquire asserts the *current
74// thread* is not inside a `pool.install` closure, so a worker thread
75// inside a `pool.install` cannot newly acquire a `PackLock` during the
76// closure body either. See feat-v1.2.5 design.md §A3.
77#[cfg(debug_assertions)]
78use std::collections::HashSet;
79#[cfg(debug_assertions)]
80use std::thread::ThreadId;
81
82// W3 (feat-v1.2.5) landed `crate::scheduler::POOL_INSTALL_DEPTH` as a
83// per-thread `RefCell<usize>` driven by `PoolInstallDepthGuard` around
84// every `pool.install` boundary in `phase3_recurse` (see scheduler.rs
85// §A3). We import it here so the deadlock assertion in
86// `register_held_lock` can verify, at every `PackLock` acquire, that
87// the current OS thread is NOT inside a rayon `pool.install` closure.
88#[cfg(debug_assertions)]
89use crate::scheduler::POOL_INSTALL_DEPTH;
90
/// Lazily-initialised process-global registry of
/// `(canonical_pack_path, owning_thread_id)` tuples for every
/// `PackLock` currently held anywhere in the process. Rayon worker
/// threads in `phase3_recurse` (walker.rs) consult it before each
/// per-child closure body to assert the lock-acquisition order
/// (feat-v1.2.5 design.md §A3).
///
/// Shaped as `OnceLock<Mutex<HashSet<...>>>` instead of `LazyLock`
/// because the workspace MSRV is 1.79 and `LazyLock` only stabilised
/// in 1.80; semantics are the same — one shared `HashSet`, built on
/// first access.
///
/// Contract: `register_held_lock` inserts a tuple after asserting the
/// pool-install-depth-zero precondition; `PackLock::Drop` removes it
/// via `unregister_held_lock`. The thread-id half pins ownership to a
/// specific thread so "current thread holds any pack lock" can be
/// answered without false positives from other threads' holds.
#[cfg(debug_assertions)]
fn held_pack_locks_storage() -> &'static Mutex<HashSet<(PathBuf, ThreadId)>> {
    static STORAGE: OnceLock<Mutex<HashSet<(PathBuf, ThreadId)>>> = OnceLock::new();
    STORAGE.get_or_init(Default::default)
}
114
115/// Returns true iff the current OS thread is NOT inside any rayon
116/// `pool.install` boundary (i.e. `POOL_INSTALL_DEPTH == 0`). Used by
117/// `register_held_lock` to enforce the design.md §A3 lock-acquisition
118/// order: a `PackLock` may never be acquired from inside a
119/// `pool.install` closure (otherwise the worker can deadlock against
120/// itself when a nested `pool.install` re-enters the same thread).
121#[cfg(debug_assertions)]
122#[inline]
123fn pool_install_depth_is_zero() -> bool {
124 POOL_INSTALL_DEPTH.with(|d| *d.borrow() == 0)
125}
126
127/// Register the canonical pack-lock path against the current OS
128/// thread in the global `HELD_PACK_LOCKS` set. Asserts the
129/// pool-install depth invariant before insertion. Debug-only.
130#[cfg(debug_assertions)]
131fn register_held_lock(path: &Path) {
132 assert!(
133 pool_install_depth_is_zero(),
134 "pool deadlock guard: attempted to acquire PackLock({}) while \
135 inside a `pool.install` boundary — this risks the rayon \
136 re-entrancy deadlock pinned by feat-v1.2.5 design.md §A3 \
137 (see .omne/cfg/concurrency.md §\"Five cooperating mechanisms\")",
138 path.display()
139 );
140 let tid = std::thread::current().id();
141 held_pack_locks_storage()
142 .lock()
143 .expect("HELD_PACK_LOCKS poisoned — prior panic in deadlock-guard bookkeeping")
144 .insert((path.to_path_buf(), tid));
145}
146
147/// Remove the canonical pack-lock path for the current OS thread from
148/// the global held-pack-locks registry. Called from `PackLock::Drop`.
149/// Debug-only. Idempotent — extra calls (e.g. from a `PackLock` that
150/// was opened but never acquired) are no-ops.
151#[cfg(debug_assertions)]
152fn unregister_held_lock(path: &Path) {
153 let tid = std::thread::current().id();
154 if let Ok(mut held) = held_pack_locks_storage().lock() {
155 held.remove(&(path.to_path_buf(), tid));
156 }
157}
158
159/// Debug-only inspector: snapshot of pack lock paths currently held by
160/// the *current* OS thread. Used by the per-`pool.install`-closure
161/// assertion in `phase3_recurse` (walker.rs §A3) and by the
162/// deadlock-guard tests. Available under `#[cfg(debug_assertions)]`
163/// (not just `#[cfg(test)]`) because the walker production code path
164/// itself reads it inside a `debug_assert!` block.
165#[cfg(debug_assertions)]
166pub(crate) fn held_pack_locks_for_test() -> Vec<PathBuf> {
167 let tid = std::thread::current().id();
168 held_pack_locks_storage()
169 .lock()
170 .expect("HELD_PACK_LOCKS poisoned — prior panic in deadlock-guard bookkeeping")
171 .iter()
172 .filter(|(_, t)| *t == tid)
173 .map(|(p, _)| p.clone())
174 .collect()
175}
176
/// Test-only reset: discard every held-pack-locks entry owned by the
/// calling OS thread, letting tests start clean without building and
/// dropping real `PackLock` guards. Gated on `debug_assertions`
/// because the backing global only exists in debug builds.
#[cfg(all(test, debug_assertions))]
#[allow(dead_code)]
pub(crate) fn clear_held_pack_locks_for_test() {
    let me = std::thread::current().id();
    if let Ok(mut held) = held_pack_locks_storage().lock() {
        held.retain(|(_, owner)| *owner != me);
    }
}
190
/// Test-only injector: mark `path` as held by the calling thread
/// WITHOUT the real lock-file machinery, so walker.rs deadlock-guard
/// tests can stage a "lock held" precondition with no filesystem
/// traffic. Skips the `pool_install_depth_is_zero` assertion that the
/// real `register_held_lock` performs — callers MUST invoke this
/// BEFORE entering any `pool.install` boundary. Gated on
/// `debug_assertions` because the backing global is debug-only.
#[cfg(all(test, debug_assertions))]
pub(crate) fn register_pack_lock_for_test(path: &Path) {
    let entry = (path.to_path_buf(), std::thread::current().id());
    held_pack_locks_storage()
        .lock()
        .expect("HELD_PACK_LOCKS poisoned — prior panic in deadlock-guard bookkeeping")
        .insert(entry);
}
208
/// Test-only remover: undo a `register_pack_lock_for_test` injection
/// on the calling OS thread without going through `PackLock::Drop`.
/// The F1 deadlock-guard test in walker.rs uses it to keep the global
/// registry clean across cargo-test thread reuse, so a retry on the
/// same OS thread starts fresh. Gated on `debug_assertions` because
/// the backing global is debug-only.
#[cfg(all(test, debug_assertions))]
pub(crate) fn unregister_pack_lock_for_test(path: &Path) {
    let entry = (path.to_path_buf(), std::thread::current().id());
    if let Ok(mut held) = held_pack_locks_storage().lock() {
        held.remove(&entry);
    }
}
223
/// Stable name of the per-pack lock file created inside every pack root.
/// Exported so the managed-gitignore writer can hide it from `git status`.
/// [`PackLock::open`] joins this onto the pack root to form the sidecar path.
pub const PACK_LOCK_FILE_NAME: &str = ".grex-lock";
227
/// Error surfaced by [`PackLock::open`], [`PackLock::acquire`], and
/// [`PackLock::try_acquire`].
///
/// Note: [`PackLock::acquire`] is deprecated since v1.2.4; prefer
/// [`PackLock::acquire_async`] or [`PackLock::try_acquire`].
#[non_exhaustive]
#[derive(Debug, thiserror::Error)]
pub enum PackLockError {
    /// I/O error opening or locking the sidecar file.
    #[error("pack lock i/o on `{}`: {source}", path.display())]
    Io {
        /// Resolved `<pack_path>/.grex-lock` path.
        path: PathBuf,
        /// Underlying OS error.
        #[source]
        source: io::Error,
    },
    /// Non-blocking probe returned busy.
    ///
    /// Emitted only by [`PackLock::try_acquire`] when another holder
    /// owns the kernel flock. The blocking paths never produce it:
    /// [`PackLock::acquire_async`] queues cross-task contention on the
    /// canonical-path async mutex and waits out cross-process
    /// contention on the flock itself, and same-task re-entry is
    /// prevented upstream by meta-plugin cycle detection rather than
    /// detected here.
    #[error("pack lock `{}` is busy", path.display())]
    Busy {
        /// Lock path that was contended.
        path: PathBuf,
    },
}
259
/// Outcome of [`PackLock::acquire_cancellable`] — either the
/// underlying lock acquire failed, or the supplied cancellation token
/// fired before the guard was returned. Distinct from
/// [`crate::scheduler::Cancelled`]: that ZST signals semaphore-permit
/// cancellation; this variant signals pack-lock cancellation. Verb
/// bodies translate either into the same `PluginError::Cancelled` at
/// the call site (feat-m7-1 Stages 6-7).
#[non_exhaustive]
#[derive(Debug, thiserror::Error)]
pub enum PackLockErrorOrCancelled {
    /// The cancellation token fired before the lock was acquired.
    /// The spawned blocking thread (if launched) may still be parked
    /// in `fd_lock::write()` — see [`PackLock::acquire_cancellable`]
    /// for the OS-thread leak-window contract.
    #[error("pack lock acquire cancelled")]
    Cancelled,
    /// The underlying lock acquire failed before cancellation fired.
    /// Displays as the wrapped [`PackLockError`] (transparent).
    #[error(transparent)]
    Lock(#[from] PackLockError),
}
280
281use std::sync::Weak;
282
283fn path_mutex_registry() -> &'static Mutex<HashMap<PathBuf, Weak<tokio::sync::Mutex<()>>>> {
284 static REG: OnceLock<Mutex<HashMap<PathBuf, Weak<tokio::sync::Mutex<()>>>>> = OnceLock::new();
285 REG.get_or_init(|| Mutex::new(HashMap::new()))
286}
287
288/// feat-m6 H3 — prune entries whose only remaining reference was the
289/// registry itself. Called opportunistically on every `mutex_for` so
290/// long-running processes that open many distinct pack paths do not
291/// accumulate unbounded registry entries. Runs under the registry
292/// mutex so there is no race against a concurrent `mutex_for`.
293fn prune_dead(reg: &mut HashMap<PathBuf, Weak<tokio::sync::Mutex<()>>>) {
294 reg.retain(|_, weak| weak.strong_count() > 0);
295}
296
297fn mutex_for(canonical: &Path) -> Arc<tokio::sync::Mutex<()>> {
298 let mut reg = path_mutex_registry()
299 .lock()
300 .expect("pack lock path registry poisoned — this indicates a prior panic");
301 // Try to reuse an existing live entry. If the Weak is dead
302 // (no outstanding holders) fall through to insert a fresh Arc.
303 if let Some(weak) = reg.get(canonical) {
304 if let Some(existing) = weak.upgrade() {
305 return existing;
306 }
307 }
308 prune_dead(&mut reg);
309 let m = Arc::new(tokio::sync::Mutex::new(()));
310 reg.insert(canonical.to_path_buf(), Arc::downgrade(&m));
311 m
312}
313
/// Canonicalise `path`, falling back to the raw path unchanged when
/// canonicalisation fails (e.g. the path does not exist yet).
fn canonical_or_raw(path: &Path) -> PathBuf {
    match std::fs::canonicalize(path) {
        Ok(resolved) => resolved,
        Err(_) => path.to_path_buf(),
    }
}
317
/// Per-pack file lock wrapper.
///
/// Construction via [`PackLock::open`] creates (or re-opens) the sidecar
/// `<pack_path>/.grex-lock` but does **not** acquire the lock — call
/// [`PackLock::acquire_async`] for the async-safe blocking path or
/// [`PackLock::try_acquire`] for a fail-fast probe.
///
/// Note: [`PackLock::acquire`] is deprecated since v1.2.4; prefer
/// [`PackLock::acquire_async`] or [`PackLock::try_acquire`]. The sync
/// blocking variant will be removed in v1.3.0.
pub struct PackLock {
    // fd-lock wrapper around the open sidecar file handle.
    inner: RwLock<File>,
    // Resolved `<pack_path>/.grex-lock` — carried in error payloads.
    path: PathBuf,
    // Canonicalised pack root (raw path if canonicalisation failed) —
    // key for the process-wide async-mutex registry and the debug
    // deadlock-guard set.
    canonical: PathBuf,
}
333
impl PackLock {
    /// Open (and create if missing) the sidecar `<pack_path>/.grex-lock`.
    /// Does **not** acquire the lock.
    ///
    /// The sidecar is opened read+write without truncation so an
    /// existing lock file (possibly flock'd by another process) is
    /// reused as-is. The pack root's canonical path is resolved once
    /// here and cached as the async-mutex registry key.
    ///
    /// # Errors
    ///
    /// Returns [`PackLockError::Io`] if the sidecar cannot be opened or
    /// its parent directory cannot be created.
    pub fn open(pack_path: &Path) -> Result<Self, PackLockError> {
        let path = pack_path.join(PACK_LOCK_FILE_NAME);
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)
                .map_err(|source| PackLockError::Io { path: path.clone(), source })?;
        }
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(&path)
            .map_err(|source| PackLockError::Io { path: path.clone(), source })?;
        let canonical = canonical_or_raw(pack_path);
        Ok(Self { inner: RwLock::new(file), path, canonical })
    }

    /// Async acquire — pairs a process-wide [`tokio::sync::Mutex`] keyed
    /// by canonical pack path with the sidecar `fd-lock`. Safe to call
    /// from any tokio worker without blocking the runtime.
    ///
    /// Re-entry: the keyed mutex is non-reentrant and this function
    /// performs no re-entry detection — it never returns
    /// [`PackLockError::Busy`]. A task that recursed back into the
    /// same pack root (e.g. meta-plugin recursion through a symlinked
    /// child pointing at its parent) would hang at the mutex acquire;
    /// upstream cycle detection is what prevents that (see the body
    /// comment below).
    ///
    /// The returned [`PackLockHold`] drops the fd-lock guard and the
    /// async mutex guard in reverse acquire order at end-of-scope.
    ///
    /// # Errors
    ///
    /// * [`PackLockError::Io`] on any OS-level lock failure, or — with
    ///   an empty path — when the blocking task is cancelled/panics
    ///   (`JoinError`).
    pub async fn acquire_async(self) -> Result<PackLockHold, PackLockError> {
        // Serialise in-process tasks on the canonical path via an
        // async mutex — safe across tokio workers and non-blocking on
        // the async runtime. Same-task re-entry (recursive plugin
        // invocation on the same pack root) is the caller's
        // responsibility to prevent via cycle detection; a same-task
        // re-entry here would hang at `lock_owned().await` because
        // tokio mutexes are non-reentrant.
        //
        // [`crate::plugin::pack_type::MetaPlugin`] threads the
        // `visited_meta` set through recursion and inserts the pack
        // root at every lifecycle entry so cycles halt with
        // [`crate::execute::ExecError::MetaCycle`] before this mutex
        // acquire runs. Other pack-type plugins are leaf by design
        // (declarative, scripted) and cannot re-enter.
        let mtx = mutex_for(&self.canonical);
        let mutex_guard = Arc::clone(&mtx).lock_owned().await;

        // Box `self` so its address is stable for the transmuted
        // `'static` guard lifetime. Take the fd-lock guard from the
        // boxed lock's `inner`.
        let boxed = Box::new(self);
        // feat-m6 H1 — `fd_lock::RwLock::write` is a synchronous
        // blocking call that waits on the kernel flock. Running it
        // directly on a tokio worker would park that worker until
        // the kernel released the lock, starving the runtime when
        // the only remaining contention is cross-process. Hop onto
        // the blocking-thread pool so async workers stay free. The
        // acquire happens inside `spawn_blocking` and the guard is
        // transmuted to `'static` before leaving the closure so the
        // box + guard can be returned as a pair.
        let join = tokio::task::spawn_blocking(
            move || -> Result<(Box<PackLock>, RwLockWriteGuard<'static, File>), PackLockError> {
                let mut boxed = boxed;
                // SAFETY: see outer comment block — `boxed` is moved
                // into the returned pair and never freed while the
                // guard is live; field order in `PackLockHold` makes
                // the guard drop first. Transmuting here (inside the
                // closure) lets us return both the box and the guard.
                let guard_ref = boxed
                    .inner
                    .write()
                    .map_err(|source| PackLockError::Io { path: boxed.path.clone(), source })?;
                let guard_static: RwLockWriteGuard<'static, File> = unsafe {
                    std::mem::transmute::<
                        RwLockWriteGuard<'_, File>,
                        RwLockWriteGuard<'static, File>,
                    >(guard_ref)
                };
                Ok((boxed, guard_static))
            },
        )
        .await;
        let (boxed, guard_static) = match join {
            Ok(res) => res?,
            // JoinError (panicked/cancelled blocking task) — the boxed
            // lock is gone, so no sidecar path is available; report an
            // empty path with the JoinError text as the source.
            Err(join_err) => {
                return Err(PackLockError::Io {
                    path: PathBuf::new(),
                    source: io::Error::other(join_err.to_string()),
                });
            }
        };

        // feat-v1.2.5 A3 — debug-only deadlock guard registration.
        // Asserts no `pool.install` is on this OS thread's stack and
        // records the canonical pack path in `HELD_PACK_LOCKS`. The
        // matching `unregister_held_lock` runs in `PackLock::Drop`
        // which fires from `PackLockHold::Drop` (the boxed `_lock`
        // field is the last to drop after the fd-lock guard releases).
        #[cfg(debug_assertions)]
        register_held_lock(&boxed.canonical);

        Ok(PackLockHold {
            _fd_guard: Some(guard_static),
            _mutex_guard: Some(mutex_guard),
            _lock: boxed,
        })
    }

    /// Cancellable async acquire — same semantics as
    /// [`PackLock::acquire_async`] but races the acquire against a
    /// [`tokio_util::sync::CancellationToken`]. Used by the embedded
    /// MCP server (feat-m7-1) so a `notifications/cancelled` from the
    /// client unblocks tool handlers that are parked on a contended
    /// pack lock.
    ///
    /// **Consumes `self`** to mirror [`PackLock::acquire_async`] — the
    /// same boxed-fd + transmute lifetime dance is needed to hand the
    /// guard back across a `spawn_blocking` boundary, and reusing the
    /// consumes-self contract preserves drop ordering against
    /// [`PackLockHold`].
    ///
    /// ## OS-thread leak window — contract
    ///
    /// `fd_lock::write()` is a synchronous syscall that parks the
    /// calling OS thread until the kernel releases the flock. Once the
    /// blocking call has been launched on the
    /// [`tokio::task::spawn_blocking`] pool, **the runtime cannot
    /// interrupt it** — there is no portable way to unpark a thread
    /// blocked in `flock(2)`. When the cancellation token fires we
    /// resolve the outer `select!` with [`PackLockErrorOrCancelled::Cancelled`]
    /// immediately, but the spawned OS thread stays parked until the
    /// holder eventually releases. When that happens, the spawned
    /// thread acquires the guard, the `JoinHandle` resolves to
    /// `Ok((boxed, guard))`, and the tuple is dropped on the spot
    /// (because the `select!` arm has already won) — at which point
    /// the guard's `Drop` releases the kernel flock and a subsequent
    /// acquirer can proceed.
    ///
    /// In other words: **cancellation is observable to the caller
    /// instantly, but the underlying OS thread holds the lock briefly
    /// past the cancel point, until the syscall returns**. Callers
    /// that immediately re-attempt acquire on the same path may see
    /// transient contention until that thread drains. See
    /// `.omne/cfg/mcp.md` §Cancellation.
    ///
    /// # Errors
    ///
    /// * [`PackLockErrorOrCancelled::Cancelled`] — the token fired
    ///   before a guard was returned (including while parked on the
    ///   canonical-path mutex, which is how a same-task re-entry gets
    ///   an out instead of hanging forever).
    /// * [`PackLockErrorOrCancelled::Lock`] wrapping
    ///   [`PackLockError::Io`] on any OS-level lock failure; like
    ///   `acquire_async`, this path never yields
    ///   [`PackLockError::Busy`].
    pub async fn acquire_cancellable(
        self,
        cancel: &::tokio_util::sync::CancellationToken,
    ) -> Result<PackLockHold, PackLockErrorOrCancelled> {
        // Mirror `acquire_async`: serialise on the canonical-path
        // async mutex first. Race the mutex acquire itself against
        // cancel — same-task re-entry would normally hang here, but
        // the cancel arm gives the caller an out.
        let mtx = mutex_for(&self.canonical);
        let mutex_guard = tokio::select! {
            biased;
            () = cancel.cancelled() => return Err(PackLockErrorOrCancelled::Cancelled),
            g = Arc::clone(&mtx).lock_owned() => g,
        };

        // Box `self` so the address is stable for the transmuted
        // `'static` guard lifetime — same dance as `acquire_async`.
        let boxed = Box::new(self);
        // Capture the sidecar path before the move into the closure
        // so the JoinError arm below can report it (the closure
        // consumes `boxed`, so we cannot read it from there).
        let join_err_path = boxed.path.clone();
        // feat-m7-1 — replicates the `acquire_async` blocking-pool
        // hop. The closure body is intentionally identical (do NOT
        // refactor — see the SAFETY note in `acquire_async`).
        let join = tokio::task::spawn_blocking(
            move || -> Result<(Box<PackLock>, RwLockWriteGuard<'static, File>), PackLockError> {
                let mut boxed = boxed;
                // SAFETY: see `acquire_async` — `boxed` is moved into
                // the returned pair and never freed while the guard
                // is live; field order in `PackLockHold` makes the
                // guard drop first.
                let guard_ref = boxed
                    .inner
                    .write()
                    .map_err(|source| PackLockError::Io { path: boxed.path.clone(), source })?;
                let guard_static: RwLockWriteGuard<'static, File> = unsafe {
                    std::mem::transmute::<RwLockWriteGuard<'_, File>, RwLockWriteGuard<'static, File>>(
                        guard_ref,
                    )
                };
                Ok((boxed, guard_static))
            },
        );

        // Race the blocking acquire against the cancellation token.
        // If cancel wins, the JoinHandle is dropped on the spot — the
        // spawned OS thread stays parked in `fd_lock::write()` until
        // the kernel releases, at which point the returned tuple is
        // dropped (see contract note above) and the flock is freed.
        let join = tokio::select! {
            biased;
            () = cancel.cancelled() => return Err(PackLockErrorOrCancelled::Cancelled),
            res = join => res,
        };

        let (boxed, guard_static) = match join {
            Ok(res) => res.map_err(PackLockErrorOrCancelled::Lock)?,
            Err(join_err) => {
                return Err(PackLockErrorOrCancelled::Lock(PackLockError::Io {
                    path: join_err_path,
                    source: io::Error::other(join_err.to_string()),
                }));
            }
        };

        // feat-v1.2.5 A3 — debug-only deadlock guard registration
        // (same contract as `acquire_async`; see comment there).
        #[cfg(debug_assertions)]
        register_held_lock(&boxed.canonical);

        Ok(PackLockHold {
            _fd_guard: Some(guard_static),
            _mutex_guard: Some(mutex_guard),
            _lock: boxed,
        })
    }

    /// Thread-blocking acquire (no tokio integration). Waits on the
    /// fd-lock synchronously. Suitable for synchronous call sites only
    /// — async plugin methods MUST use [`PackLock::acquire_async`] to
    /// avoid blocking a tokio worker.
    ///
    /// Returns a borrowed [`RwLockWriteGuard`]; the caller owns both
    /// the outer [`PackLock`] and the guard in scope. Mirrors the
    /// [`crate::fs::ScopedLock`] shape.
    ///
    /// # Errors
    ///
    /// Returns [`PackLockError::Io`] if the OS lock call fails.
    ///
    /// # Deprecation
    ///
    /// Implementation strategy: restored as the original direct
    /// blocking `fd_lock::write()` call (single-line body, no
    /// behaviour change vs v1.2.3) — simpler and safer than a
    /// busy-wait shim, since `fd-lock` already parks the calling OS
    /// thread on the kernel flock and a busy-wait would burn CPU
    /// without engaging the kernel waiter queue.
    #[deprecated(
        since = "1.2.4",
        note = "use `acquire_async` for async contexts or `try_acquire` for non-blocking; sync `acquire` will be removed in v1.3.0"
    )]
    pub fn acquire(&mut self) -> Result<RwLockWriteGuard<'_, File>, PackLockError> {
        let guard = self
            .inner
            .write()
            .map_err(|source| PackLockError::Io { path: self.path.clone(), source })?;
        // feat-v1.2.5 A3 — debug-only deadlock guard registration.
        // Borrowed-guard fns (`acquire`, `try_acquire`) cannot
        // intercept their guard's `Drop` (it's an `fd_lock` type), so
        // the unregister hook lives in `PackLock::Drop` below. The
        // typical usage pattern (`let mut p = PackLock::open(); let
        // _g = p.acquire();`) drops `_g` first (releasing the OS
        // flock) then `p` (closing the file + unregistering on this
        // OS thread), which keeps `HELD_PACK_LOCKS` accurate for the
        // duration the OS-level lock is held.
        #[cfg(debug_assertions)]
        register_held_lock(&self.canonical);
        Ok(guard)
    }

    /// Non-blocking probe: return [`PackLockError::Busy`] instead of
    /// waiting when another holder has the lock. Does not engage the
    /// async mutex — purely a fail-fast diagnostics hook.
    ///
    /// # Errors
    ///
    /// * [`PackLockError::Busy`] when a concurrent holder owns the lock.
    /// * [`PackLockError::Io`] on any other OS-level lock failure.
    pub fn try_acquire(&mut self) -> Result<RwLockWriteGuard<'_, File>, PackLockError> {
        match self.inner.try_write() {
            Ok(g) => {
                // feat-v1.2.5 A3 — debug-only deadlock guard
                // registration. Same borrowed-guard caveat as
                // `acquire`: unregister fires from `PackLock::Drop`.
                #[cfg(debug_assertions)]
                register_held_lock(&self.canonical);
                Ok(g)
            }
            // `fd-lock` signals contention as WouldBlock — map it to
            // the dedicated Busy variant so callers can distinguish it
            // from real I/O failures.
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                Err(PackLockError::Busy { path: self.path.clone() })
            }
            Err(source) => Err(PackLockError::Io { path: self.path.clone(), source }),
        }
    }

    /// Sidecar path — `<pack_path>/.grex-lock`.
    #[must_use]
    pub fn path(&self) -> &Path {
        &self.path
    }
}
656
657impl std::fmt::Debug for PackLock {
658 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
659 f.debug_struct("PackLock").field("path", &self.path).finish()
660 }
661}
662
663// feat-v1.2.5 A3 — debug-only deadlock guard unregistration. Fires
664// when a `PackLock` is dropped, whether directly (sync `acquire` /
665// `try_acquire` call sites that own the `PackLock` in scope) or
666// transitively from `PackLockHold::Drop` (the boxed `_lock` field
667// drops last, after the fd-lock guard releases). The unregister is
668// idempotent and safe even if the lock was never acquired (e.g. an
669// `open` that never called any `acquire*`).
670#[cfg(debug_assertions)]
671impl Drop for PackLock {
672 fn drop(&mut self) {
673 unregister_held_lock(&self.canonical);
674 }
675}
676
/// RAII guard returned by [`PackLock::acquire_async`] and
/// [`PackLock::acquire_cancellable`]. Holds the sidecar-file `fd-lock`
/// guard plus the process-wide async mutex guard. Drops in reverse
/// acquire order.
#[repr(C)]
pub struct PackLockHold {
    // Field order is load-bearing: `_fd_guard` drops first (releasing
    // the kernel flock), then `_mutex_guard` (releasing the async
    // serialisation slot), then `_lock` (closing the file handle).
    // `#[repr(C)]` pins source order to layout order so `offset_of!`
    // assertions below stay meaningful.
    //
    // Both guards sit inside `Option`s so `Drop::drop` can `take()`
    // them explicitly in the required order.
    _fd_guard: Option<RwLockWriteGuard<'static, File>>,
    _mutex_guard: Option<tokio::sync::OwnedMutexGuard<()>>,
    _lock: Box<PackLock>,
}
691
692impl PackLockHold {
693 /// Sidecar path for diagnostics.
694 #[must_use]
695 pub fn path(&self) -> &Path {
696 self._lock.path()
697 }
698}
699
700impl std::fmt::Debug for PackLockHold {
701 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
702 f.debug_struct("PackLockHold").field("path", &self._lock.path()).finish()
703 }
704}
705
// Field-order static assertion (feat-m6 B3) — the safety argument for the
// unsafe lifetime extension in `acquire_async` depends on `_fd_guard`
// dropping before `_lock`. Rust drops struct fields in declaration order,
// so `_fd_guard` must sit at the lowest offset, then `_mutex_guard`, then
// `_lock`. A refactor that reorders these fields would silently break the
// Drop ordering and the transmuted `'static` borrow would outlive its box.
// The offsets are meaningful only because `PackLockHold` is `#[repr(C)]`,
// which pins layout order to declaration order.
const _: () = {
    assert!(
        std::mem::offset_of!(PackLockHold, _fd_guard)
            < std::mem::offset_of!(PackLockHold, _mutex_guard),
        "PackLockHold field order: _fd_guard must precede _mutex_guard"
    );
    assert!(
        std::mem::offset_of!(PackLockHold, _mutex_guard)
            < std::mem::offset_of!(PackLockHold, _lock),
        "PackLockHold field order: _mutex_guard must precede _lock"
    );
};
724
impl Drop for PackLockHold {
    /// Release order is load-bearing and must mirror the field order:
    /// fd-lock guard, then async-mutex guard, then (implicitly) the lock.
    fn drop(&mut self) {
        // Explicit take() on fd-lock guard first — the transmuted
        // `'static` lifetime must expire before `_lock` drops. The Option
        // returned by take() is a temporary that drops at the end of this
        // statement, releasing the kernel flock here and now.
        self._fd_guard.take();
        // Then free the in-process serialisation slot so a parked
        // same-process task can proceed to its own fd-lock acquire.
        self._mutex_guard.take();
        // `_lock` drops last when the struct itself drops, closing the
        // underlying file handle.
    }
}
735
736// ---------------------------------------------------------------------------
737// Lock-ordering enforcement (debug builds).
738// ---------------------------------------------------------------------------
739
/// The five lock tiers from `.omne/cfg/concurrency.md`, listed in the
/// mandatory global acquire order. Nested acquisitions must climb
/// strictly upward; a downward acquire is exactly the cycle the
/// feat-m6-3 Lean proof rules out, so debug builds reject it.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum Tier {
    /// Tier 1 — workspace sync lock (`<workspace>/.grex.sync.lock`).
    WorkspaceSync = 0,
    /// Tier 2 — scheduler semaphore permit (feat-m6-1).
    Semaphore = 1,
    /// Tier 3 — per-pack `.grex-lock` (feat-m6-2, this module).
    PerPack = 2,
    /// Tier 4 — per-repo backend lock (`<dest>.grex-backend.lock`).
    Backend = 3,
    /// Tier 5 — manifest RW lock (`.grex/events.jsonl` sidecar).
    Manifest = 4,
}
757
758/// Run `f` while `tier` is pushed on the current thread's tier stack.
759/// Debug builds enforce strictly-increasing order across nested calls;
760/// release builds skip the check entirely.
761#[cfg(debug_assertions)]
762pub fn with_tier<R>(tier: Tier, f: impl FnOnce() -> R) -> R {
763 tier::push(tier);
764 let out = f();
765 tier::pop_if_top(tier);
766 out
767}
768
/// Release-build mirror of [`with_tier`]: runs `f` with no tier
/// bookkeeping whatsoever.
#[cfg(not(debug_assertions))]
#[inline]
pub fn with_tier<R>(_tier: Tier, f: impl FnOnce() -> R) -> R {
    f()
}
775
776/// Wrap an async future in a task-local tier-stack scope so any
777/// [`TierGuard`] pushes inside it land in the correct frame even when
778/// the task migrates across tokio workers after `.await`. Release
779/// builds compile this down to the raw future — no scope, no cost.
780///
781/// Callers should wrap every top-level async dispatch (e.g. the
782/// per-pack plugin lifecycle calls driven by `rt.block_on(...)`) so
783/// the tier check can operate on a fresh stack per dispatch.
784#[cfg(debug_assertions)]
785pub async fn with_tier_scope<F: std::future::Future>(f: F) -> F::Output {
786 tier::TIER_STACK.scope(std::cell::RefCell::new(Vec::new()), f).await
787}
788
/// Release-build mirror of [`with_tier_scope`]: awaits the future
/// directly, with no task-local scope set up around it.
#[cfg(not(debug_assertions))]
#[inline]
pub async fn with_tier_scope<F: std::future::Future>(f: F) -> F::Output {
    f.await
}
795
796/// RAII guard — pushes a tier onto the current-thread stack on
797/// construction, pops on drop. Lets lifecycle prologues enforce
798/// tier ordering across `.await` points without nesting the rest of
799/// the body inside a `with_tier` closure. Debug builds carry the
800/// ordering check; release builds compile to a zero-sized no-op.
801///
802/// Field/declaration-order note: callers must declare the guard
803/// **before** the permit/hold it is scoping. Rust drops locals in
804/// reverse declaration order, so `_tier` declared first drops last —
805/// after the lock/permit releases — matching `with_tier` semantics.
806#[must_use]
807pub struct TierGuard {
808 #[cfg(debug_assertions)]
809 tier: Tier,
810 // Sized placeholder for release so the type is still `Sized` and
811 // `must_use`-meaningful. Zero-sized once inlined.
812 #[cfg(not(debug_assertions))]
813 _private: (),
814}
815
816impl TierGuard {
817 /// Push `tier` onto the current-thread tier stack. Debug builds
818 /// assert strictly-increasing order against the existing top.
819 #[cfg(debug_assertions)]
820 pub fn push(tier: Tier) -> Self {
821 tier::push(tier);
822 TierGuard { tier }
823 }
824
825 /// Release-build no-op constructor.
826 #[cfg(not(debug_assertions))]
827 #[inline]
828 pub fn push(_tier: Tier) -> Self {
829 TierGuard { _private: () }
830 }
831}
832
#[cfg(debug_assertions)]
impl Drop for TierGuard {
    fn drop(&mut self) {
        // Pop only if this guard's tier is still the top of the task's
        // stack; on a mismatch `pop_if_top` logs via `tracing::error!`
        // rather than panicking, so Drop never unwinds.
        tier::pop_if_top(self.tier);
    }
}
839
840#[cfg(debug_assertions)]
841pub(crate) mod tier {
842 use super::Tier;
843 use std::cell::RefCell;
844
845 // feat-m6 CI fix — previously this used `thread_local!`, but under a
846 // tokio multi-thread runtime a task can resume on a different worker
847 // after `.await`. A push on worker A followed by a yield and a pop on
848 // worker B left A's stack polluted and tripped the tier-ordering
849 // assertion on the next acquire. Migrating to `tokio::task_local!`
850 // pins the stack to the *task*, not the worker, so nested
851 // `TierGuard` bookkeeping follows the task across workers.
852 //
853 // `try_with` silently no-ops outside a `TIER_STACK.scope(...)`
854 // frame — that makes the module safe to use from synchronous
855 // (non-tokio) test harnesses and the module's own unit tests at
856 // the cost of debug-only tier enforcement being disabled there.
857 // Production dispatch wraps every pack-type plugin call in a scope
858 // (see `sync::dispatch_*`), so real runs retain enforcement.
859 tokio::task_local! {
860 pub(crate) static TIER_STACK: RefCell<Vec<Tier>>;
861 }
862
863 pub fn push(next: Tier) {
864 let _ = TIER_STACK.try_with(|s| {
865 let mut s = s.borrow_mut();
866 if let Some(&top) = s.last() {
867 assert!(
868 next > top,
869 "lock tier violation: trying to acquire {next:?} while already holding {top:?} \
870 (tiers must be strictly increasing — see .omne/cfg/concurrency.md)"
871 );
872 }
873 s.push(next);
874 });
875 }
876
877 pub fn pop_if_top(expected: Tier) {
878 let _ = TIER_STACK.try_with(|s| {
879 let mut s = s.borrow_mut();
880 if s.last().copied() == Some(expected) {
881 s.pop();
882 } else {
883 tracing::error!(
884 target: "grex::concurrency",
885 "tier pop mismatch: expected {:?} at top, stack = {:?}",
886 expected,
887 *s
888 );
889 }
890 });
891 }
892}
893
894// ---------------------------------------------------------------------------
895// Tests.
896// ---------------------------------------------------------------------------
897
#[cfg(test)]
#[allow(deprecated)] // exercising deprecated `PackLock::acquire` for back-compat
mod tests {
    use super::*;
    use std::sync::{Arc, Barrier};
    use std::thread;
    use std::time::{Duration, Instant};
    use tempfile::tempdir;
    use tokio_util::sync::CancellationToken;

    // Opening a pack lock must materialise the sidecar file at the
    // well-known name under the pack path (per the assert message,
    // `open` — not `acquire` — creates it).
    #[test]
    fn pack_lock_acquires_creates_file() {
        let dir = tempdir().unwrap();
        let mut plock = PackLock::open(dir.path()).unwrap();
        let expected = plock.path().to_path_buf();
        let _guard = plock.acquire().unwrap();
        assert!(expected.exists(), "open must create the sidecar file");
        assert_eq!(expected, dir.path().join(PACK_LOCK_FILE_NAME));
    }

    // While one handle holds the lock, a second handle's try_acquire
    // must fail fast with `Busy` carrying the sidecar path.
    #[test]
    fn pack_lock_second_try_acquire_reports_busy_while_held() {
        let dir = tempdir().unwrap();
        let mut first = PackLock::open(dir.path()).unwrap();
        let _held = first.acquire().unwrap();
        let mut second = PackLock::open(dir.path()).unwrap();
        let err = second.try_acquire().unwrap_err();
        match err {
            PackLockError::Busy { path } => {
                assert_eq!(path, dir.path().join(PACK_LOCK_FILE_NAME));
            }
            other => panic!("expected Busy, got {other:?}"),
        }
    }

    // Dropping the guard (and its PackLock) must release the flock so a
    // fresh handle can acquire without blocking.
    #[test]
    fn pack_lock_release_on_drop() {
        let dir = tempdir().unwrap();
        {
            let mut first = PackLock::open(dir.path()).unwrap();
            let _g = first.acquire().unwrap();
        }
        let mut second = PackLock::open(dir.path()).unwrap();
        let _g = second.acquire().unwrap();
    }

    // The sidecar lives directly under the pack path with the canonical
    // file name.
    #[test]
    fn pack_lock_path_contains_pack_path() {
        let dir = tempdir().unwrap();
        let plock = PackLock::open(dir.path()).unwrap();
        let p = plock.path();
        assert!(p.starts_with(dir.path()));
        assert_eq!(p.file_name().and_then(|s| s.to_str()), Some(PACK_LOCK_FILE_NAME));
    }

    // Blocking acquire must park until the holder releases. The 40 ms
    // lower bound (vs the holder's 100 ms sleep) deliberately leaves
    // slack for scheduler jitter so the test isn't flaky.
    #[test]
    fn pack_lock_blocking_acquire_waits_for_holder() {
        let dir = tempdir().unwrap();
        let path = dir.path().to_path_buf();
        let barrier = Arc::new(Barrier::new(2));
        let holder_barrier = Arc::clone(&barrier);
        let holder_path = path.clone();

        let holder = thread::spawn(move || {
            let mut lock = PackLock::open(&holder_path).unwrap();
            let _g = lock.acquire().unwrap();
            holder_barrier.wait();
            thread::sleep(Duration::from_millis(100));
        });

        barrier.wait();
        let start = Instant::now();
        let mut second = PackLock::open(&path).unwrap();
        let _g = second.acquire().unwrap();
        let waited = start.elapsed();
        holder.join().unwrap();
        assert!(
            waited >= Duration::from_millis(40),
            "blocking acquire must have waited (observed {waited:?})"
        );
    }

    // Locks are keyed by pack path: holding pack A must not contend
    // with pack B.
    #[test]
    fn pack_lock_distinct_paths_do_not_contend() {
        let a = tempdir().unwrap();
        let b = tempdir().unwrap();
        let mut la = PackLock::open(a.path()).unwrap();
        let _ga = la.acquire().unwrap();
        let mut lb = PackLock::open(b.path()).unwrap();
        let _gb = lb.try_acquire().unwrap();
    }

    #[tokio::test]
    async fn async_acquire_serialises_in_process() {
        // Two concurrent acquire_async calls on the same pack path
        // must serialise cleanly (no hang).
        let dir = tempdir().unwrap();
        let path = dir.path().to_path_buf();
        let path_clone = path.clone();
        let h1 = tokio::spawn(async move {
            let lock = PackLock::open(&path).unwrap();
            let _hold = lock.acquire_async().await.unwrap();
            tokio::time::sleep(Duration::from_millis(30)).await;
        });
        let h2 = tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(5)).await;
            let lock = PackLock::open(&path_clone).unwrap();
            let _hold = lock.acquire_async().await.unwrap();
        });
        h1.await.unwrap();
        h2.await.unwrap();
    }

    // --- tier ordering (debug-only) -----------------------------------------

    // feat-m6 CI fix — tier enforcement now lives in a `tokio::task_local!`
    // stack, so these tests drive the check through a scoped task to
    // establish the frame. `try_with` outside a scope silently no-ops.

    #[cfg(debug_assertions)]
    #[tokio::test]
    async fn tier_strictly_increasing_ok() {
        // A fully ascending nest (Semaphore → Manifest) must not panic.
        tier::TIER_STACK
            .scope(std::cell::RefCell::new(Vec::new()), async {
                with_tier(Tier::Semaphore, || {
                    with_tier(Tier::PerPack, || {
                        with_tier(Tier::Backend, || {
                            with_tier(Tier::Manifest, || {});
                        });
                    });
                });
            })
            .await;
    }

    #[cfg(debug_assertions)]
    #[tokio::test]
    async fn tier_reversed_panics_in_debug() {
        use std::panic::{catch_unwind, AssertUnwindSafe};
        // Acquiring Semaphore (tier 2) while holding PerPack (tier 3)
        // reverses the order and must trip the debug assertion.
        let result = tier::TIER_STACK
            .scope(std::cell::RefCell::new(Vec::new()), async {
                catch_unwind(AssertUnwindSafe(|| {
                    with_tier(Tier::PerPack, || {
                        with_tier(Tier::Semaphore, || {});
                    });
                }))
            })
            .await;
        assert!(result.is_err(), "reversed tier order must panic in debug builds");
    }

    // --- acquire_cancellable (feat-m7-1 Stage 4) -----------------------------

    /// 4.T1 — uncontended path returns Ok(PackLockHold).
    #[tokio::test]
    async fn acquire_cancellable_happy_path() {
        let dir = tempdir().unwrap();
        let lock = PackLock::open(dir.path()).unwrap();
        let token = CancellationToken::new();
        let result = lock.acquire_cancellable(&token).await;
        assert!(result.is_ok(), "expected Ok(PackLockHold) on uncontended pack");
    }

    /// 4.T2 — task A holds the lock; task B's token fires after 10 ms;
    /// B must surface `Err(Cancelled)` within 50 ms.
    #[tokio::test]
    async fn acquire_cancellable_cancel_during_blocking_fd_lock_returns_cancelled() {
        let dir = tempdir().unwrap();
        let path = dir.path().to_path_buf();
        let path_b = path.clone();

        // Task A: acquire and hold for 500 ms (long enough to cover B's window).
        let a_started = Arc::new(tokio::sync::Notify::new());
        let a_started_clone = Arc::clone(&a_started);
        let a = tokio::spawn(async move {
            let lock = PackLock::open(&path).unwrap();
            let _hold = lock.acquire_async().await.unwrap();
            a_started_clone.notify_one();
            tokio::time::sleep(Duration::from_millis(500)).await;
        });
        a_started.notified().await;

        let token = CancellationToken::new();
        let cancel_handle = token.clone();
        let canceller = tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            cancel_handle.cancel();
        });

        let started = Instant::now();
        let lock_b = PackLock::open(&path_b).unwrap();
        let result =
            tokio::time::timeout(Duration::from_millis(50), lock_b.acquire_cancellable(&token))
                .await
                .expect("acquire_cancellable must return within 50 ms after cancel");

        let waited = started.elapsed();
        assert!(
            matches!(result, Err(PackLockErrorOrCancelled::Cancelled)),
            "expected Err(Cancelled), got {result:?} after {waited:?}"
        );

        canceller.await.unwrap();
        // A is still sleeping on its 500 ms hold — abort rather than wait.
        a.abort();
        let _ = a.await;
    }

    // --- helpers for 4.T3 / 4.T4 -------------------------------------------

    /// Spawn a "holder" task that acquires `path` and releases on
    /// signal. Returns `(JoinHandle, started_notify, release_notify)`.
    /// Caller awaits `started` to know the lock is held, then fires
    /// `release` when ready to let the holder drop its guard.
    fn spawn_holder(
        path: PathBuf,
    ) -> (tokio::task::JoinHandle<()>, Arc<tokio::sync::Notify>, Arc<tokio::sync::Notify>) {
        let started = Arc::new(tokio::sync::Notify::new());
        let release = Arc::new(tokio::sync::Notify::new());
        let started_c = Arc::clone(&started);
        let release_c = Arc::clone(&release);
        let h = tokio::spawn(async move {
            let lock = PackLock::open(&path).unwrap();
            let _hold = lock.acquire_async().await.unwrap();
            started_c.notify_one();
            release_c.notified().await;
        });
        (h, started, release)
    }

    /// Poll `acquire_async` against `path` until it succeeds or the
    /// outer deadline expires. Returns `Ok(())` on success, `Err(())`
    /// on timeout.
    async fn poll_acquire_until_free(path: PathBuf, deadline: Duration) -> Result<(), ()> {
        tokio::time::timeout(deadline, async move {
            loop {
                let lock = PackLock::open(&path).unwrap();
                if let Ok(Ok(_hold)) =
                    tokio::time::timeout(Duration::from_millis(100), lock.acquire_async()).await
                {
                    return;
                }
                tokio::time::sleep(Duration::from_millis(20)).await;
            }
        })
        .await
        .map_err(|_| ())
    }

    /// 4.T3 — regression for the documented OS-thread leak window:
    /// after B is cancelled, A releases its lock; the spawn_blocking
    /// thread that B kicked off must eventually unblock and drop its
    /// guard. We observe this by polling from a third task C — if B's
    /// blocking thread leaked its guard, C would wait forever.
    #[tokio::test]
    async fn acquire_cancellable_spawn_blocking_thread_releases_guard_when_it_finally_unblocks() {
        let dir = tempdir().unwrap();
        let path = dir.path().to_path_buf();
        let (a, a_started, release_a) = spawn_holder(path.clone());
        a_started.notified().await;

        // Task B: race against cancel while A holds the fd-lock.
        let token = CancellationToken::new();
        let cancel_handle = token.clone();
        let path_b = path.clone();
        let b = tokio::spawn(async move {
            let lock = PackLock::open(&path_b).unwrap();
            lock.acquire_cancellable(&token).await
        });
        tokio::time::sleep(Duration::from_millis(20)).await;
        cancel_handle.cancel();
        let b_result = tokio::time::timeout(Duration::from_millis(100), b)
            .await
            .expect("B must resolve quickly after cancel")
            .expect("B task panicked");
        assert!(
            matches!(b_result, Err(PackLockErrorOrCancelled::Cancelled)),
            "expected B to see Cancelled, got {b_result:?}"
        );

        // Release A — B's parked OS thread should drain.
        release_a.notify_one();
        a.await.unwrap();

        // Task C: generous 2 s deadline; only fails if the guard leaked.
        assert!(
            poll_acquire_until_free(path, Duration::from_millis(2_000)).await.is_ok(),
            "task C never acquired — spawn_blocking thread leaked its fd-lock guard"
        );
    }

    /// 4.T4 — covers the outer-mutex cancel arm of the `select!` in
    /// `acquire_cancellable`. Task A holds the in-process async mutex
    /// (via `acquire_async`, which acquires both tiers); task B calls
    /// `acquire_cancellable` and is parked on `lock_owned()` (NOT yet
    /// at the fd-lock blocking call). Cancelling B's token must
    /// short-circuit the mutex wait and return `Err(Cancelled)`.
    #[tokio::test]
    async fn acquire_cancellable_cancel_during_async_mutex_wait_returns_cancelled() {
        let dir = tempdir().unwrap();
        let path = dir.path().to_path_buf();
        let (a, a_started, release_a) = spawn_holder(path.clone());
        a_started.notified().await;

        let token = CancellationToken::new();
        let cancel_handle = token.clone();
        let canceller = tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            cancel_handle.cancel();
        });

        let lock_b = PackLock::open(&path).unwrap();
        let result =
            tokio::time::timeout(Duration::from_millis(50), lock_b.acquire_cancellable(&token))
                .await
                .expect("acquire_cancellable must return within 50 ms after cancel");

        assert!(
            matches!(result, Err(PackLockErrorOrCancelled::Cancelled)),
            "expected Err(Cancelled) from outer-mutex cancel arm, got {result:?}"
        );

        canceller.await.unwrap();
        release_a.notify_one();
        a.await.unwrap();
    }
}