bark/lock_manager/
pid_fcntl.rs1use std::collections::HashSet;
33use std::fs::{self, File};
34use std::io::{Read, Write};
35use std::os::unix::io::AsRawFd;
36use std::path::{Path, PathBuf};
37use std::sync::{Mutex, OnceLock};
38
39use anyhow::Context;
40
41use super::{LockGuard, LockManager, PidLockError};
42use super::memory::MemoryLockManager;
43
/// File name, relative to the datadir, of the lock file.
///
/// `FcntlPidLockManager::new` joins this onto the datadir, takes an `fcntl`
/// write lock on the resulting file and stores the holder's PID in it.
pub const LOCK_FILE: &str = "LOCK";
46
47fn open_lock_file(path: &Path) -> anyhow::Result<File> {
48 File::options()
49 .read(true)
50 .write(true)
51 .create(true)
52 .truncate(false)
53 .open(path)
54 .with_context(|| format!("failed to open lock file {}", path.display()))
55}
56
57fn try_fcntl_lock(file: &File) -> anyhow::Result<bool> {
59 let mut lk: libc::flock = unsafe { std::mem::zeroed() };
60 lk.l_type = libc::F_WRLCK as libc::c_short;
61 lk.l_whence = libc::SEEK_SET as libc::c_short;
62 lk.l_start = 0;
63 lk.l_len = 0;
64
65 let ret = unsafe { libc::fcntl(file.as_raw_fd(), libc::F_SETLK, &lk) };
66 if ret == 0 {
67 return Ok(true);
68 }
69 let err = std::io::Error::last_os_error();
70 match err.raw_os_error() {
71 Some(libc::EAGAIN) | Some(libc::EACCES) => Ok(false),
72 _ => Err(err).context("fcntl F_SETLK failed"),
73 }
74}
75
76
77pub struct FcntlPidLockManager {
78 _pid_file: File,
81 _registration: Registration,
84 datadir: PathBuf,
85 in_process: MemoryLockManager,
86}
87
88impl FcntlPidLockManager {
89 pub fn new(datadir: impl Into<PathBuf>) -> Result<Self, PidLockError> {
95 let datadir = datadir.into();
96 let setup = |source: anyhow::Error| PidLockError::SetupFailed {
97 datadir: datadir.clone(),
98 source,
99 };
100
101 fs::create_dir_all(&datadir)
102 .with_context(|| format!("failed to create datadir {}", datadir.display()))
103 .map_err(&setup)?;
104
105 let registration = Registration::try_register(&datadir)?;
110
111 let path = datadir.join(LOCK_FILE);
112 let mut file = open_lock_file(&path).map_err(&setup)?;
113
114 if !try_fcntl_lock(&file).map_err(&setup)? {
115 let mut holder = String::new();
116 let _ = file.read_to_string(&mut holder);
117 let pid = holder.trim().parse::<u32>().ok();
118 return Err(PidLockError::AlreadyHeld { datadir, pid });
119 }
120
121 file.set_len(0).context("failed to truncate pid lock").map_err(&setup)?;
124 write!(file, "{}", std::process::id())
125 .context("failed to write pid lock").map_err(&setup)?;
126 file.flush().context("failed to flush pid lock").map_err(&setup)?;
127
128 Ok(Self {
129 _pid_file: file,
130 _registration: registration,
131 datadir,
132 in_process: MemoryLockManager::new(),
133 })
134 }
135
136 pub fn datadir(&self) -> &Path {
137 &self.datadir
138 }
139}
140
141struct Registration {
144 datadir: PathBuf,
145}
146
147impl Registration {
148 fn try_register(datadir: &Path) -> Result<Self, PidLockError> {
149 let mut held = held_datadirs().lock().expect("FcntlPidLockManager registry poisoned");
150 if !held.insert(datadir.to_path_buf()) {
151 return Err(PidLockError::AlreadyHeld {
152 datadir: datadir.to_path_buf(),
153 pid: Some(std::process::id()),
154 });
155 }
156 Ok(Self { datadir: datadir.to_path_buf() })
157 }
158}
159
160impl Drop for Registration {
161 fn drop(&mut self) {
162 if let Ok(mut held) = held_datadirs().lock() {
163 held.remove(&self.datadir);
164 }
165 }
166}
167
168fn held_datadirs() -> &'static Mutex<HashSet<PathBuf>> {
169 static HELD: OnceLock<Mutex<HashSet<PathBuf>>> = OnceLock::new();
170 HELD.get_or_init(|| Mutex::new(HashSet::new()))
171}
172
173impl std::fmt::Debug for FcntlPidLockManager {
174 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
175 f.debug_struct("FcntlPidLockManager").field("datadir", &self.datadir).finish()
176 }
177}
178
179#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
180#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
181impl LockManager for FcntlPidLockManager {
182 async fn try_lock(&self, key: &str) -> Option<Box<dyn LockGuard>> {
183 self.in_process.try_lock(key).await
184 }
185
186 async fn lock(
187 &self,
188 key: &str,
189 timeout: std::time::Duration,
190 ) -> anyhow::Result<Box<dyn LockGuard>> {
191 self.in_process.lock(key, timeout).await
192 }
193}
194
#[cfg(test)]
mod test {
    use super::*;

    /// Returns a path for a fresh scratch directory under the system temp
    /// dir, namespaced by PID and the current time in nanoseconds. Anything
    /// already at that path is removed; the directory itself is not created.
    fn tmp_dir() -> PathBuf {
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos();

        let mut dir = std::env::temp_dir();
        dir.push(format!("bark-pid-fcntl-lockmgr-{}", std::process::id()));
        dir.push(nanos.to_string());

        let _ = fs::remove_dir_all(&dir);
        dir
    }

    /// A successful acquire leaves exactly our PID in the lock file.
    #[tokio::test]
    async fn acquire_writes_holder_pid() {
        let dir = tmp_dir();
        let mgr = FcntlPidLockManager::new(&dir).unwrap();

        let on_disk = fs::read_to_string(dir.join(LOCK_FILE)).unwrap();
        let expected = std::process::id().to_string();
        assert_eq!(on_disk, expected);

        drop(mgr);
        let _ = fs::remove_dir_all(&dir);
    }

    /// While one manager is alive, a second one for the same datadir fails.
    #[tokio::test]
    async fn second_acquire_in_same_process_is_refused() {
        let dir = tmp_dir();
        let held = FcntlPidLockManager::new(&dir).unwrap();

        let err = FcntlPidLockManager::new(&dir).unwrap_err();
        let message = err.to_string();
        assert!(
            message.contains("another process is already using datadir"),
            "unexpected error: {}", err,
        );

        drop(held);
        let _ = fs::remove_dir_all(&dir);
    }

    /// Dropping the manager frees the datadir for a new one.
    #[tokio::test]
    async fn reacquire_after_drop_succeeds() {
        let dir = tmp_dir();

        let first = FcntlPidLockManager::new(&dir).unwrap();
        drop(first);

        let _second = FcntlPidLockManager::new(&dir).unwrap();
        let _ = fs::remove_dir_all(&dir);
    }

    /// Per-key locks exclude on the same key and are independent across keys.
    #[tokio::test]
    async fn per_key_locking_works_in_process() {
        let dir = tmp_dir();
        let mgr = FcntlPidLockManager::new(&dir).unwrap();

        // Keep every guard bound until the end so the locks stay held.
        let foo_guard = mgr.try_lock("foo").await;
        assert!(foo_guard.is_some());

        let foo_again = mgr.try_lock("foo").await;
        assert!(foo_again.is_none(), "same key should be blocked");

        let bar_guard = mgr.try_lock("bar").await;
        assert!(bar_guard.is_some(), "different key should be free");

        let _ = fs::remove_dir_all(&dir);
    }
}