use scoped_pool::{Pool, Scope};
use std::io::Write;
use std::iter;
use local::{LocalFs, IoResult, Options};
use sparse::BLOCK_SIZE;
use {Storage, Cache, VolumeId, VolumeName, VolumeMetadata, BlockIndex};
/// A filesystem front-end that combines a local filesystem with a list of
/// caches and a backing storage.
///
/// Reads and writes are first offered to `local`; when it cannot complete them
/// on its own, block data is fetched from `caches` (in order) and finally
/// from `storage`.
pub struct Fs<'id> {
    /// Backing storage: the last source tried when fetching a block, and the
    /// place snapshots are looked up from and handed to.
    storage: Box<Storage>,
    /// Caches tried, in order, before falling back to `storage`.
    caches: Vec<Box<Cache>>,
    /// Local filesystem layer that every operation goes through first.
    local: LocalFs<'id>,
}
impl<'id> Fs<'id> {
/// Assembles an `Fs` from its three components.
///
/// This only stores the given values; nothing is initialized here. See
/// `init` (or `run`, which calls it) for start-up.
///
/// * `storage` - backing storage, consulted after all caches.
/// * `caches` - caches consulted in the given order.
/// * `local` - the local filesystem layer.
pub fn new(storage: Box<Storage>,
           caches: Vec<Box<Cache>>,
           local: LocalFs<'id>) -> Self {
    Fs {
        local: local,
        caches: caches,
        storage: storage,
    }
}
/// Builds an `Fs`, initializes it on a fresh thread pool, runs `fun` against
/// it and tears everything down again.
///
/// * `threads` - passed to `Pool::new` to size the worker pool.
/// * `local` - options used to construct the `LocalFs`.
/// * `storage` / `caches` - forwarded to `Fs::new`.
/// * `fun` - callback invoked with the initialized `Fs` and a nested scope
///   obtained through `Scope::zoom`; its return value is the `Ok` result.
///
/// `fs.shutdown()` is deferred inside the pool scope and `pool.shutdown()` is
/// deferred in this function, so both run on every exit path once they have
/// been registered.
///
/// # Errors
///
/// Returns an error if `LocalFs::new` fails or if `init` fails; in both cases
/// `fun` is never called.
pub fn run<F, R>(threads: usize, local: Options, storage: Box<Storage>,
                 caches: Vec<Box<Cache>>, fun: F) -> ::Result<R>
where F: for<'fs> FnOnce(&'fs Fs<'id>, &Scope<'fs>) -> R {
    debug!("Running new Fs with: threads={:?}, options={:?}", threads, local);

    // Construct the local filesystem *before* spawning any worker threads.
    // Previously the pool was created first and its shutdown guard was only
    // registered after this fallible step, so an early return here left the
    // pool's threads running with nobody to shut them down.
    let localfs = try!(LocalFs::new(local));
    let fs = &Fs::new(storage, caches, localfs);

    // Register the shutdown guard immediately after the pool exists so that
    // no fallible code can run between creation and registration.
    let pool = Pool::new(threads);
    defer!(pool.shutdown());

    pool.scoped(move |scope| {
        // Always stop the Fs when leaving the scope, even if init fails.
        defer!(fs.shutdown());
        try!(fs.init(scope));
        debug!("Initialized Fs, running callback.");
        Ok(scope.zoom(|scope| fun(fs, scope)))
    })
}
/// Initializes the filesystem on the given scope.
///
/// Delegates to the local filesystem's `init`, handing it this `Fs` and
/// `scope`. Judging by the log message this is where background threads are
/// started -- NOTE(review): confirm against `LocalFs::init`.
///
/// # Errors
///
/// Propagates whatever error `LocalFs::init` returns.
pub fn init<'fs>(&'fs self, scope: &Scope<'fs>) -> ::Result<()> {
    debug!("Initializing Fs threads.");
    let local_fs = self.local();
    local_fs.init(self, scope)
}
/// Creates a new volume in the local filesystem.
///
/// * `name` - name of the new volume; cloned before being handed on.
/// * `metadata` - metadata for the new volume.
///
/// Returns the id the local filesystem assigned to the volume.
///
/// # Errors
///
/// Propagates any error from `LocalFs::create`.
pub fn create(&self, name: &VolumeName, metadata: VolumeMetadata) -> ::Result<VolumeId> {
    debug!("Creating new volume with: name={:?}, metadata={:?}", name, metadata);
    // The local layer takes ownership of the name, the caller keeps theirs.
    let owned_name = name.clone();
    self.local.create(owned_name, metadata)
}
/// Opens a local volume based on the stored snapshot of `original`.
///
/// The snapshot for `original` is fetched from storage and then opened in the
/// local filesystem under a clone of the same name.
///
/// Returns the id of the locally opened volume.
///
/// # Errors
///
/// Fails if the snapshot cannot be retrieved from storage or if the local
/// filesystem cannot open it.
pub fn fork(&self, original: &VolumeName) -> ::Result<VolumeId> {
    debug!("Forking volume with: original name={:?}", original);

    let snap = try!(self.storage().get_snapshot(original));
    debug!("Got snapshot for existing volume: {:?}", snap);

    match self.local.open(original.clone(), snap) {
        Ok(forked) => {
            debug!("Forked volume {:?} to id {:?}", original, forked);
            Ok(forked)
        },
        // Same conversion `try!` would perform on the error path.
        Err(err) => Err(From::from(err))
    }
}
pub fn read(&self, volume: &VolumeId, block: BlockIndex,
offset: usize, mut buffer: &mut [u8]) -> ::Result<()> {
debug!("Reading block with: volume={:?}, block={:?}, offset={:?}",
volume, block, offset);
let ioresult = try!(self.local.read(volume, block, offset, buffer));
debug!("Local read of {:?}/{:?} resulted in {:?}", volume, block, ioresult);
match ioresult {
IoResult::Complete => {
debug!("Read of {:?}/{:?} complete!", volume, block);
Ok(())
},
IoResult::Reserved(id) => {
let read_buffer: &mut [u8] = &mut [0; BLOCK_SIZE];
self.caches.iter().map(|c| &**c)
.chain(iter::once(&self.storage as &Cache))
.fold(Err(::Error::NotFound), |res, cache| {
debug!("Got {:?}", res);
res.or_else(|_| {
debug!("Trying {:?}", cache);
cache.read(id, read_buffer)
})
}).and_then(|_| {
debug!("Read data from cache, writing back.");
self.local.write_immutable(id, read_buffer)
}).and_then(|_| {
debug!("Wrote data locally, copying to user buffer.");
Ok(try!(buffer.write(&read_buffer[offset..]).map(|_| ())))
})
}
}
}
pub fn write(&self, volume: &VolumeId, block: BlockIndex,
offset: usize, data: &[u8]) -> ::Result<()> {
match try!(self.local.write_mutable(volume, block, offset, data)) {
IoResult::Complete => Ok(()),
IoResult::Reserved(id) => {
let read_buffer = &mut [0; BLOCK_SIZE];
self.caches.iter().map(|c| &**c)
.chain(iter::once(&self.storage as &Cache))
.fold(Err(::Error::NotFound), |res, cache| {
res.or_else(|_| cache.read(id, read_buffer))
}).and_then(|_| {
self.local.finish_mutable_write(volume, block,
read_buffer,
offset, data)
})
}
}
}
/// Shuts the filesystem down by delegating to the local filesystem's
/// `shutdown`.
pub fn shutdown(&self) {
    let local_fs = self.local();
    local_fs.shutdown();
}
pub fn snapshot(&self, id: &VolumeId, name: VolumeName) -> ::Result<()> {
self.local().snapshot(id)
.and_then(|snapshot| self.storage.snapshot(&name, snapshot))
}
/// Borrows the local filesystem layer.
pub fn local(&self) -> &LocalFs<'id> {
    &self.local
}
/// Borrows the backing storage as a trait object.
pub fn storage(&self) -> &Storage {
    // Explicitly look through the `Box` to the trait object it owns.
    &*self.storage
}
}