extern crate fnv;
extern crate num_cpus;
extern crate pi_async_file;
extern crate pi_hash;
#[macro_use]
extern crate lazy_static;
use async_lock::{Mutex, RwLock};
use pi_async_rt::lock::spin_lock::SpinLock;
use pi_async_rt::rt::multi_thread::{MultiTaskRuntime, MultiTaskRuntimeBuilder, StealableTaskPool};
use pi_async_file::file::{AsyncFile, AsyncFileOptions, WriteOptions};
use pi_hash::XHashMap;
use std::collections::hash_map::Entry;
use std::fmt::{Debug, Formatter, Result as FmtResult};
use std::io::Result;
use std::ops::Deref;
use std::{
env,
path::{Path, PathBuf},
sync::Arc,
sync::Weak,
};
lazy_static! {
pub static ref FILE_RUNTIME: MultiTaskRuntime<()> = {
let count = match env::var("_ver") {
Ok(r) => usize::from_str_radix(r.as_str(), 10).unwrap(),
_ => num_cpus::get()
};
let pool = StealableTaskPool::with(count, 100000, [1,1], 3000);
let builder = MultiTaskRuntimeBuilder::new(pool)
.thread_prefix("File-Runtime")
.thread_stack_size(1024 * 1024)
.init_worker_size(count)
.set_worker_limit(count, count)
.set_timeout(10)
.set_timer_interval(10);
builder.build()
};
static ref OPEN_FILE_MAP: Table = Table(Mutex::new(XHashMap::default()));
}
struct Table(Mutex<XHashMap<PathBuf, Weak<InnerSafeFile>>>);
#[derive(Debug, Clone)]
pub struct SafeFile(Arc<InnerSafeFile>);
impl Deref for SafeFile {
type Target = AsyncFile<()>;
#[inline(always)]
fn deref(&self) -> &AsyncFile<()> {
&(*self.0).file
}
}
enum LockType {
Rw(RwLock<()>),
Lock(Mutex<()>),
}
struct InnerSafeFile {
file: AsyncFile<()>,
lock: LockType,
buff: SpinLock<(Arc<[u8]>, usize)>,
}
impl Debug for InnerSafeFile {
fn fmt(&self, f: &mut Formatter) -> FmtResult {
write!(f, "{:?}", self.file)
}
}
impl InnerSafeFile {
fn new(file: AsyncFile<()>, lock: LockType) -> Self {
let vec = Vec::new();
InnerSafeFile {
file,
lock,
buff: SpinLock::new((Arc::from(&vec[..]), 0)),
}
}
}
impl SafeFile {
pub async fn open<P>(path: P, options: AsyncFileOptions) -> Result<Self>
where
P: AsRef<Path> + Send + 'static,
{
let path = path.as_ref().to_path_buf();
{
let tab = OPEN_FILE_MAP.0.lock().await;
match tab.get(&path) {
Some(r) => match r.upgrade() {
Some(rr) => return Ok(SafeFile(rr)),
_ => (),
},
_ => (),
}
}
let lock = match options {
AsyncFileOptions::TruncateWrite => LockType::Lock(Mutex::new(())),
_ => LockType::Rw(RwLock::new(())),
};
let file = match AsyncFile::open(FILE_RUNTIME.clone(), path.clone(), options).await {
Ok(file) => Arc::new(InnerSafeFile::new(file, lock)),
Err(r) => return Err(r),
};
let mut tab = OPEN_FILE_MAP.0.lock().await;
match tab.entry(path) {
Entry::Occupied(mut e) => match e.get().upgrade() {
Some(rr) => return Ok(SafeFile(rr)),
_ => {
e.insert(Arc::downgrade(&file));
Ok(SafeFile(file))
}
},
Entry::Vacant(e) => {
e.insert(Arc::downgrade(&file));
Ok(SafeFile(file))
}
}
}
pub async fn read(&self, pos: u64, len: usize) -> Result<Vec<u8>> {
if len == 0 {
return Ok(Vec::with_capacity(0));
}
match self.0.lock {
LockType::Lock(ref lock) => {
let data = {
let lock = self.0.buff.lock();
lock.0.clone()
};
lock.lock().await;
if data.len() > 0 {
Ok(Vec::from([])) } else {
match self.0.file.read(pos, len).await {
Ok(r) => {
let mut lock = self.0.buff.lock();
lock.0 = Arc::from(&r[..]);
Ok(r)
}
Err(r) => Err(r),
}
}
}
LockType::Rw(ref lock) => {
lock.read().await;
self.0.file.read(pos, len).await
}
}
}
pub async fn write(&self, pos: u64, buf: Arc<[u8]>, options: WriteOptions) -> Result<usize> {
if buf.len() == 0 {
return Ok(0);
}
match self.0.lock {
LockType::Lock(ref lock) => {
{
let mut lock = self.0.buff.lock();
lock.0 = buf;
lock.1 += 1;
lock.1
};
lock.lock().await;
let data_ver = {
let lock = self.0.buff.lock();
(lock.0.clone(), lock.1)
};
if data_ver.1 == 0 {
Ok(data_ver.0.len())
} else {
match self.0.file.write(pos, data_ver.0, options).await {
Ok(r) => {
let mut lock = self.0.buff.lock();
if lock.1 == data_ver.1 {
lock.1 = 0;
}
Ok(r)
}
Err(r) => Err(r),
}
}
}
LockType::Rw(ref lock) => {
lock.write().await;
self.0.file.write(pos, buf, options).await
}
}
}
}
pub async fn open<P>(path: P, options: AsyncFileOptions) -> Result<AsyncFile<()>>
where
P: AsRef<Path> + Send + 'static,
{
AsyncFile::open(FILE_RUNTIME.clone(), path, options).await
}
pub async fn create_dir<P>(path: P) -> Result<()>
where
P: AsRef<Path> + Send + 'static,
{
pi_async_file::file::create_dir(FILE_RUNTIME.clone(), path).await
}
pub async fn remove_file<P>(path: P) -> Result<()>
where
P: AsRef<Path> + Send + 'static,
{
pi_async_file::file::remove_file(FILE_RUNTIME.clone(), path).await
}
pub async fn remove_dir<P>(path: P) -> Result<()>
where
P: AsRef<Path> + Send + 'static,
{
pi_async_file::file::remove_dir(FILE_RUNTIME.clone(), path).await
}
pub async fn rename<P>(from: P, to: P) -> Result<()>
where
P: AsRef<Path> + Send + 'static,
{
pi_async_file::file::rename(FILE_RUNTIME.clone(), from, to).await
}
pub async fn copy_file<P>(from: P, to: P) -> Result<u64>
where
P: AsRef<Path> + Send + 'static,
{
pi_async_file::file::copy_file(FILE_RUNTIME.clone(), from, to).await
}
pub async fn remove_dir_all<P>(path: P) -> Result<()>
where
P: AsRef<Path> + Send + 'static,
{
pi_async_file::file::remove_dir(FILE_RUNTIME.clone(), path).await
}
pub async fn collect() {}