use fibers::sync::oneshot;
use futures::{Future, Poll};
use std::ops::Range;
use std::sync::mpsc::{Receiver, Sender};
use trackable::error::ErrorKindExt;
use deadline::Deadline;
use lump::{LumpData, LumpHeader, LumpId};
use storage::StorageUsage;
use {Error, ErrorKind, Result};
/// Sending half of a `std::sync::mpsc` channel whose messages are `Command` values.
pub type CommandSender = Sender<Command>;
/// Receiving half of a `std::sync::mpsc` channel whose messages are `Command` values.
pub type CommandReceiver = Receiver<Command>;
/// A request that travels over the command channel
/// (`CommandSender` / `CommandReceiver`).
///
/// Every variant wraps the struct that carries that request's parameters.
#[derive(Debug)]
pub enum Command {
    /// Wraps a `PutLump` request.
    Put(PutLump),
    /// Wraps a `GetLump` request.
    Get(GetLump),
    /// Wraps a `HeadLump` request.
    Head(HeadLump),
    /// Wraps a `DeleteLump` request.
    Delete(DeleteLump),
    /// Wraps a `DeleteLumpRange` request.
    DeleteRange(DeleteLumpRange),
    /// Wraps a `ListLump` request.
    List(ListLump),
    /// Wraps a `ListLumpRange` request.
    ListRange(ListLumpRange),
    /// Wraps a `UsageLumpRange` request.
    UsageRange(UsageLumpRange),
    /// Wraps a `StopDevice` request.
    Stop(StopDevice),
}
impl Command {
    /// Returns the `deadline` value stored in the wrapped request.
    pub fn deadline(&self) -> Deadline {
        match self {
            Command::Put(request) => request.deadline,
            Command::Get(request) => request.deadline,
            Command::Head(request) => request.deadline,
            Command::Delete(request) => request.deadline,
            Command::DeleteRange(request) => request.deadline,
            Command::List(request) => request.deadline,
            Command::ListRange(request) => request.deadline,
            Command::UsageRange(request) => request.deadline,
            Command::Stop(request) => request.deadline,
        }
    }

    /// Returns the `prioritized` flag stored in the wrapped request.
    pub fn prioritized(&self) -> bool {
        match self {
            Command::Put(request) => request.prioritized,
            Command::Get(request) => request.prioritized,
            Command::Head(request) => request.prioritized,
            Command::Delete(request) => request.prioritized,
            Command::DeleteRange(request) => request.prioritized,
            Command::List(request) => request.prioritized,
            Command::ListRange(request) => request.prioritized,
            Command::UsageRange(request) => request.prioritized,
            Command::Stop(request) => request.prioritized,
        }
    }

    /// Consumes the command and reports `error` as its outcome.
    ///
    /// Every variant except `Stop` sends `Err(error)` through the request's
    /// reply handle. `StopDevice` has no reply handle, so for `Stop` the
    /// error is simply dropped.
    pub fn failed(self, error: Error) {
        // Each request's `reply` method only forwards to its reply handle.
        match self {
            Command::Put(request) => request.reply(Err(error)),
            Command::Get(request) => request.reply(Err(error)),
            Command::Head(request) => request.reply(Err(error)),
            Command::Delete(request) => request.reply(Err(error)),
            Command::DeleteRange(request) => request.reply(Err(error)),
            Command::List(request) => request.reply(Err(error)),
            Command::ListRange(request) => request.reply(Err(error)),
            Command::UsageRange(request) => request.reply(Err(error)),
            Command::Stop(_) => {}
        }
    }
}
/// Receiving side of a command's outcome.
///
/// This is a newtype around the `oneshot::Monitor` half of a `fibers`
/// oneshot monitor channel whose error type is the crate's `Error`.
#[derive(Debug)]
pub struct AsyncResult<T>(oneshot::Monitor<T, Error>);
impl<T> AsyncResult<T> {
    /// Creates a connected `(AsyncReply, AsyncResult)` pair backed by a
    /// single `oneshot::monitor()` channel.
    // `new` deliberately returns a pair instead of `Self`, hence the lint opt-out.
    #[allow(clippy::new_ret_no_self)]
    fn new() -> (AsyncReply<T>, Self) {
        let (monitored, monitor) = oneshot::monitor();
        let reply = AsyncReply(monitored);
        let result = AsyncResult(monitor);
        (reply, result)
    }
}
impl<T> Future for AsyncResult<T> {
type Item = T;
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
track!(self
.0
.poll()
.map_err(|e| e.unwrap_or_else(|| ErrorKind::DeviceTerminated
.cause("monitoring channel disconnected")
.into())))
}
}
/// Sending side of a command's outcome.
///
/// This is a newtype around the `oneshot::Monitored` half of a `fibers`
/// oneshot monitor channel whose error type is the crate's `Error`.
#[derive(Debug)]
struct AsyncReply<T>(oneshot::Monitored<T, Error>);
impl<T> AsyncReply<T> {
    /// Consumes the handle and hands `result` to the wrapped `Monitored`
    /// via its `exit` method.
    fn send(self, result: Result<T>) {
        let AsyncReply(monitored) = self;
        monitored.exit(result);
    }
}
/// Parameters of a `Command::Put` request, together with the handle used to
/// answer it.
#[derive(Debug)]
pub struct PutLump {
    // Identifier of the lump this request refers to.
    lump_id: LumpId,
    // Payload supplied by the requester.
    lump_data: LumpData,
    // Read by `Command::deadline`.
    deadline: Deadline,
    // Read by `Command::prioritized`.
    prioritized: bool,
    // Exposed through `do_sync_journal`.
    journal_sync: bool,
    // Handle through which the `Result<bool>` outcome is delivered.
    reply: AsyncReply<bool>,
}
impl PutLump {
    /// Builds the request and returns it with the `AsyncResult` on which the
    /// requester receives the `Result<bool>` outcome.
    // `new` deliberately returns a pair instead of `Self`, hence the lint opt-out.
    #[allow(clippy::new_ret_no_self)]
    pub fn new(
        lump_id: LumpId,
        lump_data: LumpData,
        deadline: Deadline,
        prioritized: bool,
        journal_sync: bool,
    ) -> (Self, AsyncResult<bool>) {
        let (reply, result) = AsyncResult::new();
        (
            Self {
                lump_id,
                lump_data,
                deadline,
                prioritized,
                journal_sync,
                reply,
            },
            result,
        )
    }

    /// Borrows the identifier given to `new`.
    pub fn lump_id(&self) -> &LumpId {
        &self.lump_id
    }

    /// Borrows the lump data given to `new`.
    pub fn lump_data(&self) -> &LumpData {
        &self.lump_data
    }

    /// Returns the `journal_sync` flag given to `new`.
    ///
    /// NOTE(review): presumably tells the handler to sync the journal for
    /// this operation; confirm against the code that consumes this flag.
    pub fn do_sync_journal(&self) -> bool {
        self.journal_sync
    }

    /// Consumes the request and delivers `result` to the paired `AsyncResult`.
    pub fn reply(self, result: Result<bool>) {
        self.reply.send(result);
    }
}
/// Parameters of a `Command::Get` request, together with the handle used to
/// answer it.
#[derive(Debug)]
pub struct GetLump {
    // Identifier of the lump this request refers to.
    lump_id: LumpId,
    // Read by `Command::deadline`.
    deadline: Deadline,
    // Read by `Command::prioritized`.
    prioritized: bool,
    // Handle through which the `Result<Option<LumpData>>` outcome is delivered.
    reply: AsyncReply<Option<LumpData>>,
}
impl GetLump {
    /// Builds the request and returns it with the `AsyncResult` on which the
    /// requester receives the `Result<Option<LumpData>>` outcome.
    // `new` deliberately returns a pair instead of `Self`, hence the lint opt-out.
    #[allow(clippy::new_ret_no_self)]
    pub fn new(
        lump_id: LumpId,
        deadline: Deadline,
        prioritized: bool,
    ) -> (Self, AsyncResult<Option<LumpData>>) {
        let (reply, result) = AsyncResult::new();
        (
            Self {
                lump_id,
                deadline,
                prioritized,
                reply,
            },
            result,
        )
    }

    /// Borrows the identifier given to `new`.
    pub fn lump_id(&self) -> &LumpId {
        &self.lump_id
    }

    /// Consumes the request and delivers `result` to the paired `AsyncResult`.
    pub fn reply(self, result: Result<Option<LumpData>>) {
        let GetLump { reply, .. } = self;
        reply.send(result);
    }
}
/// Parameters of a `Command::Head` request, together with the handle used to
/// answer it.
#[derive(Debug)]
pub struct HeadLump {
    // Identifier of the lump this request refers to.
    lump_id: LumpId,
    // Read by `Command::deadline`.
    deadline: Deadline,
    // Read by `Command::prioritized`.
    prioritized: bool,
    // Handle through which the `Result<Option<LumpHeader>>` outcome is delivered.
    reply: AsyncReply<Option<LumpHeader>>,
}
impl HeadLump {
    /// Builds the request and returns it with the `AsyncResult` on which the
    /// requester receives the `Result<Option<LumpHeader>>` outcome.
    // `new` deliberately returns a pair instead of `Self`, hence the lint opt-out.
    #[allow(clippy::new_ret_no_self)]
    pub fn new(
        lump_id: LumpId,
        deadline: Deadline,
        prioritized: bool,
    ) -> (Self, AsyncResult<Option<LumpHeader>>) {
        let (reply, result) = AsyncResult::new();
        (
            Self {
                lump_id,
                deadline,
                prioritized,
                reply,
            },
            result,
        )
    }

    /// Borrows the identifier given to `new`.
    pub fn lump_id(&self) -> &LumpId {
        &self.lump_id
    }

    /// Consumes the request and delivers `result` to the paired `AsyncResult`.
    pub fn reply(self, result: Result<Option<LumpHeader>>) {
        let HeadLump { reply, .. } = self;
        reply.send(result);
    }
}
/// Parameters of a `Command::Delete` request, together with the handle used
/// to answer it.
#[derive(Debug)]
pub struct DeleteLump {
    // Identifier of the lump this request refers to.
    lump_id: LumpId,
    // Read by `Command::deadline`.
    deadline: Deadline,
    // Read by `Command::prioritized`.
    prioritized: bool,
    // Exposed through `do_sync_journal`.
    journal_sync: bool,
    // Handle through which the `Result<bool>` outcome is delivered.
    reply: AsyncReply<bool>,
}
impl DeleteLump {
    /// Builds the request and returns it with the `AsyncResult` on which the
    /// requester receives the `Result<bool>` outcome.
    // `new` deliberately returns a pair instead of `Self`, hence the lint opt-out.
    #[allow(clippy::new_ret_no_self)]
    pub fn new(
        lump_id: LumpId,
        deadline: Deadline,
        prioritized: bool,
        journal_sync: bool,
    ) -> (Self, AsyncResult<bool>) {
        let (reply, result) = AsyncResult::new();
        (
            Self {
                lump_id,
                deadline,
                prioritized,
                journal_sync,
                reply,
            },
            result,
        )
    }

    /// Borrows the identifier given to `new`.
    pub fn lump_id(&self) -> &LumpId {
        &self.lump_id
    }

    /// Returns the `journal_sync` flag given to `new`.
    ///
    /// NOTE(review): presumably tells the handler to sync the journal for
    /// this operation; confirm against the code that consumes this flag.
    pub fn do_sync_journal(&self) -> bool {
        self.journal_sync
    }

    /// Consumes the request and delivers `result` to the paired `AsyncResult`.
    pub fn reply(self, result: Result<bool>) {
        let DeleteLump { reply, .. } = self;
        reply.send(result);
    }
}
/// Parameters of a `Command::DeleteRange` request, together with the handle
/// used to answer it.
#[derive(Debug)]
pub struct DeleteLumpRange {
    // Range of lump identifiers this request refers to.
    range: Range<LumpId>,
    // Read by `Command::deadline`.
    deadline: Deadline,
    // Read by `Command::prioritized`.
    prioritized: bool,
    // Exposed through `do_sync_journal`.
    journal_sync: bool,
    // Handle through which the `Result<Vec<LumpId>>` outcome is delivered.
    reply: AsyncReply<Vec<LumpId>>,
}
impl DeleteLumpRange {
    /// Builds the request and returns it with the `AsyncResult` on which the
    /// requester receives the `Result<Vec<LumpId>>` outcome.
    // `new` deliberately returns a pair instead of `Self`, hence the lint opt-out.
    #[allow(clippy::new_ret_no_self)]
    pub fn new(
        range: Range<LumpId>,
        deadline: Deadline,
        prioritized: bool,
        journal_sync: bool,
    ) -> (Self, AsyncResult<Vec<LumpId>>) {
        let (reply, result) = AsyncResult::new();
        (
            Self {
                range,
                deadline,
                prioritized,
                journal_sync,
                reply,
            },
            result,
        )
    }

    /// Returns a clone of the range given to `new`.
    pub fn lump_range(&self) -> Range<LumpId> {
        Range::clone(&self.range)
    }

    /// Returns the `journal_sync` flag given to `new`.
    ///
    /// NOTE(review): presumably tells the handler to sync the journal for
    /// this operation; confirm against the code that consumes this flag.
    pub fn do_sync_journal(&self) -> bool {
        self.journal_sync
    }

    /// Consumes the request and delivers `result` to the paired `AsyncResult`.
    pub fn reply(self, result: Result<Vec<LumpId>>) {
        let DeleteLumpRange { reply, .. } = self;
        reply.send(result);
    }
}
/// Parameters of a `Command::List` request, together with the handle used to
/// answer it.
#[derive(Debug)]
pub struct ListLump {
    // Read by `Command::deadline`.
    deadline: Deadline,
    // Read by `Command::prioritized`.
    prioritized: bool,
    // Handle through which the `Result<Vec<LumpId>>` outcome is delivered.
    reply: AsyncReply<Vec<LumpId>>,
}
impl ListLump {
    /// Builds the request and returns it with the `AsyncResult` on which the
    /// requester receives the `Result<Vec<LumpId>>` outcome.
    // `new` deliberately returns a pair instead of `Self`, hence the lint opt-out.
    #[allow(clippy::new_ret_no_self)]
    pub fn new(deadline: Deadline, prioritized: bool) -> (Self, AsyncResult<Vec<LumpId>>) {
        let (reply, result) = AsyncResult::new();
        (
            Self {
                deadline,
                prioritized,
                reply,
            },
            result,
        )
    }

    /// Consumes the request and delivers `result` to the paired `AsyncResult`.
    pub fn reply(self, result: Result<Vec<LumpId>>) {
        let ListLump { reply, .. } = self;
        reply.send(result);
    }
}
/// Parameters of a `Command::ListRange` request, together with the handle
/// used to answer it.
#[derive(Debug)]
pub struct ListLumpRange {
    // Range of lump identifiers this request refers to.
    range: Range<LumpId>,
    // Read by `Command::deadline`.
    deadline: Deadline,
    // Read by `Command::prioritized`.
    prioritized: bool,
    // Handle through which the `Result<Vec<LumpId>>` outcome is delivered.
    reply: AsyncReply<Vec<LumpId>>,
}
impl ListLumpRange {
    /// Builds the request and returns it with the `AsyncResult` on which the
    /// requester receives the `Result<Vec<LumpId>>` outcome.
    // `new` deliberately returns a pair instead of `Self`, hence the lint opt-out.
    #[allow(clippy::new_ret_no_self)]
    pub fn new(
        range: Range<LumpId>,
        deadline: Deadline,
        prioritized: bool,
    ) -> (Self, AsyncResult<Vec<LumpId>>) {
        let (reply, result) = AsyncResult::new();
        (
            Self {
                range,
                deadline,
                prioritized,
                reply,
            },
            result,
        )
    }

    /// Returns a clone of the range given to `new`.
    pub fn lump_range(&self) -> Range<LumpId> {
        Range::clone(&self.range)
    }

    /// Consumes the request and delivers `result` to the paired `AsyncResult`.
    pub fn reply(self, result: Result<Vec<LumpId>>) {
        let ListLumpRange { reply, .. } = self;
        reply.send(result);
    }
}
/// Parameters of a `Command::UsageRange` request, together with the handle
/// used to answer it.
#[derive(Debug)]
pub struct UsageLumpRange {
    // Range of lump identifiers this request refers to.
    range: Range<LumpId>,
    // Read by `Command::deadline`.
    deadline: Deadline,
    // Read by `Command::prioritized`.
    prioritized: bool,
    // Handle through which the `Result<StorageUsage>` outcome is delivered.
    reply: AsyncReply<StorageUsage>,
}
impl UsageLumpRange {
    /// Builds the request and returns it with the `AsyncResult` on which the
    /// requester receives the `Result<StorageUsage>` outcome.
    // `new` deliberately returns a pair instead of `Self`, hence the lint opt-out.
    #[allow(clippy::new_ret_no_self)]
    pub fn new(
        range: Range<LumpId>,
        deadline: Deadline,
        prioritized: bool,
    ) -> (Self, AsyncResult<StorageUsage>) {
        let (reply, result) = AsyncResult::new();
        (
            Self {
                range,
                deadline,
                prioritized,
                reply,
            },
            result,
        )
    }

    /// Returns a clone of the range given to `new`.
    pub fn lump_range(&self) -> Range<LumpId> {
        Range::clone(&self.range)
    }

    /// Consumes the request and delivers `result` to the paired `AsyncResult`.
    pub fn reply(self, result: Result<StorageUsage>) {
        let UsageLumpRange { reply, .. } = self;
        reply.send(result);
    }
}
/// Parameters of a `Command::Stop` request.
///
/// Unlike the other request structs in this file it holds no reply handle,
/// so no outcome is sent back to the requester.
#[derive(Debug)]
pub struct StopDevice {
    // Read by `Command::deadline`.
    deadline: Deadline,
    // Read by `Command::prioritized`.
    prioritized: bool,
}
impl StopDevice {
    /// Builds a stop request from the given deadline and priority flag.
    pub fn new(deadline: Deadline, prioritized: bool) -> Self {
        Self {
            deadline,
            prioritized,
        }
    }
}