//! Pause, resume, and stop control for iterators, readers, and writers.
//!
//! [`Halt`] wraps a value and forwards `Iterator`, `DoubleEndedIterator`,
//! `Read`, and `Write` calls to it. A [`Remote`] obtained from
//! [`Halt::remote`] switches the wrapper between the running, paused, and
//! stopped states: while paused, `next`, `next_back`, `read`, and `write`
//! block; while stopped, they return `None` (iterators) or `Ok(0)` (I/O)
//! without calling the wrapped value.
#![doc(html_root_url = "https://docs.rs/halt/latest")]
// Lints promoted to hard errors for the whole crate. Because `missing_docs`
// is in the list, the crate root and every public item must be documented.
#![deny(
bad_style,
bare_trait_objects,
missing_docs,
unused_import_braces,
unused_qualifications,
unsafe_code,
unstable_features
)]
use std::io::{self, Read, Write};
use std::sync::{Arc, Condvar, Mutex, Weak};
/// Control state shared between a `Halt` and its `Remote` handles.
///
/// The derived ordering follows declaration order:
/// `Running < Paused < Stopped`.
#[derive(Copy, Clone, Debug, Hash, Eq, PartialEq, Ord, PartialOrd)]
enum State {
    /// Wrapped operations are forwarded to the inner value.
    Running,
    /// Wrapped operations block until the state changes.
    Paused,
    /// Wrapped operations return immediately without calling the inner value.
    Stopped,
}

impl Default for State {
    /// A freshly created state is `Running`.
    #[inline]
    fn default() -> Self {
        Self::Running
    }
}
/// Shared control block: the current `State` together with the condition
/// variable used to wake threads that are blocked while the state is `Paused`.
///
/// The derived `Default` starts in `State::Running` (the default of `State`).
#[derive(Debug, Default)]
struct Notify {
/// Current state; every read and write goes through this mutex.
state: Mutex<State>,
/// Waited on in `Halt::wait_if_paused` and signalled by
/// `Remote::set_and_notify` when the state leaves `Paused`.
condvar: Condvar,
}
impl Notify {
#[inline]
fn is_paused(&self) -> bool {
self.state
.lock()
.map(|state| *state == State::Paused)
.unwrap_or(false)
}
#[inline]
fn is_running(&self) -> bool {
self.state
.lock()
.map(|state| *state == State::Running)
.unwrap_or(false)
}
#[inline]
fn is_stopped(&self) -> bool {
self.state
.lock()
.map(|state| *state == State::Stopped)
.unwrap_or(false)
}
}
/// Handle for pausing, resuming, and stopping a [`Halt`], e.g. from another
/// thread.
///
/// A remote holds only a weak reference to the state it controls, so it does
/// not keep that state alive: once the `Halt` it came from is gone the remote
/// is invalid (see [`Remote::is_valid`]). Cloning a remote yields another
/// handle to the same `Halt`.
#[derive(Clone, Debug)]
pub struct Remote {
/// Weak reference to the control block owned by the `Halt`.
notify: Weak<Notify>,
}
impl Remote {
#[inline]
pub fn pause(&self) {
self.set_and_notify(State::Paused);
}
#[inline]
pub fn resume(&self) {
self.set_and_notify(State::Running);
}
#[inline]
pub fn stop(&self) {
self.set_and_notify(State::Stopped);
}
#[inline]
pub fn is_paused(&self) -> bool {
self.notify
.upgrade()
.map(|notify| notify.is_paused())
.unwrap_or(false)
}
#[inline]
pub fn is_running(&self) -> bool {
self.notify
.upgrade()
.map(|notify| notify.is_running())
.unwrap_or(false)
}
#[inline]
pub fn is_stopped(&self) -> bool {
self.notify
.upgrade()
.map(|notify| notify.is_stopped())
.unwrap_or(false)
}
#[inline]
pub fn is_valid(&self) -> bool {
self.notify.upgrade().is_some()
}
#[inline]
fn set_and_notify(&self, new: State) {
let notify = self.notify.upgrade().expect("invalid remote");
let mut guard = notify.state.lock().unwrap();
let state = &mut *guard;
let need_to_notify = *state == State::Paused && *state != new;
*state = new;
drop(guard);
if need_to_notify {
notify.condvar.notify_all();
}
}
}
/// Wrapper around a value of type `T` whose iteration, reads, and writes can
/// be paused, resumed, or stopped through a [`Remote`].
///
/// The derived `Default` wraps `T::default()` and starts in the running state.
#[derive(Debug, Default)]
pub struct Halt<T> {
/// The wrapped value that operations are forwarded to.
inner: T,
/// Control block shared with every `Remote` created by `remote()`; remotes
/// hold only weak references to it.
notify: Arc<Notify>,
}
impl<T> Halt<T> {
    /// Wraps `inner` in a new `Halt` that starts in the running state.
    #[inline]
    pub fn new(inner: T) -> Halt<T> {
        let notify = Arc::new(Notify::default());
        Halt { inner, notify }
    }

    /// Creates a [`Remote`] that controls this `Halt`.
    ///
    /// The remote holds a weak reference and becomes invalid once this `Halt`
    /// is dropped or consumed.
    #[inline]
    pub fn remote(&self) -> Remote {
        let notify = Arc::downgrade(&self.notify);
        Remote { notify }
    }

    /// Returns `true` if this `Halt` is paused.
    #[inline]
    pub fn is_paused(&self) -> bool {
        self.notify.is_paused()
    }

    /// Returns `true` if this `Halt` is running.
    #[inline]
    pub fn is_running(&self) -> bool {
        self.notify.is_running()
    }

    /// Returns `true` if this `Halt` is stopped.
    #[inline]
    pub fn is_stopped(&self) -> bool {
        self.notify.is_stopped()
    }

    /// Returns a shared reference to the wrapped value.
    #[inline]
    pub fn get_ref(&self) -> &T {
        let Halt { inner, .. } = self;
        inner
    }

    /// Returns a mutable reference to the wrapped value.
    #[inline]
    pub fn get_mut(&mut self) -> &mut T {
        let Halt { inner, .. } = self;
        inner
    }

    /// Consumes the `Halt` and returns the wrapped value.
    ///
    /// The shared control block is dropped with the wrapper, so existing
    /// remotes become invalid.
    #[inline]
    pub fn into_inner(self) -> T {
        let Halt { inner, .. } = self;
        inner
    }

    /// Blocks the calling thread for as long as the state is `Paused`.
    ///
    /// Returns immediately when the state is `Running` or `Stopped`.
    ///
    /// # Panics
    ///
    /// Panics if the state mutex is poisoned.
    #[inline]
    fn wait_if_paused(&self) {
        let guard = self.notify.state.lock().unwrap();
        // `wait_while` re-checks the predicate after every wake-up, so a
        // spurious wake-up simply goes back to sleep.
        let _guard = self
            .notify
            .condvar
            .wait_while(guard, |current| *current == State::Paused)
            .unwrap();
    }
}
impl<T> From<T> for Halt<T> {
#[inline]
fn from(inner: T) -> Self {
Halt::new(inner)
}
}
impl<T> AsRef<T> for Halt<T> {
    /// Borrows the wrapped value; same result as `get_ref`.
    #[inline]
    fn as_ref(&self) -> &T {
        &self.inner
    }
}
impl<T> AsMut<T> for Halt<T> {
    /// Mutably borrows the wrapped value; same result as `get_mut`.
    #[inline]
    fn as_mut(&mut self) -> &mut T {
        &mut self.inner
    }
}
impl<I: Iterator> Iterator for Halt<I> {
    type Item = I::Item;

    /// Advances the wrapped iterator, honouring the pause/stop state.
    ///
    /// Blocks while the state is paused. Once unblocked, returns `None`
    /// without touching the inner iterator if the state is stopped; otherwise
    /// forwards to the inner iterator's `next`.
    ///
    /// Because the state can later be set back to running, a `None` caused by
    /// a stop may be followed by further items.
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        self.wait_if_paused();
        if self.is_stopped() {
            None
        } else {
            self.inner.next()
        }
    }

    /// Reports `(0, upper)` where `upper` is the inner iterator's upper bound.
    ///
    /// A stop can end iteration at any moment, so no lower bound above zero
    /// can be promised. The wrapper never yields more items than the inner
    /// iterator, so the inner upper bound still holds. Never blocks.
    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        let (_, upper) = self.inner.size_hint();
        (0, upper)
    }
}
impl<I: DoubleEndedIterator> DoubleEndedIterator for Halt<I> {
    /// Takes the next item from the back, honouring the pause/stop state.
    ///
    /// Blocks while the state is paused. Once unblocked, returns `None`
    /// without touching the inner iterator if the state is stopped; otherwise
    /// forwards to the inner iterator's `next_back`.
    #[inline]
    fn next_back(&mut self) -> Option<Self::Item> {
        self.wait_if_paused();
        if self.is_stopped() {
            return None;
        }
        self.inner.next_back()
    }
}
impl<A, I: Extend<A>> Extend<A> for Halt<I> {
    /// Extends the wrapped collection with the contents of `iter`.
    ///
    /// This forwards straight to the inner value: it neither waits while
    /// paused nor checks for the stopped state.
    #[inline]
    fn extend<T: IntoIterator<Item = A>>(&mut self, iter: T) {
        I::extend(&mut self.inner, iter);
    }
}
impl<R: Read> Read for Halt<R> {
    /// Reads from the wrapped reader, honouring the pause/stop state.
    ///
    /// Blocks while the state is paused. Once unblocked, returns `Ok(0)`
    /// without calling the inner reader if the state is stopped; otherwise
    /// forwards to the inner reader's `read`.
    #[inline]
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.wait_if_paused();
        if self.is_stopped() {
            return Ok(0);
        }
        self.inner.read(buf)
    }
}
impl<W: Write> Write for Halt<W> {
    /// Writes to the wrapped writer, honouring the pause/stop state.
    ///
    /// Blocks while the state is paused. Once unblocked, returns `Ok(0)`
    /// without calling the inner writer if the state is stopped; otherwise
    /// forwards to the inner writer's `write`.
    #[inline]
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.wait_if_paused();
        if self.is_stopped() {
            return Ok(0);
        }
        self.inner.write(buf)
    }

    /// Flushes the wrapped writer.
    ///
    /// This forwards straight to the inner writer: it neither waits while
    /// paused nor checks for the stopped state.
    #[inline]
    fn flush(&mut self) -> io::Result<()> {
        let writer = &mut self.inner;
        writer.flush()
    }
}