use crate::prelude::*;
use crate::util::retry_interrupted;
use auto_impl::auto_impl;
use fs2::FileExt;
use ring::digest;
use std::fs::{self, File};
use std::io::SeekFrom;
use std::marker::PhantomData;
use std::ops::Deref;
use std::path::{Path, PathBuf};
/// Number of leading single-character directory levels that `bytes_to_path_suffix`
/// peels off the front of an encoded key before the remainder becomes the final
/// path component.
const DIR_NEST_DEPTH: usize = 3;
/// Encodes `bytes` as unpadded URL-safe base64 and spreads the text over nested
/// path components.
///
/// Each of the first `DIR_NEST_DEPTH` characters becomes its own directory level;
/// whatever is left of the encoding is pushed as the final component.
///
/// # Panics
///
/// Panics if the encoded text is shorter than `DIR_NEST_DEPTH` characters.
fn bytes_to_path_suffix(bytes: &[u8]) -> PathBuf {
    let encoded = data_encoding::BASE64URL_NOPAD.encode(bytes);
    // One single-character component per nesting level.
    let mut suffix: PathBuf = (0..DIR_NEST_DEPTH)
        .map(|at| &encoded[at..=at])
        .collect();
    suffix.push(&encoded[DIR_NEST_DEPTH..]);
    suffix
}
/// A key that can be turned into a relative path inside a store.
///
/// `#[auto_impl(&)]` also provides the implementation for references to implementors.
#[auto_impl(&)]
pub trait PathKey {
/// Returns the path for this key; the stores in this file join it onto their base directory.
fn key(&self) -> PathBuf;
}
/// Arbitrary bytes are keyed by their SHA-256 digest rather than by their raw content.
impl PathKey for [u8] {
    /// Hashes `self` with SHA-256 and lays the digest out via `bytes_to_path_suffix`.
    fn key(&self) -> PathBuf {
        let hashed = digest::digest(&digest::SHA256, self);
        let digest_bytes: &[u8] = hashed.as_ref();
        bytes_to_path_suffix(digest_bytes)
    }
}
/// Artifact hashes are keyed as `<mode>/<nested encoding of raw_data>`.
impl PathKey for ArtifactHash {
    /// Builds the path from the hash's `mode` followed by the nested suffix that
    /// `bytes_to_path_suffix` derives from `raw_data` (used as-is, not re-hashed).
    fn key(&self) -> PathBuf {
        let nested = bytes_to_path_suffix(&self.raw_data);
        let mut keyed = PathBuf::new();
        keyed.push(&self.mode);
        keyed.push(nested);
        keyed
    }
}
/// How `lock` treats a lock file that does not exist yet.
enum LockMode {
/// Create the parent directories and the lock file when they are missing.
Lock,
/// Only open a lock file that already exists; nothing is created.
IfExists,
}
/// Opens the lock file that sits next to `path` and takes an exclusive lock on it.
///
/// The lock file has the same name as `path` with `.lock` appended. With
/// `LockMode::Lock` the parent directory and the lock file are created on demand;
/// with `LockMode::IfExists` nothing is created, so opening fails when the lock
/// file is absent.
///
/// Returns the locked file handle. Errors come from creating the directory,
/// opening the lock file, or acquiring the lock.
///
/// # Panics
///
/// Panics if `path` has no final file-name component, or (in `LockMode::Lock`) if
/// the lock path has no parent.
fn lock(path: &Path, mode: LockMode) -> Result<File> {
    let mut lock_name = path.file_name().unwrap().to_os_string();
    lock_name.push(".lock");
    let lock_path = path.with_file_name(lock_name);

    let create_missing = match mode {
        LockMode::Lock => true,
        LockMode::IfExists => false,
    };
    if create_missing {
        let parent = lock_path.parent().unwrap();
        fs::create_dir_all(parent).wrap_err_with(|| {
            format!("Failed to create directory {}", parent.display())
        })?;
    }

    // Opened in append mode; `create` is only switched on for `LockMode::Lock`.
    let handle = fs::OpenOptions::new()
        .append(true)
        .create(create_missing)
        .open(&lock_path)?;
    retry_interrupted(|| handle.lock_exclusive())?;
    Ok(handle)
}
/// A key/value store in which every value is a single file under a base directory.
#[derive(Debug)]
pub struct KVFileStore {
/// Root directory; a key's path is joined onto this to locate its value file.
base: PathBuf,
/// Directory handed to each `KVFileLock`, where new values are written as temp files
/// before being moved into place.
tmp: PathBuf,
}
impl KVFileStore {
pub fn new(base: &Path) -> Result<KVFileStore> {
let tmp = base.join("tmp");
fs::create_dir_all(&base)?;
fs::create_dir_all(&tmp)?;
Ok(KVFileStore {
base: base.into(),
tmp,
})
}
pub fn get_or_set<K: PathKey, F>(
&self,
key: &K,
f: F,
) -> Result<Box<dyn ReadPlusSeek>>
where
F: FnOnce(&mut dyn Write) -> Result<()>,
{
let handle = self.lock(key)?;
if let Some(reader) = handle.reader() {
Ok(Box::new(reader.detach_unlocked()))
} else {
let mut writer = handle.begin()?;
f(&mut writer)?;
Ok(Box::new(writer.commit()?.detach_unlocked()))
}
}
pub fn get<K: PathKey>(&self, key: &K) -> Option<Box<dyn ReadPlusSeek>> {
if let Some(handle) = self.lock_if_exists(key) {
if let Some(reader) = handle.reader() {
return Some(Box::new(reader.detach_unlocked()));
}
}
return None;
}
pub fn lock<K: PathKey>(&self, key: &K) -> Result<KVFileLock> {
let path = self.base.join(key.key());
let lock = lock(&path, LockMode::Lock)?;
Ok(KVFileLock {
tmp: self.tmp.clone(),
_lock: lock,
path,
})
}
pub fn lock_if_exists<K: PathKey>(&self, key: &K) -> Option<KVFileLock> {
let path = self.base.join(key.key());
if let Ok(lock) = lock(&path, LockMode::IfExists) {
Some(KVFileLock {
tmp: self.tmp.clone(),
_lock: lock,
path,
})
} else {
None
}
}
}
/// A locked entry of a `KVFileStore`: the key's resolved path plus the open lock file.
pub struct KVFileLock {
/// Directory in which `begin` creates the temp file for a new value.
tmp: PathBuf,
/// The locked lock-file handle; never read, only kept alive alongside this value.
_lock: File,
/// Path of the value file this lock guards.
path: PathBuf,
}
impl KVFileLock {
pub fn reader<'a>(&'a self) -> Option<LockedRead<'a>> {
Some(LockedRead {
f: File::open(&self.path).ok()?,
_lifetime: Default::default(),
})
}
pub fn begin<'a>(&'a self) -> Result<LockedWrite<'a>> {
Ok(LockedWrite {
path: &self.path,
f: tempfile::NamedTempFile::new_in(&self.tmp)?,
_lifetime: Default::default(),
})
}
pub fn remove(self) -> Result<()> {
fs::remove_file(self.path)?;
Ok(())
}
}
/// Read handle over a stored value file, carrying a lifetime `'a` that ties it to
/// whatever it was borrowed from.
pub struct LockedRead<'a> {
    /// The open value file.
    f: File,
    /// Zero-sized marker that makes the type borrow for `'a` without storing a reference.
    _lifetime: PhantomData<&'a ()>,
}

/// Reads are forwarded straight to the underlying file.
impl Read for LockedRead<'_> {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        Read::read(&mut self.f, buf)
    }
}

/// Seeks are forwarded straight to the underlying file.
impl Seek for LockedRead<'_> {
    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
        Seek::seek(&mut self.f, pos)
    }
}

impl LockedRead<'_> {
    /// Gives up the lifetime tie and returns the bare file handle, keeping its
    /// current read position.
    pub fn detach_unlocked(self) -> File {
        let LockedRead { f, .. } = self;
        f
    }
}
/// In-progress write of a value: data goes to a temp file until `commit` moves it into place.
pub struct LockedWrite<'a> {
/// Destination the temp file is persisted to by `commit`.
path: &'a Path,
/// Temp file that receives the written data.
f: tempfile::NamedTempFile,
/// Zero-sized lifetime marker; `commit` hands it on to the resulting `LockedRead`.
_lifetime: PhantomData<&'a ()>,
}
/// Writes and flushes are forwarded to the temp file backing this writer.
impl Write for LockedWrite<'_> {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        Write::write(&mut self.f, buf)
    }

    fn flush(&mut self) -> std::io::Result<()> {
        Write::flush(&mut self.f)
    }
}
/// Seeks are forwarded to the temp file backing this writer.
impl Seek for LockedWrite<'_> {
    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
        Seek::seek(&mut self.f, pos)
    }
}
impl<'a> LockedWrite<'a> {
pub fn commit(self) -> Result<LockedRead<'a>> {
self.f.as_file().sync_data()?;
let mut f = self.f.persist(&self.path)?;
f.rewind()?;
Ok(LockedRead {
f,
_lifetime: self._lifetime,
})
}
}
/// A key/value store in which every value is a directory under a base directory.
pub struct KVDirStore {
/// Root directory; a key's path is joined onto this to locate its value directory.
base: PathBuf,
/// Directory handed to each `KVDirLock`, where new values are built as temp
/// directories before being renamed into place.
tmp: PathBuf,
}
impl KVDirStore {
    /// Creates a store rooted at `base`, creating `base` and `base/tmp` if needed.
    ///
    /// # Errors
    ///
    /// Fails if either directory cannot be created.
    pub fn new(base: &Path) -> Result<KVDirStore> {
        let tmp = base.join("tmp");
        fs::create_dir_all(base)?;
        fs::create_dir_all(&tmp)?;
        Ok(KVDirStore {
            base: base.into(),
            tmp,
        })
    }

    /// Locks `key`, creating its lock file (and parent directories) when missing.
    ///
    /// # Errors
    ///
    /// Fails if the lock file cannot be created, opened, or locked.
    pub fn lock<K: PathKey>(&self, key: &K) -> Result<KVDirLock> {
        let path = self.base.join(key.key());
        let lock = lock(&path, LockMode::Lock)?;
        Ok(KVDirLock {
            tmp: self.tmp.clone(),
            _lock: lock,
            path,
        })
    }

    /// Returns the directory stored under `key`, building it with `f` when it does not exist.
    ///
    /// The key is locked for the duration of the call. When the value path does not
    /// exist, `f` is given a fresh temp directory to populate, which is then renamed
    /// to the value path. The returned path is no longer protected by the lock.
    ///
    /// # Errors
    ///
    /// Propagates failures from locking, creating the temp directory, `f`, or the
    /// rename. In every failure case the temp directory is cleaned up.
    pub fn get_or_set<K, F>(&self, key: &K, f: F) -> Result<PathBuf>
    where
        K: PathKey,
        F: FnOnce(&Path) -> Result<()>,
    {
        let lock = self.lock(key)?;
        if !lock.exists() {
            let tmp = lock.tempdir()?;
            f(tmp.as_ref())?;
            // Rename while `tmp` still owns the directory: if the rename fails, dropping
            // `tmp` removes the staging directory instead of leaking it.
            fs::rename(tmp.path(), &*lock)?;
            // The directory now lives at its final path; disarm the guard so its
            // destructor does not go looking for the old staging path.
            let _ = tmp.into_path();
        }
        Ok(lock.path)
    }
}
/// A locked entry of a `KVDirStore`: the key's resolved path plus the open lock file.
pub struct KVDirLock {
/// Directory in which `tempdir` creates staging directories.
tmp: PathBuf,
/// The locked lock-file handle; never read, only kept alive alongside this value.
_lock: File,
/// Path of the value directory this lock guards.
path: PathBuf,
}
impl KVDirLock {
    /// Creates a fresh temp directory inside the temp directory this lock carries.
    ///
    /// # Errors
    ///
    /// Fails if the directory cannot be created.
    pub fn tempdir(&self) -> Result<tempfile::TempDir> {
        let staging = tempfile::tempdir_in(&self.tmp)?;
        Ok(staging)
    }
}
/// Lets a `KVDirLock` be used wherever a `&Path` to the guarded directory is expected.
impl Deref for KVDirLock {
    type Target = Path;

    /// Borrows the path of the guarded value directory.
    fn deref(&self) -> &Self::Target {
        self.path.as_path()
    }
}
/// Allows passing a `KVDirLock` directly to APIs that accept `impl AsRef<Path>`.
impl AsRef<Path> for KVDirLock {
    /// Borrows the path of the guarded value directory.
    fn as_ref(&self) -> &Path {
        self.path.as_path()
    }
}