use std::fs;
use std::fs::File;
use std::fs::TryLockError;
use std::io;
use std::io::Read;
use std::io::Write;
use std::ops::Deref;
use std::ops::DerefMut;
use std::path::Path;
use anyhow::Context;
use anyhow::Result;
use tokio::task::spawn_blocking;
use tracing::info;
/// A file handle on which a shared or exclusive lock has been acquired.
///
/// Values are produced by [`LockedFile::acquire_shared`] and
/// [`LockedFile::acquire_exclusive_truncated`]. The wrapper owns the
/// underlying [`File`]; per the standard library's file-locking
/// documentation, the lock is released when that handle is closed, which
/// happens when this value is dropped.
#[derive(Debug)]
pub struct LockedFile(File);
impl LockedFile {
pub async fn acquire_shared(path: impl AsRef<Path>, create: bool) -> Result<Option<Self>> {
let path = path.as_ref();
let file = if create {
let mut options = fs::OpenOptions::new();
options.create(true).write(true);
options.open(path).with_context(|| {
format!(
"failed to create call cache entry file `{path}`",
path = path.display()
)
})?
} else {
match fs::File::open(path)
.map(Some)
.or_else(|e| {
if e.kind() == io::ErrorKind::NotFound {
Ok(None)
} else {
Err(e)
}
})
.with_context(|| {
format!(
"failed to open call cache entry file `{path}`",
path = path.display()
)
})? {
Some(file) => file,
None => return Ok(None),
}
};
match file.try_lock_shared() {
Ok(_) => Ok(Some(Self(file))),
Err(TryLockError::WouldBlock) => {
let path = path.to_path_buf();
spawn_blocking(move || {
info!(
"waiting to acquire shared lock on cache entry file `{path}`",
path = path.display()
);
file.lock_shared().with_context(|| {
format!(
"failed to acquire shared lock on cache entry file `{path}`",
path = path.display()
)
})?;
Ok(Some(Self(file)))
})
.await
.context("failed to join lock task")?
}
Err(TryLockError::Error(e)) => Err(e).with_context(|| {
format!(
"failed to acquire shared lock on cache entry file `{path}`",
path = path.display()
)
}),
}
}
pub async fn acquire_exclusive_truncated(path: impl AsRef<Path>) -> Result<Self> {
let path = path.as_ref();
let mut options = fs::OpenOptions::new();
options.create(true).write(true);
let file = options.open(path).with_context(|| {
format!(
"failed to create call cache entry file `{path}`",
path = path.display()
)
})?;
let file = match file.try_lock() {
Ok(_) => Ok(Self(file)),
Err(TryLockError::WouldBlock) => {
let path = path.to_path_buf();
spawn_blocking(move || {
info!(
"waiting to acquire exclusive lock on cache entry file `{path}`",
path = path.display()
);
file.lock().with_context(|| {
format!(
"failed to acquire exclusive lock on cache entry file `{path}`",
path = path.display()
)
})?;
Ok(Self(file))
})
.await
.context("failed to join lock task")?
}
Err(TryLockError::Error(e)) => Err(e).with_context(|| {
format!(
"failed to acquire exclusive lock on cache entry file `{path}`",
path = path.display()
)
}),
}?;
file.set_len(0).with_context(|| {
format!(
"failed to truncate cache entry file `{path}`",
path = path.display()
)
})?;
Ok(file)
}
}
impl Deref for LockedFile {
type Target = File;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for LockedFile {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl Read for LockedFile {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.0.read(buf)
}
}
impl Write for LockedFile {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.0.write(buf)
}
fn flush(&mut self) -> io::Result<()> {
self.0.flush()
}
}
#[cfg(test)]
mod test {
    use tempfile::NamedTempFile;

    use super::*;

    /// Without `create`, a missing file yields `None` rather than an error.
    #[tokio::test]
    async fn acquire_shared_no_create() {
        assert!(
            LockedFile::acquire_shared("does-not-exist", false)
                .await
                .unwrap()
                .is_none(),
            "should not find file"
        );
    }

    /// Two shared locks on the same file can be held at the same time.
    #[tokio::test]
    async fn acquire_shared() {
        let file = NamedTempFile::new().unwrap();
        let _first = LockedFile::acquire_shared(file.path(), true)
            .await
            .unwrap()
            .expect("should have locked file");
        let _second = LockedFile::acquire_shared(file.path(), true)
            .await
            .unwrap()
            .expect("should have locked file");
    }

    /// While an exclusive lock is held, a non-blocking shared lock attempt on
    /// another handle to the same file reports that it would block.
    #[tokio::test]
    async fn acquire_exclusive() {
        let file = NamedTempFile::new().unwrap();
        let _exclusive = LockedFile::acquire_exclusive_truncated(file.path())
            .await
            .unwrap();
        assert!(matches!(
            file.as_file().try_lock_shared(),
            Err(TryLockError::WouldBlock)
        ));
    }

    /// Acquiring the exclusive lock discards any existing file contents.
    #[tokio::test]
    async fn acquire_exclusive_truncates() {
        let file = NamedTempFile::new().unwrap();
        fs::write(file.path(), b"stale contents").unwrap();
        let exclusive = LockedFile::acquire_exclusive_truncated(file.path())
            .await
            .unwrap();
        assert_eq!(
            exclusive.metadata().unwrap().len(),
            0,
            "file should have been truncated"
        );
    }
}