#![allow(dead_code)]
use std::fs::{File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::thread;
use std::time::{Duration, Instant};
use fs2::FileExt;
use thiserror::Error;
use ulid::Ulid;
pub const DEFAULT_LOCK_TIMEOUT: Duration = Duration::from_secs(5);
const LOCK_POLL_INTERVAL: Duration = Duration::from_millis(25);
#[derive(Debug, Error)]
pub enum Error {
#[error("ULID lock {path} was not released within the acquire budget")]
LockTimeout { path: PathBuf },
#[error("ULID lock {path} I/O error: {source}")]
Io {
path: PathBuf,
#[source]
source: std::io::Error,
},
#[error("ULID lock {path} holds a malformed value: {value:?}")]
MalformedState { path: PathBuf, value: String },
}
pub fn allocate(lock_path: &Path) -> Result<Ulid, Error> {
allocate_with_timeout(lock_path, DEFAULT_LOCK_TIMEOUT)
}
pub fn allocate_with_timeout(lock_path: &Path, timeout: Duration) -> Result<Ulid, Error> {
if let Some(parent) = lock_path.parent() {
if !parent.as_os_str().is_empty() {
std::fs::create_dir_all(parent).map_err(|source| Error::Io {
path: lock_path.to_path_buf(),
source,
})?;
}
}
let file = OpenOptions::new()
.create(true)
.read(true)
.write(true)
.truncate(false)
.open(lock_path)
.map_err(|source| Error::Io {
path: lock_path.to_path_buf(),
source,
})?;
acquire_lock(&file, lock_path, timeout)?;
let result = mint(&file, lock_path);
let _ = FileExt::unlock(&file);
result
}
fn acquire_lock(file: &File, lock_path: &Path, timeout: Duration) -> Result<(), Error> {
let contended_os = fs2::lock_contended_error().raw_os_error();
let deadline = Instant::now() + timeout;
loop {
if Instant::now() >= deadline {
return Err(Error::LockTimeout {
path: lock_path.to_path_buf(),
});
}
match FileExt::try_lock_exclusive(file) {
Ok(()) => return Ok(()),
Err(e)
if e.kind() == std::io::ErrorKind::WouldBlock
|| e.raw_os_error() == contended_os =>
{
let remaining = deadline.saturating_duration_since(Instant::now());
thread::sleep(LOCK_POLL_INTERVAL.min(remaining));
}
Err(source) => {
return Err(Error::Io {
path: lock_path.to_path_buf(),
source,
});
}
}
}
}
fn mint(mut file: &File, lock_path: &Path) -> Result<Ulid, Error> {
let prior = read_state(&mut file, lock_path)?;
let candidate = Ulid::new();
let next = match prior {
Some(p) if candidate <= p => p.increment().ok_or_else(|| Error::MalformedState {
path: lock_path.to_path_buf(),
value: "ULID space exhausted at persisted value".to_string(),
})?,
_ => candidate,
};
write_state(&mut file, lock_path, next)?;
Ok(next)
}
fn read_state(file: &mut &File, lock_path: &Path) -> Result<Option<Ulid>, Error> {
let mut buf = String::new();
(*file)
.seek(SeekFrom::Start(0))
.map_err(|source| Error::Io {
path: lock_path.to_path_buf(),
source,
})?;
(*file)
.read_to_string(&mut buf)
.map_err(|source| Error::Io {
path: lock_path.to_path_buf(),
source,
})?;
let trimmed = buf.trim();
if trimmed.is_empty() {
return Ok(None);
}
Ulid::from_str(trimmed)
.map(Some)
.map_err(|_| Error::MalformedState {
path: lock_path.to_path_buf(),
value: trimmed.to_string(),
})
}
fn write_state(file: &mut &File, lock_path: &Path, ulid: Ulid) -> Result<(), Error> {
(*file)
.seek(SeekFrom::Start(0))
.map_err(|source| Error::Io {
path: lock_path.to_path_buf(),
source,
})?;
let bytes = ulid.to_string();
debug_assert_eq!(bytes.len(), 26, "ULID string must be exactly 26 chars");
(*file)
.write_all(bytes.as_bytes())
.map_err(|source| Error::Io {
path: lock_path.to_path_buf(),
source,
})?;
(*file).flush().map_err(|source| Error::Io {
path: lock_path.to_path_buf(),
source,
})
}