use futures::Future;
use std::ops::Range;
use trackable::error::ErrorKindExt;
use super::thread::DeviceThreadHandle;
use deadline::Deadline;
use device::command::{self, Command};
use device::DeviceStatus;
use lump::{LumpData, LumpHeader, LumpId};
use storage::StorageUsage;
use {Error, ErrorKind, Result};
/// A builder-style request bound to a single device.
///
/// Options are set through the `&mut self` methods of the `impl` block
/// (`deadline`, `journal_sync`, `max_queue_len`, `wait_for_running`,
/// `prioritized`); an operation method (`put`, `get`, `delete`, ...) then
/// builds the matching `Command` and hands it to the device thread.
#[derive(Debug)]
pub struct DeviceRequest<'a> {
/// Handle of the device thread that receives the commands and exposes the
/// metrics (status, queue length) consulted before sending.
device: &'a DeviceThreadHandle,
/// Deadline passed to every command; when `None`, `Deadline`'s default
/// value is used instead.
deadline: Option<Deadline>,
/// Optional upper bound on the device queue length. A command issued while
/// the queue length is greater than this value is failed with
/// `ErrorKind::DeviceBusy` instead of being sent.
max_queue_len: Option<usize>,
/// When `false`, a command issued while the device status is
/// `DeviceStatus::Starting` is failed with `ErrorKind::DeviceBusy`;
/// when `true`, that status check is skipped.
wait_for_running: bool,
/// Flag forwarded to the put, delete and delete-range commands.
/// NOTE(review): presumably forces the journal to be synced after the
/// operation; the exact effect lives in the command handlers, not here.
enforce_journal_sync: bool,
/// Flag forwarded to every command constructor.
/// NOTE(review): presumably marks the command as high priority for the
/// device's scheduler; confirm against `device::command`.
prioritized: bool,
}
impl<'a> DeviceRequest<'a> {
    /// Creates a request bound to `device` with every option at its initial
    /// value: no deadline, no queue-length limit, no waiting for a starting
    /// device, no enforced journal sync and no prioritization.
    pub(crate) fn new(device: &'a DeviceThreadHandle) -> Self {
        DeviceRequest {
            device,
            deadline: None,
            max_queue_len: None,
            wait_for_running: false,
            enforce_journal_sync: false,
            prioritized: false,
        }
    }

    /// Issues a `PutLump` command for `lump_id` carrying `lump_data`.
    ///
    /// The command is built with this request's deadline (or the default
    /// deadline), its `prioritized` flag and its journal-sync flag.
    /// The returned future is the response half created together with the
    /// command; it yields a `bool` on success.
    pub fn put(
        &self,
        lump_id: LumpId,
        lump_data: LumpData,
    ) -> impl Future<Item = bool, Error = Error> {
        let (cmd, reply) = command::PutLump::new(
            lump_id,
            lump_data,
            self.deadline.unwrap_or_default(),
            self.prioritized,
            self.enforce_journal_sync,
        );
        self.send_command(Command::Put(cmd));
        reply
    }

    /// Issues a `GetLump` command for `lump_id`.
    ///
    /// The returned future yields `Option<LumpData>` on success.
    pub fn get(&self, lump_id: LumpId) -> impl Future<Item = Option<LumpData>, Error = Error> {
        let (cmd, reply) = command::GetLump::new(
            lump_id,
            self.deadline.unwrap_or_default(),
            self.prioritized,
        );
        self.send_command(Command::Get(cmd));
        reply
    }

    /// Issues a `HeadLump` command for `lump_id`.
    ///
    /// The returned future yields `Option<LumpHeader>` on success.
    pub fn head(&self, lump_id: LumpId) -> impl Future<Item = Option<LumpHeader>, Error = Error> {
        let (cmd, reply) = command::HeadLump::new(
            lump_id,
            self.deadline.unwrap_or_default(),
            self.prioritized,
        );
        self.send_command(Command::Head(cmd));
        reply
    }

    /// Issues a `DeleteLump` command for `lump_id`.
    ///
    /// The journal-sync flag of this request is forwarded to the command.
    /// The returned future yields a `bool` on success.
    pub fn delete(&self, lump_id: LumpId) -> impl Future<Item = bool, Error = Error> {
        let (cmd, reply) = command::DeleteLump::new(
            lump_id,
            self.deadline.unwrap_or_default(),
            self.prioritized,
            self.enforce_journal_sync,
        );
        self.send_command(Command::Delete(cmd));
        reply
    }

    /// Issues a `DeleteLumpRange` command covering `range`.
    ///
    /// The journal-sync flag of this request is forwarded to the command.
    /// The returned future yields a `Vec<LumpId>` on success.
    pub fn delete_range(
        &self,
        range: Range<LumpId>,
    ) -> impl Future<Item = Vec<LumpId>, Error = Error> {
        let (cmd, reply) = command::DeleteLumpRange::new(
            range,
            self.deadline.unwrap_or_default(),
            self.prioritized,
            self.enforce_journal_sync,
        );
        self.send_command(Command::DeleteRange(cmd));
        reply
    }

    /// Issues a `ListLump` command.
    ///
    /// The returned future yields a `Vec<LumpId>` on success.
    pub fn list(&self) -> impl Future<Item = Vec<LumpId>, Error = Error> {
        let (cmd, reply) =
            command::ListLump::new(self.deadline.unwrap_or_default(), self.prioritized);
        self.send_command(Command::List(cmd));
        reply
    }

    /// Issues a `ListLumpRange` command covering `range`.
    ///
    /// The returned future yields a `Vec<LumpId>` on success.
    pub fn list_range(
        &self,
        range: Range<LumpId>,
    ) -> impl Future<Item = Vec<LumpId>, Error = Error> {
        let (cmd, reply) = command::ListLumpRange::new(
            range,
            self.deadline.unwrap_or_default(),
            self.prioritized,
        );
        self.send_command(Command::ListRange(cmd));
        reply
    }

    /// Issues a `UsageLumpRange` command covering `range`.
    ///
    /// The returned future yields a `StorageUsage` on success.
    pub fn usage_range(
        &self,
        range: Range<LumpId>,
    ) -> impl Future<Item = StorageUsage, Error = Error> {
        let (cmd, reply) = command::UsageLumpRange::new(
            range,
            self.deadline.unwrap_or_default(),
            self.prioritized,
        );
        self.send_command(Command::UsageRange(cmd));
        reply
    }

    /// Issues a `StopDevice` command.
    ///
    /// Unlike the other operations no response handle is produced; the
    /// command goes through the same checks as every other command.
    pub(crate) fn stop(&self) {
        let cmd = command::StopDevice::new(self.deadline.unwrap_or_default(), self.prioritized);
        self.send_command(Command::Stop(cmd));
    }

    /// Sets the deadline passed to commands issued from this request.
    ///
    /// Without this call the default value of `Deadline` is used.
    pub fn deadline(&mut self, deadline: Deadline) -> &mut Self {
        self.deadline = Some(deadline);
        self
    }

    /// Turns on the journal-sync flag that is forwarded to the put, delete
    /// and delete-range commands.
    pub fn journal_sync(&mut self) -> &mut Self {
        self.enforce_journal_sync = true;
        self
    }

    /// Sets the maximum device queue length tolerated when issuing a command.
    ///
    /// If the queue length reported by the device metrics is greater than
    /// `max` at issue time, the command is failed with
    /// `ErrorKind::DeviceBusy` instead of being sent.
    pub fn max_queue_len(&mut self, max: usize) -> &mut Self {
        self.max_queue_len = Some(max);
        self
    }

    /// Allows commands to be sent while the device is still starting.
    ///
    /// Without this call, a command issued while the device status is
    /// `DeviceStatus::Starting` is failed with `ErrorKind::DeviceBusy`.
    pub fn wait_for_running(&mut self) -> &mut Self {
        self.wait_for_running = true;
        self
    }

    /// Turns on the `prioritized` flag that is forwarded to every command.
    pub fn prioritized(&mut self) -> &mut Self {
        self.prioritized = true;
        self
    }

    /// Validates the request options against the device state and then
    /// either forwards `command` to the device thread or fails it.
    ///
    /// Two rejections are possible, both with `ErrorKind::DeviceBusy`:
    /// the device is starting and `wait_for_running` is off, or the queue
    /// length limit is exceeded. Only the latter is counted in the
    /// `busy_commands` metric.
    fn send_command(&self, command: Command) {
        // The status is only queried when the caller did not opt in to waiting.
        let rejected_while_starting =
            !self.wait_for_running && self.device.metrics().status() == DeviceStatus::Starting;

        if rejected_while_starting {
            let e = track!(ErrorKind::DeviceBusy.cause("The device is starting up"));
            command.failed(e.into());
        } else {
            match track!(self.check_limit()) {
                Ok(()) => {
                    self.device.send_command(command);
                }
                Err(e) => {
                    self.device.metrics().busy_commands.increment(&command);
                    command.failed(e);
                }
            }
        }
    }

    /// Checks the device queue length against `max_queue_len`, if one is set.
    ///
    /// Returns `Ok(())` when no limit is configured or the queue length is
    /// at most the limit; otherwise returns an `ErrorKind::DeviceBusy` error
    /// whose message carries the observed length and the limit.
    fn check_limit(&self) -> Result<()> {
        let metrics = self.device.metrics();
        let max = match self.max_queue_len {
            Some(max) => max,
            None => return Ok(()),
        };
        track_assert!(
            metrics.queue_len() <= max,
            ErrorKind::DeviceBusy,
            "value={}, max={}",
            metrics.queue_len(),
            max
        );
        Ok(())
    }
}