use std::mem;
use futures_util::FutureExt as _;
use tokio_sync::{mpsc, watch};
use pin_project::pin_project;
use super::{Future, Never, Poll, Pin, task};
enum Action {
Open,
}
pub fn channel() -> (Signal, Watch) {
let (tx, rx) = watch::channel(Action::Open);
let (drained_tx, drained_rx) = mpsc::channel(1);
(
Signal {
drained_rx,
_tx: tx,
},
Watch {
drained_tx,
rx,
},
)
}
pub struct Signal {
drained_rx: mpsc::Receiver<Never>,
_tx: watch::Sender<Action>,
}
pub struct Draining {
drained_rx: mpsc::Receiver<Never>,
}
#[derive(Clone)]
pub struct Watch {
drained_tx: mpsc::Sender<Never>,
rx: watch::Receiver<Action>,
}
#[allow(missing_debug_implementations)]
#[pin_project]
pub struct Watching<F, FN> {
#[pin]
future: F,
state: State<FN>,
watch: Watch,
}
enum State<F> {
Watch(F),
Draining,
}
impl Signal {
pub fn drain(self) -> Draining {
Draining {
drained_rx: self.drained_rx,
}
}
}
impl Future for Draining {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
match ready!(self.drained_rx.poll_recv(cx)) {
Some(never) => match never {},
None => Poll::Ready(()),
}
}
}
impl Watch {
pub fn watch<F, FN>(self, future: F, on_drain: FN) -> Watching<F, FN>
where
F: Future,
FN: FnOnce(Pin<&mut F>),
{
Watching {
future,
state: State::Watch(on_drain),
watch: self,
}
}
}
impl<F, FN> Future for Watching<F, FN>
where
F: Future,
FN: FnOnce(Pin<&mut F>),
{
type Output = F::Output;
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
loop {
let me = self.project();
match mem::replace(me.state, State::Draining) {
State::Watch(on_drain) => {
let recv = me.watch.rx.recv_ref();
futures_util::pin_mut!(recv);
match recv.poll_unpin(cx) {
Poll::Ready(None) => {
on_drain(me.future);
},
Poll::Ready(Some(_)) |
Poll::Pending => {
*me.state = State::Watch(on_drain);
return me.future.poll(cx);
},
}
},
State::Draining => {
return me.future.poll(cx)
},
}
}
}
}
#[cfg(test)]
mod tests {
use crate::Error;
}