#![forbid(unsafe_code, future_incompatible)]
#![deny(
    missing_debug_implementations,
    nonstandard_style,
    missing_copy_implementations,
    unused_qualifications,
    missing_docs,
    rustdoc::missing_crate_level_docs
)]
#![warn(clippy::pedantic)]
//! # Stopper
//!
//! The primary type for this crate is [`Stopper`], which provides a
//! synchronized mechanism for canceling Futures and Streams.
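//!
//! # Example
//!
//! A minimal usage sketch, assuming `futures_lite::future::block_on` as the
//! executor (any executor should behave the same way). The 10ms delay is an
//! arbitrary value chosen only for illustration.
//!
//! ```rust
//! use futures_lite::future;
//! use std::{thread, time::Duration};
//! use stopper::Stopper;
//!
//! let stopper = Stopper::new();
//!
//! // Clones share state, so stopping the clone stops the original too.
//! let clone = stopper.clone();
//! let handle = thread::spawn(move || {
//!     thread::sleep(Duration::from_millis(10));
//!     clone.stop();
//! });
//!
//! // `pending()` never resolves on its own, so `None` here means the
//! // wrapped future was stopped rather than completed.
//! let output = future::block_on(stopper.stop_future(future::pending::<()>()));
//! assert_eq!(output, None);
//! assert!(stopper.is_stopped());
//! handle.join().unwrap();
//! ```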
use event_listener::{Event, IntoNotification};
use futures_lite::Stream;
use std::{
    fmt::{Debug, Formatter, Result},
    future::Future,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
};
mod stopped;
pub use stopped::Stopped;
mod stream_stopper;
pub use stream_stopper::StreamStopper;
mod future_stopper;
pub use future_stopper::FutureStopper;
/// This struct provides a synchronized mechanism for canceling
/// Futures and Streams.
///
/// Stoppers are cheap to clone.
///
/// A clone of the Stopper can be awaited and will be pending until the Stopper is stopped. If the
/// Stopper is stopped before it is awaited, it will be ready immediately.
#[derive(Clone)]
pub struct Stopper(Arc<StopperInner>);
impl From<StopperInner> for Stopper {
    fn from(value: StopperInner) -> Self {
        Self(Arc::new(value))
    }
}
impl Stopper {
    /// Initialize a stopper that is not yet stopped and that has no
    /// registered listeners. Every clone of this stopper shares the
    /// same internal state. This is identical to `Stopper::default()`.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }
    /// Stop all futures and streams that have been registered to this
    /// Stopper or to any clone of it. Calling this more than once has
    /// no additional effect.
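    ///
    /// # Example
    ///
    /// A small sketch showing that clones share state and that repeated
    /// calls are harmless:
    ///
    /// ```rust
    /// use stopper::Stopper;
    ///
    /// let stopper = Stopper::new();
    /// let clone = stopper.clone();
    /// assert!(!clone.is_stopped());
    ///
    /// stopper.stop();
    /// stopper.stop(); // the second call has no further effect
    /// assert!(clone.is_stopped());
    /// ```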
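    pub fn stop(&self) {
        // `swap` returns the previous value, so only the first call to
        // `stop` notifies the registered listeners.
        if !self.0.stopped.swap(true, Ordering::SeqCst) {
            self.0.event.notify(usize::MAX.relaxed());
        }
    }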
    /// Returns whether this stopper (or any clone of it) has been
    /// stopped.
    #[must_use]
    pub fn is_stopped(&self) -> bool {
        self.0.stopped.load(Ordering::SeqCst)
    }
    /// Returns a new stream that yields `None` (indicating a completed
    /// stream) once this Stopper has been stopped. The stream's `Item`
    /// type is unchanged.
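    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `futures_lite` for the stream source and
    /// `block_on` as the executor. The stream is pinned with
    /// `std::pin::pin!` so that the example does not depend on whether the
    /// returned type is `Unpin`.
    ///
    /// ```rust
    /// use futures_lite::{future::block_on, stream, StreamExt};
    /// use std::pin::pin;
    /// use stopper::Stopper;
    ///
    /// let stopper = Stopper::new();
    /// let mut stream = pin!(stopper.stop_stream(stream::iter(1..=3)));
    ///
    /// // Items pass through unchanged until the stopper is stopped.
    /// assert_eq!(block_on(stream.next()), Some(1));
    ///
    /// // Once stopped, the stream reports completion even though the
    /// // underlying iterator still has items left.
    /// stopper.stop();
    /// assert_eq!(block_on(stream.next()), None);
    /// ```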
    pub fn stop_stream<S: Stream>(&self, stream: S) -> StreamStopper<S> {
        StreamStopper::new(self, stream)
    }
    /// Returns a future that wraps the provided future and stops it
    /// when this Stopper has been stopped. The output of the returned
    /// future is wrapped in an `Option`: if it resolves to `None`, the
    /// wrapped future was stopped before it ran to completion.
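    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `futures_lite::future::block_on` as the
    /// executor:
    ///
    /// ```rust
    /// use futures_lite::future::{block_on, pending, ready};
    /// use stopper::Stopper;
    ///
    /// let stopper = Stopper::new();
    ///
    /// // Not stopped: the wrapped future's output arrives inside `Some`.
    /// assert_eq!(block_on(stopper.stop_future(ready(42))), Some(42));
    ///
    /// // Stopped: even a future that never resolves yields `None`.
    /// stopper.stop();
    /// assert_eq!(block_on(stopper.stop_future(pending::<u8>())), None);
    /// ```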
    pub fn stop_future<F: Future>(&self, future: F) -> FutureStopper<F> {
        FutureStopper::new(self, future)
    }
}
impl Default for Stopper {
    fn default() -> Self {
        Self::from(StopperInner {
            stopped: false.into(),
            event: Event::new(),
        })
    }
}
impl Debug for Stopper {
    fn fmt(&self, f: &mut Formatter<'_>) -> Result {
        f.debug_struct("Stopper")
            .field("stopped", &self.is_stopped())
            .finish()
    }
}
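/// State shared by a [`Stopper`] and all of its clones.
struct StopperInner {
    /// Set to `true` by the first call to [`Stopper::stop`].
    stopped: AtomicBool,
    /// Notified once, on the first call to [`Stopper::stop`].
    event: Event,
}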