//! A persistent mutex implementation using the atomicity of [`OpenOptions::create_new`]
//!
//! Please note that this `Mutex` only really works if other processes in your system
//! are willing to "play nice" with you. In most systems (Unix-like), locks are mostly
//! advisory.
use std::fs::{self, File, OpenOptions};
use std::io::{self, Read, Seek, Write};
use std::path::{Path, PathBuf};
pub use crate::sync::FileGuard;
/// A persistent mutex implementation using the atomicity of [`OpenOptions::create_new`].
///
/// This structure, as opposed to [`FileGuard`], holds some content in a separate file.
pub struct Mutex {
/// Folder in which the mutex lives; the `lock` and `contents` files are resolved
/// relative to it.
path: PathBuf,
}
impl Mutex {
/// Opens a new mutex, given the path for a folder in which the mutex will be mounted.
/// This will create a new folder if one does not exist yet.
///
/// # Errors
///
/// This function fails if it cannot create the folder which is going to contain the
/// mutex.
pub fn open<P: AsRef<Path>>(path: P) -> io::Result<Mutex> {
fs::create_dir_all(&path)?;
Ok(Mutex {
path: path.as_ref().to_owned(),
})
}
/// Locks this mutex, awaiting for it to unlock if it is locked.
pub async fn lock(&self) -> io::Result<MutexGuard> {
let file_guard = FileGuard::lock(self.path.join("lock")).await?;
let file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(self.path.join("contents"))?;
Ok(MutexGuard {
_file_guard: file_guard,
file,
})
}
/// Tries to lock this mutex, returning `None` if it is locked.
pub fn try_lock(&self) -> io::Result<Option<MutexGuard>> {
let file_guard = FileGuard::try_lock(self.path.join("lock"))?;
if let Some(file_guard) = file_guard {
let file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(self.path.join("contents"))?;
Ok(Some(MutexGuard {
_file_guard: file_guard,
file,
}))
} else {
Ok(None)
}
}
}
/// A guard to the [`Mutex`] while it is locked. This structure gives access to
/// the contents of the mutex.
pub struct MutexGuard {
/// Guard obtained from [`FileGuard::lock`] or [`FileGuard::try_lock`]. It is never
/// read (hence the leading underscore); it is stored so that it lives as long as
/// this `MutexGuard`.
_file_guard: FileGuard,
/// Handle to the `contents` file, opened for both reading and writing.
file: File,
}
impl MutexGuard {
    /// Reads all the contents of the content file into a vector.
    ///
    /// The file cursor is moved back to the start first, so the whole file is returned
    /// regardless of earlier reads or writes through this guard.
    ///
    /// # Errors
    ///
    /// Returns any I/O error raised while seeking or reading.
    pub fn read(&self) -> io::Result<Vec<u8>> {
        // `&File` implements `Read` and `Seek`, which is what allows a `&self` receiver.
        let mut file = &self.file;
        file.seek(io::SeekFrom::Start(0))?;
        // One bulk read instead of iterating `bytes()`, which would issue a separate
        // unbuffered read call for every single byte.
        let mut contents = Vec::new();
        file.read_to_end(&mut contents)?;
        Ok(contents)
    }

    /// Writes some data to the content file, overwriting all the previous content.
    ///
    /// # Errors
    ///
    /// Returns any I/O error raised while seeking, truncating, writing or flushing.
    pub fn write<D: AsRef<[u8]>>(&self, data: D) -> io::Result<()> {
        let mut file = &self.file;
        // Rewind and truncate so that no tail of a previous, longer payload survives.
        file.seek(io::SeekFrom::Start(0))?;
        file.set_len(0)?;
        file.write_all(data.as_ref())?;
        file.flush()
    }

    /// Gives direct access to the underlying content file.
    pub fn file(&self) -> &File {
        &self.file
    }
}
// The drop order doesn't matter. Therefore, no `Drop` implementation.
#[cfg(test)]
mod test {
    use super::*;

    /// Data written through one guard is visible through the next guard.
    #[test]
    fn test_mutex() {
        let scenario = async move {
            let mutex = Mutex::open("data/mutex").unwrap();
            {
                // The guard is released at the end of this scope.
                let writer = mutex.lock().await.unwrap();
                writer.write(b"some data").unwrap();
            }
            let reader = mutex.lock().await.unwrap();
            let stored = reader.read().unwrap();
            assert_eq!(
                String::from_utf8_lossy(&stored),
                String::from_utf8_lossy(b"some data")
            );
        };
        futures::executor::block_on(scenario);
    }

    /// A shorter payload written after a longer one leaves no trailing bytes behind.
    #[test]
    fn test_mutex_longer_data_first() {
        let scenario = async move {
            let mutex = Mutex::open("data/mutex-longer-data-first").unwrap();
            let payloads = [&b"some long data"[..], &b"some data"[..]];
            for payload in payloads.iter() {
                // Each iteration takes the lock, writes, and releases it again.
                let writer = mutex.lock().await.unwrap();
                writer.write(*payload).unwrap();
            }
            let reader = mutex.lock().await.unwrap();
            assert_eq!(reader.read().unwrap(), b"some data");
        };
        futures::executor::block_on(scenario);
    }
}