#![cfg_attr(not(feature = "full"), allow(dead_code))]
use std::cell::Cell;
// Per-thread cell holding the budget currently in effect on this thread.
// A thread that has never changed it observes `Budget::unconstrained()`.
thread_local! {
static CURRENT: Cell<Budget> = Cell::new(Budget::unconstrained());
}
/// A counter of remaining budget units.
///
/// `Some(n)` means `n` units are left; `None` means no limit is applied
/// (see `Budget::unconstrained`).
#[derive(Debug, Copy, Clone)]
pub(crate) struct Budget(Option<u8>);

impl Budget {
    /// Returns the starting budget: a counter of 128 units.
    const fn initial() -> Self {
        Self(Some(128))
    }

    /// Returns a budget that carries no counter at all.
    const fn unconstrained() -> Self {
        Self(None)
    }
}
cfg_rt_multi_thread! {
    impl Budget {
        /// Returns `false` only when the budget is a counter that has reached
        /// zero; an unconstrained budget always reports `true`.
        fn has_remaining(self) -> bool {
            match self.0 {
                Some(left) => left > 0,
                None => true,
            }
        }
    }
}
/// Runs `f` with the current thread's budget set to `Budget::initial()`.
///
/// The budget that was in effect before the call is put back afterwards by
/// the drop guard inside `with_budget`.
#[inline(always)]
pub(crate) fn budget<R>(f: impl FnOnce() -> R) -> R {
    let fresh = Budget::initial();
    with_budget(fresh, f)
}
/// Runs `f` with the current thread's budget set to
/// `Budget::unconstrained()`.
///
/// The budget that was in effect before the call is put back afterwards by
/// the drop guard inside `with_budget`.
#[inline(always)]
pub(crate) fn with_unconstrained<R>(f: impl FnOnce() -> R) -> R {
    let unlimited = Budget::unconstrained();
    with_budget(unlimited, f)
}
/// Installs `budget` in the thread-local `CURRENT`, runs `f`, and returns its
/// result.
///
/// The previously installed budget is written back when the local guard is
/// dropped, which happens as the closure passed to `CURRENT.with` exits.
#[inline(always)]
fn with_budget<R>(budget: Budget, f: impl FnOnce() -> R) -> R {
    /// Writes `saved` back into `slot` on drop.
    struct Restore<'c> {
        slot: &'c Cell<Budget>,
        saved: Budget,
    }

    impl<'c> Drop for Restore<'c> {
        fn drop(&mut self) {
            self.slot.set(self.saved);
        }
    }

    CURRENT.with(move |slot| {
        // Swap the new budget in and remember the one it displaced.
        let saved = slot.replace(budget);
        let _restore = Restore { slot, saved };

        f()
    })
}
cfg_rt_multi_thread! {
    /// Overwrites the current thread's budget with `budget`.
    pub(crate) fn set(budget: Budget) {
        CURRENT.with(|slot| {
            slot.set(budget);
        })
    }

    /// Reports whether the current thread's budget still has units left
    /// (always `true` for an unconstrained budget).
    #[inline(always)]
    pub(crate) fn has_budget_remaining() -> bool {
        let current = CURRENT.with(|slot| slot.get());
        current.has_remaining()
    }
}
cfg_rt! {
    /// Switches the current thread to an unconstrained budget and returns the
    /// budget that was in effect just before the switch.
    pub(crate) fn stop() -> Budget {
        CURRENT.with(|slot| slot.replace(Budget::unconstrained()))
    }
}
cfg_coop! {
use std::task::{Context, Poll};
/// Guard handed out by `poll_proceed` that remembers the budget as it was
/// before the decrement.
///
/// On drop, the remembered budget is written back to the thread-local
/// `CURRENT` unless the remembered value is unconstrained. Calling
/// `made_progress` replaces the remembered value with an unconstrained one,
/// so the decrement is then kept.
#[must_use]
pub(crate) struct RestoreOnPending(Cell<Budget>);

impl RestoreOnPending {
    /// Marks the guard so that dropping it leaves the thread-local budget
    /// untouched.
    pub(crate) fn made_progress(&self) {
        let cleared = Budget::unconstrained();
        self.0.set(cleared);
    }
}

impl Drop for RestoreOnPending {
    fn drop(&mut self) {
        let remembered = self.0.get();

        // An unconstrained remembered budget means either the budget was
        // unconstrained to begin with or `made_progress` was called; in both
        // cases there is nothing to restore.
        if remembered.is_unconstrained() {
            return;
        }

        CURRENT.with(|slot| slot.set(remembered));
    }
}
/// Tries to take one unit from the current thread's budget.
///
/// Returns `Poll::Ready` with a `RestoreOnPending` guard holding the
/// pre-decrement budget when the decrement succeeds. When the budget is
/// exhausted, the waker from `cx` is woken and `Poll::Pending` is returned
/// without modifying the stored budget.
#[inline]
pub(crate) fn poll_proceed(cx: &mut Context<'_>) -> Poll<RestoreOnPending> {
    CURRENT.with(|slot| {
        let before = slot.get();
        let mut after = before;

        if !after.decrement() {
            // Nothing left: request another poll and report pending.
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }

        slot.set(after);
        Poll::Ready(RestoreOnPending(Cell::new(before)))
    })
}
impl Budget {
fn decrement(&mut self) -> bool {
if let Some(num) = &mut self.0 {
if *num > 0 {
*num -= 1;
true
} else {
false
}
} else {
true
}
}
fn is_unconstrained(self) -> bool {
self.0.is_none()
}
}
}
// Unit test for the budget bookkeeping; compiled only for non-loom test builds.
#[cfg(all(test, not(loom)))]
mod test {
use super::*;
// Reads the current thread's budget without modifying it.
fn get() -> Budget {
CURRENT.with(|cell| cell.get())
}
// NOTE(review): the name below looks like a typo for "budgeting" — confirm
// before renaming, in case anything filters tests by name.
#[test]
fn bugeting() {
use futures::future::poll_fn;
use tokio_test::*;
// Outside of `budget`, the thread-local budget is unconstrained.
assert!(get().0.is_none());
// With an unconstrained budget, `poll_proceed` is ready and the budget stays
// unconstrained both while the guard is alive and after it is dropped.
let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
assert!(get().0.is_none());
drop(coop);
assert!(get().0.is_none());
budget(|| {
// Inside `budget`, the counter starts at the initial value.
assert_eq!(get().0, Budget::initial().0);
// Dropping the guard without `made_progress` undoes the decrement.
let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1);
drop(coop);
assert_eq!(get().0, Budget::initial().0);
// After `made_progress`, dropping the guard keeps the decrement.
let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1);
coop.made_progress();
drop(coop);
assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1);
// A second kept decrement accumulates on top of the first.
let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 2);
coop.made_progress();
drop(coop);
assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 2);
// A nested `budget` call starts again from the initial value...
budget(|| {
assert_eq!(get().0, Budget::initial().0);
let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1);
coop.made_progress();
drop(coop);
assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1);
});
// ...and the outer value is back once the nested call returns.
assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 2);
});
// Leaving the outermost `budget` call restores the unconstrained budget.
assert!(get().0.is_none());
budget(|| {
// Use up every unit, keeping each decrement.
let n = get().0.unwrap();
for _ in 0..n {
let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
coop.made_progress();
}
// With the counter at zero, the next `poll_proceed` reports pending.
let mut task = task::spawn(poll_fn(|cx| {
let coop = ready!(poll_proceed(cx));
coop.made_progress();
Poll::Ready(())
}));
assert_pending!(task.poll());
});
}
}