pausable_future/
lib.rs

1#![doc = include_str!("../README.md")]
2#![forbid(unsafe_code)]
3#![warn(missing_docs)]
4
5use std::future::Future;
6use std::pin::Pin;
7use std::sync::{Arc, Mutex, MutexGuard};
8use std::task::{Context, Poll, Waker};
9
10use pin_project_lite::pin_project;
11
12pin_project! {
13    /// A future or a stream that can be paused and resumed.
14    pub struct Pausable<F> {
15        #[pin]
16        inner: F,
17        controller: Controller,
18    }
19}
20
21impl<I> Pausable<I> {
22    /// Create a new `Pausable` future/stream.
23    pub fn new(inner: I) -> Self {
24        Self {
25            inner,
26            controller: Controller(Arc::new(Mutex::new(ControllerInner {
27                paused: false,
28                cx: None
29            }))),
30        }
31    }
32
33    /// Get the controller.
34    pub fn controller(&self) -> Controller {
35        self.controller.clone()
36    }
37}
38
39#[derive(Debug, Clone)]
40/// The controller of a `Pausable` future/stream.
41pub struct Controller(Arc<Mutex<ControllerInner>>);
42
43#[derive(Debug)]
44struct ControllerInner {
45    paused: bool,
46    cx: Option<Waker>,
47}
48
49impl Controller {
50    fn inner(&self) -> MutexGuard<ControllerInner> {
51        self.0.lock().unwrap_or_else(|e| e.into_inner())
52    }
53
54    /// Is the future/stream paused?
55    pub fn is_paused(&self) -> bool {
56        self.inner().paused
57    }
58
59    /// Pause the future/stream.
60    pub fn pause(&self) {
61        self.inner().paused = true;
62    }
63
64    /// Resume the future/stream.
65    pub fn resume(&self) {
66        let mut me = self.inner();
67        me.paused = false;
68        if let Some(cx) = me.cx.take() {
69            cx.wake();
70        }
71    }
72}
73
74impl<F: Future> Future for Pausable<F> {
75    type Output = F::Output;
76
77    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
78        let me = self.project();
79        let mut controller = me.controller.inner();
80        if !controller.paused {
81            drop(controller);
82            return me.inner.poll(cx);
83        }
84        let cx = cx.waker().clone();
85        controller.cx.replace(cx);
86        Poll::Pending
87    }
88}
89
90#[cfg(feature = "stream")]
91impl<S: futures::Stream> futures::Stream for Pausable<S> {
92    type Item = S::Item;
93
94    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
95        let me = self.project();
96        let mut controller = me.controller.inner();
97        if !controller.paused {
98            drop(controller);
99            return me.inner.poll_next(cx);
100        }
101        let cx = cx.waker().clone();
102        controller.cx.replace(cx);
103        Poll::Pending
104    }
105}