use std::{
collections::HashMap,
path::{Path, PathBuf},
sync::{Arc, Condvar, Mutex},
};
struct PathOrderState {
current_seq: Mutex<usize>,
condvar: Condvar,
}
struct PathRegistration {
next_seq: usize,
state: Arc<PathOrderState>,
}
#[derive(Default)]
pub(crate) struct OrderedPathLocks {
registry: Mutex<HashMap<PathBuf, PathRegistration>>,
}
impl std::fmt::Debug for OrderedPathLocks {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("OrderedPathLocks").finish_non_exhaustive()
}
}
#[must_use = "ticket must be consumed via wait_for_turn() to avoid delaying other entries"]
pub(crate) struct PathOrderTicket {
seq: usize,
state: Option<Arc<PathOrderState>>,
}
pub(crate) struct PathOrderGuard(Arc<PathOrderState>, usize);
impl Drop for PathOrderTicket {
fn drop(&mut self) {
if let Some(state) = self.state.take() {
advance_seq(&state, self.seq);
}
}
}
impl Drop for PathOrderGuard {
fn drop(&mut self) {
advance_seq(&self.0, self.1);
}
}
fn advance_seq(state: &PathOrderState, seq: usize) {
let mut current = state.current_seq.lock().unwrap_or_else(|e| e.into_inner());
while *current < seq {
current = state
.condvar
.wait(current)
.unwrap_or_else(|e| e.into_inner());
}
*current = seq + 1;
state.condvar.notify_all();
}
impl OrderedPathLocks {
pub(crate) fn register(&self, path: &Path) -> PathOrderTicket {
let mut map = self
.registry
.lock()
.expect("path lock registry mutex poisoned");
let reg = map
.entry(path.to_path_buf())
.or_insert_with(|| PathRegistration {
next_seq: 0,
state: Arc::new(PathOrderState {
current_seq: Mutex::new(0),
condvar: Condvar::new(),
}),
});
let seq = reg.next_seq;
reg.next_seq += 1;
PathOrderTicket {
seq,
state: Some(Arc::clone(®.state)),
}
}
}
impl PathOrderTicket {
pub(crate) fn wait_for_turn(mut self) -> PathOrderGuard {
let state = self
.state
.take()
.expect("wait_for_turn called on consumed ticket");
let mut current = state.current_seq.lock().unwrap_or_else(|e| e.into_inner());
while *current != self.seq {
current = state
.condvar
.wait(current)
.unwrap_or_else(|e| e.into_inner());
}
drop(current);
PathOrderGuard(state, self.seq)
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicUsize, Ordering};
#[test]
fn single_path_no_contention() {
let locks = OrderedPathLocks::default();
let ticket = locks.register(Path::new("a.txt"));
let _guard = ticket.wait_for_turn();
}
#[test]
fn independent_paths_no_blocking() {
let locks = OrderedPathLocks::default();
let t1 = locks.register(Path::new("a.txt"));
let t2 = locks.register(Path::new("b.txt"));
let _g2 = t2.wait_for_turn();
let _g1 = t1.wait_for_turn();
}
#[cfg(not(target_family = "wasm"))]
#[test]
fn duplicate_path_ordering() {
let locks = Arc::new(OrderedPathLocks::default());
let order = Arc::new(AtomicUsize::new(0));
let t0 = locks.register(Path::new("file.txt"));
let t1 = locks.register(Path::new("file.txt"));
let t2 = locks.register(Path::new("file.txt"));
std::thread::scope(|s| {
let order_clone = Arc::clone(&order);
s.spawn(move || {
let _g = t0.wait_for_turn();
assert_eq!(order_clone.fetch_add(1, Ordering::SeqCst), 0);
});
let order_clone = Arc::clone(&order);
s.spawn(move || {
let _g = t1.wait_for_turn();
assert_eq!(order_clone.fetch_add(1, Ordering::SeqCst), 1);
});
let order_clone = Arc::clone(&order);
s.spawn(move || {
let _g = t2.wait_for_turn();
assert_eq!(order_clone.fetch_add(1, Ordering::SeqCst), 2);
});
});
assert_eq!(order.load(Ordering::SeqCst), 3);
}
#[test]
fn guard_advances_on_drop() {
let locks = OrderedPathLocks::default();
let t0 = locks.register(Path::new("x"));
let t1 = locks.register(Path::new("x"));
let guard = t0.wait_for_turn();
drop(guard);
let _g1 = t1.wait_for_turn();
}
#[test]
fn dropped_ticket_does_not_block_successor() {
let locks = OrderedPathLocks::default();
let t0 = locks.register(Path::new("x"));
let t1 = locks.register(Path::new("x"));
drop(t0);
let _g1 = t1.wait_for_turn();
}
}