use super::*;
/// Cancellation handle built around a shared atomic flag.
///
/// The flag lives behind an `Arc`, so every clone of a handle observes and
/// sets the same flag.
#[derive(Clone)]
pub struct Cancellable {
/// Shared cancellation flag; `false` until some holder sets it.
pub(super) cancelled: Arc<AtomicBool>,
/// Optional opaque value stored alongside the flag. Nothing in the visible
/// code reads it; presumably it only exists to keep the referenced value
/// alive while the handle exists — confirm against the callers.
_keep_alive: Option<Arc<dyn Send + Sync>>,
}
impl Cancellable {
    /// Builds a handle whose cancellation flag starts out cleared.
    ///
    /// `keep_alive` is stored as given; this type never inspects it.
    pub(super) fn new_with_keep_alive(keep_alive: Option<Arc<dyn Send + Sync>>) -> Self {
        let cancelled = Arc::new(AtomicBool::new(false));
        Self {
            cancelled,
            _keep_alive: keep_alive,
        }
    }

    /// Sets the cancellation flag.
    ///
    /// Returns `true` when this call is the one that flipped the flag, and
    /// `false` when the flag had already been set.
    pub fn cancel(&self) -> bool {
        let was_already_set = self.cancelled.swap(true, Ordering::SeqCst);
        !was_already_set
    }

    /// Returns `true` if the cancellation flag is currently set.
    #[must_use]
    pub fn is_cancelled(&self) -> bool {
        let flag: &AtomicBool = &self.cancelled;
        flag.load(Ordering::SeqCst)
    }
}
impl fmt::Debug for Cancellable {
    /// Formats the handle as an opaque struct: only the type name is printed
    /// and every field is elided.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("Cancellable");
        builder.finish_non_exhaustive()
    }
}
/// Handle to the eventual result of a stream.
///
/// The result can be obtained by blocking (`wait`), by polling without
/// blocking (`try_wait`), or by awaiting the handle as a `Future`. Dropping
/// the handle triggers `cancel_on_drop`, if one was supplied.
#[must_use = "dropping the StreamCompletion cancels the running stream; call .wait()/.try_wait() or keep it alive"]
pub struct StreamCompletion<T> {
/// Where the result comes from: a value that is already available, or a
/// channel that will deliver it.
state: StreamCompletionState<T>,
/// Cancellation handle fired from `Drop`; `None` means dropping has no
/// cancellation effect.
cancel_on_drop: Option<StreamCancellation>,
}
/// Source of a `StreamCompletion`'s result.
enum StreamCompletionState<T> {
/// The result is held directly; it becomes `None` once it has been taken.
Ready(Option<StreamResult<T>>),
/// The result will be delivered through a oneshot channel.
Receiver(oneshot::Receiver<StreamResult<T>>),
}
/// Shared cancellation state for a stream: a flag plus an optional thread to
/// wake when the flag is set.
///
/// Both fields sit behind `Arc`s, so clones refer to the same flag and the
/// same worker slot.
#[derive(Clone)]
pub(crate) struct StreamCancellation {
/// Flag set to `true` by `cancel`.
cancelled: Arc<AtomicBool>,
/// Slot holding the thread that `cancel` unparks, if one is registered.
worker: Arc<Mutex<Option<thread::Thread>>>,
}
/// Guard returned by `StreamCancellation::register_current_worker`.
///
/// Its `Drop` implementation clears the worker slot of the cancellation it
/// was created from.
pub(super) struct RegisteredStreamWorker {
/// Clone of the cancellation whose worker slot this guard resets on drop.
cancellation: StreamCancellation,
}
impl StreamCancellation {
    /// Creates a cancellation with the flag cleared and no worker registered.
    pub(super) fn new() -> Self {
        Self {
            cancelled: Arc::new(AtomicBool::new(false)),
            worker: Arc::new(Mutex::new(None)),
        }
    }

    /// Creates a cancellation for a completion that is driven externally.
    ///
    /// Currently identical to [`StreamCancellation::new`].
    pub(crate) fn for_external_completion() -> Self {
        Self::new()
    }

    /// Returns another reference to the shared cancellation flag.
    pub(crate) fn cancelled(&self) -> Arc<AtomicBool> {
        Arc::clone(&self.cancelled)
    }

    /// Records the calling thread as the one to unpark on cancellation.
    ///
    /// The returned guard clears the slot again when it is dropped.
    pub(super) fn register_current_worker(&self) -> RegisteredStreamWorker {
        let current = thread::current();
        *self.worker_slot() = Some(current);
        RegisteredStreamWorker {
            cancellation: self.clone(),
        }
    }

    /// Sets the cancellation flag and unparks the registered worker, if any.
    ///
    /// This runs from `Drop for StreamCompletion`, so it must not panic: a
    /// panic raised while another panic is unwinding aborts the process.
    fn cancel(&self) {
        self.cancelled.store(true, Ordering::SeqCst);
        // Copy the handle out so the lock is released before unparking.
        let worker = self.worker_slot().clone();
        if let Some(worker) = worker {
            worker.unpark();
        }
    }

    /// Locks the worker slot, recovering the guard if the mutex is poisoned.
    ///
    /// The slot only stores an `Option<Thread>`, which cannot be left in a
    /// broken state by a panicking holder, so poisoning is safe to ignore.
    fn worker_slot(&self) -> std::sync::MutexGuard<'_, Option<thread::Thread>> {
        self.worker
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }
}
impl Drop for RegisteredStreamWorker {
    /// Clears the worker slot so a later cancellation finds no thread to
    /// unpark.
    fn drop(&mut self) {
        // Recover from a poisoned lock instead of panicking: this destructor
        // may run while the worker thread is already unwinding, and a second
        // panic at that point would abort the process. The slot is a plain
        // `Option<Thread>`, so overwriting it is always sound.
        let mut slot = self
            .cancellation
            .worker
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        *slot = None;
    }
}
impl<T> StreamCompletion<T> {
    /// Wraps a oneshot receiver that will deliver the stream's result.
    ///
    /// When `cancel_on_drop` is `Some`, dropping the completion triggers that
    /// cancellation.
    pub(crate) fn from_receiver(
        receiver: oneshot::Receiver<StreamResult<T>>,
        cancel_on_drop: Option<StreamCancellation>,
    ) -> Self {
        Self {
            state: StreamCompletionState::Receiver(receiver),
            cancel_on_drop,
        }
    }

    /// Creates a completion whose result is already known.
    ///
    /// No cancellation is attached, so dropping it has no further effect.
    pub(crate) fn ready(result: StreamResult<T>) -> Self {
        Self {
            state: StreamCompletionState::Ready(Some(result)),
            cancel_on_drop: None,
        }
    }

    /// Consumes the completion and drives it to its result with `block_on`.
    pub fn wait(self) -> StreamResult<T> {
        block_on(self)
    }

    /// Returns the result if it is available, without blocking.
    ///
    /// * `None` — the result has not arrived yet, or it was already handed
    ///   out by an earlier call.
    /// * `Some(Err(StreamError::AbruptTermination))` — the sending side went
    ///   away without delivering a result.
    /// * `Some(result)` — the stream's own result.
    ///
    /// The result is yielded at most once, whichever state the completion
    /// started in.
    #[must_use]
    pub fn try_wait(&mut self) -> Option<StreamResult<T>> {
        let outcome = match &mut self.state {
            StreamCompletionState::Ready(result) => return result.take(),
            StreamCompletionState::Receiver(receiver) => match receiver.try_recv() {
                Ok(None) => return None,
                Ok(Some(result)) => result,
                Err(_) => Err(StreamError::AbruptTermination),
            },
        };
        // The channel is spent. Without this transition a later call would
        // poll it again and report a spurious `AbruptTermination`, even after
        // a successful result had been returned.
        self.state = StreamCompletionState::Ready(None);
        Some(outcome)
    }
}
impl<T> Future for StreamCompletion<T> {
    type Output = StreamResult<T>;

    /// Resolves to the stream's result.
    ///
    /// A held result is returned immediately; if it has already been taken,
    /// `StreamError::AbruptTermination` is returned instead. In the channel
    /// case the receiver is polled, and a receive error is likewise mapped to
    /// `StreamError::AbruptTermination`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        match &mut this.state {
            StreamCompletionState::Ready(slot) => Poll::Ready(match slot.take() {
                Some(result) => result,
                None => Err(StreamError::AbruptTermination),
            }),
            StreamCompletionState::Receiver(receiver) => {
                Pin::new(receiver).poll(cx).map(|received| {
                    received.unwrap_or_else(|_| Err(StreamError::AbruptTermination))
                })
            }
        }
    }
}
// Declared `Unpin` for every `T`; `poll` mutably accesses `state` through
// `Pin<&mut Self>`, which needs `Self: Unpin`.
impl<T> Unpin for StreamCompletion<T> {}
impl<T> Drop for StreamCompletion<T> {
    /// Fires the attached cancellation, if there is one, when the completion
    /// goes away.
    fn drop(&mut self) {
        if let Some(handle) = self.cancel_on_drop.as_ref() {
            handle.cancel();
        }
    }
}
impl<T> fmt::Debug for StreamCompletion<T> {
    /// Formats the completion as an opaque struct: only the type name is
    /// printed, so no `Debug` bound on `T` is needed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("StreamCompletion");
        builder.finish_non_exhaustive()
    }
}