#![doc(html_root_url = "https://docs.rs/halt/latest")]
#![deny(
bad_style,
bare_trait_objects,
missing_docs,
unused_import_braces,
unused_qualifications,
unsafe_code,
unstable_features
)]
use std::error;
use std::fmt::{self, Display, Formatter};
use std::io::{self, Read, Write};
use std::sync::{Arc, Condvar, Mutex, Weak};
pub type Result = std::result::Result<(), Error>;
#[derive(Copy, Clone, Debug)]
pub enum Error {
HaltIsDropped,
FailedToLock,
}
impl Display for Error {
#[inline]
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
match self {
Error::HaltIsDropped => f.write_str("the halt wrapper has been dropped"),
Error::FailedToLock => f.write_str("failed to take a lock on the mutex"),
}
}
}
impl error::Error for Error {}
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
enum State {
Paused,
Running,
Stopped,
}
#[derive(Debug)]
struct Notify {
state: Mutex<State>,
condvar: Condvar,
}
impl Default for Notify {
#[inline]
fn default() -> Self {
Notify {
state: Mutex::new(State::Running),
condvar: Condvar::new(),
}
}
}
#[derive(Clone, Debug)]
pub struct Remote {
notify: Weak<Notify>,
}
impl Remote {
#[inline]
pub fn pause(&self) -> Result {
self.set(State::Paused)
}
#[inline]
pub fn resume(&self) -> Result {
self.set_and_notify(State::Running)
}
#[inline]
pub fn stop(&self) -> Result {
self.set_and_notify(State::Stopped)
}
#[inline]
fn set(&self, state: State) -> Result {
let notify = self.notify.upgrade().ok_or(Error::HaltIsDropped)?;
let mut guard = notify.state.lock().map_err(|_| Error::FailedToLock)?;
*guard = state;
Ok(())
}
#[inline]
fn set_and_notify(&self, state: State) -> Result {
let notify = self.notify.upgrade().ok_or(Error::HaltIsDropped)?;
let mut guard = notify.state.lock().map_err(|_| Error::FailedToLock)?;
let paused = *guard == State::Paused;
*guard = state;
if paused {
notify.condvar.notify_all();
}
Ok(())
}
}
#[derive(Debug)]
pub struct Halt<T> {
inner: T,
notify: Arc<Notify>,
}
impl<T> Halt<T> {
#[inline]
pub fn new(inner: T) -> Halt<T> {
Halt {
inner,
notify: Arc::new(Notify::default()),
}
}
#[inline]
pub fn remote(&self) -> Remote {
Remote {
notify: Arc::downgrade(&self.notify),
}
}
#[inline]
pub fn get_ref(&self) -> &T {
&self.inner
}
#[inline]
pub fn get_mut(&mut self) -> &mut T {
&mut self.inner
}
#[inline]
pub fn into_inner(self) -> T {
self.inner
}
#[inline]
fn stopped(&self) -> bool {
self.notify
.state
.lock()
.map(|guard| *guard == State::Stopped)
.unwrap_or(false)
}
#[inline]
fn wait_if_paused(&self) -> Result {
let mut guard = self.notify.state.lock().map_err(|_| Error::FailedToLock)?;
while *guard == State::Paused {
guard = self
.notify
.condvar
.wait(guard)
.map_err(|_| Error::FailedToLock)?;
}
Ok(())
}
}
impl<T> From<T> for Halt<T> {
#[inline]
fn from(inner: T) -> Self {
Halt::new(inner)
}
}
impl<I: Iterator> Iterator for Halt<I> {
type Item = I::Item;
#[inline]
fn next(&mut self) -> Option<Self::Item> {
let _ = self.wait_if_paused();
if self.stopped() {
None
} else {
self.inner.next()
}
}
}
impl<I: DoubleEndedIterator> DoubleEndedIterator for Halt<I> {
#[inline]
fn next_back(&mut self) -> Option<Self::Item> {
let _ = self.wait_if_paused();
if self.stopped() {
None
} else {
self.inner.next_back()
}
}
}
impl<I: Extend<A>, A> Extend<A> for Halt<I> {
#[inline]
fn extend<T: IntoIterator<Item = A>>(&mut self, iter: T) {
self.inner.extend(iter);
}
}
impl<R: Read> Read for Halt<R> {
#[inline]
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let _ = self.wait_if_paused();
if self.stopped() {
Ok(0)
} else {
self.inner.read(buf)
}
}
}
impl<W: Write> Write for Halt<W> {
#[inline]
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let _ = self.wait_if_paused();
if self.stopped() {
Ok(0)
} else {
self.inner.write(buf)
}
}
#[inline]
fn flush(&mut self) -> io::Result<()> {
self.inner.flush()
}
}