#![doc = include_str!("README.md")]
#![forbid(unsafe_code)]
#![cfg_attr(any(nightly, feature="nightly"), feature(doc_auto_cfg))]
#![cfg_attr(not(test), warn(missing_docs))]
#![cfg_attr(feature="std-mpmc", feature(mpmc_channel))]
pub mod executor;
pub use executor::*;
pub mod defer;
pub use defer::*;
pub mod util;
pub use util::*;
pub mod id;
pub use id::*;
#[cfg(test)]
mod tests;
use core::{
future::Future,
ops::{Deref, Range},
pin::Pin,
task::{Poll, Context},
};
use std::{
sync::Arc,
time::{Instant, Duration},
};
#[cfg(feature="flume")]
use flume::{Sender, Receiver};
#[cfg(feature="crossbeam-channel")]
use crossbeam_channel::{Sender, Receiver};
#[cfg(feature="kanal")]
use kanal::{Sender, Receiver};
#[cfg(feature="crossbeam-deque")]
use crate::util::injector::{InjectorChannel, Sender, Receiver};
#[cfg(feature="std-mpmc")]
use std::sync::mpmc::{Sender, Receiver};
use async_task::ScheduleInfo;
use portable_atomic::{
AtomicBool,
AtomicU8,
AtomicU64,
AtomicUsize,
AtomicF64,
Ordering::Relaxed
};
use once_cell::sync::Lazy;
/// Metadata carried by every task created through this crate.
///
/// While profiling is enabled at construction time, an instance is counted
/// in `RunnableProfile::global().alive_count`; the count is given back when
/// the instance is dropped.
#[derive(Debug)]
pub struct TaskInfoInner {
    /// `true` when this instance holds no share of the global alive counter
    /// (never counted, or already released), so `Drop` has nothing to undo.
    dropped: bool,
    /// Identifier obtained from `gen_task_id()`.
    id: TaskId,
    /// Value passed to [`TaskInfoInner::new`]; presumably a scheduling
    /// niceness -- TODO confirm against the executor.
    nice: i8,
    /// Starts at zero; presumably the number of runs, maintained by the
    /// executor -- TODO confirm.
    run_count: AtomicU64,
    /// Starts at zero; presumably the accumulated run time, maintained by
    /// the executor -- TODO confirm.
    run_took: AtomicDuration,
}

impl TaskInfoInner {
    /// Builds the metadata for a new task.
    ///
    /// # Panics
    ///
    /// Panics with `"Task ID exhausted!"` when `gen_task_id()` yields nothing.
    #[inline(always)]
    fn new(nice: i8) -> Self {
        // Start as "not counted"; flipped below only if the global counter
        // was really incremented.
        let mut info = Self {
            dropped: true,
            id: gen_task_id().expect("Task ID exhausted!"),
            nice,
            run_count: AtomicU64::new(0),
            run_took: AtomicDuration::zero(),
        };
        let counted = ProfileConfig::global().is_enabled()
            && RunnableProfile::global().alive_count.checked_add(1).is_some();
        info.dropped = !counted;
        info
    }
}

impl Drop for TaskInfoInner {
    /// Releases this instance's share of the alive counter, if it holds one.
    #[inline(always)]
    fn drop(&mut self) {
        if !self.dropped {
            self.dropped = true;
            RunnableProfile::global().alive_count.checked_sub(1);
        }
    }
}
/// Cheaply clonable, shared handle to a [`TaskInfoInner`].
///
/// Cloning only bumps the reference count of the inner `Arc`; all clones
/// refer to the same metadata.
#[derive(Debug, Clone)]
pub struct TaskInfo(Arc<TaskInfoInner>);

impl TaskInfo {
    /// Creates fresh task metadata with the given `nice` value.
    #[inline(always)]
    fn new(nice: i8) -> Self {
        let inner = TaskInfoInner::new(nice);
        Self(Arc::new(inner))
    }
}

impl Deref for TaskInfo {
    type Target = TaskInfoInner;

    /// Gives read access to the shared [`TaskInfoInner`].
    #[inline(always)]
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
/// An `async_task::Task` whose metadata type is [`TaskInfo`].
pub type Task<T> = async_task::Task<T, TaskInfo>;
/// An `async_task::Runnable` whose metadata type is [`TaskInfo`].
pub type Runnable = async_task::Runnable<TaskInfo>;
/// Identifier of an executor; used as the key of the global executor index.
pub type ExecutorId = u128;
/// Identifier of a task, as stored in [`TaskInfoInner`].
pub type TaskId = u128;
// Global registry mapping each executor id to its shared state.
// Entries are inserted by `spawn_executor` and removed by `ExecutorStatus::update`.
static EXECUTOR_INDEX: Lazy<scc2::HashIndex<ExecutorId, Arc<ExecutorState>, ahash::RandomState>> = Lazy::new(Default::default);
// Join handle of the monitor thread; set by `start_monitor`.
static MONITOR_THREAD_JH: scc2::Atom<std::thread::JoinHandle<()>> = scc2::Atom::init();
// Message sent through the run-info channel: a runnable together with the
// `ScheduleInfo` it was scheduled with (see `scheduler`).
pub(crate) struct RunInfo {
// The task to be run.
runnable: Runnable,
// Scheduling information passed to `scheduler` alongside the runnable.
info: ScheduleInfo,
}
/// Capacity passed to the bounded channel that carries scheduled runnables
/// (1048576 = 2^20 entries).
pub const RUNINFO_CHANNEL_CAPACITY: usize = 1048576;
// Lazily created channel carrying `RunInfo` messages; `scheduler` sends on it
// and each executor is handed a clone of the receiver (see `spawn_executor`).
// The backing implementation is selected by cargo feature.
// NOTE(review): the closure has no fallback, so it appears exactly one of the
// channel features must be enabled -- confirm against Cargo.toml.
static RUNINFO_CHANNEL: Lazy<(Sender<RunInfo>, Receiver<RunInfo>)> =
Lazy::new(|| {
#[cfg(feature="flume")]
return flume::bounded(RUNINFO_CHANNEL_CAPACITY);
#[cfg(feature="crossbeam-channel")]
return crossbeam_channel::bounded(RUNINFO_CHANNEL_CAPACITY);
#[cfg(feature="kanal")]
return kanal::bounded(RUNINFO_CHANNEL_CAPACITY);
#[cfg(feature="crossbeam-deque")]
return InjectorChannel::bounded_split(RUNINFO_CHANNEL_CAPACITY);
#[cfg(feature="std-mpmc")]
return std::sync::mpmc::sync_channel(RUNINFO_CHANNEL_CAPACITY);
});
#[inline(always)]
fn get_runinfo_channel() -> &'static (Sender<RunInfo>, Receiver<RunInfo>) {
&*RUNINFO_CHANNEL
}
/// Returns the sending half of the global run-info channel.
#[inline(always)]
fn get_runinfo_tx() -> &'static Sender<RunInfo> {
    let (tx, _rx) = get_runinfo_channel();
    tx
}
/// Returns the receiving half of the global run-info channel.
#[inline(always)]
fn get_runinfo_rx() -> &'static Receiver<RunInfo> {
    let (_tx, rx) = get_runinfo_channel();
    rx
}
/// Returns the parallelism available to this process, always at least 1.
///
/// The value comes from `std::thread::available_parallelism()`; when that
/// query fails, 1 is used. The result is cached in a process-wide atomic
/// after the first call, so later calls only perform a relaxed load.
#[inline(always)]
pub fn cpu_count() -> usize {
    // 0 doubles as the "not computed yet" marker: a real result is never 0.
    static CPUS: AtomicUsize = AtomicUsize::new(0);
    match CPUS.load(Relaxed) {
        0 => {
            // `NonZeroUsize::get` cannot return 0 and the fallback is 1, so
            // no additional zero check is needed.
            let count = std::thread::available_parallelism()
                .map(std::num::NonZeroUsize::get)
                .unwrap_or(1);
            // Concurrent first callers may race here; they all store the
            // same kind of value, so the race is benign.
            CPUS.store(count, Relaxed);
            count
        }
        cached => cached,
    }
}
/// Process-wide statistics about runnables (scheduled task wake-ups).
///
/// Counters are plain relaxed atomics; the two frequency fields are derived
/// values refreshed by [`RunnableProfile::update`].
pub struct RunnableProfile {
    /// Time of the last successful [`RunnableProfile::update`].
    last_update: AtomicInstant,
    /// Number of live `TaskInfoInner` instances that were counted.
    alive_count: AtomicU64,
    /// Total number of runs; not written in this file, presumably bumped by
    /// the executor -- TODO confirm.
    run_count: AtomicU64,
    /// `run_count` divided by the seconds elapsed since profiling started.
    run_frequency: AtomicF64,
    /// Total number of runnables handed to `scheduler`.
    queue_count: AtomicU64,
    /// `queue_count` divided by the seconds elapsed since profiling started.
    queue_frequency: AtomicF64,
}

impl RunnableProfile {
    /// Minimum time between two effective calls of [`RunnableProfile::update`].
    pub const UPDATE_INTERVAL: Duration = Duration::from_secs(5);

    /// Returns the single process-wide instance.
    #[inline(always)]
    pub const fn global() -> &'static Self {
        static INSTANCE: RunnableProfile = RunnableProfile {
            last_update: AtomicInstant::init(),
            alive_count: AtomicU64::new(0),
            run_count: AtomicU64::new(0),
            run_frequency: AtomicF64::new(0.0),
            queue_count: AtomicU64::new(0),
            queue_frequency: AtomicF64::new(0.0),
        };
        &INSTANCE
    }

    /// Recomputes the run and queue frequencies.
    ///
    /// Returns `false` without touching anything when profiling has no start
    /// time (see `Profile::started`) or when less than
    /// [`Self::UPDATE_INTERVAL`] has passed since the previous update;
    /// returns `true` after the frequencies were refreshed.
    #[inline(always)]
    pub fn update(&self) -> bool {
        let Some(started) = Profile::global().started() else {
            return false;
        };
        let elapsed_secs = started.elapsed().as_secs_f64();

        let since_last = self.last_update.get().elapsed();
        if since_last < Self::UPDATE_INTERVAL {
            return false;
        }
        self.last_update.set(Instant::now());

        // Avoid dividing by zero when no measurable time has elapsed.
        if elapsed_secs == 0.0 {
            self.run_frequency.store(0.0, Relaxed);
            self.queue_frequency.store(0.0, Relaxed);
            return true;
        }

        self.run_frequency
            .store(self.run_count() as f64 / elapsed_secs, Relaxed);
        self.queue_frequency
            .store(self.queue_count() as f64 / elapsed_secs, Relaxed);
        true
    }

    /// Current value of the alive counter.
    #[inline(always)]
    pub fn alive_count(&self) -> u64 {
        self.alive_count.load(Relaxed)
    }

    /// Current value of the run counter.
    #[inline(always)]
    pub fn run_count(&self) -> u64 {
        self.run_count.load(Relaxed)
    }

    /// Run frequency as computed by the last [`RunnableProfile::update`].
    #[inline(always)]
    pub fn run_frequency(&self) -> f64 {
        self.run_frequency.load(Relaxed)
    }

    /// Current value of the queue counter.
    #[inline(always)]
    pub fn queue_count(&self) -> u64 {
        self.queue_count.load(Relaxed)
    }

    /// Queue frequency as computed by the last [`RunnableProfile::update`].
    #[inline(always)]
    pub fn queue_frequency(&self) -> f64 {
        self.queue_frequency.load(Relaxed)
    }
}
/// Process-wide statistics about the futures wrapped by `spawn` while
/// profiling is enabled.
pub struct FutureProfile {
    /// Number of wrapped futures currently alive (counted ones only).
    alive_count: AtomicU64,
    /// Total number of `poll` calls observed.
    poll_count: AtomicU64,
    /// Number of observed polls that returned `Poll::Pending`.
    pending_count: AtomicU64,
    /// Number of observed polls that returned `Poll::Ready`.
    ready_count: AtomicU64,
}

impl FutureProfile {
    /// Returns the single process-wide instance.
    #[inline(always)]
    pub const fn global() -> &'static Self {
        static INSTANCE: FutureProfile = FutureProfile {
            alive_count: AtomicU64::new(0),
            poll_count: AtomicU64::new(0),
            pending_count: AtomicU64::new(0),
            ready_count: AtomicU64::new(0),
        };
        &INSTANCE
    }

    /// Current number of alive wrapped futures.
    #[inline(always)]
    pub fn alive_count(&self) -> u64 {
        let counter = &self.alive_count;
        counter.load(Relaxed)
    }

    /// Total number of polls observed so far.
    #[inline(always)]
    pub fn poll_count(&self) -> u64 {
        let counter = &self.poll_count;
        counter.load(Relaxed)
    }

    /// Number of polls that returned `Poll::Pending`.
    #[inline(always)]
    pub fn pending_count(&self) -> u64 {
        let counter = &self.pending_count;
        counter.load(Relaxed)
    }

    /// Number of polls that returned `Poll::Ready`.
    #[inline(always)]
    pub fn ready_count(&self) -> u64 {
        let counter = &self.ready_count;
        counter.load(Relaxed)
    }
}
/// Entry point to all profiling data of the process.
pub struct Profile {
    /// Time at which profiling was last enabled (written by
    /// `ProfileConfig::enable`).
    started: AtomicInstant,
    /// Statistics about runnables.
    pub runnable: &'static RunnableProfile,
    /// Statistics about wrapped futures.
    pub future: &'static FutureProfile,
}

impl Profile {
    /// Returns the single process-wide instance.
    #[inline(always)]
    pub const fn global() -> &'static Self {
        static INSTANCE: Profile = Profile {
            started: AtomicInstant::init_now(),
            runnable: RunnableProfile::global(),
            future: FutureProfile::global(),
        };
        &INSTANCE
    }

    /// Enables profiling; forwards to `ProfileConfig::enable` and returns
    /// its result (`false` when profiling was already enabled).
    #[inline(always)]
    pub fn start(&self) -> bool {
        let config = ProfileConfig::global();
        config.enable()
    }

    /// Disables profiling; forwards to `ProfileConfig::disable`.
    #[inline(always)]
    pub fn stop(&self) {
        let config = ProfileConfig::global();
        config.disable();
    }

    /// Returns the recorded start time, or `None` while profiling is
    /// disabled.
    #[inline(always)]
    pub fn started(&self) -> Option<Instant> {
        if !ProfileConfig::global().is_enabled() {
            return None;
        }
        self.started.peek()
    }
}
/// Runtime switches for profiling.
#[derive(Debug)]
pub struct ProfileConfig {
    /// Whether profiling is currently enabled.
    pub enabled: AtomicBool,
    /// A socket address; not read in this file -- presumably the destination
    /// for exported profile data, TODO confirm.
    pub remote: AtomicSocketAddr,
}

impl ProfileConfig {
    /// Returns the single process-wide instance (profiling starts disabled).
    #[inline(always)]
    pub const fn global() -> &'static Self {
        static INSTANCE: ProfileConfig = ProfileConfig {
            enabled: AtomicBool::new(false),
            remote: AtomicSocketAddr::default(),
        };
        &INSTANCE
    }

    /// Turns profiling on and records the current time as its start.
    ///
    /// Returns `false` (and changes nothing) when profiling is already
    /// enabled, `true` otherwise.
    #[inline(always)]
    pub fn enable(&self) -> bool {
        let already_on = self.is_enabled();
        if !already_on {
            // Record the start time before flipping the switch.
            Profile::global().started.set(Instant::now());
            self.enabled.store(true, Relaxed);
        }
        !already_on
    }

    /// Turns profiling off.
    #[inline(always)]
    pub fn disable(&self) {
        self.enabled.store(false, Relaxed);
    }

    /// Tells whether profiling is currently enabled.
    #[inline(always)]
    pub fn is_enabled(&self) -> bool {
        let flag = &self.enabled;
        flag.load(Relaxed)
    }
}
/// Strategy the monitor follows when deciding about temporary executors.
///
/// Each variant's discriminant is an ASCII byte so the policy can be stored
/// in an `AtomicU8`.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum ExecutorSpawnPolicy {
    /// The default policy of `ExecutorConfig::global()`.
    OnDemand = Self::ON_DEMAND,
    /// Like `OnDemand`, but `scheduler` additionally wakes the monitor on
    /// every scheduled runnable.
    Proactive = Self::PROACTIVE,
    /// Temporary executors are never spawned or retired by this policy.
    Fixed = Self::FIXED,
}

impl ExecutorSpawnPolicy {
    /// Byte representation of [`ExecutorSpawnPolicy::OnDemand`].
    pub const ON_DEMAND: u8 = b'd';
    /// Byte representation of [`ExecutorSpawnPolicy::Proactive`].
    pub const PROACTIVE: u8 = b'p';
    /// Byte representation of [`ExecutorSpawnPolicy::Fixed`].
    pub const FIXED: u8 = b'f';

    /// Decodes a policy from its byte representation; `None` for any byte
    /// that is not one of the three known constants.
    #[inline(always)]
    pub const fn new(val: u8) -> Option<Self> {
        if val == Self::ON_DEMAND {
            Some(Self::OnDemand)
        } else if val == Self::PROACTIVE {
            Some(Self::Proactive)
        } else if val == Self::FIXED {
            Some(Self::Fixed)
        } else {
            None
        }
    }

    /// Encodes the policy as its byte representation.
    #[inline(always)]
    pub const fn value(self) -> u8 {
        // The enum is `repr(u8)` and every discriminant is the matching
        // constant, so the cast yields exactly that constant.
        self as u8
    }

    /// `true` for [`ExecutorSpawnPolicy::OnDemand`].
    #[inline(always)]
    pub const fn is_ondemand(self) -> bool {
        matches!(self, Self::OnDemand)
    }

    /// `true` for [`ExecutorSpawnPolicy::Proactive`].
    #[inline(always)]
    pub const fn is_proactive(self) -> bool {
        matches!(self, Self::Proactive)
    }

    /// `true` for [`ExecutorSpawnPolicy::Fixed`].
    #[inline(always)]
    pub const fn is_fixed(self) -> bool {
        matches!(self, Self::Fixed)
    }

    /// Creates an atomic cell initialised with this policy's byte value.
    #[inline(always)]
    pub const fn to_atomic(self) -> AtomicU8 {
        AtomicU8::new(self.value())
    }

    /// Reads a policy from an atomic cell; `None` when the stored byte is
    /// not a known policy.
    #[inline(always)]
    pub fn from_atomic(atom: &AtomicU8) -> Option<Self> {
        let raw = atom.load(Relaxed);
        Self::new(raw)
    }

    /// Applies the policy to the given status snapshot.
    ///
    /// Returns `true` when a new temporary (exitable) executor was spawned.
    ///
    /// * `Fixed` never does anything.
    /// * Below the lower bound of the temporary-thread range, one executor
    ///   is spawned.
    /// * At or above the upper bound, temporary executors are asked to exit
    ///   until the remaining number is below the bound; nothing is spawned.
    /// * Otherwise one executor is spawned when the system looks loaded (at
    ///   most one idle executor, every executor at or above the overload
    ///   threshold, or a large channel backlog).
    ///
    /// # Panics
    ///
    /// Panics if an executor listed as temporary is not exitable, or if
    /// requesting its exit fails.
    #[inline(always)]
    fn spawn_temporary_executor(self, status: &ExecutorStatus) -> bool {
        if self.is_fixed() {
            return false;
        }

        let config = ExecutorConfig::global();
        let range = config.temporary_threads_range.range();
        let len = status.temporary.len();

        if len < range.start {
            log::info!("spawn new exitable executor due to temporary threads <= minimum number.");
            spawn_executor(true);
            return true;
        }

        if len >= range.end {
            // Too many temporary executors: retire from the back of the list
            // until we are below the upper bound.
            let mut candidates = status.temporary.clone();
            let guard = scc2::ebr::Guard::new();
            while candidates.len() >= range.end {
                let Some(id) = candidates.pop() else {
                    break;
                };
                if let Some(state) = EXECUTOR_INDEX.peek(&id, &guard) {
                    assert!(state.exitable);
                    log::info!("request {} to exit (it exceeds the limit of temporary executors)", id.count());
                    state.exit().unwrap();
                }
            }
            return false;
        }

        let overload_threshold = config.overload_threshold.load(Relaxed);
        let mut all_overload = true;
        for (id, load) in &status.work_load {
            if *load < overload_threshold {
                all_overload = false;
                continue;
            }
            log::warn!("executor {} is overloading: {load}", id.count());
        }

        let backlog = get_runinfo_tx().len();
        let under_pressure = status.idle.len() <= 1
            || all_overload
            || backlog > (status.running.len() * 10)
            || backlog > 1000;
        if !under_pressure {
            return false;
        }

        log::info!("monitored high overload in all executors: spawn new exitable executor");
        spawn_executor(true);
        true
    }
}
/// Runtime-tunable settings for executors.
#[derive(Debug)]
pub struct ExecutorConfig {
    /// An interval accepted only within
    /// [`Self::MIN_INTERVAL`]..=[`Self::MAX_INTERVAL`]; its consumer is not
    /// visible in this file.
    interval: AtomicDuration,
    /// Allowed range for the total number of executor threads.
    pub total_threads_range: AtomicRangeStrict<AtomicUsize>,
    /// Allowed range for the number of temporary (exitable) executor threads.
    pub temporary_threads_range: AtomicRangeStrict<AtomicUsize>,
    /// Byte representation of the active [`ExecutorSpawnPolicy`].
    spawn_policy: AtomicU8,
    /// Work-load ratio (0.0..=1.0) at or above which an executor is
    /// considered overloaded.
    overload_threshold: AtomicF64,
    /// Work-load ratio (0.0..=1.0); not consumed in this file -- presumably
    /// the level below which an executor is considered on standby,
    /// TODO confirm.
    standby_threshold: AtomicF64,
}

impl ExecutorConfig {
    /// Smallest value accepted by [`ExecutorConfig::set_interval`].
    pub const MIN_INTERVAL: Duration = Duration::from_millis(100);
    /// Largest value accepted by [`ExecutorConfig::set_interval`].
    pub const MAX_INTERVAL: Duration = Duration::new(5, 0);

    /// Returns the single process-wide instance.
    ///
    /// Defaults: `OnDemand` spawn policy, overload threshold `0.85`,
    /// standby threshold `0.5`.
    #[inline(always)]
    pub const fn global() -> &'static Self {
        static GLOBAL: ExecutorConfig =
            ExecutorConfig {
                interval: AtomicDuration::new_with_trace(1, 0),
                total_threads_range: AtomicRangeStrict::<AtomicUsize>::new(1, usize::MAX),
                temporary_threads_range: AtomicRangeStrict::<AtomicUsize>::new(0, usize::MAX),
                spawn_policy: ExecutorSpawnPolicy::OnDemand.to_atomic(),
                overload_threshold: AtomicF64::new(0.85),
                standby_threshold: AtomicF64::new(0.5),
            };
        &GLOBAL
    }

    /// Shared validation for threshold setters: the value must lie in
    /// `0.0..=1.0` (which also excludes NaN and the infinities) and must not
    /// be subnormal.
    #[inline(always)]
    fn is_valid_threshold(val: f64) -> bool {
        !val.is_subnormal() && (0.0..=1.0).contains(&val)
    }

    /// Returns the configured interval.
    pub fn interval(&self) -> Duration {
        self.interval.get()
    }

    /// Sets the interval.
    ///
    /// Returns `false` and leaves the setting untouched when `val` is
    /// outside [`Self::MIN_INTERVAL`]..=[`Self::MAX_INTERVAL`].
    pub fn set_interval(&self, val: Duration) -> bool {
        if val < Self::MIN_INTERVAL || val > Self::MAX_INTERVAL {
            return false;
        }
        self.interval.set(val);
        true
    }

    /// Returns the active spawn policy.
    ///
    /// # Panics
    ///
    /// Panics if the stored byte is not a known policy.
    #[inline(always)]
    pub fn spawn_policy(&self) -> ExecutorSpawnPolicy {
        ExecutorSpawnPolicy::from_atomic(&self.spawn_policy).expect("unexpected unknown variant of ExecutorSpawnPolicy")
    }

    /// Replaces the active spawn policy.
    #[inline(always)]
    pub fn set_spawn_policy(&self, policy: ExecutorSpawnPolicy) {
        self.spawn_policy.store(policy.value(), Relaxed);
    }

    /// Returns the overload threshold.
    ///
    /// # Panics
    ///
    /// Panics if the stored value is outside `0.0..=1.0`.
    #[inline(always)]
    pub fn overload_threshold(&self) -> f64 {
        let val = self.overload_threshold.load(Relaxed);
        assert!(val >= 0.0 && val <= 1.0);
        val
    }

    /// Sets the overload threshold.
    ///
    /// Returns `false` and changes nothing when `val` is NaN, infinite,
    /// subnormal, or outside `0.0..=1.0`.
    #[inline(always)]
    pub fn set_overload_threshold(&self, val: f64) -> bool {
        if !Self::is_valid_threshold(val) {
            return false;
        }
        self.overload_threshold.store(val, Relaxed);
        true
    }

    /// Returns the standby threshold.
    ///
    /// # Panics
    ///
    /// Panics if the stored value is outside `0.0..=1.0`.
    #[inline(always)]
    pub fn standby_threshold(&self) -> f64 {
        let val = self.standby_threshold.load(Relaxed);
        assert!(val >= 0.0 && val <= 1.0);
        val
    }

    /// Sets the standby threshold.
    ///
    /// Returns `false` and changes nothing when `val` is NaN, infinite,
    /// subnormal, or outside `0.0..=1.0`.
    #[inline(always)]
    pub fn set_standby_threshold(&self, val: f64) -> bool {
        if !Self::is_valid_threshold(val) {
            return false;
        }
        // Write the standby field itself: the previous code stored into
        // `overload_threshold`, so this setter never took effect and silently
        // overwrote the overload setting instead.
        self.standby_threshold.store(val, Relaxed);
        true
    }
}
/// Runtime-tunable settings for the monitor thread.
#[derive(Debug)]
pub struct MonitorConfig {
    /// Time the monitor parks between two rounds of its loop.
    interval: AtomicDuration,
}

impl MonitorConfig {
    /// Lower bound applied by [`MonitorConfig::set_interval`].
    pub const INTERVAL_MIN: Duration = Duration::from_secs(1);
    /// Upper bound applied by [`MonitorConfig::set_interval`].
    pub const INTERVAL_MAX: Duration = Duration::from_secs(10);

    /// Returns the single process-wide instance.
    #[inline(always)]
    pub const fn global() -> &'static Self {
        static INSTANCE: MonitorConfig = MonitorConfig {
            interval: AtomicDuration::new_with_trace(3, 0),
        };
        &INSTANCE
    }

    /// Returns the configured monitor interval.
    #[inline(always)]
    pub fn interval(&self) -> Duration {
        self.interval.get()
    }

    /// Sets the monitor interval, forcing it into
    /// [`Self::INTERVAL_MIN`]..=[`Self::INTERVAL_MAX`].
    ///
    /// Returns the value that was actually stored.
    #[inline(always)]
    pub fn set_interval(&self, interval: Duration) -> Duration {
        let bounded = interval.max(Self::INTERVAL_MIN).min(Self::INTERVAL_MAX);
        self.interval.set(bounded);
        bounded
    }
}
/// Aggregates the process-wide configuration objects.
#[derive(Debug)]
pub struct Config {
    /// Profiling switches.
    pub profile: &'static ProfileConfig,
    /// Executor settings.
    pub executor: &'static ExecutorConfig,
    /// Monitor settings.
    pub monitor: &'static MonitorConfig,
}

impl Config {
    /// Returns the single process-wide instance, which points at the
    /// `global()` instance of each sub-configuration.
    #[inline(always)]
    pub const fn global() -> &'static Self {
        static INSTANCE: Config = Config {
            profile: ProfileConfig::global(),
            executor: ExecutorConfig::global(),
            monitor: MonitorConfig::global(),
        };
        &INSTANCE
    }

    /// Sets the allowed range for the total number of executor threads and
    /// returns whatever `AtomicRangeStrict::set_range` reports.
    #[inline(always)]
    pub fn set_threads(&self, range: Range<usize>) -> bool {
        let total = &self.executor.total_threads_range;
        total.set_range(range)
    }
}
/// Snapshot of the executors registered in the global executor index,
/// grouped by state.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ExecutorStatus {
    /// Time of the last [`ExecutorStatus::update`], `None` before the first.
    last_update: Option<Instant>,
    /// Every executor seen in the index.
    pub total: Vec<ExecutorId>,
    /// Running executors that reported `is_working()`.
    pub working: Vec<ExecutorId>,
    /// Running executors that did not report `is_working()`.
    pub idle: Vec<ExecutorId>,
    /// Executors whose thread has not finished.
    pub running: Vec<ExecutorId>,
    /// Executors whose thread has finished; they are removed from the index
    /// at the end of the update.
    pub remove: Vec<ExecutorId>,
    /// Executors flagged as exitable.
    pub temporary: Vec<ExecutorId>,
    /// Executors not flagged as exitable.
    pub persist: Vec<ExecutorId>,
    /// `working_ratio()` of every executor seen in the index.
    pub work_load: Vec<(ExecutorId, f64)>,
}

impl ExecutorStatus {
    /// Creates an empty snapshot that has never been updated.
    #[inline(always)]
    pub const fn new() -> Self {
        Self {
            last_update: None,
            total: Vec::new(),
            working: Vec::new(),
            idle: Vec::new(),
            running: Vec::new(),
            remove: Vec::new(),
            temporary: Vec::new(),
            persist: Vec::new(),
            work_load: Vec::new(),
        }
    }

    /// Time of the last update, if any.
    #[inline(always)]
    pub const fn last_update(&self) -> Option<Instant> {
        self.last_update
    }

    /// Creates a snapshot and fills it from the current index contents.
    #[inline(always)]
    pub fn current() -> Self {
        let mut snapshot = Self::new();
        snapshot.update();
        snapshot
    }

    /// Rebuilds the snapshot from the global executor index.
    ///
    /// All lists are cleared first. Executors whose thread has finished are
    /// collected in `remove` and deleted from the index before returning.
    /// Executors without a join handle appear only in `total`, `work_load`
    /// and `temporary`/`persist`.
    #[inline(always)]
    pub fn update(&mut self) -> &mut Self {
        for list in [
            &mut self.total,
            &mut self.working,
            &mut self.idle,
            &mut self.running,
            &mut self.remove,
            &mut self.temporary,
            &mut self.persist,
        ] {
            list.clear();
        }
        self.work_load.clear();

        {
            // The guard must be released before the index is modified below.
            let guard = scc2::ebr::Guard::new();
            for (id, state) in EXECUTOR_INDEX.iter(&guard) {
                // Skip ids that were already recorded in this pass.
                if self.total.contains(id) {
                    continue;
                }
                self.total.push(*id);
                self.work_load.push((*id, state.working_ratio()));

                let lifetime_bucket = if state.exitable {
                    &mut self.temporary
                } else {
                    &mut self.persist
                };
                lifetime_bucket.push(*id);

                if let Some(jh) = state.join_handle.get() {
                    if jh.is_finished() {
                        self.remove.push(*id);
                    } else {
                        self.running.push(*id);
                        let activity_bucket = if state.is_working() {
                            &mut self.working
                        } else {
                            &mut self.idle
                        };
                        activity_bucket.push(*id);
                    }
                }
            }
        }

        for finished in &self.remove {
            EXECUTOR_INDEX.remove(finished);
        }
        self.last_update = Some(Instant::now());
        self
    }
}
/// Spawns a new executor thread and registers it in the global index.
///
/// `exitable` marks the executor as temporary (thread name suffix `t`)
/// rather than persistent (suffix `p`). After registering the executor the
/// caller parks for up to three seconds; the new thread unparks it as soon
/// as it starts.
///
/// # Panics
///
/// Panics when no executor id is available, when the thread cannot be
/// spawned, when the join handle cannot be stored, or when the id is already
/// present in the index.
#[inline(always)]
pub fn spawn_executor(exitable: bool) {
    let id = gen_executor_id().expect("Executor ID exhausted!");
    let state = {
        let mut fresh = ExecutorState::new(id);
        fresh.exitable = exitable;
        Arc::new(fresh)
    };

    // Everything the worker thread needs is prepared up front and moved in.
    let worker_state = Arc::clone(&state);
    let rx = get_runinfo_rx().clone();
    let flag: &'static str = if exitable { "t" } else { "p" };
    let parent = std::thread::current();
    let idc = id.count();

    let jh = std::thread::Builder::new()
        .name(format!("ac-exec-{idc}-{flag}"))
        .stack_size(1048576 * 20)
        .spawn(move || {
            // Let the spawning thread continue as soon as we are running.
            parent.unpark();
            let executor = Executor::new(worker_state, rx);
            executor.run().expect("executor error");
        })
        .expect("unable to spawn thread for new executor!");

    state
        .join_handle
        .set(Arc::new(jh))
        .expect("unable to set join handle for ExecutorState");
    EXECUTOR_INDEX
        .insert(id, state)
        .expect("Executor ID should be unique but duplicated!");

    std::thread::park_timeout(Duration::from_secs(3));
}
#[inline(always)]
#[deprecated(since="0.0.3", note="please use `Config::global().monitor.interval()` instead.")]
pub fn get_monitor_interval() -> Duration {
MonitorConfig::global().interval()
}
#[inline(always)]
#[deprecated(since="0.0.3", note="please use `Config::global().monitor.set_interval()` instead.")]
pub fn set_monitor_interval(interval: Duration) -> Duration {
MonitorConfig::global().set_interval(interval)
}
// Body of the monitor thread (spawned by `start_monitor`).
//
// Each round it: re-reads the interval if it changed, verifies that this
// thread is still the registered monitor, tops up persistent executors until
// at least `cpu_count()` are running, lets the spawn policy decide about
// temporary executors, refreshes the runnable profile when profiling is
// enabled, and finally parks for the configured interval (or until unparked,
// e.g. by `wake_monitor`).
#[inline(always)]
fn monitor_loop() {
// Initial wait of up to 15 seconds; `start_monitor` unparks this thread once
// the join handle has been registered.
std::thread::park_timeout(Duration::from_secs(15));
let current_thread_id = std::thread::current().id();
let mut status = ExecutorStatus::new();
let cpus: usize = cpu_count();
let config = Config::global();
let mut interval = config.monitor.interval();
loop {
if config.monitor.interval.changed() {
interval = config.monitor.interval();
}
// If the registered monitor handle is missing, finished, or belongs to a
// different thread, try to start a monitor and leave this loop.
// NOTE(review): `start_monitor()` returns `Err` when another monitor is
// running or starting, in which case `unwrap()` panics this thread -- confirm
// that this is intended.
if let Some(jh) = MONITOR_THREAD_JH.get() {
if jh.is_finished() {
start_monitor().unwrap();
return;
}
if jh.thread().id() != current_thread_id {
start_monitor().unwrap();
return;
}
} else {
start_monitor().unwrap();
return;
}
// Keep at least one running executor per CPU; the status is refreshed before
// every check. Each `spawn_executor` call may park for up to three seconds.
while status.update().running.len() < cpus {
log::info!("monitor spawn new persist executor");
spawn_executor(false);
}
log::trace!("monitor status = {:?}", &status);
config.executor.spawn_policy().spawn_temporary_executor(&status);
if ProfileConfig::global().is_enabled() {
RunnableProfile::global().update();
}
std::thread::park_timeout(interval);
}
}
/// Tells whether a monitor thread is registered and has not finished.
#[inline(always)]
pub fn is_monitor_running() -> bool {
    MONITOR_THREAD_JH
        .get()
        .map_or(false, |jh| !jh.is_finished())
}
/// Unparks the monitor thread so it starts its next round immediately.
///
/// Returns `true` when a live monitor thread was unparked, `false` when no
/// monitor is registered or it has already finished.
#[inline(always)]
pub fn wake_monitor() -> bool {
    match MONITOR_THREAD_JH.get() {
        Some(jh) if !jh.is_finished() => {
            jh.thread().unpark();
            true
        }
        _ => false,
    }
}
/// Starts the monitor thread unless one is already running or being started.
///
/// # Errors
///
/// * `ErrorKind::ResourceBusy` when another caller is currently inside this
///   function.
/// * `ErrorKind::AlreadyExists` when a monitor thread is already running.
/// * Any error returned by `std::thread::Builder::spawn`.
#[inline(always)]
pub fn start_monitor() -> std::io::Result<()> {
// Flag marking that some caller is in the middle of starting the monitor.
static STARTING: AtomicBool = AtomicBool::new(false);
// Resets STARTING when run.
// NOTE(review): presumably `Defer` also runs the closure on drop unless
// cancelled (needed for the `?` path below) -- confirm against the `defer` module.
let mut defer = Defer::new(|| {
let _ = STARTING.compare_exchange(true, false, Relaxed, Relaxed);
});
if STARTING.compare_exchange(false, true, Relaxed, Relaxed).is_err() {
// The flag belongs to the other caller, so it must not be reset here.
defer.cancel();
return Err(std::io::Error::new(std::io::ErrorKind::ResourceBusy, "monitor is starting by another caller"));
}
if is_monitor_running() {
defer.run();
return Err(std::io::Error::new(std::io::ErrorKind::AlreadyExists, "another monitor running."));
}
let jh =
std::thread::Builder::new()
.name(String::from("ac-monitor"))
.spawn(monitor_loop)?;
let jh = scc2::ebr::Shared::new(jh);
// Register the handle first, then wake the new thread out of its initial park
// so it finds itself registered.
MONITOR_THREAD_JH.set_shared(jh.clone());
jh.thread().unpark();
defer.run();
Ok(())
}
/// Schedule callback handed to `async_task`: queues the runnable for the
/// executors.
///
/// After sending, the monitor is woken when more than 100 items are queued
/// or when the spawn policy is `Proactive`. While profiling is enabled the
/// global queue counter is incremented.
///
/// # Panics
///
/// Without the `crossbeam-deque` feature, panics when the channel is closed.
#[inline(always)]
pub fn scheduler(
    runnable: Runnable,
    info: ScheduleInfo
) {
    #[cfg(test)]
    log::trace!("scheduler called");

    let tx = get_runinfo_tx();
    let run_info = RunInfo { runnable, info };

    #[cfg(feature="crossbeam-deque")]
    tx.send(run_info);
    #[cfg(not(feature="crossbeam-deque"))]
    tx.send(run_info)
        .expect("unable to send Runnable to executors: channel closed");

    #[cfg(test)]
    log::trace!("scheduler sent. tx.len()={}", tx.len());

    let should_wake = tx.len() > 100
        || ExecutorConfig::global().spawn_policy().is_proactive();
    if should_wake {
        wake_monitor();
    }

    if ProfileConfig::global().is_enabled() {
        let profile = Profile::global();
        profile.started();
        profile.runnable.queue_count.checked_add(1);
    }
}
// `scheduler` wrapped in `async_task::WithInfo` so that it is called with the
// `ScheduleInfo` argument; passed as the schedule function in `spawn`.
const SCHEDULER: async_task::WithInfo<fn(Runnable, ScheduleInfo)> = async_task::WithInfo(scheduler);
/// Spawns a future onto the executors and returns its [`Task`] handle.
///
/// The monitor thread is started if necessary (failures to start it are
/// ignored). While profiling is enabled the future is wrapped so that its
/// polls and lifetime are recorded in `FutureProfile::global()`. Panics of
/// the future are propagated to the task handle (`propagate_panic(true)`).
#[inline(always)]
pub fn spawn<F>(f: F) -> Task<F::Output>
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    /// Future adapter that feeds `FutureProfile` while delegating to the
    /// wrapped future.
    struct WrappedFuture<T, F: Future<Output=T>> {
        /// `true` when this wrapper holds no share of the alive counter.
        dropped: bool,
        /// Id of the task this future belongs to.
        id: TaskId,
        /// The wrapped future, pinned on the heap.
        fut: Pin<Box<F>>,
    }

    impl<T, F: Future<Output=T>> WrappedFuture<T, F> {
        /// Wraps `f` and, while profiling is enabled, counts it as alive.
        #[inline(always)]
        fn new(id: TaskId, f: F) -> Self {
            let mut wrapped = Self {
                dropped: true,
                id,
                fut: Box::pin(f),
            };
            let counted = ProfileConfig::global().is_enabled()
                && FutureProfile::global().alive_count.checked_add(1).is_some();
            wrapped.dropped = !counted;
            wrapped
        }
    }

    impl<T, F: Future<Output=T>> Future for WrappedFuture<T, F> {
        type Output = T;

        /// Polls the wrapped future and records the outcome while profiling
        /// is enabled.
        #[inline(always)]
        fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<T> {
            let res = self.fut.as_mut().poll(ctx);
            if ProfileConfig::global().is_enabled() {
                let stats = FutureProfile::global();
                stats.poll_count.checked_add(1);
                match &res {
                    Poll::Pending => {
                        stats.pending_count.checked_add(1);
                    }
                    Poll::Ready(_) => {
                        stats.ready_count.checked_add(1);
                    }
                }
            }
            res
        }
    }

    impl<T, F: Future<Output=T>> Drop for WrappedFuture<T, F> {
        /// Gives back this wrapper's share of the alive counter, if any.
        #[inline(always)]
        fn drop(&mut self) {
            if !self.dropped {
                self.dropped = true;
                FutureProfile::global().alive_count.checked_sub(1);
            }
        }
    }

    // Best effort: an error only means a monitor is already running/starting
    // or could not be spawned.
    let _ = start_monitor();

    let builder = async_task::Builder::new()
        .propagate_panic(true)
        .metadata(TaskInfo::new(0));
    let (runnable, task) = if ProfileConfig::global().is_enabled() {
        builder.spawn(move |taskinfo| { WrappedFuture::new(taskinfo.id, f) }, SCHEDULER)
    } else {
        builder.spawn(move |_| { f }, SCHEDULER)
    };

    runnable.schedule();
    task
}