use std::path::{Path, PathBuf};
/// A held claim consisting of two open files: the `gate` file and the
/// `announce` file.
///
/// Both handles are wrapped in `ManuallyDrop`, so letting a `LockClaim` go out
/// of scope does not close them (no `Drop` impl is visible in this part of the
/// file). The handles are only closed by [`LockClaim::release`] or
/// [`LockClaim::transfer`], both of which consume the claim.
pub struct LockClaim {
gate: std::mem::ManuallyDrop<std::fs::File>,
announce: std::mem::ManuallyDrop<std::fs::File>,
}
impl LockClaim {
/// Wraps two already-opened files into a claim.
fn new(gate: std::fs::File, announce: std::fs::File) -> Self {
Self {
gate: std::mem::ManuallyDrop::new(gate),
announce: std::mem::ManuallyDrop::new(announce),
}
}
/// Configures `cmd` so that the spawned child can take over this claim.
///
/// On Unix a `pre_exec` hook is installed that sets the descriptor flags of
/// both files to the empty set (i.e. without close-on-exec) in the child
/// before it execs. On Windows nothing is done here; the handles are
/// duplicated into the child later by [`LockClaim::transfer`].
pub fn prepare_transfer(&self, cmd: &mut tokio::process::Command) {
#[cfg(unix)]
{
use std::os::unix::io::AsRawFd;
// Capture the raw descriptor numbers by value; the hook must not borrow `self`.
let gate_fd = self.gate.as_raw_fd();
let announce_fd = self.announce.as_raw_fd();
// `pre_exec` is an unsafe API: the closure runs in the child between fork and exec.
unsafe {
cmd.pre_exec(move || {
for fd in [gate_fd, announce_fd] {
nix::fcntl::fcntl(
fd,
nix::fcntl::FcntlArg::F_SETFD(nix::fcntl::FdFlag::empty()),
)
.map_err(std::io::Error::from)?;
}
Ok(())
});
}
}
#[cfg(windows)]
{
let _ = cmd;
}
}
/// Gives the claim up: releases the announce file first, then the gate file.
///
/// # Errors
/// Returns the first error from `release_file`. If releasing the announce
/// file fails, the gate file is not passed to `release_file`; it is dropped
/// as an ordinary `File` on the early return.
pub fn release(mut self) -> std::io::Result<()> {
// `self` is consumed and the `ManuallyDrop` fields are never touched again
// after being taken, so each value is moved out exactly once.
let announce = unsafe { std::mem::ManuallyDrop::take(&mut self.announce) };
let gate = unsafe { std::mem::ManuallyDrop::take(&mut self.gate) };
release_file(announce)?;
release_file(gate)
}
/// Hands the claim over to `child` and closes this process's handles.
///
/// On Windows both handles are duplicated into the child process with
/// `DuplicateHandle`; on Unix the local files are simply dropped (the
/// descriptor flags were already adjusted by [`LockClaim::prepare_transfer`]).
///
/// # Errors
/// On Windows, returns the untouched claim together with the error when the
/// child has no process handle or when a `DuplicateHandle` call fails. The
/// Unix branch always returns `Ok(())`.
pub fn transfer(
mut self,
child: &tokio::process::Child,
) -> Result<(), (Self, std::io::Error)> {
#[cfg(windows)]
{
use std::os::windows::io::AsRawHandle;
use windows_sys::Win32::Foundation::{
DUPLICATE_SAME_ACCESS, DuplicateHandle, HANDLE,
};
use windows_sys::Win32::System::Threading::GetCurrentProcess;
let Some(child_handle) = child.raw_handle() else {
return Err((
self,
std::io::Error::other(
"child has no process handle (already reaped)",
),
));
};
for source in [self.gate.as_raw_handle(), self.announce.as_raw_handle()] {
// Receives the duplicated handle value; it is not used afterwards in this process.
let mut injected: HANDLE = std::ptr::null_mut();
let ok = unsafe {
DuplicateHandle(
GetCurrentProcess(),
source as HANDLE,
child_handle as HANDLE,
&mut injected,
0,
0, DUPLICATE_SAME_ACCESS,
)
};
if ok == 0 {
// NOTE(review): if the second duplication fails, the first duplicate
// is not closed again here.
return Err((self, std::io::Error::last_os_error()));
}
}
// Success path only: `self` is not returned after this point, so each field
// is taken exactly once.
unsafe {
drop(std::mem::ManuallyDrop::take(&mut self.announce));
drop(std::mem::ManuallyDrop::take(&mut self.gate));
}
Ok(())
}
#[cfg(unix)]
{
let _ = child;
// Close this process's copies; `self` is consumed so the fields are taken once.
unsafe {
drop(std::mem::ManuallyDrop::take(&mut self.announce));
drop(std::mem::ManuallyDrop::take(&mut self.gate));
}
Ok(())
}
}
}
/// Releases a single claim file and reports any failure.
///
/// On Windows the handle is closed explicitly with `CloseHandle` so that a
/// failing close surfaces as an error. On Unix the `flock` is dropped with an
/// explicit unlock; the descriptor itself is closed when `file` goes out of
/// scope at the end of the function.
fn release_file(file: std::fs::File) -> std::io::Result<()> {
    #[cfg(windows)]
    {
        use std::os::windows::io::IntoRawHandle;
        use windows_sys::Win32::Foundation::CloseHandle;

        // Ownership of the handle leaves `File`, so it is closed exactly once here.
        let raw = file.into_raw_handle();
        let closed = unsafe { CloseHandle(raw as _) };
        if closed != 0 {
            Ok(())
        } else {
            Err(std::io::Error::last_os_error())
        }
    }
    #[cfg(unix)]
    {
        use nix::fcntl::{FlockArg, flock};
        use std::os::unix::io::AsRawFd;

        match flock(file.as_raw_fd(), FlockArg::Unlock) {
            Ok(_) => Ok(()),
            Err(errno) => Err(std::io::Error::from(errno)),
        }
    }
}
/// Attempts to take the claim for `key` inside `dir` without waiting.
///
/// The directory is created if needed, then the gate file is claimed and
/// filled with `contents`, and finally the announce file is claimed and
/// marked. Returns `None` as soon as any of those steps fails; files opened
/// up to that point are dropped.
pub async fn try_acquire(dir: &Path, key: &str, contents: &str) -> Option<LockClaim> {
    tokio::fs::create_dir_all(dir).await.ok()?;

    // Gate first, announce second: the same order `wait_acquire` uses.
    let gate_file = open_claim_file(&gate_path(dir, key)).and_then(|mut file| {
        write_contents(&mut file, contents).ok()?;
        Some(file)
    })?;
    let announce_file = open_claim_file(&announce_path(dir, key)).and_then(|mut file| {
        write_beacon(&mut file).ok()?;
        Some(file)
    })?;

    Some(LockClaim::new(gate_file, announce_file))
}
/// Takes the claim for `key` inside `dir`, waiting as long as necessary.
///
/// Each round waits for the gate file, writes `contents` into it and then
/// tries to claim the announce file. When the announce file cannot be
/// claimed, the gate is given back, the function waits for the announce file
/// to be released and starts over.
///
/// # Errors
/// Propagates I/O errors from directory creation, from the platform wait
/// helpers and from writing either file.
pub async fn wait_acquire(
    dir: &Path,
    key: &str,
    contents: &str,
) -> std::io::Result<LockClaim> {
    tokio::fs::create_dir_all(dir).await?;
    let gate_file_path = gate_path(dir, key);
    let announce_file_path = announce_path(dir, key);

    loop {
        #[cfg(windows)]
        let mut gate = wait_acquire_windows(gate_file_path.clone()).await?;
        #[cfg(unix)]
        let mut gate = wait_acquire_unix(gate_file_path.clone()).await?;
        write_contents(&mut gate, contents)?;

        if let Some(mut announce) = open_claim_file(&announce_file_path) {
            write_beacon(&mut announce)?;
            return Ok(LockClaim::new(gate, announce));
        }

        // Announce is still taken: let go of the gate before blocking on it.
        drop(gate);
        #[cfg(windows)]
        wait_release_windows(announce_file_path.clone()).await?;
        #[cfg(unix)]
        wait_release_unix(announce_file_path.clone()).await?;
    }
}
/// Pause (100 microseconds) between re-checks in `try_held` while the gate
/// file is observed as locked but the announce file is not.
const PARTIAL_STATE_POLL: std::time::Duration = std::time::Duration::from_micros(100);
/// Reports whether the claim for `key` inside `dir` is currently held.
///
/// Returns `false` when the gate file is not locked and `true` when both the
/// gate and the announce file are locked. While only the gate is locked the
/// check is repeated after sleeping for `PARTIAL_STATE_POLL`.
pub async fn try_held(dir: &Path, key: &str) -> bool {
    let gate_file = gate_path(dir, key);
    let announce_file = announce_path(dir, key);

    loop {
        // Both files are probed on every pass, gate first.
        let gate_locked = file_locked(&gate_file);
        let announce_locked = file_locked(&announce_file);

        if !gate_locked {
            return false;
        }
        if announce_locked {
            return true;
        }
        tokio::time::sleep(PARTIAL_STATE_POLL).await;
    }
}
/// Reads the gate file contents for `key` if the claim is currently held.
///
/// A change watcher is armed on the gate file before reading. The contents
/// are only returned when the claim is held both before and after the read
/// and the watcher saw no change in between; otherwise the attempt restarts.
///
/// Returns `Ok(None)` when the watcher cannot be armed because the file is
/// missing, or when the claim is not held at the first check.
///
/// # Errors
/// Propagates any other error from arming the watcher, reading the file or
/// querying the watcher.
pub async fn try_read(dir: &Path, key: &str) -> std::io::Result<Option<String>> {
    use std::io::ErrorKind;

    let gate_file = gate_path(dir, key);
    loop {
        let watcher = match ChangeWatcher::arm(dir, &gate_file) {
            Ok(armed) => armed,
            Err(err) => {
                return if err.kind() == ErrorKind::NotFound {
                    Ok(None)
                } else {
                    Err(err)
                };
            }
        };

        if !try_held(dir, key).await {
            return Ok(None);
        }

        let text = match tokio::fs::read_to_string(&gate_file).await {
            Ok(text) => text,
            // The file vanished between the check and the read: start over.
            Err(err) if err.kind() == ErrorKind::NotFound => continue,
            Err(err) => return Err(err),
        };

        // Accept the snapshot only if the claim survived the read untouched.
        let still_held = try_held(dir, key).await;
        if still_held && !watcher.dirty()? {
            return Ok(Some(text));
        }
    }
}
/// Waits until the claim for `key` inside `dir` is held.
///
/// The directory is created first. Each round arms a `HeldWatcher` before
/// probing with `try_held` (presumably so that a change landing between the
/// probe and the wait is not missed), and blocks on the watcher when the
/// claim is not held yet.
///
/// # Errors
/// Propagates errors from directory creation, arming the watcher and waiting
/// on it.
pub async fn wait_held(dir: &Path, key: &str) -> std::io::Result<()> {
    tokio::fs::create_dir_all(dir).await?;
    let announce_file = announce_path(dir, key);

    loop {
        let armed = HeldWatcher::arm(dir, &announce_file)?;
        if !try_held(dir, key).await {
            armed.wait().await?;
            continue;
        }
        return Ok(());
    }
}
/// Waits until the claim for `key` is held and returns the gate contents.
///
/// Alternates between `wait_held` and `try_read` until the latter yields a
/// value.
///
/// # Errors
/// Propagates any error from `wait_held` or `try_read`.
pub async fn wait_read(dir: &Path, key: &str) -> std::io::Result<String> {
    loop {
        wait_held(dir, key).await?;
        match try_read(dir, key).await? {
            Some(text) => return Ok(text),
            // The claim went away again before it could be read; wait anew.
            None => continue,
        }
    }
}
/// Waits until the announce file for `key` inside `dir` has been released.
///
/// Delegates to the platform-specific wait helper.
///
/// # Errors
/// Propagates the error returned by that helper.
pub async fn wait_released(dir: &Path, key: &str) -> std::io::Result<()> {
    let announce_file = announce_path(dir, key);

    #[cfg(windows)]
    let outcome = wait_release_windows(announce_file).await;
    #[cfg(unix)]
    let outcome = wait_release_unix(announce_file).await;

    outcome
}
/// Collects the process ids reported by `file_owners` for the gate and the
/// announce file of `key`, without duplicates.
///
/// Gate owners come first, followed by announce owners not already listed.
/// The scan runs on tokio's blocking thread pool.
///
/// # Errors
/// Returns an error from either `file_owners` call, or a `"join: …"` error
/// when the blocking task could not be joined.
pub async fn owners(dir: &Path, key: &str) -> std::io::Result<Vec<u32>> {
    let gate_file = gate_path(dir, key);
    let announce_file = announce_path(dir, key);

    let scan = tokio::task::spawn_blocking(move || -> std::io::Result<Vec<u32>> {
        let mut merged = file_owners(&gate_file)?;
        let announce_pids = file_owners(&announce_file)?;
        for pid in announce_pids {
            if !merged.contains(&pid) {
                merged.push(pid);
            }
        }
        Ok(merged)
    });

    match scan.await {
        Ok(result) => result,
        Err(join_err) => Err(std::io::Error::other(format!("join: {join_err}"))),
    }
}
/// Windows: lists the ids of other processes that the Restart Manager reports
/// for `path`.
///
/// Every failure along the way (missing file, session start, resource
/// registration, list retrieval) yields an empty list rather than an error.
/// Process id 0 and the current process are filtered out.
#[cfg(windows)]
fn file_owners(path: &Path) -> std::io::Result<Vec<u32>> {
use std::os::windows::ffi::OsStrExt;
use windows_sys::Win32::Foundation::ERROR_MORE_DATA;
use windows_sys::Win32::System::RestartManager::{
CCH_RM_SESSION_KEY, RM_PROCESS_INFO, RmEndSession, RmGetList,
RmRegisterResources, RmStartSession,
};
if !path.exists() {
return Ok(Vec::new());
}
let mut session: u32 = 0;
// Buffer for the session key, sized CCH_RM_SESSION_KEY + 1 UTF-16 units.
let mut session_key = [0u16; CCH_RM_SESSION_KEY as usize + 1];
if unsafe { RmStartSession(&mut session, 0, session_key.as_mut_ptr()) } != 0 {
return Ok(Vec::new());
}
// Ends the Restart Manager session on every return path below.
struct Session(u32);
impl Drop for Session {
fn drop(&mut self) {
unsafe {
RmEndSession(self.0);
}
}
}
let _session = Session(session);
// NUL-terminated UTF-16 copy of the path for the Win32 call.
let wide: Vec<u16> = path
.as_os_str()
.encode_wide()
.chain(std::iter::once(0))
.collect();
let files = [wide.as_ptr()];
if unsafe {
RmRegisterResources(
session,
1,
files.as_ptr(),
0,
std::ptr::null(),
0,
std::ptr::null(),
)
} != 0
{
return Ok(Vec::new());
}
let mut needed: u32 = 0;
let mut count: u32 = 0;
let mut reason: u32 = 0;
// First call with no buffer: only asks how many entries are needed.
let rc = unsafe {
RmGetList(
session,
&mut needed,
&mut count,
std::ptr::null_mut(),
&mut reason,
)
};
if rc != 0 && rc != ERROR_MORE_DATA {
return Ok(Vec::new());
}
if needed == 0 {
return Ok(Vec::new());
}
let mut infos: Vec<RM_PROCESS_INFO> =
vec![unsafe { std::mem::zeroed() }; needed as usize];
count = needed;
// Second call fills `infos`; any failure (including a list that grew in
// the meantime) results in an empty answer.
if unsafe {
RmGetList(
session,
&mut needed,
&mut count,
infos.as_mut_ptr(),
&mut reason,
)
} != 0
{
return Ok(Vec::new());
}
let me = std::process::id();
Ok(infos[..count as usize]
.iter()
.map(|i| i.Process.dwProcessId)
.filter(|&pid| pid != 0 && pid != me)
.collect())
}
/// Linux: lists the ids of other processes holding a `FLOCK` `WRITE` lock on
/// `path`, according to `/proc/locks`.
///
/// The file is identified by the device major/minor and inode taken from its
/// metadata. An empty list is returned when the metadata or `/proc/locks`
/// cannot be read. Lines that do not parse are skipped, as are pid 0 and the
/// current process. Each pid appears once, in first-seen order.
#[cfg(target_os = "linux")]
fn file_owners(path: &Path) -> std::io::Result<Vec<u32>> {
    use std::os::unix::fs::MetadataExt;

    let Ok(meta) = std::fs::metadata(path) else {
        return Ok(Vec::new());
    };
    let raw_dev = meta.dev();
    let want_ino = meta.ino();
    let want_major = (raw_dev >> 8) & 0xfff;
    let want_minor = (raw_dev & 0xff) | ((raw_dev >> 12) & 0xfff_ff00);

    let Ok(table) = std::fs::read_to_string("/proc/locks") else {
        return Ok(Vec::new());
    };
    let own_pid = std::process::id();

    // Yields the pid of a line describing an exclusive flock on our file.
    // Expected columns: id, class, kind, mode, pid, "maj:min:inode", ...
    let holder_of = |line: &str| -> Option<u32> {
        let cols: Vec<&str> = line.split_whitespace().collect();
        if cols.len() < 6 || cols[1] != "FLOCK" || cols[3] != "WRITE" {
            return None;
        }
        let pid = cols[4].parse::<u32>().ok()?;
        if pid == 0 || pid == own_pid {
            return None;
        }
        let mut parts = cols[5].split(':');
        let (maj, min, ino) = (parts.next()?, parts.next()?, parts.next()?);
        // Major and minor are hexadecimal, the inode is decimal.
        let maj = u64::from_str_radix(maj, 16).ok()?;
        let min = u64::from_str_radix(min, 16).ok()?;
        let ino = ino.parse::<u64>().ok()?;
        (ino == want_ino && maj == want_major && min == want_minor).then_some(pid)
    };

    let mut holders = Vec::new();
    for pid in table.lines().filter_map(holder_of) {
        if !holders.contains(&pid) {
            holders.push(pid);
        }
    }
    Ok(holders)
}
/// Hand-written mirrors of the constants and `#[repr(C)]` structures from
/// XNU's `<sys/proc_info.h>` that are needed to query open vnodes through
/// `proc_pidfdinfo`.
///
/// Field order and widths must match the C declarations exactly, because the
/// kernel fills these structures byte for byte.
#[cfg(target_os = "macos")]
mod libproc {
    /// `proc_listpids` selector: list every process.
    pub const PROC_ALL_PIDS: u32 = 1;
    /// `proc_pidfdinfo` flavor returning a `struct vnode_fdinfowithpath`.
    pub const PROC_PIDFDVNODEPATHINFO: i32 = 2;

    /// Mirror of `struct proc_fileinfo`, the per-descriptor header that
    /// precedes the vnode data in `struct vnode_fdinfowithpath`.
    #[repr(C)]
    #[derive(Clone, Copy)]
    pub struct ProcFileInfo {
        pub fi_openflags: u32,
        pub fi_status: u32,
        pub fi_offset: i64,
        pub fi_type: i32,
        pub fi_guardflags: u32,
    }

    /// Mirror of `struct vinfo_stat`.
    #[repr(C)]
    #[derive(Clone, Copy)]
    pub struct VinfoStat {
        pub vst_dev: u32,
        pub vst_mode: u16,
        pub vst_nlink: u16,
        pub vst_ino: u64,
        pub vst_uid: u32,
        pub vst_gid: u32,
        pub vst_atime: i64,
        pub vst_atimensec: i64,
        pub vst_mtime: i64,
        pub vst_mtimensec: i64,
        pub vst_ctime: i64,
        pub vst_ctimensec: i64,
        pub vst_birthtime: i64,
        pub vst_birthtimensec: i64,
        pub vst_size: i64,
        pub vst_blocks: i64,
        pub vst_blksize: i32,
        pub vst_flags: u32,
        pub vst_gen: u32,
        pub vst_rdev: u32,
        pub vst_qspare: [i64; 2],
    }

    /// Mirror of `struct vnode_info`.
    #[repr(C)]
    #[derive(Clone, Copy)]
    pub struct VnodeInfo {
        pub vi_stat: VinfoStat,
        pub vi_type: i32,
        pub vi_pad: i32,
        pub vi_fsid: [i32; 2],
    }

    /// Mirror of `struct vnode_info_path` (`MAXPATHLEN` is 1024).
    #[repr(C)]
    #[derive(Clone, Copy)]
    pub struct VnodeInfoPath {
        pub vip_vi: VnodeInfo,
        pub vip_path: [u8; 1024],
    }

    /// Mirror of `struct vnode_fdinfowithpath`.
    ///
    /// The C structure starts with a `struct proc_fileinfo`; leaving it out
    /// makes the buffer handed to `proc_pidfdinfo` 24 bytes too small and
    /// places `pvip` at the wrong offset, so the call cannot return usable
    /// vnode data.
    #[repr(C)]
    #[derive(Clone, Copy)]
    pub struct VnodeFdInfoWithPath {
        pub pfi: ProcFileInfo,
        pub pvip: VnodeInfoPath,
    }
}
/// macOS: lists the ids of other processes that have `path` open.
///
/// Walks every pid returned by `proc_listpids`, enumerates that process's
/// file descriptors and compares the device and inode of each vnode
/// descriptor against `path`. Note that this matches on "has the file open";
/// no lock state is inspected here.
///
/// Any failing libproc call is treated as "nothing to report" for that
/// process (or for the whole scan when listing pids fails), so the function
/// never returns `Err` from this body.
#[cfg(target_os = "macos")]
fn file_owners(path: &Path) -> std::io::Result<Vec<u32>> {
use std::os::unix::fs::MetadataExt;
let Ok(meta) = std::fs::metadata(path) else {
return Ok(Vec::new());
};
// The device id is narrowed to u32 to compare against `vst_dev`.
let (target_dev, target_ino) = (meta.dev() as u32, meta.ino());
let me = std::process::id();
// Sizing call: a null buffer asks for the number of bytes required.
let bytes = unsafe {
nix::libc::proc_listpids(libproc::PROC_ALL_PIDS, 0, std::ptr::null_mut(), 0)
};
if bytes <= 0 {
return Ok(Vec::new());
}
let cap = bytes as usize / std::mem::size_of::<i32>();
let mut all_pids = vec![0i32; cap];
let got = unsafe {
nix::libc::proc_listpids(
libproc::PROC_ALL_PIDS,
0,
all_pids.as_mut_ptr() as *mut nix::libc::c_void,
bytes,
)
};
if got <= 0 {
return Ok(Vec::new());
}
// Keep only the entries that were actually written.
all_pids.truncate(got as usize / std::mem::size_of::<i32>());
let mut pids = Vec::new();
for pid in all_pids {
if pid <= 0 || pid as u32 == me {
continue;
}
// Sizing call for this process's descriptor table.
let fbytes = unsafe {
nix::libc::proc_pidinfo(
pid,
nix::libc::PROC_PIDLISTFDS,
0,
std::ptr::null_mut(),
0,
)
};
if fbytes <= 0 {
continue;
}
let fcap = fbytes as usize / std::mem::size_of::<nix::libc::proc_fdinfo>();
let mut fds: Vec<nix::libc::proc_fdinfo> =
vec![unsafe { std::mem::zeroed() }; fcap];
let fgot = unsafe {
nix::libc::proc_pidinfo(
pid,
nix::libc::PROC_PIDLISTFDS,
0,
fds.as_mut_ptr() as *mut nix::libc::c_void,
fbytes,
)
};
if fgot <= 0 {
continue;
}
fds.truncate(fgot as usize / std::mem::size_of::<nix::libc::proc_fdinfo>());
for fd in fds {
// Only vnode-backed descriptors can refer to a file on disk.
if fd.proc_fdtype != nix::libc::PROX_FDTYPE_VNODE as u32 {
continue;
}
let mut vi: libproc::VnodeFdInfoWithPath = unsafe { std::mem::zeroed() };
let n = unsafe {
nix::libc::proc_pidfdinfo(
pid,
fd.proc_fd,
libproc::PROC_PIDFDVNODEPATHINFO,
&mut vi as *mut _ as *mut nix::libc::c_void,
std::mem::size_of::<libproc::VnodeFdInfoWithPath>() as i32,
)
};
if n <= 0 {
continue;
}
// Same inode on the same device means the same file; record each pid once.
if vi.pvip.vip_vi.vi_stat.vst_ino == target_ino
&& vi.pvip.vip_vi.vi_stat.vst_dev == target_dev
&& !pids.contains(&(pid as u32))
{
pids.push(pid as u32);
}
}
}
Ok(pids)
}
/// Location of the gate file for `key`: `<dir>/<escaped key>.lock`.
fn gate_path(dir: &Path, key: &str) -> PathBuf {
    let mut file_name = filename_escape(key);
    file_name.push_str(".lock");
    dir.join(file_name)
}
/// Location of the announce file for `key`: `<dir>/<escaped key>.live.lock`.
fn announce_path(dir: &Path, key: &str) -> PathBuf {
    let mut file_name = filename_escape(key);
    file_name.push_str(".live.lock");
    dir.join(file_name)
}
/// Turns an arbitrary key into a string that is safe to use as a file name.
///
/// ASCII letters, digits, `_` and `-` are kept as they are. Every other byte
/// of the UTF-8 encoding is written as `%` followed by two uppercase
/// hexadecimal digits.
fn filename_escape(key: &str) -> String {
    use std::fmt::Write;

    key.bytes()
        .fold(String::with_capacity(key.len()), |mut escaped, byte| {
            if byte.is_ascii_alphanumeric() || matches!(byte, b'_' | b'-') {
                escaped.push(char::from(byte));
            } else {
                // Formatting into a `String` cannot fail.
                let _ = write!(escaped, "%{byte:02X}");
            }
            escaped
        })
}
/// Writes the single marker byte `1` to the announce file at its current
/// position and flushes it.
///
/// # Errors
/// Returns the first I/O error from the write or the flush.
fn write_beacon(announce: &mut std::fs::File) -> std::io::Result<()> {
    use std::io::Write;

    announce
        .write_all(b"1")
        .and_then(|()| announce.flush())
}
/// Replaces everything in `file` with `contents`.
///
/// The file is truncated to zero length, the cursor is moved back to the
/// start, the new bytes are written and the file is flushed.
///
/// # Errors
/// Returns the first I/O error from any of those steps.
fn write_contents(file: &mut std::fs::File, contents: &str) -> std::io::Result<()> {
    use std::io::{Seek, Write};

    let payload = contents.as_bytes();
    file.set_len(0)
        .and_then(|()| file.rewind())
        .and_then(|()| file.write_all(payload))
        .and_then(|()| file.flush())
}
/// Reports whether the claim file at `path` currently looks locked.
///
/// On Windows this is simply whether the file exists. On Unix the file is
/// opened read-only and a non-blocking shared `flock` is attempted: if that
/// succeeds the probe lock is released again and the file counts as
/// unlocked; if it fails for any reason the file counts as locked. A file
/// that cannot be opened counts as unlocked.
fn file_locked(path: &Path) -> bool {
    #[cfg(windows)]
    {
        path.exists()
    }
    #[cfg(unix)]
    {
        use nix::fcntl::{FlockArg, flock};
        use std::os::unix::io::AsRawFd;

        let probe = match std::fs::OpenOptions::new().read(true).open(path) {
            Ok(opened) => opened,
            Err(_) => return false,
        };
        let fd = probe.as_raw_fd();
        match flock(fd, FlockArg::LockSharedNonblock) {
            Ok(_) => {
                // Nobody holds it exclusively; undo our own probe lock.
                let _ = flock(fd, FlockArg::Unlock);
                false
            }
            Err(_) => true,
        }
    }
}
/// Windows: detects changes in the lock directory between `arm` and `dirty`.
///
/// Holds a change-notification handle stored as `isize`; the handle is closed
/// when the watcher is dropped.
#[cfg(windows)]
struct ChangeWatcher {
handle: isize,
}
#[cfg(windows)]
impl ChangeWatcher {
/// Starts watching `dir` (not its subtree) for file-name, last-write and
/// size changes. The gate path is not used on this platform.
///
/// # Errors
/// Returns the last OS error when the notification handle cannot be created.
fn arm(dir: &Path, _gate: &Path) -> std::io::Result<Self> {
use std::os::windows::ffi::OsStrExt;
use windows_sys::Win32::Foundation::INVALID_HANDLE_VALUE;
use windows_sys::Win32::Storage::FileSystem::{
FILE_NOTIFY_CHANGE_FILE_NAME, FILE_NOTIFY_CHANGE_LAST_WRITE,
FILE_NOTIFY_CHANGE_SIZE, FindFirstChangeNotificationW,
};
// NUL-terminated UTF-16 copy of the directory path for the Win32 call.
let dir_wide: Vec<u16> = dir
.as_os_str()
.encode_wide()
.chain(std::iter::once(0))
.collect();
let handle = unsafe {
FindFirstChangeNotificationW(
dir_wide.as_ptr(),
0,
FILE_NOTIFY_CHANGE_FILE_NAME
| FILE_NOTIFY_CHANGE_LAST_WRITE
| FILE_NOTIFY_CHANGE_SIZE,
)
};
if handle == INVALID_HANDLE_VALUE {
return Err(std::io::Error::last_os_error());
}
Ok(Self {
handle: handle as isize,
})
}
/// Returns `Ok(true)` when the notification handle is already signalled,
/// `Ok(false)` when it is not. Never blocks (zero timeout).
///
/// # Errors
/// Returns the last OS error on `WAIT_FAILED`, or a descriptive error for
/// any other unexpected wait result.
fn dirty(&self) -> std::io::Result<bool> {
use windows_sys::Win32::Foundation::{WAIT_FAILED, WAIT_OBJECT_0, WAIT_TIMEOUT};
use windows_sys::Win32::System::Threading::WaitForSingleObject;
let rc = unsafe { WaitForSingleObject(self.handle as _, 0) };
match rc {
WAIT_OBJECT_0 => Ok(true),
WAIT_TIMEOUT => Ok(false),
WAIT_FAILED => Err(std::io::Error::last_os_error()),
other => Err(std::io::Error::other(format!(
"unexpected WaitForSingleObject result: {other}"
))),
}
}
}
#[cfg(windows)]
impl Drop for ChangeWatcher {
/// Closes the change-notification handle; the result is ignored.
fn drop(&mut self) {
use windows_sys::Win32::Storage::FileSystem::FindCloseChangeNotification;
unsafe {
FindCloseChangeNotification(self.handle as _);
}
}
}
/// Linux: detects changes to the gate file between `arm` and `dirty` using a
/// non-blocking inotify instance.
#[cfg(target_os = "linux")]
struct ChangeWatcher {
    inotify: nix::sys::inotify::Inotify,
}

#[cfg(target_os = "linux")]
impl ChangeWatcher {
    /// Starts watching `gate` for modification, attribute change,
    /// close-after-write, deletion and rename. The directory is not used on
    /// this platform.
    ///
    /// # Errors
    /// Returns the converted errno when inotify cannot be initialised or the
    /// watch cannot be added (for example because `gate` does not exist).
    fn arm(_dir: &Path, gate: &Path) -> std::io::Result<Self> {
        use nix::sys::inotify::{AddWatchFlags, InitFlags, Inotify};

        let interest = AddWatchFlags::IN_MODIFY
            | AddWatchFlags::IN_ATTRIB
            | AddWatchFlags::IN_CLOSE_WRITE
            | AddWatchFlags::IN_DELETE_SELF
            | AddWatchFlags::IN_MOVE_SELF;

        let inotify = Inotify::init(InitFlags::IN_NONBLOCK)?;
        inotify.add_watch(gate, interest)?;
        Ok(Self { inotify })
    }

    /// Returns `Ok(true)` when at least one event is pending and `Ok(false)`
    /// when the queue is empty or the read reports `EAGAIN`.
    ///
    /// # Errors
    /// Returns any other errno from reading the event queue.
    fn dirty(&self) -> std::io::Result<bool> {
        use nix::errno::Errno;

        let pending = match self.inotify.read_events() {
            Ok(batch) => !batch.is_empty(),
            Err(Errno::EAGAIN) => false,
            Err(errno) => return Err(std::io::Error::from(errno)),
        };
        Ok(pending)
    }
}
/// Non-Linux Unix: detects changes to the gate file by comparing metadata
/// snapshots taken at `arm` and at `dirty`.
///
/// The snapshot is `(inode, size, mtime, mtime_nsec, ctime, ctime_nsec)`.
#[cfg(all(unix, not(target_os = "linux")))]
struct ChangeWatcher {
    gate: PathBuf,
    snapshot: (u64, u64, i64, i64, i64, i64),
}

#[cfg(all(unix, not(target_os = "linux")))]
impl ChangeWatcher {
    /// Reads the metadata fields that make up a snapshot of `gate`.
    fn snapshot_of(gate: &Path) -> std::io::Result<(u64, u64, i64, i64, i64, i64)> {
        use std::os::unix::fs::MetadataExt;

        std::fs::metadata(gate).map(|meta| {
            (
                meta.ino(),
                meta.size(),
                meta.mtime(),
                meta.mtime_nsec(),
                meta.ctime(),
                meta.ctime_nsec(),
            )
        })
    }

    /// Records the current snapshot of `gate`. The directory is not used on
    /// this platform.
    ///
    /// # Errors
    /// Returns the metadata error, e.g. when `gate` does not exist.
    fn arm(_dir: &Path, gate: &Path) -> std::io::Result<Self> {
        let gate_buf = gate.to_path_buf();
        let snapshot = Self::snapshot_of(gate)?;
        Ok(Self {
            gate: gate_buf,
            snapshot,
        })
    }

    /// Returns `Ok(true)` when the file's snapshot differs from the one taken
    /// at `arm`, or when the file no longer exists.
    ///
    /// # Errors
    /// Returns any metadata error other than "not found".
    fn dirty(&self) -> std::io::Result<bool> {
        match Self::snapshot_of(&self.gate) {
            Ok(current) => Ok(current != self.snapshot),
            Err(err) => {
                if err.kind() == std::io::ErrorKind::NotFound {
                    Ok(true)
                } else {
                    Err(err)
                }
            }
        }
    }
}
/// Windows: blocks until something changes in the lock directory.
///
/// Holds a change-notification handle stored as `isize`; the handle is closed
/// when the watcher is dropped.
#[cfg(windows)]
struct HeldWatcher {
    handle: isize,
}

#[cfg(windows)]
impl HeldWatcher {
    /// Starts watching `dir` (not its subtree) for file-name, last-write and
    /// size changes. The announce path is not used on this platform.
    ///
    /// # Errors
    /// Returns the last OS error when the notification handle cannot be created.
    fn arm(dir: &Path, _announce: &Path) -> std::io::Result<Self> {
        use std::os::windows::ffi::OsStrExt;
        use windows_sys::Win32::Foundation::INVALID_HANDLE_VALUE;
        use windows_sys::Win32::Storage::FileSystem::{
            FILE_NOTIFY_CHANGE_FILE_NAME, FILE_NOTIFY_CHANGE_LAST_WRITE,
            FILE_NOTIFY_CHANGE_SIZE, FindFirstChangeNotificationW,
        };
        // NUL-terminated UTF-16 copy of the directory path for the Win32 call.
        let dir_wide: Vec<u16> = dir
            .as_os_str()
            .encode_wide()
            .chain(std::iter::once(0))
            .collect();
        let handle = unsafe {
            FindFirstChangeNotificationW(
                dir_wide.as_ptr(),
                0,
                FILE_NOTIFY_CHANGE_FILE_NAME
                    | FILE_NOTIFY_CHANGE_LAST_WRITE
                    | FILE_NOTIFY_CHANGE_SIZE,
            )
        };
        if handle == INVALID_HANDLE_VALUE {
            return Err(std::io::Error::last_os_error());
        }
        Ok(Self {
            handle: handle as isize,
        })
    }

    /// Waits on tokio's blocking pool until the directory reports a change.
    ///
    /// The watcher is consumed and moved into the blocking task, like the
    /// other platform variants. This keeps the handle open for as long as
    /// `WaitForSingleObject` is pending on it: if the returned future is
    /// dropped early, the handle is closed by the task once the wait returns
    /// instead of being closed underneath the waiting thread.
    ///
    /// # Errors
    /// Returns the last OS error on `WAIT_FAILED`, a descriptive error for
    /// any other unexpected wait result, or a `"join: …"` error when the
    /// blocking task could not be joined.
    async fn wait(self) -> std::io::Result<()> {
        use windows_sys::Win32::Foundation::{WAIT_FAILED, WAIT_OBJECT_0};
        use windows_sys::Win32::System::Threading::{INFINITE, WaitForSingleObject};
        tokio::task::spawn_blocking(move || {
            // Bind the whole watcher so its `Drop` runs on this thread, after the wait.
            let watcher = self;
            let rc = unsafe { WaitForSingleObject(watcher.handle as _, INFINITE) };
            match rc {
                WAIT_OBJECT_0 => Ok(()),
                WAIT_FAILED => Err(std::io::Error::last_os_error()),
                other => Err(std::io::Error::other(format!(
                    "unexpected WaitForSingleObject result: {other}"
                ))),
            }
        })
        .await
        .map_err(|e| std::io::Error::other(format!("join: {e}")))?
    }
}

#[cfg(windows)]
impl Drop for HeldWatcher {
    /// Closes the change-notification handle; the result is ignored.
    fn drop(&mut self) {
        use windows_sys::Win32::Storage::FileSystem::FindCloseChangeNotification;
        unsafe {
            FindCloseChangeNotification(self.handle as _);
        }
    }
}
/// Linux: blocks until something changes in the lock directory, using a
/// blocking inotify instance.
///
/// The instance lives in an `Option` so that `wait` can move it onto a
/// blocking thread.
#[cfg(target_os = "linux")]
struct HeldWatcher {
    inotify: Option<nix::sys::inotify::Inotify>,
}

#[cfg(target_os = "linux")]
impl HeldWatcher {
    /// Starts watching `dir` for created, modified, closed-after-write,
    /// moved-in and attribute-changed entries. The announce path is not used
    /// on this platform.
    ///
    /// # Errors
    /// Returns the converted errno when inotify cannot be initialised or the
    /// watch cannot be added.
    fn arm(dir: &Path, _announce: &Path) -> std::io::Result<Self> {
        use nix::sys::inotify::{AddWatchFlags, InitFlags, Inotify};

        let interest = AddWatchFlags::IN_CREATE
            | AddWatchFlags::IN_MODIFY
            | AddWatchFlags::IN_CLOSE_WRITE
            | AddWatchFlags::IN_MOVED_TO
            | AddWatchFlags::IN_ATTRIB;

        let inotify = Inotify::init(InitFlags::empty())?;
        inotify.add_watch(dir, interest)?;
        Ok(Self {
            inotify: Some(inotify),
        })
    }

    /// Waits on tokio's blocking pool until at least one event arrives.
    ///
    /// # Panics
    /// Panics if the inotify instance has already been taken.
    ///
    /// # Errors
    /// Returns the converted errno from reading events, or a `"join: …"`
    /// error when the blocking task could not be joined.
    async fn wait(mut self) -> std::io::Result<()> {
        let inotify = self.inotify.take().expect("wait called once");

        let joined = tokio::task::spawn_blocking(move || match inotify.read_events() {
            Ok(_) => Ok(()),
            Err(errno) => Err(std::io::Error::from(errno)),
        })
        .await;

        match joined {
            Ok(result) => result,
            Err(join_err) => Err(std::io::Error::other(format!("join: {join_err}"))),
        }
    }
}
/// Non-Linux Unix: blocks until the lock directory or the announce file
/// changes, using a kqueue with vnode filters.
///
/// The opened directory and (if it existed) announce file are stored only to
/// keep their descriptors open for as long as the kqueue is in use.
#[cfg(all(unix, not(target_os = "linux")))]
struct HeldWatcher {
kqueue: nix::sys::event::Kqueue,
_dir: std::fs::File,
_announce: Option<std::fs::File>,
}
#[cfg(all(unix, not(target_os = "linux")))]
impl HeldWatcher {
/// Opens `dir` and, when present, `announce`, and registers vnode filters
/// for both on a fresh kqueue.
///
/// # Errors
/// Returns an error when the directory cannot be opened, when the announce
/// file fails to open for a reason other than "not found", or when the
/// kqueue cannot be created or configured.
fn arm(dir: &Path, announce: &Path) -> std::io::Result<Self> {
use nix::sys::event::{EventFilter, EventFlag, FilterFlag, KEvent, Kqueue};
use std::os::unix::io::AsRawFd;
let dir_file = std::fs::File::open(dir)?;
// A missing announce file is fine: only the directory is watched then.
let announce_file = match std::fs::File::open(announce) {
Ok(f) => Some(f),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => None,
Err(e) => return Err(e),
};
let kqueue = Kqueue::new().map_err(std::io::Error::from)?;
// Vnode events of interest: write, extend, attribute change, delete, rename, link.
let fflags = FilterFlag::NOTE_WRITE
| FilterFlag::NOTE_EXTEND
| FilterFlag::NOTE_ATTRIB
| FilterFlag::NOTE_DELETE
| FilterFlag::NOTE_RENAME
| FilterFlag::NOTE_LINK;
let mut changes = vec![KEvent::new(
dir_file.as_raw_fd() as usize,
EventFilter::EVFILT_VNODE,
EventFlag::EV_ADD | EventFlag::EV_CLEAR,
fflags,
0,
0,
)];
if let Some(f) = &announce_file {
changes.push(KEvent::new(
f.as_raw_fd() as usize,
EventFilter::EVFILT_VNODE,
EventFlag::EV_ADD | EventFlag::EV_CLEAR,
fflags,
0,
0,
));
}
// Register the filters only: the event list is empty and the timeout is zero.
kqueue
.kevent(
&changes,
&mut [],
Some(nix::libc::timespec {
tv_sec: 0,
tv_nsec: 0,
}),
)
.map_err(std::io::Error::from)?;
Ok(Self {
kqueue,
_dir: dir_file,
_announce: announce_file,
})
}
/// Waits on tokio's blocking pool, with no timeout, until the kqueue
/// delivers an event. The watcher (and with it the open descriptors) is
/// moved into the blocking task.
///
/// # Errors
/// Returns the converted errno from `kevent`, or a `"join: …"` error when
/// the blocking task could not be joined.
async fn wait(self) -> std::io::Result<()> {
use nix::sys::event::{EventFilter, EventFlag, FilterFlag, KEvent};
tokio::task::spawn_blocking(move || {
// One-slot output buffer; the initial values are placeholders.
let mut events = [KEvent::new(
0,
EventFilter::EVFILT_VNODE,
EventFlag::empty(),
FilterFlag::empty(),
0,
0,
)];
self.kqueue
.kevent(&[], &mut events, None)
.map(|_| ())
.map_err(std::io::Error::from)
})
.await
.map_err(|e| std::io::Error::other(format!("join: {e}")))?
}
}
/// Windows: tries to claim `path` by creating it.
///
/// The file is created with `CREATE_NEW`, so the call fails when the file
/// already exists. It is opened for reading and writing, shared for reading
/// only, and flagged `FILE_FLAG_DELETE_ON_CLOSE`. Returns `None` on any
/// failure; the OS error is not reported.
#[cfg(windows)]
fn open_claim_file(path: &Path) -> Option<std::fs::File> {
use std::os::windows::ffi::OsStrExt;
use std::os::windows::io::FromRawHandle;
use windows_sys::Win32::Foundation::{
GENERIC_READ, GENERIC_WRITE, INVALID_HANDLE_VALUE,
};
use windows_sys::Win32::Storage::FileSystem::{
CREATE_NEW, CreateFileW, FILE_ATTRIBUTE_NORMAL,
FILE_FLAG_DELETE_ON_CLOSE, FILE_SHARE_READ,
};
// NUL-terminated UTF-16 copy of the path for the Win32 call.
let wide: Vec<u16> = path
.as_os_str()
.encode_wide()
.chain(std::iter::once(0))
.collect();
let handle = unsafe {
CreateFileW(
wide.as_ptr(),
GENERIC_READ | GENERIC_WRITE,
FILE_SHARE_READ,
std::ptr::null(),
CREATE_NEW,
FILE_ATTRIBUTE_NORMAL | FILE_FLAG_DELETE_ON_CLOSE,
std::ptr::null_mut(),
)
};
if handle == INVALID_HANDLE_VALUE {
return None;
}
// The freshly created handle is valid and owned solely by the returned `File`.
Some(unsafe { std::fs::File::from_raw_handle(handle as _) })
}
/// Windows: waits on tokio's blocking pool until the file at `path` is gone.
///
/// # Errors
/// Returns the error from `windows_wait_for_file_gone`, or a `"join: …"`
/// error when the blocking task could not be joined.
#[cfg(windows)]
async fn wait_release_windows(path: PathBuf) -> std::io::Result<()> {
    let joined = tokio::task::spawn_blocking(move || windows_wait_for_file_gone(&path)).await;
    match joined {
        Ok(result) => result,
        Err(join_err) => Err(std::io::Error::other(format!("join: {join_err}"))),
    }
}
/// Windows: keeps trying to claim `path` until it succeeds, waiting for the
/// existing file to disappear between attempts.
///
/// # Errors
/// Propagates errors from `wait_release_windows`.
#[cfg(windows)]
async fn wait_acquire_windows(path: PathBuf) -> std::io::Result<std::fs::File> {
    loop {
        match open_claim_file(&path) {
            Some(file) => break Ok(file),
            None => wait_release_windows(path.clone()).await?,
        }
    }
}
/// Windows: blocks the calling thread until `path` no longer exists.
///
/// Returns immediately when `path` has no parent directory or does not exist.
/// Otherwise a file-name change notification is opened on the parent
/// directory and existence is re-checked after every notification.
///
/// # Errors
/// Returns the last OS error when the notification handle cannot be created
/// or the wait fails, and a descriptive error for any other unexpected wait
/// result.
#[cfg(windows)]
fn windows_wait_for_file_gone(path: &Path) -> std::io::Result<()> {
use std::os::windows::ffi::OsStrExt;
use windows_sys::Win32::Foundation::{
INVALID_HANDLE_VALUE, WAIT_FAILED, WAIT_OBJECT_0,
};
use windows_sys::Win32::Storage::FileSystem::{
FILE_NOTIFY_CHANGE_FILE_NAME, FindCloseChangeNotification,
FindFirstChangeNotificationW, FindNextChangeNotification,
};
use windows_sys::Win32::System::Threading::{INFINITE, WaitForSingleObject};
let parent = match path.parent() {
Some(p) => p,
None => return Ok(()),
};
// Fast path: nothing to wait for.
if !path.exists() {
return Ok(());
}
// NUL-terminated UTF-16 copy of the parent path for the Win32 call.
let parent_wide: Vec<u16> = parent
.as_os_str()
.encode_wide()
.chain(std::iter::once(0))
.collect();
let handle = unsafe {
FindFirstChangeNotificationW(
parent_wide.as_ptr(),
0,
FILE_NOTIFY_CHANGE_FILE_NAME,
)
};
if handle == INVALID_HANDLE_VALUE {
return Err(std::io::Error::last_os_error());
}
// Closes the notification handle on every return path below.
struct Guard(isize);
impl Drop for Guard {
fn drop(&mut self) {
unsafe {
FindCloseChangeNotification(self.0 as _);
}
}
}
let _guard = Guard(handle as isize);
loop {
// Re-check after the handle exists, so a deletion that happened in
// between is noticed without waiting.
if !path.exists() {
return Ok(());
}
let rc = unsafe { WaitForSingleObject(handle as _, INFINITE) };
if rc == WAIT_FAILED {
return Err(std::io::Error::last_os_error());
}
if rc != WAIT_OBJECT_0 {
return Err(std::io::Error::other(format!(
"unexpected WaitForSingleObject result: {rc}"
)));
}
// Re-arm for the next change. NOTE(review): the return value is ignored.
unsafe { FindNextChangeNotification(handle as _) };
}
}
/// Unix: tries to claim `path` without waiting.
///
/// First attempts to create the file and lock it. When the file already
/// exists, falls back to locking the existing file. Any other creation
/// failure yields `None`.
#[cfg(unix)]
fn open_claim_file(path: &Path) -> Option<std::fs::File> {
    let created = try_create_locked(path);
    match created {
        Ok(file) => Some(file),
        Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => take_existing_lock(path),
        Err(_) => None,
    }
}
/// Unix: creates `path` (mode `0o644`, failing if it already exists) and
/// takes a non-blocking exclusive `flock` on it.
///
/// # Errors
/// Returns the open error (notably `AlreadyExists`), or an
/// `"flock failed"` error when the lock could not be taken.
#[cfg(unix)]
fn try_create_locked(path: &Path) -> std::io::Result<std::fs::File> {
    use nix::fcntl::{FlockArg, flock};
    use std::os::unix::fs::OpenOptionsExt;
    use std::os::unix::io::AsRawFd;

    let file = std::fs::OpenOptions::new()
        .read(true)
        .write(true)
        .create_new(true)
        .mode(0o644)
        .open(path)?;

    if flock(file.as_raw_fd(), FlockArg::LockExclusiveNonblock).is_err() {
        // Do not unlink the path here. The only way a file we just created can
        // already be locked is that another process opened and locked it in
        // the window between our `open` and our `flock`. Removing the path
        // would detach that live lock from its name and let a third process
        // create and lock a fresh file, so two claims would coexist. The
        // leftover file is harmless: later attempts lock it in place.
        return Err(std::io::Error::other("flock failed"));
    }
    Ok(file)
}
/// Unix: opens the existing file at `path` for reading and writing and takes
/// a non-blocking exclusive `flock` on it.
///
/// Returns `None` when the file cannot be opened or the lock cannot be taken.
#[cfg(unix)]
fn take_existing_lock(path: &Path) -> Option<std::fs::File> {
    use nix::fcntl::{FlockArg, flock};
    use std::os::unix::io::AsRawFd;

    let Ok(file) = std::fs::OpenOptions::new().read(true).write(true).open(path) else {
        return None;
    };
    match flock(file.as_raw_fd(), FlockArg::LockExclusiveNonblock) {
        Ok(_) => Some(file),
        Err(_) => None,
    }
}
/// Unix: waits on tokio's blocking pool until the lock on `path` is released.
///
/// # Errors
/// Returns the error from `unix_wait_for_release`, or a `"join: …"` error
/// when the blocking task could not be joined.
#[cfg(unix)]
async fn wait_release_unix(path: PathBuf) -> std::io::Result<()> {
    let joined = tokio::task::spawn_blocking(move || unix_wait_for_release(&path)).await;
    match joined {
        Ok(result) => result,
        Err(join_err) => Err(std::io::Error::other(format!("join: {join_err}"))),
    }
}
/// Unix: waits on tokio's blocking pool until an exclusive lock on `path`
/// has been taken, and returns the locked file.
///
/// # Errors
/// Returns the error from `unix_wait_for_acquire`, or a `"join: …"` error
/// when the blocking task could not be joined.
#[cfg(unix)]
async fn wait_acquire_unix(path: PathBuf) -> std::io::Result<std::fs::File> {
    let joined = tokio::task::spawn_blocking(move || unix_wait_for_acquire(&path)).await;
    match joined {
        Ok(result) => result,
        Err(join_err) => Err(std::io::Error::other(format!("join: {join_err}"))),
    }
}
/// Unix: blocks the calling thread until a shared `flock` on `path` can be
/// taken, then drops that lock again.
///
/// Returns immediately with `Ok(())` when the file does not exist.
///
/// # Errors
/// Returns any other open error, or a `"flock LOCK_SH: …"` error when the
/// blocking lock call fails.
#[cfg(unix)]
fn unix_wait_for_release(path: &Path) -> std::io::Result<()> {
    use nix::fcntl::{FlockArg, flock};
    use std::os::unix::io::AsRawFd;

    let probe = match std::fs::OpenOptions::new().read(true).open(path) {
        Ok(opened) => opened,
        Err(err) => {
            return if err.kind() == std::io::ErrorKind::NotFound {
                Ok(())
            } else {
                Err(err)
            };
        }
    };

    let fd = probe.as_raw_fd();
    if let Err(errno) = flock(fd, FlockArg::LockShared) {
        return Err(std::io::Error::other(format!("flock LOCK_SH: {errno}")));
    }
    // The shared lock was only a probe; failing to undo it is not reported.
    let _ = flock(fd, FlockArg::Unlock);
    Ok(())
}
/// Unix: opens `path` for reading and writing (creating it with mode `0o644`
/// if needed, never truncating) and blocks the calling thread until an
/// exclusive `flock` on it has been taken.
///
/// # Errors
/// Returns the open error, or a `"flock LOCK_EX: …"` error when the blocking
/// lock call fails.
#[cfg(unix)]
fn unix_wait_for_acquire(path: &Path) -> std::io::Result<std::fs::File> {
    use nix::fcntl::{FlockArg, flock};
    use std::os::unix::fs::OpenOptionsExt;
    use std::os::unix::io::AsRawFd;

    let mut options = std::fs::OpenOptions::new();
    options
        .read(true)
        .write(true)
        .create(true)
        .truncate(false)
        .mode(0o644);
    let file = options.open(path)?;

    match flock(file.as_raw_fd(), FlockArg::LockExclusive) {
        Ok(_) => Ok(file),
        Err(errno) => Err(std::io::Error::other(format!("flock LOCK_EX: {errno}"))),
    }
}