#![allow(unsafe_code)]
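
//! Advisory pack locking. Two layers guard a pack directory: a per-path
//! `tokio::sync::Mutex` serialises tasks within this process, while an
//! `fd_lock` write lock on a sidecar file excludes other processes.
//! [`PackLockHold`] keeps both guards alive together; [`Tier`] adds a
//! debug-only lock-ordering check.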
use std::collections::HashMap;
use std::fs::{File, OpenOptions};
use std::io;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex, OnceLock, Weak};
use fd_lock::{RwLock, RwLockWriteGuard};
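
/// Name of the sidecar lock file created inside each locked pack directory.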
pub const PACK_LOCK_FILE_NAME: &str = ".grex-lock";
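
/// Errors surfaced while opening or acquiring a pack lock.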
#[non_exhaustive]
#[derive(Debug, thiserror::Error)]
pub enum PackLockError {
#[error("pack lock i/o on `{}`: {source}", path.display())]
Io {
path: PathBuf,
#[source]
source: io::Error,
},
#[error("pack lock `{}` is busy", path.display())]
Busy {
path: PathBuf,
},
}
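
/// A pack-lock error or an explicit cancellation, returned by
/// [`PackLock::acquire_cancellable`].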
#[non_exhaustive]
#[derive(Debug, thiserror::Error)]
pub enum PackLockErrorOrCancelled {
#[error("pack lock acquire cancelled")]
Cancelled,
#[error(transparent)]
Lock(#[from] PackLockError),
}
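
/// Process-wide registry of per-pack-path mutexes, keyed by canonical path.
/// Entries are `Weak` so a mutex is freed once no hold references it; dead
/// entries are pruned lazily in `mutex_for`.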
fn path_mutex_registry() -> &'static Mutex<HashMap<PathBuf, Weak<tokio::sync::Mutex<()>>>> {
static REG: OnceLock<Mutex<HashMap<PathBuf, Weak<tokio::sync::Mutex<()>>>>> = OnceLock::new();
REG.get_or_init(|| Mutex::new(HashMap::new()))
}
fn prune_dead(reg: &mut HashMap<PathBuf, Weak<tokio::sync::Mutex<()>>>) {
reg.retain(|_, weak| weak.strong_count() > 0);
}
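
/// Return the in-process mutex for `canonical`, creating and registering a
/// fresh one if no live entry exists.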
fn mutex_for(canonical: &Path) -> Arc<tokio::sync::Mutex<()>> {
let mut reg = path_mutex_registry()
.lock()
.expect("pack lock path registry poisoned — this indicates a prior panic");
if let Some(weak) = reg.get(canonical) {
if let Some(existing) = weak.upgrade() {
return existing;
}
}
prune_dead(&mut reg);
let m = Arc::new(tokio::sync::Mutex::new(()));
reg.insert(canonical.to_path_buf(), Arc::downgrade(&m));
m
}
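
/// Best-effort canonicalisation; falls back to the raw path if
/// canonicalisation fails.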
fn canonical_or_raw(path: &Path) -> PathBuf {
std::fs::canonicalize(path).unwrap_or_else(|_| path.to_path_buf())
}
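
/// An advisory, exclusive lock over a pack directory, backed by a sidecar
/// file (`.grex-lock`) plus an in-process per-path mutex for async callers.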
pub struct PackLock {
inner: RwLock<File>,
path: PathBuf,
canonical: PathBuf,
}
impl PackLock {
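    /// Open (creating if needed) the sidecar lock file under `pack_path`.
    /// No lock is taken yet; use one of the `acquire*` methods.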
pub fn open(pack_path: &Path) -> Result<Self, PackLockError> {
let path = pack_path.join(PACK_LOCK_FILE_NAME);
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)
.map_err(|source| PackLockError::Io { path: path.clone(), source })?;
}
let file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(&path)
.map_err(|source| PackLockError::Io { path: path.clone(), source })?;
let canonical = canonical_or_raw(pack_path);
Ok(Self { inner: RwLock::new(file), path, canonical })
}
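
    /// Acquire the lock asynchronously: first the per-path in-process mutex,
    /// then the cross-process fd lock on a `spawn_blocking` thread. The
    /// returned [`PackLockHold`] releases both when dropped.
    ///
    /// A minimal usage sketch (hypothetical pack path, assuming a tokio
    /// runtime):
    ///
    /// ```ignore
    /// let lock = PackLock::open(Path::new("/tmp/some-pack"))?;
    /// let hold = lock.acquire_async().await?;
    /// // ... mutate the pack while `hold` is alive ...
    /// drop(hold); // fd lock released first, then the in-process mutex
    /// ```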
pub async fn acquire_async(self) -> Result<PackLockHold, PackLockError> {
let mtx = mutex_for(&self.canonical);
let mutex_guard = Arc::clone(&mtx).lock_owned().await;
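        // Capture the path for error reporting before `self` moves into the
        // box; boxing gives the owned `File` a stable heap address that the
        // fd-lock guard can borrow from across threads.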
        let join_err_path = self.path.clone();
        let boxed = Box::new(self);
let join = tokio::task::spawn_blocking(
move || -> Result<(Box<PackLock>, RwLockWriteGuard<'static, File>), PackLockError> {
let mut boxed = boxed;
let guard_ref = boxed
.inner
.write()
.map_err(|source| PackLockError::Io { path: boxed.path.clone(), source })?;
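                // SAFETY: the guard borrows the `File` inside `boxed.inner`.
                // `boxed` is heap-allocated, is moved into the returned tuple
                // together with the guard, and `PackLockHold` drops the guard
                // before the box, so the borrow never outlives its referent
                // despite the forged 'static lifetime.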
let guard_static: RwLockWriteGuard<'static, File> = unsafe {
std::mem::transmute::<
RwLockWriteGuard<'_, File>,
RwLockWriteGuard<'static, File>,
>(guard_ref)
};
Ok((boxed, guard_static))
},
)
.await;
let (boxed, guard_static) = match join {
Ok(res) => res?,
Err(join_err) => {
return Err(PackLockError::Io {
                    path: join_err_path,
source: io::Error::other(join_err.to_string()),
});
}
};
Ok(PackLockHold {
_fd_guard: Some(guard_static),
_mutex_guard: Some(mutex_guard),
_lock: boxed,
})
}
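
    /// Like [`PackLock::acquire_async`], but returns
    /// [`PackLockErrorOrCancelled::Cancelled`] if `cancel` fires first. If
    /// cancellation wins while the fd lock is being taken, the abandoned
    /// `spawn_blocking` thread keeps waiting, then acquires and immediately
    /// drops the guard, so the lock is not leaked.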
pub async fn acquire_cancellable(
self,
        cancel: &tokio_util::sync::CancellationToken,
) -> Result<PackLockHold, PackLockErrorOrCancelled> {
let mtx = mutex_for(&self.canonical);
let mutex_guard = tokio::select! {
biased;
() = cancel.cancelled() => return Err(PackLockErrorOrCancelled::Cancelled),
g = Arc::clone(&mtx).lock_owned() => g,
};
let boxed = Box::new(self);
let join_err_path = boxed.path.clone();
let join = tokio::task::spawn_blocking(
move || -> Result<(Box<PackLock>, RwLockWriteGuard<'static, File>), PackLockError> {
let mut boxed = boxed;
let guard_ref = boxed
.inner
.write()
.map_err(|source| PackLockError::Io { path: boxed.path.clone(), source })?;
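                // SAFETY: same invariant as in `acquire_async`: the guard and
                // the box owning the borrowed `File` travel and drop together.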
let guard_static: RwLockWriteGuard<'static, File> = unsafe {
std::mem::transmute::<RwLockWriteGuard<'_, File>, RwLockWriteGuard<'static, File>>(
guard_ref,
)
};
Ok((boxed, guard_static))
},
);
let join = tokio::select! {
biased;
() = cancel.cancelled() => return Err(PackLockErrorOrCancelled::Cancelled),
res = join => res,
};
let (boxed, guard_static) = match join {
Ok(res) => res.map_err(PackLockErrorOrCancelled::Lock)?,
Err(join_err) => {
return Err(PackLockErrorOrCancelled::Lock(PackLockError::Io {
path: join_err_path,
source: io::Error::other(join_err.to_string()),
}));
}
};
Ok(PackLockHold {
_fd_guard: Some(guard_static),
_mutex_guard: Some(mutex_guard),
_lock: boxed,
})
}
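
    /// Blocking acquire of the cross-process fd lock. This path does not take
    /// the per-path async mutex, so prefer it only in synchronous contexts.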
pub fn acquire(&mut self) -> Result<RwLockWriteGuard<'_, File>, PackLockError> {
self.inner.write().map_err(|source| PackLockError::Io { path: self.path.clone(), source })
}
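
    /// Non-blocking acquire; maps `WouldBlock` to [`PackLockError::Busy`].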
pub fn try_acquire(&mut self) -> Result<RwLockWriteGuard<'_, File>, PackLockError> {
match self.inner.try_write() {
Ok(g) => Ok(g),
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
Err(PackLockError::Busy { path: self.path.clone() })
}
Err(source) => Err(PackLockError::Io { path: self.path.clone(), source }),
}
}
#[must_use]
pub fn path(&self) -> &Path {
&self.path
}
}
impl std::fmt::Debug for PackLock {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PackLock").field("path", &self.path).finish()
}
}
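
/// Keeps the pack locked while alive. Field order is load-bearing: Rust drops
/// fields in declaration order, so `_fd_guard` (which borrows the `File`
/// inside `_lock`) is released before `_lock` itself, and the in-process
/// mutex is released only after the fd lock.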
#[repr(C)]
pub struct PackLockHold {
_fd_guard: Option<RwLockWriteGuard<'static, File>>,
_mutex_guard: Option<tokio::sync::OwnedMutexGuard<()>>,
_lock: Box<PackLock>,
}
impl PackLockHold {
#[must_use]
pub fn path(&self) -> &Path {
self._lock.path()
}
}
impl std::fmt::Debug for PackLockHold {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PackLockHold").field("path", &self._lock.path()).finish()
}
}
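
// Compile-time tripwire: with `#[repr(C)]`, `offset_of!` tracks declaration
// order, so reordering the drop-order-critical fields fails the build.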
const _: () = {
assert!(
std::mem::offset_of!(PackLockHold, _fd_guard)
< std::mem::offset_of!(PackLockHold, _mutex_guard),
"PackLockHold field order: _fd_guard must precede _mutex_guard"
);
assert!(
std::mem::offset_of!(PackLockHold, _mutex_guard)
< std::mem::offset_of!(PackLockHold, _lock),
"PackLockHold field order: _mutex_guard must precede _lock"
);
};
impl Drop for PackLockHold {
fn drop(&mut self) {
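        // Release the fd lock first, then the in-process mutex; explicit,
        // though declaration order already guarantees it.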
self._fd_guard.take();
self._mutex_guard.take();
}
}
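
/// Lock-ordering tiers. Locks must be acquired in strictly increasing tier
/// order within a task; debug builds enforce this via the task-local checker
/// below.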
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum Tier {
WorkspaceSync = 0,
Semaphore = 1,
PerPack = 2,
Backend = 3,
Manifest = 4,
}
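
/// Run `f` with `tier` pushed on the task-local tier stack (debug builds).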
#[cfg(debug_assertions)]
pub fn with_tier<R>(tier: Tier, f: impl FnOnce() -> R) -> R {
    // Use the RAII guard rather than manual push/pop so the tier is popped
    // even if `f` unwinds; otherwise a caught panic would leave a stale tier
    // on the task-local stack.
    let _guard = TierGuard::push(tier);
    f()
}
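
/// Release builds: run `f` with tier checking compiled out.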
#[cfg(not(debug_assertions))]
#[inline]
pub fn with_tier<R>(_tier: Tier, f: impl FnOnce() -> R) -> R {
f()
}
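
/// Provide the task-local tier stack for the duration of `f`; futures that
/// use tier checking must run inside this scope.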
#[cfg(debug_assertions)]
pub async fn with_tier_scope<F: std::future::Future>(f: F) -> F::Output {
tier::TIER_STACK.scope(std::cell::RefCell::new(Vec::new()), f).await
}
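
/// Release builds: run the future directly, with no tier stack.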
#[cfg(not(debug_assertions))]
#[inline]
pub async fn with_tier_scope<F: std::future::Future>(f: F) -> F::Output {
f.await
}
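
/// RAII form of [`with_tier`]: pushes on construction, pops on drop, which
/// also makes the pop run when the protected code unwinds.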
#[must_use]
pub struct TierGuard {
#[cfg(debug_assertions)]
tier: Tier,
#[cfg(not(debug_assertions))]
_private: (),
}
impl TierGuard {
#[cfg(debug_assertions)]
pub fn push(tier: Tier) -> Self {
tier::push(tier);
TierGuard { tier }
}
#[cfg(not(debug_assertions))]
#[inline]
pub fn push(_tier: Tier) -> Self {
TierGuard { _private: () }
}
}
#[cfg(debug_assertions)]
impl Drop for TierGuard {
fn drop(&mut self) {
tier::pop_if_top(self.tier);
}
}
#[cfg(debug_assertions)]
pub(crate) mod tier {
use super::Tier;
use std::cell::RefCell;
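
    // Per-task stack of currently held tiers; absent outside `with_tier_scope`.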
tokio::task_local! {
pub(crate) static TIER_STACK: RefCell<Vec<Tier>>;
}
pub fn push(next: Tier) {
let _ = TIER_STACK.try_with(|s| {
let mut s = s.borrow_mut();
if let Some(&top) = s.last() {
assert!(
next > top,
"lock tier violation: trying to acquire {next:?} while already holding {top:?} \
(tiers must be strictly increasing — see .omne/cfg/concurrency.md)"
);
}
s.push(next);
});
}
pub fn pop_if_top(expected: Tier) {
let _ = TIER_STACK.try_with(|s| {
let mut s = s.borrow_mut();
if s.last().copied() == Some(expected) {
s.pop();
} else {
tracing::error!(
target: "grex::concurrency",
"tier pop mismatch: expected {:?} at top, stack = {:?}",
expected,
*s
);
}
});
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::{Arc, Barrier};
use std::thread;
use std::time::{Duration, Instant};
use tempfile::tempdir;
use tokio_util::sync::CancellationToken;
#[test]
fn pack_lock_acquires_creates_file() {
let dir = tempdir().unwrap();
let mut plock = PackLock::open(dir.path()).unwrap();
let expected = plock.path().to_path_buf();
let _guard = plock.acquire().unwrap();
assert!(expected.exists(), "open must create the sidecar file");
assert_eq!(expected, dir.path().join(PACK_LOCK_FILE_NAME));
}
#[test]
fn pack_lock_second_try_acquire_reports_busy_while_held() {
let dir = tempdir().unwrap();
let mut first = PackLock::open(dir.path()).unwrap();
let _held = first.acquire().unwrap();
let mut second = PackLock::open(dir.path()).unwrap();
let err = second.try_acquire().unwrap_err();
match err {
PackLockError::Busy { path } => {
assert_eq!(path, dir.path().join(PACK_LOCK_FILE_NAME));
}
other => panic!("expected Busy, got {other:?}"),
}
}
#[test]
fn pack_lock_release_on_drop() {
let dir = tempdir().unwrap();
{
let mut first = PackLock::open(dir.path()).unwrap();
let _g = first.acquire().unwrap();
}
let mut second = PackLock::open(dir.path()).unwrap();
let _g = second.acquire().unwrap();
}
#[test]
fn pack_lock_path_contains_pack_path() {
let dir = tempdir().unwrap();
let plock = PackLock::open(dir.path()).unwrap();
let p = plock.path();
assert!(p.starts_with(dir.path()));
assert_eq!(p.file_name().and_then(|s| s.to_str()), Some(PACK_LOCK_FILE_NAME));
}
#[test]
fn pack_lock_blocking_acquire_waits_for_holder() {
let dir = tempdir().unwrap();
let path = dir.path().to_path_buf();
let barrier = Arc::new(Barrier::new(2));
let holder_barrier = Arc::clone(&barrier);
let holder_path = path.clone();
let holder = thread::spawn(move || {
let mut lock = PackLock::open(&holder_path).unwrap();
let _g = lock.acquire().unwrap();
holder_barrier.wait();
thread::sleep(Duration::from_millis(100));
});
barrier.wait();
let start = Instant::now();
let mut second = PackLock::open(&path).unwrap();
let _g = second.acquire().unwrap();
let waited = start.elapsed();
holder.join().unwrap();
assert!(
waited >= Duration::from_millis(40),
"blocking acquire must have waited (observed {waited:?})"
);
}
#[test]
fn pack_lock_distinct_paths_do_not_contend() {
let a = tempdir().unwrap();
let b = tempdir().unwrap();
let mut la = PackLock::open(a.path()).unwrap();
let _ga = la.acquire().unwrap();
let mut lb = PackLock::open(b.path()).unwrap();
let _gb = lb.try_acquire().unwrap();
}
#[tokio::test]
    async fn async_acquire_serialises_in_process() {
        use std::sync::atomic::{AtomicBool, Ordering};
        let dir = tempdir().unwrap();
        let path = dir.path().to_path_buf();
        let path_clone = path.clone();
        let holding = Arc::new(tokio::sync::Notify::new());
        let holding_c = Arc::clone(&holding);
        let released = Arc::new(AtomicBool::new(false));
        let released_c = Arc::clone(&released);
        let h1 = tokio::spawn(async move {
            let lock = PackLock::open(&path).unwrap();
            let _hold = lock.acquire_async().await.unwrap();
            holding_c.notify_one();
            tokio::time::sleep(Duration::from_millis(30)).await;
            released_c.store(true, Ordering::SeqCst);
        });
        let h2 = tokio::spawn(async move {
            // Start only once h1 definitely holds the lock, so the ordering
            // assertion below cannot race.
            holding.notified().await;
            let lock = PackLock::open(&path_clone).unwrap();
            let _hold = lock.acquire_async().await.unwrap();
            assert!(
                released.load(Ordering::SeqCst),
                "second acquire must not complete until the first hold is dropped"
            );
        });
        h1.await.unwrap();
        h2.await.unwrap();
    }
#[cfg(debug_assertions)]
#[tokio::test]
async fn tier_strictly_increasing_ok() {
tier::TIER_STACK
.scope(std::cell::RefCell::new(Vec::new()), async {
with_tier(Tier::Semaphore, || {
with_tier(Tier::PerPack, || {
with_tier(Tier::Backend, || {
with_tier(Tier::Manifest, || {});
});
});
});
})
.await;
}
#[cfg(debug_assertions)]
#[tokio::test]
async fn tier_reversed_panics_in_debug() {
use std::panic::{catch_unwind, AssertUnwindSafe};
let result = tier::TIER_STACK
.scope(std::cell::RefCell::new(Vec::new()), async {
catch_unwind(AssertUnwindSafe(|| {
with_tier(Tier::PerPack, || {
with_tier(Tier::Semaphore, || {});
});
}))
})
.await;
assert!(result.is_err(), "reversed tier order must panic in debug builds");
}
#[tokio::test]
async fn acquire_cancellable_happy_path() {
let dir = tempdir().unwrap();
let lock = PackLock::open(dir.path()).unwrap();
let token = CancellationToken::new();
let result = lock.acquire_cancellable(&token).await;
assert!(result.is_ok(), "expected Ok(PackLockHold) on uncontended pack");
}
#[tokio::test]
    async fn acquire_cancellable_cancel_during_blocking_fd_lock_returns_cancelled() {
        let dir = tempdir().unwrap();
        let path = dir.path().to_path_buf();
        let path_b = path.clone();
        // Hold the fd lock via the synchronous API on a blocking thread.
        // Unlike `acquire_async`, this never touches the per-path tokio
        // mutex, so B gets past the mutex stage and cancellation fires while
        // its spawn_blocking thread is waiting on the fd lock itself.
        let (held_tx, held_rx) = tokio::sync::oneshot::channel();
        let a = tokio::task::spawn_blocking(move || {
            let mut lock = PackLock::open(&path).unwrap();
            let _g = lock.acquire().unwrap();
            held_tx.send(()).unwrap();
            std::thread::sleep(Duration::from_millis(500));
        });
        held_rx.await.unwrap();
        let token = CancellationToken::new();
        let cancel_handle = token.clone();
        let canceller = tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            cancel_handle.cancel();
        });
        let started = Instant::now();
        let lock_b = PackLock::open(&path_b).unwrap();
        let result =
            tokio::time::timeout(Duration::from_millis(50), lock_b.acquire_cancellable(&token))
                .await
                .expect("acquire_cancellable must return within 50 ms after cancel");
        let waited = started.elapsed();
        assert!(
            matches!(result, Err(PackLockErrorOrCancelled::Cancelled)),
            "expected Err(Cancelled), got {result:?} after {waited:?}"
        );
        canceller.await.unwrap();
        // `abort` cannot stop a blocking thread; just wait out the holder.
        a.await.unwrap();
    }
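
    /// Spawn a task that holds the pack lock until `release` is notified;
    /// `started` fires once the hold is established.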
fn spawn_holder(
path: PathBuf,
) -> (tokio::task::JoinHandle<()>, Arc<tokio::sync::Notify>, Arc<tokio::sync::Notify>) {
let started = Arc::new(tokio::sync::Notify::new());
let release = Arc::new(tokio::sync::Notify::new());
let started_c = Arc::clone(&started);
let release_c = Arc::clone(&release);
let h = tokio::spawn(async move {
let lock = PackLock::open(&path).unwrap();
let _hold = lock.acquire_async().await.unwrap();
started_c.notify_one();
release_c.notified().await;
});
(h, started, release)
}
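
    /// Poll `acquire_async` until it succeeds or `deadline` expires; proves
    /// that an abandoned blocking thread eventually released the fd lock.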
async fn poll_acquire_until_free(path: PathBuf, deadline: Duration) -> Result<(), ()> {
tokio::time::timeout(deadline, async move {
loop {
let lock = PackLock::open(&path).unwrap();
if let Ok(Ok(_hold)) =
tokio::time::timeout(Duration::from_millis(100), lock.acquire_async()).await
{
return;
}
tokio::time::sleep(Duration::from_millis(20)).await;
}
})
.await
.map_err(|_| ())
}
#[tokio::test]
async fn acquire_cancellable_spawn_blocking_thread_releases_guard_when_it_finally_unblocks() {
let dir = tempdir().unwrap();
let path = dir.path().to_path_buf();
let (a, a_started, release_a) = spawn_holder(path.clone());
a_started.notified().await;
let token = CancellationToken::new();
let cancel_handle = token.clone();
let path_b = path.clone();
let b = tokio::spawn(async move {
let lock = PackLock::open(&path_b).unwrap();
lock.acquire_cancellable(&token).await
});
tokio::time::sleep(Duration::from_millis(20)).await;
cancel_handle.cancel();
let b_result = tokio::time::timeout(Duration::from_millis(100), b)
.await
.expect("B must resolve quickly after cancel")
.expect("B task panicked");
assert!(
matches!(b_result, Err(PackLockErrorOrCancelled::Cancelled)),
"expected B to see Cancelled, got {b_result:?}"
);
release_a.notify_one();
a.await.unwrap();
assert!(
poll_acquire_until_free(path, Duration::from_millis(2_000)).await.is_ok(),
"task C never acquired — spawn_blocking thread leaked its fd-lock guard"
);
}
#[tokio::test]
async fn acquire_cancellable_cancel_during_async_mutex_wait_returns_cancelled() {
let dir = tempdir().unwrap();
let path = dir.path().to_path_buf();
let (a, a_started, release_a) = spawn_holder(path.clone());
a_started.notified().await;
let token = CancellationToken::new();
let cancel_handle = token.clone();
let canceller = tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(10)).await;
cancel_handle.cancel();
});
let lock_b = PackLock::open(&path).unwrap();
let result =
tokio::time::timeout(Duration::from_millis(50), lock_b.acquire_cancellable(&token))
.await
.expect("acquire_cancellable must return within 50 ms after cancel");
assert!(
matches!(result, Err(PackLockErrorOrCancelled::Cancelled)),
"expected Err(Cancelled) from outer-mutex cancel arm, got {result:?}"
);
canceller.await.unwrap();
release_a.notify_one();
a.await.unwrap();
}
}