use futures::{Async, Future, Poll};
use std::sync::Arc;
pub use self::builder::DeviceBuilder;
pub use self::long_queue_policy::LongQueuePolicy;
pub use self::request::DeviceRequest;
pub(crate) use self::command::Command;
use self::thread::{DeviceThreadHandle, DeviceThreadMonitor};
use deadline::Deadline;
use lump::{LumpData, LumpId};
use metrics::DeviceMetrics;
use nvm::NonVolatileMemory;
use storage::Storage;
use {Error, Result};
mod builder;
mod command;
mod long_queue_policy;
mod probabilistic;
mod queue;
mod request;
mod thread;
/// Owner-side value for a spawned device.
///
/// Polling it as a `Future` polls the contained monitor, and dropping it
/// before the monitor has finished issues `stop(Deadline::Immediate)`.
#[must_use]
#[derive(Debug)]
pub struct Device {
// Polled by the `Future` implementation to learn when the device finishes.
monitor: DeviceThreadMonitor,
// Handle that `handle()` clones and that `stop()` uses to send its request.
handle: DeviceHandle,
// Set by `poll` once the monitor yields anything other than `NotReady`;
// `Drop` skips the stop request when this is `true`.
is_stopped: bool,
}
impl Device {
pub fn spawn<F, N>(init_storage: F) -> Device
where
F: FnOnce() -> Result<Storage<N>> + Send + 'static,
N: NonVolatileMemory + Send + 'static,
{
DeviceBuilder::new().spawn(init_storage)
}
pub fn handle(&self) -> DeviceHandle {
self.handle.clone()
}
pub fn stop(&self, deadline: Deadline) {
self.handle()
.request()
.wait_for_running()
.deadline(deadline)
.stop();
}
pub fn wait_for_running(self) -> impl Future<Item = Self, Error = Error> {
let handle = self.handle();
let future = handle.request().wait_for_running().head(LumpId::new(0)); track_err!(future.map(move |_| self))
}
pub(crate) fn new(monitor: DeviceThreadMonitor, handle: DeviceHandle) -> Self {
Device {
monitor,
handle,
is_stopped: false,
}
}
}
impl Future for Device {
    type Item = ();
    type Error = Error;

    /// Polls the underlying monitor and forwards its result.
    ///
    /// Any outcome other than `Ok(Async::NotReady)` (i.e. ready or error)
    /// marks the device as stopped so that `Drop` does not send another
    /// stop request.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        let outcome = track!(self.monitor.poll());
        match outcome {
            // Still pending: leave the stopped flag untouched.
            Ok(Async::NotReady) => {}
            _ => self.is_stopped = true,
        }
        outcome
    }
}
impl Drop for Device {
    /// Sends an immediate stop request unless the device was already
    /// observed as stopped through `poll`.
    fn drop(&mut self) {
        if self.is_stopped {
            return;
        }
        self.stop(Deadline::Immediate);
    }
}
/// Cloneable handle for issuing requests to a device.
///
/// A newtype around `DeviceThreadHandle`; requests and metrics are obtained
/// through the wrapped value.
#[derive(Debug, Clone)]
pub struct DeviceHandle(DeviceThreadHandle);
impl DeviceHandle {
    /// Creates a new request builder bound to this handle.
    pub fn request(&self) -> DeviceRequest {
        DeviceRequest::new(&self.0)
    }

    /// Returns the metrics shared with the wrapped thread handle.
    pub fn metrics(&self) -> &Arc<DeviceMetrics> {
        self.0.metrics()
    }

    /// Allocates a `LumpData` buffer of `size` bytes.
    ///
    /// When the metrics expose storage information, the buffer is allocated
    /// via `LumpData::aligned_allocate` using the storage header's block
    /// size. Otherwise (presumably while the device is still starting —
    /// confirm against `DeviceMetrics::storage`) a plain zero-filled buffer
    /// is used.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `LumpData::aligned_allocate` or
    /// `LumpData::new`.
    pub fn allocate_lump_data(&self, size: usize) -> Result<LumpData> {
        if let Some(storage) = self.metrics().storage() {
            track!(LumpData::aligned_allocate(
                size,
                storage.header().block_size
            ))
        } else {
            // Zero-initialize instead of `with_capacity` + `set_len`: exposing
            // uninitialized bytes violates `Vec::set_len`'s safety contract.
            track!(LumpData::new(vec![0u8; size]))
        }
    }

    /// Allocates a `LumpData` through `allocate_lump_data` and fills it with
    /// a copy of `bytes`.
    ///
    /// # Errors
    ///
    /// Propagates any error from `allocate_lump_data`.
    ///
    /// # Panics
    ///
    /// `copy_from_slice` panics if the allocated buffer's length differs from
    /// `bytes.len()`.
    pub fn allocate_lump_data_with_bytes(&self, bytes: &[u8]) -> Result<LumpData> {
        let mut data = track!(self.allocate_lump_data(bytes.len()))?;
        data.as_bytes_mut().copy_from_slice(bytes);
        Ok(data)
    }
}
/// Status of a device.
///
/// Every variant carries an explicit discriminant; note that `Stopped` is `0`
/// even though it is declared last.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DeviceStatus {
/// The device is starting (discriminant `1`).
Starting = 1,
/// The device is running (discriminant `2`).
Running = 2,
/// The device is stopped (discriminant `0`).
Stopped = 0,
}
#[cfg(test)]
mod tests {
use fibers_global::execute;
use std::ops::Range;
use trackable::result::TestResult;
use super::*;
use lump::{LumpData, LumpId};
use nvm::{MemoryNvm, SharedMemoryNvm};
use std::time::Duration;
use storage::StorageBuilder;
use ErrorKind;
#[test]
fn device_works() -> TestResult {
    // Start a device on top of a 1 MiB in-memory NVM.
    let memory = MemoryNvm::new(vec![0; 1024 * 1024]);
    let storage = track!(StorageBuilder::new().journal_region_ratio(0.99).create(memory))?;
    let device = DeviceBuilder::new().spawn(|| Ok(storage));
    let handle = device.handle();
    // Throwaway request issued with `wait_for_running`; its result is ignored.
    let _ = execute(handle.request().wait_for_running().list());

    // Store three lumps under ids 0, 1 and 2.
    let payloads: [&[u8]; 3] = [b"foo", b"bar", b"baz"];
    for (index, payload) in payloads.iter().enumerate() {
        track!(execute(handle.request().put(id(index), data(payload))))?;
    }
    let stored = track!(execute(handle.request().list()))?;
    assert_eq!(stored, vec![id(0), id(1), id(2)]);

    // The first delete of id 1 reports `true`, the repeated one `false`.
    let first_delete = track!(execute(handle.request().delete(id(1))))?;
    assert_eq!(first_delete, true);
    let second_delete = track!(execute(handle.request().delete(id(1))))?;
    assert_eq!(second_delete, false);

    let remaining = track!(execute(handle.request().list()))?;
    assert_eq!(remaining, vec![id(0), id(2)]);
    Ok(())
}
#[test]
fn delete_range_all_data_works() -> TestResult {
    let memory = MemoryNvm::new(vec![0; 1024 * 1024]);
    let storage = track!(StorageBuilder::new().journal_region_ratio(0.99).create(memory))?;
    let device = DeviceBuilder::new().spawn(|| Ok(storage));
    let handle = device.handle();
    // Throwaway request issued with `wait_for_running`; its result is ignored.
    let _ = execute(handle.request().wait_for_running().list());

    let payloads: [&[u8]; 3] = [b"foo", b"bar", b"baz"];
    for (index, payload) in payloads.iter().enumerate() {
        track!(execute(handle.request().put(id(index), data(payload))))?;
    }
    let stored = track!(execute(handle.request().list()))?;
    assert_eq!(stored, vec![id(0), id(1), id(2)]);

    // The range 0..3 covers every stored lump, so all of them are reported
    // as deleted and nothing remains afterwards.
    let everything = Range {
        start: id(0),
        end: id(3),
    };
    let deleted = track!(execute(handle.request().delete_range(everything)))?;
    assert_eq!(deleted, vec![id(0), id(1), id(2)]);

    let remaining = track!(execute(handle.request().list()))?;
    assert_eq!(remaining, vec![]);
    Ok(())
}
#[test]
fn delete_range_no_data_works() -> TestResult {
    let memory = MemoryNvm::new(vec![0; 1024 * 1024]);
    let storage = track!(StorageBuilder::new().journal_region_ratio(0.99).create(memory))?;
    let device = DeviceBuilder::new().spawn(|| Ok(storage));
    let handle = device.handle();
    // Throwaway request issued with `wait_for_running`; its result is ignored.
    let _ = execute(handle.request().wait_for_running().list());

    let payloads: [&[u8]; 3] = [b"foo", b"bar", b"baz"];
    for (index, payload) in payloads.iter().enumerate() {
        track!(execute(handle.request().put(id(index), data(payload))))?;
    }
    let before = track!(execute(handle.request().list()))?;
    assert_eq!(before, vec![id(0), id(1), id(2)]);

    // The range 3..9 contains none of the stored ids, so nothing is deleted.
    let untouched = Range {
        start: id(3),
        end: id(9),
    };
    let deleted = track!(execute(handle.request().delete_range(untouched)))?;
    assert_eq!(deleted, vec![]);

    let after = track!(execute(handle.request().list()))?;
    assert_eq!(after, vec![id(0), id(1), id(2)]);
    Ok(())
}
#[test]
fn delete_range_partial_data_works() -> TestResult {
    let memory = MemoryNvm::new(vec![0; 1024 * 1024]);
    let storage = track!(StorageBuilder::new().journal_region_ratio(0.99).create(memory))?;
    let device = DeviceBuilder::new().spawn(|| Ok(storage));
    let handle = device.handle();
    // Throwaway request issued with `wait_for_running`; its result is ignored.
    let _ = execute(handle.request().wait_for_running().list());

    let payloads: [&[u8]; 4] = [b"foo", b"bar", b"baz", b"hoge"];
    for (index, payload) in payloads.iter().enumerate() {
        track!(execute(handle.request().put(id(index), data(payload))))?;
    }
    let stored = track!(execute(handle.request().list()))?;
    assert_eq!(stored, vec![id(0), id(1), id(2), id(3)]);

    // Only ids 1 and 2 fall inside 1..3; the end bound (3) is not deleted.
    let middle = Range {
        start: id(1),
        end: id(3),
    };
    let deleted = track!(execute(handle.request().delete_range(middle)))?;
    assert_eq!(deleted, vec![id(1), id(2)]);

    let remaining = track!(execute(handle.request().list()))?;
    assert_eq!(remaining, vec![id(0), id(3)]);
    Ok(())
}
#[test]
fn list_range_works() -> TestResult {
    let memory = MemoryNvm::new(vec![0; 1024 * 1024]);
    let storage = track!(StorageBuilder::new().journal_region_ratio(0.99).create(memory))?;
    let device = DeviceBuilder::new().spawn(|| Ok(storage));
    let handle = device.handle();
    // Throwaway request issued with `wait_for_running`; its result is ignored.
    let _ = execute(handle.request().wait_for_running().list());

    // Store lumps 2..=6, each holding its own id rendered as decimal text.
    for number in 2..7 {
        let text = number.to_string();
        track!(execute(
            handle.request().put(id(number), data(text.as_bytes()))
        ))?;
    }
    let stored = track!(execute(handle.request().list()))?;
    assert_eq!(stored, vec![id(2), id(3), id(4), id(5), id(6)]);

    // (range start, range end, ids expected from `list_range`), checked in order.
    let cases: Vec<(usize, usize, Vec<LumpId>)> = vec![
        (0, 2, vec![]),
        (1, 5, vec![id(2), id(3), id(4)]),
        (3, 4, vec![id(3)]),
        (0, 10000, vec![id(2), id(3), id(4), id(5), id(6)]),
    ];
    for (start, end, expected) in cases {
        let range = Range {
            start: id(start),
            end: id(end),
        };
        let listed = track!(execute(handle.request().list_range(range)))?;
        assert_eq!(listed, expected);
    }
    Ok(())
}
#[test]
fn usage_range_works() -> TestResult {
    let memory = MemoryNvm::new(vec![0; 1024 * 1024]);
    let storage = track!(StorageBuilder::new().journal_region_ratio(0.99).create(memory))?;
    let header = storage.header().clone();
    let device = DeviceBuilder::new().spawn(|| Ok(storage));
    let handle = device.handle();
    // Throwaway request issued with `wait_for_running`; its result is ignored.
    let _ = execute(handle.request().wait_for_running().list());

    // Runs a `usage_range` request over the ids `start..end`.
    let usage_of = |start: usize, end: usize| {
        execute(handle.request().usage_range(Range {
            start: id(start),
            end: id(end),
        }))
    };
    let block_size = header.block_size.as_u16();

    // Nothing has been stored yet, so the usage is zero.
    let usage = track!(usage_of(0, 10))?;
    assert_eq!(512u16, block_size);
    assert_eq!(0, usage.bytecount().unwrap());

    track!(execute(handle.request().put(id(0), data(&[0; 510]))))?;
    track!(execute(handle.request().put(id(1), data(&[0; 511]))))?;
    track!(execute(handle.request().put(id(12), data(b"baz"))))?;

    // An empty range reports no usage.
    let usage = track!(usage_of(0, 0))?;
    assert_eq!(0, usage.bytecount().unwrap());

    // Lump 0 alone accounts for one block.
    let usage = track!(usage_of(0, 1))?;
    assert_eq!(block_size, usage.bytecount().unwrap() as u16);

    // Lumps 0 and 1 together account for three blocks.
    let usage = track!(usage_of(0, 10))?;
    assert_eq!(block_size * 3, usage.bytecount().unwrap() as u16);

    // Including lump 12 brings the total to four blocks.
    let usage = track!(usage_of(0, 13))?;
    assert_eq!(block_size * 4, usage.bytecount().unwrap() as u16);
    Ok(())
}
/// Test helper: builds a `LumpId` from a `usize`.
fn id(id: usize) -> LumpId {
    let raw = id as u128;
    LumpId::new(raw)
}
/// Test helper: copies `data` into a `LumpData` built with `LumpData::new`,
/// panicking if construction fails.
fn data(data: &[u8]) -> LumpData {
    let owned = data.to_vec();
    LumpData::new(owned).unwrap()
}
/// Test helper: copies `data` into a `LumpData` built with
/// `LumpData::new_embedded`, panicking if construction fails.
fn embedded_data(data: &[u8]) -> LumpData {
    let owned = data.to_vec();
    LumpData::new_embedded(owned).unwrap()
}
#[test]
fn journal_sync_works() -> TestResult {
    {
        // Without `journal_sync`, the shared memory is byte-for-byte
        // unchanged after the put completes.
        let shared = SharedMemoryNvm::new(vec![0; 1024 * 1024]);
        let storage = track!(StorageBuilder::new()
            .journal_region_ratio(0.99)
            .create(shared.clone()))?;
        let snapshot = shared.to_bytes();
        let device = DeviceBuilder::new().spawn(|| Ok(storage));
        let handle = device.handle();
        let _ = execute(handle.request().wait_for_running().list());

        let lump = embedded_data(b"hoge");
        track!(execute(handle.request().put(id(1234), lump)))?;
        assert_eq!(snapshot, shared.to_bytes());
    }
    {
        // With `journal_sync`, the shared memory differs from the snapshot
        // taken before the put.
        let shared = SharedMemoryNvm::new(vec![0; 4 * 1024]);
        let storage = track!(StorageBuilder::new()
            .journal_region_ratio(0.5)
            .create(shared.clone()))?;
        let snapshot = shared.to_bytes();
        let device = DeviceBuilder::new().spawn(|| Ok(storage));
        let handle = device.handle();
        let _ = execute(handle.request().wait_for_running().list());

        let lump = embedded_data(b"hoge");
        track!(execute(
            handle.request().journal_sync().put(id(1234), lump)
        ))?;
        assert_ne!(snapshot, shared.to_bytes());
    }
    Ok(())
}
#[test]
fn device_stop_works() -> TestResult {
    let storage = track!(Storage::create(MemoryNvm::new(vec![0; 1024 * 1024])))?;
    let device = Device::spawn(|| Ok(storage));

    // Request an immediate stop, then drive the device future to completion.
    device.stop(Deadline::Immediate);
    track!(execute(device))?;
    Ok(())
}
#[test]
fn device_long_queue_policy_refuse_request_works() -> TestResult {
    let nvm = MemoryNvm::new(vec![0; 1024 * 1024]);
    let storage = track!(Storage::create(nvm))?;
    // Zero busy threshold and zero keep-busy duration, combined with a
    // refusal ratio of 1.0.
    let device = DeviceBuilder::new()
        .busy_threshold(0)
        .max_keep_busy_duration(Duration::from_secs(0))
        .long_queue_policy(LongQueuePolicy::RefuseNewRequests { ratio: 1.0 })
        .spawn(|| Ok(storage));
    let handle = device.handle();

    // The first put is accepted and reports `true`.
    let result = execute(
        handle
            .request()
            .wait_for_running()
            .put(id(1234), embedded_data(b"hoge")),
    );
    // Compare explicitly: `assert!(x, true)` would treat `true` as the panic
    // message instead of an expected value.
    assert_eq!(result.unwrap(), true);

    // The second, non-prioritized put is refused.
    let result = execute(
        handle
            .request()
            .wait_for_running()
            .put(id(1234), embedded_data(b"hoge")),
    );
    assert!(result.is_err());
    assert_eq!(*result.unwrap_err().kind(), ErrorKind::RequestRefused);

    // A prioritized put is not refused; it reports `false` for id 1234,
    // which the first put already stored.
    let result = execute(
        handle
            .request()
            .wait_for_running()
            .prioritized()
            .put(id(1234), embedded_data(b"hoge")),
    );
    assert_eq!(result.unwrap(), false);
    Ok(())
}
#[test]
fn device_long_queue_policy_stop_works() -> TestResult {
    let storage = track!(Storage::create(MemoryNvm::new(vec![0; 1024 * 1024])))?;
    // Zero busy threshold and zero keep-busy duration with the `Stop` policy.
    let device = DeviceBuilder::new()
        .busy_threshold(0)
        .max_keep_busy_duration(Duration::from_secs(0))
        .long_queue_policy(LongQueuePolicy::Stop)
        .spawn(|| Ok(storage));
    let handle = device.handle();

    // The put fails because the device has terminated.
    let lump = embedded_data(b"hoge");
    let outcome = execute(handle.request().wait_for_running().put(id(1234), lump));
    let error = outcome.unwrap_err();
    assert_eq!(*error.kind(), ErrorKind::DeviceTerminated);
    Ok(())
}
#[test]
fn device_long_queue_policy_drop_works() -> TestResult {
    let storage = track!(Storage::create(MemoryNvm::new(vec![0; 1024 * 1024])))?;
    // Zero busy threshold and zero keep-busy duration with a drop ratio of 1.0.
    let device = DeviceBuilder::new()
        .busy_threshold(0)
        .max_keep_busy_duration(Duration::from_secs(0))
        .long_queue_policy(LongQueuePolicy::Drop { ratio: 1.0 })
        .spawn(|| Ok(storage));
    let handle = device.handle();

    // A normal put is dropped.
    let dropped = execute(
        handle
            .request()
            .wait_for_running()
            .put(id(1234), embedded_data(b"hoge")),
    );
    let error = dropped.unwrap_err();
    assert_eq!(*error.kind(), ErrorKind::RequestDropped);

    // A prioritized put goes through and reports `true`.
    let prioritized = execute(
        handle
            .request()
            .wait_for_running()
            .prioritized()
            .put(id(1234), embedded_data(b"hoge")),
    );
    assert_eq!(prioritized.unwrap(), true);
    Ok(())
}
}