1#![doc = include_str!("../README.md")]
2#![forbid(unsafe_code)]
3#![warn(missing_docs)]
4
5use std::future::Future;
6use std::pin::Pin;
7use std::sync::{Arc, Mutex, MutexGuard};
8use std::task::{Context, Poll, Waker};
9
10use pin_project_lite::pin_project;
11
12pin_project! {
13 pub struct Pausable<F> {
15 #[pin]
16 inner: F,
17 controller: Controller,
18 }
19}
20
21impl<I> Pausable<I> {
22 pub fn new(inner: I) -> Self {
24 Self {
25 inner,
26 controller: Controller(Arc::new(Mutex::new(ControllerInner {
27 paused: false,
28 cx: None
29 }))),
30 }
31 }
32
33 pub fn controller(&self) -> Controller {
35 self.controller.clone()
36 }
37}
38
/// Cloneable handle used to pause and resume a [`Pausable`].
///
/// All clones share one piece of state behind an `Arc<Mutex<_>>`, so a pause
/// or resume issued through any clone is observed by the wrapper and by every
/// other clone.
#[derive(Debug, Clone)]
pub struct Controller(Arc<Mutex<ControllerInner>>);

/// State shared between a `Pausable` and all of its `Controller` handles.
#[derive(Debug)]
struct ControllerInner {
    /// `true` while polling of the wrapped value is suspended.
    paused: bool,
    /// Waker stored by a poll that found `paused` set; taken and woken by
    /// [`Controller::resume`].
    cx: Option<Waker>,
}

impl Controller {
    /// Locks the shared state.
    ///
    /// A poisoned mutex is recovered rather than propagated as a panic: the
    /// guarded data is only a flag and an optional waker, each written with a
    /// single assignment.
    fn inner(&self) -> MutexGuard<'_, ControllerInner> {
        self.0
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Returns `true` if the controlled value is currently paused.
    pub fn is_paused(&self) -> bool {
        self.inner().paused
    }

    /// Pauses the controlled value.
    ///
    /// Nothing is woken; the pause takes effect the next time the wrapper is
    /// polled. Calling this while already paused has no further effect.
    pub fn pause(&self) {
        self.inner().paused = true;
    }

    /// Resumes the controlled value and wakes the task parked by a paused
    /// poll, if there is one.
    ///
    /// Calling this while not paused only clears the (already clear) flag.
    pub fn resume(&self) {
        let mut state = self.inner();
        state.paused = false;
        let parked = state.cx.take();
        // Release the lock before waking: a waker may run arbitrary code,
        // including code that polls the wrapper or queries this controller on
        // the same thread, and `std::sync::Mutex` is not reentrant.
        drop(state);
        if let Some(waker) = parked {
            waker.wake();
        }
    }
}
73
74impl<F: Future> Future for Pausable<F> {
75 type Output = F::Output;
76
77 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
78 let me = self.project();
79 let mut controller = me.controller.inner();
80 if !controller.paused {
81 drop(controller);
82 return me.inner.poll(cx);
83 }
84 let cx = cx.waker().clone();
85 controller.cx.replace(cx);
86 Poll::Pending
87 }
88}
89
/// Yields the wrapped stream's items unless the controller is paused.
#[cfg(feature = "stream")]
impl<S: futures::Stream> futures::Stream for Pausable<S> {
    type Item = S::Item;

    /// Delegates to the wrapped stream while running.
    ///
    /// While paused, the wrapped stream is not polled: the current task's
    /// waker is stored for `Controller::resume` to wake, and `Poll::Pending`
    /// is returned.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let me = self.project();
        let mut state = me.controller.inner();
        if !state.paused {
            // Do not hold the lock while running the wrapped stream.
            drop(state);
            return me.inner.poll_next(cx);
        }

        // Paused: make sure the stored waker targets the current task. The
        // clone is skipped when the stored waker already wakes this task.
        let waker = cx.waker();
        if !matches!(&state.cx, Some(parked) if parked.will_wake(waker)) {
            state.cx = Some(waker.clone());
        }
        Poll::Pending
    }

    /// Forwards the wrapped stream's bounds.
    ///
    /// `poll_next` either delegates to the wrapped stream or returns
    /// `Poll::Pending`, so pausing delays items without adding or removing
    /// any.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.inner.size_hint()
    }
}