mod key;
mod internal_memory;
pub mod memory;
#[cfg(target_arch = "wasm32")]
pub mod web_locks;
#[cfg(all(any(unix, windows), not(target_arch = "wasm32")))]
pub mod pid_flock;
#[cfg(all(any(unix), not(target_arch = "wasm32")))]
pub mod pid_fcntl;
use std::time::Duration;
use std::path::PathBuf;
use anyhow::bail;
/// Pause between consecutive `try_lock` attempts made by the default
/// `LockManager::lock` implementation.
const POLL_INTERVAL: Duration = Duration::from_millis(50);
/// Errors describing why a datadir could not be claimed or prepared.
///
/// NOTE(review): nothing in this file constructs these variants; presumably
/// they are produced by the `pid_flock` / `pid_fcntl` backends — confirm there.
#[derive(thiserror::Error, Debug)]
pub enum PidLockError {
/// The datadir is already in use by another process.
///
/// The rendered message appends ` (holder PID: N)` only when `pid` is `Some`;
/// with `None` nothing is appended after the datadir.
#[error("another process is already using datadir {datadir}{}",
match pid {
Some(p) => format!(" (holder PID: {})", p),
None => String::new(),
})]
AlreadyHeld {
/// Datadir that is already in use.
datadir: PathBuf,
/// PID of the holding process, when it is known.
pid: Option<u32>,
},
/// The datadir could not be set up; the underlying cause is exposed as the
/// error's `source`.
#[error("failed to set up datadir {datadir}")]
SetupFailed {
/// Datadir whose setup failed.
datadir: PathBuf,
/// Underlying failure, reported through `std::error::Error::source`.
#[source]
source: anyhow::Error,
},
}
/// Marker trait for the guard objects handed out by a `LockManager`.
///
/// It declares no methods. The tests in this file release a lock by dropping
/// its guard and expect the same key to be acquirable again afterwards.
pub trait LockGuard: Send + Sync + std::fmt::Debug {}
/// A source of named locks, each represented by a boxed [`LockGuard`].
///
/// Implementors only have to provide [`LockManager::try_lock`]; a polling
/// [`LockManager::lock`] is supplied on top of it.
#[async_trait::async_trait]
pub trait LockManager: Send + Sync + std::fmt::Debug {
    /// Makes a single attempt to take the lock named `key`.
    ///
    /// Returns the guard on success and `None` otherwise. The tests in this
    /// file expect `None` both while the key is held and for keys the backend
    /// rejects as malformed.
    async fn try_lock(&self, key: &str) -> Option<Box<dyn LockGuard>>;

    /// Waits up to `timeout` for the lock named `key`.
    ///
    /// The default implementation calls [`LockManager::try_lock`] repeatedly,
    /// sleeping `POLL_INTERVAL` after every unsuccessful attempt.
    ///
    /// # Errors
    ///
    /// Fails with a "timed out acquiring lock" error once `timeout` has
    /// elapsed without a guard being obtained. Every `None` from `try_lock`
    /// is treated as "retry later", so a key that is never granted is also
    /// reported as a timeout.
    async fn lock(&self, key: &str, timeout: Duration) -> anyhow::Result<Box<dyn LockGuard>> {
        // This future never finishes on its own unless a guard is obtained;
        // the enclosing timeout is what bounds the wait.
        let poll_until_acquired = async {
            loop {
                match self.try_lock(key).await {
                    Some(guard) => break guard,
                    None => tokio::time::sleep(POLL_INTERVAL).await,
                }
            }
        };

        match tokio::time::timeout(timeout, poll_until_acquired).await {
            Err(_elapsed) => bail!("timed out acquiring lock {:?} after {:?}", key, timeout),
            Ok(guard) => Ok(guard),
        }
    }
}
/// Builds the default [`LockManager`] backend for the compilation target.
///
/// Backend selection is done at compile time:
///
/// * `wasm32`: `web_locks::WebLockManager`; `datadir` is ignored.
/// * unix (not `wasm32`): `pid_fcntl::FcntlPidLockManager` built from `datadir`.
/// * windows (not `wasm32`): `pid_flock::FlockPidLockManager` built from `datadir`.
///
/// # Errors
///
/// Propagates any error returned by the selected backend's constructor.
/// On a target that matches none of the cases above an error is returned
/// (instead of panicking), so callers can fall back to another manager.
pub fn platform_default(datadir: impl Into<PathBuf>) -> anyhow::Result<Box<dyn LockManager>> {
    #[cfg(target_arch = "wasm32")]
    {
        // The web backend takes no directory argument.
        let _ = datadir;
        return Ok(Box::new(web_locks::WebLockManager::new()));
    }
    #[cfg(all(unix, not(target_arch = "wasm32")))]
    {
        return Ok(Box::new(pid_fcntl::FcntlPidLockManager::new(datadir)?));
    }
    #[cfg(all(windows, not(target_arch = "wasm32")))]
    {
        return Ok(Box::new(pid_flock::FlockPidLockManager::new(datadir)?));
    }
    #[cfg(not(any(target_arch = "wasm32", unix, windows)))]
    {
        // This function already reports failures through `Result`; an
        // unsupported target is reported the same way rather than aborting.
        let _ = datadir;
        bail!("lock_manager::platform_default: no default backend for this target");
    }
}
#[cfg(all(test, not(target_arch = "wasm32")))]
mod test {
    // Conformance tests that run the same scenarios against every lock
    // backend compiled for the host.
    //
    // The module is gated to non-wasm32 targets, so the web backend cannot be
    // exercised from here.
    use super::*;
    use std::path::PathBuf;
    use std::fs;
    use std::sync::Arc;

    /// Upper bound for lock acquisitions that are expected to succeed.
    const TEST_TIMEOUT: Duration = Duration::from_secs(5);

    /// One backend under test, plus the temp directory (if any) created for it.
    struct TestBackend {
        /// Label used in assertion messages.
        name: &'static str,
        /// The manager under test; shared so it can be moved into spawned tasks.
        mgr: Arc<dyn LockManager>,
        /// Temp directory to remove on drop; `None` for backends created without one.
        dir: Option<PathBuf>,
    }

    impl Drop for TestBackend {
        /// Best-effort removal of the backend's temp directory.
        ///
        /// NOTE(review): `mgr` is still alive while this runs (fields are
        /// dropped after `Drop::drop`), so removal may fail on platforms that
        /// refuse to delete open files; the error is deliberately ignored.
        fn drop(&mut self) {
            if let Some(d) = &self.dir {
                let _ = fs::remove_dir_all(d);
            }
        }
    }

    /// Creates a fresh, randomly named directory under the system temp dir.
    fn tmp_dir() -> PathBuf {
        let dir = std::env::temp_dir()
            .join(format!("bark-lock-test-{}", rand::random::<u64>()));
        fs::create_dir_all(&dir).unwrap();
        dir
    }

    /// Returns one `TestBackend` per backend compiled for this target.
    ///
    /// The `cfg` conditions mirror the ones on the backend module
    /// declarations in the parent module.
    fn managers() -> Vec<TestBackend> {
        let mut backends = vec![
            TestBackend {
                name: "InternalMemory",
                mgr: Arc::new(internal_memory::InternalMemoryLockManager::new()),
                dir: None,
            },
            TestBackend {
                name: "Memory",
                mgr: Arc::new(memory::MemoryLockManager::new()),
                dir: None,
            },
        ];
        #[cfg(all(any(unix, windows), not(target_arch = "wasm32")))]
        {
            let dir = tmp_dir();
            backends.push(TestBackend {
                name: "FlockPidLock",
                mgr: Arc::new(pid_flock::FlockPidLockManager::new(&dir).unwrap()),
                dir: Some(dir),
            });
        }
        #[cfg(all(unix, not(target_arch = "wasm32")))]
        {
            let dir = tmp_dir();
            backends.push(TestBackend {
                name: "FcntlPidLock",
                mgr: Arc::new(pid_fcntl::FcntlPidLockManager::new(&dir).unwrap()),
                dir: Some(dir),
            });
        }
        backends
    }

    /// A lock can be taken, dropped, and taken again.
    #[tokio::test]
    async fn acquire_and_release() {
        for tb in managers() {
            let g = tb.mgr.lock("bark.ln_receive.1", TEST_TIMEOUT).await
                .unwrap_or_else(|e| panic!("{}: initial lock failed: {:#}", tb.name, e));
            drop(g);
            let _g2 = tb.mgr.lock("bark.ln_receive.1", TEST_TIMEOUT).await
                .unwrap_or_else(|e| panic!("{}: lock after release failed: {:#}", tb.name, e));
        }
    }

    /// `try_lock` yields `None` while the key is held and a guard once it is released.
    #[tokio::test]
    async fn try_lock_returns_none_when_held() {
        for tb in managers() {
            let g = tb.mgr.lock("k", TEST_TIMEOUT).await
                .unwrap_or_else(|e| panic!("{}: initial lock failed: {:#}", tb.name, e));
            let busy = tb.mgr.try_lock("k").await;
            assert!(busy.is_none(), "{}: second try_lock should be blocked", tb.name);
            drop(g);
            let g2 = tb.mgr.try_lock("k").await;
            assert!(g2.is_some(), "{}: try_lock should succeed after release", tb.name);
        }
    }

    /// Holding one key does not prevent taking a different key.
    #[tokio::test]
    async fn distinct_keys_dont_block() {
        for tb in managers() {
            let _g1 = tb.mgr.lock("a", TEST_TIMEOUT).await
                .unwrap_or_else(|e| panic!("{}: lock on \"a\" failed: {:#}", tb.name, e));
            let _g2 = tb.mgr.lock("b", TEST_TIMEOUT).await
                .unwrap_or_else(|e| panic!("{}: lock on \"b\" failed: {:#}", tb.name, e));
        }
    }

    /// `lock` on a held key fails with a "timed out" error once its timeout elapses.
    #[tokio::test]
    async fn lock_returns_timeout_error() {
        for tb in managers() {
            let _held = tb.mgr.lock("k", TEST_TIMEOUT).await
                .unwrap_or_else(|e| panic!("{}: initial lock failed: {:#}", tb.name, e));
            let mgr = Arc::clone(&tb.mgr);
            let result = tokio::spawn(async move {
                mgr.lock("k", Duration::from_millis(150)).await
            }).await.expect("lock task should not panic");
            match result {
                Ok(guard) => panic!("{}: expected timeout, got {:?}", tb.name, guard),
                Err(err) => assert!(
                    err.to_string().contains("timed out"),
                    "{}: unexpected error: {:#}", tb.name, err,
                ),
            }
        }
    }

    /// A task waiting in `lock` obtains the guard after the holder drops it.
    #[tokio::test]
    async fn waiter_unblocks_after_drop() {
        for tb in managers() {
            let g = tb.mgr.lock("k", TEST_TIMEOUT).await
                .unwrap_or_else(|e| panic!("{}: initial lock failed: {:#}", tb.name, e));
            let mgr = Arc::clone(&tb.mgr);
            let waiter = tokio::spawn(async move {
                mgr.lock("k", TEST_TIMEOUT).await.unwrap()
            });
            tokio::time::sleep(Duration::from_millis(150)).await;
            drop(g);
            let joined = tokio::time::timeout(Duration::from_secs(2), waiter).await;
            // The outer `Ok` only means the 2s timeout did not fire; the inner
            // `Ok` is what proves the waiter task finished without panicking.
            assert!(
                matches!(joined, Ok(Ok(_))),
                "{}: waiter should succeed after holder dropped, got {:?}",
                tb.name, joined,
            );
        }
    }

    /// Of ten tasks racing on one key, exactly one `try_lock` succeeds.
    #[tokio::test]
    async fn ten_concurrent_try_lock_only_one_wins() {
        use tokio::sync::Barrier;
        const N: usize = 10;
        for tb in managers() {
            let barrier = Arc::new(Barrier::new(N));
            let mut handles = Vec::with_capacity(N);
            for _ in 0..N {
                let mgr = Arc::clone(&tb.mgr);
                let barrier = Arc::clone(&barrier);
                handles.push(tokio::spawn(async move {
                    barrier.wait().await;
                    let guard = mgr.try_lock("contested").await;
                    let acquired = guard.is_some();
                    if acquired {
                        // Keep the guard alive long enough for the other
                        // tasks to make their attempt while it is held.
                        tokio::time::sleep(Duration::from_millis(100)).await;
                    }
                    acquired
                }));
            }
            let mut successes = 0usize;
            for h in handles {
                if h.await.expect("contender task should not panic") {
                    successes += 1;
                }
            }
            assert_eq!(
                successes, 1,
                "{}: expected exactly 1 successful try_lock out of {}, got {}",
                tb.name, N, successes,
            );
        }
    }

    /// Malformed keys are refused by every backend; well-formed ones are granted.
    #[tokio::test]
    async fn reject_bad_keys() {
        // (key, label used in the failure message)
        const BAD_KEYS: &[(&str, &str)] = &[
            ("", "empty"),
            ("a/b", "slash"),
            ("a<b>", "angle"),
            (".abc", "leading dot"),
            ("_abc", "leading underscore"),
            ("abc-", "trailing dash"),
            ("abc.", "trailing dot"),
            (".", "dot"),
            ("..", "dotdot"),
        ];
        const GOOD_KEYS: &[&str] = &["bark.lightning.send.42", "01abcdef.round.7"];

        for tb in managers() {
            for &(key, label) in BAD_KEYS {
                assert!(tb.mgr.try_lock(key).await.is_none(), "{}: {}", tb.name, label);
            }
            for &key in GOOD_KEYS {
                assert!(tb.mgr.try_lock(key).await.is_some(),
                    "{}: {} should be valid", tb.name, key);
            }
        }
    }

    /// `managers()` lists every backend that is compiled for this target.
    #[test]
    fn managers_covers_every_compiled_backend() {
        let names: Vec<_> = managers().iter().map(|tb| tb.name).collect();

        let mut expected = vec!["InternalMemory", "Memory"];
        #[cfg(all(any(unix, windows), not(target_arch = "wasm32")))]
        {
            expected.push("FlockPidLock");
        }
        #[cfg(all(unix, not(target_arch = "wasm32")))]
        {
            expected.push("FcntlPidLock");
        }

        for want in expected {
            assert!(names.contains(&want), "missing {}: {:?}", want, names);
        }
    }

    /// `platform_default` yields a manager that grants a previously unused key.
    #[tokio::test]
    async fn platform_default_returns_a_working_manager() {
        let dir = tmp_dir();
        let mgr = match super::platform_default(&dir) {
            Ok(mgr) => mgr,
            Err(e) => {
                let _ = fs::remove_dir_all(&dir);
                panic!("platform_default should construct a manager: {:#}", e);
            }
        };
        // The guard is a temporary here and is released at the end of the statement.
        let acquired = mgr.try_lock("bark.platform.default.test").await.is_some();
        // Release the manager and clean up before asserting, so a failing
        // assertion does not leave the temp directory behind.
        drop(mgr);
        let _ = fs::remove_dir_all(&dir);
        assert!(acquired, "platform_default's manager should grant a fresh lock");
    }
}