use page::{Page, PAGE_SIZE};
use pagedfile::PagedFile;
use error::HammersbaldError;
use pref::PRef;
use std::sync::{Mutex, Arc, Condvar};
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::collections::VecDeque;
/// A `PagedFile` whose `append_page` calls are queued in memory and written
/// to the wrapped file by a background thread.
///
/// All state lives in a shared `AsyncFileInner` so that the background thread
/// can hold its own reference to it.
pub struct AsyncFile {
/// State shared between this handle and the background writer thread.
inner: Arc<AsyncFileInner>
}
/// State shared between an `AsyncFile` handle and its background writer thread.
struct AsyncFileInner {
/// The wrapped file; every read, write, flush and sync goes through this lock.
file: Mutex<Box<PagedFile + Send + Sync>>,
/// Paired with the `queue` mutex: the background writer waits on it while the
/// queue is empty; `append_page`, `flush` and `shutdown` notify it.
work: Condvar,
/// Paired with the `queue` mutex: notified by the background writer after it
/// has drained the queue; `flush` and `shutdown` wait on it.
flushed: Condvar,
/// Initialised to `true`; `shutdown` stores `false`. The background writer
/// reads it to decide whether to keep looping.
run: AtomicBool,
/// Pages accepted by `append_page` that have not yet been written to `file`,
/// oldest first.
queue: Mutex<VecDeque<Page>>
}
impl AsyncFileInner {
    /// Builds the shared state around `file`: an empty page queue, fresh
    /// condition variables and the `run` flag set to `true`.
    ///
    /// The construction itself cannot fail; the `Result` return type is kept
    /// for the caller's `?` propagation.
    pub fn new (file: Box<PagedFile + Send + Sync>) -> Result<AsyncFileInner, HammersbaldError> {
        let state = AsyncFileInner {
            file: Mutex::new(file),
            work: Condvar::new(),
            flushed: Condvar::new(),
            run: AtomicBool::new(true),
            queue: Mutex::new(VecDeque::new()),
        };
        Ok(state)
    }
}
impl AsyncFile {
    /// Wraps `file` and starts the background writer thread.
    ///
    /// Pages passed to `append_page` are queued and written to `file` by that
    /// thread. The thread runs until `shutdown` is called.
    ///
    /// # Errors
    /// Propagates any error from constructing the shared state.
    pub fn new (file: Box<PagedFile + Send + Sync>) -> Result<AsyncFile, HammersbaldError> {
        let inner = Arc::new(AsyncFileInner::new(file)?);
        let inner2 = inner.clone();
        thread::spawn(move || { AsyncFile::background(inner2) });
        Ok(AsyncFile { inner })
    }

    /// Body of the background writer thread.
    ///
    /// Sleeps on the `work` condition while the queue is empty, then drains the
    /// queue into the wrapped file and signals `flushed`. Returns once the
    /// queue is empty and the `run` flag has been cleared by `shutdown`.
    ///
    /// # Panics
    /// Panics if a lock is poisoned or if appending to the wrapped file fails.
    fn background (inner: Arc<AsyncFileInner>) {
        let mut queue = inner.queue.lock().expect("page queue lock poisoned");
        loop {
            while queue.is_empty() {
                // `run` is only cleared while the queue lock is held, so checking
                // it here (under that lock, right before waiting) cannot miss the
                // wake-up sent by `shutdown`.
                if !inner.run.load(Ordering::Acquire) {
                    return;
                }
                queue = inner.work.wait(queue).expect("page queue lock poisoned");
            }
            // The queue lock stays held while writing so that readers computing
            // queue indices from the file length see a consistent pair.
            let mut file = inner.file.lock().expect("file lock poisoned");
            while let Some(page) = queue.pop_front() {
                file.append_page(page).expect("can not extend data file");
            }
            inner.flushed.notify_all();
        }
    }

    /// Looks up `pref` among the pages that are queued but not yet written.
    ///
    /// The queue position is derived from the distance between `pref` and the
    /// current length of the wrapped file, in units of `PAGE_SIZE`. Returns
    /// `Ok(None)` when the queue is empty, when `pref` lies inside the file,
    /// or when it lies beyond the last queued page.
    ///
    /// # Errors
    /// Propagates the error of the wrapped file's `len`.
    fn read_in_queue (&self, pref: PRef) -> Result<Option<Page>, HammersbaldError> {
        let queue = self.inner.queue.lock().expect("page queue lock poisoned");
        if queue.is_empty() {
            return Ok(None);
        }
        let file = self.inner.file.lock().expect("file lock poisoned");
        let len = file.len()?;
        let offset = pref.as_u64();
        if offset < len {
            return Ok(None);
        }
        // Compare in u64 before narrowing: casting first could truncate on
        // 32-bit targets and alias a far-away offset onto a queued page.
        let index = (offset - len) / PAGE_SIZE as u64;
        if index >= queue.len() as u64 {
            return Ok(None);
        }
        Ok(queue.get(index as usize).cloned())
    }
}

impl PagedFile for AsyncFile {
    /// Reads the page at `pref`, serving it from the pending queue when it has
    /// not been written yet and from the wrapped file otherwise.
    fn read_page(&self, pref: PRef) -> Result<Option<Page>, HammersbaldError> {
        if let Some(page) = self.read_in_queue(pref)? {
            return Ok(Some(page));
        }
        self.inner.file.lock().unwrap().read_page(pref)
    }

    /// Length as reported by the wrapped file; pages still waiting in the
    /// queue are not added to it.
    fn len(&self) -> Result<u64, HammersbaldError> {
        self.inner.file.lock().unwrap().len()
    }

    /// Truncates the wrapped file to `new_len`. The pending queue is not touched.
    fn truncate(&mut self, new_len: u64) -> Result<(), HammersbaldError> {
        self.inner.file.lock().unwrap().truncate(new_len)
    }

    /// Delegates to the wrapped file's `sync`.
    fn sync(&self) -> Result<(), HammersbaldError> {
        self.inner.file.lock().unwrap().sync()
    }

    /// Waits until every queued page has been written, flushes the wrapped
    /// file and stops the background writer thread.
    ///
    /// # Panics
    /// Panics if a lock is poisoned or if flushing the wrapped file fails.
    fn shutdown (&mut self) {
        let mut queue = self.inner.queue.lock().unwrap();
        self.inner.work.notify_one();
        while !queue.is_empty() {
            queue = self.inner.flushed.wait(queue).unwrap();
        }
        let mut file = self.inner.file.lock().unwrap();
        file.flush().unwrap();
        // The queue lock is still held here, so the writer is either parked in
        // `work.wait` or about to re-check `run` once it gets the lock. Clearing
        // the flag and then notifying guarantees it observes the stop request
        // instead of sleeping forever.
        self.inner.run.store(false, Ordering::Release);
        self.inner.work.notify_all();
    }

    /// Queues `page` for the background writer and returns immediately.
    ///
    /// After `shutdown` the writer thread is gone, so the page is written
    /// straight to the wrapped file instead of being queued.
    fn append_page(&mut self, page: Page) -> Result<(), HammersbaldError> {
        let mut queue = self.inner.queue.lock().unwrap();
        if !self.inner.run.load(Ordering::Acquire) {
            // `shutdown` left the queue empty, so writing through keeps page
            // order and avoids a later `flush` waiting on a thread that no
            // longer exists.
            let mut file = self.inner.file.lock().unwrap();
            return file.append_page(page);
        }
        queue.push_back(page);
        self.inner.work.notify_one();
        Ok(())
    }

    /// Not supported by this wrapper.
    ///
    /// # Panics
    /// Always panics via `unimplemented!()`.
    fn update_page(&mut self, _: Page) -> Result<u64, HammersbaldError> {
        unimplemented!()
    }

    /// Blocks until the background writer has emptied the queue, then flushes
    /// the wrapped file.
    ///
    /// # Errors
    /// Propagates the error of the wrapped file's `flush`.
    fn flush(&mut self) -> Result<(), HammersbaldError> {
        let mut queue = self.inner.queue.lock().unwrap();
        self.inner.work.notify_one();
        while !queue.is_empty() {
            queue = self.inner.flushed.wait(queue).unwrap();
        }
        let mut file = self.inner.file.lock().unwrap();
        file.flush()
    }
}