use futures_util::task::AtomicWaker;
use std::fmt::Debug;
use std::future::Future;
use std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
task::Poll,
};
use crate::error::type_name;
use log::debug;
use pin_project::pin_project;
/// Spawns `fut` on the executor selected by the enabled cargo feature and
/// returns the [`Lifeline`] handle that controls it.
///
/// The future is wrapped in a `LifelineFuture` that shares a `LifelineInner`
/// with the returned handle: dropping the handle signals the task to stop,
/// and awaiting the handle resolves once the task has finished.
///
/// `name` is only used in the task's debug log lines.
// When both executor features are enabled the tokio branch returns first,
// leaving the async-std branch unreachable; the `allow` silences that warning.
#[allow(unreachable_code)]
pub(crate) fn spawn_task<O>(name: String, fut: impl Future<Output = O> + Send + 'static) -> Lifeline
where
    O: Debug + Send + 'static,
{
    // One shared state: a clone travels with the task, the original backs the handle.
    let shared = Arc::new(LifelineInner::new());
    let task = LifelineFuture::new(name, fut, Arc::clone(&shared));

    #[cfg(feature = "tokio-executor")]
    {
        spawn_task_tokio(task);
        return Lifeline::new(shared);
    }

    #[cfg(feature = "async-std-executor")]
    {
        spawn_task_async_std(task);
        return Lifeline::new(shared);
    }
}
/// Builds a task's display name as `<type name of S>/<name>`.
///
/// The type-name half is produced by `crate::error::type_name`.
pub(crate) fn task_name<S>(name: &str) -> String {
    format!("{}/{}", type_name::<S>(), name)
}
/// Hands `task` to the tokio runtime via `tokio::spawn`.
///
/// The join handle returned by the runtime is discarded immediately.
#[cfg(feature = "tokio-executor")]
fn spawn_task_tokio<F, O>(task: F)
where
    F: Future<Output = O> + Send + 'static,
    O: Send + 'static,
{
    drop(tokio::spawn(task));
}
/// Hands `task` to the async-std executor via `async_std::task::spawn`.
///
/// The join handle returned by the executor is discarded immediately.
#[cfg(feature = "async-std-executor")]
fn spawn_task_async_std<F, O>(task: F)
where
    F: Future<Output = O> + Send + 'static,
    O: Send + 'static,
{
    drop(async_std::task::spawn(task));
}
/// Future wrapper that drives `future` until it finishes or until the shared
/// `complete` flag in `inner` is set.
#[pin_project]
struct LifelineFuture<F: Future> {
/// The wrapped future; marked `#[pin]` so it can be polled through the projection.
#[pin]
future: F,
/// Task name, printed in the START / END / CANCEL debug lines.
name: String,
/// State shared with the `Lifeline` handle.
inner: Arc<LifelineInner>,
}
impl<F: Future + Send> LifelineFuture<F> {
pub fn new(name: String, future: F, inner: Arc<LifelineInner>) -> Self {
debug!("START {}", &name);
Self {
name,
future,
inner,
}
}
}
impl<F: Future> Future for LifelineFuture<F>
where
    F::Output: Debug,
{
    type Output = ();

    /// Polls the wrapped future unless the shared `complete` flag is already set.
    ///
    /// Resolves to `()` when the wrapped future finishes (its output is only
    /// logged) or when the flag is observed set, which is logged as `CANCEL`.
    fn poll(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        let this = self.project();

        // Fast path: the flag was set before this poll, so skip the inner future.
        if this.inner.complete.load(Ordering::Relaxed) {
            debug!("CANCEL {}", this.name);
            return Poll::Ready(());
        }

        match this.future.poll(cx) {
            Poll::Ready(result) => {
                debug!("END {} {:?}", this.name, result);
                // Sets the flag and wakes whoever is awaiting the handle.
                this.inner.complete();
                Poll::Ready(())
            }
            Poll::Pending => {
                // Register first, then re-check, so a flag set between the
                // first check and the registration is not missed.
                this.inner.task_waker.register(cx.waker());
                if this.inner.complete.load(Ordering::Relaxed) {
                    debug!("CANCEL {}", this.name);
                    Poll::Ready(())
                } else {
                    Poll::Pending
                }
            }
        }
    }
}
/// Handle to a task started by `spawn_task`.
///
/// Awaiting the handle resolves once the shared `complete` flag is set.
/// Dropping it calls `LifelineInner::abort`, which sets that flag and wakes
/// the task so that its next poll returns early.
#[derive(Debug)]
#[must_use = "if unused the service will immediately be cancelled"]
pub struct Lifeline {
/// State shared with the spawned `LifelineFuture`.
inner: Arc<LifelineInner>,
}
impl Lifeline {
pub(crate) fn new(inner: Arc<LifelineInner>) -> Self {
Self { inner }
}
}
impl Future for Lifeline {
    type Output = ();

    /// Resolves to `()` once the shared `complete` flag has been set.
    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        let shared = &self.inner;

        if !shared.complete.load(Ordering::Relaxed) {
            // Register first, then re-check, so a flag set between the first
            // check and the registration is not missed.
            shared.lifeline_waker.register(cx.waker());
            if !shared.complete.load(Ordering::Relaxed) {
                return Poll::Pending;
            }
        }

        Poll::Ready(())
    }
}
impl Drop for Lifeline {
fn drop(&mut self) {
self.inner.abort();
}
}
/// State shared between a `Lifeline` handle and the `LifelineFuture` it controls.
#[derive(Debug)]
pub(crate) struct LifelineInner {
/// Waker registered by the task's poll; woken by `abort`.
task_waker: AtomicWaker,
/// Waker registered by the handle's poll; woken by `complete`.
lifeline_waker: AtomicWaker,
/// Set to `true` by both `abort` and `complete`.
complete: AtomicBool,
}
impl LifelineInner {
pub fn new() -> Self {
LifelineInner {
task_waker: AtomicWaker::new(),
lifeline_waker: AtomicWaker::new(),
complete: AtomicBool::new(false),
}
}
pub fn abort(&self) {
self.complete.store(true, Ordering::Relaxed);
self.task_waker.wake();
}
pub fn complete(&self) {
self.complete.store(true, Ordering::Relaxed);
self.lifeline_waker.wake();
}
}
#[cfg(test)]
mod tests {
    use super::spawn_task;
    use crate::{assert_completes, assert_times_out};
    use std::future::Future;
    use std::pin::Pin;
    use std::task::{Context, Poll};

    /// A future that returns `Poll::Pending` on every poll and never wakes.
    struct NeverReady;

    impl Future for NeverReady {
        type Output = ();

        fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
            Poll::Pending
        }
    }

    /// Awaiting the lifeline of a task that never finishes must time out.
    #[tokio::test]
    async fn lifeline_running_await_times_out() {
        let lifeline = spawn_task("test_complete".to_string(), NeverReady);

        assert_times_out!(async move {
            lifeline.await;
        });
    }

    /// Awaiting the lifeline of a task that finishes immediately must complete.
    #[tokio::test]
    async fn lifeline_running_completes() {
        let lifeline = spawn_task("test_complete".to_string(), async move {});

        assert_completes!(async move {
            lifeline.await;
        });
    }
}