//! A cloneable stop signal for asynchronous code.
//!
//! A [`Stopper`] is a cheaply cloneable handle around a shared atomic
//! "stopped" flag and an event that is used to notify listeners the first
//! time [`Stopper::stop`] is called. Streams and futures can be associated
//! with a stopper through [`Stopper::stop_stream`] and
//! [`Stopper::stop_future`]; see [`StreamStopper`] and [`FutureStopper`] for
//! how the wrapped values react to the signal.

// Lint configuration: unsafe code and future-incompatible constructs are
// rejected outright, the listed documentation/style lints are hard errors,
// and clippy's pedantic group is surfaced as warnings.
#![forbid(unsafe_code, future_incompatible)]
#![deny(
missing_debug_implementations,
nonstandard_style,
missing_copy_implementations,
unused_qualifications,
missing_docs,
rustdoc::missing_crate_level_docs
)]
#![warn(clippy::pedantic)]
use event_listener::{Event, EventListener, IntoNotification};
use futures_lite::Stream;
use std::{
fmt::{Debug, Formatter, Result},
future::Future,
};
#[cfg(all(loom, feature = "loom"))]
use loom::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
#[cfg(not(all(loom, feature = "loom")))]
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
// Each submodule is private; only its primary type is re-exported from the
// crate root.
mod stopped;
pub use stopped::Stopped;
// Stream wrapper returned by `Stopper::stop_stream`.
mod stream_stopper;
pub use stream_stopper::StreamStopper;
// Future wrapper returned by `Stopper::stop_future`.
mod future_stopper;
pub use future_stopper::FutureStopper;
/// A cloneable handle to a shared stop signal.
///
/// The handle wraps its state in an [`Arc`], so cloning a `Stopper` yields
/// another handle to the *same* state: calling [`Stopper::stop`] on any clone
/// is observed through [`Stopper::is_stopped`] on every other clone.
#[derive(Clone)]
pub struct Stopper(Arc<StopperInner>);
impl From<StopperInner> for Stopper {
    /// Moves the given inner state behind a fresh [`Arc`] and wraps it in a
    /// `Stopper` handle.
    fn from(value: StopperInner) -> Self {
        let shared = Arc::new(value);
        Self(shared)
    }
}
impl Stopper {
    /// Creates a new `Stopper` that has not been stopped yet.
    ///
    /// This is equivalent to [`Stopper::default`].
    #[must_use]
    pub fn new() -> Self {
        Default::default()
    }

    /// Signals that everything associated with this stopper should stop.
    ///
    /// The stopped flag is set atomically. Only the call that actually flips
    /// the flag from `false` to `true` notifies the listeners registered on
    /// the internal event; every later call (on this handle or any clone) is
    /// a no-op.
    pub fn stop(&self) {
        let Self(inner) = self;

        // `swap` returns the previous value, so exactly one caller observes
        // `false` here and goes on to send the notification.
        let already_stopped = inner.stopped.swap(true, Ordering::SeqCst);
        if already_stopped {
            return;
        }

        // Ask for `usize::MAX` listeners to be notified, i.e. effectively
        // all of them, using a relaxed notification.
        inner.event.notify(usize::MAX.relaxed());
    }

    /// Returns `true` once [`Stopper::stop`] has been called on this handle
    /// or on any of its clones.
    ///
    /// The flag is read with sequentially consistent ordering.
    #[must_use]
    pub fn is_stopped(&self) -> bool {
        let Self(inner) = self;
        inner.stopped.load(Ordering::SeqCst)
    }

    /// Crate-internal variant of [`Stopper::is_stopped`] that reads the flag
    /// with relaxed ordering.
    pub(crate) fn is_stopped_relaxed(&self) -> bool {
        let Self(inner) = self;
        inner.stopped.load(Ordering::Relaxed)
    }

    /// Registers and returns a new listener on the internal event.
    ///
    /// The listener is notified by the first call to [`Stopper::stop`].
    /// NOTE(review): `stop` only notifies once, so a listener created after
    /// that call is presumably never woken by it; callers likely need to
    /// re-check the stopped flag after listening -- confirm at the call sites.
    pub(crate) fn listener(&self) -> EventListener {
        let Self(inner) = self;
        inner.event.listen()
    }

    /// Wraps `stream` in a [`StreamStopper`] associated with this stopper.
    ///
    /// See [`StreamStopper`] for how the wrapped stream reacts once this
    /// stopper is stopped.
    pub fn stop_stream<S: Stream>(&self, stream: S) -> StreamStopper<S> {
        let stopper = self;
        StreamStopper::new(stopper, stream)
    }

    /// Wraps `future` in a [`FutureStopper`] associated with this stopper.
    ///
    /// See [`FutureStopper`] for how the wrapped future reacts once this
    /// stopper is stopped.
    pub fn stop_future<F: Future>(&self, future: F) -> FutureStopper<F> {
        let stopper = self;
        FutureStopper::new(stopper, future)
    }
}
impl Default for Stopper {
    /// Builds a `Stopper` whose stopped flag is `false` and whose event has
    /// no listeners yet.
    fn default() -> Self {
        let inner = StopperInner {
            stopped: AtomicBool::new(false),
            event: Event::new(),
        };
        inner.into()
    }
}
impl Debug for Stopper {
    /// Formats the handle as `Stopper { stopped: <bool> }`, reporting the
    /// value of the stopped flag at the time of formatting.
    fn fmt(&self, f: &mut Formatter<'_>) -> Result {
        let mut builder = f.debug_struct("Stopper");
        let stopped = self.is_stopped();
        builder.field("stopped", &stopped);
        builder.finish()
    }
}
// State shared (through an `Arc`) by every clone of a `Stopper`.
struct StopperInner {
// Set to `true` by the first call to `Stopper::stop`; it is never written
// back to `false` anywhere in this file.
stopped: AtomicBool,
// Event on which `Stopper::listener` registers listeners and which
// `Stopper::stop` notifies when the flag is first set.
event: Event,
}