use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Weak};
use std::task::{Context, Poll};
use futures_util::future::FutureExt;
use futures_util::task::AtomicWaker;
use jsonrpsee_core::Error;
use tokio::time::{self, Duration, Interval};
/// Period of the heartbeat timer built in `FutureDriver::default`, in milliseconds
/// (the value is passed to `Duration::from_millis`).
const STOP_MONITOR_POLLING_INTERVAL: u64 = 1000;
/// Holds a set of futures of one type `F` together with a periodic timer.
///
/// The futures are polled by `drive`; each one is removed from the set as soon
/// as it completes.
pub(crate) struct FutureDriver<F> {
/// Futures that were added and have not completed yet. Order is not preserved.
futures: Vec<F>,
/// Timer polled from `DriverSelect::poll` next to the futures; its tick values are ignored.
stop_monitor_heartbeat: Interval,
}
impl<F> Default for FutureDriver<F> {
    /// Builds a driver that holds no futures yet.
    ///
    /// The heartbeat timer ticks every `STOP_MONITOR_POLLING_INTERVAL` milliseconds and
    /// is configured to skip ticks that were missed instead of trying to catch up.
    ///
    /// NOTE(review): `tokio::time::interval` presumably has to be called from within a
    /// Tokio runtime with the timer enabled - confirm against callers.
    fn default() -> Self {
        let period = Duration::from_millis(STOP_MONITOR_POLLING_INTERVAL);
        let mut stop_monitor_heartbeat = time::interval(period);
        stop_monitor_heartbeat.set_missed_tick_behavior(time::MissedTickBehavior::Skip);

        Self { futures: Vec::new(), stop_monitor_heartbeat }
    }
}
impl<F> FutureDriver<F> {
    /// Number of futures currently stored in the driver.
    pub(crate) fn count(&self) -> usize {
        let Self { futures, .. } = self;
        futures.len()
    }

    /// Stores `future` in the driver by appending it to the internal list.
    ///
    /// The future is not polled here; that only happens in `drive`.
    pub(crate) fn add(&mut self, future: F) {
        let Self { futures, .. } = self;
        futures.push(future);
    }
}
impl<F> FutureDriver<F>
where
    F: Future + Unpin,
{
    /// Awaits `selector`, polling the stored futures and the heartbeat timer each time
    /// the combined future is polled.
    ///
    /// Resolves to the output of `selector`. Stored futures that complete in the
    /// meantime are removed from the driver; their outputs are discarded.
    pub(crate) async fn select_with<S: Future>(&mut self, selector: S) -> S::Output {
        // Pin on the stack so that `DriverSelect` receives an `Unpin` handle to the selector.
        tokio::pin!(selector);
        DriverSelect { selector, driver: self }.await
    }

    /// Polls every stored future once and drops the ones that report completion.
    fn drive(&mut self, cx: &mut Context) {
        let mut i = 0;
        while i < self.futures.len() {
            if self.futures[i].poll_unpin(cx).is_ready() {
                // Ordering is irrelevant, so use the O(1) `swap_remove`. `i` is left
                // untouched because the former last element now sits at index `i`
                // and still has to be polled.
                self.futures.swap_remove(i);
            } else {
                i += 1;
            }
        }
    }

    /// Arms the heartbeat timer so that the task behind `cx` is woken at the next tick.
    ///
    /// The tick values themselves are irrelevant; the timer only exists to wake the
    /// task periodically.
    fn poll_stop_monitor_heartbeat(&mut self, cx: &mut Context) {
        // `Interval::poll_tick` schedules a wake-up only when it returns `Poll::Pending`.
        // Polling once and discarding a `Ready` tick would leave no wake-up registered,
        // silently stopping the heartbeat until something else wakes the task. Consume
        // elapsed ticks until the timer reports `Pending`; every `Ready` moves the
        // deadline into the future, so this finishes after very few iterations.
        while self.stop_monitor_heartbeat.poll_tick(cx).is_ready() {}
    }
}
impl<F> Future for FutureDriver<F>
where
    F: Future + Unpin,
{
    type Output = ();

    /// Polls all stored futures once; ready when none of them is left afterwards.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let driver = self.get_mut();
        driver.drive(cx);

        match driver.futures.len() {
            0 => Poll::Ready(()),
            _ => Poll::Pending,
        }
    }
}
/// Future created by `FutureDriver::select_with`: pairs a selector future with a
/// mutable borrow of the driver so both can be polled together.
struct DriverSelect<'a, S, F> {
/// The future whose output becomes the output of this future.
selector: S,
/// Driver whose stored futures and heartbeat timer are polled alongside `selector`.
driver: &'a mut FutureDriver<F>,
}
impl<'a, R, F> Future for DriverSelect<'a, R, F>
where
R: Future + Unpin,
F: Future + Unpin,
{
type Output = R::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = Pin::into_inner(self);
this.driver.drive(cx);
this.driver.poll_stop_monitor_heartbeat(cx);
this.selector.poll_unpin(cx)
}
}
/// State shared by all `StopMonitor` clones and, weakly, by the handles created from them.
#[derive(Debug)]
struct MonitorInner {
    /// Set to `true` once a stop has been requested via `ServerHandle::stop`.
    shutdown_requested: AtomicBool,
    /// Waker registered by `ShutdownWaiter::poll`; woken when this state is destroyed.
    waker: AtomicWaker,
}

impl Drop for MonitorInner {
    /// Wakes the registered waiter once the last strong reference is gone.
    fn drop(&mut self) {
        // `Arc` destroys its value only after the strong count has reached zero, so a
        // waiter woken from here is guaranteed to see `Weak::upgrade` fail. Waking from
        // `StopMonitor::drop` after checking `strong_count == 1` was racy: the wake
        // happened before the `Arc` field was released, so a waiter could still observe
        // a live count and go back to sleep for good, and the wake was skipped entirely
        // whenever another thread held a temporary upgraded `Arc` at that moment.
        self.waker.wake();
    }
}

/// Strong, cloneable reference to the shared monitor state.
///
/// Once every clone (and any temporarily upgraded reference) is dropped, the
/// shared state is destroyed and the pending waiter is woken.
#[derive(Debug, Clone)]
pub(crate) struct StopMonitor(Arc<MonitorInner>);
impl StopMonitor {
pub(crate) fn new() -> Self {
StopMonitor(Arc::new(MonitorInner { shutdown_requested: AtomicBool::new(false), waker: AtomicWaker::new() }))
}
pub(crate) fn shutdown_requested(&self) -> bool {
self.0.shutdown_requested.load(Ordering::Relaxed)
}
pub(crate) fn handle(&self) -> ServerHandle {
ServerHandle(Arc::downgrade(&self.0))
}
}
/// Handle holding a weak reference to the monitor state; obtained from `StopMonitor::handle`.
///
/// It can request a stop via `stop`, and it is itself a future that completes
/// once no strong reference to the monitor state remains.
#[derive(Debug, Clone)]
pub struct ServerHandle(Weak<MonitorInner>);
impl ServerHandle {
pub fn stop(self) -> Result<ShutdownWaiter, Error> {
if let Some(arc) = Weak::upgrade(&self.0) {
if !arc.shutdown_requested.swap(true, Ordering::Relaxed) {
return Ok(ShutdownWaiter(self.0));
}
}
Err(Error::AlreadyStopped)
}
}
impl Future for ServerHandle {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut shutdown_waiter = ShutdownWaiter(self.0.clone());
shutdown_waiter.poll_unpin(cx)
}
}
/// Future returned by `ServerHandle::stop`; completes once no strong reference to
/// the monitor state remains.
#[derive(Debug)]
pub struct ShutdownWaiter(Weak<MonitorInner>);
impl Future for ShutdownWaiter {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
match Weak::upgrade(&self.0) {
None => return Poll::Ready(()),
Some(arc) => {
arc.waker.register(cx.waker());
drop(arc);
}
}
match Weak::strong_count(&self.0) {
0 => Poll::Ready(()),
_ => Poll::Pending,
}
}
}