use scoped_pool::Scope;
use fs::Fs;
use sparse::BLOCK_SIZE;
use {VolumeId, BlockIndex, Version, ContentId, Cache};
#[derive(Debug, Clone)]
pub enum FlushMessage {
Quit,
Flush(VolumeId, BlockIndex, Version)
}
#[derive(Copy, Clone)]
pub struct FlushPool<'fs, 'id: 'fs> {
fs: &'fs Fs<'id>
}
impl<'fs, 'id> FlushPool<'fs, 'id> {
pub fn new(fs: &'fs Fs<'id>) -> Self {
FlushPool {
fs: fs
}
}
pub fn run(self, threads: usize, scope: &Scope<'fs>) {
for _ in 0..threads {
scope.recurse(move |next| self.task(next));
}
}
fn task(self, scope: &Scope<'fs>) {
let mut sentinel = Sentinel::new(self, scope);
let local = self.fs.local();
let storage = self.fs.storage();
let channel = &local.flush;
loop {
let res: ::Result<bool> = (|| {
match channel.pop() {
FlushMessage::Quit => {
channel.push(FlushMessage::Quit);
return Ok(false)
},
FlushMessage::Flush(volume, block, version) => {
sentinel.activate(FlushMessage::Flush(volume.clone(),
block,
version.clone()));
let passed_version = version.load();
let current_version = local.version(&volume, block);
if current_version != Some(passed_version) { return Ok(true) }
let mut buffer: &mut [u8] = &mut [0; BLOCK_SIZE];
try!(local.read(&volume, block, 0, buffer));
let current_version = local.version(&volume, block);
if current_version != Some(passed_version) { return Ok(true) }
let content_id = ContentId::hash(&buffer);
try!(storage.create(content_id, buffer));
try!(local.complete_flush(&volume, block, content_id, version));
Ok(true)
}
}
})();
match res {
Ok(continue_) => {
sentinel.cancel();
if continue_ { continue } else { break }
},
Err(e) => error!("Encountered error during flushing: {:?}", e),
}
}
}
}
struct Sentinel<'fs: 'ctx, 'id: 'fs, 'ctx> {
pool: FlushPool<'fs, 'id>,
message: Option<FlushMessage>,
scope: &'ctx Scope<'fs>
}
impl<'fs, 'id, 'ctx> Sentinel<'fs, 'id, 'ctx> {
fn new(pool: FlushPool<'fs, 'id>, scope: &'ctx Scope<'fs>) -> Self {
Sentinel {
pool: pool,
message: None,
scope: scope
}
}
fn activate(&mut self, message: FlushMessage) {
self.message = Some(message);
}
fn cancel(&mut self) {
self.message.take();
}
}
impl<'fs, 'id, 'ctx> Drop for Sentinel<'fs, 'id, 'ctx> {
fn drop(&mut self) {
self.message.take().map(|message| {
self.pool.fs.local().flush.push(message);
let pool = self.pool.clone();
self.scope.recurse(move |next| pool.task(next));
});
}
}