use std::{
io::{Error, ErrorKind},
marker::PhantomData,
path::Path,
};
use tokio::{
fs::{File, OpenOptions},
io::AsyncWriteExt,
time::sleep_until,
};
pub type Result<T> = std::result::Result<T, std::io::Error>;
pub const FLAG_LOCKED: u64 = 77;
pub const FLAG_UNLOCKED: u64 = 88;
pub(crate) async fn open_rw(path: impl AsRef<str>) -> Result<File> {
let exists = Path::new(path.as_ref()).exists();
OpenOptions::new()
.write(true)
.read(true)
.create(!exists)
.open(path.as_ref())
.await
}
pub enum LockState {
Unlocked,
Locked,
NotALock,
}
impl LockState {
pub const fn from_const(constant: u64) -> Self {
match constant {
FLAG_LOCKED => Self::Locked,
FLAG_UNLOCKED => Self::Unlocked,
_ => Self::NotALock,
}
}
pub const fn locked(&self) -> bool {
match self {
Self::Locked => true,
Self::Unlocked => false,
Self::NotALock => false,
}
}
}
pub struct Lock<'lock> {
parent: &'lock LockFile<'lock>,
}
impl<'lock> Lock<'lock> {
pub async fn new(parent: &'lock LockFile<'lock>) -> Result<Lock<'lock>> {
let is_ok = parent.try_lock().await?;
if !is_ok {
return Err(Error::new(ErrorKind::Other, "The file is already locked."));
}
Ok(Self { parent })
}
}
impl<'lock> Drop for Lock<'lock> {
fn drop(&mut self) {
let file = std::fs::OpenOptions::new()
.read(true)
.write(true)
.open(&self.parent._path);
if let Err(e) = file {
panic!(
"failed to open lockfile to unlock it in Lock::drop(): {} (file: {})",
e, self.parent._path,
);
}
let file = file.unwrap();
let _ = file.sync_all();
match file.set_len(FLAG_UNLOCKED) {
Ok(_) => {}
Err(e) => panic!("failed to unlock file by `file.set_len(...)`: {}", e),
};
}
}
pub struct LockFile<'a> {
_path: String,
_fh: tokio::fs::File,
_phantom: PhantomData<&'a ()>,
}
impl<'a> LockFile<'a> {
pub async fn temp(name: impl AsRef<str>) -> Result<LockFile<'a>> {
let true_path = {
let tmp_dir = temporary_directory();
format!("{}{}.lock", tmp_dir, name.as_ref())
};
let file_handle = open_rw(&true_path).await?;
file_handle.set_len(FLAG_UNLOCKED).await?;
file_handle.sync_all().await?;
Ok(Self {
_path: true_path,
_fh: file_handle,
_phantom: PhantomData,
})
}
pub async fn at(path: impl AsRef<Path>, name: impl AsRef<str>) -> Result<LockFile<'a>> {
let path = format!("{}{}.lock", path.as_ref().display(), name.as_ref());
let file_handle = open_rw(&path).await?;
file_handle.sync_all().await?;
Ok(Self {
_path: path,
_fh: file_handle,
_phantom: PhantomData,
})
}
pub fn exists(identifier: impl AsRef<str>) -> bool {
let path = format!("{}{}.lock", temporary_directory(), identifier.as_ref());
Path::new(&path).exists()
}
pub async fn is_locked(&self) -> Result<bool> {
self._fh.sync_all().await?;
let metadata = self._fh.metadata().await?;
Ok(metadata.len() == FLAG_LOCKED)
}
pub fn path(&self) -> &String {
&self._path
}
pub async fn try_lock(&self) -> Result<bool> {
let metadata = self._fh.metadata().await?;
if metadata.len() == FLAG_LOCKED {
return Ok(false);
}
self._fh.set_len(FLAG_LOCKED).await?;
self._fh.sync_all().await?;
Ok(true)
}
pub async fn lock(&'a self) -> Result<Lock<'a>> {
self._fh.sync_all().await?;
let lock = Lock::new(self).await?;
Ok(lock)
}
pub async fn unlock(&self) -> Result<()> {
self._fh.sync_all().await?;
let metadata = self._fh.metadata().await?;
if metadata.len() != FLAG_LOCKED {
return Err(Error::new(ErrorKind::Other, "the file is not locked."));
}
self._fh.set_len(FLAG_UNLOCKED).await?;
self._fh.sync_all().await?;
Ok(())
}
pub async fn unlock_arbitrary_file(fully_qual_path: &str) -> Result<()> {
let path = Path::new(fully_qual_path);
if !path.exists() {
return Err(Error::new(
ErrorKind::NotFound,
"the specified file to lock does not exist.",
));
}
let file = tokio::fs::File::open(fully_qual_path).await?;
file.set_len(FLAG_UNLOCKED).await?;
Ok(())
}
pub fn underlying_file(&'a self) -> &'a tokio::fs::File {
&self._fh
}
pub async fn state(&self) -> Result<LockState> {
self.underlying_file().sync_all().await?;
let metadata = self.underlying_file().metadata().await?;
Ok(LockState::from_const(metadata.len()))
}
pub async fn lock_arbitrary_file(fully_qual_path: &str) -> Result<()> {
let path = Path::new(fully_qual_path);
if !path.exists() {
return Err(Error::new(
ErrorKind::NotFound,
"the specified file to lock does not exist.",
));
}
let file = tokio::fs::File::open(&path).await?;
let metadata = file.metadata().await?;
if metadata.len() == FLAG_LOCKED {
return Err(Error::new(
ErrorKind::WouldBlock,
"the lockfile is currently locked.",
));
}
file.set_len(FLAG_LOCKED).await?;
Ok(())
}
}
pub struct NamedPipe<'a> {
_pipe: File,
_pipe_path: String,
_lock: LockFile<'a>,
}
pub fn temporary_directory() -> String {
#[cfg(not(windows))]
return String::from("/tmp/");
#[cfg(windows)]
{
let user_name =
std::env::var("USERPROFILE").expect("could not get user profile on windows.");
return format!("{}\\AppData\\Local\\Temp\\");
}
#[cfg(not(windows))]
#[cfg(not(not(windows)))]
compile_error!("not sure how to compile the path for this platform.");
}
pub enum WaitResult {
Written { count: u64 },
TimeoutHit,
}
pub type Duration = tokio::time::Duration;
pub type Instant = std::time::Instant;
impl<'a> NamedPipe<'a> {
pub async fn connect(identifier: impl AsRef<str>) -> Result<NamedPipe<'a>> {
let lock = LockFile::temp(&identifier).await?;
let full_path = format!("{}{}.pipe.v1", temporary_directory(), identifier.as_ref());
let file = OpenOptions::new()
.write(true)
.read(true)
.create(true)
.open(&full_path)
.await?;
Ok(Self {
_pipe: file,
_pipe_path: full_path,
_lock: lock,
})
}
pub async fn write(&mut self, data: &[u8]) -> Result<u64> {
let state = self._lock.state().await?;
if state.locked() {
return Err(Error::new(
ErrorKind::Other,
"cannot write, the file is locked.",
));
}
let _lock = self._lock.lock().await?;
self._pipe.write_all(data).await?;
Ok(data.len() as u64)
}
pub async fn write_timeout(&'a mut self, data: &[u8], timeout: Instant) -> Result<WaitResult> {
if self._lock.state().await?.locked() {
let _ = sleep_until(timeout.into()).await;
if self._lock.state().await?.locked() {
return Ok(WaitResult::TimeoutHit);
}
}
let count = self.write(data).await?;
Ok(WaitResult::Written { count })
}
pub async fn read_all(&mut self, buffer: &mut [u8]) -> Result<u64> {
let data = self.read_to_string().await?;
let mut pos = 0;
if data.len() > buffer.len() {
while pos < buffer.len() {
if let Some(ch) = data.chars().nth(pos) {
buffer[pos] = ch as u8;
}
pos += 1;
}
} else {
while pos < data.len() {
if let Some(ch) = data.chars().nth(pos) {
buffer[pos] = ch as u8;
}
}
}
Ok(pos as u64)
}
pub fn path(&'a self) -> &'a Path {
Path::new(&self._pipe_path)
}
pub async fn read_buf(&mut self, buffer: &mut Vec<u8>) -> Result<u64> {
let data = self.read_to_string().await?;
for ch in data.chars() {
buffer.push(ch as u8);
}
Ok(data.len() as u64)
}
pub async fn read_to_string(&self) -> Result<String> {
let path = Path::new(&self._pipe_path);
if !path.exists() {
return Err(Error::new(
ErrorKind::NotFound,
"named pipe has been deleted.",
));
}
tokio::fs::read_to_string(&path).await
}
}
impl<'a> From<NamedPipe<'a>> for LockFile<'a> {
fn from(value: NamedPipe<'a>) -> Self {
value._lock
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_create_lockfile() -> Result<()> {
LockFile::temp("shared_resource").await?;
Ok(())
}
#[tokio::test]
async fn test_lockfile_lock() -> Result<()> {
let resource = LockFile::temp("shared_resource_2").await?;
let lock = resource.lock().await?;
assert!(resource.is_locked().await?);
drop(lock);
assert!(!resource.is_locked().await?);
Ok(())
}
#[tokio::test]
async fn test_lockfile_multiple_opens() -> Result<()> {
let resource = LockFile::temp("shared_resource_3").await?;
let _lock = resource.lock().await?;
assert!(resource.is_locked().await?);
let faulty_lock = resource.lock().await;
assert!(faulty_lock.is_err());
Ok(())
}
#[tokio::test]
async fn test_shared_pipe_write() -> Result<()> {
let data = b"hello, world!";
let mut pipe = NamedPipe::connect("shared_resource").await?;
let count = pipe.write(data).await?;
assert!(count == 13, "count was {}", count);
Ok(())
}
#[tokio::test]
async fn test_shared_pipe_write_read() -> Result<()> {
let data = b"other data!";
let mut pipe = NamedPipe::connect("shared_rw").await?;
let amount_written = pipe.write(data).await?;
eprintln!("written {amount_written} bytes");
let data = pipe.read_to_string().await?;
assert!(data.len() == 11, "data length is invalid. {}", data.len());
assert_eq!(data, "other data!");
Ok(())
}
}