#[cfg(target_os = "windows")]
extern crate winapi;
use super::*;
use std::collections::HashMap;
use std::fs;
use std::io::{BufReader, Read, Write};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex, RwLock};
#[cfg(not(target_os = "windows"))]
use std::os::unix::io::AsRawFd;
use lightning::util::persist::KVStorePersister;
use lightning::util::ser::Writeable;
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
#[cfg(target_os = "windows")]
use {std::ffi::OsStr, std::os::windows::ffi::OsStrExt};
#[cfg(target_os = "windows")]
macro_rules! call {
($e: expr) => {
if $e != 0 {
return Ok(());
} else {
return Err(std::io::Error::last_os_error());
}
};
}
#[cfg(target_os = "windows")]
fn path_to_windows_str<T: AsRef<OsStr>>(path: T) -> Vec<winapi::shared::ntdef::WCHAR> {
path.as_ref().encode_wide().chain(Some(0)).collect()
}
pub struct FilesystemStore {
dest_dir: PathBuf,
locks: Mutex<HashMap<(String, String), Arc<RwLock<()>>>>,
}
impl FilesystemStore {
pub(crate) fn new(mut dest_dir: PathBuf) -> Self {
dest_dir.push("fs_store");
let locks = Mutex::new(HashMap::new());
Self { dest_dir, locks }
}
}
impl KVStore for FilesystemStore {
type Reader = FilesystemReader;
fn read(&self, namespace: &str, key: &str) -> std::io::Result<Self::Reader> {
let mut outer_lock = self.locks.lock().unwrap();
let lock_key = (namespace.to_string(), key.to_string());
let inner_lock_ref = Arc::clone(&outer_lock.entry(lock_key).or_default());
let mut dest_file_path = self.dest_dir.clone();
dest_file_path.push(namespace);
dest_file_path.push(key);
FilesystemReader::new(dest_file_path, inner_lock_ref)
}
fn write(&self, namespace: &str, key: &str, buf: &[u8]) -> std::io::Result<()> {
let mut outer_lock = self.locks.lock().unwrap();
let lock_key = (namespace.to_string(), key.to_string());
let inner_lock_ref = Arc::clone(&outer_lock.entry(lock_key).or_default());
let _guard = inner_lock_ref.write().unwrap();
let mut dest_file_path = self.dest_dir.clone();
dest_file_path.push(namespace);
dest_file_path.push(key);
let parent_directory = dest_file_path
.parent()
.ok_or_else(|| {
let msg =
format!("Could not retrieve parent directory of {}.", dest_file_path.display());
std::io::Error::new(std::io::ErrorKind::InvalidInput, msg)
})?
.to_path_buf();
fs::create_dir_all(parent_directory.clone())?;
let mut tmp_file_path = dest_file_path.clone();
let mut rng = thread_rng();
let rand_str: String = (0..7).map(|_| rng.sample(Alphanumeric) as char).collect();
let ext = format!("{}.tmp", rand_str);
tmp_file_path.set_extension(ext);
let mut tmp_file = fs::File::create(&tmp_file_path)?;
tmp_file.write_all(&buf)?;
tmp_file.sync_all()?;
#[cfg(not(target_os = "windows"))]
{
fs::rename(&tmp_file_path, &dest_file_path)?;
let dir_file = fs::OpenOptions::new().read(true).open(parent_directory.clone())?;
unsafe {
libc::fsync(dir_file.as_raw_fd());
}
}
#[cfg(target_os = "windows")]
{
if dest_file_path.exists() {
unsafe {
winapi::um::winbase::ReplaceFileW(
path_to_windows_str(dest_file_path).as_ptr(),
path_to_windows_str(tmp_file_path).as_ptr(),
std::ptr::null(),
winapi::um::winbase::REPLACEFILE_IGNORE_MERGE_ERRORS,
std::ptr::null_mut() as *mut winapi::ctypes::c_void,
std::ptr::null_mut() as *mut winapi::ctypes::c_void,
)
};
} else {
call!(unsafe {
winapi::um::winbase::MoveFileExW(
path_to_windows_str(tmp_file_path).as_ptr(),
path_to_windows_str(dest_file_path).as_ptr(),
winapi::um::winbase::MOVEFILE_WRITE_THROUGH
| winapi::um::winbase::MOVEFILE_REPLACE_EXISTING,
)
});
}
}
Ok(())
}
fn remove(&self, namespace: &str, key: &str) -> std::io::Result<bool> {
let mut outer_lock = self.locks.lock().unwrap();
let lock_key = (namespace.to_string(), key.to_string());
let inner_lock_ref = Arc::clone(&outer_lock.entry(lock_key.clone()).or_default());
let _guard = inner_lock_ref.write().unwrap();
let mut dest_file_path = self.dest_dir.clone();
dest_file_path.push(namespace);
dest_file_path.push(key);
if !dest_file_path.is_file() {
return Ok(false);
}
fs::remove_file(&dest_file_path)?;
#[cfg(not(target_os = "windows"))]
{
let parent_directory = dest_file_path.parent().ok_or_else(|| {
let msg =
format!("Could not retrieve parent directory of {}.", dest_file_path.display());
std::io::Error::new(std::io::ErrorKind::InvalidInput, msg)
})?;
let dir_file = fs::OpenOptions::new().read(true).open(parent_directory)?;
unsafe {
libc::fsync(dir_file.as_raw_fd());
}
}
if dest_file_path.is_file() {
return Err(std::io::Error::new(std::io::ErrorKind::Other, "Removing key failed"));
}
if Arc::strong_count(&inner_lock_ref) == 2 {
outer_lock.remove(&lock_key);
}
outer_lock.retain(|_, v| Arc::strong_count(&v) > 1);
Ok(true)
}
fn list(&self, namespace: &str) -> std::io::Result<Vec<String>> {
let mut prefixed_dest = self.dest_dir.clone();
prefixed_dest.push(namespace);
let mut keys = Vec::new();
if !Path::new(&prefixed_dest).exists() {
return Ok(Vec::new());
}
for entry in fs::read_dir(prefixed_dest.clone())? {
let entry = entry?;
let p = entry.path();
if !p.is_file() {
continue;
}
if let Some(ext) = p.extension() {
if ext == "tmp" {
continue;
}
}
if let Ok(relative_path) = p.strip_prefix(prefixed_dest.clone()) {
keys.push(relative_path.display().to_string())
}
}
Ok(keys)
}
}
pub struct FilesystemReader {
inner: BufReader<fs::File>,
lock_ref: Arc<RwLock<()>>,
}
impl FilesystemReader {
fn new(dest_file_path: PathBuf, lock_ref: Arc<RwLock<()>>) -> std::io::Result<Self> {
let f = fs::File::open(dest_file_path.clone())?;
let inner = BufReader::new(f);
Ok(Self { inner, lock_ref })
}
}
impl Read for FilesystemReader {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
let _guard = self.lock_ref.read().unwrap();
self.inner.read(buf)
}
}
impl KVStorePersister for FilesystemStore {
fn persist<W: Writeable>(&self, prefixed_key: &str, object: &W) -> lightning::io::Result<()> {
let (namespace, key) = get_namespace_and_key_from_prefixed(prefixed_key)?;
self.write(&namespace, &key, &object.encode())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test::utils::random_storage_path;
use proptest::prelude::*;
proptest! {
#[test]
fn read_write_remove_list_persist(data in any::<[u8; 32]>()) {
let rand_dir = random_storage_path();
let fs_store = FilesystemStore::new(rand_dir.into());
do_read_write_remove_list_persist(&data, &fs_store);
}
}
}