use libc::{c_int, c_short};
use std::collections::HashSet;
use std::fs::{self, File};
use std::io::{Error as IoError, ErrorKind};
use std::os::fd::AsRawFd;
use std::os::unix::fs::MetadataExt;
use std::path::{Path, PathBuf};
use std::process;
use std::sync::{LazyLock, Mutex};
use std::thread::sleep;
use std::time::{Duration, Instant};
use tracing::{error, info};
use crate::storage::backend::StorageError;
#[cfg(test)]
mod test;
static LOCKS: LazyLock<Mutex<HashSet<(u64, u64)>>> = LazyLock::new(|| Mutex::new(HashSet::new()));
#[derive(Debug)]
pub struct LockedDirectory {
base: PathBuf,
_file: File,
dev_ino: (u64, u64),
}
impl Drop for LockedDirectory {
fn drop(&mut self) {
assert!(LOCKS.lock().unwrap().remove(&self.dev_ino));
}
}
fn fcntl_lock(file: &File, cmd: c_int) -> Result<libc::flock, IoError> {
let mut flock = libc::flock {
l_type: libc::F_WRLCK as c_short,
l_whence: libc::SEEK_SET as c_short,
l_start: 0,
l_len: 0,
l_pid: 0,
};
match unsafe { libc::fcntl(file.as_raw_fd(), cmd, &mut flock as *mut libc::flock) } {
-1 => Err(IoError::last_os_error()),
_ => Ok(flock),
}
}
fn write_lock(file: &File) -> Result<(), IoError> {
fcntl_lock(file, libc::F_SETLK).map(|_| ())
}
fn get_lock(file: &File) -> Result<Option<u32>, IoError> {
const F_UNLCK: c_int = libc::F_UNLCK as c_int;
fcntl_lock(file, libc::F_GETLK).map(|flock| match flock.l_type as c_int {
F_UNLCK => None,
_ => Some(flock.l_pid as u32),
})
}
impl LockedDirectory {
const LOCKFILE_NAME: &'static str = "feldera.pidlock";
pub fn new_blocking<P: AsRef<Path>>(
base_path: P,
patience: Duration,
) -> Result<LockedDirectory, StorageError> {
let base = base_path.as_ref().to_path_buf();
let pid_file = base.join(LockedDirectory::LOCKFILE_NAME);
let start = Instant::now();
let mut blocked = false;
loop {
let mut locks = LOCKS.lock().unwrap();
match fs::metadata(&pid_file) {
Ok(metadata) => {
let dev_ino = (metadata.dev(), metadata.ino());
if locks.contains(&dev_ino) {
return Err(StorageError::StorageLocked(process::id(), base));
}
}
Err(error) if error.kind() == ErrorKind::NotFound => (),
Err(error) => {
return Err(StorageError::stdio(
error.kind(),
"stat",
pid_file.display(),
));
}
}
let file = File::options()
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(&pid_file)
.map_err(|e| StorageError::stdio(e.kind(), "create", pid_file.display()))?;
let metadata = file
.metadata()
.map_err(|e| StorageError::stdio(e.kind(), "fstat", pid_file.display()))?;
let dev_ino = (metadata.dev(), metadata.ino());
match write_lock(&file) {
Err(error)
if error.kind() == ErrorKind::PermissionDenied
|| error.kind() == ErrorKind::WouldBlock =>
{
let pid = get_lock(&file).unwrap_or(None).unwrap_or(0);
if start.elapsed() >= patience {
if blocked {
error!(
"{}: gave up waiting for process {pid} to release lock after {:.1} seconds",
pid_file.display(),
start.elapsed().as_secs_f64()
);
}
return Err(StorageError::StorageLocked(pid, base));
}
if !blocked {
info!(
"{}: waiting up to {:.1} seconds for process {pid} to release lock",
pid_file.display(),
patience.as_secs_f64(),
);
blocked = true;
}
sleep(Duration::from_millis(100));
}
Err(error) => {
return Err(StorageError::stdio(
error.kind(),
"exclusive lock",
pid_file.display(),
));
}
Ok(()) => {
if blocked {
info!(
"{}: acquired lock after {:.1} seconds",
pid_file.display(),
start.elapsed().as_secs_f64()
);
}
locks.insert(dev_ino);
return Ok(Self {
base,
_file: file,
dev_ino,
});
}
};
}
}
pub fn new<P: AsRef<Path>>(base_path: P) -> Result<LockedDirectory, StorageError> {
Self::new_blocking(base_path, Duration::ZERO)
}
pub fn base(&self) -> &Path {
self.base.as_path()
}
}